YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/scheduler-test.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include <future>
17
#include <thread>
18
19
#include "yb/rpc/io_thread_pool.h"
20
#include "yb/rpc/rpc-test-base.h"
21
#include "yb/rpc/scheduler.h"
22
23
#include "yb/util/countdown_latch.h"
24
#include "yb/util/test_macros.h"
25
#include "yb/util/tostring.h"
26
27
namespace yb {
28
namespace rpc {
29
30
using std::shared_ptr;
31
using namespace std::placeholders;
32
using namespace std::literals;
33
34
// Test fixture for the Scheduler tests. Before each test it builds an
// IoThreadPool and a Scheduler bound to that pool's io_service; after each
// test it shuts both down.
// NOTE(review): neither override calls RpcTestBase::SetUp()/TearDown() -
// confirm that skipping the base-class setup is intentional.
class SchedulerTest : public RpcTestBase {
35
 public:
36
5
  // Creates the pool first because the scheduler is constructed from a pointer
  // to the pool's io_service.
  void SetUp() override {
37
5
    // Arguments are presumably the pool name and the number of threads -
    // TODO confirm against the IoThreadPool constructor.
    pool_.emplace("test", 1);
38
5
    scheduler_.emplace(&pool_->io_service());
39
5
  }
40
41
5
  // Shuts the scheduler down before the pool, then waits for the pool via Join().
  void TearDown() override {
42
5
    scheduler_->Shutdown();
43
5
    pool_->Shutdown();
44
5
    pool_->Join();
45
5
  }
46
47
 protected:
48
  // Held in optionals so they are constructed in SetUp() rather than in the
  // fixture constructor. Declared in this order, so scheduler_ is destroyed
  // before pool_.
  boost::optional<IoThreadPool> pool_;
49
  boost::optional<Scheduler> scheduler_;
50
};
51
52
// Iteration count shared by the looping tests in this file
// (TestFunctionIsCalled and Abort).
const int kCycles = 1000;
53
54
2.01k
// Returns a callback that takes a Status and stores it into `*promise`.
// The promise is captured as a raw pointer, so the caller must keep it alive
// until the callback has run. std::promise::set_value may be called only once,
// so the returned callback must be invoked at most once per promise.
auto SetPromiseValueToStatusFunctor(std::promise<Status>* promise) {
55
2.01k
  return [promise](Status status) { promise->set_value(std::move(status)); };
56
2.01k
}
57
58
1
// Schedules a task with a zero delay kCycles times and checks each time that
// the callback is invoked with an OK status.
TEST_F(SchedulerTest, TestFunctionIsCalled) {
59
1.00k
  for (int i = 0; i != kCycles; ++i) {
60
1.00k
    std::promise<Status> promise;
61
1.00k
    auto future = promise.get_future();
62
1.00k
    scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), 0s);
63
1.00k
    // future.get() blocks until the callback has set the promise, so `promise`
    // stays alive for as long as the callback needs it.
    ASSERT_OK(future.get());
64
1.00k
  }
65
1
}
66
67
1
// Schedules a task with a 50ms delay ten times and checks that the time
// between scheduling and the callback's completion, measured on
// steady_clock, is at least the delay and below a platform-dependent bound.
TEST_F(SchedulerTest, TestFunctionIsCalledAtTheRightTime) {
68
1
  using yb::ToString;
69
70
11
  for (int i = 0; i != 10; ++i) {
71
10
    auto before = std::chrono::steady_clock::now();
72
10
    auto delay = 50ms;
73
10
    std::promise<Status> promise;
74
10
    auto future = promise.get_future();
75
10
    scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), delay);
76
10
    ASSERT_OK(future.get());
77
10
    auto after = std::chrono::steady_clock::now();
78
10
    // Elapsed time covers the Schedule() call plus the wait for the callback.
    auto delta = after - before;
79
10
    // The allowed overshoot is 200ms when OS_MACOSX is defined and 50ms otherwise.
#if defined(OS_MACOSX)
80
10
    auto upper = delay + 200ms;
81
#else
82
    auto upper = delay + 50ms;
83
#endif
84
0
    // NOTE(review): these bounds use CHECK rather than gtest ASSERT_* macros;
    // CHECK presumably aborts the whole process on failure (glog-style) instead
    // of failing only this test - confirm that this is intended.
    CHECK(delta >= delay) << "Delta: " << ToString(delta) << ", lower bound: " << ToString(delay);
85
0
    CHECK(delta < upper) << "Delta: " << ToString(delta) << ", upper bound: " << ToString(upper);
86
10
  }
87
1
}
88
89
1
// Schedules a task 60 seconds in the future, shuts the scheduler down
// immediately, and checks that the callback is still invoked, with a status
// for which IsAborted() is true.
TEST_F(SchedulerTest, TestFunctionIsCalledIfReactorShutdown) {
90
1
  std::promise<Status> promise;
91
1
  auto future = promise.get_future();
92
1
  scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), 60s);
93
1
  // The fixture's TearDown() calls Shutdown() again after this test body, so
  // this test also exercises a second Shutdown() call on the same scheduler.
  scheduler_->Shutdown();
94
1
  ASSERT_TRUE(future.get().IsAborted());
95
1
}
96
97
1
// Schedules a task with a 1s delay and aborts it by its task id, kCycles
// times. Each time it checks that the callback fires within 100ms, well
// before the delay would elapse, with a status for which IsAborted() is true.
TEST_F(SchedulerTest, Abort) {
98
1.00k
  for (int i = 0; i != kCycles; ++i) {
99
1.00k
    std::promise<Status> promise;
100
1.00k
    auto future = promise.get_future();
101
1.00k
    auto task_id = scheduler_->Schedule(SetPromiseValueToStatusFunctor(&promise), 1s);
102
1.00k
    scheduler_->Abort(task_id);
103
1.00k
    // The bounded wait shows the callback was triggered by the abort rather
    // than by the 1s timer expiring.
    ASSERT_EQ(std::future_status::ready, future.wait_for(100ms));
104
1.00k
    auto status = future.get();
105
1.00k
    ASSERT_TRUE(status.IsAborted());
106
1.00k
  }
107
1
}
108
109
1
// Races Scheduler::Shutdown() against 8 threads that keep scheduling
// zero-delay tasks, then checks that every scheduled callback was eventually
// invoked, so that `executed` catches up with `scheduled`.
TEST_F(SchedulerTest, Shutdown) {
110
1
  const size_t kThreads = 8;
111
1
  std::vector<std::thread> threads;
112
1
  // Number of Schedule() attempts made by the worker threads.
  std::atomic<size_t> scheduled(0);
113
1
  // Number of callback invocations, regardless of the status passed in.
  std::atomic<size_t> executed(0);
114
1
  // Set by the first callback that receives a non-OK status; it is the only
  // stop signal for the worker threads.
  std::atomic<bool> failed(false);
115
9
  while (threads.size() != kThreads) {
116
8
    threads.emplace_back([this, &scheduled, &executed, &failed] {
117
365
      // Keep scheduling until some callback reports a non-OK status,
      // presumably once the scheduler has been shut down.
      while (!failed.load(std::memory_order_acquire)) {
118
357
        // Counted before Schedule() is called, so every attempt is expected to
        // produce exactly one callback invocation.
        ++scheduled;
119
365
        scheduler_->Schedule([&failed, &executed](const Status& status) {
120
365
          ++executed;
121
365
          if (!status.ok()) {
122
365
            failed.store(true, std::memory_order_release);
123
365
          }
124
365
        }, 0ms);
125
357
      }
126
8
    });
127
8
  }
128
1
  scheduler_->Shutdown();
129
8
  for (auto& thread : threads) {
130
8
    thread.join();
131
8
  }
132
1
  ASSERT_GT(scheduled.load(std::memory_order_acquire), 0);
133
2
  // Callbacks may still be pending after the threads are joined, so poll up to
  // 20 times with 200ms sleeps for `executed` to reach `scheduled`.
  for (int i = 0; i != 20; ++i) {
134
2
    if (scheduled.load(std::memory_order_acquire) == executed.load(std::memory_order_acquire)) {
135
1
      break;
136
1
    }
137
1
    std::this_thread::sleep_for(200ms);
138
1
  }
139
1
  // NOTE(review): the callbacks capture `failed` and `executed` by reference;
  // if this assertion fails, callbacks that are still pending would refer to
  // locals that have gone out of scope - confirm that this is acceptable.
  ASSERT_EQ(scheduled.load(std::memory_order_acquire), executed.load(std::memory_order_acquire));
140
1
}
141
142
} // namespace rpc
143
} // namespace yb