YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/yb/util/lockfree.h
Line
Count
Source
1
// Copyright (c) YugaByte, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
4
// in compliance with the License.  You may obtain a copy of the License at
5
//
6
// http://www.apache.org/licenses/LICENSE-2.0
7
//
8
// Unless required by applicable law or agreed to in writing, software distributed under the License
9
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
10
// or implied.  See the License for the specific language governing permissions and limitations
11
// under the License.
12
//
13
14
#ifndef YB_UTIL_LOCKFREE_H
15
#define YB_UTIL_LOCKFREE_H
16
17
#include <atomic>
18
19
#include <boost/atomic.hpp>
20
21
#include "yb/gutil/dynamic_annotations.h"
22
#include "yb/util/atomic.h" // For IsAcceptableAtomicImpl
23
24
namespace yb {
25
26
// Multi producer - singe consumer queue.
27
template <class T>
28
class MPSCQueue {
29
 public:
30
  // Thread safe - could be invoked from multiple threads.
31
5.68M
  void Push(T* value) {
32
5.68M
    T* old_head = push_head_.load(std::memory_order_acquire);
33
9.78M
    for (;;) {
34
9.78M
      SetNext(value, old_head);
35
9.78M
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
5.74M
        break;
37
5.74M
      }
38
9.78M
    }
39
5.68M
  }
Unexecuted instantiation: _ZN2yb9MPSCQueueINS_6client10KeyToCheckEE4PushEPS2_
_ZN2yb9MPSCQueueINS_9TestEntryEE4PushEPS1_
Line
Count
Source
31
1.93M
  void Push(T* value) {
32
1.93M
    T* old_head = push_head_.load(std::memory_order_acquire);
33
6.02M
    for (;;) {
34
6.02M
      SetNext(value, old_head);
35
6.02M
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
1.97M
        break;
37
1.97M
      }
38
6.02M
    }
39
1.93M
  }
_ZN2yb9MPSCQueueINS_6tablet15OperationDriverEE4PushEPS2_
Line
Count
Source
31
2.69M
  void Push(T* value) {
32
2.69M
    T* old_head = push_head_.load(std::memory_order_acquire);
33
2.69M
    for (;;) {
34
2.69M
      SetNext(value, old_head);
35
2.69M
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
2.69M
        break;
37
2.69M
      }
38
2.69M
    }
39
2.69M
  }
_ZN2yb9MPSCQueueINS_6client8internal10LookupDataEE4PushEPS3_
Line
Count
Source
31
96.7k
  void Push(T* value) {
32
96.7k
    T* old_head = push_head_.load(std::memory_order_acquire);
33
97.2k
    for (;;) {
34
97.2k
      SetNext(value, old_head);
35
97.2k
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
96.9k
        break;
37
96.9k
      }
38
97.2k
    }
39
96.7k
  }
Unexecuted instantiation: transaction_dump.cc:_ZN2yb9MPSCQueueINS_5docdb12_GLOBAL__N_19DumpEntryEE4PushEPS3_
_ZN2yb9MPSCQueueINS_3rpc10StrandTaskEE4PushEPS2_
Line
Count
Source
31
966k
  void Push(T* value) {
32
966k
    T* old_head = push_head_.load(std::memory_order_acquire);
33
968k
    for (;;) {
34
968k
      SetNext(value, old_head);
35
968k
      if (push_head_.compare_exchange_weak(old_head, value, std::memory_order_acq_rel)) {
36
967k
        break;
37
967k
      }
38
968k
    }
39
966k
  }
40
41
  // Could be invoked only by one thread at time.
42
19.7M
  T* Pop() {
43
19.7M
    if (!pop_head_) {
44
18.3M
      PreparePop();
45
18.3M
    }
46
19.7M
    auto result = pop_head_;
47
19.7M
    if (!result) {
48
13.9M
      return nullptr;
49
13.9M
    }
50
5.76M
    pop_head_ = GetNext(result);
51
5.76M
    return result;
52
5.76M
  }
Unexecuted instantiation: _ZN2yb9MPSCQueueINS_6client10KeyToCheckEE3PopEv
_ZN2yb9MPSCQueueINS_9TestEntryEE3PopEv
Line
Count
Source
42
2.04M
  T* Pop() {
43
2.04M
    if (!pop_head_) {
44
803k
      PreparePop();
45
803k
    }
46
2.04M
    auto result = pop_head_;
47
2.04M
    if (!result) {
48
43.8k
      return nullptr;
49
43.8k
    }
50
2.00M
    pop_head_ = GetNext(result);
51
2.00M
    return result;
52
2.00M
  }
_ZN2yb9MPSCQueueINS_6tablet15OperationDriverEE3PopEv
Line
Count
Source
42
15.7M
  T* Pop() {
43
15.7M
    if (!pop_head_) {
44
15.6M
      PreparePop();
45
15.6M
    }
46
15.7M
    auto result = pop_head_;
47
15.7M
    if (!result) {
48
13.0M
      return nullptr;
49
13.0M
    }
50
2.70M
    pop_head_ = GetNext(result);
51
2.70M
    return result;
52
2.70M
  }
_ZN2yb9MPSCQueueINS_6client8internal10LookupDataEE3PopEv
Line
Count
Source
42
154k
  T* Pop() {
43
154k
    if (!pop_head_) {
44
114k
      PreparePop();
45
114k
    }
46
154k
    auto result = pop_head_;
47
154k
    if (!result) {
48
57.7k
      return nullptr;
49
57.7k
    }
50
96.5k
    pop_head_ = GetNext(result);
51
96.5k
    return result;
52
96.5k
  }
Unexecuted instantiation: transaction_dump.cc:_ZN2yb9MPSCQueueINS_5docdb12_GLOBAL__N_19DumpEntryEE3PopEv
_ZN2yb9MPSCQueueINS_3rpc10StrandTaskEE3PopEv
Line
Count
Source
42
1.78M
  T* Pop() {
43
1.78M
    if (!pop_head_) {
44
1.70M
      PreparePop();
45
1.70M
    }
46
1.78M
    auto result = pop_head_;
47
1.78M
    if (!result) {
48
818k
      return nullptr;
49
818k
    }
50
968k
    pop_head_ = GetNext(result);
51
968k
    return result;
52
968k
  }
53
54
 private:
55
18.3M
  void PreparePop() {
56
18.3M
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
18.3M
    T* prev = nullptr;
59
24.0M
    while (current) {
60
5.76M
      auto next = GetNext(current);
61
5.76M
      SetNext(current, prev);
62
5.76M
      prev = current;
63
5.76M
      current = next;
64
5.76M
    }
65
18.3M
    pop_head_ = prev;
66
18.3M
  }
Unexecuted instantiation: _ZN2yb9MPSCQueueINS_6client10KeyToCheckEE10PreparePopEv
_ZN2yb9MPSCQueueINS_9TestEntryEE10PreparePopEv
Line
Count
Source
55
803k
  void PreparePop() {
56
803k
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
803k
    T* prev = nullptr;
59
2.80M
    while (current) {
60
2.00M
      auto next = GetNext(current);
61
2.00M
      SetNext(current, prev);
62
2.00M
      prev = current;
63
2.00M
      current = next;
64
2.00M
    }
65
803k
    pop_head_ = prev;
66
803k
  }
_ZN2yb9MPSCQueueINS_6tablet15OperationDriverEE10PreparePopEv
Line
Count
Source
55
15.6M
  void PreparePop() {
56
15.6M
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
15.6M
    T* prev = nullptr;
59
18.3M
    while (current) {
60
2.70M
      auto next = GetNext(current);
61
2.70M
      SetNext(current, prev);
62
2.70M
      prev = current;
63
2.70M
      current = next;
64
2.70M
    }
65
15.6M
    pop_head_ = prev;
66
15.6M
  }
_ZN2yb9MPSCQueueINS_6client8internal10LookupDataEE10PreparePopEv
Line
Count
Source
55
114k
  void PreparePop() {
56
114k
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
114k
    T* prev = nullptr;
59
211k
    while (current) {
60
96.5k
      auto next = GetNext(current);
61
96.5k
      SetNext(current, prev);
62
96.5k
      prev = current;
63
96.5k
      current = next;
64
96.5k
    }
65
114k
    pop_head_ = prev;
66
114k
  }
Unexecuted instantiation: transaction_dump.cc:_ZN2yb9MPSCQueueINS_5docdb12_GLOBAL__N_19DumpEntryEE10PreparePopEv
_ZN2yb9MPSCQueueINS_3rpc10StrandTaskEE10PreparePopEv
Line
Count
Source
55
1.70M
  void PreparePop() {
56
1.70M
    T* current = push_head_.exchange(nullptr, std::memory_order_acq_rel);
57
    // Reverse original list.
58
1.70M
    T* prev = nullptr;
59
2.67M
    while (current) {
60
968k
      auto next = GetNext(current);
61
968k
      SetNext(current, prev);
62
968k
      prev = current;
63
968k
      current = next;
64
968k
    }
65
1.70M
    pop_head_ = prev;
66
1.70M
  }
67
68
  // List of entries ready for pop, pop head points to the entry that should be returned first.
69
  T* pop_head_ = nullptr;
70
  // List of push entries, push head points to last pushed entry.
71
  std::atomic<T*> push_head_{nullptr};
72
};
73
74
template <class T>
75
class MPSCQueueEntry {
76
 public:
77
20.7M
  void SetNext(T* next) {
78
20.7M
    next_ = next;
79
20.7M
  }
_ZN2yb14MPSCQueueEntryINS_9TestEntryEE7SetNextEPS1_
Line
Count
Source
77
5.93M
  void SetNext(T* next) {
78
5.93M
    next_ = next;
79
5.93M
  }
lockfree-test.cc:_ZN2yb14MPSCQueueEntryIZNS_23LockfreeTest_Stack_Test8TestBodyEvE5EntryE7SetNextEPS2_
Line
Count
Source
77
7.25M
  void SetNext(T* next) {
78
7.25M
    next_ = next;
79
7.25M
  }
_ZN2yb14MPSCQueueEntryINS_6tablet15OperationDriverEE7SetNextEPS2_
Line
Count
Source
77
5.39M
  void SetNext(T* next) {
78
5.39M
    next_ = next;
79
5.39M
  }
_ZN2yb14MPSCQueueEntryINS_6client8internal10LookupDataEE7SetNextEPS3_
Line
Count
Source
77
193k
  void SetNext(T* next) {
78
193k
    next_ = next;
79
193k
  }
stack_trace.cc:_ZN2yb14MPSCQueueEntryINS_12_GLOBAL__N_116ThreadStackEntryEE7SetNextEPS2_
Line
Count
Source
77
6.27k
  void SetNext(T* next) {
78
6.27k
    next_ = next;
79
6.27k
  }
_ZN2yb14MPSCQueueEntryINS_3rpc10StrandTaskEE7SetNextEPS2_
Line
Count
Source
77
1.93M
  void SetNext(T* next) {
78
1.93M
    next_ = next;
79
1.93M
  }
80
81
19.2M
  T* GetNext() const {
82
19.2M
    return next_;
83
19.2M
  }
_ZNK2yb14MPSCQueueEntryINS_9TestEntryEE7GetNextEv
Line
Count
Source
81
4.00M
  T* GetNext() const {
82
4.00M
    return next_;
83
4.00M
  }
lockfree-test.cc:_ZNK2yb14MPSCQueueEntryIZNS_23LockfreeTest_Stack_Test8TestBodyEvE5EntryE7GetNextEv
Line
Count
Source
81
7.76M
  T* GetNext() const {
82
7.76M
    return next_;
83
7.76M
  }
_ZNK2yb14MPSCQueueEntryINS_6tablet15OperationDriverEE7GetNextEv
Line
Count
Source
81
5.40M
  T* GetNext() const {
82
5.40M
    return next_;
83
5.40M
  }
_ZNK2yb14MPSCQueueEntryINS_6client8internal10LookupDataEE7GetNextEv
Line
Count
Source
81
193k
  T* GetNext() const {
82
193k
    return next_;
83
193k
  }
stack_trace.cc:_ZNK2yb14MPSCQueueEntryINS_12_GLOBAL__N_116ThreadStackEntryEE7GetNextEv
Line
Count
Source
81
4.08k
  T* GetNext() const {
82
4.08k
    return next_;
83
4.08k
  }
_ZNK2yb14MPSCQueueEntryINS_3rpc10StrandTaskEE7GetNextEv
Line
Count
Source
81
1.93M
  T* GetNext() const {
82
1.93M
    return next_;
83
1.93M
  }
84
85
 private:
86
  T* next_ = nullptr;
87
};
88
89
template <class T>
90
20.7M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
20.7M
  entry->SetNext(next);
92
20.7M
}
_ZN2yb7SetNextINS_9TestEntryEEEvPNS_14MPSCQueueEntryIT_EEPS3_
Line
Count
Source
90
5.90M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
5.90M
  entry->SetNext(next);
92
5.90M
}
lockfree-test.cc:_ZN2yb7SetNextIZNS_23LockfreeTest_Stack_Test8TestBodyEvE5EntryEEvPNS_14MPSCQueueEntryIT_EEPS4_
Line
Count
Source
90
7.26M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
7.26M
  entry->SetNext(next);
92
7.26M
}
_ZN2yb7SetNextINS_6tablet15OperationDriverEEEvPNS_14MPSCQueueEntryIT_EEPS4_
Line
Count
Source
90
5.39M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
5.39M
  entry->SetNext(next);
92
5.39M
}
_ZN2yb7SetNextINS_6client8internal10LookupDataEEEvPNS_14MPSCQueueEntryIT_EEPS5_
Line
Count
Source
90
193k
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
193k
  entry->SetNext(next);
92
193k
}
stack_trace.cc:_ZN2yb7SetNextINS_12_GLOBAL__N_116ThreadStackEntryEEEvPNS_14MPSCQueueEntryIT_EEPS4_
Line
Count
Source
90
6.27k
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
6.27k
  entry->SetNext(next);
92
6.27k
}
_ZN2yb7SetNextINS_3rpc10StrandTaskEEEvPNS_14MPSCQueueEntryIT_EEPS4_
Line
Count
Source
90
1.93M
void SetNext(MPSCQueueEntry<T>* entry, T* next) {
91
1.93M
  entry->SetNext(next);
92
1.93M
}
93
94
template <class T>
95
19.2M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
19.2M
  return entry->GetNext();
97
19.2M
}
_ZN2yb7GetNextINS_9TestEntryEEEPT_PKNS_14MPSCQueueEntryIS2_EE
Line
Count
Source
95
4.00M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
4.00M
  return entry->GetNext();
97
4.00M
}
lockfree-test.cc:_ZN2yb7GetNextIZNS_23LockfreeTest_Stack_Test8TestBodyEvE5EntryEEPT_PKNS_14MPSCQueueEntryIS3_EE
Line
Count
Source
95
7.66M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
7.66M
  return entry->GetNext();
97
7.66M
}
_ZN2yb7GetNextINS_6tablet15OperationDriverEEEPT_PKNS_14MPSCQueueEntryIS3_EE
Line
Count
Source
95
5.40M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
5.40M
  return entry->GetNext();
97
5.40M
}
_ZN2yb7GetNextINS_6client8internal10LookupDataEEEPT_PKNS_14MPSCQueueEntryIS4_EE
Line
Count
Source
95
193k
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
193k
  return entry->GetNext();
97
193k
}
stack_trace.cc:_ZN2yb7GetNextINS_12_GLOBAL__N_116ThreadStackEntryEEEPT_PKNS_14MPSCQueueEntryIS3_EE
Line
Count
Source
95
4.08k
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
4.08k
  return entry->GetNext();
97
4.08k
}
_ZN2yb7GetNextINS_3rpc10StrandTaskEEEPT_PKNS_14MPSCQueueEntryIS3_EE
Line
Count
Source
95
1.93M
T* GetNext(const MPSCQueueEntry<T>* entry) {
96
1.93M
  return entry->GetNext();
97
1.93M
}
98
99
// Intrusive stack implementation based on linked list.
100
template <class T>
101
class LockFreeStack {
102
 public:
103
41.3k
  LockFreeStack() {
104
41.3k
    CHECK(IsAcceptableAtomicImpl(head_));
105
41.3k
  }
lockfree-test.cc:_ZN2yb13LockFreeStackIZNS_23LockfreeTest_Stack_Test8TestBodyEvE5EntryEC2Ev
Line
Count
Source
103
1
  LockFreeStack() {
104
1
    CHECK(IsAcceptableAtomicImpl(head_));
105
1
  }
stack_trace.cc:_ZN2yb13LockFreeStackINS_12_GLOBAL__N_116ThreadStackEntryEEC2Ev
Line
Count
Source
103
41.3k
  LockFreeStack() {
104
41.3k
    CHECK(IsAcceptableAtomicImpl(head_));
105
41.3k
  }
106
107
3.27M
  void Push(T* value) {
108
3.27M
    Head old_head = head_.load(boost::memory_order_acquire);
109
7.31M
    for (;;) {
110
7.31M
      ANNOTATE_IGNORE_WRITES_BEGIN();
111
7.31M
      SetNext(value, old_head.pointer);
112
7.31M
      ANNOTATE_IGNORE_WRITES_END();
113
7.31M
      Head new_head{value, old_head.version + 1};
114
7.31M
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
115
3.29M
        break;
116
3.29M
      }
117
7.31M
    }
118
3.27M
  }
lockfree-test.cc:_ZN2yb13LockFreeStackIZNS_23LockfreeTest_Stack_Test8TestBodyEvE5EntryE4PushEPS2_
Line
Count
Source
107
3.27M
  void Push(T* value) {
108
3.27M
    Head old_head = head_.load(boost::memory_order_acquire);
109
7.31M
    for (;;) {
110
7.31M
      ANNOTATE_IGNORE_WRITES_BEGIN();
111
7.31M
      SetNext(value, old_head.pointer);
112
7.31M
      ANNOTATE_IGNORE_WRITES_END();
113
7.31M
      Head new_head{value, old_head.version + 1};
114
7.31M
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
115
3.28M
        break;
116
3.28M
      }
117
7.31M
    }
118
3.27M
  }
stack_trace.cc:_ZN2yb13LockFreeStackINS_12_GLOBAL__N_116ThreadStackEntryEE4PushEPS2_
Line
Count
Source
107
6.27k
  void Push(T* value) {
108
6.27k
    Head old_head = head_.load(boost::memory_order_acquire);
109
6.27k
    for (;;) {
110
6.27k
      ANNOTATE_IGNORE_WRITES_BEGIN();
111
6.27k
      SetNext(value, old_head.pointer);
112
6.27k
      ANNOTATE_IGNORE_WRITES_END();
113
6.27k
      Head new_head{value, old_head.version + 1};
114
6.27k
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
115
6.27k
        break;
116
6.27k
      }
117
6.27k
    }
118
6.27k
  }
119
120
3.45M
  T* Pop() {
121
3.45M
    Head old_head = head_.load(boost::memory_order_acquire);
122
8.01M
    for (;;) {
123
8.01M
      if (!old_head.pointer) {
124
166k
        break;
125
166k
      }
126
7.85M
      ANNOTATE_IGNORE_READS_BEGIN();
127
7.85M
      Head new_head{GetNext(old_head.pointer), old_head.version + 1};
128
7.85M
      ANNOTATE_IGNORE_READS_END();
129
7.85M
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
130
3.28M
        break;
131
3.28M
      }
132
7.85M
    }
133
3.45M
    return old_head.pointer;
134
3.45M
  }
lockfree-test.cc:_ZN2yb13LockFreeStackIZNS_23LockfreeTest_Stack_Test8TestBodyEvE5EntryE3PopEv
Line
Count
Source
120
3.44M
  T* Pop() {
121
3.44M
    Head old_head = head_.load(boost::memory_order_acquire);
122
8.01M
    for (;;) {
123
8.01M
      if (!old_head.pointer) {
124
164k
        break;
125
164k
      }
126
7.84M
      ANNOTATE_IGNORE_READS_BEGIN();
127
7.84M
      Head new_head{GetNext(old_head.pointer), old_head.version + 1};
128
7.84M
      ANNOTATE_IGNORE_READS_END();
129
7.84M
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
130
3.28M
        break;
131
3.28M
      }
132
7.84M
    }
133
3.44M
    return old_head.pointer;
134
3.44M
  }
stack_trace.cc:_ZN2yb13LockFreeStackINS_12_GLOBAL__N_116ThreadStackEntryEE3PopEv
Line
Count
Source
120
6.12k
  T* Pop() {
121
6.12k
    Head old_head = head_.load(boost::memory_order_acquire);
122
6.12k
    for (;;) {
123
6.12k
      if (!old_head.pointer) {
124
2.04k
        break;
125
2.04k
      }
126
4.08k
      ANNOTATE_IGNORE_READS_BEGIN();
127
4.08k
      Head new_head{GetNext(old_head.pointer), old_head.version + 1};
128
4.08k
      ANNOTATE_IGNORE_READS_END();
129
4.08k
      if (head_.compare_exchange_weak(old_head, new_head, boost::memory_order_acq_rel)) {
130
4.08k
        break;
131
4.08k
      }
132
4.08k
    }
133
6.12k
    return old_head.pointer;
134
6.12k
  }
135
136
 private:
137
  // The clang compiler may generate code that requires 16-byte alignment
138
  // that causes SEGV if this struct is not aligned properly.
139
  struct Head {
140
    T* pointer;
141
    size_t version;
142
  } __attribute__((aligned(16)));
143
144
  boost::atomic<Head> head_{Head{nullptr, 0}};
145
};
146
147
} // namespace yb
148
149
#endif // YB_UTIL_LOCKFREE_H