YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/scheduler.cc
Line
Count
Source (jump to first uncovered line)
1
//
2
// Copyright (c) YugaByte, Inc.
3
//
4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5
// in compliance with the License.  You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software distributed under the License
10
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11
// or implied.  See the License for the specific language governing permissions and limitations
12
// under the License.
13
//
14
//
15
16
#include "yb/rpc/scheduler.h"
17
18
#include <thread>
19
20
#include <boost/asio/steady_timer.hpp>
21
#include <boost/asio/strand.hpp>
22
23
#include <boost/multi_index_container.hpp>
24
#include <boost/multi_index/hashed_index.hpp>
25
#include <boost/multi_index/mem_fun.hpp>
26
#include <boost/multi_index/ordered_index.hpp>
27
28
#include <glog/logging.h>
29
30
#include "yb/util/errno.h"
31
#include "yb/util/logging.h"
32
#include "yb/util/status.h"
33
34
using namespace std::literals;
35
using namespace std::placeholders;
36
using boost::multi_index::const_mem_fun;
37
using boost::multi_index::hashed_unique;
38
using boost::multi_index::ordered_non_unique;
39
40
namespace yb {
41
namespace rpc {
42
43
namespace {
44
45
constexpr int64_t kShutdownMark = -(1ULL << 32U);
46
47
}
48
49
class Scheduler::Impl {
50
 public:
51
  explicit Impl(IoService* io_service)
52
54.5k
      : io_service_(*io_service), strand_(*io_service), timer_(*io_service) {}
53
54
9.03k
  ~Impl() {
55
9.03k
    Shutdown();
56
9.03k
    DCHECK_EQ(timer_counter_, 0);
57
9.03k
    DCHECK(tasks_.empty());
58
9.03k
  }
59
60
450k
  void Abort(ScheduledTaskId task_id) {
61
450k
    strand_.dispatch([this, task_id] {
62
450k
      auto& index = tasks_.get<IdTag>();
63
450k
      auto it = index.find(task_id);
64
450k
      if (it != index.end()) {
65
450k
        io_service_.post([task = *it] 
{ task->Run(450k
STATUS450k
(Aborted, "Task aborted")); });
66
450k
        index.erase(it);
67
450k
      }
68
450k
    });
69
450k
  }
70
71
17.5k
  void Shutdown() {
72
17.5k
    bool old_value = false;
73
17.5k
    if (closing_.compare_exchange_strong(old_value, true)) {
74
9.04k
      strand_.dispatch([this] {
75
9.04k
        boost::system::error_code ec;
76
9.04k
        timer_.cancel(ec);
77
18.4E
        LOG_IF(ERROR, ec) << "Failed to cancel timer: " << ec.message();
78
79
9.04k
        auto status = STATUS(
80
9.04k
            ServiceUnavailable, "Scheduler is shutting down", "" /* msg2 */, Errno(ESHUTDOWN));
81
        // Abort all scheduled tasks. It is ok to run task earlier than it was scheduled because
82
        // we pass error status to it.
83
9.04k
        for (auto task : tasks_) {
84
6.60k
          io_service_.post([task, status] 
{ task->Run(status); }6.48k
);
85
6.60k
        }
86
9.04k
        tasks_.clear();
87
9.04k
      });
88
9.04k
    }
89
17.5k
  }
90
91
39.0M
  void DoSchedule(std::shared_ptr<ScheduledTaskBase> task) {
92
39.3M
    strand_.dispatch([this, task] {
93
39.3M
      if (closing_.load(std::memory_order_acquire)) {
94
1.08M
        io_service_.post([task] {
95
1.08M
          task->Run(STATUS(Aborted, "Scheduler shutdown", "" /* msg2 */, Errno(ESHUTDOWN)));
96
1.08M
        });
97
1.08M
        return;
98
1.08M
      }
99
100
38.2M
      auto pair = tasks_.insert(task);
101
38.2M
      CHECK(pair.second);
102
38.2M
      if (pair.first == tasks_.begin()) {
103
7.24M
        StartTimer();
104
7.24M
      }
105
38.2M
    });
106
39.0M
  }
107
108
39.1M
  ScheduledTaskId NextId() {
109
39.1M
    return ++id_;
110
39.1M
  }
111
112
209k
  IoService& io_service() {
113
209k
    return io_service_;
114
209k
  }
115
116
 private:
117
35.0M
  void StartTimer() {
118
35.0M
    DCHECK(strand_.running_in_this_thread());
119
35.0M
    DCHECK(!tasks_.empty());
120
121
35.0M
    boost::system::error_code ec;
122
35.0M
    timer_.expires_at((*tasks_.begin())->time(), ec);
123
35.0M
    LOG_IF
(ERROR, ec) << "Reschedule timer failed: " << ec.message()2
;
124
35.0M
    ++timer_counter_;
125
35.0M
    timer_.async_wait(strand_.wrap(std::bind(&Impl::HandleTimer, this, _1)));
126
35.0M
  }
127
128
35.0M
  void HandleTimer(const boost::system::error_code& ec) {
129
35.0M
    DCHECK(strand_.running_in_this_thread());
130
35.0M
    --timer_counter_;
131
132
35.0M
    if (ec) {
133
18.4E
      LOG_IF(ERROR, ec != boost::asio::error::operation_aborted) << "Wait failed: " << ec.message();
134
7.05M
      return;
135
7.05M
    }
136
27.9M
    if (closing_.load(std::memory_order_acquire)) {
137
2
      return;
138
2
    }
139
140
27.9M
    auto now = std::chrono::steady_clock::now();
141
65.4M
    while (!tasks_.empty() && 
(*tasks_.begin())->time() <= now65.3M
) {
142
37.5M
      io_service_.post([task = *tasks_.begin()] 
{ task->Run(Status::OK()); }37.4M
);
143
37.5M
      tasks_.erase(tasks_.begin());
144
37.5M
    }
145
146
27.9M
    if (!tasks_.empty()) {
147
27.8M
      StartTimer();
148
27.8M
    }
149
27.9M
  }
150
151
  class IdTag;
152
153
  typedef boost::multi_index_container<
154
      std::shared_ptr<ScheduledTaskBase>,
155
      boost::multi_index::indexed_by<
156
          ordered_non_unique<
157
              const_mem_fun<ScheduledTaskBase, SteadyTimePoint, &ScheduledTaskBase::time>
158
          >,
159
          hashed_unique<
160
              boost::multi_index::tag<IdTag>,
161
              const_mem_fun<ScheduledTaskBase, ScheduledTaskId, &ScheduledTaskBase::id>
162
          >
163
      >
164
  > Tasks;
165
166
  IoService& io_service_;
167
  std::atomic<ScheduledTaskId> id_ = {0};
168
  Tasks tasks_;
169
  // Strand that protects tasks_ and timer_ fields.
170
  boost::asio::io_service::strand strand_;
171
  boost::asio::steady_timer timer_;
172
  int timer_counter_ = 0;
173
  std::atomic<bool> closing_ = {false};
174
};
175
176
54.5k
Scheduler::Scheduler(IoService* io_service) : impl_(new Impl(io_service)) {}
177
9.03k
Scheduler::~Scheduler() {}
178
179
8.56k
void Scheduler::Shutdown() {
180
8.56k
  impl_->Shutdown();
181
8.56k
}
182
183
450k
void Scheduler::Abort(ScheduledTaskId task_id) {
184
450k
  impl_->Abort(task_id);
185
450k
}
186
187
39.2M
void Scheduler::DoSchedule(std::shared_ptr<ScheduledTaskBase> task) {
188
39.2M
  impl_->DoSchedule(std::move(task));
189
39.2M
}
190
191
39.1M
ScheduledTaskId Scheduler::NextId() {
192
39.1M
  return impl_->NextId();
193
39.1M
}
194
195
209k
IoService& Scheduler::io_service() {
196
209k
  return impl_->io_service();
197
209k
}
198
199
ScheduledTaskTracker::ScheduledTaskTracker(Scheduler* scheduler)
200
167k
    : scheduler_(DCHECK_NOTNULL(scheduler)) {}
201
202
393k
void ScheduledTaskTracker::Abort() {
203
393k
  auto last_scheduled_task_id = last_scheduled_task_id_.load(std::memory_order_acquire);
204
393k
  if (last_scheduled_task_id != rpc::kInvalidTaskId) {
205
191
    scheduler_->Abort(last_scheduled_task_id);
206
191
  }
207
393k
}
208
209
76.0k
void ScheduledTaskTracker::StartShutdown() {
210
76.0k
  auto num_scheduled = num_scheduled_.load(std::memory_order_acquire);
211
228k
  while (num_scheduled >= 0) {
212
152k
    num_scheduled_.compare_exchange_strong(num_scheduled, num_scheduled + kShutdownMark);
213
152k
  }
214
76.0k
}
215
216
76.0k
void ScheduledTaskTracker::CompleteShutdown() {
217
76.2k
  for (;;) {
218
76.2k
    auto left = num_scheduled_.load(std::memory_order_acquire) - kShutdownMark;
219
76.2k
    if (left <= 0) {
220
76.0k
      LOG_IF
(DFATAL, left < 0) << "Negative number of tasks left: " << left0
;
221
76.0k
      break;
222
76.0k
    }
223
191
    YB_LOG_EVERY_N_SECS
(INFO, 1) << "Waiting " << left << " tasks to complete"145
;
224
191
    Abort();
225
191
    std::this_thread::sleep_for(1ms);
226
191
  }
227
76.0k
}
228
229
} // namespace rpc
230
} // namespace yb