YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rpc/strand.cc
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
#include "yb/rpc/strand.h"
14
15
#include <thread>
16
17
#include "yb/util/flag_tags.h"
18
#include "yb/util/status.h"
19
20
DEFINE_test_flag(int32, strand_done_inject_delay_ms, 0,
21
                 "Inject into Strand::Done after resetting running flag.");
22
23
using namespace std::literals;
24
25
namespace yb {
26
namespace rpc {
27
28
namespace {
29
30
17
const Status& StrandAbortedStatus() {
31
17
  static const Status result = STATUS(Aborted, "Strand shutdown");
32
17
  return result;
33
17
}
34
35
}
36
37
150k
Strand::Strand(ThreadPool* thread_pool) : thread_pool_(*thread_pool) {}
38
39
75.6k
Strand::~Strand() {
40
75.6k
  const auto running = running_.load();
41
75.6k
  const auto closing = closing_.load();
42
75.6k
  const auto active_tasks = active_tasks_.load();
43
75.6k
  LOG_IF(DFATAL, running || !closing || active_tasks) << Format(
44
220
      "Strand $0 has not been fully shut down, running: $1, closing: $2, active_tasks: $3",
45
220
      static_cast<void*>(this), running, closing, active_tasks);
46
75.6k
}
47
48
1.67M
void Strand::Enqueue(StrandTask* task) {
49
1.67M
  if (closing_.load(std::memory_order_acquire)) {
50
1
    task->Done(STATUS(Aborted, "Strand closing"));
51
1
    return;
52
1
  }
53
54
1.67M
  active_tasks_.fetch_add(1, std::memory_order_release);
55
1.67M
  queue_.Push(task);
56
57
1.67M
  bool expected = false;
58
1.67M
  if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
59
1.34M
    thread_pool_.Enqueue(this);
60
1.34M
  }
61
1.67M
}
62
63
75.6k
void Strand::Shutdown() {
64
75.6k
  bool expected = false;
65
75.6k
  if (!closing_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
66
0
    LOG(DFATAL) << "Strand already closed";
67
0
    return;
68
0
  }
69
70
136k
  
while (75.6k
active_tasks_.load(std::memory_order_acquire) ||
71
136k
         
running_.load(std::memory_order_acquire)75.6k
) {
72
    // We expected shutdown to happen rarely, so just use busy wait here.
73
60.4k
    std::this_thread::sleep_for(1ms);
74
60.4k
  }
75
75.6k
}
76
77
1.35M
void Strand::Run() {
78
  // Actual work is performed in Done.
79
  // Because we need `Status`, i.e. if `Strand` task aborted, because of thread pool shutdown.
80
  // We should abort all enqueued tasks.
81
1.35M
}
82
83
1.35M
void Strand::Done(const Status& status) {
84
1.35M
  for (;;) {
85
1.35M
    size_t tasks_fetched = 0;
86
3.02M
    while (StrandTask *task = queue_.Pop()) {
87
1.67M
      ++tasks_fetched;
88
89
1.67M
      const auto& actual_status =
90
1.67M
          !closing_.load(std::memory_order_acquire) ? 
status1.67M
:
StrandAbortedStatus()32
;
91
1.67M
      if (actual_status.ok()) {
92
1.67M
        task->Run();
93
1.67M
      }
94
1.67M
      task->Done(actual_status);
95
1.67M
    }
96
1.35M
    running_.store(false, std::memory_order_release);
97
1.35M
    if (FLAGS_TEST_strand_done_inject_delay_ms > 0) {
98
10
      std::this_thread::sleep_for(FLAGS_TEST_strand_done_inject_delay_ms * 1ms);
99
10
    }
100
    // Decrease active_tasks_ and check whether tasks have been added while we were setting running
101
    // to false.
102
1.35M
    if (active_tasks_.fetch_sub(tasks_fetched) > tasks_fetched) {
103
      // Got more operations, try stay in the loop.
104
363
      bool expected = false;
105
363
      if (running_.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
106
362
        continue;
107
362
      }
108
      // If someone else has flipped running_ to true, we can safely exit this function because
109
      // another task is already submitted to thread pool.
110
363
    }
111
1.35M
    break;
112
1.35M
  }
113
1.35M
}
114
115
} // namespace rpc
116
} // namespace yb