YugabyteDB (2.13.0.0-b42, bfc6a6643e7399ac8a0e81d06a3ee6d6571b33ab)

Coverage Report

Created: 2022-03-09 17:30

/Users/deen/code/yugabyte-db/src/postgres/src/backend/storage/lmgr/lwlock.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * lwlock.c
4
 *    Lightweight lock manager
5
 *
6
 * Lightweight locks are intended primarily to provide mutual exclusion of
7
 * access to shared-memory data structures.  Therefore, they offer both
8
 * exclusive and shared lock modes (to support read/write and read-only
9
 * access to a shared object).  There are few other frammishes.  User-level
10
 * locking should be done with the full lock manager --- which depends on
11
 * LWLocks to protect its shared state.
12
 *
13
 * In addition to exclusive and shared modes, lightweight locks can be used to
14
 * wait until a variable changes value.  The variable is initially not set
15
 * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
16
 * value it was set to when the lock was released last, and can be updated
17
 * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
18
 * waits for the variable to be updated, or until the lock is free.  When
19
 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20
 * appropriate value for a free lock.  The meaning of the variable is up to
21
 * the caller, the lightweight lock code just assigns and compares it.
22
 *
23
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
24
 * Portions Copyright (c) 1994, Regents of the University of California
25
 *
26
 * IDENTIFICATION
27
 *    src/backend/storage/lmgr/lwlock.c
28
 *
29
 * NOTES:
30
 *
31
 * This used to be a pretty straightforward reader-writer lock
32
 * implementation, in which the internal state was protected by a
33
 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34
 * too high for workloads/locks that were taken in shared mode very
35
 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36
 * while trying to acquire a shared lock that was actually free.
37
 *
38
 * Thus a new implementation was devised that provides wait-free shared lock
39
 * acquisition for locks that aren't exclusively locked.
40
 *
41
 * The basic idea is to have a single atomic variable 'lockcount' instead of
42
 * the formerly separate shared and exclusive counters and to use atomic
43
 * operations to acquire the lock. That's fairly easy to do for plain
44
 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45
 * in the OS.
46
 *
47
 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48
 * variable. For exclusive lock we swap in a sentinel value
49
 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50
 *
51
 * To release the lock we use an atomic decrement. If the
52
 * new value is zero (we get that atomically), we know we can/have to release
53
 * waiters.
54
 *
55
 * Obviously it is important that the sentinel value for exclusive locks
56
 * doesn't conflict with the maximum number of possible share lockers -
57
 * luckily MAX_BACKENDS makes that easily possible.
58
 *
59
 *
60
 * The attentive reader might have noticed that naively doing the above has a
61
 * glaring race condition: We try to lock using the atomic operations and
62
 * notice that we have to wait. Unfortunately by the time we have finished
63
 * queuing, the former locker very well might have already finished its
64
 * work. That's problematic because we're now stuck waiting inside the OS.
65
 *
66
 * To mitigate those races we use a two phased attempt at locking:
67
 *   Phase 1: Try to do it atomically, if we succeed, nice
68
 *   Phase 2: Add ourselves to the waitqueue of the lock
69
 *   Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70
 *        the queue
71
 *   Phase 4: Sleep till wake-up, goto Phase 1
72
 *
73
 * This protects us against the problem from above as nobody can release too
74
 *    quick, before we're queued, since after Phase 2 we're already queued.
75
 * -------------------------------------------------------------------------
76
 */
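
The phased protocol in the NOTES maps directly onto static helpers defined later in this file.  The following is an editorial sketch only, not part of lwlock.c; it paraphrases the loop in LWLockAcquire() and omits interrupt holdoff, bookkeeping, and spurious-wakeup handling:

/* Editorial sketch of the phased protocol above (simplified LWLockAcquire loop). */
for (;;)
{
	/* Phase 1: lock-free fast path; returns false when the lock was grabbed */
	if (!LWLockAttemptLock(lock, mode))
		break;

	/* Phase 2: enqueue ourselves, so any releaser is obliged to wake us */
	LWLockQueueSelf(lock, mode);

	/* Phase 3: retry, since the holder may have released between phases 1 and 2 */
	if (!LWLockAttemptLock(lock, mode))
	{
		LWLockDequeueSelf(lock);	/* acquired after all; undo the queueing */
		break;
	}

	/* Phase 4: sleep on our process semaphore until woken, then start over */
	PGSemaphoreLock(MyProc->sem);
}
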
77
#include "postgres.h"
78
79
#include "miscadmin.h"
80
#include "pgstat.h"
81
#include "pg_trace.h"
82
#include "postmaster/postmaster.h"
83
#include "replication/slot.h"
84
#include "storage/ipc.h"
85
#include "storage/predicate.h"
86
#include "storage/proc.h"
87
#include "storage/proclist.h"
88
#include "storage/spin.h"
89
#include "utils/memutils.h"
90
91
#ifdef LWLOCK_STATS
92
#include "utils/hsearch.h"
93
#endif
94
95
96
/* We use the ShmemLock spinlock to protect LWLockCounter */
97
extern slock_t *ShmemLock;
98
99
2.97M
#define LW_FLAG_HAS_WAITERS     ((uint32) 1 << 30)
100
99.6M
#define LW_FLAG_RELEASE_OK      ((uint32) 1 << 29)
101
61.4k
#define LW_FLAG_LOCKED        ((uint32) 1 << 28)
102
103
2.13M
#define LW_VAL_EXCLUSIVE      ((uint32) 1 << 24)
104
1.69M
#define LW_VAL_SHARED       1
105
106
639k
#define LW_LOCK_MASK        ((uint32) ((1 << 25)-1))
107
/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108
#define LW_SHARED_MASK        ((uint32) ((1 << 24)-1))
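
Taken together, these constants partition the single uint32 state word: bit 24 is the exclusive sentinel, bits 0-23 count shared holders, and bits 28-30 carry the wait-queue flags.  A small editorial sketch of decoding a raw state value, mirroring what PRINT_LWDEBUG() does below (decode_lwlock_state is a hypothetical helper, not part of this file):

/* Editorial sketch: decode a raw state word under the masks defined above. */
static void
decode_lwlock_state(uint32 state)
{
	bool	is_exclusive = (state & LW_VAL_EXCLUSIVE) != 0;
	uint32	shared_count = state & LW_SHARED_MASK;	/* number of shared holders */
	bool	has_waiters = (state & LW_FLAG_HAS_WAITERS) != 0;
	bool	release_ok = (state & LW_FLAG_RELEASE_OK) != 0;
	bool	waitlist_locked = (state & LW_FLAG_LOCKED) != 0;

	elog(DEBUG1, "excl %d shared %u haswaiters %d releaseOK %d listlocked %d",
		 is_exclusive, shared_count, has_waiters, release_ok, waitlist_locked);
}
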
109
110
/*
111
 * This is indexed by tranche ID and stores the names of all tranches known
112
 * to the current backend.
113
 */
114
static const char **LWLockTrancheArray = NULL;
115
static int  LWLockTranchesAllocated = 0;
116
117
#define T_NAME(lock) \
118
  (LWLockTrancheArray[(lock)->tranche])
119
120
/*
121
 * This points to the main array of LWLocks in shared memory.  Backends inherit
122
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
123
 * where we have special measures to pass it down).
124
 */
125
LWLockPadded *MainLWLockArray = NULL;
126
127
/*
128
 * We use this structure to keep track of locked LWLocks for release
129
 * during error recovery.  Normally, only a few will be held at once, but
130
 * occasionally the number can be much higher; for example, the pg_buffercache
131
 * extension locks all buffer partitions simultaneously.
132
 */
133
1.48M
#define MAX_SIMUL_LWLOCKS 200
134
135
/* struct representing the LWLocks we're holding */
136
typedef struct LWLockHandle
137
{
138
  LWLock     *lock;
139
  LWLockMode  mode;
140
} LWLockHandle;
141
142
static int  num_held_lwlocks = 0;
143
static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
144
145
/* struct representing the LWLock tranche request for named tranche */
146
typedef struct NamedLWLockTrancheRequest
147
{
148
  char    tranche_name[NAMEDATALEN];
149
  int     num_lwlocks;
150
} NamedLWLockTrancheRequest;
151
152
NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
153
static int  NamedLWLockTrancheRequestsAllocated = 0;
154
int     NamedLWLockTrancheRequests = 0;
155
156
NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
157
158
static bool lock_named_request_allowed = true;
159
160
static void InitializeLWLocks(void);
161
static void RegisterLWLockTranches(void);
162
163
static inline void LWLockReportWaitStart(LWLock *lock);
164
static inline void LWLockReportWaitEnd(void);
165
166
#ifdef LWLOCK_STATS
167
typedef struct lwlock_stats_key
168
{
169
  int     tranche;
170
  void     *instance;
171
}     lwlock_stats_key;
172
173
typedef struct lwlock_stats
174
{
175
  lwlock_stats_key key;
176
  int     sh_acquire_count;
177
  int     ex_acquire_count;
178
  int     block_count;
179
  int     dequeue_self_count;
180
  int     spin_delay_count;
181
}     lwlock_stats;
182
183
static HTAB *lwlock_stats_htab;
184
static lwlock_stats lwlock_stats_dummy;
185
#endif
186
187
#ifdef LOCK_DEBUG
188
bool    Trace_lwlocks = false;
189
190
inline static void
191
PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
192
{
193
  /* hide statement & context here, otherwise the log is just too verbose */
194
  if (Trace_lwlocks)
195
  {
196
    uint32    state = pg_atomic_read_u32(&lock->state);
197
198
    ereport(LOG,
199
        (errhidestmt(true),
200
         errhidecontext(true),
201
         errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
202
                 MyProcPid,
203
                 where, T_NAME(lock), lock,
204
                 (state & LW_VAL_EXCLUSIVE) != 0,
205
                 state & LW_SHARED_MASK,
206
                 (state & LW_FLAG_HAS_WAITERS) != 0,
207
                 pg_atomic_read_u32(&lock->nwaiters),
208
                 (state & LW_FLAG_RELEASE_OK) != 0)));
209
  }
210
}
211
212
inline static void
213
LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
214
{
215
  /* hide statement & context here, otherwise the log is just too verbose */
216
  if (Trace_lwlocks)
217
  {
218
    ereport(LOG,
219
        (errhidestmt(true),
220
         errhidecontext(true),
221
         errmsg_internal("%s(%s %p): %s", where,
222
                 T_NAME(lock), lock, msg)));
223
  }
224
}
225
226
#else             /* not LOCK_DEBUG */
227
2.98M
#define PRINT_LWDEBUG(a,b,c) ((void)0)
228
1.48M
#define LOG_LWDEBUG(a,b,c) ((void)0)
229
#endif              /* LOCK_DEBUG */
230
231
#ifdef LWLOCK_STATS
232
233
static void init_lwlock_stats(void);
234
static void print_lwlock_stats(int code, Datum arg);
235
static lwlock_stats * get_lwlock_stats_entry(LWLock *lockid);
236
237
static void
238
init_lwlock_stats(void)
239
{
240
  HASHCTL   ctl;
241
  static MemoryContext lwlock_stats_cxt = NULL;
242
  static bool exit_registered = false;
243
244
  if (lwlock_stats_cxt != NULL)
245
    MemoryContextDelete(lwlock_stats_cxt);
246
247
  /*
248
   * The LWLock stats will be updated within a critical section, which
249
   * requires allocating new hash entries. Allocations within a critical
250
   * section are normally not allowed because running out of memory would
251
   * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
252
   * turned on in production, so that's an acceptable risk. The hash entries
253
   * are small, so the risk of running out of memory is minimal in practice.
254
   */
255
  lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
256
                       "LWLock stats",
257
                       ALLOCSET_DEFAULT_SIZES);
258
  MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
259
260
  MemSet(&ctl, 0, sizeof(ctl));
261
  ctl.keysize = sizeof(lwlock_stats_key);
262
  ctl.entrysize = sizeof(lwlock_stats);
263
  ctl.hcxt = lwlock_stats_cxt;
264
  lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
265
                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
266
  if (!exit_registered)
267
  {
268
    on_shmem_exit(print_lwlock_stats, 0);
269
    exit_registered = true;
270
  }
271
}
272
273
static void
274
print_lwlock_stats(int code, Datum arg)
275
{
276
  HASH_SEQ_STATUS scan;
277
  lwlock_stats *lwstats;
278
279
  hash_seq_init(&scan, lwlock_stats_htab);
280
281
  /* Grab an LWLock to keep different backends from mixing reports */
282
  LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
283
284
  while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
285
  {
286
    fprintf(stderr,
287
        "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
288
        MyProcPid, LWLockTrancheArray[lwstats->key.tranche],
289
        lwstats->key.instance, lwstats->sh_acquire_count,
290
        lwstats->ex_acquire_count, lwstats->block_count,
291
        lwstats->spin_delay_count, lwstats->dequeue_self_count);
292
  }
293
294
  LWLockRelease(&MainLWLockArray[0].lock);
295
}
296
297
static lwlock_stats *
298
get_lwlock_stats_entry(LWLock *lock)
299
{
300
  lwlock_stats_key key;
301
  lwlock_stats *lwstats;
302
  bool    found;
303
304
  /*
305
   * During shared memory initialization, the hash table doesn't exist yet.
306
   * Stats of that phase aren't very interesting, so just collect operations
307
   * on all locks in a single dummy entry.
308
   */
309
  if (lwlock_stats_htab == NULL)
310
    return &lwlock_stats_dummy;
311
312
  /* Fetch or create the entry. */
313
  key.tranche = lock->tranche;
314
  key.instance = lock;
315
  lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
316
  if (!found)
317
  {
318
    lwstats->sh_acquire_count = 0;
319
    lwstats->ex_acquire_count = 0;
320
    lwstats->block_count = 0;
321
    lwstats->dequeue_self_count = 0;
322
    lwstats->spin_delay_count = 0;
323
  }
324
  return lwstats;
325
}
326
#endif              /* LWLOCK_STATS */
327
328
329
/*
330
 * Compute number of LWLocks required by named tranches.  These will be
331
 * allocated in the main array.
332
 */
333
static int
334
NumLWLocksByNamedTranches(void)
335
10.8k
{
336
10.8k
  int     numLocks = 0;
337
10.8k
  int     i;
338
339
13.5k
  for (i = 0; i < NamedLWLockTrancheRequests; i++)
340
2.70k
    numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
341
342
10.8k
  return numLocks;
343
10.8k
}
344
345
/*
346
 * Compute shmem space needed for LWLocks and named tranches.
347
 */
348
Size
349
LWLockShmemSize(void)
350
7.22k
{
351
7.22k
  Size    size;
352
7.22k
  int     i;
353
7.22k
  int     numLocks = NUM_FIXED_LWLOCKS;
354
355
7.22k
  numLocks += NumLWLocksByNamedTranches();
356
357
  /* Space for the LWLock array. */
358
7.22k
  size = mul_size(numLocks, sizeof(LWLockPadded));
359
360
  /* Space for dynamic allocation counter, plus room for alignment. */
361
7.22k
  size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
362
363
  /* space for named tranches. */
364
7.22k
  size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
365
366
  /* space for name of each tranche. */
367
9.03k
  for (i = 0; i < NamedLWLockTrancheRequests; i++)
368
1.80k
    size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
369
370
  /* Disallow named LWLocks' requests after startup */
371
7.22k
  lock_named_request_allowed = false;
372
373
7.22k
  return size;
374
7.22k
}
375
376
/*
377
 * Allocate shmem space for the main LWLock array and all tranches and
378
 * initialize it.  We also register all the LWLock tranches here.
379
 */
380
void
381
CreateLWLocks(void)
382
3.61k
{
383
3.61k
  StaticAssertStmt(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
384
3.61k
           "MAX_BACKENDS too big for lwlock.c");
385
386
3.61k
  StaticAssertStmt(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
387
3.61k
           sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
388
3.61k
           "Miscalculated LWLock padding");
389
390
3.61k
  if (!IsUnderPostmaster)
391
3.61k
  {
392
3.61k
    Size    spaceLocks = LWLockShmemSize();
393
3.61k
    int      *LWLockCounter;
394
3.61k
    char     *ptr;
395
396
    /* Allocate space */
397
3.61k
    ptr = (char *) ShmemAlloc(spaceLocks);
398
399
    /* Leave room for dynamic allocation of tranches */
400
3.61k
    ptr += sizeof(int);
401
402
    /* Ensure desired alignment of LWLock array */
403
3.61k
    ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
404
405
3.61k
    MainLWLockArray = (LWLockPadded *) ptr;
406
407
    /*
408
     * Initialize the dynamic-allocation counter for tranches, which is
409
     * stored just before the first LWLock.
410
     */
411
3.61k
    LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
412
3.61k
    *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
413
414
    /* Initialize all LWLocks */
415
3.61k
    InitializeLWLocks();
416
3.61k
  }
417
418
  /* Register all LWLock tranches */
419
3.61k
  RegisterLWLockTranches();
420
3.61k
}
421
422
/*
423
 * Initialize LWLocks that are fixed and those belonging to named tranches.
424
 */
425
static void
426
InitializeLWLocks(void)
427
3.61k
{
428
3.61k
  int     numNamedLocks = NumLWLocksByNamedTranches();
429
3.61k
  int     id;
430
3.61k
  int     i;
431
3.61k
  int     j;
432
3.61k
  LWLockPadded *lock;
433
434
  /* Initialize all individual LWLocks in main array */
435
169k
  for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
436
166k
    LWLockInitialize(&lock->lock, id);
437
438
  /* Initialize buffer mapping LWLocks in main array */
439
3.61k
  lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
440
466k
  for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
441
462k
    LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
442
443
  /* Initialize lmgrs' LWLocks in main array */
444
3.61k
  lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
445
61.4k
  for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
446
57.8k
    LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
447
448
  /* Initialize predicate lmgrs' LWLocks in main array */
449
3.61k
  lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
450
3.61k
    NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
451
61.4k
  for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
452
57.8k
    LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
453
454
  /* Initialize named tranches. */
455
3.61k
  if (NamedLWLockTrancheRequests > 0)
456
902
  {
457
902
    char     *trancheNames;
458
459
902
    NamedLWLockTrancheArray = (NamedLWLockTranche *)
460
902
      &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
461
462
902
    trancheNames = (char *) NamedLWLockTrancheArray +
463
902
      (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
464
902
    lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
465
466
1.80k
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
467
902
    {
468
902
      NamedLWLockTrancheRequest *request;
469
902
      NamedLWLockTranche *tranche;
470
902
      char     *name;
471
472
902
      request = &NamedLWLockTrancheRequestArray[i];
473
902
      tranche = &NamedLWLockTrancheArray[i];
474
475
902
      name = trancheNames;
476
902
      trancheNames += strlen(request->tranche_name) + 1;
477
902
      strcpy(name, request->tranche_name);
478
902
      tranche->trancheId = LWLockNewTrancheId();
479
902
      tranche->trancheName = name;
480
481
1.80k
      for (j = 0; j < request->num_lwlocks; j++, lock++)
482
902
        LWLockInitialize(&lock->lock, tranche->trancheId);
483
902
    }
484
902
  }
485
3.61k
}
486
487
/*
488
 * Register named tranches and tranches for fixed LWLocks.
489
 */
490
static void
491
RegisterLWLockTranches(void)
492
3.61k
{
493
3.61k
  int     i;
494
495
3.61k
  if (LWLockTrancheArray == NULL)
496
3.61k
  {
497
3.61k
    LWLockTranchesAllocated = 128;
498
3.61k
    LWLockTrancheArray = (const char **)
499
3.61k
      MemoryContextAllocZero(TopMemoryContext,
500
3.61k
                   LWLockTranchesAllocated * sizeof(char *));
501
3.61k
    Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
502
3.61k
  }
503
504
169k
  for (i = 0; i < NUM_INDIVIDUAL_LWLOCKS; ++i)
505
166k
    LWLockRegisterTranche(i, MainLWLockNames[i]);
506
507
3.61k
  LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, "buffer_mapping");
508
3.61k
  LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, "lock_manager");
509
3.61k
  LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER,
510
3.61k
              "predicate_lock_manager");
511
3.61k
  LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
512
3.61k
              "parallel_query_dsa");
513
3.61k
  LWLockRegisterTranche(LWTRANCHE_SESSION_DSA,
514
3.61k
              "session_dsa");
515
3.61k
  LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
516
3.61k
              "session_record_table");
517
3.61k
  LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
518
3.61k
              "session_typmod_table");
519
3.61k
  LWLockRegisterTranche(LWTRANCHE_SHARED_TUPLESTORE,
520
3.61k
              "shared_tuplestore");
521
3.61k
  LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
522
3.61k
  LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
523
3.61k
  LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");
524
525
  /* Register named tranches. */
526
4.51k
  for (i = 0; i < NamedLWLockTrancheRequests; i++)
527
902
    LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
528
902
                NamedLWLockTrancheArray[i].trancheName);
529
3.61k
}
530
531
/*
532
 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
533
 */
534
void
535
InitLWLockAccess(void)
536
3.45k
{
537
#ifdef LWLOCK_STATS
538
  init_lwlock_stats();
539
#endif
540
3.45k
}
541
542
/*
543
 * GetNamedLWLockTranche - returns the base address of LWLock from the
544
 *    specified tranche.
545
 *
546
 * Caller needs to retrieve the requested number of LWLocks starting from
547
 * the base lock address returned by this API.  This can be used for
548
 * tranches that are requested by using RequestNamedLWLockTranche() API.
549
 */
550
LWLockPadded *
551
GetNamedLWLockTranche(const char *tranche_name)
552
902
{
553
902
  int     lock_pos;
554
902
  int     i;
555
556
  /*
557
   * Obtain the position of the base address of the LWLocks belonging to the requested
558
   * tranche_name in MainLWLockArray.  LWLocks for named tranches are placed
559
   * in MainLWLockArray after fixed locks.
560
   */
561
902
  lock_pos = NUM_FIXED_LWLOCKS;
562
902
  for (i = 0; i < NamedLWLockTrancheRequests; i++)
563
902
  {
564
902
    if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
565
902
           tranche_name) == 0)
566
902
      return &MainLWLockArray[lock_pos];
567
568
0
    lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
569
0
  }
570
571
0
  if (i >= NamedLWLockTrancheRequests)
572
0
    elog(ERROR, "requested tranche is not registered");
573
574
  /* just to keep compiler quiet */
575
0
  return NULL;
576
0
}
577
578
/*
579
 * Allocate a new tranche ID.
580
 */
581
int
582
LWLockNewTrancheId(void)
583
902
{
584
902
  int     result;
585
902
  int      *LWLockCounter;
586
587
902
  LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
588
902
  SpinLockAcquire(ShmemLock);
589
902
  result = (*LWLockCounter)++;
590
902
  SpinLockRelease(ShmemLock);
591
592
902
  return result;
593
902
}
594
595
/*
596
 * Register a tranche ID in the lookup table for the current process.  This
597
 * routine will save a pointer to the tranche name passed as an argument,
598
 * so the name should be allocated in a backend-lifetime context
599
 * (TopMemoryContext, static variable, or similar).
600
 */
601
void
602
LWLockRegisterTranche(int tranche_id, const char *tranche_name)
603
253k
{
604
253k
  Assert(LWLockTrancheArray != NULL);
605
606
253k
  if (tranche_id >= LWLockTranchesAllocated)
607
0
  {
608
0
    int     i = LWLockTranchesAllocated;
609
0
    int     j = LWLockTranchesAllocated;
610
611
0
    while (i <= tranche_id)
612
0
      i *= 2;
613
614
0
    LWLockTrancheArray = (const char **)
615
0
      repalloc(LWLockTrancheArray, i * sizeof(char *));
616
0
    LWLockTranchesAllocated = i;
617
0
    while (j < LWLockTranchesAllocated)
618
0
      LWLockTrancheArray[j++] = NULL;
619
0
  }
620
621
253k
  LWLockTrancheArray[tranche_id] = tranche_name;
622
253k
}
623
624
/*
625
 * RequestNamedLWLockTranche
626
 *    Request that extra LWLocks be allocated during postmaster
627
 *    startup.
628
 *
629
 * This is only useful for extensions if called from the _PG_init hook
630
 * of a library that is loaded into the postmaster via
631
 * shared_preload_libraries.  Once shared memory has been allocated, calls
632
 * will be ignored.  (We could raise an error, but it seems better to make
633
 * it a no-op, so that libraries containing such calls can be reloaded if
634
 * needed.)
635
 */
636
void
637
RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
638
902
{
639
902
  NamedLWLockTrancheRequest *request;
640
641
902
  if (IsUnderPostmaster || !lock_named_request_allowed)
642
0
    return;         /* too late */
643
644
902
  if (NamedLWLockTrancheRequestArray == NULL)
645
902
  {
646
902
    NamedLWLockTrancheRequestsAllocated = 16;
647
902
    NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
648
902
      MemoryContextAlloc(TopMemoryContext,
649
902
                 NamedLWLockTrancheRequestsAllocated
650
902
                 * sizeof(NamedLWLockTrancheRequest));
651
902
  }
652
653
902
  if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
654
0
  {
655
0
    int     i = NamedLWLockTrancheRequestsAllocated;
656
657
0
    while (i <= NamedLWLockTrancheRequests)
658
0
      i *= 2;
659
660
0
    NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
661
0
      repalloc(NamedLWLockTrancheRequestArray,
662
0
           i * sizeof(NamedLWLockTrancheRequest));
663
0
    NamedLWLockTrancheRequestsAllocated = i;
664
0
  }
665
666
902
  request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
667
902
  Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
668
902
  StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
669
902
  request->num_lwlocks = num_lwlocks;
670
902
  NamedLWLockTrancheRequests++;
671
902
}
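
The request/lookup pair (RequestNamedLWLockTranche above and GetNamedLWLockTranche earlier in this file) is the usual path for extensions.  A hedged editorial sketch of how a hypothetical extension loaded via shared_preload_libraries might use it; the names my_ext_lock, my_ext_attach and the tranche name "my_extension" are illustrative, not from this file:

#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "storage/lwlock.h"

PG_MODULE_MAGIC;

static LWLock *my_ext_lock;		/* hypothetical extension-wide lock */

void
_PG_init(void)
{
	/* Only meaningful in the postmaster, i.e. via shared_preload_libraries. */
	if (!process_shared_preload_libraries_in_progress)
		return;

	/* Ask for one LWLock in a named tranche before shared memory is sized. */
	RequestNamedLWLockTranche("my_extension", 1);
}

static void
my_ext_attach(void)
{
	/* Later (typically from a shmem startup hook), fetch the allocated lock. */
	my_ext_lock = &GetNamedLWLockTranche("my_extension")->lock;
}
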
672
673
/*
674
 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
675
 */
676
void
677
LWLockInitialize(LWLock *lock, int tranche_id)
678
96.6M
{
679
96.6M
  pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
680
#ifdef LOCK_DEBUG
681
  pg_atomic_init_u32(&lock->nwaiters, 0);
682
#endif
683
96.6M
  lock->tranche = tranche_id;
684
96.6M
  proclist_init(&lock->waiters);
685
96.6M
}
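
For LWLocks that live in extension-managed shared memory rather than the main array, the comments above suggest this combination: allocate a tranche id once, register its name in every backend, and initialize the lock in place.  An editorial sketch with hypothetical names; MyExtSharedState and the 'found' flag mimic what ShmemInitStruct() would hand an extension:

/* Hypothetical structure an extension might keep in shared memory. */
typedef struct MyExtSharedState
{
	LWLock		lock;
	int			tranche_id;
	int64		counter;
} MyExtSharedState;

static void
my_ext_shmem_init(MyExtSharedState *state, bool found)
{
	if (!found)
	{
		/* First backend to attach: pick a tranche id and set up the lock. */
		state->tranche_id = LWLockNewTrancheId();
		LWLockInitialize(&state->lock, state->tranche_id);
		state->counter = 0;
	}

	/* Every backend registers the name in its own lookup table. */
	LWLockRegisterTranche(state->tranche_id, "my_extension_state");
}
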
686
687
/*
688
 * Report start of wait event for light-weight locks.
689
 *
690
 * This function will be used by all the light-weight lock calls which
691
 * need to wait to acquire the lock.  This function distinguishes the wait
692
 * event based on tranche and lock id.
693
 */
694
static inline void
695
LWLockReportWaitStart(LWLock *lock)
696
314
{
697
314
  pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
698
314
}
699
700
/*
701
 * Report end of wait event for light-weight locks.
702
 */
703
static inline void
704
LWLockReportWaitEnd(void)
705
313
{
706
313
  pgstat_report_wait_end();
707
313
}
708
709
/*
710
 * Return an identifier for an LWLock based on the wait class and event.
711
 */
712
const char *
713
GetLWLockIdentifier(uint32 classId, uint16 eventId)
714
0
{
715
0
  Assert(classId == PG_WAIT_LWLOCK);
716
717
  /*
718
   * It is quite possible that the user has registered the tranche in one of the
719
   * backends (e.g. by allocating lwlocks in dynamic shared memory) but not
720
   * all of them, so we can't assume the tranche is registered here.
721
   */
722
0
  if (eventId >= LWLockTranchesAllocated ||
723
0
    LWLockTrancheArray[eventId] == NULL)
724
0
    return "extension";
725
726
0
  return LWLockTrancheArray[eventId];
727
0
}
728
729
/*
730
 * Internal function that tries to atomically acquire the lwlock in the passed
731
 * in mode.
732
 *
733
 * This function will not block waiting for a lock to become free - that's the
734
 * caller's job.
735
 *
736
 * Returns true if the lock isn't free and we need to wait.
737
 */
738
static bool
739
LWLockAttemptLock(LWLock *lock, LWLockMode mode)
740
1.48M
{
741
1.48M
  uint32    old_state;
742
743
1.48M
  AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
744
745
  /*
746
   * Read once outside the loop, later iterations will get the newer value
747
   * via compare & exchange.
748
   */
749
1.48M
  old_state = pg_atomic_read_u32(&lock->state);
750
751
  /* loop until we've determined whether we could acquire the lock or not */
752
1.48M
  while (true)
753
1.48M
  {
754
1.48M
    uint32    desired_state;
755
1.48M
    bool    lock_free;
756
757
1.48M
    desired_state = old_state;
758
759
1.48M
    if (mode == LW_EXCLUSIVE)
760
638k
    {
761
638k
      lock_free = (old_state & LW_LOCK_MASK) == 0;
762
638k
      if (lock_free)
763
637k
        desired_state += LW_VAL_EXCLUSIVE;
764
638k
    }
765
847k
    else
766
847k
    {
767
847k
      lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
768
847k
      if (lock_free)
769
847k
        desired_state += LW_VAL_SHARED;
770
847k
    }
771
772
    /*
773
     * Attempt to swap in the state we are expecting. If we didn't see the
774
     * lock as free, that's just the old value. If we saw it as free,
775
     * we'll attempt to mark it acquired. The reason that we always swap
776
     * in the value is that this doubles as a memory barrier. We could try
777
     * to be smarter and only swap in values if we saw the lock as free,
778
     * but benchmarks haven't shown it to be beneficial so far.
779
     *
780
     * Retry if the value changed since we last looked at it.
781
     */
782
1.48M
    if (pg_atomic_compare_exchange_u32(&lock->state,
783
1.48M
                       &old_state, desired_state))
784
1.48M
    {
785
1.48M
      if (lock_free)
786
1.48M
      {
787
        /* Great! Got the lock. */
788
#ifdef LOCK_DEBUG
789
        if (mode == LW_EXCLUSIVE)
790
          lock->owner = MyProc;
791
#endif
792
1.48M
        return false;
793
1.48M
      }
794
837
      else
795
837
        return true; /* somebody else has the lock */
796
1.48M
    }
797
1.48M
  }
798
18.4E
  pg_unreachable();
799
1.48M
}
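
As the comment inside the loop notes, pg_atomic_compare_exchange_u32() writes the value it actually observed back into old_state when the exchange fails, which is why the loop above never re-reads lock->state explicitly.  A minimal stand-alone illustration of that contract (an editorial sketch; set_flag_cas is a hypothetical helper, not part of this file):

#include "port/atomics.h"

/* Editorial sketch: set a flag bit with a CAS retry loop, as the code above does. */
static uint32
set_flag_cas(pg_atomic_uint32 *v, uint32 flag)
{
	uint32		old = pg_atomic_read_u32(v);

	/* On failure the CAS stores the current value into 'old', so simply retry. */
	while (!pg_atomic_compare_exchange_u32(v, &old, old | flag))
		;

	return old;					/* the value observed just before our update */
}
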
800
801
/*
802
 * Lock the LWLock's wait list against concurrent activity.
803
 *
804
 * NB: even though the wait list is locked, non-conflicting lock operations
805
 * may still happen concurrently.
806
 *
807
 * Time spent holding mutex should be short!
808
 */
809
static void
810
LWLockWaitListLock(LWLock *lock)
811
17.3k
{
812
17.3k
  uint32    old_state;
813
#ifdef LWLOCK_STATS
814
  lwlock_stats *lwstats;
815
  uint32    delays = 0;
816
817
  lwstats = get_lwlock_stats_entry(lock);
818
#endif
819
820
17.3k
  while (true)
821
17.4k
  {
822
    /* always try once to acquire lock directly */
823
17.4k
    old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
824
17.4k
    if (!(old_state & LW_FLAG_LOCKED))
825
17.3k
      break;       /* got lock */
826
827
    /* and then spin without atomic operations until lock is released */
828
93
    {
829
93
      SpinDelayStatus delayStatus;
830
831
93
      init_local_spin_delay(&delayStatus);
832
833
9.10k
      while (old_state & LW_FLAG_LOCKED)
834
9.01k
      {
835
9.01k
        perform_spin_delay(&delayStatus);
836
9.01k
        old_state = pg_atomic_read_u32(&lock->state);
837
9.01k
      }
838
#ifdef LWLOCK_STATS
839
      delays += delayStatus.delays;
840
#endif
841
93
      finish_spin_delay(&delayStatus);
842
93
    }
843
844
    /*
845
     * Retry. The lock might obviously already be re-acquired by the time
846
     * we're attempting to get it again.
847
     */
848
93
  }
849
850
#ifdef LWLOCK_STATS
851
  lwstats->spin_delay_count += delays;
852
#endif
853
17.3k
}
854
855
/*
856
 * Unlock the LWLock's wait list.
857
 *
858
 * Note that it can be more efficient to manipulate flags and release the
859
 * locks in a single atomic operation.
860
 */
861
static void
862
LWLockWaitListUnlock(LWLock *lock)
863
16.7k
{
864
16.7k
  uint32    old_state PG_USED_FOR_ASSERTS_ONLY;
865
866
16.7k
  old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
867
868
16.7k
  Assert(old_state & LW_FLAG_LOCKED);
869
16.7k
}
870
871
/*
872
 * Wake up all the lockers that currently have a chance to acquire the lock.
873
 */
874
static void
875
LWLockWakeup(LWLock *lock)
876
682
{
877
682
  bool    new_release_ok;
878
682
  bool    wokeup_somebody = false;
879
682
  proclist_head wakeup;
880
682
  proclist_mutable_iter iter;
881
882
682
  proclist_init(&wakeup);
883
884
682
  new_release_ok = true;
885
886
  /* lock wait list while collecting backends to wake up */
887
682
  LWLockWaitListLock(lock);
888
889
682
  proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
890
354
  {
891
354
    PGPROC     *waiter = GetPGProcByNumber(iter.cur);
892
893
354
    if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
894
0
      continue;
895
896
354
    proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
897
354
    proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
898
899
354
    if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
900
354
    {
901
      /*
902
       * Prevent additional wakeups until retryer gets to run. Backends
903
       * that are just waiting for the lock to become free don't retry
904
       * automatically.
905
       */
906
354
      new_release_ok = false;
907
908
      /*
909
       * Don't wakeup (further) exclusive locks.
910
       */
911
354
      wokeup_somebody = true;
912
354
    }
913
914
    /*
915
     * Once we've woken up an exclusive lock, there's no point in waking
916
     * up anybody else.
917
     */
918
354
    if (waiter->lwWaitMode == LW_EXCLUSIVE)
919
352
      break;
920
354
  }
921
922
682
  Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
923
924
  /* unset required flags, and release lock, in one fell swoop */
925
682
  {
926
682
    uint32    old_state;
927
682
    uint32    desired_state;
928
929
682
    old_state = pg_atomic_read_u32(&lock->state);
930
682
    while (true)
931
691
    {
932
691
      desired_state = old_state;
933
934
      /* compute desired flags */
935
936
691
      if (new_release_ok)
937
335
        desired_state |= LW_FLAG_RELEASE_OK;
938
691
      else
939
356
        desired_state &= ~LW_FLAG_RELEASE_OK;
940
941
691
      if (proclist_is_empty(&wakeup))
942
335
        desired_state &= ~LW_FLAG_HAS_WAITERS;
943
944
691
      desired_state &= ~LW_FLAG_LOCKED; /* release lock */
945
946
691
      if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
947
691
                         desired_state))
948
682
        break;
949
691
    }
950
682
  }
951
952
  /* Awaken any waiters I removed from the queue. */
953
682
  proclist_foreach_modify(iter, &wakeup, lwWaitLink)
954
354
  {
955
354
    PGPROC     *waiter = GetPGProcByNumber(iter.cur);
956
957
354
    LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
958
354
    proclist_delete(&wakeup, iter.cur, lwWaitLink);
959
960
    /*
961
     * Guarantee that lwWaiting being unset only becomes visible once the
962
     * unlink from the list has completed. Otherwise the target backend
963
     * could be woken up for other reason and enqueue for a new lock - if
964
     * that happens before the list unlink happens, the list would end up
965
     * being corrupted.
966
     *
967
     * The barrier pairs with the LWLockWaitListLock() when enqueuing for
968
     * another lock.
969
     */
970
354
    pg_write_barrier();
971
354
    waiter->lwWaiting = false;
972
354
    PGSemaphoreUnlock(waiter->sem);
973
354
  }
974
682
}
975
976
/*
977
 * Add ourselves to the end of the queue.
978
 *
979
 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
980
 */
981
static void
982
LWLockQueueSelf(LWLock *lock, LWLockMode mode)
983
524
{
984
  /*
985
   * If we don't have a PGPROC structure, there's no way to wait. This
986
   * should never occur, since MyProc should only be null during shared
987
   * memory initialization.
988
   */
989
524
  if (MyProc == NULL)
990
0
    elog(PANIC, "cannot wait without a PGPROC structure");
991
992
524
  if (MyProc->lwWaiting)
993
0
    elog(PANIC, "queueing for lock while waiting on another one");
994
995
524
  LWLockWaitListLock(lock);
996
997
  /* setting the flag is protected by the spinlock */
998
524
  pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
999
1000
524
  MyProc->lwWaiting = true;
1001
524
  MyProc->lwWaitMode = mode;
1002
1003
  /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1004
524
  if (mode == LW_WAIT_UNTIL_FREE)
1005
0
    proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1006
524
  else
1007
524
    proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1008
1009
  /* Can release the mutex now */
1010
524
  LWLockWaitListUnlock(lock);
1011
1012
#ifdef LOCK_DEBUG
1013
  pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1014
#endif
1015
1016
524
}
1017
1018
/*
1019
 * Remove ourselves from the waitlist.
1020
 *
1021
 * This is used if we queued ourselves because we thought we needed to sleep
1022
 * but, after further checking, we discovered that we don't actually need to
1023
 * do so.
1024
 */
1025
static void
1026
LWLockDequeueSelf(LWLock *lock)
1027
210
{
1028
210
  bool    found = false;
1029
210
  proclist_mutable_iter iter;
1030
1031
#ifdef LWLOCK_STATS
1032
  lwlock_stats *lwstats;
1033
1034
  lwstats = get_lwlock_stats_entry(lock);
1035
1036
  lwstats->dequeue_self_count++;
1037
#endif
1038
1039
210
  LWLockWaitListLock(lock);
1040
1041
  /*
1042
   * Can't just remove ourselves from the list; we need to iterate over
1043
   * all entries as somebody else could have dequeued us.
1044
   */
1045
210
  proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1046
170
  {
1047
170
    if (iter.cur == MyProc->pgprocno)
1048
170
    {
1049
170
      found = true;
1050
170
      proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1051
170
      break;
1052
170
    }
1053
170
  }
1054
1055
210
  if (proclist_is_empty(&lock->waiters) &&
1056
210
    (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1057
210
  {
1058
210
    pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1059
210
  }
1060
1061
  /* XXX: combine with fetch_and above? */
1062
210
  LWLockWaitListUnlock(lock);
1063
1064
  /* clear waiting state again, nice for debugging */
1065
210
  if (found)
1066
170
    MyProc->lwWaiting = false;
1067
210
  else
1068
40
  {
1069
40
    int     extraWaits = 0;
1070
1071
    /*
1072
     * Somebody else dequeued us and has or will wake us up. Deal with the
1073
     * superfluous absorption of a wakeup.
1074
     */
1075
1076
    /*
1077
     * Reset releaseOk if somebody woke us before we removed ourselves -
1078
     * they'll have set it to false.
1079
     */
1080
40
    pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1081
1082
    /*
1083
     * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1084
     * get reset at some inconvenient point later. Most of the time this
1085
     * will immediately return.
1086
     */
1087
40
    for (;;)
1088
40
    {
1089
40
      PGSemaphoreLock(MyProc->sem);
1090
40
      if (!MyProc->lwWaiting)
1091
40
        break;
1092
0
      extraWaits++;
1093
0
    }
1094
1095
    /*
1096
     * Fix the process wait semaphore's count for any absorbed wakeups.
1097
     */
1098
40
    while (extraWaits-- > 0)
1099
0
      PGSemaphoreUnlock(MyProc->sem);
1100
40
  }
1101
1102
#ifdef LOCK_DEBUG
1103
  {
1104
    /* not waiting anymore */
1105
    uint32    nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1106
1107
    Assert(nwaiters < MAX_BACKENDS);
1108
  }
1109
#endif
1110
210
}
1111
1112
/*
1113
 * LWLockAcquire - acquire a lightweight lock in the specified mode
1114
 *
1115
 * If the lock is not available, sleep until it is.  Returns true if the lock
1116
 * was available immediately, false if we had to sleep.
1117
 *
1118
 * Side effect: cancel/die interrupts are held off until lock release.
1119
 */
1120
bool
1121
LWLockAcquire(LWLock *lock, LWLockMode mode)
1122
1.48M
{
1123
1.48M
  PGPROC     *proc = MyProc;
1124
1.48M
  bool    result = true;
1125
1.48M
  int     extraWaits = 0;
1126
#ifdef LWLOCK_STATS
1127
  lwlock_stats *lwstats;
1128
1129
  lwstats = get_lwlock_stats_entry(lock);
1130
#endif
1131
1132
1.48M
  AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1133
1134
1.48M
  PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1135
1136
#ifdef LWLOCK_STATS
1137
  /* Count lock acquisition attempts */
1138
  if (mode == LW_EXCLUSIVE)
1139
    lwstats->ex_acquire_count++;
1140
  else
1141
    lwstats->sh_acquire_count++;
1142
#endif              /* LWLOCK_STATS */
1143
1144
  /*
1145
   * We can't wait if we haven't got a PGPROC.  This should only occur
1146
   * during bootstrap or shared memory initialization.  Put an Assert here
1147
   * to catch unsafe coding practices.
1148
   */
1149
1.48M
  Assert(!(proc == NULL && IsUnderPostmaster));
1150
1151
  /* Ensure we will have room to remember the lock */
1152
1.48M
  if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1153
0
    elog(ERROR, "too many LWLocks taken");
1154
1155
  /*
1156
   * Lock out cancel/die interrupts until we exit the code section protected
1157
   * by the LWLock.  This ensures that interrupts will not interfere with
1158
   * manipulations of data structures in shared memory.
1159
   */
1160
1.48M
  HOLD_INTERRUPTS();
1161
1162
  /*
1163
   * Loop here to try to acquire lock after each time we are signaled by
1164
   * LWLockRelease.
1165
   *
1166
   * NOTE: it might seem better to have LWLockRelease actually grant us the
1167
   * lock, rather than retrying and possibly having to go back to sleep. But
1168
   * in practice that is no good because it means a process swap for every
1169
   * lock acquisition when two or more processes are contending for the same
1170
   * lock.  Since LWLocks are normally used to protect not-very-long
1171
   * sections of computation, a process needs to be able to acquire and
1172
   * release the same lock many times during a single CPU time slice, even
1173
   * in the presence of contention.  The efficiency of being able to do that
1174
   * outweighs the inefficiency of sometimes wasting a process dispatch
1175
   * cycle because the lock is not free when a released waiter finally gets
1176
   * to run.  See pgsql-hackers archives for 29-Dec-01.
1177
   */
1178
1.48M
  for (;;)
1179
1.48M
  {
1180
1.48M
    bool    mustwait;
1181
1182
    /*
1183
     * Try to grab the lock the first time, we're not in the waitqueue
1184
     * yet/anymore.
1185
     */
1186
1.48M
    mustwait = LWLockAttemptLock(lock, mode);
1187
1188
1.48M
    if (!mustwait)
1189
1.48M
    {
1190
1.48M
      LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1191
1.48M
      break;        /* got the lock */
1192
1.48M
    }
1193
1194
    /*
1195
     * Ok, at this point we couldn't grab the lock on the first try. We
1196
     * cannot simply queue ourselves to the end of the list and wait to be
1197
     * woken up because by now the lock could long have been released.
1198
     * Instead add us to the queue and try to grab the lock again. If we
1199
     * succeed we need to revert the queuing and be happy, otherwise we
1200
     * recheck the lock. If we still couldn't grab it, we know that the
1201
     * other locker will see our queue entries when releasing since they
1202
     * existed before we checked for the lock.
1203
     */
1204
1205
    /* add to the queue */
1206
210
    LWLockQueueSelf(lock, mode);
1207
1208
    /* we're now guaranteed to be woken up if necessary */
1209
210
    mustwait = LWLockAttemptLock(lock, mode);
1210
1211
    /* ok, grabbed the lock the second time round, need to undo queueing */
1212
210
    if (!mustwait)
1213
210
    {
1214
210
      LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1215
1216
210
      LWLockDequeueSelf(lock);
1217
210
      break;
1218
210
    }
1219
1220
    /*
1221
     * Wait until awakened.
1222
     *
1223
     * Since we share the process wait semaphore with the regular lock
1224
     * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1225
     * while one of those is pending, it is possible that we get awakened
1226
     * for a reason other than being signaled by LWLockRelease. If so,
1227
     * loop back and wait again.  Once we've gotten the LWLock,
1228
     * re-increment the sema by the number of additional signals received,
1229
     * so that the lock manager or signal manager will see the received
1230
     * signal when it next waits.
1231
     */
1232
0
    LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1233
1234
#ifdef LWLOCK_STATS
1235
    lwstats->block_count++;
1236
#endif
1237
1238
0
    LWLockReportWaitStart(lock);
1239
0
    TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1240
1241
0
    for (;;)
1242
314
    {
1243
314
      PGSemaphoreLock(proc->sem);
1244
314
      if (!proc->lwWaiting)
1245
313
        break;
1246
1
      extraWaits++;
1247
1
    }
1248
1249
    /* Retrying, allow LWLockRelease to release waiters again. */
1250
0
    pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1251
1252
#ifdef LOCK_DEBUG
1253
    {
1254
      /* not waiting anymore */
1255
      uint32    nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1256
1257
      Assert(nwaiters < MAX_BACKENDS);
1258
    }
1259
#endif
1260
1261
0
    TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1262
0
    LWLockReportWaitEnd();
1263
1264
0
    LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1265
1266
    /* Now loop back and try to acquire lock again. */
1267
0
    result = false;
1268
0
  }
1269
1270
1.48M
  TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1271
1272
  /* Add lock to list of locks held by this backend */
1273
1.48M
  held_lwlocks[num_held_lwlocks].lock = lock;
1274
1.48M
  held_lwlocks[num_held_lwlocks++].mode = mode;
1275
1276
  /*
1277
   * Fix the process wait semaphore's count for any absorbed wakeups.
1278
   */
1279
1.48M
  while (extraWaits-- > 0)
1280
0
    PGSemaphoreUnlock(proc->sem);
1281
1282
1.48M
  return result;
1283
1.48M
}
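
The calling pattern the interrupt-holdoff comment above implies is simply acquire, touch shared state, release, keeping the critical section short.  An editorial sketch using the hypothetical MyExtSharedState from the sketch after LWLockInitialize():

/* Editorial sketch: typical guarded access to a shared structure. */
static void
bump_shared_counter(MyExtSharedState *state)
{
	LWLockAcquire(&state->lock, LW_EXCLUSIVE);	/* also holds off cancel/die */
	state->counter++;
	LWLockRelease(&state->lock);				/* resumes interrupts */

	/* Readers would use LW_SHARED and can proceed concurrently with each other. */
}
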
1284
1285
/*
1286
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1287
 *
1288
 * If the lock is not available, return false with no side-effects.
1289
 *
1290
 * If successful, cancel/die interrupts are held off until lock release.
1291
 */
1292
bool
1293
LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1294
222
{
1295
222
  bool    mustwait;
1296
1297
222
  AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1298
1299
222
  PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1300
1301
  /* Ensure we will have room to remember the lock */
1302
222
  if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1303
0
    elog(ERROR, "too many LWLocks taken");
1304
1305
  /*
1306
   * Lock out cancel/die interrupts until we exit the code section protected
1307
   * by the LWLock.  This ensures that interrupts will not interfere with
1308
   * manipulations of data structures in shared memory.
1309
   */
1310
222
  HOLD_INTERRUPTS();
1311
1312
  /* Check for the lock */
1313
222
  mustwait = LWLockAttemptLock(lock, mode);
1314
1315
222
  if (mustwait)
1316
0
  {
1317
    /* Failed to get lock, so release interrupt holdoff */
1318
0
    RESUME_INTERRUPTS();
1319
1320
0
    LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1321
0
    TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1322
0
  }
1323
222
  else
1324
222
  {
1325
    /* Add lock to list of locks held by this backend */
1326
222
    held_lwlocks[num_held_lwlocks].lock = lock;
1327
222
    held_lwlocks[num_held_lwlocks++].mode = mode;
1328
222
    TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1329
222
  }
1330
222
  return !mustwait;
1331
222
}
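
A short editorial sketch of the no-wait pattern this enables (hypothetical names again): do the optional work only if the lock is free right now, otherwise give up without blocking.

/* Editorial sketch: opportunistic work under LWLockConditionalAcquire(). */
static bool
try_bump_shared_counter(MyExtSharedState *state)
{
	if (!LWLockConditionalAcquire(&state->lock, LW_EXCLUSIVE))
		return false;			/* busy; the caller can retry later */

	state->counter++;
	LWLockRelease(&state->lock);
	return true;
}
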
1332
1333
/*
1334
 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1335
 *
1336
 * The semantics of this function are a bit funky.  If the lock is currently
1337
 * free, it is acquired in the given mode, and the function returns true.  If
1338
 * the lock isn't immediately free, the function waits until it is released
1339
 * and returns false, but does not acquire the lock.
1340
 *
1341
 * This is currently used for WALWriteLock: when a backend flushes the WAL,
1342
 * holding WALWriteLock, it can flush the commit records of many other
1343
 * backends as a side-effect.  Those other backends need to wait until the
1344
 * flush finishes, but don't need to acquire the lock anymore.  They can just
1345
 * wake up, observe that their records have already been flushed, and return.
1346
 */
1347
bool
1348
LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1349
984
{
1350
984
  PGPROC     *proc = MyProc;
1351
984
  bool    mustwait;
1352
984
  int     extraWaits = 0;
1353
#ifdef LWLOCK_STATS
1354
  lwlock_stats *lwstats;
1355
1356
  lwstats = get_lwlock_stats_entry(lock);
1357
#endif
1358
1359
984
  Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1360
1361
984
  PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1362
1363
  /* Ensure we will have room to remember the lock */
1364
984
  if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1365
0
    elog(ERROR, "too many LWLocks taken");
1366
1367
  /*
1368
   * Lock out cancel/die interrupts until we exit the code section protected
1369
   * by the LWLock.  This ensures that interrupts will not interfere with
1370
   * manipulations of data structures in shared memory.
1371
   */
1372
984
  HOLD_INTERRUPTS();
1373
1374
  /*
1375
   * NB: We're using nearly the same twice-in-a-row lock acquisition
1376
   * protocol as LWLockAcquire(). Check its comments for details.
1377
   */
1378
984
  mustwait = LWLockAttemptLock(lock, mode);
1379
1380
984
  if (mustwait)
1381
0
  {
1382
0
    LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1383
1384
0
    mustwait = LWLockAttemptLock(lock, mode);
1385
1386
0
    if (mustwait)
1387
0
    {
1388
      /*
1389
       * Wait until awakened.  Like in LWLockAcquire, be prepared for
1390
       * bogus wakeups, because we share the semaphore with
1391
       * ProcWaitForSignal.
1392
       */
1393
0
      LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1394
1395
#ifdef LWLOCK_STATS
1396
      lwstats->block_count++;
1397
#endif
1398
1399
0
      LWLockReportWaitStart(lock);
1400
0
      TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1401
1402
0
      for (;;)
1403
0
      {
1404
0
        PGSemaphoreLock(proc->sem);
1405
0
        if (!proc->lwWaiting)
1406
0
          break;
1407
0
        extraWaits++;
1408
0
      }
1409
1410
#ifdef LOCK_DEBUG
1411
      {
1412
        /* not waiting anymore */
1413
        uint32    nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1414
1415
        Assert(nwaiters < MAX_BACKENDS);
1416
      }
1417
#endif
1418
0
      TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1419
0
      LWLockReportWaitEnd();
1420
1421
0
      LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1422
0
    }
1423
0
    else
1424
0
    {
1425
0
      LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1426
1427
      /*
1428
       * Got lock in the second attempt, undo queueing. We need to treat
1429
       * this as having successfully acquired the lock, otherwise we'd
1430
       * not necessarily wake up people we've prevented from acquiring
1431
       * the lock.
1432
       */
1433
0
      LWLockDequeueSelf(lock);
1434
0
    }
1435
0
  }
1436
1437
  /*
1438
   * Fix the process wait semaphore's count for any absorbed wakeups.
1439
   */
1440
984
  while (extraWaits-- > 0)
1441
0
    PGSemaphoreUnlock(proc->sem);
1442
1443
984
  if (mustwait)
1444
0
  {
1445
    /* Failed to get lock, so release interrupt holdoff */
1446
0
    RESUME_INTERRUPTS();
1447
0
    LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1448
0
    TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1449
0
  }
1450
984
  else
1451
984
  {
1452
984
    LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1453
    /* Add lock to list of locks held by this backend */
1454
984
    held_lwlocks[num_held_lwlocks].lock = lock;
1455
984
    held_lwlocks[num_held_lwlocks++].mode = mode;
1456
984
    TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1457
984
  }
1458
1459
984
  return !mustwait;
1460
984
}
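
The WALWriteLock usage described in the comment above looks roughly like the following.  This is an editorial sketch, heavily simplified from the flush loop in xlog.c; already_flushed_past() and perform_flush() are hypothetical helpers:

/* Editorial sketch of the group-flush idiom LWLockAcquireOrWait() supports. */
static void
flush_up_to(XLogRecPtr target)
{
	for (;;)
	{
		if (already_flushed_past(target))
			return;				/* somebody else's flush already covered us */

		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
		{
			/* Got the lock: flush our record and whatever queued up behind it. */
			perform_flush(target);
			LWLockRelease(WALWriteLock);
			return;
		}

		/* Lock was busy; we only slept until it was released, so re-check. */
	}
}
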
1461
1462
/*
1463
 * Does the lwlock in its current state need to wait for the variable value to
1464
 * change?
1465
 *
1466
 * If we don't need to wait, and it's because the value of the variable has
1467
 * changed, store the current value in newval.
1468
 *
1469
 * *result is set to true if the lock was free, and false otherwise.
1470
 */
1471
static bool
1472
LWLockConflictsWithVar(LWLock *lock,
1473
             uint64 *valptr, uint64 oldval, uint64 *newval,
1474
             bool *result)
1475
7.87k
{
1476
7.87k
  bool    mustwait;
1477
7.87k
  uint64    value;
1478
1479
  /*
1480
   * Test first to see if the slot is free right now.
1481
   *
1482
   * XXX: the caller uses a spinlock before this, so we don't need a memory
1483
   * barrier here as far as the current usage is concerned.  But that might
1484
   * not be safe in general.
1485
   */
1486
7.87k
  mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1487
1488
7.87k
  if (!mustwait)
1489
7.87k
  {
1490
7.87k
    *result = true;
1491
7.87k
    return false;
1492
7.87k
  }
1493
1494
0
  *result = false;
1495
1496
  /*
1497
   * Read value using the lwlock's wait list lock, as we can't generally
1498
   * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
1499
   * do atomic 64 bit reads/writes the spinlock should be optimized away.
1500
   */
1501
0
  LWLockWaitListLock(lock);
1502
0
  value = *valptr;
1503
0
  LWLockWaitListUnlock(lock);
1504
1505
0
  if (value != oldval)
1506
0
  {
1507
0
    mustwait = false;
1508
0
    *newval = value;
1509
0
  }
1510
0
  else
1511
0
  {
1512
0
    mustwait = true;
1513
0
  }
1514
1515
0
  return mustwait;
1516
0
}
1517
1518
/*
1519
 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1520
 *
1521
 * If the lock is held and *valptr equals oldval, waits until the lock is
1522
 * either freed, or the lock holder updates *valptr by calling
1523
 * LWLockUpdateVar.  If the lock is free on exit (immediately or after
1524
 * waiting), returns true.  If the lock is still held, but *valptr no longer
1525
 * matches oldval, returns false and sets *newval to the current value in
1526
 * *valptr.
1527
 *
1528
 * Note: this function ignores shared lock holders; if the lock is held
1529
 * in shared mode, returns 'true'.
1530
 */
1531
bool
1532
LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1533
7.87k
{
1534
7.87k
  PGPROC     *proc = MyProc;
1535
7.87k
  int     extraWaits = 0;
1536
7.87k
  bool    result = false;
1537
#ifdef LWLOCK_STATS
1538
  lwlock_stats *lwstats;
1539
1540
  lwstats = get_lwlock_stats_entry(lock);
1541
#endif
1542
1543
7.87k
  PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1544
1545
  /*
1546
   * Lock out cancel/die interrupts while we sleep on the lock.  There is no
1547
   * cleanup mechanism to remove us from the wait queue if we got
1548
   * interrupted.
1549
   */
1550
7.87k
  HOLD_INTERRUPTS();
1551
1552
  /*
1553
   * Loop here to check the lock's status after each time we are signaled.
1554
   */
1555
7.87k
  for (;;)
1556
7.87k
  {
1557
7.87k
    bool    mustwait;
1558
1559
7.87k
    mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1560
7.87k
                      &result);
1561
1562
7.87k
    if (!mustwait)
1563
7.87k
      break;       /* the lock was free or value didn't match */
1564
1565
    /*
1566
     * Add myself to wait queue. Note that this is racy, somebody else
1567
     * could wakeup before we're finished queuing. NB: We're using nearly
1568
     * the same twice-in-a-row lock acquisition protocol as
1569
     * LWLockAcquire(). Check its comments for details. The only
1570
     * difference is that we also have to check the variable's values when
1571
     * checking the state of the lock.
1572
     */
1573
0
    LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1574
1575
    /*
1576
     * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1577
     * lock is released.
1578
     */
1579
0
    pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1580
1581
    /*
1582
     * We're now guaranteed to be woken up if necessary. Recheck the lock
1583
     * and variables state.
1584
     */
1585
0
    mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1586
0
                      &result);
1587
1588
    /* Ok, no conflict after we queued ourselves. Undo queueing. */
1589
0
    if (!mustwait)
1590
0
    {
1591
0
      LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1592
1593
0
      LWLockDequeueSelf(lock);
1594
0
      break;
1595
0
    }
1596
1597
    /*
1598
     * Wait until awakened.
1599
     *
1600
     * Since we share the process wait semaphore with the regular lock
1601
     * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1602
     * while one of those is pending, it is possible that we get awakened
1603
     * for a reason other than being signaled by LWLockRelease. If so,
1604
     * loop back and wait again.  Once we've gotten the LWLock,
1605
     * re-increment the sema by the number of additional signals received,
1606
     * so that the lock manager or signal manager will see the received
1607
     * signal when it next waits.
1608
     */
1609
0
    LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1610
1611
#ifdef LWLOCK_STATS
1612
    lwstats->block_count++;
1613
#endif
1614
1615
0
    LWLockReportWaitStart(lock);
1616
0
    TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1617
1618
0
    for (;;)
1619
0
    {
1620
0
      PGSemaphoreLock(proc->sem);
1621
0
      if (!proc->lwWaiting)
1622
0
        break;
1623
0
      extraWaits++;
1624
0
    }
1625
1626
#ifdef LOCK_DEBUG
1627
    {
1628
      /* not waiting anymore */
1629
      uint32    nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1630
1631
      Assert(nwaiters < MAX_BACKENDS);
1632
    }
1633
#endif
1634
1635
0
    TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1636
0
    LWLockReportWaitEnd();
1637
1638
0
    LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1639
1640
    /* Now loop back and check the status of the lock again. */
1641
0
  }
1642
1643
7.87k
  TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);
1644
1645
  /*
1646
   * Fix the process wait semaphore's count for any absorbed wakeups.
1647
   */
1648
7.87k
  while (extraWaits-- > 0)
1649
0
    PGSemaphoreUnlock(proc->sem);
1650
1651
  /*
1652
   * Now okay to allow cancel/die interrupts.
1653
   */
1654
7.87k
  RESUME_INTERRUPTS();
1655
1656
7.87k
  return result;
1657
7.87k
}
1658
1659
1660
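
As a rough illustration of how a reader might drive LWLockWaitForVar(), the sketch below polls a hypothetical progress variable until either the writer releases the lock or the published value reaches a target. The lock, variable, and function names are illustrative assumptions, not part of lwlock.c.

#include "postgres.h"
#include "storage/lwlock.h"

/*
 * Illustrative sketch only: wait until the writer holding 'lock' either
 * releases it or advances the value behind 'progressptr' to at least
 * 'target'.  Returns true if the lock was observed free.
 */
static bool
wait_for_progress(LWLock *lock, uint64 *progressptr, uint64 target)
{
  uint64    seen = 0;

  for (;;)
  {
    uint64    newval;

    /* true means the lock is (or became) free */
    if (LWLockWaitForVar(lock, progressptr, seen, &newval))
      return true;

    /* lock still held, but the writer published a newer value */
    seen = newval;
    if (seen >= target)
      return false;
  }
}
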
/*
1661
 * LWLockUpdateVar - Update a variable and wake up waiters atomically
1662
 *
1663
 * Sets *valptr to 'val', and wakes up all processes waiting for us with
1664
 * LWLockWaitForVar().  Setting the value and waking up the processes happen
1665
 * atomically so that any process calling LWLockWaitForVar() on the same lock
1666
 * is guaranteed to see the new value, and act accordingly.
1667
 *
1668
 * The caller must be holding the lock in exclusive mode.
1669
 */
1670
void
1671
LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1672
6.90k
{
1673
6.90k
  proclist_head wakeup;
1674
6.90k
  proclist_mutable_iter iter;
1675
1676
6.90k
  PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1677
1678
6.90k
  proclist_init(&wakeup);
1679
1680
6.90k
  LWLockWaitListLock(lock);
1681
1682
6.90k
  Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1683
1684
  /* Update the lock's value */
1685
6.90k
  *valptr = val;
1686
1687
  /*
1688
   * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1689
   * up. They are always in the front of the queue.
1690
   */
1691
6.90k
  proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1692
0
  {
1693
0
    PGPROC     *waiter = GetPGProcByNumber(iter.cur);
1694
1695
0
    if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1696
0
      break;
1697
1698
0
    proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1699
0
    proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1700
0
  }
1701
1702
  /* We are done updating shared state of the lock itself. */
1703
6.90k
  LWLockWaitListUnlock(lock);
1704
1705
  /*
1706
   * Awaken any waiters I removed from the queue.
1707
   */
1708
6.90k
  proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1709
0
  {
1710
0
    PGPROC     *waiter = GetPGProcByNumber(iter.cur);
1711
1712
0
    proclist_delete(&wakeup, iter.cur, lwWaitLink);
1713
    /* check comment in LWLockWakeup() about this barrier */
1714
0
    pg_write_barrier();
1715
0
    waiter->lwWaiting = false;
1716
0
    PGSemaphoreUnlock(waiter->sem);
1717
0
  }
1718
6.90k
}
1719
1720
1721
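
On the writer side, a holder of the lock in exclusive mode might publish intermediate progress with LWLockUpdateVar() roughly as sketched below; the work loop and names are assumptions for illustration only.

#include "postgres.h"
#include "storage/lwlock.h"

/*
 * Illustrative sketch only: advance a shared progress counter under an
 * exclusive LWLock, waking LWLockWaitForVar() waiters at each step.
 */
static void
advance_progress(LWLock *lock, uint64 *progressptr, uint64 upto)
{
  uint64    pos;

  LWLockAcquire(lock, LW_EXCLUSIVE);

  for (pos = *progressptr; pos < upto; pos++)
  {
    /* ... perform one unit of work here ... */

    /* publish the new position and wake any waiters atomically */
    LWLockUpdateVar(lock, progressptr, pos + 1);
  }

  LWLockRelease(lock);
}
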
/*
1722
 * LWLockRelease - release a previously acquired lock
1723
 */
1724
void
1725
LWLockRelease(LWLock *lock)
1726
1.48M
{
1727
1.48M
  LWLockMode  mode;
1728
1.48M
  uint32    oldstate;
1729
1.48M
  bool    check_waiters;
1730
1.48M
  int     i;
1731
1732
  /*
1733
   * Remove lock from list of locks held.  Usually, but not always, it will
1734
   * be the latest-acquired lock; so search array backwards.
1735
   */
1736
1.53M
  for (i = num_held_lwlocks; --i >= 0;)
1737
1.53M
    if (lock == held_lwlocks[i].lock)
1738
1.48M
      break;
1739
1740
1.48M
  if (i < 0)
1741
0
    elog(ERROR, "lock %s is not held", T_NAME(lock));
1742
1743
1.48M
  mode = held_lwlocks[i].mode;
1744
1745
1.48M
  num_held_lwlocks--;
1746
1.53M
  for (; i < num_held_lwlocks; i++)
1747
51.3k
    held_lwlocks[i] = held_lwlocks[i + 1];
1748
1749
1.48M
  PRINT_LWDEBUG("LWLockRelease", lock, mode);
1750
1751
  /*
1752
   * Release my hold on the lock; after that it can immediately be acquired
1753
   * by others, even if we still have to wake up other waiters.
1754
   */
1755
1.48M
  if (mode == LW_EXCLUSIVE)
1756
637k
    oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1757
847k
  else
1758
847k
    oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1759
1760
  /* nobody else can have that kind of lock */
1761
1.48M
  Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1762
1763
1764
  /*
1765
   * We're still waiting for backends to get scheduled; don't wake them up
1766
   * again.
1767
   */
1768
1.48M
  if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1769
1.48M
    (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1770
683
    (oldstate & LW_LOCK_MASK) == 0)
1771
682
    check_waiters = true;
1772
1.48M
  else
1773
1.48M
    check_waiters = false;
1774
1775
  /*
1776
   * As waking up waiters requires the spinlock to be acquired, only do so
1777
   * if necessary.
1778
   */
1779
1.48M
  if (check_waiters)
1780
682
  {
1781
    /* XXX: remove before commit? */
1782
682
    LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1783
682
    LWLockWakeup(lock);
1784
682
  }
1785
1786
1.48M
  TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1787
1788
  /*
1789
   * Now okay to allow cancel/die interrupts.
1790
   */
1791
1.48M
  RESUME_INTERRUPTS();
1792
1.48M
}
1793
1794
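
For context, the usual call pattern pairs each acquisition with exactly one LWLockRelease(). The sketch below shows a non-blocking variant using LWLockConditionalAcquire(); the counter and function names are invented for illustration.

#include "postgres.h"
#include "storage/lwlock.h"

/*
 * Illustrative sketch only: bump a shared counter if the lock can be taken
 * without waiting; otherwise give up and report failure.
 */
static bool
try_bump_counter(LWLock *lock, uint64 *counterptr)
{
  if (!LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
    return false;     /* contended; caller can retry later */

  (*counterptr)++;
  LWLockRelease(lock);

  return true;
}
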
/*
1795
 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1796
 */
1797
void
1798
LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1799
9.05k
{
1800
9.05k
  LWLockWaitListLock(lock);
1801
1802
  /*
1803
   * Set the variable's value before releasing the lock; that prevents a
1804
   * race condition wherein a new locker acquires the lock but hasn't yet
1805
   * set the variable's value.
1806
   */
1807
9.05k
  *valptr = val;
1808
9.05k
  LWLockWaitListUnlock(lock);
1809
1810
9.05k
  LWLockRelease(lock);
1811
9.05k
}
1812
1813
1814
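
A writer that used LWLockUpdateVar() while holding the lock would typically finish as sketched here, resetting the variable and releasing in one step so a waiter cannot observe the lock free alongside a stale value. Treating 0 as the idle value is an assumption for this sketch.

#include "postgres.h"
#include "storage/lwlock.h"

/*
 * Illustrative sketch only: finish a progress run started under the lock,
 * clearing the published value back to an assumed idle value of 0.
 */
static void
finish_progress(LWLock *lock, uint64 *progressptr)
{
  LWLockReleaseClearVar(lock, progressptr, 0);
}
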
/*
1815
 * LWLockReleaseAll - release all currently-held locks
1816
 *
1817
 * Used to clean up after ereport(ERROR). An important difference between this
1818
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1819
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
1820
 * has been set to an appropriate level earlier in error recovery. We could
1821
 * decrement it below zero if we allowed it to drop for each released lock!
1822
 */
1823
void
1824
LWLockReleaseAll(void)
1825
30.6k
{
1826
30.6k
  while (num_held_lwlocks > 0)
1827
36
  {
1828
36
    HOLD_INTERRUPTS();   /* match the upcoming RESUME_INTERRUPTS */
1829
1830
36
    LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1831
36
  }
1832
30.6k
}
1833
1834
1835
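
As a sketch of where LWLockReleaseAll() fits, the fragment below mirrors the error-recovery pattern used by long-running backend loops: any LWLocks still held when an error propagates are dropped wholesale before re-throwing. The do_work_that_may_error() callee is hypothetical.

#include "postgres.h"
#include "storage/lwlock.h"

/* hypothetical worker step that may ereport(ERROR) while holding LWLocks */
extern void do_work_that_may_error(LWLock *lock);

static void
run_step_with_cleanup(LWLock *lock)
{
  PG_TRY();
  {
    LWLockAcquire(lock, LW_EXCLUSIVE);
    do_work_that_may_error(lock);
    LWLockRelease(lock);
  }
  PG_CATCH();
  {
    /* drop whatever LWLocks are still held before re-throwing */
    LWLockReleaseAll();
    PG_RE_THROW();
  }
  PG_END_TRY();
}
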
/*
1836
 * LWLockHeldByMe - test whether my process holds a lock in any mode
1837
 *
1838
 * This is meant as debug support only.
1839
 */
1840
bool
1841
LWLockHeldByMe(LWLock *l)
1842
54.9k
{
1843
54.9k
  int     i;
1844
1845
54.9k
  for (i = 0; i < num_held_lwlocks; i++)
1846
54.9k
  {
1847
54.9k
    if (held_lwlocks[i].lock == l)
1848
54.9k
      return true;
1849
54.9k
  }
1850
2
  return false;
1851
54.9k
}
1852
1853
/*
1854
 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1855
 *
1856
 * This is meant as debug support only.
1857
 */
1858
bool
1859
LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
1860
112
{
1861
112
  int     i;
1862
1863
112
  for (i = 0; i < num_held_lwlocks; i++)
1864
112
  {
1865
112
    if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
1866
112
      return true;
1867
112
  }
1868
0
  return false;
1869
112
}
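
Both helpers are intended for assertions rather than control flow; a typical use is a debug-build sanity check along the lines of the hypothetical sketch below.

#include "postgres.h"
#include "storage/lwlock.h"

/*
 * Illustrative sketch only: a helper that requires its caller to already
 * hold 'lock' exclusively before touching the protected field.
 */
static void
set_protected_field(LWLock *lock, uint64 *fieldptr, uint64 val)
{
  Assert(LWLockHeldByMeInMode(lock, LW_EXCLUSIVE));

  *fieldptr = val;
}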