YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/thread_posix.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/util/thread_posix.h"
25
26
#include <pthread.h>
27
#include <unistd.h>
28
29
#include <atomic>
30
#ifdef __linux__
31
#include <sys/syscall.h>
32
#endif
33
34
#include "yb/util/format.h"
35
#include "yb/util/status_log.h"
36
#include "yb/util/thread.h"
37
38
namespace rocksdb {
39
40
4.92M
// Checks the return code of a pthread call and aborts the process on failure.
//
// `label` names the operation in the diagnostic written to stderr; `result`
// is the value returned by the pthread function, where zero means success.
void ThreadPool::PthreadCall(const char* label, int result) {
  if (result == 0) {
    return;
  }
  // Any non-zero pthread result is treated as fatal: report it and abort.
  fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  abort();
}
46
47
36.1k
// Initializes the pool's mutex and condition variable with default
// attributes. PthreadCall aborts the process if either initialization fails.
ThreadPool::ThreadPool() {
  const int mutex_rc = pthread_mutex_init(&mu_, nullptr);
  PthreadCall("mutex_init", mutex_rc);

  const int cond_rc = pthread_cond_init(&bgsignal_, nullptr);
  PthreadCall("cvar_init", cond_rc);
}
51
52
2.79k
// Debug builds verify that no background threads are still registered when
// the pool is destroyed. JoinAllThreads() is the method in this file that
// empties bgthreads_, so it is expected to have run beforehand.
ThreadPool::~ThreadPool() {
  DCHECK(bgthreads_.empty());
}
55
56
2.79k
void ThreadPool::JoinAllThreads() {
57
2.79k
  PthreadCall("lock", pthread_mutex_lock(&mu_));
58
2.79k
  DCHECK(!exit_all_threads_);
59
2.79k
  exit_all_threads_ = true;
60
2.79k
  PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
61
2.79k
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
62
2.08k
  for (const auto& thread : bgthreads_) {
63
2.08k
    thread->Join();
64
2.08k
  }
65
2.79k
  bgthreads_.clear();
66
2.79k
}
67
68
0
void ThreadPool::LowerIOPriority() {
69
#ifdef __linux__
70
  PthreadCall("lock", pthread_mutex_lock(&mu_));
71
  low_io_priority_ = true;
72
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
73
#endif
74
0
}
75
76
63.1k
void ThreadPool::BGThread(size_t thread_id) {
77
63.1k
  bool low_io_priority = false;
78
248k
  while (true) {
79
    // Wait until there is an item that is ready to run
80
187k
    PthreadCall("lock", pthread_mutex_lock(&mu_));
81
    // Stop waiting if the thread needs to do work or needs to terminate.
82
299k
    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
83
236k
           (queue_.empty() || IsExcessiveThread(thread_id))) {
84
111k
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
85
111k
    }
86
187k
    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
87
2.08k
      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
88
2.08k
      break;
89
2.08k
    }
90
185k
    if (IsLastExcessiveThread(thread_id)) {
91
      // Current thread is the last generated one and is excessive.
92
      // We always terminate excessive thread in the reverse order of
93
      // generation time.
94
30
      bgthreads_.pop_back();
95
30
      if (HasExcessiveThread()) {
96
        // There is still at least more excessive thread to terminate.
97
18
        WakeUpAllThreads();
98
18
      }
99
30
      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
100
30
      break;
101
30
    }
102
185k
    void (*function)(void*) = queue_.front().function;
103
185k
    void* arg = queue_.front().arg;
104
185k
    queue_.pop_front();
105
185k
    queue_len_.store(static_cast<unsigned int>(queue_.size()),
106
185k
                     std::memory_order_relaxed);
107
108
185k
    bool decrease_io_priority = (low_io_priority != low_io_priority_);
109
185k
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
110
111
#ifdef __linux__
112
    if (decrease_io_priority) {
113
#define IOPRIO_CLASS_SHIFT (13)
114
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
115
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
116
      // These system calls only have an effect when used in conjunction
117
      // with an I/O scheduler that supports I/O priorities. As at
118
      // kernel 2.6.17 the only such scheduler is the Completely
119
      // Fair Queuing (CFQ) I/O scheduler.
120
      // To change scheduler:
121
      //  echo cfq > /sys/block/<device_name>/queue/schedule
122
      // Tunables to consider:
123
      //  /sys/block/<device_name>/queue/slice_idle
124
      //  /sys/block/<device_name>/queue/slice_sync
125
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
126
              0,                  // current thread
127
              IOPRIO_PRIO_VALUE(3, 0));
128
      low_io_priority = true;
129
    }
130
#else
131
185k
    (void)decrease_io_priority;  // avoid 'unused variable' error
132
185k
#endif
133
185k
    (*function)(arg);
134
185k
  }
135
63.1k
}
136
137
24.9k
void ThreadPool::WakeUpAllThreads() {
138
24.9k
  PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
139
24.9k
}
140
141
1.37M
void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
142
1.37M
  PthreadCall("lock", pthread_mutex_lock(&mu_));
143
1.37M
  if (exit_all_threads_) {
144
0
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
145
0
    return;
146
0
  }
147
1.37M
  if (num > total_threads_limit_ ||
148
1.35M
      (num < total_threads_limit_ && allow_reduce)) {
149
24.8k
    total_threads_limit_ = std::max(1, num);
150
24.8k
    WakeUpAllThreads();
151
24.8k
    StartBGThreads();
152
24.8k
  }
153
1.37M
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
154
1.37M
}
155
156
1.37M
void ThreadPool::IncBackgroundThreadsIfNeeded(int num) {
157
1.37M
  SetBackgroundThreadsInternal(num, false);
158
1.37M
}
159
160
1.18k
void ThreadPool::SetBackgroundThreads(int num) {
161
1.18k
  SetBackgroundThreadsInternal(num, true);
162
1.18k
}
163
164
149k
void ThreadPool::StartBGThreads() {
165
  // Start background thread if necessary
166
149k
  std::string category_name = yb::Format(
167
105k
      "rocksdb:$0", GetThreadPriority() == Env::Priority::HIGH ? "high" : "low");
168
212k
  while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) {
169
63.1k
    size_t tid = bgthreads_.size();
170
63.1k
    std::string thread_name = yb::Format("$0:$1", category_name, tid);
171
63.1k
    yb::ThreadPtr thread;
172
63.1k
    CHECK_OK(yb::Thread::Create(
173
63.1k
        category_name, std::move(thread_name),
174
63.1k
        [this, tid]() { this->BGThread(tid); }, &thread));
175
176
63.1k
    bgthreads_.push_back(thread);
177
63.1k
  }
178
149k
}
179
180
void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag,
181
124k
                          void (*unschedFunction)(void* arg)) {
182
124k
  PthreadCall("lock", pthread_mutex_lock(&mu_));
183
184
124k
  if (exit_all_threads_) {
185
0
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
186
0
    return;
187
0
  }
188
189
124k
  StartBGThreads();
190
191
  // Add to priority queue
192
124k
  queue_.push_back(BGItem());
193
124k
  queue_.back().function = function;
194
124k
  queue_.back().arg = arg;
195
124k
  queue_.back().tag = tag;
196
124k
  queue_.back().unschedFunction = unschedFunction;
197
124k
  queue_len_.store(static_cast<unsigned int>(queue_.size()),
198
124k
                   std::memory_order_relaxed);
199
200
124k
  if (!HasExcessiveThread()) {
201
    // Wake up at least one waiting thread.
202
124k
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
203
0
  } else {
204
    // Need to wake up all threads to make sure the one woken
205
    // up is not the one to terminate.
206
0
    WakeUpAllThreads();
207
0
  }
208
209
124k
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
210
124k
}
211
212
646k
int ThreadPool::UnSchedule(void* arg) {
213
646k
  int count = 0;
214
646k
  PthreadCall("lock", pthread_mutex_lock(&mu_));
215
216
  // Remove from priority queue
217
646k
  BGQueue::iterator it = queue_.begin();
218
646k
  while (it != queue_.end()) {
219
27
    if (arg == (*it).tag) {
220
13
      void (*unschedFunction)(void*) = (*it).unschedFunction;
221
13
      void* arg1 = (*it).arg;
222
13
      if (unschedFunction != nullptr) {
223
0
        (*unschedFunction)(arg1);
224
0
      }
225
13
      it = queue_.erase(it);
226
13
      count++;
227
14
    } else {
228
14
      it++;
229
14
    }
230
27
  }
231
646k
  queue_len_.store(static_cast<unsigned int>(queue_.size()),
232
646k
                   std::memory_order_relaxed);
233
646k
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
234
646k
  return count;
235
646k
}
236
237
}  // namespace rocksdb