YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/storage/ipc/shm_mq.c
/*-------------------------------------------------------------------------
 *
 * shm_mq.c
 *    single-reader, single-writer shared memory message queue
 *
 * Both the sender and the receiver must have a PGPROC; their respective
 * process latches are used for synchronization.  Only the sender may send,
 * and only the receiver may receive.  This is intended to allow a user
 * backend to communicate with worker backends that it has registered.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/storage/ipc/shm_mq.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/procsignal.h"
#include "storage/shm_mq.h"
#include "storage/spin.h"

/*
 * This structure represents the actual queue, stored in shared memory.
 *
 * Some notes on synchronization:
 *
 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
 * mq_sender and mq_bytes_written can only be changed by the sender.
 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
 * they cannot change once set, and thus may be read without a lock once this
 * is known to be the case.
 *
 * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
 * they are written atomically using 8 byte loads and stores.  Memory barriers
 * must be carefully used to synchronize reads and writes of these values with
 * reads and writes of the actual data in mq_ring.
 *
 * mq_detached needs no locking.  It can be set by either the sender or the
 * receiver, but only ever from false to true, so redundant writes don't
 * matter.  It is important that if we set mq_detached and then set the
 * counterparty's latch, the counterparty must be certain to see the change
 * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
 * ends with one, this should be OK.
 *
 * mq_ring_size and mq_ring_offset never change after initialization, and
 * can therefore be read without the lock.
 *
 * Importantly, mq_ring can be safely read and written without a lock.
 * At any given time, the difference between mq_bytes_read and
 * mq_bytes_written defines the number of bytes within mq_ring that contain
 * unread data, and mq_bytes_read defines the position where those bytes
 * begin.  The sender can increase the number of unread bytes at any time,
 * but only the receiver can give license to overwrite those bytes, by
 * incrementing mq_bytes_read.  Therefore, it's safe for the receiver to read
 * the unread bytes it knows to be present without the lock.  Conversely,
 * the sender can write to the unused portion of the ring buffer without
 * the lock, because nobody else can be reading or writing those bytes.  The
 * receiver could be making more bytes unused by incrementing mq_bytes_read,
 * but that's OK.  Note that it would be unsafe for the receiver to read any
 * data it's already marked as read, or to write any data; and it would be
 * unsafe for the sender to reread any data after incrementing
 * mq_bytes_written, but fortunately there's no need for any of that.
 */
struct shm_mq
{
  slock_t   mq_mutex;
  PGPROC     *mq_receiver;
  PGPROC     *mq_sender;
  pg_atomic_uint64 mq_bytes_read;
  pg_atomic_uint64 mq_bytes_written;
  Size    mq_ring_size;
  bool    mq_detached;
  uint8   mq_ring_offset;
  char    mq_ring[FLEXIBLE_ARRAY_MEMBER];
};
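
/*
 * An illustrative sketch of the accounting described above (hypothetical,
 * guarded by an assumed SHM_MQ_EXAMPLES symbol; not part of the original
 * file).  At any instant the unread bytes in mq_ring are exactly the
 * difference between the two atomic counters, which is how
 * shm_mq_send_bytes() below computes the used space.
 */
#ifdef SHM_MQ_EXAMPLES
static Size
shm_mq_example_bytes_unread(shm_mq *mq)
{
  uint64    rb = pg_atomic_read_u64(&mq->mq_bytes_read);
  uint64    wb = pg_atomic_read_u64(&mq->mq_bytes_written);

  Assert(wb >= rb);
  return (Size) (wb - rb);    /* never exceeds mq->mq_ring_size */
}
#endif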

/*
 * This structure is a backend-private handle for access to a queue.
 *
 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
 * an optional pointer to the dynamic shared memory segment that contains it.
 * (If mqh_segment is provided, we register an on_dsm_detach callback to
 * make sure we detach from the queue before detaching from DSM.)
 *
 * If this queue is intended to connect the current process with a background
 * worker that started it, the user can pass a pointer to the worker handle
 * to shm_mq_attach(), and we'll store it in mqh_handle.  The point of this
 * is to allow us to begin sending to or receiving from that queue before the
 * process we'll be communicating with has even been started.  If it fails
 * to start, the handle will allow us to notice that and fail cleanly, rather
 * than waiting forever; see shm_mq_wait_internal.  This is mostly useful in
 * simple cases - e.g. where there are just 2 processes communicating; in
 * more complex scenarios, every process may not have a BackgroundWorkerHandle
 * available, or may need to watch for the failure of more than one other
 * process at a time.
 *
 * When a message exists as a contiguous chunk of bytes in the queue - that is,
 * it is smaller than the size of the ring buffer and does not wrap around
 * the end - we return the message to the caller as a pointer into the buffer.
 * For messages that are larger or happen to wrap, we reassemble the message
 * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
 * the buffer, and mqh_buflen is the number of bytes allocated for it.
 *
 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
 * are used to track the state of non-blocking operations.  When the caller
 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
 * are expected to retry the call at a later time with the same argument;
 * we need to retain enough state to pick up where we left off.
 * mqh_length_word_complete tracks whether we are done sending or receiving
 * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
 * the number of bytes read or written for either the length word or the
 * message itself, and mqh_expected_bytes - which is used only for reads -
 * tracks the expected total size of the payload.
 *
 * mqh_counterparty_attached tracks whether we know the counterparty to have
 * attached to the queue at some previous point.  This lets us avoid some
 * mutex acquisitions.
 *
 * mqh_context is the memory context in effect at the time we attached to
 * the shm_mq.  The shm_mq_handle itself is allocated in this context, and
 * we make sure any other allocations we do happen in this context as well,
 * to avoid nasty surprises.
 */
struct shm_mq_handle
{
  shm_mq     *mqh_queue;
  dsm_segment *mqh_segment;
  BackgroundWorkerHandle *mqh_handle;
  char     *mqh_buffer;
  Size    mqh_buflen;
  Size    mqh_consume_pending;
  Size    mqh_partial_bytes;
  Size    mqh_expected_bytes;
  bool    mqh_length_word_complete;
  bool    mqh_counterparty_attached;
  MemoryContext mqh_context;
};

static void shm_mq_detach_internal(shm_mq *mq);
static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
          const void *data, bool nowait, Size *bytes_written);
static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
           Size bytes_needed, bool nowait, Size *nbytesp,
           void **datap);
static bool shm_mq_counterparty_gone(shm_mq *mq,
             BackgroundWorkerHandle *handle);
static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
           BackgroundWorkerHandle *handle);
static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);

/* Minimum queue size is enough for header and at least one chunk of data. */
const Size  shm_mq_minimum_size =
MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;

#define MQH_INITIAL_BUFSIZE       8192

/*
 * Initialize a new shared message queue.
 */
shm_mq *
shm_mq_create(void *address, Size size)
{
  shm_mq     *mq = address;
  Size    data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));

  /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
  size = MAXALIGN_DOWN(size);

  /* Queue size must be large enough to hold some data. */
  Assert(size > data_offset);

  /* Initialize queue header. */
  SpinLockInit(&mq->mq_mutex);
  mq->mq_receiver = NULL;
  mq->mq_sender = NULL;
  pg_atomic_init_u64(&mq->mq_bytes_read, 0);
  pg_atomic_init_u64(&mq->mq_bytes_written, 0);
  mq->mq_ring_size = size - data_offset;
  mq->mq_detached = false;
  mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);

  return mq;
}

/*
 * Set the identity of the process that will receive from a shared message
 * queue.
 */
void
shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
{
  PGPROC     *sender;

  SpinLockAcquire(&mq->mq_mutex);
  Assert(mq->mq_receiver == NULL);
  mq->mq_receiver = proc;
  sender = mq->mq_sender;
  SpinLockRelease(&mq->mq_mutex);

  if (sender != NULL)
    SetLatch(&sender->procLatch);
}

/*
 * Set the identity of the process that will send to a shared message queue.
 */
void
shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
{
  PGPROC     *receiver;

  SpinLockAcquire(&mq->mq_mutex);
  Assert(mq->mq_sender == NULL);
  mq->mq_sender = proc;
  receiver = mq->mq_receiver;
  SpinLockRelease(&mq->mq_mutex);

  if (receiver != NULL)
    SetLatch(&receiver->procLatch);
}

/*
 * Get the configured receiver.
 */
PGPROC *
shm_mq_get_receiver(shm_mq *mq)
{
  PGPROC     *receiver;

  SpinLockAcquire(&mq->mq_mutex);
  receiver = mq->mq_receiver;
  SpinLockRelease(&mq->mq_mutex);

  return receiver;
}

/*
 * Get the configured sender.
 */
PGPROC *
shm_mq_get_sender(shm_mq *mq)
{
  PGPROC     *sender;

  SpinLockAcquire(&mq->mq_mutex);
  sender = mq->mq_sender;
  SpinLockRelease(&mq->mq_mutex);

  return sender;
}

/*
 * Attach to a shared message queue so we can send or receive messages.
 *
 * The memory context in effect at the time this function is called should
 * be one which will last for at least as long as the message queue itself.
 * We'll allocate the handle in that context, and future allocations that
 * are needed to buffer incoming data will happen in that context as well.
 *
 * If seg != NULL, the queue will be automatically detached when that dynamic
 * shared memory segment is detached.
 *
 * If handle != NULL, the queue can be read or written even before the
 * other process has attached.  We'll wait for it to do so if needed.  The
 * handle must be for a background worker initialized with bgw_notify_pid
 * equal to our PID.
 *
 * shm_mq_detach() should be called when done.  This will free the
 * shm_mq_handle and mark the queue itself as detached, so that our
 * counterpart won't get stuck waiting for us to fill or drain the queue
 * after we've already lost interest.
 */
shm_mq_handle *
shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
{
  shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));

  Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
  mqh->mqh_queue = mq;
  mqh->mqh_segment = seg;
  mqh->mqh_handle = handle;
  mqh->mqh_buffer = NULL;
  mqh->mqh_buflen = 0;
  mqh->mqh_consume_pending = 0;
  mqh->mqh_partial_bytes = 0;
  mqh->mqh_expected_bytes = 0;
  mqh->mqh_length_word_complete = false;
  mqh->mqh_counterparty_attached = false;
  mqh->mqh_context = GetCurrentMemoryContext();

  if (seg != NULL)
    on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));

  return mqh;
}
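
/*
 * An illustrative usage sketch for the functions above (hypothetical,
 * guarded by an assumed SHM_MQ_EXAMPLES symbol; not part of the original
 * file): carve a queue out of a DSM segment, declare ourselves the sender,
 * and attach.  Assumes the segment is at least shm_mq_minimum_size bytes
 * and that the "storage/dsm.h" and "storage/proc.h" declarations are in
 * scope (shm_mq.h pulls them in).
 */
#ifdef SHM_MQ_EXAMPLES
static shm_mq_handle *
shm_mq_example_setup_sender(dsm_segment *seg, Size queue_size)
{
  shm_mq     *mq = shm_mq_create(dsm_segment_address(seg), queue_size);

  shm_mq_set_sender(mq, MyProc);

  /* Passing seg makes detach automatic if the segment goes away first. */
  return shm_mq_attach(mq, seg, NULL);
}
#endif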

/*
 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
 * been passed to shm_mq_attach.
 */
void
shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
{
  Assert(mqh->mqh_handle == NULL);
  mqh->mqh_handle = handle;
}

/*
 * Write a message into a shared message queue.
 */
shm_mq_result
shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
{
  shm_mq_iovec iov;

  iov.data = data;
  iov.len = nbytes;

  return shm_mq_sendv(mqh, &iov, 1, nowait);
}

/*
 * Write a message into a shared message queue, gathered from multiple
 * addresses.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * fills up, and then continue writing once the receiver has drained some data.
 * The process latch is reset after each wait.
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK.  In
 * this case, the caller should call this function again, with the same
 * arguments, each time the process latch is set.  (Once begun, the sending
 * of a message cannot be aborted except by detaching from the queue; changing
 * the length or payload will corrupt the queue.)
 */
shm_mq_result
shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
{
  shm_mq_result res;
  shm_mq     *mq = mqh->mqh_queue;
  PGPROC     *receiver;
  Size    nbytes = 0;
  Size    bytes_written;
  int     i;
  int     which_iov = 0;
  Size    offset;

  Assert(mq->mq_sender == MyProc);

  /* Compute total size of write. */
  for (i = 0; i < iovcnt; ++i)
    nbytes += iov[i].len;

  /* Try to write, or finish writing, the length word into the buffer. */
  while (!mqh->mqh_length_word_complete)
  {
    Assert(mqh->mqh_partial_bytes < sizeof(Size));
    res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                ((char *) &nbytes) + mqh->mqh_partial_bytes,
                nowait, &bytes_written);

    if (res == SHM_MQ_DETACHED)
    {
      /* Reset state in case caller tries to send another message. */
      mqh->mqh_partial_bytes = 0;
      mqh->mqh_length_word_complete = false;
      return res;
    }
    mqh->mqh_partial_bytes += bytes_written;

    if (mqh->mqh_partial_bytes >= sizeof(Size))
    {
      Assert(mqh->mqh_partial_bytes == sizeof(Size));

      mqh->mqh_partial_bytes = 0;
      mqh->mqh_length_word_complete = true;
    }

    if (res != SHM_MQ_SUCCESS)
      return res;

    /* Length word can't be split unless bigger than required alignment. */
    Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
  }

  /* Write the actual data bytes into the buffer. */
  Assert(mqh->mqh_partial_bytes <= nbytes);
  offset = mqh->mqh_partial_bytes;
  do
  {
    Size    chunksize;

    /* Figure out which bytes need to be sent next. */
    if (offset >= iov[which_iov].len)
    {
      offset -= iov[which_iov].len;
      ++which_iov;
      if (which_iov >= iovcnt)
        break;
      continue;
    }

    /*
     * We want to avoid copying the data if at all possible, but every
     * chunk of bytes we write into the queue has to be MAXALIGN'd, except
     * the last.  Thus, if a chunk other than the last one ends on a
     * non-MAXALIGN'd boundary, we have to combine the tail end of its
     * data with data from one or more following chunks until we either
     * reach the last chunk or accumulate a number of bytes which is
     * MAXALIGN'd.
     */
    if (which_iov + 1 < iovcnt &&
      offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
    {
      char    tmpbuf[MAXIMUM_ALIGNOF];
      int     j = 0;

      for (;;)
      {
        if (offset < iov[which_iov].len)
        {
          tmpbuf[j] = iov[which_iov].data[offset];
          j++;
          offset++;
          if (j == MAXIMUM_ALIGNOF)
            break;
        }
        else
        {
          offset -= iov[which_iov].len;
          which_iov++;
          if (which_iov >= iovcnt)
            break;
        }
      }

      res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);

      if (res == SHM_MQ_DETACHED)
      {
        /* Reset state in case caller tries to send another message. */
        mqh->mqh_partial_bytes = 0;
        mqh->mqh_length_word_complete = false;
        return res;
      }

      mqh->mqh_partial_bytes += bytes_written;
      if (res != SHM_MQ_SUCCESS)
        return res;
      continue;
    }

    /*
     * If this is the last chunk, we can write all the data, even if it
     * isn't a multiple of MAXIMUM_ALIGNOF.  Otherwise, we need to
     * MAXALIGN_DOWN the write size.
     */
    chunksize = iov[which_iov].len - offset;
    if (which_iov + 1 < iovcnt)
      chunksize = MAXALIGN_DOWN(chunksize);
    res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
                nowait, &bytes_written);

    if (res == SHM_MQ_DETACHED)
    {
      /* Reset state in case caller tries to send another message. */
      mqh->mqh_length_word_complete = false;
      mqh->mqh_partial_bytes = 0;
      return res;
    }

    mqh->mqh_partial_bytes += bytes_written;
    offset += bytes_written;
    if (res != SHM_MQ_SUCCESS)
      return res;
  } while (mqh->mqh_partial_bytes < nbytes);

  /* Reset for next message. */
  mqh->mqh_partial_bytes = 0;
  mqh->mqh_length_word_complete = false;

  /* If queue has been detached, let caller know. */
  if (mq->mq_detached)
    return SHM_MQ_DETACHED;

  /*
   * If the counterparty is known to have attached, we can read mq_receiver
   * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
   * more caution is needed.
   */
  if (mqh->mqh_counterparty_attached)
    receiver = mq->mq_receiver;
  else
  {
    SpinLockAcquire(&mq->mq_mutex);
    receiver = mq->mq_receiver;
    SpinLockRelease(&mq->mq_mutex);
    if (receiver == NULL)
      return SHM_MQ_SUCCESS;
    mqh->mqh_counterparty_attached = true;
  }

  /* Notify receiver of the newly-written data, and return. */
  SetLatch(&receiver->procLatch);
  return SHM_MQ_SUCCESS;
}
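
/*
 * An illustrative caller-side sketch of the nowait protocol described
 * above (hypothetical, guarded by an assumed SHM_MQ_EXAMPLES symbol; not
 * part of the original file).  On SHM_MQ_WOULD_BLOCK the caller must retry
 * with the same arguments once its latch is set; changing the length or
 * payload mid-message would corrupt the queue.
 */
#ifdef SHM_MQ_EXAMPLES
static shm_mq_result
shm_mq_example_send_with_latch_wait(shm_mq_handle *outq, Size len,
                  const void *msg)
{
  for (;;)
  {
    shm_mq_result res = shm_mq_send(outq, len, msg, true);

    if (res != SHM_MQ_WOULD_BLOCK)
      return res;       /* SHM_MQ_SUCCESS or SHM_MQ_DETACHED */

    /* Queue is full; sleep until the receiver drains some data. */
    WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);
    ResetLatch(MyLatch);
    CHECK_FOR_INTERRUPTS();
  }
}
#endif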

/*
 * Receive a message from a shared message queue.
 *
 * We set *nbytes to the message length and *data to point to the message
 * payload.  If the entire message exists in the queue as a single,
 * contiguous chunk, *data will point directly into shared memory; otherwise,
 * it will point to a temporary buffer.  This mostly avoids data copying in
 * the hoped-for case where messages are short compared to the buffer size,
 * while still allowing longer messages.  In either case, the return value
 * remains valid until the next receive operation is performed on the queue.
 *
 * When nowait = false, we'll wait on our process latch when the ring buffer
 * is empty and we have not yet received a full message.  The sender will
 * set our process latch after more data has been written, and we'll resume
 * processing.  Each call will therefore return a complete message
 * (unless the sender detaches the queue).
 *
 * When nowait = true, we do not manipulate the state of the process latch;
 * instead, whenever the buffer is empty and we need to read from it, we
 * return SHM_MQ_WOULD_BLOCK.  In this case, the caller should call this
 * function again after the process latch has been set.
 */
shm_mq_result
shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
{
  shm_mq     *mq = mqh->mqh_queue;
  shm_mq_result res;
  Size    rb = 0;
  Size    nbytes;
  void     *rawdata;

  Assert(mq->mq_receiver == MyProc);

  /* We can't receive data until the sender has attached. */
  if (!mqh->mqh_counterparty_attached)
  {
    if (nowait)
    {
      int     counterparty_gone;

      /*
       * We shouldn't return at this point at all unless the sender
       * hasn't attached yet.  However, the correct return value depends
       * on whether the sender is still attached.  If we first test
       * whether the sender has ever attached and then test whether the
       * sender has detached, there's a race condition: a sender that
       * attaches and detaches very quickly might fool us into thinking
       * the sender never attached at all.  So, test whether our
       * counterparty is definitively gone first, and only afterwards
       * check whether the sender ever attached in the first place.
       */
      counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
      if (shm_mq_get_sender(mq) == NULL)
      {
        if (counterparty_gone)
          return SHM_MQ_DETACHED;
        else
          return SHM_MQ_WOULD_BLOCK;
      }
    }
    else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
         && shm_mq_get_sender(mq) == NULL)
    {
      mq->mq_detached = true;
      return SHM_MQ_DETACHED;
    }
    mqh->mqh_counterparty_attached = true;
  }

  /*
   * If we've consumed an amount of data greater than 1/4th of the ring
   * size, mark it consumed in shared memory.  We try to avoid doing this
   * unnecessarily when only a small amount of data has been consumed,
   * because SetLatch() is fairly expensive and we don't want to do it too
   * often.
   */
  if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
  {
    shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
    mqh->mqh_consume_pending = 0;
  }

  /* Try to read, or finish reading, the length word from the buffer. */
  while (!mqh->mqh_length_word_complete)
  {
    /* Try to receive the message length word. */
    Assert(mqh->mqh_partial_bytes < sizeof(Size));
    res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
                   nowait, &rb, &rawdata);
    if (res != SHM_MQ_SUCCESS)
      return res;

    /*
     * Hopefully, we'll receive the entire message length word at once.
     * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
     * multiple reads.
     */
    if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
    {
      Size    needed;

      nbytes = *(Size *) rawdata;

      /* If we've already got the whole message, we're done. */
      needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
      if (rb >= needed)
      {
        mqh->mqh_consume_pending += needed;
        *nbytesp = nbytes;
        *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
        return SHM_MQ_SUCCESS;
      }

      /*
       * We don't have the whole message, but we at least have the whole
       * length word.
       */
      mqh->mqh_expected_bytes = nbytes;
      mqh->mqh_length_word_complete = true;
      mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
      rb -= MAXALIGN(sizeof(Size));
    }
    else
    {
      Size    lengthbytes;

      /* Can't be split unless bigger than required alignment. */
      Assert(sizeof(Size) > MAXIMUM_ALIGNOF);

      /* Message word is split; need buffer to reassemble. */
      if (mqh->mqh_buffer == NULL)
      {
        mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
                           MQH_INITIAL_BUFSIZE);
        mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
      }
      Assert(mqh->mqh_buflen >= sizeof(Size));

      /* Copy partial length word; remember to consume it. */
      if (mqh->mqh_partial_bytes + rb > sizeof(Size))
        lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
      else
        lengthbytes = rb;
      memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
           lengthbytes);
      mqh->mqh_partial_bytes += lengthbytes;
      mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
      rb -= lengthbytes;

      /* If we now have the whole word, we're ready to read payload. */
      if (mqh->mqh_partial_bytes >= sizeof(Size))
      {
        Assert(mqh->mqh_partial_bytes == sizeof(Size));
        mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
        mqh->mqh_length_word_complete = true;
        mqh->mqh_partial_bytes = 0;
      }
    }
  }
  nbytes = mqh->mqh_expected_bytes;

  if (mqh->mqh_partial_bytes == 0)
  {
    /*
     * Try to obtain the whole message in a single chunk.  If this works,
     * we need not copy the data and can return a pointer directly into
     * shared memory.
     */
    res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
    if (res != SHM_MQ_SUCCESS)
      return res;
    if (rb >= nbytes)
    {
      mqh->mqh_length_word_complete = false;
      mqh->mqh_consume_pending += MAXALIGN(nbytes);
      *nbytesp = nbytes;
      *datap = rawdata;
      return SHM_MQ_SUCCESS;
    }

    /*
     * The message has wrapped the buffer.  We'll need to copy it in order
     * to return it to the client in one chunk.  First, make sure we have
     * a large enough buffer available.
     */
    if (mqh->mqh_buflen < nbytes)
    {
      Size    newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);

      while (newbuflen < nbytes)
        newbuflen *= 2;

      if (mqh->mqh_buffer != NULL)
      {
        pfree(mqh->mqh_buffer);
        mqh->mqh_buffer = NULL;
        mqh->mqh_buflen = 0;
      }
      mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
      mqh->mqh_buflen = newbuflen;
    }
  }

  /* Loop until we've copied the entire message. */
  for (;;)
  {
    Size    still_needed;

    /* Copy as much as we can. */
    Assert(mqh->mqh_partial_bytes + rb <= nbytes);
    memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
    mqh->mqh_partial_bytes += rb;

    /*
     * Update count of bytes that can be consumed, accounting for
     * alignment padding.  Note that this will never actually insert any
     * padding except at the end of a message, because the buffer size is
     * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
     */
    Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
    mqh->mqh_consume_pending += MAXALIGN(rb);

    /* If we got all the data, exit the loop. */
    if (mqh->mqh_partial_bytes >= nbytes)
      break;

    /* Wait for some more data. */
    still_needed = nbytes - mqh->mqh_partial_bytes;
    res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
    if (res != SHM_MQ_SUCCESS)
      return res;
    if (rb > still_needed)
      rb = still_needed;
  }

  /* Return the complete message, and reset for next message. */
  *nbytesp = nbytes;
  *datap = mqh->mqh_buffer;
  mqh->mqh_length_word_complete = false;
  mqh->mqh_partial_bytes = 0;
  return SHM_MQ_SUCCESS;
}
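
/*
 * An illustrative receive loop (hypothetical, guarded by an assumed
 * SHM_MQ_EXAMPLES symbol; not part of the original file).  With
 * nowait = false each call blocks until a complete message arrives, and
 * the returned pointer is only valid until the next receive operation.
 */
#ifdef SHM_MQ_EXAMPLES
static void
shm_mq_example_drain(shm_mq_handle *inq)
{
  for (;;)
  {
    Size    nbytes;
    void     *data;
    shm_mq_result res = shm_mq_receive(inq, &nbytes, &data, false);

    if (res != SHM_MQ_SUCCESS)
      break;          /* SHM_MQ_DETACHED: the sender is gone */

    /* "data" may point into shared memory; use it before the next call. */
    elog(DEBUG1, "received message of %zu bytes", nbytes);
  }
}
#endif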

/*
 * Wait for the other process that's supposed to use this queue to attach
 * to it.
 *
 * The return value is SHM_MQ_DETACHED if the worker has already detached or
 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
 * Note that we will only be able to detect that the worker has died before
 * attaching if a background worker handle was passed to shm_mq_attach().
 */
shm_mq_result
shm_mq_wait_for_attach(shm_mq_handle *mqh)
{
  shm_mq     *mq = mqh->mqh_queue;
  PGPROC    **victim;

  if (shm_mq_get_receiver(mq) == MyProc)
    victim = &mq->mq_sender;
  else
  {
    Assert(shm_mq_get_sender(mq) == MyProc);
    victim = &mq->mq_receiver;
  }

  if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
    return SHM_MQ_SUCCESS;
  else
    return SHM_MQ_DETACHED;
}

/*
 * Detach from a shared message queue, and destroy the shm_mq_handle.
 */
void
shm_mq_detach(shm_mq_handle *mqh)
{
  /* Notify counterparty that we're outta here. */
  shm_mq_detach_internal(mqh->mqh_queue);

  /* Cancel on_dsm_detach callback, if any. */
  if (mqh->mqh_segment)
    cancel_on_dsm_detach(mqh->mqh_segment,
               shm_mq_detach_callback,
               PointerGetDatum(mqh->mqh_queue));

  /* Release local memory associated with handle. */
  if (mqh->mqh_buffer != NULL)
    pfree(mqh->mqh_buffer);
  pfree(mqh);
}

/*
 * Notify counterparty that we're detaching from shared message queue.
 *
 * The purpose of this function is to make sure that the process
 * with which we're communicating doesn't block forever waiting for us to
 * fill or drain the queue once we've lost interest.  When the sender
 * detaches, the receiver can read any messages remaining in the queue;
 * further reads will return SHM_MQ_DETACHED.  If the receiver detaches,
 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
 *
 * This is separated out from shm_mq_detach() because if the on_dsm_detach
 * callback fires, we only want to do this much.  We do not try to touch
 * the local shm_mq_handle, as it may have been pfree'd already.
 */
static void
shm_mq_detach_internal(shm_mq *mq)
{
  PGPROC     *victim;

  SpinLockAcquire(&mq->mq_mutex);
  if (mq->mq_sender == MyProc)
    victim = mq->mq_receiver;
  else
  {
    Assert(mq->mq_receiver == MyProc);
    victim = mq->mq_sender;
  }
  mq->mq_detached = true;
  SpinLockRelease(&mq->mq_mutex);

  if (victim != NULL)
    SetLatch(&victim->procLatch);
}

/*
 * Get the shm_mq from handle.
 */
shm_mq *
shm_mq_get_queue(shm_mq_handle *mqh)
{
  return mqh->mqh_queue;
}

/*
 * Write bytes into a shared message queue.
 */
static shm_mq_result
shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
          bool nowait, Size *bytes_written)
{
  shm_mq     *mq = mqh->mqh_queue;
  Size    sent = 0;
  uint64    used;
  Size    ringsize = mq->mq_ring_size;
  Size    available;

  while (sent < nbytes)
  {
    uint64    rb;
    uint64    wb;

    /* Compute number of ring buffer bytes used and available. */
    rb = pg_atomic_read_u64(&mq->mq_bytes_read);
    wb = pg_atomic_read_u64(&mq->mq_bytes_written);
    Assert(wb >= rb);
    used = wb - rb;
    Assert(used <= ringsize);
    available = Min(ringsize - used, nbytes - sent);

    /*
     * Bail out if the queue has been detached.  Note that we would be in
     * trouble if the compiler decided to cache the value of
     * mq->mq_detached in a register or on the stack across loop
     * iterations.  It probably shouldn't do that anyway since we'll
     * always return, call an external function that performs a system
     * call, or reach a memory barrier at some point later in the loop,
     * but just to be sure, insert a compiler barrier here.
     */
    pg_compiler_barrier();
    if (mq->mq_detached)
    {
      *bytes_written = sent;
      return SHM_MQ_DETACHED;
    }

    if (available == 0 && !mqh->mqh_counterparty_attached)
    {
      /*
       * The queue is full, so if the receiver isn't yet known to be
       * attached, we must wait for that to happen.
       */
      if (nowait)
      {
        if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
        {
          *bytes_written = sent;
          return SHM_MQ_DETACHED;
        }
        if (shm_mq_get_receiver(mq) == NULL)
        {
          *bytes_written = sent;
          return SHM_MQ_WOULD_BLOCK;
        }
      }
      else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
                       mqh->mqh_handle))
      {
        mq->mq_detached = true;
        *bytes_written = sent;
        return SHM_MQ_DETACHED;
      }
      mqh->mqh_counterparty_attached = true;

      /*
       * The receiver may have read some data after attaching, so we
       * must not wait without rechecking the queue state.
       */
    }
    else if (available == 0)
    {
      /*
       * Since mqh->mqh_counterparty_attached is known to be true at this
       * point, mq_receiver has been set, and it can't change once set.
       * Therefore, we can read it without acquiring the spinlock.
       */
      Assert(mqh->mqh_counterparty_attached);
      SetLatch(&mq->mq_receiver->procLatch);

      /* Skip manipulation of our latch if nowait = true. */
      if (nowait)
      {
        *bytes_written = sent;
        return SHM_MQ_WOULD_BLOCK;
      }

      /*
       * Wait for our latch to be set.  It might already be set for some
       * unrelated reason, but that'll just result in one extra trip
       * through the loop.  It's worth it to avoid resetting the latch
       * at top of loop, because setting an already-set latch is much
       * cheaper than setting one that has been reset.
       */
      WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_SEND);

      /* Reset the latch so we don't spin. */
      ResetLatch(MyLatch);

      /* An interrupt may have occurred while we were waiting. */
      CHECK_FOR_INTERRUPTS();
    }
    else
    {
      Size    offset;
      Size    sendnow;

      offset = wb % (uint64) ringsize;
      sendnow = Min(available, ringsize - offset);

      /*
       * Write as much data as we can via a single memcpy(). Make sure
       * these writes happen after the read of mq_bytes_read, above.
       * This barrier pairs with the one in shm_mq_inc_bytes_read.
       * (Since we're separating the read of mq_bytes_read from a
       * subsequent write to mq_ring, we need a full barrier here.)
       */
      pg_memory_barrier();
      memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
           (char *) data + sent, sendnow);
      sent += sendnow;

      /*
       * Update count of bytes written, with alignment padding.  Note
       * that this will never actually insert any padding except at the
       * end of a run of bytes, because the buffer size is a multiple of
       * MAXIMUM_ALIGNOF, and each read is as well.
       */
      Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
      shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));

      /*
       * For efficiency, we don't set the reader's latch here.  We'll do
       * that only when the buffer fills up or after writing an entire
       * message.
       */
    }
  }

  *bytes_written = sent;
  return SHM_MQ_SUCCESS;
}

/*
 * Wait until at least *nbytesp bytes are available to be read from the
 * shared message queue, or until the buffer wraps around.  If the queue is
 * detached, returns SHM_MQ_DETACHED.  If nowait is specified and a wait
 * would be required, returns SHM_MQ_WOULD_BLOCK.  Otherwise, *datap is set
 * to the location at which data bytes can be read, *nbytesp is set to the
 * number of bytes which can be read at that address, and the return value
 * is SHM_MQ_SUCCESS.
 */
static shm_mq_result
shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
           Size *nbytesp, void **datap)
{
  shm_mq     *mq = mqh->mqh_queue;
  Size    ringsize = mq->mq_ring_size;
  uint64    used;
  uint64    written;

  for (;;)
  {
    Size    offset;
    uint64    read;

    /* Get bytes written, so we can compute what's available to read. */
    written = pg_atomic_read_u64(&mq->mq_bytes_written);

    /*
     * Get bytes read.  Include bytes we could consume but have not yet
     * consumed.
     */
    read = pg_atomic_read_u64(&mq->mq_bytes_read) +
      mqh->mqh_consume_pending;
    used = written - read;
    Assert(used <= ringsize);
    offset = read % (uint64) ringsize;

    /* If we have enough data or buffer has wrapped, we're done. */
    if (used >= bytes_needed || offset + used >= ringsize)
    {
      *nbytesp = Min(used, ringsize - offset);
      *datap = &mq->mq_ring[mq->mq_ring_offset + offset];

      /*
       * Separate the read of mq_bytes_written, above, from caller's
       * attempt to read the data itself.  Pairs with the barrier in
       * shm_mq_inc_bytes_written.
       */
      pg_read_barrier();
      return SHM_MQ_SUCCESS;
    }

    /*
     * Fall out before waiting if the queue has been detached.
     *
     * Note that we don't check for this until *after* considering whether
     * the data already available is enough, since the receiver can finish
     * receiving a message stored in the buffer even after the sender has
     * detached.
     */
    if (mq->mq_detached)
    {
      /*
       * If the writer advanced mq_bytes_written and then set
       * mq_detached, we might not have read the final value of
       * mq_bytes_written above.  Insert a read barrier and then check
       * again if mq_bytes_written has advanced.
       */
      pg_read_barrier();
      if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
        continue;

      return SHM_MQ_DETACHED;
    }

    /*
     * We didn't get enough data to satisfy the request, so mark any data
     * previously-consumed as read to make more buffer space.
     */
    if (mqh->mqh_consume_pending > 0)
    {
      shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
      mqh->mqh_consume_pending = 0;
    }

    /* Skip manipulation of our latch if nowait = true. */
    if (nowait)
      return SHM_MQ_WOULD_BLOCK;

    /*
     * Wait for our latch to be set.  It might already be set for some
     * unrelated reason, but that'll just result in one extra trip through
     * the loop.  It's worth it to avoid resetting the latch at top of
     * loop, because setting an already-set latch is much cheaper than
     * setting one that has been reset.
     */
    WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_RECEIVE);

    /* Reset the latch so we don't spin. */
    ResetLatch(MyLatch);

    /* An interrupt may have occurred while we were waiting. */
    CHECK_FOR_INTERRUPTS();
  }
}

/*
 * Test whether a counterparty who may not even be alive yet is definitely gone.
 */
static bool
shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
{
  pid_t   pid;

  /* If the queue has been detached, counterparty is definitely gone. */
  if (mq->mq_detached)
    return true;

  /* If there's a handle, check worker status. */
  if (handle != NULL)
  {
    BgwHandleStatus status;

    /* Check for unexpected worker death. */
    status = GetBackgroundWorkerPid(handle, &pid);
    if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
    {
      /* Mark it detached, just to make it official. */
      mq->mq_detached = true;
      return true;
    }
  }

  /* Counterparty is not definitively gone. */
  return false;
}

/*
 * This is used when a process is waiting for its counterpart to attach to the
 * queue.  We exit when the other process attaches as expected, or, if
 * handle != NULL, when the referenced background process or the postmaster
 * dies.  Note that if handle == NULL, and the process fails to attach, we'll
 * potentially get stuck here forever waiting for a process that may never
 * start.  We do check for interrupts, though.
 *
 * ptr is a pointer to the memory address that we're expecting to become
 * non-NULL when our counterpart attaches to the queue.
 */
static bool
shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
{
  bool    result = false;

  for (;;)
  {
    BgwHandleStatus status;
    pid_t   pid;

    /* Acquire the lock just long enough to check the pointer. */
    SpinLockAcquire(&mq->mq_mutex);
    result = (*ptr != NULL);
    SpinLockRelease(&mq->mq_mutex);

    /* Fail if detached; else succeed if initialized. */
    if (mq->mq_detached)
    {
      result = false;
      break;
    }
    if (result)
      break;

    if (handle != NULL)
    {
      /* Check for unexpected worker death. */
      status = GetBackgroundWorkerPid(handle, &pid);
      if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
      {
        result = false;
        break;
      }
    }

    /* Wait to be signalled. */
    WaitLatch(MyLatch, WL_LATCH_SET, 0, WAIT_EVENT_MQ_INTERNAL);

    /* Reset the latch so we don't spin. */
    ResetLatch(MyLatch);

    /* An interrupt may have occurred while we were waiting. */
    CHECK_FOR_INTERRUPTS();
  }

  return result;
}

/*
 * Increment the number of bytes read.
 */
static void
shm_mq_inc_bytes_read(shm_mq *mq, Size n)
{
  PGPROC     *sender;

  /*
   * Separate prior reads of mq_ring from the increment of mq_bytes_read
   * which follows.  This pairs with the full barrier in
   * shm_mq_send_bytes(). We only need a read barrier here because the
   * increment of mq_bytes_read is actually a read followed by a dependent
   * write.
   */
  pg_read_barrier();

  /*
   * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
   * else can be changing this value.  This method should be cheaper.
   */
  pg_atomic_write_u64(&mq->mq_bytes_read,
            pg_atomic_read_u64(&mq->mq_bytes_read) + n);

  /*
   * We shouldn't have any bytes to read without a sender, so we can read
   * mq_sender here without a lock.  Once it's initialized, it can't change.
   */
  sender = mq->mq_sender;
  Assert(sender != NULL);
  SetLatch(&sender->procLatch);
}

/*
 * Increment the number of bytes written.
 */
static void
shm_mq_inc_bytes_written(shm_mq *mq, Size n)
{
  /*
   * Separate prior reads of mq_ring from the write of mq_bytes_written
   * which we're about to do.  Pairs with the read barrier found in
   * shm_mq_receive_bytes.
   */
  pg_write_barrier();

  /*
   * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
   * else can be changing this value.  This method avoids taking the bus
   * lock unnecessarily.
   */
  pg_atomic_write_u64(&mq->mq_bytes_written,
            pg_atomic_read_u64(&mq->mq_bytes_written) + n);
}

/* Shim for on_dsm_detach callback. */
static void
shm_mq_detach_callback(dsm_segment *seg, Datum arg)
{
  shm_mq     *mq = (shm_mq *) DatumGetPointer(arg);

  shm_mq_detach_internal(mq);
}