/Users/deen/code/yugabyte-db/src/yb/rpc/strand.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) YugaByte, Inc. |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
4 | | // in compliance with the License. You may obtain a copy of the License at |
5 | | // |
6 | | // http://www.apache.org/licenses/LICENSE-2.0 |
7 | | // |
8 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
9 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
10 | | // or implied. See the License for the specific language governing permissions and limitations |
11 | | // under the License. |
12 | | // |
13 | | #include "yb/rpc/strand.h" |
14 | | |
15 | | #include <thread> |
16 | | |
17 | | #include "yb/util/flag_tags.h" |
18 | | #include "yb/util/status.h" |
19 | | |
// Test-only flag. When set to a positive value, Strand::Done sleeps for that many milliseconds
// right after it stores false into running_ and before it decrements active_tasks_, which widens
// the window in which a concurrent Enqueue can observe running_ == false.
20 | | DEFINE_test_flag(int32, strand_done_inject_delay_ms, 0, |
21 | | "Inject into Strand::Done after resetting running flag."); |
22 | | |
23 | | using namespace std::literals; |
24 | | |
25 | | namespace yb { |
26 | | namespace rpc { |
27 | | |
28 | | namespace { |
29 | | |
// Returns a reference to a single shared Aborted status carrying the message "Strand shutdown".
// The object is a function-local static, so it is constructed once on first call and the returned
// reference stays valid afterwards. Strand::Done uses it for tasks dequeued while closing_ is set.
30 | 17 | const Status& StrandAbortedStatus() { |
31 | 17 | static const Status result = STATUS(Aborted, "Strand shutdown"); |
32 | 17 | return result; |
33 | 17 | } |
34 | | |
35 | | } |
36 | | |
// Constructs a strand on top of `thread_pool`. The pointer is dereferenced immediately to
// initialize thread_pool_, so it must not be null.
37 | 150k | Strand::Strand(ThreadPool* thread_pool) : thread_pool_(*thread_pool) {} |
38 | | |
// Destructor. Performs no cleanup itself; it only verifies that Shutdown() has completed:
// closing_ must be set, running_ must be false and active_tasks_ must be zero. Any violation is
// reported through LOG_IF(DFATAL, ...) together with a snapshot of the three values.
// NOTE(review): DFATAL is presumably fatal only in debug builds - confirm against the logging macros.
39 | 75.6k | Strand::~Strand() { |
// Snapshot each atomic once so the condition and the message report the same values.
40 | 75.6k | const auto running = running_.load(); |
41 | 75.6k | const auto closing = closing_.load(); |
42 | 75.6k | const auto active_tasks = active_tasks_.load(); |
43 | 75.6k | LOG_IF(DFATAL, running || !closing || active_tasks) << Format( |
44 | 220 | "Strand $0 has not been fully shut down, running: $1, closing: $2, active_tasks: $3", |
45 | 220 | static_cast<void*>(this), running, closing, active_tasks); |
46 | 75.6k | } |
47 | | |
// Adds `task` to this strand.
//
// If the strand is already closing, the task is not queued and not run; its Done() is called
// right away with an Aborted status ("Strand closing"). Otherwise the task is counted in
// active_tasks_, pushed to queue_, and - if the strand was not already marked running - the strand
// itself is submitted to the thread pool so that Run()/Done() will drain the queue.
48 | 1.67M | void Strand::Enqueue(StrandTask* task) { |
49 | 1.67M | if (closing_.load(std::memory_order_acquire)) { |
50 | 1 | task->Done(STATUS(Aborted, "Strand closing")); |
51 | 1 | return; |
52 | 1 | } |
53 | | |
// Count the task before publishing it in the queue; Done() later subtracts the number of tasks it
// popped and compares against the previous counter value to detect tasks added in the meantime.
54 | 1.67M | active_tasks_.fetch_add(1, std::memory_order_release); |
55 | 1.67M | queue_.Push(task); |
56 | | |
// Only the caller that flips running_ from false to true submits the strand to the pool, so the
// strand is not submitted again while it is already marked running.
57 | 1.67M | bool expected = false; |
58 | 1.67M | if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
59 | 1.34M | thread_pool_.Enqueue(this); |
60 | 1.34M | } |
61 | 1.67M | } |
62 | | |
// Marks the strand as closing and blocks until it is idle.
//
// The first call flips closing_ from false to true; from then on Enqueue() rejects new tasks and
// Done() reports StrandAbortedStatus() to tasks it dequeues. The call returns once active_tasks_
// is zero and running_ is false. A second call logs DFATAL "Strand already closed" and returns
// without waiting.
63 | 75.6k | void Strand::Shutdown() { |
64 | 75.6k | bool expected = false; |
65 | 75.6k | if (!closing_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
66 | 0 | LOG(DFATAL) << "Strand already closed"; |
67 | 0 | return; |
68 | 0 | } |
69 | | |
70 | 136k | while (75.6k active_tasks_.load(std::memory_order_acquire) || |
71 | 136k | running_.load(std::memory_order_acquire)75.6k ) { |
72 | | // Shutdown is expected to be rare, so simply poll both conditions, sleeping 1ms between checks. |
73 | 60.4k | std::this_thread::sleep_for(1ms); |
74 | 60.4k | } |
75 | 75.6k | } |
76 | | |
// Thread pool entry point for the strand; deliberately does nothing.
77 | 1.35M | void Strand::Run() { |
78 | | // Intentionally empty: the queued tasks are processed in Done(), because Done() receives a Status. |
79 | | // When that Status is not OK (the case cited here is the strand being aborted by thread pool shutdown), |
80 | | // Done() must not run the enqueued tasks and instead calls their Done() with the failure status. |
81 | 1.35M | } |
82 | | |
// Drains the strand's queue and hands the running_ flag back.
//
// `status` is the status supplied by the caller for this strand execution. Every task popped from
// queue_ gets its Done() called; its Run() is called first only when the effective status is OK.
// While closing_ is set, the effective status is StrandAbortedStatus() instead of `status`.
//
// After the queue looks empty, running_ is cleared and active_tasks_ is reduced by the number of
// tasks handled. If the counter shows that more tasks arrived in the meantime, this call tries to
// take running_ back and loops again; otherwise it returns.
83 | 1.35M | void Strand::Done(const Status& status) { |
84 | 1.35M | for (;;) { |
85 | 1.35M | size_t tasks_fetched = 0; |
86 | 3.02M | while (StrandTask *task = queue_.Pop()) { |
87 | 1.67M | ++tasks_fetched; |
88 | | |
// closing_ is re-read for every task, so tasks popped after Shutdown() begins are aborted even if
// earlier tasks in the same pass were run.
89 | 1.67M | const auto& actual_status = |
90 | 1.67M | !closing_.load(std::memory_order_acquire) ? status1.67M : StrandAbortedStatus()32 ; |
91 | 1.67M | if (actual_status.ok()) { |
92 | 1.67M | task->Run(); |
93 | 1.67M | } |
94 | 1.67M | task->Done(actual_status); |
95 | 1.67M | } |
// Release the running_ flag before adjusting the counter; from here on an Enqueue() may win the
// CAS on running_ and submit the strand to the thread pool again.
96 | 1.35M | running_.store(false, std::memory_order_release); |
// Test hook: optionally widen the window between clearing running_ and updating active_tasks_.
97 | 1.35M | if (FLAGS_TEST_strand_done_inject_delay_ms > 0) { |
98 | 10 | std::this_thread::sleep_for(FLAGS_TEST_strand_done_inject_delay_ms * 1ms); |
99 | 10 | } |
100 | | // Subtract the tasks handled in this pass from active_tasks_. fetch_sub returns the previous value, |
101 | | // so a result greater than tasks_fetched means further tasks were enqueued in the meantime. |
102 | 1.35M | if (active_tasks_.fetch_sub(tasks_fetched) > tasks_fetched) { |
103 | | // More tasks are pending: try to take running_ back and keep draining in this call. |
104 | 363 | bool expected = false; |
105 | 363 | if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
106 | 362 | continue; |
107 | 362 | } |
108 | | // The CAS failed, so someone else has already flipped running_ to true. It is safe to return here |
109 | | // because that party (see Enqueue()) also submits the strand to the thread pool. |
110 | 363 | } |
111 | 1.35M | break; |
112 | 1.35M | } |
113 | 1.35M | } |
114 | | |
115 | | } // namespace rpc |
116 | | } // namespace yb |