YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/rocksdb/util/thread_local_test.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under the BSD-style license found in the
3
//  LICENSE file in the root directory of this source tree. An additional grant
4
//  of patent rights can be found in the PATENTS file in the same directory.
5
//
6
// The following only applies to changes made to this file as part of YugaByte development.
7
//
8
// Portions Copyright (c) YugaByte, Inc.
9
//
10
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
11
// in compliance with the License.  You may obtain a copy of the License at
12
//
13
// http://www.apache.org/licenses/LICENSE-2.0
14
//
15
// Unless required by applicable law or agreed to in writing, software distributed under the License
16
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
17
// or implied.  See the License for the specific language governing permissions and limitations
18
// under the License.
19
//
20
21
#include <thread>
22
#include <atomic>
23
#include <string>
24
25
#include "yb/rocksdb/env.h"
26
#include "yb/rocksdb/port/port.h"
27
#include "yb/rocksdb/util/autovector.h"
28
#include "yb/rocksdb/util/sync_point.h"
29
#include <gtest/gtest.h>
30
#include "yb/rocksdb/util/thread_local.h"
31
#include "yb/rocksdb/util/testutil.h"
32
#include "yb/util/tostring.h"
33
34
namespace rocksdb {
35
36
class ThreadLocalTest : public RocksDBTest {
37
 public:
38
7
  ThreadLocalTest() : env_(Env::Default()) {}
39
40
  Env* env_;
41
};
42
43
namespace {
44
45
struct Params {
46
  Params(port::Mutex* m, port::CondVar* c, int* u, int n,
47
         UnrefHandler handler = nullptr)
48
      : mu(m),
49
        cv(c),
50
        unref(u),
51
        total(n),
52
        started(0),
53
        completed(0),
54
        doWrite(false),
55
        tls1(handler),
56
44
        tls2(nullptr) {}
57
58
  port::Mutex* mu;
59
  port::CondVar* cv;
60
  int* unref;
61
  int total;
62
  int started;
63
  int completed;
64
  bool doWrite;
65
  ThreadLocalPtr tls1;
66
  ThreadLocalPtr* tls2;
67
};
68
69
class IDChecker : public ThreadLocalPtr {
70
 public:
71
2.06k
  static uint32_t PeekId() { return Instance()->PeekId(); }
72
3
  static vector<uint32_t> PeekIds() { return Instance()->PeekIds(); }
73
};
74
75
}  // anonymous namespace
76
77
1
TEST_F(ThreadLocalTest, UniqueIdTest) {
78
1
  {
79
1
    port::Mutex mu;
80
1
    port::CondVar cv(&mu);
81
82
1
    ASSERT_EQ(IDChecker::PeekId(), 0u);
83
    // New ThreadLocal instance bumps id by 1
84
1
    {
85
      // Id used 0
86
1
      Params p1(&mu, &cv, nullptr, 1u);
87
1
      ASSERT_EQ(IDChecker::PeekId(), 1u);
88
      // Id used 1
89
1
      Params p2(&mu, &cv, nullptr, 1u);
90
1
      ASSERT_EQ(IDChecker::PeekId(), 2u);
91
      // Id used 2
92
1
      Params p3(&mu, &cv, nullptr, 1u);
93
1
      ASSERT_EQ(IDChecker::PeekId(), 3u);
94
      // Id used 3
95
1
      Params p4(&mu, &cv, nullptr, 1u);
96
1
      ASSERT_EQ(IDChecker::PeekId(), 4u);
97
1
    }
98
1
    ASSERT_EQ(IDChecker::PeekId(), 0u);
99
    // id 3, 2, 1, 0 are in the free queue in order
100
1
    ASSERT_EQ("[3, 2, 1, 0]", yb::ToString(IDChecker::PeekIds()));
101
102
    // pick up 0
103
1
    Params p1(&mu, &cv, nullptr, 1u);
104
1
    ASSERT_EQ(IDChecker::PeekId(), 1u);
105
    // pick up 1
106
1
    Params* p2 = new Params(&mu, &cv, nullptr, 1u);
107
1
    ASSERT_EQ(IDChecker::PeekId(), 2u);
108
    // pick up 2
109
1
    Params p3(&mu, &cv, nullptr, 1u);
110
1
    ASSERT_EQ(IDChecker::PeekId(), 3u);
111
    // return up 1
112
1
    delete p2;
113
1
    ASSERT_EQ(IDChecker::PeekId(), 1u);
114
    // Now we have 3, 1 in queue
115
    // pick up 1
116
1
    Params p4(&mu, &cv, nullptr, 1u);
117
1
    ASSERT_EQ(IDChecker::PeekId(), 3u);
118
    // pick up 3
119
1
    Params p5(&mu, &cv, nullptr, 1u);
120
    // next new id
121
1
    ASSERT_EQ(IDChecker::PeekId(), 4u);
122
1
  }
123
  // After exit, id sequence in queue:
124
1
  ASSERT_EQ("[3, 1, 2, 0]", yb::ToString(IDChecker::PeekIds()));
125
1
}
126
127
1
TEST_F(ThreadLocalTest, SequentialReadWriteTest) {
128
1
  ASSERT_EQ(0u, IDChecker::PeekId());
129
1
  ASSERT_EQ("[3, 1, 2, 0]", yb::ToString(IDChecker::PeekIds()));
130
131
1
  port::Mutex mu;
132
1
  port::CondVar cv(&mu);
133
1
  Params p(&mu, &cv, nullptr, 1);
134
1
  ThreadLocalPtr tls2;
135
1
  p.tls2 = &tls2;
136
1
  ASSERT_EQ(1u, IDChecker::PeekId());
137
138
1.02k
  auto func = [](void* ptr) {
139
1.02k
    auto& params = *static_cast<Params*>(ptr);
140
141
1.02k
    ASSERT_TRUE(params.tls1.Get() == nullptr);
142
1.02k
    params.tls1.Reset(reinterpret_cast<int*>(1));
143
1.02k
    ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(1));
144
1.02k
    params.tls1.Reset(reinterpret_cast<int*>(2));
145
1.02k
    ASSERT_TRUE(params.tls1.Get() == reinterpret_cast<int*>(2));
146
147
1.02k
    ASSERT_TRUE(params.tls2->Get() == nullptr);
148
1.02k
    params.tls2->Reset(reinterpret_cast<int*>(1));
149
1.02k
    ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(1));
150
1.02k
    params.tls2->Reset(reinterpret_cast<int*>(2));
151
1.02k
    ASSERT_TRUE(params.tls2->Get() == reinterpret_cast<int*>(2));
152
153
1.02k
    params.mu->Lock();
154
1.02k
    ++(params.completed);
155
1.02k
    params.cv->SignalAll();
156
1.02k
    params.mu->Unlock();
157
1.02k
  };
158
159
1.02k
  for (int iter = 0; iter < 1024; ++iter) {
160
2.04k
    ASSERT_EQ(1u, IDChecker::PeekId()) << "At iter=" << iter;
161
    // Another new thread, read/write should not see value from previous thread
162
1.02k
    env_->StartThread(func, static_cast<void*>(&p));
163
1.02k
    mu.Lock();
164
2.04k
    while (p.completed != iter + 1) {
165
1.01k
      cv.Wait();
166
1.01k
    }
167
1.02k
    mu.Unlock();
168
1.02k
    ASSERT_EQ(1u, IDChecker::PeekId());
169
1.02k
  }
170
1
}
171
172
1
TEST_F(ThreadLocalTest, ConcurrentReadWriteTest) {
173
  // global id list carries over 3, 1, 2, 0
174
1
  ASSERT_EQ(IDChecker::PeekId(), 0u);
175
176
1
  ThreadLocalPtr tls2;
177
1
  port::Mutex mu1;
178
1
  port::CondVar cv1(&mu1);
179
1
  Params p1(&mu1, &cv1, nullptr, 16);
180
1
  p1.tls2 = &tls2;
181
182
1
  port::Mutex mu2;
183
1
  port::CondVar cv2(&mu2);
184
1
  Params p2(&mu2, &cv2, nullptr, 16);
185
1
  p2.doWrite = true;
186
1
  p2.tls2 = &tls2;
187
188
32
  auto func = [](void* ptr) {
189
32
    auto& p = *static_cast<Params*>(ptr);
190
191
32
    p.mu->Lock();
192
    // Size_T switches size along with the ptr size
193
    // we want to cast to.
194
32
    size_t own = ++(p.started);
195
32
    p.cv->SignalAll();
196
151
    while (p.started != p.total) {
197
119
      p.cv->Wait();
198
119
    }
199
32
    p.mu->Unlock();
200
201
    // Let write threads write a different value from the read threads
202
32
    if (p.doWrite) {
203
16
      own += 8192;
204
16
    }
205
206
32
    ASSERT_TRUE(p.tls1.Get() == nullptr);
207
32
    ASSERT_TRUE(p.tls2->Get() == nullptr);
208
209
32
    auto* env = Env::Default();
210
32
    auto start = env->NowMicros();
211
212
32
    p.tls1.Reset(reinterpret_cast<size_t*>(own));
213
32
    p.tls2->Reset(reinterpret_cast<size_t*>(own + 1));
214
    // Loop for 1 second
215
670k
    while (env->NowMicros() - start < 1000 * 1000) {
216
2.84M
      for (int iter = 0; iter < 100000; ++iter) {
217
2.17M
        ASSERT_TRUE(p.tls1.Get() == reinterpret_cast<size_t*>(own));
218
2.49M
        ASSERT_TRUE(p.tls2->Get() == reinterpret_cast<size_t*>(own + 1));
219
2.84M
        if (p.doWrite) {
220
1.58M
          p.tls1.Reset(reinterpret_cast<size_t*>(own));
221
1.58M
          p.tls2->Reset(reinterpret_cast<size_t*>(own + 1));
222
1.58M
        }
223
2.84M
      }
224
61
    }
225
226
670k
    p.mu->Lock();
227
670k
    ++(p.completed);
228
670k
    p.cv->SignalAll();
229
670k
    p.mu->Unlock();
230
670k
  };
231
232
  // Initiate 2 instnaces: one keeps writing and one keeps reading.
233
  // The read instance should not see data from the write instance.
234
  // Each thread local copy of the value are also different from each
235
  // other.
236
17
  for (int th = 0; th < p1.total; ++th) {
237
16
    env_->StartThread(func, static_cast<void*>(&p1));
238
16
  }
239
17
  for (int th = 0; th < p2.total; ++th) {
240
16
    env_->StartThread(func, static_cast<void*>(&p2));
241
16
  }
242
243
1
  mu1.Lock();
244
17
  while (p1.completed != p1.total) {
245
16
    cv1.Wait();
246
16
  }
247
1
  mu1.Unlock();
248
249
1
  mu2.Lock();
250
6
  while (p2.completed != p2.total) {
251
5
    cv2.Wait();
252
5
  }
253
1
  mu2.Unlock();
254
255
1
  ASSERT_EQ(IDChecker::PeekId(), 3u);
256
1
}
257
258
1
TEST_F(ThreadLocalTest, Unref) {
259
1
  ASSERT_EQ(IDChecker::PeekId(), 0u);
260
261
1.02k
  auto unref = [](void* ptr) {
262
1.02k
    auto& p = *static_cast<Params*>(ptr);
263
1.02k
    p.mu->Lock();
264
1.02k
    ++(*p.unref);
265
1.02k
    p.mu->Unlock();
266
1.02k
  };
267
268
  // Case 0: no unref triggered if ThreadLocalPtr is never accessed
269
253
  auto func0 = [](void* ptr) {
270
253
    auto& p = *static_cast<Params*>(ptr);
271
272
253
    p.mu->Lock();
273
253
    ++(p.started);
274
253
    p.cv->SignalAll();
275
1.49k
    while (p.started != p.total) {
276
1.24k
      p.cv->Wait();
277
1.24k
    }
278
253
    p.mu->Unlock();
279
253
  };
280
281
9
  for (int th = 1; th <= 128; th += th) {
282
8
    port::Mutex mu;
283
8
    port::CondVar cv(&mu);
284
8
    int unref_count = 0;
285
8
    Params p(&mu, &cv, &unref_count, th, unref);
286
287
263
    for (int i = 0; i < p.total; ++i) {
288
255
      env_->StartThread(func0, static_cast<void*>(&p));
289
255
    }
290
8
    env_->WaitForJoin();
291
8
    ASSERT_EQ(unref_count, 0);
292
8
  }
293
294
  // Case 1: unref triggered by thread exit
295
255
  auto func1 = [](void* ptr) {
296
255
    auto& p = *static_cast<Params*>(ptr);
297
298
255
    p.mu->Lock();
299
255
    ++(p.started);
300
255
    p.cv->SignalAll();
301
1.77k
    while (p.started != p.total) {
302
1.51k
      p.cv->Wait();
303
1.51k
    }
304
255
    p.mu->Unlock();
305
306
255
    ASSERT_TRUE(p.tls1.Get() == nullptr);
307
253
    ASSERT_TRUE(p.tls2->Get() == nullptr);
308
309
253
    p.tls1.Reset(ptr);
310
253
    p.tls2->Reset(ptr);
311
312
253
    p.tls1.Reset(ptr);
313
253
    p.tls2->Reset(ptr);
314
253
  };
315
316
9
  for (int th = 1; th <= 128; th += th) {
317
8
    port::Mutex mu;
318
8
    port::CondVar cv(&mu);
319
8
    int unref_count = 0;
320
8
    ThreadLocalPtr tls2(unref);
321
8
    Params p(&mu, &cv, &unref_count, th, unref);
322
8
    p.tls2 = &tls2;
323
324
263
    for (int i = 0; i < p.total; ++i) {
325
255
      env_->StartThread(func1, static_cast<void*>(&p));
326
255
    }
327
328
8
    env_->WaitForJoin();
329
330
    // N threads x 2 ThreadLocal instance cleanup on thread exit
331
8
    ASSERT_EQ(unref_count, 2 * p.total);
332
8
  }
333
334
  // Case 2: unref triggered by ThreadLocal instance destruction
335
254
  auto func2 = [](void* ptr) {
336
254
    auto& p = *static_cast<Params*>(ptr);
337
338
254
    p.mu->Lock();
339
254
    ++(p.started);
340
254
    p.cv->SignalAll();
341
1.64k
    while (p.started != p.total) {
342
1.39k
      p.cv->Wait();
343
1.39k
    }
344
254
    p.mu->Unlock();
345
346
254
    ASSERT_TRUE(p.tls1.Get() == nullptr);
347
254
    ASSERT_TRUE(p.tls2->Get() == nullptr);
348
349
254
    p.tls1.Reset(ptr);
350
254
    p.tls2->Reset(ptr);
351
352
254
    p.tls1.Reset(ptr);
353
254
    p.tls2->Reset(ptr);
354
355
254
    p.mu->Lock();
356
254
    ++(p.completed);
357
254
    p.cv->SignalAll();
358
359
    // Waiting for instruction to exit thread
360
1.18k
    while (p.completed != 0) {
361
926
      p.cv->Wait();
362
926
    }
363
254
    p.mu->Unlock();
364
254
  };
365
366
9
  for (int th = 1; th <= 128; th += th) {
367
8
    port::Mutex mu;
368
8
    port::CondVar cv(&mu);
369
8
    int unref_count = 0;
370
8
    Params p(&mu, &cv, &unref_count, th, unref);
371
8
    p.tls2 = new ThreadLocalPtr(unref);
372
373
263
    for (int i = 0; i < p.total; ++i) {
374
255
      env_->StartThread(func2, static_cast<void*>(&p));
375
255
    }
376
377
    // Wait for all threads to finish using Params
378
8
    mu.Lock();
379
55
    while (p.completed != p.total) {
380
47
      cv.Wait();
381
47
    }
382
8
    mu.Unlock();
383
384
    // Now destroy one ThreadLocal instance
385
8
    delete p.tls2;
386
8
    p.tls2 = nullptr;
387
    // instance destroy for N threads
388
8
    ASSERT_EQ(unref_count, p.total);
389
390
    // Signal to exit
391
8
    mu.Lock();
392
8
    p.completed = 0;
393
8
    cv.SignalAll();
394
8
    mu.Unlock();
395
8
    env_->WaitForJoin();
396
    // additional N threads exit unref for the left instance
397
8
    ASSERT_EQ(unref_count, 2 * p.total);
398
8
  }
399
1
}
400
401
1
TEST_F(ThreadLocalTest, Swap) {
402
1
  ThreadLocalPtr tls;
403
1
  tls.Reset(reinterpret_cast<void*>(1));
404
1
  ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(nullptr)), 1);
405
1
  ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(2)) == nullptr);
406
1
  ASSERT_EQ(reinterpret_cast<int64_t>(tls.Get()), 2);
407
1
  ASSERT_EQ(reinterpret_cast<int64_t>(tls.Swap(reinterpret_cast<void*>(3))), 2);
408
1
}
409
410
1
TEST_F(ThreadLocalTest, Scrape) {
411
0
  auto unref = [](void* ptr) {
412
0
    auto& p = *static_cast<Params*>(ptr);
413
0
    p.mu->Lock();
414
0
    ++(*p.unref);
415
0
    p.mu->Unlock();
416
0
  };
417
418
253
  auto func = [](void* ptr) {
419
253
    auto& p = *static_cast<Params*>(ptr);
420
421
253
    ASSERT_TRUE(p.tls1.Get() == nullptr);
422
255
    ASSERT_TRUE(p.tls2->Get() == nullptr);
423
424
255
    p.tls1.Reset(ptr);
425
255
    p.tls2->Reset(ptr);
426
427
255
    p.tls1.Reset(ptr);
428
255
    p.tls2->Reset(ptr);
429
430
255
    p.mu->Lock();
431
255
    ++(p.completed);
432
255
    p.cv->SignalAll();
433
434
    // Waiting for instruction to exit thread
435
1.67k
    while (p.completed != 0) {
436
1.42k
      p.cv->Wait();
437
1.42k
    }
438
255
    p.mu->Unlock();
439
255
  };
440
441
9
  for (int th = 1; th <= 128; th += th) {
442
8
    port::Mutex mu;
443
8
    port::CondVar cv(&mu);
444
8
    int unref_count = 0;
445
8
    Params p(&mu, &cv, &unref_count, th, unref);
446
8
    p.tls2 = new ThreadLocalPtr(unref);
447
448
263
    for (int i = 0; i < p.total; ++i) {
449
255
      env_->StartThread(func, static_cast<void*>(&p));
450
255
    }
451
452
    // Wait for all threads to finish using Params
453
8
    mu.Lock();
454
20
    while (p.completed != p.total) {
455
12
      cv.Wait();
456
12
    }
457
8
    mu.Unlock();
458
459
8
    ASSERT_EQ(unref_count, 0);
460
461
    // Scrape all thread local data. No unref at thread
462
    // exit or ThreadLocalPtr destruction
463
8
    autovector<void*> ptrs;
464
8
    p.tls1.Scrape(&ptrs, nullptr);
465
8
    p.tls2->Scrape(&ptrs, nullptr);
466
8
    delete p.tls2;
467
    // Signal to exit
468
8
    mu.Lock();
469
8
    p.completed = 0;
470
8
    cv.SignalAll();
471
8
    mu.Unlock();
472
8
    env_->WaitForJoin();
473
474
8
    ASSERT_EQ(unref_count, 0);
475
8
  }
476
1
}
477
478
1
TEST_F(ThreadLocalTest, CompareAndSwap) {
479
1
  ThreadLocalPtr tls;
480
1
  ASSERT_TRUE(tls.Swap(reinterpret_cast<void*>(1)) == nullptr);
481
1
  void* expected = reinterpret_cast<void*>(1);
482
  // Swap in 2
483
1
  ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
484
1
  expected = reinterpret_cast<void*>(100);
485
  // Fail Swap, still 2
486
1
  ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast<void*>(2), expected));
487
1
  ASSERT_EQ(expected, reinterpret_cast<void*>(2));
488
  // Swap in 3
489
1
  expected = reinterpret_cast<void*>(2);
490
1
  ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast<void*>(3), expected));
491
1
  ASSERT_EQ(tls.Get(), reinterpret_cast<void*>(3));
492
1
}
493
494
namespace {
495
496
0
void* AccessThreadLocal(void* arg) {
497
0
  TEST_SYNC_POINT("AccessThreadLocal:Start");
498
0
  ThreadLocalPtr tlp;
499
0
  tlp.Reset(new std::string("hello RocksDB"));
500
0
  TEST_SYNC_POINT("AccessThreadLocal:End");
501
0
  return nullptr;
502
0
}
503
504
}  // namespace
505
506
// The following test is disabled as it requires manual steps to run it
507
// correctly.
508
//
509
// Currently we have no way to acess SyncPoint w/o ASAN error when the
510
// child thread dies after the main thread dies.  So if you manually enable
511
// this test and only see an ASAN error on SyncPoint, it means you pass the
512
// test.
513
0
TEST_F(ThreadLocalTest, DISABLED_MainThreadDiesFirst) {
514
0
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
515
0
      {{"AccessThreadLocal:Start", "MainThreadDiesFirst:End"},
516
0
       {"PosixEnv::~PosixEnv():End", "AccessThreadLocal:End"}});
517
518
  // Triggers the initialization of singletons.
519
0
  Env::Default();
520
521
0
#ifndef ROCKSDB_LITE
522
0
  try {
523
0
#endif  // ROCKSDB_LITE
524
0
    std::thread th(&AccessThreadLocal, nullptr);
525
0
    th.detach();
526
0
    TEST_SYNC_POINT("MainThreadDiesFirst:End");
527
0
#ifndef ROCKSDB_LITE
528
0
  } catch (const std::system_error& ex) {
529
0
    std::cerr << "Start thread: " << ex.code() << std::endl;
530
0
    ASSERT_TRUE(false);
531
0
  }
532
0
#endif  // ROCKSDB_LITE
533
0
}
534
535
}  // namespace rocksdb
536
537
13.2k
int main(int argc, char** argv) {
538
13.2k
  ::testing::InitGoogleTest(&argc, argv);
539
13.2k
  return RUN_ALL_TESTS();
540
13.2k
}