/Users/deen/code/yugabyte-db/src/postgres/src/backend/storage/ipc/barrier.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * barrier.c |
4 | | * Barriers for synchronizing cooperating processes. |
5 | | * |
6 | | * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * From Wikipedia[1]: "In parallel computing, a barrier is a type of |
10 | | * synchronization method. A barrier for a group of threads or processes in |
11 | | * the source code means any thread/process must stop at this point and cannot |
12 | | * proceed until all other threads/processes reach this barrier." |
13 | | * |
14 | | * This implementation of barriers allows for static sets of participants |
15 | | * known up front, or dynamic sets of participants which processes can join or |
16 | | * leave at any time. In the dynamic case, a phase number can be used to |
17 | | * track progress through a parallel algorithm, and may be necessary to |
18 | | * synchronize with the current phase of a multi-phase algorithm when a new |
19 | | * participant joins. In the static case, the phase number is used |
20 | | * internally, but it isn't strictly necessary for client code to access it |
21 | | * because the phase can only advance when the declared number of participants |
22 | | * reaches the barrier, so client code should be in no doubt about the current |
23 | | * phase of computation at all times. |
24 | | * |
25 | | * Consider a parallel algorithm that involves separate phases of computation |
26 | | * A, B and C where the output of each phase is needed before the next phase |
27 | | * can begin. |
28 | | * |
29 | | * In the case of a static barrier initialized with 4 participants, each |
30 | | * participant works on phase A, then calls BarrierArriveAndWait to wait until |
31 | | * all 4 participants have reached that point. When BarrierArriveAndWait |
32 | | * returns control, each participant can work on B, and so on. Because the |
33 | | * barrier knows how many participants to expect, the phases of computation |
34 | | * don't need labels or numbers, since each process's program counter implies |
35 | | * the current phase. Even if some of the processes are slow to start up and |
36 | | * begin running phase A, the other participants are expecting them and will |
37 | | * patiently wait at the barrier. The code could be written as follows: |
38 | | * |
39 | | * perform_a(); |
40 | | * BarrierArriveAndWait(&barrier, ...); |
41 | | * perform_b(); |
42 | | * BarrierArriveAndWait(&barrier, ...); |
43 | | * perform_c(); |
44 | | * BarrierArriveAndWait(&barrier, ...); |
45 | | * |
46 | | * If the number of participants is not known up front, then a dynamic barrier |
47 | | * is needed and the number should be set to zero at initialization. New |
48 | | * complications arise because the number necessarily changes over time as |
49 | | * participants attach and detach, and therefore phases B, C or even the end |
50 | | * of processing may be reached before any given participant has started |
51 | | * running and attached. Therefore the client code must perform an initial |
52 | | * test of the phase number after attaching, because it needs to find out |
53 | | * which phase of the algorithm has been reached by any participants that are |
54 | | * already attached in order to synchronize with that work. Once the program |
55 | | * counter or some other representation of current progress is synchronized |
56 | | * with the barrier's phase, normal control flow can be used just as in the |
57 | | * static case. Our example could be written using a switch statement with |
58 | | * cases that fall-through, as follows: |
59 | | * |
60 | | * phase = BarrierAttach(&barrier); |
61 | | * switch (phase) |
62 | | * { |
63 | | * case PHASE_A: |
64 | | * perform_a(); |
65 | | * BarrierArriveAndWait(&barrier, ...); |
66 | | * case PHASE_B: |
67 | | * perform_b(); |
68 | | * BarrierArriveAndWait(&barrier, ...); |
69 | | * case PHASE_C: |
70 | | * perform_c(); |
71 | | * BarrierArriveAndWait(&barrier, ...); |
72 | | * } |
73 | | * BarrierDetach(&barrier); |
74 | | * |
75 | | * Static barriers behave similarly to POSIX's pthread_barrier_t. Dynamic |
76 | | * barriers behave similarly to Java's java.util.concurrent.Phaser. |
77 | | * |
78 | | * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science) |
79 | | * |
80 | | * IDENTIFICATION |
81 | | * src/backend/storage/ipc/barrier.c |
82 | | * |
83 | | *------------------------------------------------------------------------- |
84 | | */ |
85 | | |
86 | | #include "postgres.h" |
87 | | #include "storage/barrier.h" |
88 | | |
89 | | static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive); /* shared by BarrierDetach() and BarrierArriveAndDetach() */ |
90 | | |
91 | | /* |
92 | | * Initialize this barrier. To use a static party size, provide the number of |
93 | | * participants to wait for at each phase; that many backends are then treated |
94 | | * as implicitly attached. To use a dynamic party size, specify zero |
95 | | * here and then use BarrierAttach() and |
96 | | * BarrierDetach()/BarrierArriveAndDetach() to register and deregister |
97 | | * participants explicitly. The arrival count, phase and election marker all start at zero. |
98 | | */ |
99 | | void |
100 | | BarrierInit(Barrier *barrier, int participants) |
101 | 0 | { |
102 | 0 | SpinLockInit(&barrier->mutex); |
103 | 0 | barrier->participants = participants; |
104 | 0 | barrier->arrived = 0; /* nobody has arrived at the first phase yet */ |
105 | 0 | barrier->phase = 0; |
106 | 0 | barrier->elected = 0; |
107 | 0 | barrier->static_party = participants > 0; /* a nonzero count selects a static party */ |
108 | 0 | ConditionVariableInit(&barrier->condition_variable); |
109 | 0 | } |
110 | | |
111 | | /* |
112 | | * Arrive at this barrier, wait for all other attached participants to arrive |
113 | | * too and then return. Increments the current phase. The caller must be |
114 | | * attached. |
115 | | * |
116 | | * While waiting, pg_stat_activity shows a wait_event_class and wait_event |
117 | | * controlled by the wait_event_info passed in, which should be a value |
118 | | * from one of the WaitEventXXX enums defined in pgstat.h. |
119 | | * |
120 | | * Return true in one arbitrarily chosen participant. Return false in all |
121 | | * others. The return code can be used to elect one participant to execute a |
122 | | * phase of work that must be done serially while other participants wait. |
123 | | */ |
124 | | bool |
125 | | BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info) |
126 | 0 | { |
127 | 0 | bool release = false; |
128 | 0 | bool elected; |
129 | 0 | int start_phase; |
130 | 0 | int next_phase; |
131 | |
|
132 | 0 | SpinLockAcquire(&barrier->mutex); |
133 | 0 | start_phase = barrier->phase; |
134 | 0 | next_phase = start_phase + 1; |
135 | 0 | ++barrier->arrived; /* count this backend as arrived */ |
136 | 0 | if (barrier->arrived == barrier->participants) /* everyone attached has now arrived */ |
137 | 0 | { |
138 | 0 | release = true; |
139 | 0 | barrier->arrived = 0; /* reset the count for the next phase */ |
140 | 0 | barrier->phase = next_phase; |
141 | 0 | barrier->elected = next_phase; /* marks the new phase as already having its elected backend */ |
142 | 0 | } |
143 | 0 | SpinLockRelease(&barrier->mutex); |
144 | | |
145 | | /* |
146 | | * If we were the last expected participant to arrive, we can release our |
147 | | * peers and return true to indicate that this backend has been elected to |
148 | | * perform any serial work. |
149 | | */ |
150 | 0 | if (release) |
151 | 0 | { |
152 | 0 | ConditionVariableBroadcast(&barrier->condition_variable); |
153 | |
|
154 | 0 | return true; |
155 | 0 | } |
156 | | |
157 | | /* |
158 | | * Otherwise we have to wait for the last participant to arrive and |
159 | | * advance the phase. |
160 | | */ |
161 | 0 | elected = false; |
162 | 0 | ConditionVariablePrepareToSleep(&barrier->condition_variable); |
163 | 0 | for (;;) |
164 | 0 | { |
165 | | /* |
166 | | * We know that phase must either be start_phase, indicating that we |
167 | | * need to keep waiting, or next_phase, indicating that the last |
168 | | * participant that we were waiting for has either arrived or detached |
169 | | * so that the next phase has begun. The phase cannot advance any |
170 | | * further than that without this backend's participation, because |
171 | | * this backend is attached. |
172 | | */ |
173 | 0 | SpinLockAcquire(&barrier->mutex); |
174 | 0 | Assert(barrier->phase == start_phase || barrier->phase == next_phase); |
175 | 0 | release = barrier->phase == next_phase; /* true once the phase has advanced */ |
176 | 0 | if (release && barrier->elected != next_phase) |
177 | 0 | { |
178 | | /* |
179 | | * Usually the backend that arrives last and releases the other |
180 | | * backends is elected to return true (see above), so that it can |
181 | | * begin processing serial work while it has a CPU timeslice. |
182 | | * However, if the barrier advanced because someone detached, then |
183 | | * one of the backends that is awoken will need to be elected. |
184 | | */ |
185 | 0 | barrier->elected = barrier->phase; |
186 | 0 | elected = true; |
187 | 0 | } |
188 | 0 | SpinLockRelease(&barrier->mutex); |
189 | 0 | if (release) |
190 | 0 | break; |
191 | 0 | ConditionVariableSleep(&barrier->condition_variable, wait_event_info); |
192 | 0 | } |
193 | 0 | ConditionVariableCancelSleep(); |
194 | |
|
195 | 0 | return elected; |
196 | 0 | } |
197 | | |
198 | | /* |
199 | | * Arrive at this barrier, but detach rather than waiting. Returns true if |
200 | | * the caller was the last to detach. Calls BarrierDetachImpl() with arrive = true. |
201 | | */ |
202 | | bool |
203 | | BarrierArriveAndDetach(Barrier *barrier) |
204 | 0 | { |
205 | 0 | return BarrierDetachImpl(barrier, true); /* arrive = true: the phase may advance even if nobody remains */ |
206 | 0 | } |
207 | | |
208 | | /* |
209 | | * Attach to a barrier (dynamic party size only). All waiting participants will |
210 | | * now wait for this participant to call BarrierArriveAndWait(), BarrierDetach() |
211 | | * or BarrierArriveAndDetach(). Return the current phase. |
212 | | */ |
213 | | int |
214 | | BarrierAttach(Barrier *barrier) |
215 | 0 | { |
216 | 0 | int phase; |
217 | |
|
218 | 0 | Assert(!barrier->static_party); |
219 | |
|
220 | 0 | SpinLockAcquire(&barrier->mutex); |
221 | 0 | ++barrier->participants; |
222 | 0 | phase = barrier->phase; /* sampled under the same lock as the increment */ |
223 | 0 | SpinLockRelease(&barrier->mutex); |
224 | |
|
225 | 0 | return phase; |
226 | 0 | } |
227 | | |
228 | | /* |
229 | | * Detach from a barrier. This may release other waiters from |
230 | | * BarrierArriveAndWait() and advance the phase if they were only waiting for |
231 | | * this backend. Return true if this participant was the last to detach. |
232 | | */ |
233 | | bool |
234 | | BarrierDetach(Barrier *barrier) |
235 | 0 | { |
236 | 0 | return BarrierDetachImpl(barrier, false); /* arrive = false: the phase is not advanced when nobody remains */ |
237 | 0 | } |
238 | | |
239 | | /* |
240 | | * Return the current phase of a barrier, read without taking the spinlock. The caller must be attached. |
241 | | */ |
242 | | int |
243 | | BarrierPhase(Barrier *barrier) |
244 | 0 | { |
245 | | /* |
246 | | * It is OK to read barrier->phase without locking, because it can't |
247 | | * change without us (we are attached to it), and we executed a memory |
248 | | * barrier when we either attached or participated in changing it last |
249 | | * time. |
250 | | */ |
251 | 0 | return barrier->phase; |
252 | 0 | } |
253 | | |
254 | | /* |
255 | | * Return an instantaneous snapshot of the number of participants currently |
256 | | * attached to this barrier, read under the spinlock. For debugging purposes only. |
257 | | */ |
258 | | int |
259 | | BarrierParticipants(Barrier *barrier) |
260 | 0 | { |
261 | 0 | int participants; |
262 | |
|
263 | 0 | SpinLockAcquire(&barrier->mutex); |
264 | 0 | participants = barrier->participants; /* copy out while the lock is held */ |
265 | 0 | SpinLockRelease(&barrier->mutex); |
266 | |
|
267 | 0 | return participants; |
268 | 0 | } |
269 | | |
270 | | /* |
271 | | * Common code for BarrierDetach() and BarrierArriveAndDetach(). Drops the |
272 | | * caller from the party. If every remaining participant has already arrived, |
273 | | * the phase is advanced and the waiters are woken; when nobody remains, the |
274 | | * phase is advanced only if 'arrive' is true. Return true if this |
275 | | * participant was the last to detach. |
276 | | */ |
277 | | static inline bool |
278 | | BarrierDetachImpl(Barrier *barrier, bool arrive) |
279 | 0 | { |
280 | 0 | bool release; |
281 | 0 | bool last; |
282 | |
|
283 | 0 | Assert(!barrier->static_party); |
284 | |
|
285 | 0 | SpinLockAcquire(&barrier->mutex); |
286 | 0 | Assert(barrier->participants > 0); |
287 | 0 | --barrier->participants; |
288 | | |
289 | | /* |
290 | | * If any other participants are waiting and we were the last participant |
291 | | * waited for, release them. If no other participants are waiting, but |
292 | | * this is a BarrierArriveAndDetach() call, then advance the phase too. |
293 | | */ |
294 | 0 | if ((arrive || barrier->participants > 0) && |
295 | 0 | barrier->arrived == barrier->participants) |
296 | 0 | { |
297 | 0 | release = true; |
298 | 0 | barrier->arrived = 0; |
299 | 0 | ++barrier->phase; /* elected is left untouched here, so a woken waiter elects itself */ |
300 | 0 | } |
301 | 0 | else |
302 | 0 | release = false; |
303 | |
|
304 | 0 | last = barrier->participants == 0; /* nobody remains attached */ |
305 | 0 | SpinLockRelease(&barrier->mutex); |
306 | |
|
307 | 0 | if (release) |
308 | 0 | ConditionVariableBroadcast(&barrier->condition_variable); |
309 | |
|
310 | 0 | return last; |
311 | 0 | } |