/Users/deen/code/yugabyte-db/src/yb/util/taskstream-test.cc
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | // |
18 | | // The following only applies to changes made to this file as part of YugaByte development. |
19 | | // |
20 | | // Portions Copyright (c) YugaByte, Inc. |
21 | | // |
22 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except |
23 | | // in compliance with the License. You may obtain a copy of the License at |
24 | | // |
25 | | // http://www.apache.org/licenses/LICENSE-2.0 |
26 | | // |
27 | | // Unless required by applicable law or agreed to in writing, software distributed under the License |
28 | | // is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
29 | | // or implied. See the License for the specific language governing permissions and limitations |
30 | | // under the License. |
31 | | // |
32 | | |
33 | | #include <atomic> |
34 | | #include <chrono> |
35 | | #include <functional> |
36 | | #include <limits> |
37 | | #include <memory> |
38 | | #include <thread> |
39 | | #include <vector> |
40 | | |
41 | | #include <gflags/gflags.h> |
42 | | #include <glog/logging.h> |
43 | | #include <gtest/gtest.h> |
44 | | |
45 | | #include "yb/gutil/atomicops.h" |
46 | | |
47 | | #include "yb/util/blocking_queue.h" |
48 | | #include "yb/util/countdown_latch.h" |
49 | | #include "yb/util/scope_exit.h" |
50 | | #include "yb/util/status.h" |
51 | | #include "yb/util/taskstream.h" |
52 | | #include "yb/util/test_macros.h" |
53 | | #include "yb/util/threadpool.h" |
54 | | |
55 | | using std::atomic; |
56 | | using std::shared_ptr; |
57 | | using std::string; |
58 | | using std::thread; |
59 | | using std::unique_ptr; |
60 | | using std::vector; |
61 | | |
62 | | using strings::Substitute; |
63 | | DECLARE_bool(enable_tracing); |
64 | | |
65 | | using std::shared_ptr; |
66 | | |
67 | | namespace yb { |
68 | | |
69 | | namespace { |
70 | | |
71 | | static CHECKED_STATUS BuildMinMaxTestPool( |
72 | 2 | int min_threads, int max_threads, std::unique_ptr<ThreadPool> *pool) { |
73 | 2 | return ThreadPoolBuilder("test").set_min_threads(min_threads) |
74 | 2 | .set_max_threads(max_threads) |
75 | 2 | .Build(pool); |
76 | 2 | } |
77 | | |
78 | | } // anonymous namespace |
79 | | |
80 | | class TestTaskStream : public ::testing::Test { |
81 | | public: |
82 | 2 | TestTaskStream() { |
83 | 2 | FLAGS_enable_tracing = true; |
84 | 2 | } |
85 | | protected: |
86 | | const int32_t kTaskstreamQueueMaxSize = 100000; |
87 | | const MonoDelta kTaskstreamQueueMaxWait = MonoDelta::FromMilliseconds(1000); |
88 | | }; |
89 | | |
90 | 16 | static void SimpleTaskStreamMethod(int* value, std::atomic<int32_t>* counter) { |
91 | 16 | if (value == nullptr) { |
92 | 3 | return; |
93 | 3 | } |
94 | 13 | int n = *value; |
95 | 96 | while (n--) { |
96 | 83 | (*counter)++; |
97 | 83 | } |
98 | 13 | } |
99 | | |
100 | 1 | TEST_F(TestTaskStream, TestSimpleTaskStream) { |
101 | 1 | using namespace std::placeholders; |
102 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
103 | 1 | ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool)); |
104 | | |
105 | 1 | std::atomic<int32_t> counter(0); |
106 | 1 | std::function<void (int*)> f1 = std::bind(&SimpleTaskStreamMethod, _1, &counter); |
107 | | |
108 | 1 | TaskStream<int> taskStream(f1, thread_pool.get(), kTaskstreamQueueMaxSize, |
109 | 1 | kTaskstreamQueueMaxWait); |
110 | 1 | ASSERT_OK(taskStream.Start()); |
111 | 1 | int a[4] = {10, 9, 8, 7}; |
112 | 5 | for (int i = 0; i < 4; i++) { |
113 | 4 | ASSERT_OK(taskStream.Submit(&a[i])); |
114 | 4 | } |
115 | 1 | thread_pool->Wait(); |
116 | 1 | ASSERT_EQ(34, counter.load(std::memory_order_acquire)); |
117 | 1 | taskStream.Stop(); |
118 | 1 | thread_pool->Shutdown(); |
119 | 1 | } |
120 | | |
121 | 1 | TEST_F(TestTaskStream, TestTwoTaskStreams) { |
122 | 1 | using namespace std::placeholders; |
123 | 1 | std::unique_ptr<ThreadPool> thread_pool; |
124 | 1 | ASSERT_OK(BuildMinMaxTestPool(1, 1, &thread_pool)); |
125 | | |
126 | 1 | std::atomic<int32_t> counter0(0); |
127 | 1 | std::atomic<int32_t> counter1(0); |
128 | 1 | std::function<void (int*)> f0 = std::bind(&SimpleTaskStreamMethod, _1, &counter0); |
129 | 1 | std::function<void (int*)> f1 = std::bind(&SimpleTaskStreamMethod, _1, &counter1); |
130 | | |
131 | 1 | TaskStream<int> taskStream0(f0, thread_pool.get(), kTaskstreamQueueMaxSize, |
132 | 1 | kTaskstreamQueueMaxWait); |
133 | 1 | TaskStream<int> taskStream1(f1, thread_pool.get(), kTaskstreamQueueMaxSize, |
134 | 1 | kTaskstreamQueueMaxWait); |
135 | 1 | ASSERT_OK(taskStream0.Start()); |
136 | 1 | ASSERT_OK(taskStream1.Start()); |
137 | 1 | int a[4] = {10, 9, 8, 7}; |
138 | 5 | for (int i = 0; i < 4; i++) { |
139 | 4 | ASSERT_OK(taskStream0.Submit(&a[i])); |
140 | 4 | } |
141 | 1 | int b[5] = {1, 2, 3, 4, 5}; |
142 | 6 | for (int i = 0; i < 5; i++) { |
143 | 5 | ASSERT_OK(taskStream1.Submit(&b[i])); |
144 | 5 | } |
145 | 1 | thread_pool->Wait(); |
146 | 1 | ASSERT_EQ(34, counter0.load(std::memory_order_acquire)); |
147 | 1 | ASSERT_EQ(15, counter1.load(std::memory_order_acquire)); |
148 | 1 | taskStream0.Stop(); |
149 | 1 | taskStream1.Stop(); |
150 | 1 | thread_pool->Shutdown(); |
151 | 1 | } |
152 | | } // namespace yb |