YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/reactor-test.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <thread>
34
35
#include "yb/rpc/rpc-test-base.h"
36
37
#include "yb/util/countdown_latch.h"
38
#include "yb/util/test_macros.h"
39
#include "yb/util/thread.h"
40
41
using std::shared_ptr;
42
using namespace std::literals;
43
using namespace std::placeholders;
44
45
namespace yb {
46
namespace rpc {
47
48
4
// Builds the messenger options used by the ReactorTest fixture: a copy of
// kDefaultClientMessengerOptions with n_reactors overridden to 4.
MessengerOptions MakeMessengerOptions() {
  MessengerOptions options = kDefaultClientMessengerOptions;
  options.n_reactors = 4;
  return options;
}
53
54
class ReactorTest : public RpcTestBase {
55
 public:
56
  ReactorTest()
57
    : messenger_(CreateMessenger("my_messenger", MakeMessengerOptions()).release()),
58
4
      latch_(1) {
59
4
  }
60
61
3
  void ScheduledTask(const Status& status, const Status& expected_status) {
62
3
    ASSERT_EQ(expected_status.CodeAsString(), status.CodeAsString());
63
3
    latch_.CountDown();
64
3
  }
65
66
1
  void ScheduledTaskCheckThread(const Status& status, const Thread* thread) {
67
1
    ASSERT_OK(status);
68
1
    ASSERT_EQ(thread, Thread::current_thread());
69
1
    latch_.CountDown();
70
1
  }
71
72
1
  void ScheduledTaskScheduleAgain(const Status& status) {
73
1
    auto task_id = messenger_->ScheduleOnReactor(
74
1
        std::bind(&ReactorTest::ScheduledTaskCheckThread, this, _1, Thread::current_thread()),
75
1
        0s, SOURCE_LOCATION(), nullptr /* messenger */);
76
1
    ASSERT_EQ(task_id, 0);
77
1
    latch_.CountDown();
78
1
  }
79
80
 protected:
81
  AutoShutdownMessengerHolder messenger_;
82
  CountDownLatch latch_;
83
};
84
85
1
// Schedules a zero-delay task and waits until it has run. The callback expects
// to be invoked with an OK status code.
TEST_F(ReactorTest, TestFunctionIsCalled) {
  const Status expected = Status::OK();
  auto task_id = messenger_->ScheduleOnReactor(
      [this, expected](const Status& status) { ScheduledTask(status, expected); },
      0s, SOURCE_LOCATION(), nullptr /* messenger */);
  ASSERT_EQ(task_id, 0);
  latch_.Wait();
}
92
93
1
// Schedules a task with a 100ms delay and verifies that, by the time the task
// has run, at least 100ms have elapsed since just before it was scheduled.
TEST_F(ReactorTest, TestFunctionIsCalledAtTheRightTime) {
  MonoTime before = MonoTime::Now();
  auto task_id = messenger_->ScheduleOnReactor(
      std::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()),
      100ms, SOURCE_LOCATION(), nullptr /* messenger */);
  ASSERT_EQ(task_id, 0);
  latch_.Wait();
  MonoTime after = MonoTime::Now();
  MonoDelta delta = after.GetDeltaSince(before);
  // Use a gtest assertion rather than CHECK_GE so that a timing violation is
  // reported as a regular test failure instead of aborting the test binary.
  ASSERT_GE(delta.ToMilliseconds(), 100);
}
104
105
1
// Schedules a task 60s in the future, shuts the messenger down right away, and
// waits for the callback. The callback expects an Aborted status code; the
// message text is irrelevant because ScheduledTask compares codes only.
TEST_F(ReactorTest, TestFunctionIsCalledIfReactorShutdown) {
  const Status expected = STATUS(Aborted, "doesn't matter");
  auto task_id = messenger_->ScheduleOnReactor(
      [this, expected](const Status& status) { ScheduledTask(status, expected); },
      60s, SOURCE_LOCATION(), nullptr /* messenger */);
  ASSERT_EQ(task_id, 0);
  messenger_->Shutdown();
  latch_.Wait();
}
113
114
1
// Schedules a task that itself schedules a follow-up task; the follow-up
// asserts that it runs on the same thread as the task that scheduled it.
TEST_F(ReactorTest, TestReschedulesOnSameReactorThread) {
  // Two callbacks will count down: the first task and the one it schedules.
  latch_.Reset(2);

  auto task_id = messenger_->ScheduleOnReactor(
      [this](const Status& status) { ScheduledTaskScheduleAgain(status); },
      0s, SOURCE_LOCATION(), nullptr /* messenger */);
  ASSERT_EQ(task_id, 0);
  latch_.Wait();
  latch_.Wait();
}
125
126
} // namespace rpc
127
} // namespace yb