Coverage Report

Created: 2025-09-27 06:52

/src/postgres/src/backend/storage/ipc/barrier.c
Line|Count|Source
   1|     |/*-------------------------------------------------------------------------
   2|     | *
   3|     | * barrier.c
   4|     | *    Barriers for synchronizing cooperating processes.
   5|     | *
   6|     | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
   7|     | * Portions Copyright (c) 1994, Regents of the University of California
   8|     | *
   9|     | * From Wikipedia[1]: "In parallel computing, a barrier is a type of
  10|     | * synchronization method.  A barrier for a group of threads or processes in
  11|     | * the source code means any thread/process must stop at this point and cannot
  12|     | * proceed until all other threads/processes reach this barrier."
  13|     | *
  14|     | * This implementation of barriers allows for static sets of participants
  15|     | * known up front, or dynamic sets of participants which processes can join or
  16|     | * leave at any time.  In the dynamic case, a phase number can be used to
  17|     | * track progress through a parallel algorithm, and may be necessary to
  18|     | * synchronize with the current phase of a multi-phase algorithm when a new
  19|     | * participant joins.  In the static case, the phase number is used
  20|     | * internally, but it isn't strictly necessary for client code to access it
  21|     | * because the phase can only advance when the declared number of participants
  22|     | * reaches the barrier, so client code should be in no doubt about the current
  23|     | * phase of computation at all times.
  24|     | *
  25|     | * Consider a parallel algorithm that involves separate phases of computation
  26|     | * A, B and C where the output of each phase is needed before the next phase
  27|     | * can begin.
  28|     | *
  29|     | * In the case of a static barrier initialized with 4 participants, each
  30|     | * participant works on phase A, then calls BarrierArriveAndWait to wait until
  31|     | * all 4 participants have reached that point.  When BarrierArriveAndWait
  32|     | * returns control, each participant can work on B, and so on.  Because the
  33|     | * barrier knows how many participants to expect, the phases of computation
  34|     | * don't need labels or numbers, since each process's program counter implies
  35|     | * the current phase.  Even if some of the processes are slow to start up and
  36|     | * begin running phase A, the other participants are expecting them and will
  37|     | * patiently wait at the barrier.  The code could be written as follows:
  38|     | *
  39|     | *     perform_a();
  40|     | *     BarrierArriveAndWait(&barrier, ...);
  41|     | *     perform_b();
  42|     | *     BarrierArriveAndWait(&barrier, ...);
  43|     | *     perform_c();
  44|     | *     BarrierArriveAndWait(&barrier, ...);
  45|     | *
  46|     | * If the number of participants is not known up front, then a dynamic barrier
  47|     | * is needed and the number should be set to zero at initialization.  New
  48|     | * complications arise because the number necessarily changes over time as
  49|     | * participants attach and detach, and therefore phases B, C or even the end
  50|     | * of processing may be reached before any given participant has started
  51|     | * running and attached.  Therefore the client code must perform an initial
  52|     | * test of the phase number after attaching, because it needs to find out
  53|     | * which phase of the algorithm has been reached by any participants that are
  54|     | * already attached in order to synchronize with that work.  Once the program
  55|     | * counter or some other representation of current progress is synchronized
  56|     | * with the barrier's phase, normal control flow can be used just as in the
  57|     | * static case.  Our example could be written using a switch statement with
  58|     | * cases that fall-through, as follows:
  59|     | *
  60|     | *     phase = BarrierAttach(&barrier);
  61|     | *     switch (phase)
  62|     | *     {
  63|     | *     case PHASE_A:
  64|     | *         perform_a();
  65|     | *         BarrierArriveAndWait(&barrier, ...);
  66|     | *     case PHASE_B:
  67|     | *         perform_b();
  68|     | *         BarrierArriveAndWait(&barrier, ...);
  69|     | *     case PHASE_C:
  70|     | *         perform_c();
  71|     | *         BarrierArriveAndWait(&barrier, ...);
  72|     | *     }
  73|     | *     BarrierDetach(&barrier);
  74|     | *
  75|     | * Static barriers behave similarly to POSIX's pthread_barrier_t.  Dynamic
  76|     | * barriers behave similarly to Java's java.util.concurrent.Phaser.
  77|     | *
  78|     | * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
  79|     | *
  80|     | * IDENTIFICATION
  81|     | *    src/backend/storage/ipc/barrier.c
  82|     | *
  83|     | *-------------------------------------------------------------------------
  84|     | */
  85|     |
  86|     |#include "postgres.h"
  87|     |#include "storage/barrier.h"
  88|     |
  89|     |static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);
  90|     |
  91|     |/*
  92|     | * Initialize this barrier.  To use a static party size, provide the number of
  93|     | * participants to wait for at each phase indicating that that number of
  94|     | * backends is implicitly attached.  To use a dynamic party size, specify zero
  95|     | * here and then use BarrierAttach() and
  96|     | * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
  97|     | * participants explicitly.
  98|     | */
  99|     |void
 100|     |BarrierInit(Barrier *barrier, int participants)
 101|    0|{
 102|    0|  SpinLockInit(&barrier->mutex);
 103|    0|  barrier->participants = participants;
 104|    0|  barrier->arrived = 0;
 105|    0|  barrier->phase = 0;
 106|    0|  barrier->elected = 0;
 107|    0|  barrier->static_party = participants > 0;
 108|    0|  ConditionVariableInit(&barrier->condition_variable);
 109|    0|}
 110|     |
 111|     |/*
 112|     | * Arrive at this barrier, wait for all other attached participants to arrive
 113|     | * too and then return.  Increments the current phase.  The caller must be
 114|     | * attached.
 115|     | *
 116|     | * While waiting, pg_stat_activity shows a wait_event_type and wait_event
 117|     | * controlled by the wait_event_info passed in, which should be a value from
 118|     | * one of the WaitEventXXX enums defined in pgstat.h.
 119|     | *
 120|     | * Return true in one arbitrarily chosen participant.  Return false in all
 121|     | * others.  The return code can be used to elect one participant to execute a
 122|     | * phase of work that must be done serially while other participants wait.
 123|     | */
 124|     |bool
 125|     |BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
 126|    0|{
 127|    0|  bool    release = false;
 128|    0|  bool    elected;
 129|    0|  int     start_phase;
 130|    0|  int     next_phase;
 131|     |
 132|    0|  SpinLockAcquire(&barrier->mutex);
 133|    0|  start_phase = barrier->phase;
 134|    0|  next_phase = start_phase + 1;
 135|    0|  ++barrier->arrived;
 136|    0|  if (barrier->arrived == barrier->participants)
 137|    0|  {
 138|    0|    release = true;
 139|    0|    barrier->arrived = 0;
 140|    0|    barrier->phase = next_phase;
 141|    0|    barrier->elected = next_phase;
 142|    0|  }
 143|    0|  SpinLockRelease(&barrier->mutex);
 144|     |
 145|     |  /*
 146|     |   * If we were the last expected participant to arrive, we can release our
 147|     |   * peers and return true to indicate that this backend has been elected to
 148|     |   * perform any serial work.
 149|     |   */
 150|    0|  if (release)
 151|    0|  {
 152|    0|    ConditionVariableBroadcast(&barrier->condition_variable);
 153|     |
 154|    0|    return true;
 155|    0|  }
 156|     |
 157|     |  /*
 158|     |   * Otherwise we have to wait for the last participant to arrive and
 159|     |   * advance the phase.
 160|     |   */
 161|    0|  elected = false;
 162|    0|  ConditionVariablePrepareToSleep(&barrier->condition_variable);
 163|    0|  for (;;)
 164|    0|  {
 165|     |    /*
 166|     |     * We know that phase must either be start_phase, indicating that we
 167|     |     * need to keep waiting, or next_phase, indicating that the last
 168|     |     * participant that we were waiting for has either arrived or detached
 169|     |     * so that the next phase has begun.  The phase cannot advance any
 170|     |     * further than that without this backend's participation, because
 171|     |     * this backend is attached.
 172|     |     */
 173|    0|    SpinLockAcquire(&barrier->mutex);
 174|    0|    Assert(barrier->phase == start_phase || barrier->phase == next_phase);
 175|    0|    release = barrier->phase == next_phase;
 176|    0|    if (release && barrier->elected != next_phase)
 177|    0|    {
 178|     |      /*
 179|     |       * Usually the backend that arrives last and releases the other
 180|     |       * backends is elected to return true (see above), so that it can
 181|     |       * begin processing serial work while it has a CPU timeslice.
 182|     |       * However, if the barrier advanced because someone detached, then
 183|     |       * one of the backends that is awoken will need to be elected.
 184|     |       */
 185|    0|      barrier->elected = barrier->phase;
 186|    0|      elected = true;
 187|    0|    }
 188|    0|    SpinLockRelease(&barrier->mutex);
 189|    0|    if (release)
 190|    0|      break;
 191|    0|    ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
 192|    0|  }
 193|    0|  ConditionVariableCancelSleep();
 194|     |
 195|    0|  return elected;
 196|    0|}
 197|     |
 198|     |/*
 199|     | * Arrive at this barrier, but detach rather than waiting.  Returns true if
 200|     | * the caller was the last to detach.
 201|     | */
 202|     |bool
 203|     |BarrierArriveAndDetach(Barrier *barrier)
 204|    0|{
 205|    0|  return BarrierDetachImpl(barrier, true);
 206|    0|}
 207|     |
 208|     |/*
 209|     | * Arrive at a barrier, and detach all but the last to arrive.  Returns true if
 210|     | * the caller was the last to arrive, and is therefore still attached.
 211|     | */
 212|     |bool
 213|     |BarrierArriveAndDetachExceptLast(Barrier *barrier)
 214|    0|{
 215|    0|  SpinLockAcquire(&barrier->mutex);
 216|    0|  if (barrier->participants > 1)
 217|    0|  {
 218|    0|    --barrier->participants;
 219|    0|    SpinLockRelease(&barrier->mutex);
 220|     |
 221|    0|    return false;
 222|    0|  }
 223|    0|  Assert(barrier->participants == 1);
 224|    0|  ++barrier->phase;
 225|    0|  SpinLockRelease(&barrier->mutex);
 226|     |
 227|    0|  return true;
 228|    0|}
 229|     |
 230|     |/*
 231|     | * Attach to a barrier.  All waiting participants will now wait for this
 232|     | * participant to call BarrierArriveAndWait(), BarrierDetach() or
 233|     | * BarrierArriveAndDetach().  Return the current phase.
 234|     | */
 235|     |int
 236|     |BarrierAttach(Barrier *barrier)
 237|    0|{
 238|    0|  int     phase;
 239|     |
 240|    0|  Assert(!barrier->static_party);
 241|     |
 242|    0|  SpinLockAcquire(&barrier->mutex);
 243|    0|  ++barrier->participants;
 244|    0|  phase = barrier->phase;
 245|    0|  SpinLockRelease(&barrier->mutex);
 246|     |
 247|    0|  return phase;
 248|    0|}
 249|     |
 250|     |/*
 251|     | * Detach from a barrier.  This may release other waiters from
 252|     | * BarrierArriveAndWait() and advance the phase if they were only waiting for
 253|     | * this backend.  Return true if this participant was the last to detach.
 254|     | */
 255|     |bool
 256|     |BarrierDetach(Barrier *barrier)
 257|    0|{
 258|    0|  return BarrierDetachImpl(barrier, false);
 259|    0|}
 260|     |
 261|     |/*
 262|     | * Return the current phase of a barrier.  The caller must be attached.
 263|     | */
 264|     |int
 265|     |BarrierPhase(Barrier *barrier)
 266|    0|{
 267|     |  /*
 268|     |   * It is OK to read barrier->phase without locking, because it can't
 269|     |   * change without us (we are attached to it), and we executed a memory
 270|     |   * barrier when we either attached or participated in changing it last
 271|     |   * time.
 272|     |   */
 273|    0|  return barrier->phase;
 274|    0|}
 275|     |
 276|     |/*
 277|     | * Return an instantaneous snapshot of the number of participants currently
 278|     | * attached to this barrier.  For debugging purposes only.
 279|     | */
 280|     |int
 281|     |BarrierParticipants(Barrier *barrier)
 282|    0|{
 283|    0|  int     participants;
 284|     |
 285|    0|  SpinLockAcquire(&barrier->mutex);
 286|    0|  participants = barrier->participants;
 287|    0|  SpinLockRelease(&barrier->mutex);
 288|     |
 289|    0|  return participants;
 290|    0|}
 291|     |
 292|     |/*
 293|     | * Detach from a barrier.  If 'arrive' is true then also increment the phase
 294|     | * if there are no other participants.  If there are other participants
 295|     | * waiting, then the phase will be advanced and they'll be released if they
 296|     | * were only waiting for the caller.  Return true if this participant was the
 297|     | * last to detach.
 298|     | */
 299|     |static inline bool
 300|     |BarrierDetachImpl(Barrier *barrier, bool arrive)
 301|    0|{
 302|    0|  bool    release;
 303|    0|  bool    last;
 304|     |
 305|    0|  Assert(!barrier->static_party);
 306|     |
 307|    0|  SpinLockAcquire(&barrier->mutex);
 308|    0|  Assert(barrier->participants > 0);
 309|    0|  --barrier->participants;
 310|     |
 311|     |  /*
 312|     |   * If any other participants are waiting and we were the last participant
 313|     |   * waited for, release them.  If no other participants are waiting, but
 314|     |   * this is a BarrierArriveAndDetach() call, then advance the phase too.
 315|     |   */
 316|    0|  if ((arrive || barrier->participants > 0) &&
 317|    0|    barrier->arrived == barrier->participants)
 318|    0|  {
 319|    0|    release = true;
 320|    0|    barrier->arrived = 0;
 321|    0|    ++barrier->phase;
 322|    0|  }
 323|    0|  else
 324|    0|    release = false;
 325|     |
 326|    0|  last = barrier->participants == 0;
 327|    0|  SpinLockRelease(&barrier->mutex);
 328|     |
 329|    0|  if (release)
 330|    0|    ConditionVariableBroadcast(&barrier->condition_variable);
 331|     |
 332|    0|  return last;
 333|    0|}
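The listing above reports a count of 0 on every instrumented line of barrier.c, so none of the barrier protocol was exercised in this run. To reason about what those lines do, here is a minimal userspace sketch of the same state machine, assuming POSIX threads: a `pthread_mutex_t` stands in for the spinlock and a `pthread_cond_t` for the `ConditionVariable`. Every `model_*`, `ModelBarrier`, `Worker` and `worker_main` name is hypothetical and not part of PostgreSQL. The sketch simplifies in several ways: it waits with `pthread_cond_wait` while holding the mutex instead of the PrepareToSleep/Sleep/CancelSleep sequence, reports no `wait_event_info`, and only models the non-arriving detach path of `BarrierDetachImpl()`.

```c
/* Hypothetical userspace model of the barrier.c protocol; build with -pthread. */
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

typedef struct ModelBarrier
{
    pthread_mutex_t mutex;      /* stands in for the barrier's spinlock */
    pthread_cond_t  cv;         /* stands in for its ConditionVariable */
    int             participants, arrived, phase, elected;
    bool            static_party;
} ModelBarrier;

static void
model_init(ModelBarrier *b, int participants)
{
    pthread_mutex_init(&b->mutex, NULL);
    pthread_cond_init(&b->cv, NULL);
    b->participants = participants;     /* 0 selects a dynamic party */
    b->arrived = b->phase = b->elected = 0;
    b->static_party = participants > 0;
}

static int
model_attach(ModelBarrier *b)
{
    int         phase;

    assert(!b->static_party);
    pthread_mutex_lock(&b->mutex);
    b->participants++;
    phase = b->phase;
    pthread_mutex_unlock(&b->mutex);
    return phase;
}

static bool
model_arrive_and_wait(ModelBarrier *b)
{
    bool        elected = false;

    pthread_mutex_lock(&b->mutex);
    int         start_phase = b->phase;
    int         next_phase = start_phase + 1;
    if (++b->arrived == b->participants)
    {
        /* Last expected arrival: advance, elect ourselves, wake the rest. */
        b->arrived = 0;
        b->phase = b->elected = next_phase;
        pthread_cond_broadcast(&b->cv);
        pthread_mutex_unlock(&b->mutex);
        return true;
    }
    /* Sleep until the phase moves, via a final arrival or a detach. */
    while (b->phase == start_phase)
        pthread_cond_wait(&b->cv, &b->mutex);
    if (b->elected != next_phase)
    {
        /* A detach advanced the phase: the first waiter to notice is elected. */
        b->elected = b->phase;
        elected = true;
    }
    pthread_mutex_unlock(&b->mutex);
    return elected;
}

/* Counterpart of BarrierDetach(): detach without arriving; true if last. */
static bool
model_detach(ModelBarrier *b)
{
    bool        last;

    assert(!b->static_party);
    pthread_mutex_lock(&b->mutex);
    assert(b->participants > 0);
    if (--b->participants > 0 && b->arrived == b->participants)
    {
        /* Everyone still attached was only waiting for us: release them. */
        b->arrived = 0;
        b->phase++;
        pthread_cond_broadcast(&b->cv);
    }
    last = b->participants == 0;
    pthread_mutex_unlock(&b->mutex);
    return last;
}

/* Hypothetical worker: attaches if dynamic, runs nphases, counts elections. */
typedef struct Worker { ModelBarrier *b; int nphases, elected; } Worker;

static void *
worker_main(void *arg)
{
    Worker     *w = arg;

    if (!w->b->static_party)
        model_attach(w->b);
    for (int i = 0; i < w->nphases; i++)
        w->elected += model_arrive_and_wait(w->b);
    if (!w->b->static_party)
        model_detach(w->b);
    return NULL;
}
```

Under these assumptions, a static party of four workers running three phases should see exactly three elections, one per phase and always the last arrival, and finish at phase 3. In the dynamic case, if one thread attaches, starts a worker and then detaches without arriving, the worker is elected whatever order the steps run in: either it arrives last itself, or the detach advances the phase and the worker, finding `elected` still behind the new phase, elects itself. That is the situation the comment at lines 178-184 describes.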