Coverage Report

Created: 2025-07-11 06:21

/src/libevent/bufferevent_pair.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
3
 *
4
 * Redistribution and use in source and binary forms, with or without
5
 * modification, are permitted provided that the following conditions
6
 * are met:
7
 * 1. Redistributions of source code must retain the above copyright
8
 *    notice, this list of conditions and the following disclaimer.
9
 * 2. Redistributions in binary form must reproduce the above copyright
10
 *    notice, this list of conditions and the following disclaimer in the
11
 *    documentation and/or other materials provided with the distribution.
12
 * 3. The name of the author may not be used to endorse or promote products
13
 *    derived from this software without specific prior written permission.
14
 *
15
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
16
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
17
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
18
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
19
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
20
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
21
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
22
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
23
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
24
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
25
 */
26
#include "event2/event-config.h"
27
#include "evconfig-private.h"
28
29
#include <sys/types.h>
30
31
#ifdef _WIN32
32
#include <winsock2.h>
33
#endif
34
35
#include "event2/util.h"
36
#include "event2/buffer.h"
37
#include "event2/bufferevent.h"
38
#include "event2/bufferevent_struct.h"
39
#include "event2/event.h"
40
#include "defer-internal.h"
41
#include "bufferevent-internal.h"
42
#include "mm-internal.h"
43
#include "util-internal.h"
44
45
struct bufferevent_pair {
46
  struct bufferevent_private bev;
47
  struct bufferevent_pair *partner;
48
  /* For ->destruct() lock checking */
49
  struct bufferevent_pair *unlinked_partner;
50
};
51
52
53
/* Given a bufferevent that's really a bev part of a bufferevent_pair,
54
 * return that bufferevent_filtered. Returns NULL otherwise.*/
55
static inline struct bufferevent_pair *
56
upcast(struct bufferevent *bev)
57
0
{
58
0
  struct bufferevent_pair *bev_p;
59
0
  bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev);
60
0
  EVUTIL_ASSERT(BEV_IS_PAIR(&bev_p->bev.bev));
61
0
  return bev_p;
62
0
}
63
64
0
#define downcast(bev_pair) (&(bev_pair)->bev.bev)
65
66
static inline void
67
incref_and_lock(struct bufferevent *b)
68
0
{
69
0
  struct bufferevent_pair *bevp;
70
0
  bufferevent_incref_and_lock_(b);
71
0
  bevp = upcast(b);
72
0
  if (bevp->partner)
73
0
    bufferevent_incref_and_lock_(downcast(bevp->partner));
74
0
}
75
76
static inline void
77
decref_and_unlock(struct bufferevent *b)
78
0
{
79
0
  struct bufferevent_pair *bevp = upcast(b);
80
0
  if (bevp->partner)
81
0
    bufferevent_decref_and_unlock_(downcast(bevp->partner));
82
0
  bufferevent_decref_and_unlock_(b);
83
0
}
84
85
/* XXX Handle close */
86
87
static void be_pair_outbuf_cb(struct evbuffer *,
88
    const struct evbuffer_cb_info *, void *);
89
90
static struct bufferevent_pair *
91
bufferevent_pair_elt_new(struct event_base *base,
92
    int options)
93
0
{
94
0
  struct bufferevent_pair *bufev;
95
0
  if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
96
0
    return NULL;
97
0
  if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair,
98
0
    options)) {
99
0
    mm_free(bufev);
100
0
    return NULL;
101
0
  }
102
0
  if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
103
0
    bufferevent_free(downcast(bufev));
104
0
    return NULL;
105
0
  }
106
107
0
  bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev);
108
109
0
  return bufev;
110
0
}
111
112
int
113
bufferevent_pair_new(struct event_base *base, int options,
114
    struct bufferevent *pair[2])
115
0
{
116
0
  struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
117
0
  int tmp_options;
118
119
0
  options |= BEV_OPT_DEFER_CALLBACKS;
120
0
  tmp_options = options & ~BEV_OPT_THREADSAFE;
121
122
0
  bufev1 = bufferevent_pair_elt_new(base, options);
123
0
  if (!bufev1)
124
0
    return -1;
125
0
  bufev2 = bufferevent_pair_elt_new(base, tmp_options);
126
0
  if (!bufev2) {
127
0
    bufferevent_free(downcast(bufev1));
128
0
    return -1;
129
0
  }
130
131
0
  if (options & BEV_OPT_THREADSAFE) {
132
    /*XXXX check return */
133
0
    bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock);
134
0
  }
135
136
0
  bufev1->partner = bufev2;
137
0
  bufev2->partner = bufev1;
138
139
0
  evbuffer_freeze(downcast(bufev1)->input, 0);
140
0
  evbuffer_freeze(downcast(bufev1)->output, 1);
141
0
  evbuffer_freeze(downcast(bufev2)->input, 0);
142
0
  evbuffer_freeze(downcast(bufev2)->output, 1);
143
144
0
  pair[0] = downcast(bufev1);
145
0
  pair[1] = downcast(bufev2);
146
147
0
  return 0;
148
0
}
149
150
static void
151
be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
152
    int ignore_wm)
153
0
{
154
0
  size_t dst_size;
155
0
  size_t n;
156
157
0
  evbuffer_unfreeze(src->output, 1);
158
0
  evbuffer_unfreeze(dst->input, 0);
159
160
0
  if (dst->wm_read.high) {
161
0
    dst_size = evbuffer_get_length(dst->input);
162
0
    if (dst_size < dst->wm_read.high) {
163
0
      n = dst->wm_read.high - dst_size;
164
0
      evbuffer_remove_buffer(src->output, dst->input, n);
165
0
    } else {
166
0
      if (!ignore_wm)
167
0
        goto done;
168
0
      n = evbuffer_get_length(src->output);
169
0
      evbuffer_add_buffer(dst->input, src->output);
170
0
    }
171
0
  } else {
172
0
    n = evbuffer_get_length(src->output);
173
0
    evbuffer_add_buffer(dst->input, src->output);
174
0
  }
175
176
0
  if (n) {
177
0
    BEV_RESET_GENERIC_READ_TIMEOUT(dst);
178
179
0
    if (evbuffer_get_length(dst->output))
180
0
      BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
181
0
    else
182
0
      BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
183
0
  }
184
185
0
  bufferevent_trigger_nolock_(dst, EV_READ, 0);
186
0
  bufferevent_trigger_nolock_(src, EV_WRITE, 0);
187
0
done:
188
0
  evbuffer_freeze(src->output, 1);
189
0
  evbuffer_freeze(dst->input, 0);
190
0
}
191
192
static inline int
193
be_pair_wants_to_talk(struct bufferevent_pair *src,
194
    struct bufferevent_pair *dst)
195
0
{
196
0
  return (downcast(src)->enabled & EV_WRITE) &&
197
0
      (downcast(dst)->enabled & EV_READ) &&
198
0
      !dst->bev.read_suspended &&
199
0
      evbuffer_get_length(downcast(src)->output);
200
0
}
201
202
static void
203
be_pair_outbuf_cb(struct evbuffer *outbuf,
204
    const struct evbuffer_cb_info *info, void *arg)
205
0
{
206
0
  struct bufferevent_pair *bev_pair = arg;
207
0
  struct bufferevent_pair *partner = bev_pair->partner;
208
209
0
  incref_and_lock(downcast(bev_pair));
210
211
0
  if (info->n_added > info->n_deleted && partner) {
212
    /* We got more data.  If the other side's reading, then
213
       hand it over. */
214
0
    if (be_pair_wants_to_talk(bev_pair, partner)) {
215
0
      be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
216
0
    }
217
0
  }
218
219
0
  decref_and_unlock(downcast(bev_pair));
220
0
}
221
222
static int
223
be_pair_enable(struct bufferevent *bufev, short events)
224
0
{
225
0
  struct bufferevent_pair *bev_p = upcast(bufev);
226
0
  struct bufferevent_pair *partner = bev_p->partner;
227
228
0
  incref_and_lock(bufev);
229
230
0
  if (events & EV_READ) {
231
0
    BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
232
0
  }
233
0
  if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
234
0
    BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
235
236
  /* We're starting to read! Does the other side have anything to write?*/
237
0
  if ((events & EV_READ) && partner &&
238
0
      be_pair_wants_to_talk(partner, bev_p)) {
239
0
    be_pair_transfer(downcast(partner), bufev, 0);
240
0
  }
241
  /* We're starting to write! Does the other side want to read? */
242
0
  if ((events & EV_WRITE) && partner &&
243
0
      be_pair_wants_to_talk(bev_p, partner)) {
244
0
    be_pair_transfer(bufev, downcast(partner), 0);
245
0
  }
246
0
  decref_and_unlock(bufev);
247
0
  return 0;
248
0
}
249
250
static int
251
be_pair_disable(struct bufferevent *bev, short events)
252
0
{
253
0
  if (events & EV_READ) {
254
0
    BEV_DEL_GENERIC_READ_TIMEOUT(bev);
255
0
  }
256
0
  if (events & EV_WRITE) {
257
0
    BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
258
0
  }
259
0
  return 0;
260
0
}
261
262
static void
263
be_pair_unlink(struct bufferevent *bev)
264
0
{
265
0
  struct bufferevent_pair *bev_p = upcast(bev);
266
267
0
  if (bev_p->partner) {
268
0
    bev_p->unlinked_partner = bev_p->partner;
269
0
    bev_p->partner->partner = NULL;
270
0
    bev_p->partner = NULL;
271
0
  }
272
0
}
273
274
/* Free *shared* lock in the latest be (since we share it between two of them). */
275
static void
276
be_pair_destruct(struct bufferevent *bev)
277
0
{
278
0
  struct bufferevent_pair *bev_p = upcast(bev);
279
280
  /* Transfer ownership of the lock into partner, otherwise we will use
281
   * already free'd lock during freeing second bev, see next example:
282
   *
283
   * bev1->own_lock = 1
284
   * bev2->own_lock = 0
285
   * bev2->lock = bev1->lock
286
   *
287
   * bufferevent_free(bev1) # refcnt == 0 -> unlink
288
   * bufferevent_free(bev2) # refcnt == 0 -> unlink
289
   *
290
   * event_base_free() -> finalizers -> EVTHREAD_FREE_LOCK(bev1->lock)
291
   *                                 -> BEV_LOCK(bev2->lock) <-- already freed
292
   *
293
   * Where bev1 == pair[0], bev2 == pair[1].
294
   */
295
0
  if (bev_p->unlinked_partner && bev_p->bev.own_lock) {
296
0
    bev_p->unlinked_partner->bev.own_lock = 1;
297
0
    bev_p->bev.own_lock = 0;
298
0
  }
299
0
  bev_p->unlinked_partner = NULL;
300
0
}
301
302
static int
303
be_pair_flush(struct bufferevent *bev, short iotype,
304
    enum bufferevent_flush_mode mode)
305
0
{
306
0
  struct bufferevent_pair *bev_p = upcast(bev);
307
0
  struct bufferevent *partner;
308
309
0
  if (!bev_p->partner)
310
0
    return -1;
311
312
0
  if (mode == BEV_NORMAL)
313
0
    return 0;
314
315
0
  incref_and_lock(bev);
316
317
0
  partner = downcast(bev_p->partner);
318
319
0
  if ((iotype & EV_READ) != 0)
320
0
    be_pair_transfer(partner, bev, 1);
321
322
0
  if ((iotype & EV_WRITE) != 0)
323
0
    be_pair_transfer(bev, partner, 1);
324
325
0
  if (mode == BEV_FINISHED) {
326
0
    short what = BEV_EVENT_EOF;
327
0
    if (iotype & EV_READ)
328
0
      what |= BEV_EVENT_WRITING;
329
0
    if (iotype & EV_WRITE)
330
0
      what |= BEV_EVENT_READING;
331
0
    bufferevent_run_eventcb_(partner, what, 0);
332
0
  }
333
0
  decref_and_unlock(bev);
334
0
  return 0;
335
0
}
336
337
struct bufferevent *
338
bufferevent_pair_get_partner(struct bufferevent *bev)
339
0
{
340
0
  struct bufferevent_pair *bev_p;
341
0
  struct bufferevent *partner = NULL;
342
0
  if (!BEV_IS_PAIR(bev))
343
0
    return NULL;
344
0
  bev_p = upcast(bev);
345
0
  incref_and_lock(bev);
346
0
  if (bev_p->partner)
347
0
    partner = downcast(bev_p->partner);
348
0
  decref_and_unlock(bev);
349
0
  return partner;
350
0
}
351
352
const struct bufferevent_ops bufferevent_ops_pair = {
353
  "pair_elt",
354
  evutil_offsetof(struct bufferevent_pair, bev.bev),
355
  be_pair_enable,
356
  be_pair_disable,
357
  be_pair_unlink,
358
  be_pair_destruct,
359
  bufferevent_generic_adj_timeouts_,
360
  be_pair_flush,
361
  NULL, /* ctrl */
362
};