/Users/deen/code/yugabyte-db/src/yb/rpc/reactor-test.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <thread> |
34 | | |
35 | | #include "yb/rpc/rpc-test-base.h" |
36 | | |
37 | | #include "yb/util/countdown_latch.h" |
38 | | #include "yb/util/test_macros.h" |
39 | | #include "yb/util/thread.h" |
40 | | |
41 | | using std::shared_ptr; |
42 | | using namespace std::literals; |
43 | | using namespace std::placeholders; |
44 | | |
45 | | namespace yb { |
46 | | namespace rpc { |
47 | | |
48 | 4 | MessengerOptions MakeMessengerOptions() { |
49 | 4 | auto result = kDefaultClientMessengerOptions; |
50 | 4 | result.n_reactors = 4; |
51 | 4 | return result; |
52 | 4 | } |
53 | | |
54 | | class ReactorTest : public RpcTestBase { |
55 | | public: |
56 | | ReactorTest() |
57 | | : messenger_(CreateMessenger("my_messenger", MakeMessengerOptions()).release()), |
58 | 4 | latch_(1) { |
59 | 4 | } |
60 | | |
61 | 3 | void ScheduledTask(const Status& status, const Status& expected_status) { |
62 | 3 | ASSERT_EQ(expected_status.CodeAsString(), status.CodeAsString()); |
63 | 3 | latch_.CountDown(); |
64 | 3 | } |
65 | | |
66 | 1 | void ScheduledTaskCheckThread(const Status& status, const Thread* thread) { |
67 | 1 | ASSERT_OK(status); |
68 | 1 | ASSERT_EQ(thread, Thread::current_thread()); |
69 | 1 | latch_.CountDown(); |
70 | 1 | } |
71 | | |
72 | 1 | void ScheduledTaskScheduleAgain(const Status& status) { |
73 | 1 | auto task_id = messenger_->ScheduleOnReactor( |
74 | 1 | std::bind(&ReactorTest::ScheduledTaskCheckThread, this, _1, Thread::current_thread()), |
75 | 1 | 0s, SOURCE_LOCATION(), nullptr /* messenger */); |
76 | 1 | ASSERT_EQ(task_id, 0); |
77 | 1 | latch_.CountDown(); |
78 | 1 | } |
79 | | |
80 | | protected: |
81 | | AutoShutdownMessengerHolder messenger_; |
82 | | CountDownLatch latch_; |
83 | | }; |
84 | | |
85 | 1 | TEST_F(ReactorTest, TestFunctionIsCalled) { |
86 | 1 | auto task_id = messenger_->ScheduleOnReactor( |
87 | 1 | std::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()), 0s, |
88 | 1 | SOURCE_LOCATION(), nullptr /* messenger */); |
89 | 1 | ASSERT_EQ(task_id, 0); |
90 | 1 | latch_.Wait(); |
91 | 1 | } |
92 | | |
93 | 1 | TEST_F(ReactorTest, TestFunctionIsCalledAtTheRightTime) { |
94 | 1 | MonoTime before = MonoTime::Now(); |
95 | 1 | auto task_id = messenger_->ScheduleOnReactor( |
96 | 1 | std::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()), |
97 | 1 | 100ms, SOURCE_LOCATION(), nullptr /* messenger */); |
98 | 1 | ASSERT_EQ(task_id, 0); |
99 | 1 | latch_.Wait(); |
100 | 1 | MonoTime after = MonoTime::Now(); |
101 | 1 | MonoDelta delta = after.GetDeltaSince(before); |
102 | 1 | CHECK_GE(delta.ToMilliseconds(), 100); |
103 | 1 | } |
104 | | |
105 | 1 | TEST_F(ReactorTest, TestFunctionIsCalledIfReactorShutdown) { |
106 | 1 | auto task_id = messenger_->ScheduleOnReactor( |
107 | 1 | std::bind(&ReactorTest::ScheduledTask, this, _1, STATUS(Aborted, "doesn't matter")), |
108 | 1 | 60s, SOURCE_LOCATION(), nullptr /* messenger */); |
109 | 1 | ASSERT_EQ(task_id, 0); |
110 | 1 | messenger_->Shutdown(); |
111 | 1 | latch_.Wait(); |
112 | 1 | } |
113 | | |
114 | 1 | TEST_F(ReactorTest, TestReschedulesOnSameReactorThread) { |
115 | | // Our scheduled task will schedule yet another task. |
116 | 1 | latch_.Reset(2); |
117 | | |
118 | 1 | auto task_id = messenger_->ScheduleOnReactor( |
119 | 1 | std::bind(&ReactorTest::ScheduledTaskScheduleAgain, this, _1), 0s, |
120 | 1 | SOURCE_LOCATION(), nullptr /* messenger */); |
121 | 1 | ASSERT_EQ(task_id, 0); |
122 | 1 | latch_.Wait(); |
123 | 1 | latch_.Wait(); |
124 | 1 | } |
125 | | |
126 | | } // namespace rpc |
127 | | } // namespace yb |