YugabyteDB (2.13.1.0-b60, 21121d69985fbf76aa6958d8f04a9bfa936293b5)

Coverage Report

Created: 2022-03-22 16:43

/Users/deen/code/yugabyte-db/src/yb/util/lockfree.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_UTIL_LOCKFREE_H
15
#define YB_UTIL_LOCKFREE_H
16
17
#include <atomic>
18
19
#include <boost/atomic.hpp>
20
21
#include "yb/gutil/dynamic_annotations.h"
22
#include "yb/util/atomic.h" // For IsAcceptableAtomicImpl
23
24
namespace yb {
25
26
// Multi producer - single consumer queue.
27
template <class T>
28
class MPSCQueue {
29
 public:
30
  // Thread safe - could be invoked from multiple threads.
31
6.87M
  void Push(T* value) {
32
6.87M
    T* old_head = push_head_.load(std::memory_order_acquire);
33
6.88M
    for (;;) {
34
6.88M
      SetNext(value, old_head);
35
6.88M
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
6.87M
        break;
37
6.87M
      }
38
6.88M
    }
39
6.87M
  }
yb::MPSCQueue<yb::tablet::OperationDriver>::Push(yb::tablet::OperationDriver*)
Line
Count
Source
31
5.04M
  void Push(T* value) {
32
5.04M
    T* old_head = push_head_.load(std::memory_order_acquire);
33
5.04M
    for (;;) {
34
5.04M
      SetNext(value, old_head);
35
5.04M
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
5.04M
        break;
37
5.04M
      }
38
5.04M
    }
39
5.04M
  }
yb::MPSCQueue<yb::rpc::StrandTask>::Push(yb::rpc::StrandTask*)
Line
Count
Source
31
1.66M
  void Push(T* value) {
32
1.66M
    T* old_head = push_head_.load(std::memory_order_acquire);
33
1.67M
    for (;;) {
34
1.67M
      SetNext(value, old_head);
35
1.67M
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
1.66M
        break;
37
1.66M
      }
38
1.67M
    }
39
1.66M
  }
yb::MPSCQueue<yb::client::internal::LookupData>::Push(yb::client::internal::LookupData*)
Line
Count
Source
31
160k
  void Push(T* value) {
32
160k
    T* old_head = push_head_.load(std::memory_order_acquire);
33
160k
    for (;;) {
34
160k
      SetNext(value, old_head);
35
160k
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
160k
        break;
37
160k
      }
38
160k
    }
39
160k
  }
Unexecuted instantiation: transaction_dump.cc:yb::MPSCQueue<yb::docdb::(anonymous namespace)::DumpEntry>::Push(yb::docdb::(anonymous namespace)::DumpEntry*)
40
41
  // Could be invoked only by one thread at a time.
42
39.0M
  T* Pop() {
43
39.0M
    if (!pop_head_) {
44
38.6M
      PreparePop();
45
38.6M
    }
46
39.0M
    auto result = pop_head_;
47
39.0M
    if (!result) {
48
32.1M
      return nullptr;
49
32.1M
    }
50
6.87M
    pop_head_ = GetNext(result);
51
6.87M
    return result;
52
39.0M
  }
yb::MPSCQueue<yb::tablet::OperationDriver>::Pop()
Line
Count
Source
42
35.8M
  T* Pop() {
43
35.8M
    if (!pop_head_) {
44
35.6M
      PreparePop();
45
35.6M
    }
46
35.8M
    auto result = pop_head_;
47
35.8M
    if (!result) {
48
30.7M
      return nullptr;
49
30.7M
    }
50
5.04M
    pop_head_ = GetNext(result);
51
5.04M
    return result;
52
35.8M
  }
yb::MPSCQueue<yb::rpc::StrandTask>::Pop()
Line
Count
Source
42
3.02M
  T* Pop() {
43
3.02M
    if (!pop_head_) {
44
2.83M
      PreparePop();
45
2.83M
    }
46
3.02M
    auto result = pop_head_;
47
3.02M
    if (!result) {
48
1.35M
      return nullptr;
49
1.35M
    }
50
1.67M
    pop_head_ = GetNext(result);
51
1.67M
    return result;
52
3.02M
  }
yb::MPSCQueue<yb::client::internal::LookupData>::Pop()
Line
Count
Source
42
232k
  T* Pop() {
43
232k
    if (!pop_head_) {
44
142k
      PreparePop();
45
142k
    }
46
232k
    auto result = pop_head_;
47
232k
    if (!result) {
48
72.7k
      return nullptr;
49
72.7k
    }
50
160k
    pop_head_ = GetNext(result);
51
160k
    return result;
52
232k
  }
Unexecuted instantiation: transaction_dump.cc:yb::MPSCQueue<yb::docdb::(anonymous namespace)::DumpEntry>::Pop()
53
54
 private:
55
38.6M
  void PreparePop() {
56
38.6M
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
38.6M
    T* prev = nullptr;
59
45.5M
    while (current) {
60
6.88M
      auto next = GetNext(current);
61
6.88M
      SetNext(current, prev);
62
6.88M
      prev = current;
63
6.88M
      current = next;
64
6.88M
    }
65
38.6M
    pop_head_ = prev;
66
38.6M
  }
yb::MPSCQueue<yb::tablet::OperationDriver>::PreparePop()
Line
Count
Source
55
35.6M
  void PreparePop() {
56
35.6M
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
35.6M
    T* prev = nullptr;
59
40.7M
    while (current) {
60
5.05M
      auto next = GetNext(current);
61
5.05M
      SetNext(current, prev);
62
5.05M
      prev = current;
63
5.05M
      current = next;
64
5.05M
    }
65
35.6M
    pop_head_ = prev;
66
35.6M
  }
yb::MPSCQueue<yb::rpc::StrandTask>::PreparePop()
Line
Count
Source
55
2.83M
  void PreparePop() {
56
2.83M
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
2.83M
    T* prev = nullptr;
59
4.50M
    while (current) {
60
1.67M
      auto next = GetNext(current);
61
1.67M
      SetNext(current, prev);
62
1.67M
      prev = current;
63
1.67M
      current = next;
64
1.67M
    }
65
2.83M
    pop_head_ = prev;
66
2.83M
  }
yb::MPSCQueue<yb::client::internal::LookupData>::PreparePop()
Line
Count
Source
55
142k
  void PreparePop() {
56
142k
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
142k
    T* prev = nullptr;
59
302k
    while (current) {
60
160k
      auto next = GetNext(current);
61
160k
      SetNext(current, prev);
62
160k
      prev = current;
63
160k
      current = next;
64
160k
    }
65
142k
    pop_head_ = prev;
66
142k
  }
Unexecuted instantiation: transaction_dump.cc:yb::MPSCQueue<yb::docdb::(anonymous namespace)::DumpEntry>::PreparePop()
67
68
  // List of entries ready for pop, pop head points to the entry that should be returned first.
69
  T* pop_head_ = nullptr;
70
  // List of push entries, push head points to the last pushed entry.
71
  std::atomic<T*> push_head_{nullptr};
72
};
73
74
template <class T>
75
class MPSCQueueEntry {
76
 public:
77
13.7M
  void SetNext(T* next) {
78
13.7M
    next_ = next;
79
13.7M
  }
yb::MPSCQueueEntry<yb::tablet::OperationDriver>::SetNext(yb::tablet::OperationDriver*)
Line
Count
Source
77
10.0M
  void SetNext(T* next) {
78
10.0M
    next_ = next;
79
10.0M
  }
yb::MPSCQueueEntry<yb::rpc::StrandTask>::SetNext(yb::rpc::StrandTask*)
Line
Count
Source
77
3.34M
  void SetNext(T* next) {
78
3.34M
    next_ = next;
79
3.34M
  }
yb::MPSCQueueEntry<yb::client::internal::LookupData>::SetNext(yb::client::internal::LookupData*)
Line
Count
Source
77
320k
  void SetNext(T* next) {
78
320k
    next_ = next;
79
320k
  }
stack_trace.cc:yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry>::SetNext(yb::(anonymous namespace)::ThreadStackEntry*)
Line
Count
Source
77
22.5k
  void SetNext(T* next) {
78
22.5k
    next_ = next;
79
22.5k
  }
80
81
13.7M
  T* GetNext() const {
82
13.7M
    return next_;
83
13.7M
  }
yb::MPSCQueueEntry<yb::tablet::OperationDriver>::GetNext() const
Line
Count
Source
81
10.0M
  T* GetNext() const {
82
10.0M
    return next_;
83
10.0M
  }
yb::MPSCQueueEntry<yb::rpc::StrandTask>::GetNext() const
Line
Count
Source
81
3.34M
  T* GetNext() const {
82
3.34M
    return next_;
83
3.34M
  }
yb::MPSCQueueEntry<yb::client::internal::LookupData>::GetNext() const
Line
Count
Source
81
320k
  T* GetNext() const {
82
320k
    return next_;
83
320k
  }
stack_trace.cc:yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry>::GetNext() const
Line
Count
Source
81
13.5k
  T* GetNext() const {
82
13.5k
    return next_;
83
13.5k
  }
84
85
 private:
86
  T* next_ = nullptr;
87
};
88
89
template <class T>
90
13.7M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
13.7M
  entry->SetNext(next);
92
13.7M
}
void yb::SetNext<yb::tablet::OperationDriver>(yb::MPSCQueueEntry<yb::tablet::OperationDriver>*, yb::tablet::OperationDriver*)
Line
Count
Source
90
10.0M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
10.0M
  entry->SetNext(next);
92
10.0M
}
void yb::SetNext<yb::rpc::StrandTask>(yb::MPSCQueueEntry<yb::rpc::StrandTask>*, yb::rpc::StrandTask*)
Line
Count
Source
90
3.34M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
3.34M
  entry->SetNext(next);
92
3.34M
}
void yb::SetNext<yb::client::internal::LookupData>(yb::MPSCQueueEntry<yb::client::internal::LookupData>*, yb::client::internal::LookupData*)
Line
Count
Source
90
320k
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
320k
  entry->SetNext(next);
92
320k
}
stack_trace.cc:void yb::SetNext<yb::(anonymous namespace)::ThreadStackEntry>(yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry>*, yb::(anonymous namespace)::ThreadStackEntry*)
Line
Count
Source
90
22.5k
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
22.5k
  entry->SetNext(next);
92
22.5k
}
93
94
template <class T>
95
13.7M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
13.7M
  return entry->GetNext();
97
13.7M
}
yb::tablet::OperationDriver* yb::GetNext<yb::tablet::OperationDriver>(yb::MPSCQueueEntry<yb::tablet::OperationDriver> const*)
Line
Count
Source
95
10.0M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
10.0M
  return entry->GetNext();
97
10.0M
}
yb::rpc::StrandTask* yb::GetNext<yb::rpc::StrandTask>(yb::MPSCQueueEntry<yb::rpc::StrandTask> const*)
Line
Count
Source
95
3.34M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
3.34M
  return entry->GetNext();
97
3.34M
}
yb::client::internal::LookupData* yb::GetNext<yb::client::internal::LookupData>(yb::MPSCQueueEntry<yb::client::internal::LookupData> const*)
Line
Count
Source
95
320k
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
320k
  return entry->GetNext();
97
320k
}
stack_trace.cc:yb::(anonymous namespace)::ThreadStackEntry* yb::GetNext<yb::(anonymous namespace)::ThreadStackEntry>(yb::MPSCQueueEntry<yb::(anonymous namespace)::ThreadStackEntry> const*)
Line
Count
Source
95
13.5k
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
13.5k
  return entry->GetNext();
97
13.5k
}
98
99
// Intrusive stack implementation based on linked list.
100
template <class T>
101
class LockFreeStack {
102
 public:
103
65.4k
  LockFreeStack() {
104
65.4k
    CHECK(IsAcceptableAtomicImpl(head_));
105
65.4k
  }
106
107
22.5k
  void Push(T* value) {
108
22.5k
    Head old_head = head_.load(boost::memory_order_acquire);
109
22.5k
    for (;;) {
110
22.5k
      ANNOTATE_IGNORE_WRITES_BEGIN();
111
22.5k
      SetNext(value, old_head.pointer);
112
22.5k
      ANNOTATE_IGNORE_WRITES_END();
113
22.5k
      Head new_head{value, old_head.version + 1};
114
22.5k
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
115
22.5k
        break;
116
22.5k
      }
117
22.5k
    }
118
22.5k
  }
119
120
20.3k
  T* Pop() {
121
20.3k
    Head old_head = head_.load(boost::memory_order_acquire);
122
20.3k
    for (;;) {
123
20.3k
      if (!old_head.pointer) {
124
6.79k
        break;
125
6.79k
      }
126
13.5k
      ANNOTATE_IGNORE_READS_BEGIN();
127
13.5k
      Head new_head{GetNext(old_head.pointer), old_head.version + 1};
128
13.5k
      ANNOTATE_IGNORE_READS_END();
129
13.5k
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
130
13.5k
        break;
131
13.5k
      }
132
13.5k
    }
133
20.3k
    return old_head.pointer;
134
20.3k
  }
135
136
 private:
137
  // The clang compiler may generate code that requires 16-byte alignment for this struct;
138
  // such code causes a SEGV if this struct is not aligned properly.
139
  struct Head {
140
    T* pointer;
141
    size_t version;
142
  } __attribute__((aligned(16)));
143
144
  boost::atomic<Head> head_{Head{nullptr, 0}};
145
};
146
147
} // namespace yb
148
149
#endif // YB_UTIL_LOCKFREE_H