/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/env_test.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under the BSD-style license found in the |
3 | | // LICENSE file in the root directory of this source tree. An additional grant |
4 | | // of patent rights can be found in the PATENTS file in the same directory. |
5 | | // |
6 | | // The following only applies to changes made to this file as part of YugaByte development. |
7 | | // |
8 | | // Portions Copyright (c) YugaByte, Inc. |
9 | | // |
10 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
11 | | // in compliance with the License. You may obtain a copy of the License at |
12 | | // |
13 | | // http://www.apache.org/licenses/LICENSE-2.0 |
14 | | // |
15 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
16 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
17 | | // or implied. See the License for the specific language governing permissions and limitations |
18 | | // under the License. |
19 | | // |
20 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
21 | | // Use of this source code is governed by a BSD-style license that can be |
22 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
23 | | |
24 | | #ifndef OS_WIN |
25 | | #include <sys/ioctl.h> |
26 | | #endif |
27 | | #include <sys/types.h> |
28 | | |
29 | | #include <atomic> |
30 | | #include <list> |
31 | | |
32 | | #ifdef __linux__ |
33 | | #include <fcntl.h> |
34 | | #include <linux/fs.h> |
35 | | #include <stdlib.h> |
36 | | #include <sys/stat.h> |
37 | | #include <unistd.h> |
38 | | #endif |
39 | | |
40 | | #ifdef ROCKSDB_FALLOCATE_PRESENT |
41 | | #include <errno.h> |
42 | | #endif |
43 | | |
44 | | #include "yb/rocksdb/env.h" |
45 | | #include "yb/rocksdb/port/port.h" |
46 | | #include "yb/rocksdb/util/coding.h" |
47 | | #include "yb/rocksdb/util/log_buffer.h" |
48 | | #include "yb/rocksdb/util/mutexlock.h" |
49 | | #include "yb/rocksdb/util/testharness.h" |
50 | | #include "yb/rocksdb/util/testutil.h" |
51 | | |
52 | | #include "yb/util/string_util.h" |
53 | | #include "yb/util/test_util.h" |
54 | | |
55 | | namespace rocksdb { |
56 | | |
57 | | namespace { |
58 | | |
59 | | // Use a global variable to avoid the failure in |
60 | | // https://detective.dev.yugabyte.com/job/yugabyte-centos-phabricator-clang-tsan/48/artifact/build/tsan-clang-dynamic/yb-test-logs/rocksdb-build__env_test/EnvPosixTest_UnSchedule.log |
61 | | // When this was a local variable, it looked like the stack memory was reused for something else, |
62 | | // and TSAN considered that a data race. |
63 | | std::atomic<bool> called; |
64 | | |
65 | | } |
66 | | |
67 | | static const int kDelayMicros = 100000; |
68 | | |
69 | | class EnvPosixTest : public RocksDBTest { |
70 | | private: |
71 | | port::Mutex mu_; |
72 | | std::string events_; |
73 | | |
74 | | public: |
75 | | Env* env_; |
76 | 11 | EnvPosixTest() : env_(Env::Default()) { } |
77 | | }; |
78 | | |
79 | 2 | static void SetBool(void* ptr) { |
80 | 2 | reinterpret_cast<std::atomic<bool>*>(ptr) |
81 | 2 | ->store(true, std::memory_order_relaxed); |
82 | 2 | } |
83 | | |
84 | 1 | TEST_F(EnvPosixTest, RunImmediately) { |
85 | 1 | called.store(false); |
86 | 1 | env_->Schedule(&SetBool, &called); |
87 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
88 | 1 | ASSERT_TRUE(called.load(std::memory_order_relaxed)); |
89 | 1 | } |
90 | | |
91 | 1 | TEST_F(EnvPosixTest, UnSchedule) { |
92 | 1 | called.store(false); |
93 | | |
94 | 1 | env_->SetBackgroundThreads(1, Env::LOW); |
95 | | |
96 | | /* Block the low priority queue */ |
97 | 1 | test::SleepingBackgroundTask sleeping_task, sleeping_task1; |
98 | 1 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, |
99 | 1 | Env::Priority::LOW); |
100 | | |
101 | | /* Schedule another task */ |
102 | 1 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task1, |
103 | 1 | Env::Priority::LOW, &sleeping_task1); |
104 | | |
105 | | /* Remove it with a different tag */ |
106 | 1 | ASSERT_EQ(0, env_->UnSchedule(&called, Env::Priority::LOW)); |
107 | | |
108 | | /* Remove it from the queue with the right tag */ |
109 | 1 | ASSERT_EQ(1, env_->UnSchedule(&sleeping_task1, Env::Priority::LOW)); |
110 | | |
111 | | // Unblock background thread |
112 | 1 | sleeping_task.WakeUp(); |
113 | | |
114 | | /* Schedule another task */ |
115 | 1 | env_->Schedule(&SetBool, &called); |
116 | 2 | for (int i = 0; i < kDelayMicros; i++) { |
117 | 2 | if (called.load(std::memory_order_relaxed)) { |
118 | 1 | break; |
119 | 1 | } |
120 | 1 | Env::Default()->SleepForMicroseconds(1); |
121 | 1 | } |
122 | 1 | ASSERT_TRUE(called.load(std::memory_order_relaxed)); |
123 | | |
124 | 1 | ASSERT_TRUE(!sleeping_task.IsSleeping() && !sleeping_task1.IsSleeping()); |
125 | 1 | } |
126 | | |
127 | 1 | TEST_F(EnvPosixTest, RunMany) { |
128 | 1 | std::atomic<int> last_id(0); |
129 | | |
130 | 1 | struct CB { |
131 | 1 | std::atomic<int>* last_id_ptr; // Pointer to shared slot |
132 | 1 | int id; // Order# for the execution of this callback |
133 | | |
134 | 4 | CB(std::atomic<int>* p, int i) : last_id_ptr(p), id(i) {} |
135 | | |
136 | 4 | static void Run(void* v) { |
137 | 4 | CB* cb = reinterpret_cast<CB*>(v); |
138 | 4 | int cur = cb->last_id_ptr->load(std::memory_order_relaxed); |
139 | 4 | ASSERT_EQ(cb->id - 1, cur); |
140 | 4 | cb->last_id_ptr->store(cb->id, std::memory_order_release); |
141 | 4 | } |
142 | 1 | }; |
143 | | |
144 | | // Schedule in different order than start time |
145 | 1 | CB cb1(&last_id, 1); |
146 | 1 | CB cb2(&last_id, 2); |
147 | 1 | CB cb3(&last_id, 3); |
148 | 1 | CB cb4(&last_id, 4); |
149 | 1 | env_->Schedule(&CB::Run, &cb1); |
150 | 1 | env_->Schedule(&CB::Run, &cb2); |
151 | 1 | env_->Schedule(&CB::Run, &cb3); |
152 | 1 | env_->Schedule(&CB::Run, &cb4); |
153 | | |
154 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
155 | 1 | int cur = last_id.load(std::memory_order_acquire); |
156 | 1 | ASSERT_EQ(4, cur); |
157 | 1 | } |
158 | | |
159 | | struct State { |
160 | | port::Mutex mu; |
161 | | int val; |
162 | | int num_running; |
163 | | }; |
164 | | |
165 | 3 | static void ThreadBody(void* arg) { |
166 | 3 | State* s = reinterpret_cast<State*>(arg); |
167 | 3 | s->mu.Lock(); |
168 | 3 | s->val += 1; |
169 | 3 | s->num_running -= 1; |
170 | 3 | s->mu.Unlock(); |
171 | 3 | } |
172 | | |
173 | 1 | TEST_F(EnvPosixTest, StartThread) { |
174 | 1 | State state; |
175 | 1 | state.val = 0; |
176 | 1 | state.num_running = 3; |
177 | 4 | for (int i = 0; i < 3; i++) { |
178 | 3 | env_->StartThread(&ThreadBody, &state); |
179 | 3 | } |
180 | 2 | while (true) { |
181 | 2 | state.mu.Lock(); |
182 | 2 | int num = state.num_running; |
183 | 2 | state.mu.Unlock(); |
184 | 2 | if (num == 0) { |
185 | 1 | break; |
186 | 1 | } |
187 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
188 | 1 | } |
189 | 1 | ASSERT_EQ(state.val, 3); |
190 | 1 | } |
191 | | |
192 | 1 | TEST_F(EnvPosixTest, TwoPools) { |
193 | 1 | class CB { |
194 | 1 | public: |
195 | 1 | CB(const std::string& pool_name, int pool_size) |
196 | 1 | : mu_(), |
197 | 1 | num_running_(0), |
198 | 1 | num_finished_(0), |
199 | 1 | pool_size_(pool_size), |
200 | 2 | pool_name_(pool_name) { } |
201 | | |
202 | 32 | static void Run(void* v) { |
203 | 32 | CB* cb = reinterpret_cast<CB*>(v); |
204 | 32 | cb->Run(); |
205 | 32 | } |
206 | | |
207 | 32 | void Run() { |
208 | 32 | { |
209 | 32 | MutexLock l(&mu_); |
210 | 32 | num_running_++; |
211 | | // make sure we don't have more than pool_size_ jobs running. |
212 | 32 | ASSERT_LE(num_running_, pool_size_.load()); |
213 | 32 | } |
214 | | |
215 | | // sleep for 1 sec |
216 | 32 | Env::Default()->SleepForMicroseconds(1000000); |
217 | | |
218 | 32 | { |
219 | 32 | MutexLock l(&mu_); |
220 | 32 | num_running_--; |
221 | 32 | num_finished_++; |
222 | 32 | } |
223 | 32 | } |
224 | | |
225 | 80 | int NumFinished() { |
226 | 80 | MutexLock l(&mu_); |
227 | 80 | return num_finished_; |
228 | 80 | } |
229 | | |
230 | 2 | void Reset(int pool_size) { |
231 | 2 | pool_size_.store(pool_size); |
232 | 2 | num_finished_ = 0; |
233 | 2 | } |
234 | | |
235 | 1 | private: |
236 | 1 | port::Mutex mu_; |
237 | 1 | int num_running_; |
238 | 1 | int num_finished_; |
239 | 1 | std::atomic<int> pool_size_; |
240 | 1 | std::string pool_name_; |
241 | 1 | }; |
242 | | |
243 | 1 | const int kLowPoolSize = 2; |
244 | 1 | const int kHighPoolSize = 4; |
245 | 1 | const int kJobs = 8; |
246 | | |
247 | 1 | CB low_pool_job("low", kLowPoolSize); |
248 | 1 | CB high_pool_job("high", kHighPoolSize); |
249 | | |
250 | 1 | env_->SetBackgroundThreads(kLowPoolSize); |
251 | 1 | env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); |
252 | | |
253 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); |
254 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
255 | | |
256 | | // schedule same number of jobs in each pool |
257 | 9 | for (int i = 0; i < kJobs; i++) { |
258 | 8 | env_->Schedule(&CB::Run, &low_pool_job); |
259 | 8 | env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); |
260 | 8 | } |
261 | | // Wait a short while for the jobs to be dispatched. |
262 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
263 | 1 | ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), |
264 | 1 | env_->GetThreadPoolQueueLen()); |
265 | 1 | ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), |
266 | 1 | env_->GetThreadPoolQueueLen(Env::Priority::LOW)); |
267 | 1 | ASSERT_EQ((unsigned int)(kJobs - kHighPoolSize), |
268 | 1 | env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
269 | | |
270 | | // wait for all jobs to finish |
271 | 39 | while (low_pool_job.NumFinished() < kJobs || |
272 | 38 | high_pool_job.NumFinished() < kJobs) { |
273 | 38 | env_->SleepForMicroseconds(kDelayMicros); |
274 | 38 | } |
275 | | |
276 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); |
277 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
278 | | |
279 | | // call IncBackgroundThreadsIfNeeded to two pools. One increasing and |
280 | | // the other decreasing |
281 | 1 | env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW); |
282 | 1 | env_->IncBackgroundThreadsIfNeeded(kHighPoolSize + 1, Env::Priority::HIGH); |
283 | 1 | high_pool_job.Reset(kHighPoolSize + 1); |
284 | 1 | low_pool_job.Reset(kLowPoolSize); |
285 | | |
286 | | // schedule same number of jobs in each pool |
287 | 9 | for (int i = 0; i < kJobs; i++) { |
288 | 8 | env_->Schedule(&CB::Run, &low_pool_job); |
289 | 8 | env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); |
290 | 8 | } |
291 | | // Wait a short while for the jobs to be dispatched. |
292 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
293 | 1 | ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), |
294 | 1 | env_->GetThreadPoolQueueLen()); |
295 | 1 | ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), |
296 | 1 | env_->GetThreadPoolQueueLen(Env::Priority::LOW)); |
297 | 1 | ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)), |
298 | 1 | env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
299 | | |
300 | | // wait for all jobs to finish |
301 | 39 | while (low_pool_job.NumFinished() < kJobs || |
302 | 38 | high_pool_job.NumFinished() < kJobs) { |
303 | 38 | env_->SleepForMicroseconds(kDelayMicros); |
304 | 38 | } |
305 | | |
306 | 1 | env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); |
307 | 1 | } |
308 | | |
309 | 1 | TEST_F(EnvPosixTest, DecreaseNumBgThreads) { |
310 | 1 | std::vector<test::SleepingBackgroundTask> tasks(10); |
311 | | |
312 | | // Set number of thread to 1 first. |
313 | 1 | env_->SetBackgroundThreads(1, Env::Priority::HIGH); |
314 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
315 | | |
316 | | // Schedule 3 tasks. 0 running; Task 1, 2 waiting. |
317 | 4 | for (size_t i = 0; i < 3; i++) { |
318 | 3 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], |
319 | 3 | Env::Priority::HIGH); |
320 | 3 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
321 | 3 | } |
322 | 1 | ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
323 | 1 | ASSERT_TRUE(tasks[0].IsSleeping()); |
324 | 1 | ASSERT_TRUE(!tasks[1].IsSleeping()); |
325 | 1 | ASSERT_TRUE(!tasks[2].IsSleeping()); |
326 | | |
327 | | // Increase to 2 threads. Task 0, 1 running; 2 waiting |
328 | 1 | env_->SetBackgroundThreads(2, Env::Priority::HIGH); |
329 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
330 | 1 | ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
331 | 1 | ASSERT_TRUE(tasks[0].IsSleeping()); |
332 | 1 | ASSERT_TRUE(tasks[1].IsSleeping()); |
333 | 1 | ASSERT_TRUE(!tasks[2].IsSleeping()); |
334 | | |
335 | | // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting |
336 | 1 | env_->SetBackgroundThreads(1, Env::Priority::HIGH); |
337 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
338 | 1 | ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
339 | 1 | ASSERT_TRUE(tasks[0].IsSleeping()); |
340 | 1 | ASSERT_TRUE(tasks[1].IsSleeping()); |
341 | 1 | ASSERT_TRUE(!tasks[2].IsSleeping()); |
342 | | |
343 | | // The last task finishes. Task 0 running, 2 waiting. |
344 | 1 | tasks[1].WakeUp(); |
345 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
346 | 1 | ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
347 | 1 | ASSERT_TRUE(tasks[0].IsSleeping()); |
348 | 1 | ASSERT_TRUE(!tasks[1].IsSleeping()); |
349 | 1 | ASSERT_TRUE(!tasks[2].IsSleeping()); |
350 | | |
351 | | // Increase to 5 threads. Task 0 and 2 running. |
352 | 1 | env_->SetBackgroundThreads(5, Env::Priority::HIGH); |
353 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
354 | 1 | ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
355 | 1 | ASSERT_TRUE(tasks[0].IsSleeping()); |
356 | 1 | ASSERT_TRUE(tasks[2].IsSleeping()); |
357 | | |
358 | | // Change number of threads a couple of times while there is no sufficient |
359 | | // tasks. |
360 | 1 | env_->SetBackgroundThreads(7, Env::Priority::HIGH); |
361 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
362 | 1 | tasks[2].WakeUp(); |
363 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
364 | 1 | env_->SetBackgroundThreads(3, Env::Priority::HIGH); |
365 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
366 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
367 | 1 | env_->SetBackgroundThreads(4, Env::Priority::HIGH); |
368 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
369 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
370 | 1 | env_->SetBackgroundThreads(5, Env::Priority::HIGH); |
371 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
372 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
373 | 1 | env_->SetBackgroundThreads(4, Env::Priority::HIGH); |
374 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
375 | 1 | ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
376 | | |
377 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros * 50); |
378 | | |
379 | | // Enqueue 5 more tasks. Thread pool size now is 4. |
380 | | // Task 0, 3, 4, 5 running;6, 7 waiting. |
381 | 6 | for (size_t i = 3; i < 8; i++) { |
382 | 5 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], |
383 | 5 | Env::Priority::HIGH); |
384 | 5 | } |
385 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
386 | 1 | ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
387 | 1 | ASSERT_TRUE(tasks[3].IsSleeping()); |
388 | 1 | ASSERT_TRUE(tasks[4].IsSleeping()); |
389 | 1 | ASSERT_TRUE(tasks[5].IsSleeping()); |
390 | 1 | ASSERT_TRUE(!tasks[6].IsSleeping()); |
391 | 1 | ASSERT_TRUE(!tasks[7].IsSleeping()); |
392 | | |
393 | | // Wake up task 0, 3 and 4. Task 5, 6, 7 running. |
394 | 1 | tasks[0].WakeUp(); |
395 | 1 | tasks[3].WakeUp(); |
396 | 1 | tasks[4].WakeUp(); |
397 | | |
398 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
399 | 1 | ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
400 | 4 | for (size_t i = 5; i < 8; i++) { |
401 | 3 | ASSERT_TRUE(tasks[i].IsSleeping()); |
402 | 3 | } |
403 | | |
404 | | // Shrink back to 1 thread. Still task 5, 6, 7 running |
405 | 1 | env_->SetBackgroundThreads(1, Env::Priority::HIGH); |
406 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
407 | 1 | ASSERT_TRUE(tasks[5].IsSleeping()); |
408 | 1 | ASSERT_TRUE(tasks[6].IsSleeping()); |
409 | 1 | ASSERT_TRUE(tasks[7].IsSleeping()); |
410 | | |
411 | | // Wake up task 6. Task 5, 7 running |
412 | 1 | tasks[6].WakeUp(); |
413 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
414 | 1 | ASSERT_TRUE(tasks[5].IsSleeping()); |
415 | 1 | ASSERT_TRUE(!tasks[6].IsSleeping()); |
416 | 1 | ASSERT_TRUE(tasks[7].IsSleeping()); |
417 | | |
418 | | // Wake up threads 7. Task 5 running |
419 | 1 | tasks[7].WakeUp(); |
420 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
421 | 1 | ASSERT_TRUE(!tasks[7].IsSleeping()); |
422 | | |
423 | | // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running. |
424 | 1 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[8], |
425 | 1 | Env::Priority::HIGH); |
426 | 1 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[9], |
427 | 1 | Env::Priority::HIGH); |
428 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
429 | 1 | ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0); |
430 | 1 | ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping()); |
431 | | |
432 | | // Increase to 4 threads. Task 5, 8, 9 running. |
433 | 1 | env_->SetBackgroundThreads(4, Env::Priority::HIGH); |
434 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
435 | 1 | ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); |
436 | 1 | ASSERT_TRUE(tasks[8].IsSleeping()); |
437 | 1 | ASSERT_TRUE(tasks[9].IsSleeping()); |
438 | | |
439 | | // Shrink to 1 thread |
440 | 1 | env_->SetBackgroundThreads(1, Env::Priority::HIGH); |
441 | | |
442 | | // Wake up thread 9. |
443 | 1 | tasks[9].WakeUp(); |
444 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
445 | 1 | ASSERT_TRUE(!tasks[9].IsSleeping()); |
446 | 1 | ASSERT_TRUE(tasks[8].IsSleeping()); |
447 | | |
448 | | // Wake up thread 8 |
449 | 1 | tasks[8].WakeUp(); |
450 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
451 | 1 | ASSERT_TRUE(!tasks[8].IsSleeping()); |
452 | | |
453 | | // Wake up the last thread |
454 | 1 | tasks[5].WakeUp(); |
455 | | |
456 | 1 | Env::Default()->SleepForMicroseconds(kDelayMicros); |
457 | 1 | ASSERT_TRUE(!tasks[5].IsSleeping()); |
458 | 1 | } |
459 | | |
460 | | #ifdef __linux__ |
461 | | // Travis doesn't support fallocate or getting unique ID from files for whatever |
462 | | // reason. |
463 | | #ifndef TRAVIS |
464 | | |
465 | | namespace { |
466 | | bool IsSingleVarint(const std::string& s) { |
467 | | Slice slice(s); |
468 | | |
469 | | uint64_t v; |
470 | | if (!GetVarint64(&slice, &v)) { |
471 | | return false; |
472 | | } |
473 | | |
474 | | return slice.size() == 0; |
475 | | } |
476 | | |
477 | | bool IsUniqueIDValid(const std::string& s) { |
478 | | return !s.empty() && !IsSingleVarint(s); |
479 | | } |
480 | | |
481 | | const size_t MAX_ID_SIZE = 100; |
482 | | char temp_id[MAX_ID_SIZE]; |
483 | | |
484 | | |
485 | | } // namespace |
486 | | |
487 | | // Determine whether we can use the FS_IOC_GETVERSION ioctl |
488 | | // on a file in directory DIR. Create a temporary file therein, |
489 | | // try to apply the ioctl (save that result), cleanup and |
490 | | // return the result. Return true if it is supported, and |
491 | | // false if anything fails. |
492 | | // Note that this function "knows" that dir has just been created |
493 | | // and is empty, so we create a simply-named test file: "f". |
494 | | bool ioctl_support__FS_IOC_GETVERSION(const std::string& dir) { |
495 | | const std::string file = dir + "/f"; |
496 | | int fd; |
497 | | do { |
498 | | fd = open(file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); |
499 | | } while (fd < 0 && errno == EINTR); |
500 | | int version; |
501 | | bool ok = (fd >= 0 && ioctl(fd, FS_IOC_GETVERSION, &version) >= 0); |
502 | | |
503 | | close(fd); |
504 | | unlink(file.c_str()); |
505 | | |
506 | | return ok; |
507 | | } |
508 | | |
509 | | // To ensure that Env::GetUniqueId-related tests work correctly, the files |
510 | | // should be stored in regular storage like "hard disk" or "flash device", |
511 | | // and not on a tmpfs file system (like /dev/shm and /tmp on some systems). |
512 | | // Otherwise we cannot get the correct id. |
513 | | // |
514 | | // This function serves as the replacement for test::TmpDir(), which may be |
515 | | // customized to be on a file system that doesn't work with GetUniqueId(). |
516 | | |
517 | | class IoctlFriendlyTmpdir { |
518 | | public: |
519 | | IoctlFriendlyTmpdir() { |
520 | | char dir_buf[100]; |
521 | | std::list<std::string> candidate_dir_list = {"/var/tmp", "/tmp"}; |
522 | | |
523 | | const char *fmt = "%s/rocksdb.XXXXXX"; |
524 | | const char *tmp = getenv("TEST_IOCTL_FRIENDLY_TMPDIR"); |
525 | | // If $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX fits, use |
526 | | // $TEST_IOCTL_FRIENDLY_TMPDIR; subtract 2 for the "%s", and |
527 | | // add 1 for the trailing NUL byte. |
528 | | if (tmp && strlen(tmp) + strlen(fmt) - 2 + 1 <= sizeof dir_buf) { |
529 | | // use $TEST_IOCTL_FRIENDLY_TMPDIR value |
530 | | candidate_dir_list.push_front(tmp); |
531 | | } |
532 | | |
533 | | for (const std::string& d : candidate_dir_list) { |
534 | | snprintf(dir_buf, sizeof dir_buf, fmt, d.c_str()); |
535 | | if (mkdtemp(dir_buf)) { |
536 | | if (ioctl_support__FS_IOC_GETVERSION(dir_buf)) { |
537 | | dir_ = dir_buf; |
538 | | return; |
539 | | } else { |
540 | | // Diagnose ioctl-related failure only if this is the |
541 | | // directory specified via that envvar. |
542 | | if (tmp == d) { |
543 | | fprintf(stderr, "TEST_IOCTL_FRIENDLY_TMPDIR-specified directory is " |
544 | | "not suitable: %s\n", d.c_str()); |
545 | | } |
546 | | rmdir(dir_buf); // ignore failure |
547 | | } |
548 | | } else { |
549 | | // mkdtemp failed: diagnose it, but don't give up. |
550 | | fprintf(stderr, "mkdtemp(%s/...) failed: %s\n", d.c_str(), |
551 | | strerror(errno)); |
552 | | } |
553 | | } |
554 | | |
555 | | fprintf(stderr, "failed to find an ioctl-friendly temporary directory;" |
556 | | " specify one via the TEST_IOCTL_FRIENDLY_TMPDIR envvar\n"); |
557 | | std::abort(); |
558 | | } |
559 | | |
560 | | ~IoctlFriendlyTmpdir() { |
561 | | rmdir(dir_.c_str()); |
562 | | } |
563 | | const std::string& name() { |
564 | | return dir_; |
565 | | } |
566 | | |
567 | | private: |
568 | | std::string dir_; |
569 | | }; |
570 | | |
571 | | |
572 | | // Only works in linux platforms |
573 | | TEST_F(EnvPosixTest, RandomAccessUniqueID) { |
574 | | // Create file. |
575 | | const EnvOptions soptions; |
576 | | IoctlFriendlyTmpdir ift; |
577 | | std::string fname = ift.name() + "/testfile"; |
578 | | unique_ptr<WritableFile> wfile; |
579 | | ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); |
580 | | |
581 | | unique_ptr<RandomAccessFile> file; |
582 | | |
583 | | // Get Unique ID |
584 | | ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); |
585 | | size_t id_size = file->GetUniqueId(temp_id); |
586 | | ASSERT_GT(id_size, 0); |
587 | | std::string unique_id1(temp_id, id_size); |
588 | | ASSERT_TRUE(IsUniqueIDValid(unique_id1)); |
589 | | |
590 | | // Get Unique ID again |
591 | | ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); |
592 | | id_size = file->GetUniqueId(temp_id); |
593 | | ASSERT_GT(id_size, 0); |
594 | | std::string unique_id2(temp_id, id_size); |
595 | | ASSERT_TRUE(IsUniqueIDValid(unique_id2)); |
596 | | |
597 | | // Get Unique ID again after waiting some time. |
598 | | env_->SleepForMicroseconds(1000000); |
599 | | ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); |
600 | | id_size = file->GetUniqueId(temp_id); |
601 | | ASSERT_GT(id_size, 0); |
602 | | std::string unique_id3(temp_id, id_size); |
603 | | ASSERT_TRUE(IsUniqueIDValid(unique_id3)); |
604 | | |
605 | | // Check IDs are the same. |
606 | | ASSERT_EQ(unique_id1, unique_id2); |
607 | | ASSERT_EQ(unique_id2, unique_id3); |
608 | | |
609 | | // Delete the file |
610 | | ASSERT_OK(env_->DeleteFile(fname)); |
611 | | } |
612 | | |
613 | | // only works in linux platforms |
614 | | #ifdef ROCKSDB_FALLOCATE_PRESENT |
615 | | TEST_F(EnvPosixTest, AllocateTest) { |
616 | | IoctlFriendlyTmpdir ift; |
617 | | std::string fname = ift.name() + "/preallocate_testfile"; |
618 | | |
619 | | // Try fallocate in a file to see whether the target file system supports it. |
620 | | // Skip the test if fallocate is not supported. |
621 | | std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2"; |
622 | | int fd = -1; |
623 | | do { |
624 | | fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); |
625 | | } while (fd < 0 && errno == EINTR); |
626 | | ASSERT_GT(fd, 0); |
627 | | |
628 | | int alloc_status = fallocate(fd, 0, 0, 1); |
629 | | |
630 | | int err_number = 0; |
631 | | if (alloc_status != 0) { |
632 | | err_number = errno; |
633 | | fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number)); |
634 | | } |
635 | | close(fd); |
636 | | ASSERT_OK(env_->DeleteFile(fname_test_fallocate)); |
637 | | if (alloc_status != 0 && err_number == EOPNOTSUPP) { |
638 | | // The filesystem containing the file does not support fallocate |
639 | | return; |
640 | | } |
641 | | |
642 | | EnvOptions soptions; |
643 | | soptions.use_mmap_writes = false; |
644 | | unique_ptr<WritableFile> wfile; |
645 | | ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); |
646 | | |
647 | | // allocate 100 MB |
648 | | size_t kPreallocateSize = 100 * 1024 * 1024; |
649 | | size_t kBlockSize = 512; |
650 | | size_t kPageSize = 4096; |
651 | | std::string data(1024 * 1024, 'a'); |
652 | | wfile->SetPreallocationBlockSize(kPreallocateSize); |
653 | | wfile->PrepareWrite(wfile->GetFileSize(), data.size()); |
654 | | ASSERT_OK(wfile->Append(Slice(data))); |
655 | | ASSERT_OK(wfile->Flush()); |
656 | | |
657 | | struct stat f_stat; |
658 | | stat(fname.c_str(), &f_stat); |
659 | | ASSERT_EQ((unsigned int)data.size(), f_stat.st_size); |
660 | | // verify that blocks are preallocated |
661 | | // Note here that we don't check the exact number of blocks preallocated -- |
662 | | // we only require that number of allocated blocks is at least what we expect. |
663 | | // It looks like some FS give us more blocks that we asked for. That's fine. |
664 | | // It might be worth investigating further. |
665 | | ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks); |
666 | | |
667 | | // close the file, should deallocate the blocks |
668 | | wfile.reset(); |
669 | | |
670 | | stat(fname.c_str(), &f_stat); |
671 | | ASSERT_EQ(data.size(), static_cast<size_t>(f_stat.st_size)); |
672 | | // verify that preallocated blocks were deallocated on file close |
673 | | // Because the FS might give us more blocks, we add a full page to the size |
674 | | // and expect the number of blocks to be less or equal to that. |
675 | | ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize, |
676 | | static_cast<size_t>(f_stat.st_blocks)); |
677 | | } |
678 | | #endif // ROCKSDB_FALLOCATE_PRESENT |
679 | | |
680 | | // Returns true if any of the strings in ss are the prefix of another string. |
681 | | bool HasPrefix(const std::unordered_set<std::string>& ss) { |
682 | | for (const std::string& s : ss) { |
683 | | if (s.empty()) { |
684 | | return true; |
685 | | } |
686 | | for (size_t i = 1; i < s.size(); ++i) { |
687 | | if (ss.count(s.substr(0, i)) != 0) { |
688 | | return true; |
689 | | } |
690 | | } |
691 | | } |
692 | | return false; |
693 | | } |
694 | | |
695 | | // Only works in linux platforms |
696 | | TEST_F(EnvPosixTest, RandomAccessUniqueIDConcurrent) { |
697 | | // Check whether a bunch of concurrently existing files have unique IDs. |
698 | | const EnvOptions soptions; |
699 | | |
700 | | // Create the files |
701 | | IoctlFriendlyTmpdir ift; |
702 | | std::vector<std::string> fnames; |
703 | | for (int i = 0; i < 1000; ++i) { |
704 | | fnames.push_back(ift.name() + "/" + "testfile" + ToString(i)); |
705 | | |
706 | | // Create file. |
707 | | unique_ptr<WritableFile> wfile; |
708 | | ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions)); |
709 | | } |
710 | | |
711 | | // Collect and check whether the IDs are unique. |
712 | | std::unordered_set<std::string> ids; |
713 | | for (const std::string& fname : fnames) { |
714 | | unique_ptr<RandomAccessFile> file; |
715 | | std::string unique_id; |
716 | | ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); |
717 | | size_t id_size = file->GetUniqueId(temp_id); |
718 | | ASSERT_GT(id_size, 0); |
719 | | unique_id = std::string(temp_id, id_size); |
720 | | ASSERT_TRUE(IsUniqueIDValid(unique_id)); |
721 | | |
722 | | ASSERT_EQ(ids.count(unique_id), 0); |
723 | | ids.insert(unique_id); |
724 | | } |
725 | | |
726 | | // Delete the files |
727 | | for (const std::string& fname : fnames) { |
728 | | ASSERT_OK(env_->DeleteFile(fname)); |
729 | | } |
730 | | |
731 | | ASSERT_TRUE(!HasPrefix(ids)); |
732 | | } |
733 | | |
734 | | // Only works in linux platforms |
735 | | TEST_F(EnvPosixTest, RandomAccessUniqueIDDeletes) { |
736 | | const EnvOptions soptions; |
737 | | |
738 | | IoctlFriendlyTmpdir ift; |
739 | | std::string fname = ift.name() + "/" + "testfile"; |
740 | | |
741 | | // Check that after file is deleted we don't get same ID again in a new file. |
742 | | std::unordered_set<std::string> ids; |
743 | | for (int i = 0; i < 1000; ++i) { |
744 | | // Create file. |
745 | | { |
746 | | unique_ptr<WritableFile> wfile; |
747 | | ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); |
748 | | } |
749 | | |
750 | | // Get Unique ID |
751 | | std::string unique_id; |
752 | | { |
753 | | unique_ptr<RandomAccessFile> file; |
754 | | ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); |
755 | | size_t id_size = file->GetUniqueId(temp_id); |
756 | | ASSERT_GT(id_size, 0); |
757 | | unique_id = std::string(temp_id, id_size); |
758 | | } |
759 | | |
760 | | ASSERT_TRUE(IsUniqueIDValid(unique_id)); |
761 | | ASSERT_EQ(ids.count(unique_id), 0); |
762 | | ids.insert(unique_id); |
763 | | |
764 | | // Delete the file |
765 | | ASSERT_OK(env_->DeleteFile(fname)); |
766 | | } |
767 | | |
768 | | ASSERT_TRUE(!HasPrefix(ids)); |
769 | | } |
770 | | |
771 | | // Only works on Linux platforms. Exercises InvalidateCache() on all three
771 | | // file types (writable, random-access, sequential) over the same file.
772 | | TEST_F(EnvPosixTest, InvalidateCache) {
773 | | const EnvOptions soptions;
774 | | std::string fname = test::TmpDir() + "/" + "testfile";
775 | |
776 | | // Create file.
777 | | {
778 | | unique_ptr<WritableFile> wfile;
779 | | ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
780 | | ASSERT_OK(wfile.get()->Append(Slice("Hello world")));
781 | | ASSERT_OK(wfile.get()->InvalidateCache(0, 0));  // zero-length form; must succeed before Close()
782 | | ASSERT_OK(wfile.get()->Close());
783 | | }
784 | |
785 | | // Random Read
786 | | {
787 | | unique_ptr<RandomAccessFile> file;
788 | | char scratch[100];
789 | | Slice result;
790 | | ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
791 | | ASSERT_OK(file.get()->Read(0, 11, &result, scratch));
792 | | ASSERT_EQ(memcmp(scratch, "Hello world", 11), 0);
793 | | ASSERT_OK(file.get()->InvalidateCache(0, 11));
794 | | ASSERT_OK(file.get()->InvalidateCache(0, 0));  // zero-length variant should also succeed
795 | | }
796 | |
797 | | // Sequential Read
798 | | {
799 | | unique_ptr<SequentialFile> file;
800 | | uint8_t scratch[100];
801 | | Slice result;
802 | | ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions));
803 | | ASSERT_OK(file.get()->Read(11, &result, scratch));
804 | | ASSERT_EQ(memcmp(scratch, "Hello world", 11), 0);
805 | | ASSERT_OK(file.get()->InvalidateCache(0, 11));
806 | | ASSERT_OK(file.get()->InvalidateCache(0, 0));  // zero-length variant should also succeed
807 | | }
808 | | // Delete the file
809 | | ASSERT_OK(env_->DeleteFile(fname));
810 | | }
811 | | #endif // not TRAVIS |
812 | | #endif // __linux__ |
813 | | |
814 | | class TestLogger : public Logger {  // counts delivered messages and checks each formatted line's size
815 | | public:
816 | | using Logger::Logv;
817 | 6 | void Logv(const char* format, va_list ap) override {
818 | 6 | log_count++;
819 | |
820 | 6 | char new_format[550];
821 | 6 | std::fill_n(new_format, sizeof(new_format), '2');  // prefill so the '\0' scan below finds only the real terminator
822 | 6 | {
823 | 6 | va_list backup_ap;
824 | 6 | va_copy(backup_ap, ap);  // copy: ap may not be reused after vsnprintf consumes it
825 | 6 | int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
826 | | // 48 bytes for extra information + bytes allocated
827 | |
828 | | // When we have n == -1 there is not a terminating zero expected
829 | | #ifdef OS_WIN
830 | | if (n < 0) {
831 | | char_0_count++;
832 | | }
833 | | #endif
834 | |
835 | 6 | if (new_format[0] == '[') {
836 | | // "[DEBUG] "
837 | 1 | ASSERT_TRUE(n <= 56 + (512 - static_cast<int>(sizeof(struct timeval))));
838 | 5 | } else {
839 | 5 | ASSERT_TRUE(n <= 48 + (512 - static_cast<int>(sizeof(struct timeval))));
840 | 5 | }
841 | 6 | va_end(backup_ap);
842 | 6 | }
843 | |
844 | 3.30k | for (size_t i = 0; i < sizeof(new_format); i++) {  // scan the whole buffer, not just [0, n)
845 | 3.30k | if (new_format[i] == 'x') {
846 | 10 | char_x_count++;
847 | 3.29k | } else if (new_format[i] == '\0') {
848 | 6 | char_0_count++;
849 | 6 | }
850 | 3.30k | }
851 | 6 | }
852 | | int log_count;  // not initialized here; the test zeroes it before logging starts
853 | | int char_x_count;  // total 'x' characters seen across all delivered messages
854 | | int char_0_count;  // total '\0' terminators seen (one per delivered message)
855 | | };
856 | | |
857 | 1 | TEST_F(EnvPosixTest, LogBufferTest) {  // verifies LogBuffer batching, level filtering, and truncation counts
858 | 1 | TestLogger test_logger;
859 | 1 | test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
860 | 1 | test_logger.log_count = 0;
861 | 1 | test_logger.char_x_count = 0;
862 | 1 | test_logger.char_0_count = 0;
863 | 1 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
864 | 1 | LogBuffer log_buffer_debug(DEBUG_LEVEL, &test_logger);
865 | |
866 | 1 | char bytes200[200];
867 | 1 | std::fill_n(bytes200, sizeof(bytes200), '1');
868 | 1 | bytes200[sizeof(bytes200) - 1] = '\0';
869 | 1 | char bytes600[600];
870 | 1 | std::fill_n(bytes600, sizeof(bytes600), '1');
871 | 1 | bytes600[sizeof(bytes600) - 1] = '\0';
872 | 1 | char bytes9000[9000];
873 | 1 | std::fill_n(bytes9000, sizeof(bytes9000), '1');
874 | 1 | bytes9000[sizeof(bytes9000) - 1] = '\0';
875 | |
876 | 1 | LOG_TO_BUFFER(&log_buffer, "x%sx", bytes200);
877 | 1 | LOG_TO_BUFFER(&log_buffer, "x%sx", bytes600);
878 | 1 | LOG_TO_BUFFER(&log_buffer, "x%sx%sx%sx", bytes200, bytes200, bytes200);
879 | 1 | LOG_TO_BUFFER(&log_buffer, "x%sx%sx", bytes200, bytes600);
880 | 1 | LOG_TO_BUFFER(&log_buffer, "x%sx%sx", bytes600, bytes9000);
881 | |
882 | 1 | LOG_TO_BUFFER(&log_buffer_debug, "x%sx", bytes200);
883 | 1 | test_logger.SetInfoLogLevel(DEBUG_LEVEL);  // raise verbosity before the second debug message
884 | 1 | LOG_TO_BUFFER(&log_buffer_debug, "x%sx%sx%sx", bytes600, bytes9000, bytes200);
885 | |
886 | 1 | ASSERT_EQ(0, test_logger.log_count);  // nothing reaches the logger until the buffers are flushed
887 | 1 | log_buffer.FlushBufferToLog();
888 | 1 | log_buffer_debug.FlushBufferToLog();
889 | 1 | ASSERT_EQ(6, test_logger.log_count);  // 7 LOG_TO_BUFFER calls above, 6 delivered -- one was not
890 | 1 | ASSERT_EQ(6, test_logger.char_0_count);  // one terminator counted per delivered message
891 | 1 | ASSERT_EQ(10, test_logger.char_x_count);
892 | 1 | }
893 | | |
894 | | // NOTE(review): mirrors the per-record layout LogBuffer allocates (header fields
894 | | // followed by inline message bytes) -- confirm against LogBuffer's implementation.
894 | | struct BufferedLogRecord {
895 | | const char* file;
896 | | int line;
897 | | struct timeval now_tv; // Timestamp of the log
898 | | char message[1]; // Beginning of log message (record presumably over-allocated; text continues past this field)
899 | | };
900 | | |
901 | | class TestLogger2 : public Logger {  // asserts each formatted message exactly fills its max_log_size record
902 | | public:
903 | 2 | explicit TestLogger2(size_t max_log_size) : max_log_size_(max_log_size) {}
904 | | using Logger::Logv;
905 | 2 | void Logv(const char* format, va_list ap) override {
906 | 2 | char new_format[2000];
907 | 2 | std::fill_n(new_format, sizeof(new_format), '2');
908 | 2 | {
909 | 2 | va_list backup_ap;
910 | 2 | va_copy(backup_ap, ap);  // work on a copy so ap itself is untouched
911 | 2 | int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
912 | | // We allocate max_log_size memory per record.
913 | | // Begin of block is taken by header, and 1 byte is taken by zero terminator.
914 | 2 | ASSERT_EQ(static_cast<int>(max_log_size_ - LogBuffer::HeaderSize() - 1), n);
915 | 2 | va_end(backup_ap);
916 | 2 | }
917 | 2 | }
918 | | size_t max_log_size_;  // byte budget for one buffered record, header included
919 | | };
920 | | |
921 | 1 | TEST_F(EnvPosixTest, LogBufferMaxSizeTest) {
922 | 1 | char bytes9000[9000];
923 | 1 | std::fill_n(bytes9000, sizeof(bytes9000), '1');
924 | 1 | bytes9000[sizeof(bytes9000) - 1] = '\0';
925 | |
926 | 3 | for (size_t max_log_size = 256; max_log_size <= 1024;  // two iterations: 256 and 1024
927 | 2 | max_log_size += 1024 - 256) {
928 | 2 | TestLogger2 test_logger(max_log_size);
929 | 2 | test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
930 | 2 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
931 | 2 | LOG_TO_BUFFER(&log_buffer, max_log_size, "%s", bytes9000);  // payload always exceeds max_log_size, forcing truncation
932 | 2 | log_buffer.FlushBufferToLog();  // TestLogger2's Logv asserts the truncated length
933 | 2 | }
934 | 1 | }
935 | | |
936 | 1 | TEST_F(EnvPosixTest, Preallocation) {  // checks last_allocated_block tracking as writes grow the file
937 | 1 | const std::string src = test::TmpDir() + "/" + "testfile";
938 | 1 | unique_ptr<WritableFile> srcfile;
939 | 1 | const EnvOptions soptions;
940 | 1 | ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
941 | 1 | srcfile->SetPreallocationBlockSize(1024 * 1024);  // 1 MiB preallocation chunks
942 | |
943 | | // No writes should mean no preallocation
944 | 1 | size_t block_size, last_allocated_block;
945 | 1 | srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
946 | 1 | ASSERT_EQ(last_allocated_block, 0UL);
947 | |
948 | | // Small write should preallocate one block
949 | 1 | std::string str = "test";
950 | 1 | srcfile->PrepareWrite(srcfile->GetFileSize(), str.size());
951 | 1 | ASSERT_OK(srcfile->Append(str));
952 | 1 | srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
953 | 1 | ASSERT_EQ(last_allocated_block, 1UL);
954 | |
955 | | // Write an entire preallocation block, make sure we increased by two.
956 | 1 | std::string buf(block_size, ' ');
957 | 1 | srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
958 | 1 | ASSERT_OK(srcfile->Append(buf));
959 | 1 | srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
960 | 1 | ASSERT_EQ(last_allocated_block, 2UL);
961 | |
962 | | // Write five more blocks at once, ensure we're where we need to be.
963 | 1 | buf = std::string(block_size * 5, ' ');
964 | 1 | srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
965 | 1 | ASSERT_OK(srcfile->Append(buf));
966 | 1 | srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
967 | 1 | ASSERT_EQ(last_allocated_block, 7UL);
968 | 1 | }
969 | | |
970 | | // Test that the two ways to get children file attributes (in bulk or
971 | | // individually) behave consistently.
972 | 1 | TEST_F(EnvPosixTest, ConsistentChildrenAttributes) {
973 | 1 | const EnvOptions soptions;
974 | 1 | const int kNumChildren = 10;
975 | |
976 | 1 | std::string data;
977 | 11 | for (int i = 0; i < kNumChildren; ++i) {
978 | 10 | std::ostringstream oss;
979 | 10 | oss << test::TmpDir() << "/testfile_" << i;
980 | 10 | const std::string path = oss.str();
981 | 10 | unique_ptr<WritableFile> file;
982 | 10 | ASSERT_OK(env_->NewWritableFile(path, &file, soptions));
983 | 10 | ASSERT_OK(file->Append(data));
984 | 10 | data.append("test");  // file i ends up holding 4*i bytes
985 | 10 | }
986 | |
987 | 1 | std::vector<Env::FileAttributes> file_attrs;
988 | 1 | ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(), &file_attrs));
989 | 11 | for (int i = 0; i < kNumChildren; ++i) {
990 | 10 | std::ostringstream oss;
991 | 10 | oss << "testfile_" << i;
992 | 10 | const std::string name = oss.str();
993 | 10 | const std::string path = test::TmpDir() + "/" + name;
994 | |
995 | 10 | auto file_attrs_iter = std::find_if(
996 | 10 | file_attrs.begin(), file_attrs.end(),
997 | 75 | [&name](const Env::FileAttributes& fm) { return fm.name == name; });
998 | 10 | ASSERT_TRUE(file_attrs_iter != file_attrs.end());
999 | 10 | uint64_t size;
1000 | 10 | ASSERT_OK(env_->GetFileSize(path, &size));
1001 | 10 | ASSERT_EQ(size, 4 * i);
1002 | 10 | ASSERT_EQ(size, file_attrs_iter->size_bytes);  // bulk attribute must agree with the individual query
1003 | 10 | }
1004 | 1 | }
1005 | | |
1006 | | // Test that all WritableFileWrapper forwards all calls to WritableFile.
1007 | 1 | TEST_F(EnvPosixTest, WritableFileWrapper) {
1008 | 1 | class Base : public WritableFile {  // records the step index of every call so forwarding order can be checked
1009 | 1 | public:
1010 | 1 | mutable int *step_;  // shared counter; each hook must fire exactly once, in order
1011 | |
1012 | 14 | void inc(int x) const {
1013 | 14 | EXPECT_EQ(x, (*step_)++);
1014 | 14 | }
1015 | |
1016 | 1 | explicit Base(int* step) : step_(step) {
1017 | 1 | inc(0);
1018 | 1 | }
1019 | |
1020 | 1 | Status Append(const Slice& data) override { inc(1); return Status::OK(); }
1021 | 0 | Status Truncate(uint64_t size) override { return Status::OK(); }  // not exercised below (no step index)
1022 | 1 | Status Close() override { inc(2); return Status::OK(); }
1023 | 1 | Status Flush() override { inc(3); return Status::OK(); }
1024 | 1 | Status Sync() override { inc(4); return Status::OK(); }
1025 | 1 | Status Fsync() override { inc(5); return Status::OK(); }
1026 | 1 | void SetIOPriority(Env::IOPriority pri) override { inc(6); }
1027 | 1 | uint64_t GetFileSize() override { inc(7); return 0; }
1028 | 1 | void GetPreallocationStatus(size_t* block_size,
1029 | 1 | size_t* last_allocated_block) override {
1030 | 1 | inc(8);
1031 | 1 | }
1032 | 1 | size_t GetUniqueId(char* id) const override {
1033 | 1 | inc(9);
1034 | 1 | return 0;
1035 | 1 | }
1036 | 1 | Status InvalidateCache(size_t offset, size_t length) override {
1037 | 1 | inc(10);
1038 | 1 | return Status::OK();
1039 | 1 | }
1040 | |
1041 | 1 | protected:
1042 | 1 | Status Allocate(uint64_t offset, uint64_t len) override {
1043 | 1 | inc(11);
1044 | 1 | return Status::OK();
1045 | 1 | }
1046 | 1 | Status RangeSync(uint64_t offset, uint64_t nbytes) override {
1047 | 1 | inc(12);
1048 | 1 | return Status::OK();
1049 | 1 | }
1050 | |
1051 | 1 | public:
1052 | 1 | ~Base() {
1053 | 1 | inc(13);
1054 | 1 | }
1055 | 1 | };
1056 | |
1057 | 1 | class Wrapper : public WritableFileWrapper {
1058 | 1 | public:
1059 | 1 | explicit Wrapper(std::unique_ptr<WritableFile> target) :
1060 | 1 | WritableFileWrapper(std::move(target)) {}
1061 | |
1062 | 1 | void CallProtectedMethods() {  // protected members need a derived class to be reachable from the test
1063 | 1 | CHECK_OK(Allocate(0, 0));
1064 | 1 | CHECK_OK(RangeSync(0, 0));
1065 | 1 | }
1066 | 1 | };
1067 | |
1068 | 1 | int step = 0;
1069 | |
1070 | 1 | {
1071 | 1 | auto b = std::make_unique<Base>(&step);
1072 | 1 | Wrapper w(std::move(b));
1073 | 1 | ASSERT_OK(w.Append(Slice()));
1074 | 1 | ASSERT_OK(w.Close());
1075 | 1 | ASSERT_OK(w.Flush());
1076 | 1 | ASSERT_OK(w.Sync());
1077 | 1 | ASSERT_OK(w.Fsync());
1078 | 1 | w.SetIOPriority(Env::IOPriority::IO_HIGH);
1079 | 1 | w.GetFileSize();
1080 | 1 | w.GetPreallocationStatus(nullptr, nullptr);
1081 | 1 | w.GetUniqueId(nullptr);
1082 | 1 | ASSERT_OK(w.InvalidateCache(0, 0));
1083 | 1 | w.CallProtectedMethods();
1084 | 1 | }  // scope exit destroys Base, firing step 13
1085 | |
1086 | 1 | EXPECT_EQ(14, step);  // steps 0 (ctor) through 13 (dtor) all fired exactly once
1087 | 1 | }
1088 | | |
1089 | | } // namespace rocksdb |
1090 | | |
1091 | 13.2k | int main(int argc, char** argv) {  // standard GoogleTest entry point
1092 | 13.2k | ::testing::InitGoogleTest(&argc, argv);
1093 | 13.2k | return RUN_ALL_TESTS();
1094 | 13.2k | }