YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/strand.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_RPC_STRAND_H
15
#define YB_RPC_STRAND_H
16
17
#include <type_traits>
#include <utility>

#include "yb/rpc/thread_pool.h"

#include "yb/util/lockfree.h"
20
21
namespace yb {
22
namespace rpc {
23
24
// Base type for work items that are submitted to a Strand.
//
// It combines two roles:
//   - MPSCQueueEntry<StrandTask>: the entry type of MPSCQueue<StrandTask>, which is the queue
//     type Strand uses to hold pending tasks.
//   - ThreadPoolTask: the task interface declared in yb/rpc/thread_pool.h.
class StrandTask
    : public MPSCQueueEntry<StrandTask>,
      public ThreadPoolTask {
 protected:
  // Protected and non-virtual: code outside the class hierarchy cannot destroy a task through a
  // StrandTask pointer.
  ~StrandTask() = default;
};
28
29
template <class F>
30
FunctorThreadPoolTask<F, StrandTask>* MakeFunctorStrandTask(const F& f) {
31
  return new FunctorThreadPoolTask<F, StrandTask>(f);
32
}
33
34
template <class F>
35
FunctorThreadPoolTask<F, StrandTask>* MakeFunctorStrandTask(F&& f) {
36
  return new FunctorThreadPoolTask<F, StrandTask>(std::move(f));
37
}
38
39
// Strand prevent concurrent execution of enqueued tasks.
40
// If task is submitted into strand and it already has enqueued tasks, new task will be executed
41
// after all previously enqueued tasks.
42
//
43
// Submitted task should inherit StrandTask or wrapped by class that provides such inheritance.
44
class Strand : public ThreadPoolTask {
45
 public:
46
  explicit Strand(ThreadPool* thread_pool);
47
  virtual ~Strand();
48
49
  void Enqueue(StrandTask* task);
50
51
  template <class F>
52
  void EnqueueFunctor(const F& f) {
53
    Enqueue(MakeFunctorStrandTask(f));
54
  }
55
56
  template <class F>
57
  void EnqueueFunctor(F&& f) {
58
    Enqueue(MakeFunctorStrandTask(std::move(f)));
59
  }
60
61
  void Shutdown();
62
63
 private:
64
  void Run() override;
65
66
  void Done(const Status& status) override;
67
68
  void ProcessTasks(const Status& status, bool allow_closing);
69
70
  ThreadPool& thread_pool_;
71
  std::atomic<size_t> active_tasks_{0};
72
  MPSCQueue<StrandTask> queue_;
73
  std::atomic<bool> running_{false};
74
  std::atomic<bool> closing_{false};
75
};
76
77
} // namespace rpc
78
} // namespace yb
79
80
#endif // YB_RPC_STRAND_H