YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rpc/strand.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/rpc/strand.h"
14
15
#include <thread>
16
17
#include "yb/util/flag_tags.h"
18
#include "yb/util/status.h"
19
20
// Test-only flag: when set to a positive value, Strand::Done sleeps for that many milliseconds
// right after it resets the running flag and before it updates the active task counter.
DEFINE_test_flag(int32, strand_done_inject_delay_ms, 0,
21
                 "Inject into Strand::Done after resetting running flag.");
22
23
using namespace std::literals;
24
25
namespace yb {
26
namespace rpc {
27
28
namespace {
29
30
12
// Returns a reference to a function-local static Aborted status ("Strand shutdown").
// Strand::Done hands this status to tasks that it pops after the strand has started closing.
const Status& StrandAbortedStatus() {
31
12
  static const Status result = STATUS(Aborted, "Strand shutdown");
32
12
  return result;
33
12
}
34
35
}
36
37
88.7k
// Binds the strand to the thread pool it submits itself to.
// thread_pool is dereferenced here, so it must not be null.
Strand::Strand(ThreadPool* thread_pool) : thread_pool_(*thread_pool) {}
38
39
47.7k
// Sanity check on destruction: reports (DFATAL) a strand that is still marked running, was never
// marked closing, or still has active tasks, i.e. a strand for which Shutdown() has not completed.
Strand::~Strand() {
40
47.7k
  const auto running = running_.load();
41
47.7k
  const auto closing = closing_.load();
42
47.7k
  const auto active_tasks = active_tasks_.load();
43
108
  // The values are captured above so that the condition and the message report the same snapshot.
  LOG_IF(DFATAL, running || !closing || active_tasks) << Format(
44
108
      "Strand $0 has not been fully shut down, running: $1, closing: $2, active_tasks: $3",
45
108
      static_cast<void*>(this), running, closing, active_tasks);
46
47.7k
}
47
48
968k
// Adds a task to the strand's queue and, if the strand is not currently marked running, submits
// the strand itself to the thread pool.
//
// If the strand is already closing, the task is not queued: task->Done is called right away with
// an Aborted status ("Strand closing").
//
// NOTE(review): the closing_ check and the active_tasks_ increment below are separate steps, so a
// Shutdown() that starts between them could observe zero active tasks - confirm that callers
// never race Enqueue against Shutdown.
void Strand::Enqueue(StrandTask* task) {
49
968k
  if (closing_.load(std::memory_order_acquire)) {
50
9
    task->Done(STATUS(Aborted, "Strand closing"));
51
9
    return;
52
9
  }
53
54
968k
  // Count the task before pushing it: Done() and Shutdown() use active_tasks_ to detect
  // outstanding work.
  active_tasks_.fetch_add(1, std::memory_order_release);
55
968k
  queue_.Push(task);
56
57
968k
  bool expected = false;
58
968k
  // Only the caller that flips running_ from false to true submits the strand to the thread pool.
  if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
59
816k
    thread_pool_.Enqueue(this);
60
816k
  }
61
968k
}
62
63
47.7k
// Marks the strand as closing and blocks until it has no active tasks and is no longer running.
// Calling Shutdown() a second time is reported via LOG(DFATAL) and returns without waiting.
void Strand::Shutdown() {
64
47.7k
  bool expected = false;
65
47.7k
  // The compare-exchange fails when closing_ is already true, i.e. Shutdown() was called before.
  if (!closing_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
66
0
    LOG(DFATAL) << "Strand already closed";
67
0
    return;
68
0
  }
69
70
53.9k
  while (active_tasks_.load(std::memory_order_acquire) ||
71
47.7k
         running_.load(std::memory_order_acquire)) {
72
    // Shutdown is expected to happen rarely, so simply poll here, sleeping 1ms between checks.
73
6.15k
    std::this_thread::sleep_for(1ms);
74
6.15k
  }
75
47.7k
}
76
77
818k
// Intentionally empty, see the comment in the body.
void Strand::Run() {
78
  // The actual work is performed in Done().
79
  // Done() receives a `Status`, which tells whether this `Strand` task was aborted, e.g. because
80
  // of thread pool shutdown; in that case all enqueued tasks have to be aborted as well.
81
818k
}
82
83
818k
// Drains the task queue and then hands the strand back.
//
// Every popped task gets task->Done() called with the effective status: `status` as passed in, or
// the "Strand shutdown" Aborted status once the strand is closing. task->Run() is called first
// only when that effective status is OK.
//
// After the queue is drained, running_ is cleared and the number of processed tasks is subtracted
// from active_tasks_. If tasks were enqueued in the meantime, this function tries to take
// running_ back and drain again; otherwise it returns.
void Strand::Done(const Status& status) {
84
818k
  for (;;) {
85
818k
    // Number of tasks popped in this pass; subtracted from active_tasks_ below.
    size_t tasks_fetched = 0;
86
1.78M
    while (StrandTask *task = queue_.Pop()) {
87
968k
      ++tasks_fetched;
88
89
968k
      // closing_ is re-read for every task, so tasks popped after Shutdown() started are aborted.
      const auto& actual_status =
90
968k
          !closing_.load(std::memory_order_acquire) ? status : StrandAbortedStatus();
91
968k
      if (actual_status.ok()) {
92
968k
        task->Run();
93
968k
      }
94
968k
      task->Done(actual_status);
95
968k
    }
96
818k
    running_.store(false, std::memory_order_release);
97
818k
    // Test-only delay between clearing running_ and updating active_tasks_.
    if (FLAGS_TEST_strand_done_inject_delay_ms > 0) {
98
10
      std::this_thread::sleep_for(FLAGS_TEST_strand_done_inject_delay_ms * 1ms);
99
10
    }
100
    // Decrease active_tasks_ and check whether tasks have been added while we were setting running
101
    // to false.
102
818k
    // fetch_sub returns the previous value, so a result greater than tasks_fetched means that
    // tasks other than the ones processed in this pass are still counted.
    if (active_tasks_.fetch_sub(tasks_fetched) > tasks_fetched) {
103
      // Got more operations, try to stay in the loop.
104
13
      bool expected = false;
105
13
      if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
106
8
        continue;
107
8
      }
108
      // If someone else has flipped running_ to true, we can safely exit this function because
109
      // another task is already submitted to thread pool.
110
13
    }
111
818k
    break;
112
818k
  }
113
818k
}
114
115
} // namespace rpc
116
} // namespace yb