YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/thread_posix.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
21
// Use of this source code is governed by a BSD-style license that can be
22
// found in the LICENSE file. See the AUTHORS file for names of contributors.
23
24
#include "yb/rocksdb/util/thread_posix.h"
25
26
#include <pthread.h>
27
#include <unistd.h>
28
29
#include <atomic>
30
#ifdef __linux__
31
#include <sys/syscall.h>
32
#endif
33
34
#include "yb/util/format.h"
35
#include "yb/util/status_log.h"
36
#include "yb/util/thread.h"
37
38
namespace rocksdb {
39
40
6.26M
// Checks the return code of a pthread primitive.  `label` names the
// operation for diagnostics; `result` is the errno-style code returned by
// the pthread call.  On failure the error is printed and the process
// aborts — these calls are not expected to fail in normal operation.
void ThreadPool::PthreadCall(const char* label, int result) {
  if (result == 0) {
    return;
  }
  fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
  abort();
}
46
47
52.6k
// Sets up the mutex and condition variable that guard the task queue and
// worker bookkeeping.  Threads are started lazily by StartBGThreads().
ThreadPool::ThreadPool() {
  PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr));
}
51
52
3.15k
// The pool must already be drained via JoinAllThreads() when it is
// destroyed; tearing it down with live workers would leave threads
// touching freed state.
ThreadPool::~ThreadPool() {
  DCHECK(bgthreads_.empty());
}
55
56
3.15k
void ThreadPool::JoinAllThreads() {
57
3.15k
  PthreadCall("lock", pthread_mutex_lock(&mu_));
58
3.15k
  DCHECK(!exit_all_threads_);
59
3.15k
  exit_all_threads_ = true;
60
3.15k
  PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
61
3.15k
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
62
3.15k
  for (const auto& thread : bgthreads_) {
63
2.71k
    thread->Join();
64
2.71k
  }
65
3.15k
  bgthreads_.clear();
66
3.15k
}
67
68
0
void ThreadPool::LowerIOPriority() {
69
#ifdef __linux__
70
  PthreadCall("lock", pthread_mutex_lock(&mu_));
71
  low_io_priority_ = true;
72
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
73
#endif
74
0
}
75
76
89.8k
void ThreadPool::BGThread(size_t thread_id) {
77
89.8k
  bool low_io_priority = false;
78
347k
  while (true) {
79
    // Wait until there is an item that is ready to run
80
260k
    PthreadCall("lock", pthread_mutex_lock(&mu_));
81
    // Stop waiting if the thread needs to do work or needs to terminate.
82
464k
    while (!exit_all_threads_ && 
!IsLastExcessiveThread(thread_id)374k
&&
83
464k
           
(374k
queue_.empty()374k
||
IsExcessiveThread(thread_id)170k
)) {
84
203k
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
85
203k
    }
86
260k
    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
87
2.71k
      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
88
2.71k
      break;
89
2.71k
    }
90
257k
    if (IsLastExcessiveThread(thread_id)) {
91
      // Current thread is the last generated one and is excessive.
92
      // We always terminate excessive thread in the reverse order of
93
      // generation time.
94
28
      bgthreads_.pop_back();
95
28
      if (HasExcessiveThread()) {
96
        // There is still at least more excessive thread to terminate.
97
17
        WakeUpAllThreads();
98
17
      }
99
28
      PthreadCall("unlock", pthread_mutex_unlock(&mu_));
100
28
      break;
101
28
    }
102
257k
    void (*function)(void*) = queue_.front().function;
103
257k
    void* arg = queue_.front().arg;
104
257k
    queue_.pop_front();
105
257k
    queue_len_.store(static_cast<unsigned int>(queue_.size()),
106
257k
                     std::memory_order_relaxed);
107
108
257k
    bool decrease_io_priority = (low_io_priority != low_io_priority_);
109
257k
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
110
111
#ifdef __linux__
112
    if (decrease_io_priority) {
113
#define IOPRIO_CLASS_SHIFT (13)
114
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
115
      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
116
      // These system calls only have an effect when used in conjunction
117
      // with an I/O scheduler that supports I/O priorities. As at
118
      // kernel 2.6.17 the only such scheduler is the Completely
119
      // Fair Queuing (CFQ) I/O scheduler.
120
      // To change scheduler:
121
      //  echo cfq > /sys/block/<device_name>/queue/schedule
122
      // Tunables to consider:
123
      //  /sys/block/<device_name>/queue/slice_idle
124
      //  /sys/block/<device_name>/queue/slice_sync
125
      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
126
              0,                  // current thread
127
              IOPRIO_PRIO_VALUE(3, 0));
128
      low_io_priority = true;
129
    }
130
#else
131
257k
    (void)decrease_io_priority;  // avoid 'unused variable' error
132
257k
#endif
133
257k
    (*function)(arg);
134
257k
  }
135
89.8k
}
136
137
35.5k
void ThreadPool::WakeUpAllThreads() {
138
35.5k
  PthreadCall("signalall", pthread_cond_broadcast(&bgsignal_));
139
35.5k
}
140
141
1.71M
void ThreadPool::SetBackgroundThreadsInternal(int num, bool allow_reduce) {
142
1.71M
  PthreadCall("lock", pthread_mutex_lock(&mu_));
143
1.71M
  if (exit_all_threads_) {
144
0
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
145
0
    return;
146
0
  }
147
1.71M
  if (num > total_threads_limit_ ||
148
1.71M
      
(1.68M
num < total_threads_limit_1.68M
&&
allow_reduce120
)) {
149
35.5k
    total_threads_limit_ = std::max(1, num);
150
35.5k
    WakeUpAllThreads();
151
35.5k
    StartBGThreads();
152
35.5k
  }
153
1.71M
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
154
1.71M
}
155
156
1.71M
void ThreadPool::IncBackgroundThreadsIfNeeded(int num) {
157
1.71M
  SetBackgroundThreadsInternal(num, false);
158
1.71M
}
159
160
1.17k
void ThreadPool::SetBackgroundThreads(int num) {
161
1.17k
  SetBackgroundThreadsInternal(num, true);
162
1.17k
}
163
164
206k
void ThreadPool::StartBGThreads() {
165
  // Start background thread if necessary
166
206k
  std::string category_name = yb::Format(
167
206k
      "rocksdb:$0", GetThreadPriority() == Env::Priority::HIGH ? 
"high"77.3k
:
"low"129k
);
168
296k
  while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) {
169
89.8k
    size_t tid = bgthreads_.size();
170
89.8k
    std::string thread_name = yb::Format("$0:$1", category_name, tid);
171
89.8k
    yb::ThreadPtr thread;
172
89.8k
    CHECK_OK(yb::Thread::Create(
173
89.8k
        category_name, std::move(thread_name),
174
89.8k
        [this, tid]() { this->BGThread(tid); }, &thread));
175
176
89.8k
    bgthreads_.push_back(thread);
177
89.8k
  }
178
206k
}
179
180
void ThreadPool::Schedule(void (*function)(void* arg1), void* arg, void* tag,
181
170k
                          void (*unschedFunction)(void* arg)) {
182
170k
  PthreadCall("lock", pthread_mutex_lock(&mu_));
183
184
170k
  if (exit_all_threads_) {
185
0
    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
186
0
    return;
187
0
  }
188
189
170k
  StartBGThreads();
190
191
  // Add to priority queue
192
170k
  queue_.push_back(BGItem());
193
170k
  queue_.back().function = function;
194
170k
  queue_.back().arg = arg;
195
170k
  queue_.back().tag = tag;
196
170k
  queue_.back().unschedFunction = unschedFunction;
197
170k
  queue_len_.store(static_cast<unsigned int>(queue_.size()),
198
170k
                   std::memory_order_relaxed);
199
200
170k
  if (!HasExcessiveThread()) {
201
    // Wake up at least one waiting thread.
202
170k
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
203
170k
  } else {
204
    // Need to wake up all threads to make sure the one woken
205
    // up is not the one to terminate.
206
0
    WakeUpAllThreads();
207
0
  }
208
209
170k
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
210
170k
}
211
212
792k
int ThreadPool::UnSchedule(void* arg) {
213
792k
  int count = 0;
214
792k
  PthreadCall("lock", pthread_mutex_lock(&mu_));
215
216
  // Remove from priority queue
217
792k
  BGQueue::iterator it = queue_.begin();
218
792k
  while (it != queue_.end()) {
219
28
    if (arg == (*it).tag) {
220
8
      void (*unschedFunction)(void*) = (*it).unschedFunction;
221
8
      void* arg1 = (*it).arg;
222
8
      if (unschedFunction != nullptr) {
223
2
        (*unschedFunction)(arg1);
224
2
      }
225
8
      it = queue_.erase(it);
226
8
      count++;
227
20
    } else {
228
20
      it++;
229
20
    }
230
28
  }
231
792k
  queue_len_.store(static_cast<unsigned int>(queue_.size()),
232
792k
                   std::memory_order_relaxed);
233
792k
  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
234
792k
  return count;
235
792k
}
236
237
}  // namespace rocksdb