/Users/deen/code/yugabyte-db/src/yb/rpc/service_pool.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include "yb/rpc/service_pool.h" |
34 | | |
35 | | #include <pthread.h> |
36 | | #include <sys/types.h> |
37 | | |
38 | | #include <functional> |
39 | | #include <memory> |
40 | | #include <queue> |
41 | | #include <string> |
42 | | #include <vector> |
43 | | |
44 | | #include <boost/asio/strand.hpp> |
45 | | #include <cds/container/basket_queue.h> |
46 | | #include <cds/gc/dhp.h> |
47 | | #include <glog/logging.h> |
48 | | |
49 | | #include "yb/gutil/atomicops.h" |
50 | | #include "yb/gutil/ref_counted.h" |
51 | | #include "yb/gutil/strings/substitute.h" |
52 | | |
53 | | #include "yb/rpc/inbound_call.h" |
54 | | #include "yb/rpc/scheduler.h" |
55 | | #include "yb/rpc/service_if.h" |
56 | | |
57 | | #include "yb/util/countdown_latch.h" |
58 | | #include "yb/util/flag_tags.h" |
59 | | #include "yb/util/lockfree.h" |
60 | | #include "yb/util/logging.h" |
61 | | #include "yb/util/metrics.h" |
62 | | #include "yb/util/monotime.h" |
63 | | #include "yb/util/net/sockaddr.h" |
64 | | #include "yb/util/scope_exit.h" |
65 | | #include "yb/util/status.h" |
66 | | #include "yb/util/trace.h" |
67 | | |
68 | | using namespace std::literals; |
69 | | using namespace std::placeholders; |
70 | | using std::shared_ptr; |
71 | | using strings::Substitute; |
72 | | |
73 | | DEFINE_int64(max_time_in_queue_ms, 6000, |
74 | | "Fail calls that get stuck in the queue longer than the specified amount of time " |
75 | | "(in ms)"); |
76 | | TAG_FLAG(max_time_in_queue_ms, advanced); |
77 | | TAG_FLAG(max_time_in_queue_ms, runtime); |
78 | | DEFINE_int64(backpressure_recovery_period_ms, 600000, |
79 | | "Once we hit a backpressure/service-overflow we will consider dropping stale requests " |
80 | | "for this duration (in ms)"); |
81 | | TAG_FLAG(backpressure_recovery_period_ms, advanced); |
82 | | TAG_FLAG(backpressure_recovery_period_ms, runtime); |
83 | | DEFINE_test_flag(bool, enable_backpressure_mode_for_testing, false, |
84 | | "For testing purposes. Enables the rpc's to be considered timed out in the queue even " |
85 | | "when we have not had any backpressure in the recent past."); |
86 | | |
87 | | METRIC_DEFINE_coarse_histogram(server, rpc_incoming_queue_time, |
88 | | "RPC Queue Time", |
89 | | yb::MetricUnit::kMicroseconds, |
90 | | "Number of microseconds incoming RPC requests spend in the worker queue"); |
91 | | |
92 | | METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue, |
93 | | "RPC Queue Timeouts", |
94 | | yb::MetricUnit::kRequests, |
95 | | "Number of RPCs whose timeout elapsed while waiting " |
96 | | "in the service queue, and thus were not processed. " |
97 | | "Does not include calls that were expired before we tried to execute them."); |
98 | | |
99 | | METRIC_DEFINE_counter(server, rpcs_timed_out_early_in_queue, |
100 | | "RPC Queue Timeouts", |
101 | | yb::MetricUnit::kRequests, |
102 | | "Number of RPCs whose timeout elapsed while waiting " |
103 | | "in the service queue, and thus were not processed. " |
104 | | "Timeout for those calls were detected before the calls tried to execute."); |
105 | | |
106 | | METRIC_DEFINE_counter(server, rpcs_queue_overflow, |
107 | | "RPC Queue Overflows", |
108 | | yb::MetricUnit::kRequests, |
109 | | "Number of RPCs dropped because the service queue " |
110 | | "was full."); |
111 | | |
112 | | namespace yb { |
113 | | namespace rpc { |
114 | | |
115 | | namespace { |
116 | | |
117 | | const CoarseDuration kTimeoutCheckGranularity = 100ms; |
118 | | const char* const kTimedOutInQueue = "Call waited in the queue past deadline"; |
119 | | |
120 | | } // namespace |
121 | | |
122 | | class ServicePoolImpl final : public InboundCallHandler { |
123 | | public: |
124 | | ServicePoolImpl(size_t max_tasks, |
125 | | ThreadPool* thread_pool, |
126 | | Scheduler* scheduler, |
127 | | ServiceIfPtr service, |
128 | | const scoped_refptr<MetricEntity>& entity) |
129 | | : max_queued_calls_(max_tasks), |
130 | | thread_pool_(*thread_pool), |
131 | | scheduler_(*scheduler), |
132 | | service_(std::move(service)), |
133 | | incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)), |
134 | | rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)), |
135 | | rpcs_timed_out_early_in_queue_( |
136 | | METRIC_rpcs_timed_out_early_in_queue.Instantiate(entity)), |
137 | | rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)), |
138 | | check_timeout_strand_(scheduler->io_service()), |
139 | 140k | log_prefix_(Format("$0: ", service_->service_name())) { |
140 | | |
141 | | // Create per service counter for rpcs_in_queue_. |
142 | 140k | auto id = Format("rpcs_in_queue_$0", service_->service_name()); |
143 | 140k | EscapeMetricNameForPrometheus(&id); |
144 | 140k | string description = id + " metric for ServicePoolImpl"; |
145 | 140k | rpcs_in_queue_ = entity->FindOrCreateGauge( |
146 | 140k | std::unique_ptr<GaugePrototype<int64_t>>(new OwningGaugePrototype<int64_t>( |
147 | 140k | entity->prototype().name(), std::move(id), |
148 | 140k | description, MetricUnit::kRequests, description, MetricLevel::kInfo)), |
149 | 140k | static_cast<int64>(0) /* initial_value */); |
150 | | |
151 | 140k | LOG_WITH_PREFIX(INFO) << "yb::rpc::ServicePoolImpl created at " << this; |
152 | 140k | } |
153 | | |
154 | 2.02k | ~ServicePoolImpl() { |
155 | 2.02k | StartShutdown(); |
156 | 2.02k | CompleteShutdown(); |
157 | 2.02k | } |
158 | | |
159 | 4.12k | void CompleteShutdown() { |
160 | 4.12k | shutdown_complete_latch_.Wait(); |
161 | 4.21k | while (scheduled_tasks_.load(std::memory_order_acquire) != 0) { |
162 | 90 | std::this_thread::sleep_for(10ms); |
163 | 90 | } |
164 | 4.12k | } |
165 | | |
166 | 4.12k | void StartShutdown() { |
167 | 4.12k | bool closing_state = false; |
168 | 4.12k | if (closing_.compare_exchange_strong(closing_state, true)) { |
169 | 2.02k | service_->Shutdown(); |
170 | | |
171 | 2.02k | auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire); |
172 | 2.02k | if (check_timeout_task != kUninitializedScheduledTaskId) { |
173 | 326 | scheduler_.Abort(check_timeout_task); |
174 | 326 | } |
175 | | |
176 | 2.01k | check_timeout_strand_.dispatch([this] { |
177 | 2.01k | std::weak_ptr<InboundCall> inbound_call_wrapper; |
178 | 44.1k | while (pre_check_timeout_queue_.pop(inbound_call_wrapper)) {} |
179 | 2.01k | shutdown_complete_latch_.CountDown(); |
180 | 2.01k | }); |
181 | 2.02k | } |
182 | 4.12k | } |
183 | | |
184 | 29.8M | void Enqueue(const InboundCallPtr& call) { |
185 | 29.8M | TRACE_TO(call->trace(), "Inserting onto call queue"); |
186 | | |
187 | 29.8M | auto task = call->BindTask(this); |
188 | 29.8M | if (!task) { |
189 | 1.10k | Overflow(call, "service", queued_calls_.load(std::memory_order_relaxed)); |
190 | 1.10k | return; |
191 | 1.10k | } |
192 | | |
193 | 29.8M | auto call_deadline = call->GetClientDeadline(); |
194 | 29.8M | if (call_deadline != CoarseTimePoint::max()) { |
195 | 29.7M | pre_check_timeout_queue_.push(call); |
196 | 29.7M | ScheduleCheckTimeout(call_deadline); |
197 | 29.7M | } |
198 | | |
199 | 29.8M | thread_pool_.Enqueue(task); |
200 | 29.8M | } |
201 | | |
202 | 1 | const Counter* RpcsTimedOutInQueueMetricForTests() const { |
203 | 1 | return rpcs_timed_out_early_in_queue_.get(); |
204 | 1 | } |
205 | | |
206 | 0 | const Counter* RpcsQueueOverflowMetric() const { |
207 | 0 | return rpcs_queue_overflow_.get(); |
208 | 0 | } |
209 | | |
210 | 0 | std::string service_name() const { |
211 | 0 | return service_->service_name(); |
212 | 0 | } |
213 | | |
214 | 0 | ServiceIfPtr TEST_get_service() const { |
215 | 0 | return service_; |
216 | 0 | } |
217 | | |
218 | 1.10k | void Overflow(const InboundCallPtr& call, const char* type, size_t limit) { |
219 | 1.10k | const auto err_msg = |
220 | 1.10k | Format("$0 request on $1 from $2 dropped due to backpressure. " |
221 | 1.10k | "The $3 queue is full, it has $4 items.", |
222 | 1.10k | call->method_name().ToBuffer(), |
223 | 1.10k | service_->service_name(), |
224 | 1.10k | call->remote_address(), |
225 | 1.10k | type, |
226 | 1.10k | limit); |
227 | 1.10k | YB_LOG_EVERY_N_SECS(WARNING, 3) << LogPrefix() << err_msg; |
228 | 1.10k | const auto response_status = STATUS(ServiceUnavailable, err_msg); |
229 | 1.10k | rpcs_queue_overflow_->Increment(); |
230 | 1.10k | call->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, response_status); |
231 | 1.10k | last_backpressure_at_.store( |
232 | 1.10k | CoarseMonoClock::Now().time_since_epoch(), std::memory_order_release); |
233 | 1.10k | } |
234 | | |
235 | 7 | void Failure(const InboundCallPtr& call, const Status& status) override { |
236 | 7 | if (!call->TryStartProcessing()) { |
237 | 3 | return; |
238 | 3 | } |
239 | | |
240 | 4 | if (status.IsServiceUnavailable()) { |
241 | 0 | Overflow(call, "global", thread_pool_.options().queue_limit); |
242 | 0 | return; |
243 | 0 | } |
244 | 4 | YB_LOG_EVERY_N_SECS(WARNING, 1) |
245 | 3 | << LogPrefix() |
246 | 3 | << call->method_name() << " request on " << service_->service_name() << " from " |
247 | 3 | << call->remote_address() << " dropped because of: " << status.ToString(); |
248 | 4 | const auto response_status = STATUS(ServiceUnavailable, "Service is shutting down"); |
249 | 4 | call->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, response_status); |
250 | 4 | } |
251 | | |
252 | 140k | void FillEndpoints(const RpcServicePtr& service, RpcEndpointMap* map) { |
253 | 140k | service_->FillEndpoints(service, map); |
254 | 140k | } |
255 | | |
256 | 29.8M | void Handle(InboundCallPtr incoming) override { |
257 | 29.8M | incoming->RecordHandlingStarted(incoming_queue_time_); |
258 | 29.8M | ADOPT_TRACE(incoming->trace()); |
259 | | |
260 | 29.8M | const char* error_message; |
261 | 29.8M | if (PREDICT_FALSE(incoming->ClientTimedOut())) { |
262 | 52 | error_message = kTimedOutInQueue; |
263 | 29.8M | } else if (PREDICT_FALSE(ShouldDropRequestDuringHighLoad(incoming))) { |
264 | 0 | error_message = "The server is overloaded. Call waited in the queue past max_time_in_queue."; |
265 | 29.8M | } else { |
266 | 29.8M | TRACE_TO(incoming->trace(), "Handling call $0", yb::ToString(incoming->method_name())); |
267 | | |
268 | 29.8M | if (incoming->TryStartProcessing()) { |
269 | 29.8M | service_->Handle(std::move(incoming)); |
270 | 29.8M | } |
271 | 29.8M | return; |
272 | 29.8M | } |
273 | | |
274 | 52 | TRACE_TO(incoming->trace(), error_message); |
275 | 0 | VLOG_WITH_PREFIX(4) |
276 | 0 | << "Timing out call " << incoming->ToString() << " due to: " << error_message; |
277 | | |
278 | | // Respond as a failure, even though the client will probably ignore |
279 | | // the response anyway. |
280 | 52 | TimedOut(incoming.get(), error_message, rpcs_timed_out_in_queue_.get()); |
281 | 52 | } |
282 | | |
283 | | private: |
284 | 11.9k | void TimedOut(InboundCall* call, const char* error_message, Counter* metric) { |
285 | 11.9k | if (call->RespondTimedOutIfPending(error_message)) { |
286 | 55 | metric->Increment(); |
287 | 55 | } |
288 | 11.9k | } |
289 | | |
290 | 29.8M | bool ShouldDropRequestDuringHighLoad(const InboundCallPtr& incoming) { |
291 | 29.8M | CoarseTimePoint last_backpressure_at(last_backpressure_at_.load(std::memory_order_acquire)); |
292 | | |
293 | | // For testing purposes. |
294 | 29.8M | if (GetAtomicFlag(&FLAGS_TEST_enable_backpressure_mode_for_testing)) { |
295 | 0 | last_backpressure_at = CoarseMonoClock::Now(); |
296 | 0 | } |
297 | | |
298 | | // Test for a sentinel value, to avoid reading the clock. |
299 | 29.8M | if (last_backpressure_at == CoarseTimePoint()) { |
300 | 28.6M | return false; |
301 | 28.6M | } |
302 | | |
303 | 1.20M | auto now = CoarseMonoClock::Now(); |
304 | 1.20M | if (now > last_backpressure_at + FLAGS_backpressure_recovery_period_ms * 1ms) { |
305 | 0 | last_backpressure_at_.store(CoarseTimePoint().time_since_epoch(), std::memory_order_release); |
306 | 0 | return false; |
307 | 0 | } |
308 | | |
309 | 1.20M | return incoming->GetTimeInQueue().ToMilliseconds() > FLAGS_max_time_in_queue_ms; |
310 | 1.20M | } |
311 | | |
312 | 237k | void CheckTimeout(ScheduledTaskId task_id, CoarseTimePoint time, const Status& status) { |
313 | 237k | auto se = ScopeExit([this, task_id, time] { |
314 | 237k | auto expected_duration = time.time_since_epoch(); |
315 | 237k | next_check_timeout_.compare_exchange_strong( |
316 | 237k | expected_duration, CoarseTimePoint::max().time_since_epoch(), |
317 | 237k | std::memory_order_acq_rel); |
318 | 237k | auto expected_task_id = task_id; |
319 | 237k | check_timeout_task_.compare_exchange_strong( |
320 | 237k | expected_task_id, kUninitializedScheduledTaskId, std::memory_order_acq_rel); |
321 | 237k | scheduled_tasks_.fetch_sub(1, std::memory_order_acq_rel); |
322 | 237k | }); |
323 | 237k | if (!status.ok()) { |
324 | 139k | return; |
325 | 139k | } |
326 | | |
327 | 97.7k | auto now = CoarseMonoClock::now(); |
328 | 97.7k | { |
329 | 97.7k | std::weak_ptr<InboundCall> weak_inbound_call; |
330 | 25.5M | while (pre_check_timeout_queue_.pop(weak_inbound_call)) { |
331 | 25.4M | auto inbound_call = weak_inbound_call.lock(); |
332 | 25.4M | if (!inbound_call) { |
333 | 25.3M | continue; |
334 | 25.3M | } |
335 | 70.1k | if (now > inbound_call->GetClientDeadline()) { |
336 | 5.59k | TimedOut(inbound_call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get()); |
337 | 64.5k | } else { |
338 | 64.5k | check_timeout_queue_.emplace(inbound_call); |
339 | 64.5k | } |
340 | 70.1k | } |
341 | 97.7k | } |
342 | | |
343 | 147k | while (!check_timeout_queue_.empty() && now > check_timeout_queue_.top().time) { |
344 | 50.0k | auto call = check_timeout_queue_.top().call.lock(); |
345 | 50.0k | if (call) { |
346 | 6.28k | TimedOut(call.get(), kTimedOutInQueue, rpcs_timed_out_early_in_queue_.get()); |
347 | 6.28k | } |
348 | 50.0k | check_timeout_queue_.pop(); |
349 | 50.0k | } |
350 | | |
351 | 97.7k | if (!check_timeout_queue_.empty()) { |
352 | 10.8k | ScheduleCheckTimeout(check_timeout_queue_.top().time); |
353 | 10.8k | } |
354 | 97.7k | } |
355 | | |
356 | 29.7M | void ScheduleCheckTimeout(CoarseTimePoint time) { |
357 | 29.7M | if (closing_.load(std::memory_order_acquire)) { |
358 | 0 | return; |
359 | 0 | } |
360 | 29.7M | CoarseDuration next_check_timeout = next_check_timeout_.load(std::memory_order_acquire); |
361 | 29.7M | time += kTimeoutCheckGranularity; |
362 | 29.7M | while (CoarseTimePoint(next_check_timeout) > time) { |
363 | 266k | if (next_check_timeout_.compare_exchange_weak( |
364 | 265k | next_check_timeout, time.time_since_epoch(), std::memory_order_acq_rel)) { |
365 | 266k | check_timeout_strand_.dispatch([this, time] { |
366 | 266k | auto check_timeout_task = check_timeout_task_.load(std::memory_order_acquire); |
367 | 266k | if (check_timeout_task != kUninitializedScheduledTaskId) { |
368 | 139k | scheduler_.Abort(check_timeout_task); |
369 | 139k | } |
370 | 266k | scheduled_tasks_.fetch_add(1, std::memory_order_acq_rel); |
371 | 266k | auto task_id = scheduler_.Schedule( |
372 | 237k | [this, time](ScheduledTaskId task_id, const Status& status) { |
373 | 237k | check_timeout_strand_.dispatch([this, time, task_id, status] { |
374 | 237k | CheckTimeout(task_id, time, status); |
375 | 237k | }); |
376 | 237k | }, |
377 | 266k | ToSteady(time)); |
378 | 266k | check_timeout_task_.store(task_id, std::memory_order_release); |
379 | 266k | }); |
380 | 265k | break; |
381 | 265k | } |
382 | 266k | } |
383 | 29.7M | } |
384 | | |
385 | 140k | const std::string& LogPrefix() const { |
386 | 140k | return log_prefix_; |
387 | 140k | } |
388 | | |
389 | 29.8M | bool CallQueued() override { |
390 | 29.8M | auto queued_calls = queued_calls_.fetch_add(1, std::memory_order_acq_rel); |
391 | 29.8M | if (queued_calls < 0) { |
392 | 0 | YB_LOG_EVERY_N_SECS(DFATAL, 5) << "Negative number of queued calls: " << queued_calls; |
393 | 0 | } |
394 | | |
395 | 29.8M | if (implicit_cast<size_t>(queued_calls) >= max_queued_calls_) { |
396 | 1.10k | queued_calls_.fetch_sub(1, std::memory_order_relaxed); |
397 | 1.10k | return false; |
398 | 1.10k | } |
399 | | |
400 | 29.8M | rpcs_in_queue_->Increment(); |
401 | 29.8M | return true; |
402 | 29.8M | } |
403 | | |
404 | 29.8M | void CallDequeued() override { |
405 | 29.8M | queued_calls_.fetch_sub(1, std::memory_order_relaxed); |
406 | 29.8M | rpcs_in_queue_->Decrement(); |
407 | 29.8M | } |
408 | | |
409 | | const size_t max_queued_calls_; |
410 | | ThreadPool& thread_pool_; |
411 | | Scheduler& scheduler_; |
412 | | ServiceIfPtr service_; |
413 | | scoped_refptr<Histogram> incoming_queue_time_; |
414 | | scoped_refptr<Counter> rpcs_timed_out_in_queue_; |
415 | | scoped_refptr<Counter> rpcs_timed_out_early_in_queue_; |
416 | | scoped_refptr<Counter> rpcs_queue_overflow_; |
417 | | scoped_refptr<AtomicGauge<int64_t>> rpcs_in_queue_; |
418 | | // Have to use CoarseDuration here, since CoarseTimePoint does not work with clang + libstdc++ |
419 | | std::atomic<CoarseDuration> last_backpressure_at_{CoarseTimePoint().time_since_epoch()}; |
420 | | std::atomic<int64_t> queued_calls_{0}; |
421 | | |
422 | | // It is too expensive to update timeout priority queue when each call is received. |
423 | | // So we are doing the following trick. |
424 | | // All calls are added to pre_check_timeout_queue_, w/o priority. |
425 | | // Then before timeout check we move calls from this queue to priority queue. |
426 | | typedef cds::container::BasketQueue<cds::gc::DHP, std::weak_ptr<InboundCall>> |
427 | | PreCheckTimeoutQueue; |
428 | | PreCheckTimeoutQueue pre_check_timeout_queue_; |
429 | | |
430 | | // Used to track scheduled time, to avoid unnecessary rescheduling. |
431 | | std::atomic<CoarseDuration> next_check_timeout_{CoarseTimePoint::max().time_since_epoch()}; |
432 | | |
433 | | // Last scheduled task, required to abort scheduled task during reschedule. |
434 | | std::atomic<ScheduledTaskId> check_timeout_task_{kUninitializedScheduledTaskId}; |
435 | | |
436 | | std::atomic<int> scheduled_tasks_{0}; |
437 | | |
438 | | // Timeout checking synchronization. |
439 | | IoService::strand check_timeout_strand_; |
440 | | |
441 | | struct QueuedCheckDeadline { |
442 | | CoarseTimePoint time; |
443 | | // We use weak pointer to avoid retaining call that was already processed. |
444 | | std::weak_ptr<InboundCall> call; |
445 | | |
446 | | explicit QueuedCheckDeadline(const InboundCallPtr& inp) |
447 | 67.1k | : time(inp->GetClientDeadline()), call(inp) { |
448 | 67.1k | } |
449 | | }; |
450 | | |
451 | | // Priority queue puts the geatest value on top, so we invert comparison. |
452 | 1.16M | friend bool operator<(const QueuedCheckDeadline& lhs, const QueuedCheckDeadline& rhs) { |
453 | 1.16M | return lhs.time > rhs.time; |
454 | 1.16M | } |
455 | | |
456 | | std::priority_queue<QueuedCheckDeadline> check_timeout_queue_; |
457 | | |
458 | | std::atomic<bool> closing_ = {false}; |
459 | | CountDownLatch shutdown_complete_latch_{1}; |
460 | | std::string log_prefix_; |
461 | | }; |
462 | | |
463 | | ServicePool::ServicePool(size_t max_tasks, |
464 | | ThreadPool* thread_pool, |
465 | | Scheduler* scheduler, |
466 | | ServiceIfPtr service, |
467 | | const scoped_refptr<MetricEntity>& metric_entity) |
468 | | : impl_(new ServicePoolImpl( |
469 | 140k | max_tasks, thread_pool, scheduler, std::move(service), metric_entity)) { |
470 | 140k | } |
471 | | |
472 | 2.02k | ServicePool::~ServicePool() { |
473 | 2.02k | } |
474 | | |
475 | 2.10k | void ServicePool::StartShutdown() { |
476 | 2.10k | impl_->StartShutdown(); |
477 | 2.10k | } |
478 | | |
479 | 2.10k | void ServicePool::CompleteShutdown() { |
480 | 2.10k | impl_->CompleteShutdown(); |
481 | 2.10k | } |
482 | | |
483 | 29.8M | void ServicePool::QueueInboundCall(InboundCallPtr call) { |
484 | 29.8M | impl_->Enqueue(std::move(call)); |
485 | 29.8M | } |
486 | | |
487 | 0 | void ServicePool::Handle(InboundCallPtr call) { |
488 | 0 | impl_->Handle(std::move(call)); |
489 | 0 | } |
490 | | |
491 | 140k | void ServicePool::FillEndpoints(RpcEndpointMap* map) { |
492 | 140k | impl_->FillEndpoints(RpcServicePtr(this), map); |
493 | 140k | } |
494 | | |
495 | 1 | const Counter* ServicePool::RpcsTimedOutInQueueMetricForTests() const { |
496 | 1 | return impl_->RpcsTimedOutInQueueMetricForTests(); |
497 | 1 | } |
498 | | |
499 | 0 | const Counter* ServicePool::RpcsQueueOverflowMetric() const { |
500 | 0 | return impl_->RpcsQueueOverflowMetric(); |
501 | 0 | } |
502 | | |
503 | 0 | std::string ServicePool::service_name() const { |
504 | 0 | return impl_->service_name(); |
505 | 0 | } |
506 | | |
507 | 0 | ServiceIfPtr ServicePool::TEST_get_service() const { |
508 | 0 | return impl_->TEST_get_service(); |
509 | 0 | } |
510 | | |
511 | | } // namespace rpc |
512 | | } // namespace yb |