YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/taskstream-test.cc
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
//
18
// The following only applies to changes made to this file as part of YugaByte development.
19
//
20
// Portions Copyright (c) YugaByte, Inc.
21
//
22
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
23
// in compliance with the License.  You may obtain a copy of the License at
24
//
25
// http://www.apache.org/licenses/LICENSE-2.0
26
//
27
// Unless required by applicable law or agreed to in writing, software distributed under the License
28
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
29
// or implied.  See the License for the specific language governing permissions and limitations
30
// under the License.
31
//
32
33
#include <atomic>
34
#include <chrono>
35
#include <functional>
36
#include <limits>
37
#include <memory>
38
#include <thread>
39
#include <vector>
40
41
#include <gflags/gflags.h>
42
#include <glog/logging.h>
43
#include <gtest/gtest.h>
44
45
#include "yb/gutil/atomicops.h"
46
47
#include "yb/util/blocking_queue.h"
48
#include "yb/util/countdown_latch.h"
49
#include "yb/util/scope_exit.h"
50
#include "yb/util/status.h"
51
#include "yb/util/taskstream.h"
52
#include "yb/util/test_macros.h"
53
#include "yb/util/threadpool.h"
54
55
using std::atomic;
56
using std::shared_ptr;
57
using std::string;
58
using std::thread;
59
using std::unique_ptr;
60
using std::vector;
61
62
using strings::Substitute;
63
DECLARE_bool(enable_tracing);
64
65
using std::shared_ptr;
66
67
namespace yb {
68
69
namespace {
70
71
// Configures a ThreadPoolBuilder named "test" with the requested minimum and
// maximum thread counts, then asks it to build into `pool`.
//
// min_threads - forwarded to ThreadPoolBuilder::set_min_threads().
// max_threads - forwarded to ThreadPoolBuilder::set_max_threads().
// pool        - forwarded to ThreadPoolBuilder::Build(); presumably receives the new pool.
//
// Returns whatever status Build() reports.
static CHECKED_STATUS BuildMinMaxTestPool(
    int min_threads, int max_threads, std::unique_ptr<ThreadPool> *pool) {
  ThreadPoolBuilder builder("test");
  builder.set_min_threads(min_threads);
  builder.set_max_threads(max_threads);
  return builder.Build(pool);
}
77
78
} // anonymous namespace
79
80
class TestTaskStream : public ::testing::Test {
81
 public:
82
2
  TestTaskStream() {
83
2
    FLAGS_enable_tracing = true;
84
2
  }
85
 protected:
86
  const int32_t kTaskstreamQueueMaxSize = 100000;
87
  const MonoDelta kTaskstreamQueueMaxWait = MonoDelta::FromMilliseconds(1000);
88
};
89
90
16
// Task body used by the TaskStream tests: bumps `*counter` once for every unit
// in `*value`, so after a batch of tasks the counter equals the sum of the
// submitted values.
//
// value   - pointer to the number of increments to perform. A null pointer is
//           ignored. Zero and negative counts perform no increments.
// counter - atomic tally to increment; must not be null.
static void SimpleTaskStreamMethod(int* value, std::atomic<int32_t>* counter) {
  if (value == nullptr) {
    return;
  }
  // Loop on "> 0" rather than on truthiness: a negative count would otherwise
  // never reach zero on the way down and would eventually decrement past
  // INT_MIN, which is undefined behaviour.
  for (int remaining = *value; remaining > 0; --remaining) {
    (*counter)++;
  }
}
99
100
1
// Feeds four work items (10, 9, 8, 7) through one TaskStream backed by a pool
// built with min and max thread counts of 1, and checks that the shared counter
// ends at their sum, 34.
TEST_F(TestTaskStream, TestSimpleTaskStream) {
  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool));

  std::atomic<int32_t> counter(0);
  // Each task adds its submitted value to `counter`.
  std::function<void (int*)> f1 = [&counter](int* item) {
    SimpleTaskStreamMethod(item, &counter);
  };

  TaskStream<int> taskStream(
      f1, thread_pool.get(), kTaskstreamQueueMaxSize, kTaskstreamQueueMaxWait);
  ASSERT_OK(taskStream.Start());

  int a[4] = {10, 9, 8, 7};
  for (int& item : a) {
    ASSERT_OK(taskStream.Submit(&item));
  }

  // Wait on the pool before reading the counter.
  thread_pool->Wait();
  ASSERT_EQ(34, counter.load(std::memory_order_acquire));

  taskStream.Stop();
  thread_pool->Shutdown();
}
120
121
1
// Runs two TaskStreams on the same pool (built with min and max thread counts
// of 1), each with its own counter, and checks that each counter ends at the
// sum of the items submitted to its stream: 10+9+8+7 = 34 and 1+2+3+4+5 = 15.
TEST_F(TestTaskStream, TestTwoTaskStreams) {
  std::unique_ptr<ThreadPool> thread_pool;
  ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool));

  std::atomic<int32_t> counter0(0);
  std::atomic<int32_t> counter1(0);
  // One task functor per stream, each bound to that stream's counter.
  std::function<void (int*)> f0 = [&counter0](int* item) {
    SimpleTaskStreamMethod(item, &counter0);
  };
  std::function<void (int*)> f1 = [&counter1](int* item) {
    SimpleTaskStreamMethod(item, &counter1);
  };

  TaskStream<int> taskStream0(
      f0, thread_pool.get(), kTaskstreamQueueMaxSize, kTaskstreamQueueMaxWait);
  TaskStream<int> taskStream1(
      f1, thread_pool.get(), kTaskstreamQueueMaxSize, kTaskstreamQueueMaxWait);
  ASSERT_OK(taskStream0.Start());
  ASSERT_OK(taskStream1.Start());

  int a[4] = {10, 9, 8, 7};
  for (int& item : a) {
    ASSERT_OK(taskStream0.Submit(&item));
  }

  int b[5] = {1, 2, 3, 4, 5};
  for (int& item : b) {
    ASSERT_OK(taskStream1.Submit(&item));
  }

  // Wait on the pool before reading either counter.
  thread_pool->Wait();
  ASSERT_EQ(34, counter0.load(std::memory_order_acquire));
  ASSERT_EQ(15, counter1.load(std::memory_order_acquire));

  taskStream0.Stop();
  taskStream1.Stop();
  thread_pool->Shutdown();
}
152
} // namespace yb