/Users/deen/code/yugabyte-db/src/yb/rpc/scheduler-test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // |
2 | | // Copyright (c) YugaByte, Inc. |
3 | | // |
4 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
5 | | // in compliance with the License. You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
10 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
11 | | // or implied. See the License for the specific language governing permissions and limitations |
12 | | // under the License. |
13 | | // |
14 | | // |
15 | | |
16 | | #include <future> |
17 | | #include <thread> |
18 | | |
19 | | #include "yb/rpc/io_thread_pool.h" |
20 | | #include "yb/rpc/rpc-test-base.h" |
21 | | #include "yb/rpc/scheduler.h" |
22 | | |
23 | | #include "yb/util/countdown_latch.h" |
24 | | #include "yb/util/test_macros.h" |
25 | | #include "yb/util/tostring.h" |
26 | | |
27 | | namespace yb { |
28 | | namespace rpc { |
29 | | |
30 | | using std::shared_ptr; |
31 | | using namespace std::placeholders; |
32 | | using namespace std::literals; |
33 | | |
34 | | class SchedulerTest : public RpcTestBase { |
35 | | public: |
36 | 5 | void SetUp() override { |
37 | 5 | pool_.emplace("test", 1); |
38 | 5 | scheduler_.emplace(&pool_->io_service()); |
39 | 5 | } |
40 | | |
41 | 5 | void TearDown() override { |
42 | 5 | scheduler_->Shutdown(); |
43 | 5 | pool_->Shutdown(); |
44 | 5 | pool_->Join(); |
45 | 5 | } |
46 | | |
47 | | protected: |
48 | | boost::optional<IoThreadPool> pool_; |
49 | | boost::optional<Scheduler> scheduler_; |
50 | | }; |
51 | | |
52 | | const int kCycles = 1000; |
53 | | |
54 | 2.01k | auto SetPromiseValueToStatusFunctor(std::promise<Status>* promise) { |
55 | 2.01k | return [promise](Status status) { promise->set_value(std::move(status)); }; |
56 | 2.01k | } |
57 | | |
58 | 1 | TEST_F(SchedulerTest, TestFunctionIsCalled) { |
59 | 1.00k | for (int i = 0; i != kCycles; ++i) { |
60 | 1.00k | std::promise<Status> promise; |
61 | 1.00k | auto future = promise.get_future(); |
62 | 1.00k | scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), 0s); |
63 | 1.00k | ASSERT_OK(future.get()); |
64 | 1.00k | } |
65 | 1 | } |
66 | | |
67 | 1 | TEST_F(SchedulerTest, TestFunctionIsCalledAtTheRightTime) { |
68 | 1 | using yb::ToString; |
69 | | |
70 | 11 | for (int i = 0; i != 10; ++i) { |
71 | 10 | auto before = std::chrono::steady_clock::now(); |
72 | 10 | auto delay = 50ms; |
73 | 10 | std::promise<Status> promise; |
74 | 10 | auto future = promise.get_future(); |
75 | 10 | scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), delay); |
76 | 10 | ASSERT_OK(future.get()); |
77 | 10 | auto after = std::chrono::steady_clock::now(); |
78 | 10 | auto delta = after - before; |
79 | 10 | #if defined(OS_MACOSX) |
80 | 10 | auto upper = delay + 200ms; |
81 | | #else |
82 | | auto upper = delay + 50ms; |
83 | | #endif |
84 | 0 | CHECK(delta >= delay) << "Delta: " << ToString(delta) << ", lower bound: " << ToString(delay); |
85 | 0 | CHECK(delta < upper) << "Delta: " << ToString(delta) << ", upper bound: " << ToString(upper); |
86 | 10 | } |
87 | 1 | } |
88 | | |
89 | 1 | TEST_F(SchedulerTest, TestFunctionIsCalledIfReactorShutdown) { |
90 | 1 | std::promise<Status> promise; |
91 | 1 | auto future = promise.get_future(); |
92 | 1 | scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), 60s); |
93 | 1 | scheduler_->Shutdown(); |
94 | 1 | ASSERT_TRUE(future.get().IsAborted()); |
95 | 1 | } |
96 | | |
97 | 1 | TEST_F(SchedulerTest, Abort) { |
98 | 1.00k | for (int i = 0; i != kCycles; ++i) { |
99 | 1.00k | std::promise<Status> promise; |
100 | 1.00k | auto future = promise.get_future(); |
101 | 1.00k | auto task_id = scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), 1s); |
102 | 1.00k | scheduler_->Abort(task_id); |
103 | 1.00k | ASSERT_EQ(std::future_status::ready, future.wait_for(100ms)); |
104 | 1.00k | auto status = future.get(); |
105 | 1.00k | ASSERT_TRUE(status.IsAborted()); |
106 | 1.00k | } |
107 | 1 | } |
108 | | |
109 | 1 | TEST_F(SchedulerTest, Shutdown) { |
110 | 1 | const size_t kThreads = 8; |
111 | 1 | std::vector<std::thread> threads; |
112 | 1 | std::atomic<size_t> scheduled(0); |
113 | 1 | std::atomic<size_t> executed(0); |
114 | 1 | std::atomic<bool> failed(false); |
115 | 9 | while (threads.size() != kThreads) { |
116 | 8 | threads.emplace_back([this, &scheduled, &executed, &failed] { |
117 | 365 | while (!failed.load(std::memory_order_acquire)) { |
118 | 357 | ++scheduled; |
119 | 365 | scheduler_->Schedule([&failed, &executed](const Status& status) { |
120 | 365 | ++executed; |
121 | 365 | if (!status.ok()) { |
122 | 365 | failed.store(true, std::memory_order_release); |
123 | 365 | } |
124 | 365 | }, 0ms); |
125 | 357 | } |
126 | 8 | }); |
127 | 8 | } |
128 | 1 | scheduler_->Shutdown(); |
129 | 8 | for (auto& thread : threads) { |
130 | 8 | thread.join(); |
131 | 8 | } |
132 | 1 | ASSERT_GT(scheduled.load(std::memory_order_acquire), 0); |
133 | 2 | for (int i = 0; i != 20; ++i) { |
134 | 2 | if (scheduled.load(std::memory_order_acquire) == executed.load(std::memory_order_acquire)) { |
135 | 1 | break; |
136 | 1 | } |
137 | 1 | std::this_thread::sleep_for(200ms); |
138 | 1 | } |
139 | 1 | ASSERT_EQ(scheduled.load(std::memory_order_acquire), executed.load(std::memory_order_acquire)); |
140 | 1 | } |
141 | | |
142 | | } // namespace rpc |
143 | | } // namespace yb |