/Users/deen/code/yugabyte-db/src/postgres/src/backend/storage/ipc/shm_mq.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * shm_mq.c |
4 | | * single-reader, single-writer shared memory message queue |
5 | | * |
6 | | * Both the sender and the receiver must have a PGPROC; their respective |
7 | | * process latches are used for synchronization. Only the sender may send, |
8 | | * and only the receiver may receive. This is intended to allow a user |
9 | | * backend to communicate with worker backends that it has registered. |
10 | | * |
11 | | * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group |
12 | | * Portions Copyright (c) 1994, Regents of the University of California |
13 | | * |
14 | | * src/include/storage/shm_mq.h |
15 | | * |
16 | | *------------------------------------------------------------------------- |
17 | | */ |
18 | | |
19 | | #include "postgres.h" |
20 | | |
21 | | #include "miscadmin.h" |
22 | | #include "pgstat.h" |
23 | | #include "postmaster/bgworker.h" |
24 | | #include "storage/procsignal.h" |
25 | | #include "storage/shm_mq.h" |
26 | | #include "storage/spin.h" |
27 | | |
/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.
 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
 * they cannot change once set, and thus may be read without a lock once this
 * is known to be the case.
 *
 * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
 * they are written atomically using 8 byte loads and stores.  Memory barriers
 * must be carefully used to synchronize reads and writes of these values with
 * reads and writes of the actual data in mq_ring.
 *
 * mq_detached needs no locking.  It can be set by either the sender or the
 * receiver, but only ever from false to true, so redundant writes don't
 * matter.  It is important that if we set mq_detached and then set the
 * counterparty's latch, the counterparty must be certain to see the change
 * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
 * ends with one, this should be OK.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin.  The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock.  Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes.  The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK.  Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
	slock_t		mq_mutex;		/* protects mq_receiver and mq_sender */
	PGPROC	   *mq_receiver;	/* the (single) reading process, once set */
	PGPROC	   *mq_sender;		/* the (single) writing process, once set */
	pg_atomic_uint64 mq_bytes_read; /* total bytes consumed; receiver-only */
	pg_atomic_uint64 mq_bytes_written;	/* total bytes produced; sender-only */
	Size		mq_ring_size;	/* usable size of mq_ring, in bytes */
	bool		mq_detached;	/* set (false->true only) by either side */
	uint8		mq_ring_offset; /* alignment padding preceding ring data */
	char		mq_ring[FLEXIBLE_ARRAY_MEMBER]; /* the ring buffer proper */
};
82 | | |
/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started.  If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations.  When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point.  This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
	shm_mq	   *mqh_queue;		/* the shared queue itself */
	dsm_segment *mqh_segment;	/* DSM segment containing the queue, or NULL */
	BackgroundWorkerHandle *mqh_handle; /* counterparty worker handle, or NULL */
	char	   *mqh_buffer;		/* local reassembly buffer, or NULL */
	Size		mqh_buflen;		/* allocated size of mqh_buffer */
	Size		mqh_consume_pending;	/* bytes read but not yet acknowledged */
	Size		mqh_partial_bytes;	/* bytes sent/received of current item */
	Size		mqh_expected_bytes; /* total payload length (reads only) */
	bool		mqh_length_word_complete;	/* done with the length word? */
	bool		mqh_counterparty_attached;	/* other side known attached? */
	MemoryContext mqh_context;	/* context for handle and buffer allocations */
};
144 | | |
145 | | static void shm_mq_detach_internal(shm_mq *mq); |
146 | | static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, |
147 | | const void *data, bool nowait, Size *bytes_written); |
148 | | static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, |
149 | | Size bytes_needed, bool nowait, Size *nbytesp, |
150 | | void **datap); |
151 | | static bool shm_mq_counterparty_gone(shm_mq *mq, |
152 | | BackgroundWorkerHandle *handle); |
153 | | static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, |
154 | | BackgroundWorkerHandle *handle); |
155 | | static void shm_mq_inc_bytes_read(shm_mq *mq, Size n); |
156 | | static void shm_mq_inc_bytes_written(shm_mq *mq, Size n); |
157 | | static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); |
158 | | |
159 | | /* Minimum queue size is enough for header and at least one chunk of data. */ |
160 | | const Size shm_mq_minimum_size = |
161 | | MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF; |
162 | | |
163 | 0 | #define MQH_INITIAL_BUFSIZE 8192 |
164 | | |
/*
 * Initialize a new shared message queue.
 *
 * "address" must point to "size" bytes of shared memory; the queue header
 * is laid out at the start of that space and the remainder becomes the
 * ring buffer.  Returns the same address, cast to shm_mq *.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
	shm_mq	   *mq = address;
	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));

	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
	size = MAXALIGN_DOWN(size);

	/* Queue size must be large enough to hold some data. */
	Assert(size > data_offset);

	/* Initialize queue header. */
	SpinLockInit(&mq->mq_mutex);
	mq->mq_receiver = NULL;
	mq->mq_sender = NULL;
	pg_atomic_init_u64(&mq->mq_bytes_read, 0);
	pg_atomic_init_u64(&mq->mq_bytes_written, 0);
	mq->mq_ring_size = size - data_offset;
	mq->mq_detached = false;
	/* Record the padding between the header and the MAXALIGN'd ring start. */
	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);

	return mq;
}
192 | | |
193 | | /* |
194 | | * Set the identity of the process that will receive from a shared message |
195 | | * queue. |
196 | | */ |
197 | | void |
198 | | shm_mq_set_receiver(shm_mq *mq, PGPROC *proc) |
199 | 0 | { |
200 | 0 | PGPROC *sender; |
201 | |
|
202 | 0 | SpinLockAcquire(&mq->mq_mutex); |
203 | 0 | Assert(mq->mq_receiver == NULL); |
204 | 0 | mq->mq_receiver = proc; |
205 | 0 | sender = mq->mq_sender; |
206 | 0 | SpinLockRelease(&mq->mq_mutex); |
207 | |
|
208 | 0 | if (sender != NULL) |
209 | 0 | SetLatch(&sender->procLatch); |
210 | 0 | } |
211 | | |
212 | | /* |
213 | | * Set the identity of the process that will send to a shared message queue. |
214 | | */ |
215 | | void |
216 | | shm_mq_set_sender(shm_mq *mq, PGPROC *proc) |
217 | 0 | { |
218 | 0 | PGPROC *receiver; |
219 | |
|
220 | 0 | SpinLockAcquire(&mq->mq_mutex); |
221 | 0 | Assert(mq->mq_sender == NULL); |
222 | 0 | mq->mq_sender = proc; |
223 | 0 | receiver = mq->mq_receiver; |
224 | 0 | SpinLockRelease(&mq->mq_mutex); |
225 | |
|
226 | 0 | if (receiver != NULL) |
227 | 0 | SetLatch(&receiver->procLatch); |
228 | 0 | } |
229 | | |
230 | | /* |
231 | | * Get the configured receiver. |
232 | | */ |
233 | | PGPROC * |
234 | | shm_mq_get_receiver(shm_mq *mq) |
235 | 0 | { |
236 | 0 | PGPROC *receiver; |
237 | |
|
238 | 0 | SpinLockAcquire(&mq->mq_mutex); |
239 | 0 | receiver = mq->mq_receiver; |
240 | 0 | SpinLockRelease(&mq->mq_mutex); |
241 | |
|
242 | 0 | return receiver; |
243 | 0 | } |
244 | | |
245 | | /* |
246 | | * Get the configured sender. |
247 | | */ |
248 | | PGPROC * |
249 | | shm_mq_get_sender(shm_mq *mq) |
250 | 0 | { |
251 | 0 | PGPROC *sender; |
252 | |
|
253 | 0 | SpinLockAcquire(&mq->mq_mutex); |
254 | 0 | sender = mq->mq_sender; |
255 | 0 | SpinLockRelease(&mq->mq_mutex); |
256 | |
|
257 | 0 | return sender; |
258 | 0 | } |
259 | | |
260 | | /* |
261 | | * Attach to a shared message queue so we can send or receive messages. |
262 | | * |
263 | | * The memory context in effect at the time this function is called should |
264 | | * be one which will last for at least as long as the message queue itself. |
265 | | * We'll allocate the handle in that context, and future allocations that |
266 | | * are needed to buffer incoming data will happen in that context as well. |
267 | | * |
268 | | * If seg != NULL, the queue will be automatically detached when that dynamic |
269 | | * shared memory segment is detached. |
270 | | * |
271 | | * If handle != NULL, the queue can be read or written even before the |
272 | | * other process has attached. We'll wait for it to do so if needed. The |
273 | | * handle must be for a background worker initialized with bgw_notify_pid |
274 | | * equal to our PID. |
275 | | * |
276 | | * shm_mq_detach() should be called when done. This will free the |
277 | | * shm_mq_handle and mark the queue itself as detached, so that our |
278 | | * counterpart won't get stuck waiting for us to fill or drain the queue |
279 | | * after we've already lost interest. |
280 | | */ |
281 | | shm_mq_handle * |
282 | | shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) |
283 | 0 | { |
284 | 0 | shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle)); |
285 | |
|
286 | 0 | Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); |
287 | 0 | mqh->mqh_queue = mq; |
288 | 0 | mqh->mqh_segment = seg; |
289 | 0 | mqh->mqh_handle = handle; |
290 | 0 | mqh->mqh_buffer = NULL; |
291 | 0 | mqh->mqh_buflen = 0; |
292 | 0 | mqh->mqh_consume_pending = 0; |
293 | 0 | mqh->mqh_partial_bytes = 0; |
294 | 0 | mqh->mqh_expected_bytes = 0; |
295 | 0 | mqh->mqh_length_word_complete = false; |
296 | 0 | mqh->mqh_counterparty_attached = false; |
297 | 0 | mqh->mqh_context = GetCurrentMemoryContext(); |
298 | |
|
299 | 0 | if (seg != NULL) |
300 | 0 | on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq)); |
301 | |
|
302 | 0 | return mqh; |
303 | 0 | } |
304 | | |
305 | | /* |
306 | | * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had |
307 | | * been passed to shm_mq_attach. |
308 | | */ |
309 | | void |
310 | | shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle) |
311 | 0 | { |
312 | 0 | Assert(mqh->mqh_handle == NULL); |
313 | 0 | mqh->mqh_handle = handle; |
314 | 0 | } |
315 | | |
316 | | /* |
317 | | * Write a message into a shared message queue. |
318 | | */ |
319 | | shm_mq_result |
320 | | shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait) |
321 | 0 | { |
322 | 0 | shm_mq_iovec iov; |
323 | |
|
324 | 0 | iov.data = data; |
325 | 0 | iov.len = nbytes; |
326 | |
|
327 | 0 | return shm_mq_sendv(mqh, &iov, 1, nowait); |
328 | 0 | } |
329 | | |
/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * The wire format is a Size length word followed by the payload; each
 * chunk written into the ring (except the last) must be MAXALIGN'd.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set.  (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
	shm_mq_result res;
	shm_mq	   *mq = mqh->mqh_queue;
	PGPROC	   *receiver;
	Size		nbytes = 0;
	Size		bytes_written;
	int			i;
	int			which_iov = 0;
	Size		offset;

	Assert(mq->mq_sender == MyProc);

	/* Compute total size of write. */
	for (i = 0; i < iovcnt; ++i)
		nbytes += iov[i].len;

	/* Try to write, or finish writing, the length word into the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								((char *) &nbytes) + mqh->mqh_partial_bytes,
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = false;
			return res;
		}
		mqh->mqh_partial_bytes += bytes_written;

		if (mqh->mqh_partial_bytes >= sizeof(Size))
		{
			Assert(mqh->mqh_partial_bytes == sizeof(Size));

			/* Length word fully sent; mqh_partial_bytes now tracks payload. */
			mqh->mqh_partial_bytes = 0;
			mqh->mqh_length_word_complete = true;
		}

		if (res != SHM_MQ_SUCCESS)
			return res;

		/* Length word can't be split unless bigger than required alignment. */
		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
	}

	/* Write the actual data bytes into the buffer. */
	Assert(mqh->mqh_partial_bytes <= nbytes);
	/* On a nowait retry, skip the bytes a previous call already sent. */
	offset = mqh->mqh_partial_bytes;
	do
	{
		Size		chunksize;

		/* Figure out which bytes need to be sent next. */
		if (offset >= iov[which_iov].len)
		{
			offset -= iov[which_iov].len;
			++which_iov;
			if (which_iov >= iovcnt)
				break;
			continue;
		}

		/*
		 * We want to avoid copying the data if at all possible, but every
		 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
		 * the last.  Thus, if a chunk other than the last one ends on a
		 * non-MAXALIGN'd boundary, we have to combine the tail end of its
		 * data with data from one or more following chunks until we either
		 * reach the last chunk or accumulate a number of bytes which is
		 * MAXALIGN'd.
		 */
		if (which_iov + 1 < iovcnt &&
			offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
		{
			char		tmpbuf[MAXIMUM_ALIGNOF];
			int			j = 0;

			/* Gather bytes across iovec boundaries into tmpbuf. */
			for (;;)
			{
				if (offset < iov[which_iov].len)
				{
					tmpbuf[j] = iov[which_iov].data[offset];
					j++;
					offset++;
					if (j == MAXIMUM_ALIGNOF)
						break;
				}
				else
				{
					offset -= iov[which_iov].len;
					which_iov++;
					if (which_iov >= iovcnt)
						break;
				}
			}

			res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

			if (res == SHM_MQ_DETACHED)
			{
				/* Reset state in case caller tries to send another message. */
				mqh->mqh_partial_bytes = 0;
				mqh->mqh_length_word_complete = false;
				return res;
			}

			mqh->mqh_partial_bytes += bytes_written;
			if (res != SHM_MQ_SUCCESS)
				return res;
			continue;
		}

		/*
		 * If this is the last chunk, we can write all the data, even if it
		 * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
		 * MAXALIGN_DOWN the write size.
		 */
		chunksize = iov[which_iov].len - offset;
		if (which_iov + 1 < iovcnt)
			chunksize = MAXALIGN_DOWN(chunksize);
		res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
								nowait, &bytes_written);

		if (res == SHM_MQ_DETACHED)
		{
			/* Reset state in case caller tries to send another message. */
			mqh->mqh_length_word_complete = false;
			mqh->mqh_partial_bytes = 0;
			return res;
		}

		mqh->mqh_partial_bytes += bytes_written;
		offset += bytes_written;
		if (res != SHM_MQ_SUCCESS)
			return res;
	} while (mqh->mqh_partial_bytes < nbytes);

	/* Reset for next message. */
	mqh->mqh_partial_bytes = 0;
	mqh->mqh_length_word_complete = false;

	/* If queue has been detached, let caller know. */
	if (mq->mq_detached)
		return SHM_MQ_DETACHED;

	/*
	 * If the counterparty is known to have attached, we can read mq_receiver
	 * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
	 * more caution is needed.
	 */
	if (mqh->mqh_counterparty_attached)
		receiver = mq->mq_receiver;
	else
	{
		SpinLockAcquire(&mq->mq_mutex);
		receiver = mq->mq_receiver;
		SpinLockRelease(&mq->mq_mutex);
		if (receiver == NULL)
			return SHM_MQ_SUCCESS;
		mqh->mqh_counterparty_attached = true;
	}

	/* Notify receiver of the newly-written data, and return. */
	SetLatch(&receiver->procLatch);
	return SHM_MQ_SUCCESS;
}
516 | | |
/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytes to the message length and *data to point to the message
 * payload.  If the entire message exists in the queue as a single,
 * contiguous chunk, *data will point directly into shared memory; otherwise,
 * it will point to a temporary buffer.  This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages.  In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message.  The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing.  Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
	shm_mq	   *mq = mqh->mqh_queue;
	shm_mq_result res;
	Size		rb = 0;
	Size		nbytes;
	void	   *rawdata;

	Assert(mq->mq_receiver == MyProc);

	/* We can't receive data until the sender has attached. */
	if (!mqh->mqh_counterparty_attached)
	{
		if (nowait)
		{
			int			counterparty_gone;

			/*
			 * We shouldn't return at this point at all unless the sender
			 * hasn't attached yet.  However, the correct return value depends
			 * on whether the sender is still attached.  If we first test
			 * whether the sender has ever attached and then test whether the
			 * sender has detached, there's a race condition: a sender that
			 * attaches and detaches very quickly might fool us into thinking
			 * the sender never attached at all.  So, test whether our
			 * counterparty is definitively gone first, and only afterwards
			 * check whether the sender ever attached in the first place.
			 */
			counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
			if (shm_mq_get_sender(mq) == NULL)
			{
				if (counterparty_gone)
					return SHM_MQ_DETACHED;
				else
					return SHM_MQ_WOULD_BLOCK;
			}
		}
		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
				 && shm_mq_get_sender(mq) == NULL)
		{
			/* Sender died before attaching; mark the queue dead. */
			mq->mq_detached = true;
			return SHM_MQ_DETACHED;
		}
		mqh->mqh_counterparty_attached = true;
	}

	/*
	 * If we've consumed an amount of data greater than 1/4th of the ring
	 * size, mark it consumed in shared memory.  We try to avoid doing this
	 * unnecessarily when only a small amount of data has been consumed,
	 * because SetLatch() is fairly expensive and we don't want to do it too
	 * often.
	 */
	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
	{
		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
		mqh->mqh_consume_pending = 0;
	}

	/* Try to read, or finish reading, the length word from the buffer. */
	while (!mqh->mqh_length_word_complete)
	{
		/* Try to receive the message length word. */
		Assert(mqh->mqh_partial_bytes < sizeof(Size));
		res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
								   nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;

		/*
		 * Hopefully, we'll receive the entire message length word at once.
		 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
		 * multiple reads.
		 */
		if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
		{
			Size		needed;

			nbytes = *(Size *) rawdata;

			/* If we've already got the whole message, we're done. */
			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
			if (rb >= needed)
			{
				/* Zero-copy fast path: return a pointer into shared memory. */
				mqh->mqh_consume_pending += needed;
				*nbytesp = nbytes;
				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
				return SHM_MQ_SUCCESS;
			}

			/*
			 * We don't have the whole message, but we at least have the whole
			 * length word.
			 */
			mqh->mqh_expected_bytes = nbytes;
			mqh->mqh_length_word_complete = true;
			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
			rb -= MAXALIGN(sizeof(Size));
		}
		else
		{
			Size		lengthbytes;

			/* Can't be split unless bigger than required alignment. */
			Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

			/* Message word is split; need buffer to reassemble. */
			if (mqh->mqh_buffer == NULL)
			{
				mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
													 MQH_INITIAL_BUFSIZE);
				mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
			}
			Assert(mqh->mqh_buflen >= sizeof(Size));

			/* Copy partial length word; remember to consume it. */
			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
			else
				lengthbytes = rb;
			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
				   lengthbytes);
			mqh->mqh_partial_bytes += lengthbytes;
			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
			rb -= lengthbytes;

			/* If we now have the whole word, we're ready to read payload. */
			if (mqh->mqh_partial_bytes >= sizeof(Size))
			{
				Assert(mqh->mqh_partial_bytes == sizeof(Size));
				mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
				mqh->mqh_length_word_complete = true;
				mqh->mqh_partial_bytes = 0;
			}
		}
	}
	nbytes = mqh->mqh_expected_bytes;

	if (mqh->mqh_partial_bytes == 0)
	{
		/*
		 * Try to obtain the whole message in a single chunk.  If this works,
		 * we need not copy the data and can return a pointer directly into
		 * shared memory.
		 */
		res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb >= nbytes)
		{
			mqh->mqh_length_word_complete = false;
			mqh->mqh_consume_pending += MAXALIGN(nbytes);
			*nbytesp = nbytes;
			*datap = rawdata;
			return SHM_MQ_SUCCESS;
		}

		/*
		 * The message has wrapped the buffer.  We'll need to copy it in order
		 * to return it to the client in one chunk.  First, make sure we have
		 * a large enough buffer available.
		 */
		if (mqh->mqh_buflen < nbytes)
		{
			Size		newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);

			/* Double until the whole message fits. */
			while (newbuflen < nbytes)
				newbuflen *= 2;

			if (mqh->mqh_buffer != NULL)
			{
				pfree(mqh->mqh_buffer);
				mqh->mqh_buffer = NULL;
				mqh->mqh_buflen = 0;
			}
			mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
			mqh->mqh_buflen = newbuflen;
		}
	}

	/* Loop until we've copied the entire message. */
	for (;;)
	{
		Size		still_needed;

		/* Copy as much as we can. */
		Assert(mqh->mqh_partial_bytes + rb <= nbytes);
		memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
		mqh->mqh_partial_bytes += rb;

		/*
		 * Update count of bytes that can be consumed, accounting for
		 * alignment padding.  Note that this will never actually insert any
		 * padding except at the end of a message, because the buffer size is
		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
		 */
		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
		mqh->mqh_consume_pending += MAXALIGN(rb);

		/* If we got all the data, exit the loop. */
		if (mqh->mqh_partial_bytes >= nbytes)
			break;

		/* Wait for some more data. */
		still_needed = nbytes - mqh->mqh_partial_bytes;
		res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
		if (res != SHM_MQ_SUCCESS)
			return res;
		if (rb > still_needed)
			rb = still_needed;
	}

	/* Return the complete message, and reset for next message. */
	*nbytesp = nbytes;
	*datap = mqh->mqh_buffer;
	mqh->mqh_length_word_complete = false;
	mqh->mqh_partial_bytes = 0;
	return SHM_MQ_SUCCESS;
}
759 | | |
760 | | /* |
761 | | * Wait for the other process that's supposed to use this queue to attach |
762 | | * to it. |
763 | | * |
764 | | * The return value is SHM_MQ_DETACHED if the worker has already detached or |
765 | | * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached. |
766 | | * Note that we will only be able to detect that the worker has died before |
767 | | * attaching if a background worker handle was passed to shm_mq_attach(). |
768 | | */ |
769 | | shm_mq_result |
770 | | shm_mq_wait_for_attach(shm_mq_handle *mqh) |
771 | 0 | { |
772 | 0 | shm_mq *mq = mqh->mqh_queue; |
773 | 0 | PGPROC **victim; |
774 | |
|
775 | 0 | if (shm_mq_get_receiver(mq) == MyProc) |
776 | 0 | victim = &mq->mq_sender; |
777 | 0 | else |
778 | 0 | { |
779 | 0 | Assert(shm_mq_get_sender(mq) == MyProc); |
780 | 0 | victim = &mq->mq_receiver; |
781 | 0 | } |
782 | |
|
783 | 0 | if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle)) |
784 | 0 | return SHM_MQ_SUCCESS; |
785 | 0 | else |
786 | 0 | return SHM_MQ_DETACHED; |
787 | 0 | } |
788 | | |
789 | | /* |
790 | | * Detach from a shared message queue, and destroy the shm_mq_handle. |
791 | | */ |
792 | | void |
793 | | shm_mq_detach(shm_mq_handle *mqh) |
794 | 0 | { |
795 | | /* Notify counterparty that we're outta here. */ |
796 | 0 | shm_mq_detach_internal(mqh->mqh_queue); |
797 | | |
798 | | /* Cancel on_dsm_detach callback, if any. */ |
799 | 0 | if (mqh->mqh_segment) |
800 | 0 | cancel_on_dsm_detach(mqh->mqh_segment, |
801 | 0 | shm_mq_detach_callback, |
802 | 0 | PointerGetDatum(mqh->mqh_queue)); |
803 | | |
804 | | /* Release local memory associated with handle. */ |
805 | 0 | if (mqh->mqh_buffer != NULL) |
806 | 0 | pfree(mqh->mqh_buffer); |
807 | 0 | pfree(mqh); |
808 | 0 | } |
809 | | |
/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest.  When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much.  We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
	PGPROC	   *victim;

	/*
	 * Identify the counterparty and set mq_detached under the mutex; the
	 * counterparty may be NULL if it never attached.
	 */
	SpinLockAcquire(&mq->mq_mutex);
	if (mq->mq_sender == MyProc)
		victim = mq->mq_receiver;
	else
	{
		Assert(mq->mq_receiver == MyProc);
		victim = mq->mq_sender;
	}
	mq->mq_detached = true;
	SpinLockRelease(&mq->mq_mutex);

	/* Wake the counterparty, if any, so it notices mq_detached. */
	if (victim != NULL)
		SetLatch(&victim->procLatch);
}
843 | | |
/*
 * Get the shm_mq from handle.
 *
 * Returns the shared-memory queue that the given backend-local handle
 * is attached to.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
	return mqh->mqh_queue;
}
852 | | |
/*
 * Write bytes into a shared message queue.
 *
 * Copies up to 'nbytes' bytes from 'data' into the ring buffer.  When the
 * buffer is full, we either return SHM_MQ_WOULD_BLOCK (if 'nowait') or sleep
 * on our process latch until the receiver frees up space.  On return,
 * *bytes_written is set to the number of bytes actually transferred; the
 * result is SHM_MQ_SUCCESS, SHM_MQ_DETACHED, or SHM_MQ_WOULD_BLOCK.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
				  bool nowait, Size *bytes_written)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		sent = 0;
	uint64		used;
	Size		ringsize = mq->mq_ring_size;
	Size		available;

	while (sent < nbytes)
	{
		uint64		rb;
		uint64		wb;

		/* Compute number of ring buffer bytes used and available. */
		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
		Assert(wb >= rb);
		used = wb - rb;
		Assert(used <= ringsize);
		available = Min(ringsize - used, nbytes - sent);

		/*
		 * Bail out if the queue has been detached.  Note that we would be in
		 * trouble if the compiler decided to cache the value of
		 * mq->mq_detached in a register or on the stack across loop
		 * iterations.  It probably shouldn't do that anyway since we'll
		 * always return, call an external function that performs a system
		 * call, or reach a memory barrier at some point later in the loop,
		 * but just to be sure, insert a compiler barrier here.
		 */
		pg_compiler_barrier();
		if (mq->mq_detached)
		{
			*bytes_written = sent;
			return SHM_MQ_DETACHED;
		}

		if (available == 0 && !mqh->mqh_counterparty_attached)
		{
			/*
			 * The queue is full, so if the receiver isn't yet known to be
			 * attached, we must wait for that to happen.
			 */
			if (nowait)
			{
				if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
				{
					*bytes_written = sent;
					return SHM_MQ_DETACHED;
				}
				if (shm_mq_get_receiver(mq) == NULL)
				{
					*bytes_written = sent;
					return SHM_MQ_WOULD_BLOCK;
				}
			}
			else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
										   mqh->mqh_handle))
			{
				/* Receiver died before attaching; mark the queue dead. */
				mq->mq_detached = true;
				*bytes_written = sent;
				return SHM_MQ_DETACHED;
			}
			mqh->mqh_counterparty_attached = true;

			/*
			 * The receiver may have read some data after attaching, so we
			 * must not wait without rechecking the queue state.
			 */
		}
		else if (available == 0)
		{
			/*
			 * Since mqh->mqh_counterparty_attached is known to be true at
			 * this point, mq_receiver has been set, and it can't change once
			 * set.  Therefore, we can read it without acquiring the spinlock.
			 */
			Assert(mqh->mqh_counterparty_attached);
			SetLatch(&mq->mq_receiver->procLatch);

			/* Skip manipulation of our latch if nowait = true. */
			if (nowait)
			{
				*bytes_written = sent;
				return SHM_MQ_WOULD_BLOCK;
			}

			/*
			 * Wait for our latch to be set.  It might already be set for some
			 * unrelated reason, but that'll just result in one extra trip
			 * through the loop.  It's worth it to avoid resetting the latch
			 * at top of loop, because setting an already-set latch is much
			 * cheaper than setting one that has been reset.
			 */
			WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);

			/* Reset the latch so we don't spin. */
			ResetLatch(MyLatch);

			/* An interrupt may have occurred while we were waiting. */
			CHECK_FOR_INTERRUPTS();
		}
		else
		{
			Size		offset;
			Size		sendnow;

			offset = wb % (uint64) ringsize;
			sendnow = Min(available, ringsize - offset);

			/*
			 * Write as much data as we can via a single memcpy().  Make sure
			 * these writes happen after the read of mq_bytes_read, above.
			 * This barrier pairs with the one in shm_mq_inc_bytes_read.
			 * (Since we're separating the read of mq_bytes_read from a
			 * subsequent write to mq_ring, we need a full barrier here.)
			 */
			pg_memory_barrier();
			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
				   (char *) data + sent, sendnow);
			sent += sendnow;

			/*
			 * Update count of bytes written, with alignment padding.  Note
			 * that this will never actually insert any padding except at the
			 * end of a run of bytes, because the buffer size is a multiple of
			 * MAXIMUM_ALIGNOF, and each read is as well.
			 */
			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));

			/*
			 * For efficiency, we don't set the reader's latch here.  We'll do
			 * that only when the buffer fills up or after writing an entire
			 * message.
			 */
		}
	}

	*bytes_written = sent;
	return SHM_MQ_SUCCESS;
}
1000 | | |
/*
 * Wait until at least *nbytesp bytes are available to be read from the
 * shared message queue, or until the buffer wraps around.  If the queue is
 * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 *
 * Note that the returned pointer aliases the ring buffer directly; the bytes
 * are not marked as consumed here (that's tracked via mqh_consume_pending).
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
					 Size *nbytesp, void **datap)
{
	shm_mq	   *mq = mqh->mqh_queue;
	Size		ringsize = mq->mq_ring_size;
	uint64		used;
	uint64		written;

	for (;;)
	{
		Size		offset;
		uint64		read;

		/* Get bytes written, so we can compute what's available to read. */
		written = pg_atomic_read_u64(&mq->mq_bytes_written);

		/*
		 * Get bytes read.  Include bytes we could consume but have not yet
		 * consumed.
		 */
		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
			mqh->mqh_consume_pending;
		used = written - read;
		Assert(used <= ringsize);
		offset = read % (uint64) ringsize;

		/* If we have enough data or buffer has wrapped, we're done. */
		if (used >= bytes_needed || offset + used >= ringsize)
		{
			*nbytesp = Min(used, ringsize - offset);
			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];

			/*
			 * Separate the read of mq_bytes_written, above, from caller's
			 * attempt to read the data itself.  Pairs with the barrier in
			 * shm_mq_inc_bytes_written.
			 */
			pg_read_barrier();
			return SHM_MQ_SUCCESS;
		}

		/*
		 * Fall out before waiting if the queue has been detached.
		 *
		 * Note that we don't check for this until *after* considering whether
		 * the data already available is enough, since the receiver can finish
		 * receiving a message stored in the buffer even after the sender has
		 * detached.
		 */
		if (mq->mq_detached)
		{
			/*
			 * If the writer advanced mq_bytes_written and then set
			 * mq_detached, we might not have read the final value of
			 * mq_bytes_written above.  Insert a read barrier and then check
			 * again if mq_bytes_written has advanced.
			 */
			pg_read_barrier();
			if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
				continue;

			return SHM_MQ_DETACHED;
		}

		/*
		 * We didn't get enough data to satisfy the request, so mark any data
		 * previously-consumed as read to make more buffer space.
		 */
		if (mqh->mqh_consume_pending > 0)
		{
			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
			mqh->mqh_consume_pending = 0;
		}

		/* Skip manipulation of our latch if nowait = true. */
		if (nowait)
			return SHM_MQ_WOULD_BLOCK;

		/*
		 * Wait for our latch to be set.  It might already be set for some
		 * unrelated reason, but that'll just result in one extra trip through
		 * the loop.  It's worth it to avoid resetting the latch at top of
		 * loop, because setting an already-set latch is much cheaper than
		 * setting one that has been reset.
		 */
		WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);

		/* Reset the latch so we don't spin. */
		ResetLatch(MyLatch);

		/* An interrupt may have occurred while we were waiting. */
		CHECK_FOR_INTERRUPTS();
	}
}
1105 | | |
1106 | | /* |
1107 | | * Test whether a counterparty who may not even be alive yet is definitely gone. |
1108 | | */ |
1109 | | static bool |
1110 | | shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle) |
1111 | 0 | { |
1112 | 0 | pid_t pid; |
1113 | | |
1114 | | /* If the queue has been detached, counterparty is definitely gone. */ |
1115 | 0 | if (mq->mq_detached) |
1116 | 0 | return true; |
1117 | | |
1118 | | /* If there's a handle, check worker status. */ |
1119 | 0 | if (handle != NULL) |
1120 | 0 | { |
1121 | 0 | BgwHandleStatus status; |
1122 | | |
1123 | | /* Check for unexpected worker death. */ |
1124 | 0 | status = GetBackgroundWorkerPid(handle, &pid); |
1125 | 0 | if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) |
1126 | 0 | { |
1127 | | /* Mark it detached, just to make it official. */ |
1128 | 0 | mq->mq_detached = true; |
1129 | 0 | return true; |
1130 | 0 | } |
1131 | 0 | } |
1132 | | |
1133 | | /* Counterparty is not definitively gone. */ |
1134 | 0 | return false; |
1135 | 0 | } |
1136 | | |
1137 | | /* |
1138 | | * This is used when a process is waiting for its counterpart to attach to the |
1139 | | * queue. We exit when the other process attaches as expected, or, if |
1140 | | * handle != NULL, when the referenced background process or the postmaster |
1141 | | * dies. Note that if handle == NULL, and the process fails to attach, we'll |
1142 | | * potentially get stuck here forever waiting for a process that may never |
1143 | | * start. We do check for interrupts, though. |
1144 | | * |
1145 | | * ptr is a pointer to the memory address that we're expecting to become |
1146 | | * non-NULL when our counterpart attaches to the queue. |
1147 | | */ |
1148 | | static bool |
1149 | | shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle) |
1150 | 0 | { |
1151 | 0 | bool result = false; |
1152 | |
|
1153 | 0 | for (;;) |
1154 | 0 | { |
1155 | 0 | BgwHandleStatus status; |
1156 | 0 | pid_t pid; |
1157 | | |
1158 | | /* Acquire the lock just long enough to check the pointer. */ |
1159 | 0 | SpinLockAcquire(&mq->mq_mutex); |
1160 | 0 | result = (*ptr != NULL); |
1161 | 0 | SpinLockRelease(&mq->mq_mutex); |
1162 | | |
1163 | | /* Fail if detached; else succeed if initialized. */ |
1164 | 0 | if (mq->mq_detached) |
1165 | 0 | { |
1166 | 0 | result = false; |
1167 | 0 | break; |
1168 | 0 | } |
1169 | 0 | if (result) |
1170 | 0 | break; |
1171 | | |
1172 | 0 | if (handle != NULL) |
1173 | 0 | { |
1174 | | /* Check for unexpected worker death. */ |
1175 | 0 | status = GetBackgroundWorkerPid(handle, &pid); |
1176 | 0 | if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) |
1177 | 0 | { |
1178 | 0 | result = false; |
1179 | 0 | break; |
1180 | 0 | } |
1181 | 0 | } |
1182 | | |
1183 | | /* Wait to be signalled. */ |
1184 | 0 | WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL); |
1185 | | |
1186 | | /* Reset the latch so we don't spin. */ |
1187 | 0 | ResetLatch(MyLatch); |
1188 | | |
1189 | | /* An interrupt may have occurred while we were waiting. */ |
1190 | 0 | CHECK_FOR_INTERRUPTS(); |
1191 | 0 | } |
1192 | |
|
1193 | 0 | return result; |
1194 | 0 | } |
1195 | | |
/*
 * Increment the number of bytes read.
 *
 * Called by the receiver to advance mq_bytes_read by 'n' consumed bytes and
 * wake the sender, which may be waiting for ring space to free up.
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
	PGPROC	   *sender;

	/*
	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
	 * which follows.  This pairs with the full barrier in
	 * shm_mq_send_bytes().  We only need a read barrier here because the
	 * increment of mq_bytes_read is actually a read followed by a dependent
	 * write.
	 */
	pg_read_barrier();

	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method should be cheaper.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_read,
						pg_atomic_read_u64(&mq->mq_bytes_read) + n);

	/*
	 * We shouldn't have any bytes to read without a sender, so we can read
	 * mq_sender here without a lock.  Once it's initialized, it can't change.
	 */
	sender = mq->mq_sender;
	Assert(sender != NULL);
	SetLatch(&sender->procLatch);
}
1228 | | |
/*
 * Increment the number of bytes written.
 *
 * Called by the sender to advance mq_bytes_written by 'n' after copying
 * data into the ring buffer.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
	/*
	 * Separate prior reads of mq_ring from the write of mq_bytes_written
	 * which we're about to do.  Pairs with the read barrier found in
	 * shm_mq_receive_bytes().
	 */
	pg_write_barrier();

	/*
	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
	 * else can be changing this value.  This method avoids taking the bus
	 * lock unnecessarily.
	 */
	pg_atomic_write_u64(&mq->mq_bytes_written,
						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}
1250 | | |
1251 | | /* Shim for on_dsm_callback. */ |
1252 | | static void |
1253 | | shm_mq_detach_callback(dsm_segment *seg, Datum arg) |
1254 | 0 | { |
1255 | 0 | shm_mq *mq = (shm_mq *) DatumGetPointer(arg); |
1256 | |
|
1257 | 0 | shm_mq_detach_internal(mq); |
1258 | 0 | } |