YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/scheduler.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/scheduler.h"
17
18
#include <thread>
19
20
#include <boost/asio/steady_timer.hpp>
21
#include <boost/asio/strand.hpp>
22
23
#include <boost/multi_index_container.hpp>
24
#include <boost/multi_index/hashed_index.hpp>
25
#include <boost/multi_index/mem_fun.hpp>
26
#include <boost/multi_index/ordered_index.hpp>
27
28
#include <glog/logging.h>
29
30
#include "yb/util/errno.h"
31
#include "yb/util/logging.h"
32
#include "yb/util/status.h"
33
34
using namespace std::literals;
35
using namespace std::placeholders;
36
using boost::multi_index::const_mem_fun;
37
using boost::multi_index::hashed_unique;
38
using boost::multi_index::ordered_non_unique;
39
40
namespace yb {
41
namespace rpc {
42
43
namespace {
44
45
constexpr int64_t kShutdownMark = -(1ULL << 32U);
46
47
}
48
49
class Scheduler::Impl {
50
 public:
51
  explicit Impl(IoService* io_service)
52
36.2k
      : io_service_(*io_service), strand_(*io_service), timer_(*io_service) {}
53
54
4.04k
  ~Impl() {
55
4.04k
    Shutdown();
56
4.04k
    DCHECK_EQ(timer_counter_, 0);
57
4.04k
    DCHECK(tasks_.empty());
58
4.04k
  }
59
60
160k
  void Abort(ScheduledTaskId task_id) {
61
160k
    strand_.dispatch([this, task_id] {
62
160k
      auto& index = tasks_.get<IdTag>();
63
160k
      auto it = index.find(task_id);
64
160k
      if (it != index.end()) {
65
160k
        io_service_.post([task = *it] { task->Run(STATUS(Aborted, "Task aborted")); });
66
160k
        index.erase(it);
67
160k
      }
68
160k
    });
69
160k
  }
70
71
7.80k
  void Shutdown() {
72
7.80k
    bool old_value = false;
73
7.80k
    if (closing_.compare_exchange_strong(old_value, true)) {
74
4.05k
      strand_.dispatch([this] {
75
4.05k
        boost::system::error_code ec;
76
4.05k
        timer_.cancel(ec);
77
18.4E
        LOG_IF(ERROR, ec) << "Failed to cancel timer: " << ec.message();
78
79
4.05k
        auto status = STATUS(
80
4.05k
            ServiceUnavailable, "Scheduler is shutting down", "" /* msg2 */, Errno(ESHUTDOWN));
81
        // Abort all scheduled tasks. It is ok to run task earlier than it was scheduled because
82
        // we pass error status to it.
83
2.11k
        for (auto task : tasks_) {
84
2.01k
          io_service_.post([task, status] { task->Run(status); });
85
2.11k
        }
86
4.05k
        tasks_.clear();
87
4.05k
      });
88
4.05k
    }
89
7.80k
  }
90
91
7.38M
  void DoSchedule(std::shared_ptr<ScheduledTaskBase> task) {
92
7.44M
    strand_.dispatch([this, task] {
93
7.44M
      if (closing_.load(std::memory_order_acquire)) {
94
1.10M
        io_service_.post([task] {
95
1.10M
          task->Run(STATUS(Aborted, "Scheduler shutdown", "" /* msg2 */, Errno(ESHUTDOWN)));
96
1.10M
        });
97
1.10M
        return;
98
1.10M
      }
99
100
6.33M
      auto pair = tasks_.insert(task);
101
6.33M
      CHECK(pair.second);
102
6.33M
      if (pair.first == tasks_.begin()) {
103
428k
        StartTimer();
104
428k
      }
105
6.33M
    });
106
7.38M
  }
107
108
7.33M
  ScheduledTaskId NextId() {
109
7.33M
    return ++id_;
110
7.33M
  }
111
112
140k
  IoService& io_service() {
113
140k
    return io_service_;
114
140k
  }
115
116
 private:
117
4.44M
  void StartTimer() {
118
4.44M
    DCHECK(strand_.running_in_this_thread());
119
4.44M
    DCHECK(!tasks_.empty());
120
121
4.44M
    boost::system::error_code ec;
122
4.44M
    timer_.expires_at((*tasks_.begin())->time(), ec);
123
18.4E
    LOG_IF(ERROR, ec) << "Reschedule timer failed: " << ec.message();
124
4.44M
    ++timer_counter_;
125
4.44M
    timer_.async_wait(strand_.wrap(std::bind(&Impl::HandleTimer, this, _1)));
126
4.44M
  }
127
128
4.41M
  void HandleTimer(const boost::system::error_code& ec) {
129
4.41M
    DCHECK(strand_.running_in_this_thread());
130
4.41M
    --timer_counter_;
131
132
4.41M
    if (ec) {
133
5
      LOG_IF(ERROR, ec != boost::asio::error::operation_aborted) << "Wait failed: " << ec.message();
134
262k
      return;
135
262k
    }
136
4.15M
    if (closing_.load(std::memory_order_acquire)) {
137
1
      return;
138
1
    }
139
140
4.15M
    auto now = std::chrono::steady_clock::now();
141
10.1M
    while (!tasks_.empty() && (*tasks_.begin())->time() <= now) {
142
5.99M
      io_service_.post([task = *tasks_.begin()] { task->Run(Status::OK()); });
143
6.00M
      tasks_.erase(tasks_.begin());
144
6.00M
    }
145
146
4.15M
    if (!tasks_.empty()) {
147
4.01M
      StartTimer();
148
4.01M
    }
149
4.15M
  }
150
151
  class IdTag;
152
153
  typedef boost::multi_index_container<
154
      std::shared_ptr<ScheduledTaskBase>,
155
      boost::multi_index::indexed_by<
156
          ordered_non_unique<
157
              const_mem_fun<ScheduledTaskBase, SteadyTimePoint, &ScheduledTaskBase::time>
158
          >,
159
          hashed_unique<
160
              boost::multi_index::tag<IdTag>,
161
              const_mem_fun<ScheduledTaskBase, ScheduledTaskId, &ScheduledTaskBase::id>
162
          >
163
      >
164
  > Tasks;
165
166
  IoService& io_service_;
167
  std::atomic<ScheduledTaskId> id_ = {0};
168
  Tasks tasks_;
169
  // Strand that protects tasks_ and timer_ fields.
170
  boost::asio::io_service::strand strand_;
171
  boost::asio::steady_timer timer_;
172
  int timer_counter_ = 0;
173
  std::atomic<bool> closing_ = {false};
174
};
175
176
36.2k
Scheduler::Scheduler(IoService* io_service) : impl_(new Impl(io_service)) {}
177
4.05k
Scheduler::~Scheduler() {}
178
179
3.76k
void Scheduler::Shutdown() {
180
3.76k
  impl_->Shutdown();
181
3.76k
}
182
183
160k
void Scheduler::Abort(ScheduledTaskId task_id) {
184
160k
  impl_->Abort(task_id);
185
160k
}
186
187
7.38M
void Scheduler::DoSchedule(std::shared_ptr<ScheduledTaskBase> task) {
188
7.38M
  impl_->DoSchedule(std::move(task));
189
7.38M
}
190
191
7.38M
ScheduledTaskId Scheduler::NextId() {
192
7.38M
  return impl_->NextId();
193
7.38M
}
194
195
140k
IoService& Scheduler::io_service() {
196
140k
  return impl_->io_service();
197
140k
}
198
199
ScheduledTaskTracker::ScheduledTaskTracker(Scheduler* scheduler)
200
100k
    : scheduler_(DCHECK_NOTNULL(scheduler)) {}
201
202
30.9k
void ScheduledTaskTracker::Abort() {
203
30.9k
  auto last_scheduled_task_id = last_scheduled_task_id_.load(std::memory_order_acquire);
204
30.9k
  if (last_scheduled_task_id != rpc::kInvalidTaskId) {
205
170
    scheduler_->Abort(last_scheduled_task_id);
206
170
  }
207
30.9k
}
208
209
48.1k
void ScheduledTaskTracker::StartShutdown() {
210
48.1k
  auto num_scheduled = num_scheduled_.load(std::memory_order_acquire);
211
144k
  while (num_scheduled >= 0) {
212
96.3k
    num_scheduled_.compare_exchange_strong(num_scheduled, num_scheduled + kShutdownMark);
213
96.3k
  }
214
48.1k
}
215
216
48.1k
void ScheduledTaskTracker::CompleteShutdown() {
217
48.3k
  for (;;) {
218
48.3k
    auto left = num_scheduled_.load(std::memory_order_acquire) - kShutdownMark;
219
48.3k
    if (left <= 0) {
220
0
      LOG_IF(DFATAL, left < 0) << "Negative number of tasks left: " << left;
221
48.1k
      break;
222
48.1k
    }
223
166
    YB_LOG_EVERY_N_SECS(INFO, 1) << "Waiting " << left << " tasks to complete";
224
166
    Abort();
225
166
    std::this_thread::sleep_for(1ms);
226
166
  }
227
48.1k
}
228
229
} // namespace rpc
230
} // namespace yb