Coverage Report

Created: 2026-03-31 06:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/haproxy/src/stconn.c
Line
Count
Source
1
/*
2
 * stream connector management functions
3
 *
4
 * Copyright 2021 Christopher Faulet <cfaulet@haproxy.com>
5
 *
6
 * This program is free software; you can redistribute it and/or
7
 * modify it under the terms of the GNU General Public License
8
 * as published by the Free Software Foundation; either version
9
 * 2 of the License, or (at your option) any later version.
10
 *
11
 */
12
13
#include <haproxy/api.h>
14
#include <haproxy/applet.h>
15
#include <haproxy/arg.h>
16
#include <haproxy/connection.h>
17
#include <haproxy/check.h>
18
#include <haproxy/filters.h>
19
#include <haproxy/hstream.h>
20
#include <haproxy/http_ana.h>
21
#include <haproxy/pipe.h>
22
#include <haproxy/pool.h>
23
#include <haproxy/proxy.h>
24
#include <haproxy/sample.h>
25
#include <haproxy/sc_strm.h>
26
#include <haproxy/stconn.h>
27
#include <haproxy/xref.h>
28
29
DECLARE_TYPED_POOL(pool_head_connstream, "stconn", struct stconn);
30
DECLARE_TYPED_POOL(pool_head_sedesc, "sedesc", struct sedesc);
31
32
static int sc_conn_recv(struct stconn *sc);
33
static int sc_conn_send(struct stconn *sc);
34
35
/* Initializes an endpoint */
36
void sedesc_init(struct sedesc *sedesc)
37
0
{
38
0
  sedesc->se = NULL;
39
0
  sedesc->conn = NULL;
40
0
  sedesc->sc = NULL;
41
0
  sedesc->lra = TICK_ETERNITY;
42
0
  sedesc->fsb = TICK_ETERNITY;
43
0
  sedesc->xref.peer = NULL;
44
0
  se_fl_setall(sedesc, SE_FL_NONE);
45
0
  sedesc->term_evts_log = 0;
46
0
  sedesc->abort_info.info = 0;
47
0
  sedesc->abort_info.code = 0;
48
49
0
  sedesc->iobuf.pipe = NULL;
50
0
  sedesc->iobuf.buf = NULL;
51
0
  sedesc->iobuf.offset = sedesc->iobuf.data = 0;
52
0
  sedesc->iobuf.flags = IOBUF_FL_NONE;
53
54
0
  sedesc->kip = 0;
55
0
  sedesc->kop = 0;
56
0
}
57
58
/* Tries to alloc an endpoint and initialize it. Returns NULL on failure. */
59
struct sedesc *sedesc_new()
60
0
{
61
0
  struct sedesc *sedesc;
62
63
0
  sedesc = pool_alloc(pool_head_sedesc);
64
0
  if (unlikely(!sedesc))
65
0
    return NULL;
66
67
0
  sedesc_init(sedesc);
68
0
  return sedesc;
69
0
}
70
71
/* Releases an endpoint. It is the caller responsibility to be sure it is safe
72
 * and it is not shared with another entity
73
 */
74
void sedesc_free(struct sedesc *sedesc)
75
0
{
76
0
  if (sedesc) {
77
0
    if (sedesc->iobuf.pipe)
78
0
      put_pipe(sedesc->iobuf.pipe);
79
0
    pool_free(pool_head_sedesc, sedesc);
80
0
  }
81
0
}
82
83
/* Performs a shutdown on the endpoint. This function deals with connection and
84
 * applet endpoints. It is responsible to set SE flags corresponding to the
85
 * given shut modes and to call right shutdown functions of the endpoint. It is
86
 * called from the sc_abort and sc_shutdown functions at the SC level.
87
 */
88
void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode)
89
0
{
90
0
  struct sedesc *sdo;
91
0
  struct se_abort_info *reason = NULL;
92
0
  unsigned int flags = 0;
93
94
0
  if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) {
95
0
    se_report_term_evt(sedesc, se_tevt_type_shutw);
96
0
    flags |= (mode & SE_SHW_NORMAL) ? SE_FL_SHWN : SE_FL_SHWS;
97
0
  }
98
0
  if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR))
99
0
    flags |= (mode & SE_SHR_DRAIN) ? SE_FL_SHRD : SE_FL_SHRR;
100
101
0
  if (se_fl_test(sedesc, SE_FL_T_MUX)) {
102
0
    const struct mux_ops *mux = (sedesc->conn ? sedesc->conn->mux : NULL);
103
104
0
    if (flags) {
105
0
      if (mux && mux->shut) {
106
0
        sdo = se_opposite(sedesc);
107
0
        if (sdo)
108
0
          reason = &sdo->abort_info;
109
0
        CALL_MUX_NO_RET(mux, shut(sedesc->sc, mode, reason));
110
0
      }
111
0
      se_fl_set(sedesc, flags);
112
0
    }
113
0
  }
114
0
  else if (se_fl_test(sedesc, SE_FL_T_APPLET)) {
115
0
    struct appctx *appctx = sedesc->se;
116
117
0
    if (flags) {
118
0
      if (appctx->applet->shut) {
119
0
        sdo = se_opposite(sedesc);
120
0
        if (sdo)
121
0
          reason = &sdo->abort_info;
122
0
        appctx->applet->shut(appctx, mode, reason);
123
0
      }
124
0
      se_fl_set(sedesc, flags);
125
0
    }
126
127
0
    if (se_fl_test(sedesc, SE_FL_SHR) && se_fl_test(sedesc, SE_FL_SHW))
128
0
      appctx_shut(appctx);
129
0
  }
130
0
}
131
132
/* Tries to allocate a new stconn and initialize its main fields. On
133
 * failure, nothing is allocated and NULL is returned. It is an internal
134
 * function. The caller must, at least, set the SE_FL_ORPHAN or SE_FL_DETACHED
135
 * flag.
136
 */
137
static struct stconn *sc_new(struct sedesc *sedesc)
138
0
{
139
0
  struct stconn *sc;
140
141
0
  sc = pool_alloc(pool_head_connstream);
142
143
0
  if (unlikely(!sc))
144
0
    goto alloc_error;
145
146
0
  sc->obj_type = OBJ_TYPE_SC;
147
0
  sc->flags = SC_FL_NONE;
148
0
  sc->state = SC_ST_INI;
149
0
  sc->ioto = TICK_ETERNITY;
150
0
  sc->room_needed = 0;
151
0
  sc->app = NULL;
152
0
  sc->src = NULL;
153
0
  sc->dst = NULL;
154
0
  sc->bytes_in = sc->bytes_out = 0;
155
0
  sc->wait_event.tasklet = NULL;
156
0
  sc->wait_event.events = 0;
157
158
0
  sc->term_evts_log = 0;
159
160
  /* If there is no endpoint, allocate a new one now */
161
0
  if (!sedesc) {
162
0
    sedesc = sedesc_new();
163
0
    if (unlikely(!sedesc))
164
0
      goto alloc_error;
165
0
  }
166
0
  sc->sedesc = sedesc;
167
0
  sedesc->sc = sc;
168
169
0
  return sc;
170
171
0
  alloc_error:
172
0
  pool_free(pool_head_connstream, sc);
173
0
  return NULL;
174
0
}
175
176
/* Creates a new stream connector and its associated stream from a mux. <sd> must
177
 * be defined. It returns NULL on error. On success, the new stream connector is
178
 * returned. In this case, SE_FL_ORPHAN flag is removed.
179
 */
180
struct stconn *sc_new_from_endp(struct sedesc *sd, struct session *sess, struct buffer *input)
181
0
{
182
0
  struct stconn *sc;
183
184
0
  sc = sc_new(sd);
185
0
  if (unlikely(!sc))
186
0
    return NULL;
187
0
  if (unlikely(!sess->fe->stream_new_from_sc(sess, sc, input))) {
188
0
    sd->sc = NULL;
189
0
    if (sc->sedesc != sd) {
190
      /* none was provided so sc_new() allocated one */
191
0
      sedesc_free(sc->sedesc);
192
0
    }
193
0
    pool_free(pool_head_connstream, sc);
194
0
    se_fl_set(sd, SE_FL_ORPHAN);
195
0
    return NULL;
196
0
  }
197
0
  se_fl_clr(sd, SE_FL_ORPHAN);
198
0
  return sc;
199
0
}
200
201
/* Creates a new stream connector from an stream. There is no endpoint here, thus it
202
 * will be created by sc_new(). So the SE_FL_DETACHED flag is set. It returns
203
 * NULL on error. On success, the new stream connector is returned.
204
 */
205
struct stconn *sc_new_from_strm(struct stream *strm, unsigned int flags)
206
0
{
207
0
  struct stconn *sc;
208
209
0
  sc = sc_new(NULL);
210
0
  if (unlikely(!sc))
211
0
    return NULL;
212
0
  sc->flags |= flags;
213
214
0
  if (flags & SC_FL_ISBACK)
215
0
    sc_ep_set(sc, SE_FL_APP_STARTED);
216
217
0
  sc_ep_set(sc, SE_FL_DETACHED);
218
0
  sc->app = &strm->obj_type;
219
0
  return sc;
220
0
}
221
222
/* Creates a new stream connector from an health-check. There is no endpoint here,
223
 * thus it will be created by sc_new(). So the SE_FL_DETACHED flag is set. It
224
 * returns NULL on error. On success, the new stream connector is returned.
225
 */
226
struct stconn *sc_new_from_check(struct check *check)
227
0
{
228
0
  struct stconn *sc;
229
230
0
  sc = sc_new(NULL);
231
0
  if (unlikely(!sc))
232
0
    return NULL;
233
0
  sc->flags = SC_FL_ISBACK;
234
0
  sc_ep_set(sc, SE_FL_APP_STARTED);
235
0
  sc_ep_set(sc, SE_FL_DETACHED);
236
0
  sc->app = &check->obj_type;
237
0
  return sc;
238
0
}
239
240
/* Releases a stconn previously allocated by sc_new(), as well as its
241
 * endpoint, if it exists. This function is called internally or on error path.
242
 */
243
void sc_free(struct stconn *sc)
244
0
{
245
0
  sockaddr_free(&sc->src);
246
0
  sockaddr_free(&sc->dst);
247
0
  if (sc->sedesc) {
248
0
    BUG_ON(!sc_ep_test(sc, SE_FL_DETACHED));
249
0
    sedesc_free(sc->sedesc);
250
0
  }
251
0
  tasklet_free(sc->wait_event.tasklet);
252
0
  pool_free(pool_head_connstream, sc);
253
0
}
254
255
/* Conditionally removes a stream connector if it is detached and if there is no app
256
 * layer defined. Except on error path, this one must be used. if release, the
257
 * pointer on the SC is set to NULL.
258
 */
259
static void sc_free_cond(struct stconn **scp)
260
0
{
261
0
  struct stconn *sc = *scp;
262
263
0
  if (!sc->app && (!sc->sedesc || sc_ep_test(sc, SE_FL_DETACHED))) {
264
0
    sc_free(sc);
265
0
    *scp = NULL;
266
0
  }
267
0
}
268
269
270
/* Attaches a stconn to a mux endpoint and sets the endpoint ctx. Returns
271
 * -1 on error and 0 on success. SE_FL_DETACHED flag is removed. This function is
272
 * called from a mux when it is attached to a stream or a health-check.
273
 */
274
int sc_attach_mux(struct stconn *sc, void *sd, void *ctx)
275
0
{
276
0
  struct connection *conn = ctx;
277
0
  struct sedesc *sedesc = sc->sedesc;
278
279
0
  if (sc_strm(sc)) {
280
0
    if (!sc->wait_event.tasklet) {
281
0
      sc->wait_event.tasklet = tasklet_new();
282
0
      if (!sc->wait_event.tasklet)
283
0
        return -1;
284
0
      sc->wait_event.tasklet->process = sc_conn_io_cb;
285
0
      sc->wait_event.tasklet->context = sc;
286
0
      sc->wait_event.events = 0;
287
0
    }
288
289
0
    xref_create(&sc->sedesc->xref, &sc_opposite(sc)->sedesc->xref);
290
0
  }
291
0
  else if (sc_check(sc)) {
292
0
    if (!sc->wait_event.tasklet) {
293
0
      sc->wait_event.tasklet = tasklet_new();
294
0
      if (!sc->wait_event.tasklet)
295
0
        return -1;
296
0
      sc->wait_event.tasklet->process = srv_chk_io_cb;
297
0
      sc->wait_event.tasklet->context = sc;
298
0
      sc->wait_event.events = 0;
299
0
    }
300
0
  }
301
302
0
  sedesc->se = sd;
303
0
  sedesc->conn = ctx;
304
0
  se_fl_set(sedesc, SE_FL_T_MUX);
305
0
  se_fl_clr(sedesc, SE_FL_DETACHED);
306
0
  if (!conn->ctx)
307
0
    conn->ctx = sc;
308
0
  return 0;
309
0
}
310
311
/* Attaches a stconn to an applet endpoint and sets the endpoint
312
 * ctx. Returns -1 on error and 0 on success. SE_FL_DETACHED flag is
313
 * removed. This function is called by a stream when a backend applet is
314
 * registered.
315
 */
316
static int sc_attach_applet(struct stconn *sc, struct appctx *appctx)
317
0
{
318
0
  sc->sedesc->se = appctx;
319
0
  sc_ep_set(sc, SE_FL_T_APPLET);
320
0
  sc_ep_clr(sc, SE_FL_DETACHED);
321
0
  if (sc_strm(sc))
322
0
    xref_create(&sc->sedesc->xref, &sc_opposite(sc)->sedesc->xref);
323
324
0
  return 0;
325
0
}
326
327
/* Attaches a stconn to a app layer and sets the relevant
328
 * callbacks. Returns -1 on error and 0 on success. SE_FL_ORPHAN flag is
329
 * removed. This function is called by a stream when it is created to attach it
330
 * on the stream connector on the client side.
331
 */
332
int sc_attach_strm(struct stconn *sc, struct stream *strm)
333
0
{
334
0
  sc->app = &strm->obj_type;
335
0
  sc_ep_clr(sc, SE_FL_ORPHAN);
336
0
  sc_ep_report_read_activity(sc);
337
0
  if (sc_ep_test(sc, SE_FL_T_MUX)) {
338
0
    sc->wait_event.tasklet = tasklet_new();
339
0
    if (!sc->wait_event.tasklet)
340
0
      return -1;
341
0
    sc->wait_event.tasklet->process = sc_conn_io_cb;
342
0
    sc->wait_event.tasklet->context = sc;
343
0
    sc->wait_event.events = 0;
344
0
  }
345
0
  return 0;
346
0
}
347
348
/* Attach a stconn to a haterm layer and sets the relevant
349
 * callbacks. Returns -1 on error and 0 on success. SE_FL_ORPHAN flag is
350
 * removed. This function is called by a haterm stream when it is created
351
 * to attach it on the stream connector on the client side.
352
 */
353
int sc_attach_hstream(struct stconn *sc, struct hstream *hs)
354
0
{
355
0
  BUG_ON(!sc_ep_test(sc, SE_FL_T_MUX));
356
357
0
  sc->app = &hs->obj_type;
358
0
  sc_ep_clr(sc, SE_FL_ORPHAN);
359
0
  sc_ep_report_read_activity(sc);
360
0
  sc->wait_event.tasklet = tasklet_new();
361
0
  if (!sc->wait_event.tasklet)
362
0
    return -1;
363
364
0
  sc->wait_event.tasklet->process = sc_hstream_io_cb;
365
0
  sc->wait_event.tasklet->context = sc;
366
0
  sc->wait_event.events = 0;
367
0
  return 0;
368
0
}
369
370
/* Detaches the stconn from the endpoint, if any. For a connecrion, if a
371
 * mux owns the connection ->detach() callback is called. Otherwise, it means
372
 * the stream connector owns the connection. In this case the connection is closed
373
 * and released. For an applet, the appctx is released. If still allocated, the
374
 * endpoint is reset and flag as detached. If the app layer is also detached,
375
 * the stream connector is released.
376
 */
377
static void sc_detach_endp(struct stconn **scp)
378
0
{
379
0
  struct stconn *sc = *scp;
380
0
  struct xref *peer;
381
382
0
  if (!sc)
383
0
    return;
384
385
386
  /* Remove my link in the original objects. */
387
0
  peer = xref_get_peer_and_lock(&sc->sedesc->xref);
388
0
  if (peer)
389
0
    xref_disconnect(&sc->sedesc->xref, peer);
390
391
0
  if (sc_ep_test(sc, SE_FL_T_MUX)) {
392
0
    struct connection *conn = __sc_conn(sc);
393
0
    struct sedesc *sedesc = sc->sedesc;
394
395
0
    if (conn->mux) {
396
0
      if (sc->wait_event.events != 0)
397
0
        conn->mux->unsubscribe(sc, sc->wait_event.events, &sc->wait_event);
398
0
      se_fl_set(sedesc, SE_FL_ORPHAN);
399
0
      sedesc->sc = NULL;
400
0
      sc->sedesc = NULL;
401
0
      CALL_MUX_NO_RET(conn->mux, detach(sedesc));
402
0
    }
403
0
    else {
404
      /* It's too early to have a mux, let's just destroy
405
       * the connection
406
       */
407
0
      conn_stop_tracking(conn);
408
0
      conn_full_close(conn);
409
0
      if (conn->destroy_cb)
410
0
        conn->destroy_cb(conn);
411
0
      conn_free(conn);
412
0
    }
413
0
  }
414
0
  else if (sc_ep_test(sc, SE_FL_T_APPLET)) {
415
0
    struct appctx *appctx = __sc_appctx(sc);
416
417
0
    sc_ep_set(sc, SE_FL_ORPHAN);
418
0
    sc->sedesc->sc = NULL;
419
0
    sc->sedesc = NULL;
420
0
    se_shutdown(appctx->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
421
0
    appctx_free(appctx);
422
0
  }
423
424
0
  if (sc->sedesc) {
425
    /* the SD wasn't used and can be recycled */
426
0
    sc->sedesc->se     = NULL;
427
0
    sc->sedesc->conn   = NULL;
428
0
    sc->sedesc->flags  = 0;
429
0
    sc_ep_set(sc, SE_FL_DETACHED);
430
0
  }
431
432
  /* FIXME: Rest SC for now but must be reviewed. SC flags are only
433
   *        connection related for now but this will evolved
434
   */
435
0
  sc->flags &= SC_FL_ISBACK;
436
0
  sc_free_cond(scp);
437
0
}
438
439
/* Detaches the stconn from the app layer. If there is no endpoint attached
440
 * to the stconn
441
 */
442
static void sc_detach_app(struct stconn **scp)
443
0
{
444
0
  struct stconn *sc = *scp;
445
446
0
  if (!sc)
447
0
    return;
448
449
0
  sc->app = NULL;
450
0
  sockaddr_free(&sc->src);
451
0
  sockaddr_free(&sc->dst);
452
453
0
  tasklet_free(sc->wait_event.tasklet);
454
0
  sc->wait_event.tasklet = NULL;
455
0
  sc->wait_event.events = 0;
456
0
  sc_free_cond(scp);
457
0
}
458
459
/* Destroy the stconn. It is detached from its endpoint and its
460
 * application. After this call, the stconn must be considered as released.
461
 */
462
void sc_destroy(struct stconn *sc)
463
0
{
464
0
  sc_detach_endp(&sc);
465
0
  sc_detach_app(&sc);
466
0
  BUG_ON_HOT(sc);
467
0
}
468
469
/* Resets the stream connector endpoint. It happens when the app layer want to renew
470
 * its endpoint. For a connection retry for instance. If a mux or an applet is
471
 * attached, a new endpoint is created. Returns -1 on error and 0 on success.
472
 */
473
int sc_reset_endp(struct stconn *sc)
474
0
{
475
0
  struct sedesc *new_sd;
476
477
0
  BUG_ON(!sc->app);
478
479
0
  if (!__sc_endp(sc)) {
480
    /* endpoint not attached or attached to a mux with no
481
     * target. Thus the endpoint will not be release but just
482
     * reset. The app is still attached, the sc will not be
483
     * released.
484
     */
485
0
    sc_detach_endp(&sc);
486
0
    return 0;
487
0
  }
488
489
  /* allocate the new endpoint first to be able to set error if it
490
   * fails */
491
0
  new_sd = sedesc_new();
492
0
  if (!unlikely(new_sd))
493
0
    return -1;
494
495
  /* The app is still attached, the sc will not be released */
496
0
  sc_detach_endp(&sc);
497
0
  BUG_ON(!sc);
498
0
  BUG_ON(sc->sedesc);
499
0
  sc->sedesc = new_sd;
500
0
  sc->sedesc->sc = sc;
501
0
  sc->bytes_in = sc->bytes_out = 0;
502
0
  sc_ep_set(sc, SE_FL_DETACHED);
503
0
  return 0;
504
0
}
505
506
507
/* Create an applet to handle a stream connector as a new appctx. The SC will
508
 * wake it up every time it is solicited. The appctx must be deleted by the task
509
 * handler using sc_detach_endp(), possibly from within the function itself.
510
 * It also pre-initializes the applet's context and returns it (or NULL in case
511
 * it could not be allocated).
512
 */
513
struct appctx *sc_applet_create(struct stconn *sc, struct applet *app)
514
0
{
515
0
  struct appctx *appctx;
516
517
0
  appctx = appctx_new_here(app, sc->sedesc);
518
0
  if (!appctx)
519
0
    return NULL;
520
0
  if (sc_attach_applet(sc, appctx) == -1) {
521
0
    appctx_free_on_early_error(appctx);
522
0
    return NULL;
523
0
  }
524
0
  appctx->t->nice = __sc_strm(sc)->task->nice;
525
0
  applet_need_more_data(appctx);
526
0
  appctx_wakeup(appctx);
527
528
0
  sc->state = SC_ST_RDY;
529
0
  return appctx;
530
0
}
531
532
/* Conditionally forward the close to the write side. It return 1 if it can be
533
 * forwarded. It is the caller responsibility to forward the close to the write
534
 * side. Otherwise, 0 is returned. In this case, SC_FL_SHUT_WANTED flag may be set on
535
 * the consumer SC if we are only waiting for the outgoing data to be flushed.
536
 */
537
static inline int sc_cond_forward_shut(struct stconn *sc)
538
0
{
539
  /* The close must not be forwarded */
540
0
  if (!(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) || !(sc->flags & SC_FL_NOHALF))
541
0
    return 0;
542
543
0
  if ((co_data(sc_ic(sc)) || sc_ep_have_ff_data(sc_opposite(sc))) && !(sc_ic(sc)->flags & CF_WRITE_TIMEOUT)) {
544
    /* the shutdown cannot be forwarded now because
545
     * we should flush outgoing data first. But instruct the output
546
     * channel it should be done ASAP.
547
     */
548
0
    sc_schedule_shutdown(sc);
549
0
    return 0;
550
0
  }
551
552
  /* the close can be immediately forwarded to the write side */
553
0
  return 1;
554
0
}
555
556
557
static inline int sc_is_fastfwd_supported(struct stconn *sc)
558
0
{
559
0
  return (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) &&
560
0
    !(sc->flags & SC_FL_NO_FASTFWD) &&
561
0
    sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) &&
562
0
    sc_ep_test(sc_opposite(sc), SE_FL_MAY_FASTFWD_CONS) &&
563
0
    sc_ic(sc)->to_forward);
564
0
}
565
566
/*
567
 * This function performs a shutdown-read on a detached stream connector in a
568
 * connected or init state (it does nothing for other states). It either shuts
569
 * the read side or marks itself as closed. The buffer flags are updated to
570
 * reflect the new state. If the stream connector has SC_FL_NOHALF, we also
571
 * forward the close to the write side. The owner task is woken up if it exists.
572
 */
573
void sc_abort(struct stconn *sc)
574
0
{
575
0
  struct channel *ic = sc_ic(sc);
576
577
0
  BUG_ON(!sc_strm(sc));
578
579
0
  if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
580
0
    return;
581
582
0
  sc->flags |= SC_FL_ABRT_DONE;
583
0
  ic->flags |= CF_READ_EVENT;
584
585
0
  if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
586
0
    return;
587
588
0
  if (sc->flags & SC_FL_SHUT_DONE) {
589
0
    if (sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET))
590
0
      se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);
591
592
0
    sc->state = SC_ST_DIS;
593
0
    if (sc->flags & SC_FL_ISBACK)
594
0
      __sc_strm(sc)->conn_exp = TICK_ETERNITY;
595
0
  }
596
0
  else if (sc_cond_forward_shut(sc))
597
0
    return sc_shutdown(sc);
598
599
0
  if (!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET)) {
600
    /* note that if the task exists, it must unregister itself once it runs */
601
0
    if (!(sc->flags & SC_FL_DONT_WAKE))
602
0
      task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
603
0
  }
604
0
}
605
606
/*
607
 * This function performs a shutdown-write on a detached stream connector in a
608
 * connected or init state (it does nothing for other states). It either shuts
609
 * the write side or marks itself as closed. The buffer flags are updated to
610
 * reflect the new state. It does also close everything if the SC was marked as
611
 * being in error state. The owner task is woken up if it exists.
612
 */
613
void sc_shutdown(struct stconn *sc)
614
0
{
615
616
0
  struct channel *ic = sc_ic(sc);
617
0
  struct channel *oc = sc_oc(sc);
618
619
0
  BUG_ON(!sc_strm(sc));
620
621
0
  sc->flags &= ~SC_FL_SHUT_WANTED;
622
0
  if (sc->flags & SC_FL_SHUT_DONE)
623
0
    return;
624
0
  sc->flags |= SC_FL_SHUT_DONE;
625
0
  oc->flags |= CF_WRITE_EVENT;
626
0
  sc_set_hcto(sc);
627
0
  sc_report_term_evt(sc, strm_tevt_type_shutw);
628
629
0
  if (sc_ep_test(sc, SE_FL_T_APPLET)) {
630
    /* on shutw we always wake the applet up */
631
0
    appctx_wakeup(__sc_appctx(sc));
632
0
  }
633
634
0
  switch (sc->state) {
635
0
  case SC_ST_RDY:
636
0
  case SC_ST_EST:
637
    /* we have to shut before closing, otherwise some short messages
638
     * may never leave the system, especially when there are remaining
639
     * unread data in the socket input buffer, or when nolinger is set.
640
     * However, if SC_FL_NOLINGER is explicitly set, we know there is
641
     * no risk so we close both sides immediately.
642
     */
643
0
    if (!(sc->flags & (SC_FL_ERROR|SC_FL_NOLINGER|SC_FL_EOS|SC_FL_ABRT_DONE)) &&
644
0
        !(ic->flags & CF_DONT_READ)) {
645
0
      if (sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET))
646
0
        se_shutdown(sc->sedesc, SE_SHW_NORMAL);
647
0
      return;
648
0
    }
649
650
0
    if (sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET))
651
0
      se_shutdown(sc->sedesc, SE_SHR_RESET|((sc->flags & SC_FL_NOLINGER) ? SE_SHW_SILENT : SE_SHW_NORMAL));
652
653
0
    sc->state = SC_ST_DIS;
654
0
    break;
655
656
0
  case SC_ST_CON:
657
0
    if (sc_ep_test(sc, SE_FL_T_MUX)) {
658
      /* we may have to close a pending connection, and mark the
659
       * response buffer as abort
660
       */
661
0
      se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);
662
0
    }
663
0
    __fallthrough;
664
0
  case SC_ST_CER:
665
0
  case SC_ST_QUE:
666
0
  case SC_ST_TAR:
667
    /* Note that none of these states may happen with applets */
668
0
    sc->state = SC_ST_DIS;
669
0
    break;
670
0
  default:
671
0
    break;
672
0
  }
673
674
0
  sc->flags &= ~SC_FL_NOLINGER;
675
0
  if (!(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)))
676
0
    sc->flags |= SC_FL_ABRT_DONE;
677
0
  if (sc->flags & SC_FL_ISBACK)
678
0
    __sc_strm(sc)->conn_exp = TICK_ETERNITY;
679
680
0
  if (!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET)) {
681
    /* note that if the task exists, it must unregister itself once it runs */
682
0
    if (!(sc->flags & SC_FL_DONT_WAKE))
683
0
      task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
684
0
  }
685
0
}
686
687
/* This is to be used after making some room available in a channel. It will
688
 * return without doing anything if the stream connector's RX path is blocked.
689
 * It will automatically mark the stream connector as busy processing the end
690
 * point in order to avoid useless repeated wakeups.
691
 * It will then woken the right entity to enable receipt of new data.
692
 */
693
void sc_chk_rcv(struct stconn *sc)
694
0
{
695
0
  BUG_ON(!sc_strm(sc));
696
697
0
  if (sc_ep_test(sc, SE_FL_APPLET_NEED_CONN) &&
698
0
      sc_state_in(sc_opposite(sc)->state, SC_SB_RDY|SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) {
699
0
    sc_ep_clr(sc, SE_FL_APPLET_NEED_CONN);
700
0
    sc_ep_report_read_activity(sc);
701
0
  }
702
703
0
  if (!sc_is_recv_allowed(sc))
704
0
    return;
705
706
0
  if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST))
707
0
    return;
708
709
0
  sc_ep_set(sc, SE_FL_HAVE_NO_DATA);
710
711
  /* (re)start reading */
712
0
  if (sc_ep_test(sc, SE_FL_T_MUX)) {
713
0
    if (sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
714
0
      tasklet_wakeup(sc->wait_event.tasklet, TASK_WOKEN_IO);
715
0
  }
716
0
  else if  (sc_ep_test(sc, SE_FL_T_APPLET)) {
717
0
    if (!sc_ep_have_ff_data(sc_opposite(sc)))
718
0
      appctx_wakeup(__sc_appctx(sc));
719
0
  }
720
0
  else {
721
    /* In theory, it should not happen. This CHECK_IF will be used to validate it (or not...) */
722
0
    CHECK_IF(!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET));
723
0
    if (!(sc->flags & SC_FL_DONT_WAKE))
724
0
      task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
725
0
  }
726
0
}
727
728
729
/* This function is used for inter-stream connector calls. It is called by the
730
 * producer to inform the consumer side that it may be interested in checking
731
 * for data in the buffer. Note that it intentionally does not update timeouts,
732
 * so that we can still check them later at wake-up.
733
 */
734
static inline void sc_chk_snd(struct stconn *sc)
735
0
{
736
0
  struct channel *oc = sc_oc(sc);
737
738
0
  BUG_ON(!sc_strm(sc));
739
740
0
  if (sc_ep_test(sc, SE_FL_T_MUX)) {
741
0
    if (unlikely(!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST) ||
742
0
           (sc->flags & SC_FL_SHUT_DONE)))
743
0
      return;
744
745
0
    if (unlikely(!co_data(oc) && !sc_ep_have_ff_data(sc)))  /* called with nothing to send ! */
746
0
      return;
747
748
0
    if (!sc_ep_have_ff_data(sc) &&              /* data wants to be fast-forwarded ASAP */
749
0
        !sc_ep_test(sc, SE_FL_WAIT_DATA))       /* not waiting for data */
750
0
      return;
751
752
0
    if (!(sc->wait_event.events & SUB_RETRY_SEND))
753
0
      sc_conn_send(sc);
754
755
0
    if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING) || sc_is_conn_error(sc)) {
756
      /* Write error on the file descriptor */
757
0
      BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
758
0
      goto out_wakeup;
759
0
    }
760
761
    /* OK, so now we know that some data might have been sent, and that we may
762
     * have to poll first. We have to do that too if the buffer is not empty.
763
     */
764
0
    if (!co_data(oc) && !sc_ep_have_ff_data(sc)) {
765
      /* the connection is established but we can't write. Either the
766
       * buffer is empty, or we just refrain from sending because the
767
       * ->o limit was reached. Maybe we just wrote the last
768
       * chunk and need to close.
769
       */
770
0
      if ((oc->flags & CF_AUTO_CLOSE) &&
771
0
          ((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED) &&
772
0
          sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST)) {
773
0
        sc_shutdown(sc);
774
0
        goto out_wakeup;
775
0
      }
776
777
0
      if ((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == 0)
778
0
        sc_ep_set(sc, SE_FL_WAIT_DATA);
779
0
    }
780
0
    else {
781
      /* Otherwise there are remaining data to be sent in the buffer,
782
       * which means we have to poll before doing so.
783
       */
784
0
      sc_ep_clr(sc, SE_FL_WAIT_DATA);
785
0
    }
786
787
    /* in case of special condition (error, shutdown, end of write...), we
788
     * have to notify the task.
789
     */
790
0
    if (likely((sc->flags & SC_FL_SHUT_DONE) ||
791
0
         ((oc->flags & CF_WRITE_EVENT) && sc->state < SC_ST_EST) ||
792
0
         ((oc->flags & CF_WAKE_WRITE) &&
793
0
          ((!co_data(oc) && !oc->to_forward) ||
794
0
           !sc_state_in(sc->state, SC_SB_EST))))) {
795
0
    out_wakeup:
796
0
      if (!(sc->flags & SC_FL_DONT_WAKE))
797
0
        task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
798
0
    }
799
0
  }
800
0
  else if  (sc_ep_test(sc, SE_FL_T_APPLET)) {
801
0
    if (unlikely(sc->state != SC_ST_EST || (sc->flags & SC_FL_SHUT_DONE)))
802
0
      return;
803
804
    /* we only wake the applet up if it was waiting for some data  and is ready to consume it */
805
0
    if (!sc_ep_test(sc, SE_FL_WAIT_DATA|SE_FL_WONT_CONSUME))
806
0
      return;
807
808
0
    if (co_data(oc) || sc_ep_have_ff_data(sc)) {
809
      /* (re)start sending */
810
0
      appctx_wakeup(__sc_appctx(sc));
811
0
    }
812
0
  }
813
0
  else {
814
    /* In theory, it should not happen. This CHECK_IF will be used to validate it (or not...) */
815
0
    CHECK_IF(!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET));
816
817
0
    if (unlikely(sc->state != SC_ST_EST || (sc->flags & SC_FL_SHUT_DONE)))
818
0
      return;
819
820
0
    if (!sc_ep_test(sc, SE_FL_WAIT_DATA) ||  /* not waiting for data */
821
0
        (!co_data(oc) && !sc_ep_have_ff_data(sc)))  /* called with nothing to send ! */
822
0
      return;
823
824
    /* Otherwise there are remaining data to be sent in the buffer,
825
     * so we tell the handler.
826
     */
827
0
    sc_ep_clr(sc, SE_FL_WAIT_DATA);
828
0
    if (!(sc->flags & SC_FL_DONT_WAKE))
829
0
      task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
830
0
  }
831
0
}
832
833
/* This function is designed to be called from within the stream handler to
834
 * update the input channel's expiration timer and the stream connector's
835
 * Rx flags based on the channel's flags. It needs to be called only once
836
 * after the channel's flags have settled down, and before they are cleared,
837
 * though it doesn't harm to call it as often as desired (it just slightly
838
 * hurts performance). It must not be called from outside of the stream
839
 * handler, as what it does will be used to compute the stream task's
840
 * expiration.
841
 */
842
void sc_update_rx(struct stconn *sc)
843
0
{
844
0
  struct channel *ic = sc_ic(sc);
845
846
0
  if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
847
0
    return;
848
849
  /* Unblock the SC if it needs room and the free space is large enough (0
850
   * means it can always be unblocked). Do not unblock it if -1 was
851
   * specified.
852
   */
853
0
  if (!sc->room_needed || (sc->room_needed > 0 && channel_recv_max(ic) >= sc->room_needed))
854
0
    sc_have_room(sc);
855
856
  /* Read not closed, update FD status and timeout for reads */
857
0
  if (ic->flags & CF_DONT_READ)
858
0
    sc_wont_read(sc);
859
0
  else
860
0
    sc_will_read(sc);
861
862
0
  sc_chk_rcv(sc);
863
0
}
864
865
/* This function is designed to be called from within the stream handler to
866
 * update the output channel's expiration timer and the stream connector's
867
 * Tx flags based on the channel's flags. It needs to be called only once
868
 * after the channel's flags have settled down, and before they are cleared,
869
 * though it doesn't harm to call it as often as desired (it just slightly
870
 * hurts performance). It must not be called from outside of the stream
871
 * handler, as what it does will be used to compute the stream task's
872
 * expiration.
873
 */
874
void sc_update_tx(struct stconn *sc)
875
0
{
876
0
  struct channel *oc = sc_oc(sc);
877
878
0
  if (sc->flags & SC_FL_SHUT_DONE)
879
0
    return;
880
881
  /* Write not closed, update FD status and timeout for writes */
882
0
  if (!co_data(oc)) {
883
    /* stop writing */
884
0
    if (!sc_ep_test(sc, SE_FL_WAIT_DATA)) {
885
0
      if ((sc->flags & SC_FL_SHUT_WANTED) == 0)
886
0
        sc_ep_set(sc, SE_FL_WAIT_DATA);
887
0
    }
888
0
    return;
889
0
  }
890
891
  /* (re)start writing */
892
0
  sc_ep_clr(sc, SE_FL_WAIT_DATA);
893
0
}
894
895
/* This function is the equivalent to sc_update() except that it's
896
 * designed to be called from outside the stream handlers, typically the lower
897
 * layers (applets, connections) after I/O completion. After updating the stream
898
 * interface and timeouts, it will try to forward what can be forwarded, then to
899
 * wake the associated task up if an important event requires special handling.
900
 * It may update SE_FL_WAIT_DATA and/or SC_FL_NEED_ROOM, that the callers are
901
 * encouraged to watch to take appropriate action.
902
 * It should not be called from within the stream itself, sc_update()
903
 * is designed for this. Please do not statify this function, it's often
904
 * present in backtraces, it's useful to recognize it.
905
 */
906
void sc_notify(struct stconn *sc)
907
0
{
908
0
  struct channel *ic = sc_ic(sc);
909
0
  struct channel *oc = sc_oc(sc);
910
0
  struct stconn *sco = sc_opposite(sc);
911
0
  struct task *task = sc_strm_task(sc);
912
913
  /* process consumer side */
914
0
  if (!co_data(oc) && !sc_ep_have_ff_data(sc)) {
915
0
    struct connection *conn = sc_conn(sc);
916
917
0
    if (((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED) &&
918
0
        (sc->state == SC_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
919
0
      sc_shutdown(sc);
920
0
  }
921
922
  /* indicate that we may be waiting for data from the output channel or
923
   * we're about to close and can't expect more data if SC_FL_SHUT_WANTED is there.
924
   */
925
0
  if (!(sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)))
926
0
    sc_ep_set(sc, SE_FL_WAIT_DATA);
927
0
  else if ((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED)
928
0
    sc_ep_clr(sc, SE_FL_WAIT_DATA);
929
930
0
  if (oc->flags & CF_DONT_READ)
931
0
    sc_wont_read(sco);
932
0
  else
933
0
    sc_will_read(sco);
934
935
  /* Notify the other side when we've injected data into the IC that
936
   * needs to be forwarded. We can do fast-forwarding as soon as there
937
   * are output data, but we avoid doing this if some of the data are
938
   * not yet scheduled for being forwarded, because it is very likely
939
   * that it will be done again immediately afterwards once the following
940
   * data are parsed (eg: HTTP chunking). We only clear SC_FL_NEED_ROOM
941
   * once we've emptied *some* of the output buffer, and not just when
942
   * there is available room, because applets are often forced to stop
943
   * before the buffer is full. We must not stop based on input data
944
   * alone because an HTTP parser might need more data to complete the
945
   * parsing.
946
   */
947
0
  if (sc_ep_have_ff_data(sc_opposite(sc)) ||
948
0
      (co_data(ic) && sc_ep_test(sco, SE_FL_WAIT_DATA) &&
949
0
       (!HAS_DATA_FILTERS(__sc_strm(sc), ic) || channel_input_data(ic) == 0) &&
950
0
       (!(sc->flags & SC_FL_SND_EXP_MORE) || channel_full(ic, co_data(ic)) || channel_input_data(ic) == 0))) {
951
0
    int new_len, last_len;
952
953
0
    last_len = co_data(ic) + sc_ep_ff_data(sco);
954
0
    sc_chk_snd(sco);
955
0
    new_len = co_data(ic) + sc_ep_ff_data(sco);
956
957
    /* check if the consumer has freed some space either in the
958
     * buffer or in the pipe.
959
     */
960
0
    if (!sc->room_needed || (new_len < last_len && (sc->room_needed < 0 || channel_recv_max(ic) >= sc->room_needed)))
961
0
      sc_have_room(sc);
962
0
  }
963
964
0
  if (!(ic->flags & CF_DONT_READ))
965
0
    sc_will_read(sc);
966
967
0
  sc_chk_rcv(sc);
968
0
  sc_chk_rcv(sco);
969
970
  /* wake the task up only when needed */
971
0
  if (/* changes on the production side that must be handled:
972
       *  - An error on receipt: SC_FL_ERROR
973
       *  - A read event: shutdown for reads (CF_READ_EVENT + EOS/ABRT_DONE)
974
       *                  end of input (CF_READ_EVENT + SC_FL_EOI)
975
       *                  data received and no fast-forwarding (CF_READ_EVENT + !to_forward)
976
       *                  read event while consumer side is not established (CF_READ_EVENT + sco->state != SC_ST_EST)
977
       */
978
0
    ((ic->flags & CF_READ_EVENT) && ((sc->flags & SC_FL_EOI) || (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) || !ic->to_forward || sco->state != SC_ST_EST)) ||
979
0
    (sc->flags & SC_FL_ERROR) ||
980
981
      /* changes on the consumption side */
982
0
      sc_ep_test(sc, SE_FL_ERR_PENDING) ||
983
0
      ((oc->flags & CF_WRITE_EVENT) &&
984
0
       ((sc->state < SC_ST_EST) ||
985
0
        (sc->flags & SC_FL_SHUT_DONE) ||
986
0
        (((oc->flags & CF_WAKE_WRITE) ||
987
0
    (!(oc->flags & CF_AUTO_CLOSE) &&
988
0
     !(sc->flags & (SC_FL_SHUT_WANTED|SC_FL_SHUT_DONE)))) &&
989
0
         (sco->state != SC_ST_EST ||
990
0
    (!co_data(oc) && !oc->to_forward)))))) {
991
0
    task_wakeup(task, TASK_WOKEN_IO);
992
0
  }
993
0
  else {
994
    /* Update expiration date for the task and requeue it if not already expired.
995
     * Only I/O timeouts are evaluated. The stream is responsible of others.
996
     */
997
0
    if (!tick_is_expired(task->expire, now_ms)) {
998
0
      task->expire = tick_first(task->expire, sc_ep_rcv_ex(sc));
999
0
      task->expire = tick_first(task->expire, sc_ep_snd_ex(sc));
1000
0
      task->expire = tick_first(task->expire, sc_ep_rcv_ex(sco));
1001
0
      task->expire = tick_first(task->expire, sc_ep_snd_ex(sco));
1002
1003
0
      BUG_ON(tick_is_expired(task->expire, now_ms));
1004
0
      task_queue(task);
1005
0
    }
1006
0
  }
1007
1008
0
  if (ic->flags & CF_READ_EVENT)
1009
0
    sc->flags &= ~SC_FL_RCV_ONCE;
1010
0
}
1011
1012
/*
1013
 * This function propagates an end-of-stream received on a socket-based connection.
1014
 * It updates the stream connector. If the stream connector has SC_FL_NOHALF,
1015
 * the close is also forwarded to the write side as an abort.
1016
 */
1017
static void sc_conn_eos(struct stconn *sc)
1018
0
{
1019
0
  struct channel *ic = sc_ic(sc);
1020
1021
0
  BUG_ON(!sc_conn(sc));
1022
1023
0
  if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
1024
0
    return;
1025
0
  sc->flags |= SC_FL_EOS;
1026
0
  ic->flags |= CF_READ_EVENT;
1027
0
  sc_ep_report_read_activity(sc);
1028
0
  sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_eos: strm_tevt_type_truncated_eos));
1029
0
  if (sc->state != SC_ST_EST)
1030
0
    return;
1031
1032
0
  if (sc->flags & SC_FL_SHUT_DONE)
1033
0
    goto do_close;
1034
1035
0
  if (sc_cond_forward_shut(sc)) {
1036
    /* we want to immediately forward this close to the write side */
1037
    /* force flag on ssl to keep stream in cache */
1038
0
    goto do_close;
1039
0
  }
1040
1041
  /* otherwise that's just a normal read shutdown */
1042
0
  return;
1043
1044
0
 do_close:
1045
  /* OK we completely close the socket here just as if we went through sc_shut[rw]() */
1046
0
  se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);
1047
1048
0
  sc->flags &= ~SC_FL_SHUT_WANTED;
1049
0
  sc->flags |= SC_FL_SHUT_DONE;
1050
1051
0
  sc->state = SC_ST_DIS;
1052
0
  if (sc->flags & SC_FL_ISBACK)
1053
0
    __sc_strm(sc)->conn_exp = TICK_ETERNITY;
1054
0
  return;
1055
0
}
1056
1057
/*
1058
 * This is the callback which is called by the connection layer to receive data
1059
 * into the buffer from the connection. It iterates over the mux layer's
1060
 * rcv_buf function. Please do not statify this function, it's often present in
1061
 * backtraces, it's useful to recognize it.
1062
 */
1063
int sc_conn_recv(struct stconn *sc)
1064
0
{
1065
0
  struct connection *conn = __sc_conn(sc);
1066
0
  struct channel *ic = sc_ic(sc);
1067
0
  int ret, max, cur_read = 0;
1068
0
  int read_poll = MAX_READ_POLL_LOOPS;
1069
0
  int flags = 0;
1070
1071
  /* If not established yet, do nothing. */
1072
0
  if (sc->state != SC_ST_EST)
1073
0
    return 0;
1074
1075
  /* If another call to sc_conn_recv() failed, and we subscribed to
1076
   * recv events already, give up now.
1077
   */
1078
0
  if ((sc->wait_event.events & SUB_RETRY_RECV) || sc_waiting_room(sc))
1079
0
    return 0;
1080
1081
  /* maybe we were called immediately after an asynchronous abort */
1082
0
  if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
1083
0
    return 1;
1084
1085
  /* we must wait because the mux is not installed yet */
1086
0
  if (!conn->mux)
1087
0
    return 0;
1088
1089
  /* stop immediately on errors. Note that we DON'T want to stop on
1090
   * POLL_ERR, as the poller might report a write error while there
1091
   * are still data available in the recv buffer. This typically
1092
   * happens when we send too large a request to a backend server
1093
   * which rejects it before reading it all.
1094
   */
1095
0
  if (!sc_ep_test(sc, SE_FL_RCV_MORE)) {
1096
0
    if (!conn_xprt_ready(conn))
1097
0
      return 0;
1098
0
    if (sc_ep_test(sc, SE_FL_ERROR))
1099
0
      goto end_recv;
1100
0
  }
1101
1102
  /* prepare to detect if the mux needs more room */
1103
0
  sc_ep_clr(sc, SE_FL_WANT_ROOM);
1104
1105
0
  channel_check_idletimer(ic);
1106
1107
#if defined(USE_LINUX_SPLICE)
1108
  /* Detect if the splicing is possible depending on the stream policy */
1109
  if ((global.tune.options & GTUNE_USE_SPLICE) &&
1110
      (ic->to_forward >= MIN_SPLICE_FORWARD) &&
1111
      ((!(sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_REQ)) ||
1112
       ((sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_RTR)) ||
1113
       ((ic->flags & CF_STREAMER_FAST) && ((strm_sess(__sc_strm(sc))->fe->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_AUT))))
1114
    flags |= CO_RFL_MAY_SPLICE;
1115
#endif
1116
1117
  /* First, let's see if we may fast-forward data from a side to the other
1118
   * one without using the channel buffer.
1119
   */
1120
0
  if (sc_is_fastfwd_supported(sc)) {
1121
0
    if (channel_data(ic)) {
1122
      /* We're embarrassed, there are already data pending in
1123
       * the buffer and we don't want to have them at two
1124
       * locations at a time. Let's indicate we need some
1125
       * place and ask the consumer to hurry.
1126
       */
1127
0
      flags |= CO_RFL_BUF_FLUSH;
1128
0
      goto abort_fastfwd;
1129
0
    }
1130
0
    sc_ep_fwd_kip(sc, sc_opposite(sc));
1131
0
    ret = CALL_MUX_WITH_RET(conn->mux, fastfwd(sc, ic->to_forward, flags));
1132
0
    if (ret < 0)
1133
0
      goto abort_fastfwd;
1134
0
    else if (ret > 0) {
1135
0
      if (ic->to_forward != CHN_INFINITE_FORWARD)
1136
0
        ic->to_forward -= ret;
1137
0
      sc->bytes_in += ret;
1138
0
      cur_read += ret;
1139
0
      ic->flags |= CF_READ_EVENT;
1140
0
    }
1141
1142
0
    if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
1143
0
      goto end_recv;
1144
1145
0
    if (sc_ep_test(sc, SE_FL_WANT_ROOM))
1146
0
      sc_need_room(sc, -1);
1147
1148
0
    if (sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) && ic->to_forward)
1149
0
      goto done_recv;
1150
0
  }
1151
1152
0
 abort_fastfwd:
1153
  /* now we'll need a input buffer for the stream */
1154
0
  if (!sc_alloc_ibuf(sc, &(__sc_strm(sc)->buffer_wait)))
1155
0
    goto end_recv;
1156
1157
  /* For an HTX stream, if the buffer is stuck (no output data with some
1158
   * input data) and if the HTX message is fragmented or if its free space
1159
   * wraps, we force an HTX deframentation. It is a way to have a
1160
   * contiguous free space nad to let the mux to copy as much data as
1161
   * possible.
1162
   *
1163
   * NOTE: A possible optim may be to let the mux decides if defrag is
1164
   *       required or not, depending on amount of data to be xferred.
1165
   */
1166
0
  if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) {
1167
0
    struct htx *htx = htxbuf(&ic->buf);
1168
1169
0
    if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
1170
0
      htx_defrag(htx, NULL, 0);
1171
0
  }
1172
1173
  /* Instruct the mux it must subscribed for read events */
1174
0
  if (!(sc->flags & SC_FL_ISBACK) &&                         /* for frontend conns only */
1175
0
      (sc_opposite(sc)->state != SC_ST_INI) &&               /* before backend connection setup */
1176
0
      proxy_abrt_close(__sc_strm(sc)->be))                   /* if abortonclose option is set for the current backend */
1177
0
    flags |= CO_RFL_KEEP_RECV;
1178
1179
  /* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
1180
   * was enabled, which implies that the recv buffer was not full. So we have a guarantee
1181
   * that if such an event is not handled above in splice, it will be handled here by
1182
   * recv().
1183
   */
1184
0
  while (sc_ep_test(sc, SE_FL_RCV_MORE) ||
1185
0
         (!(conn->flags & CO_FL_HANDSHAKE) &&
1186
0
    (!sc_ep_test(sc, SE_FL_ERROR | SE_FL_EOS)) && !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)))) {
1187
0
    int cur_flags = flags;
1188
1189
    /* Compute transient CO_RFL_* flags */
1190
0
    if (co_data(ic)) {
1191
0
      cur_flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
1192
0
    }
1193
1194
    /* <max> may be null. This is the mux responsibility to set
1195
     * SE_FL_RCV_MORE on the SC if more space is needed.
1196
     */
1197
0
    max = channel_recv_max(ic);
1198
0
    if (b_is_small(sc_ib(sc)) || ((ic->flags & CF_WROTE_DATA) && b_is_large(sc_ib(sc))))
1199
0
      max = 0;
1200
0
    ret = CALL_MUX_WITH_RET(conn->mux, rcv_buf(sc, &ic->buf, max, cur_flags));
1201
1202
0
    if (sc_ep_test(sc, SE_FL_WANT_ROOM)) {
1203
      /* SE_FL_WANT_ROOM must not be reported if the channel's
1204
       * buffer is empty.
1205
       */
1206
0
      BUG_ON(c_empty(ic));
1207
1208
0
      sc_need_room(sc, channel_recv_max(ic) + 1);
1209
      /* Add READ_PARTIAL because some data are pending but
1210
       * cannot be xferred to the channel
1211
       */
1212
0
      ic->flags |= CF_READ_EVENT;
1213
0
      sc_ep_report_read_activity(sc);
1214
0
    }
1215
1216
0
    if (ret <= 0) {
1217
      /* if we refrained from reading because we asked for a
1218
       * flush to satisfy rcv_pipe(), we must not subscribe
1219
       * and instead report that there's not enough room
1220
       * here to proceed.
1221
       */
1222
0
      if (flags & CO_RFL_BUF_FLUSH)
1223
0
        sc_need_room(sc, -1);
1224
0
      break;
1225
0
    }
1226
1227
0
    cur_read += ret;
1228
1229
    /* if we're allowed to directly forward data, we must update ->o */
1230
0
    if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
1231
0
      unsigned long fwd = ret;
1232
0
      if (ic->to_forward != CHN_INFINITE_FORWARD) {
1233
0
        if (fwd > ic->to_forward)
1234
0
          fwd = ic->to_forward;
1235
0
        ic->to_forward -= fwd;
1236
0
      }
1237
0
      c_adv(ic, fwd);
1238
0
    }
1239
1240
0
    ic->flags |= CF_READ_EVENT;
1241
0
    sc->bytes_in += ret;
1242
1243
    /* End-of-input reached, we can leave. In this case, it is
1244
     * important to break the loop to not block the SC because of
1245
     * the channel's policies.This way, we are still able to receive
1246
     * shutdowns.
1247
     */
1248
0
    if (sc_ep_test(sc, SE_FL_EOI))
1249
0
      break;
1250
1251
0
    if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) {
1252
      /* we don't expect to read more data */
1253
0
      sc_wont_read(sc);
1254
0
      break;
1255
0
    }
1256
1257
    /* if too many bytes were missing from last read, it means that
1258
     * it's pointless trying to read again because the system does
1259
     * not have them in buffers.
1260
     */
1261
0
    if (ret < max) {
1262
      /* if a streamer has read few data, it may be because we
1263
       * have exhausted system buffers. It's not worth trying
1264
       * again.
1265
       */
1266
0
      if (ic->flags & CF_STREAMER) {
1267
        /* we're stopped by the channel's policy */
1268
0
        sc_wont_read(sc);
1269
0
        break;
1270
0
      }
1271
1272
      /* if we read a large block smaller than what we requested,
1273
       * it's almost certain we'll never get anything more.
1274
       */
1275
0
      if (ret >= global.tune.recv_enough) {
1276
        /* we're stopped by the channel's policy */
1277
0
        sc_wont_read(sc);
1278
0
        break;
1279
0
      }
1280
0
    }
1281
1282
    /* if we are waiting for more space, don't try to read more data
1283
     * right now.
1284
     */
1285
0
    if (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))
1286
0
      break;
1287
0
  } /* while !flags */
1288
1289
0
 done_recv:
1290
0
  if (!cur_read)
1291
0
    se_have_no_more_data(sc->sedesc);
1292
0
  else {
1293
0
    channel_check_xfer(ic, cur_read);
1294
0
    sc_ep_report_read_activity(sc);
1295
0
  }
1296
1297
0
 end_recv:
1298
0
  ret = (cur_read != 0);
1299
1300
  /* Report EOI on the channel if it was reached from the mux point of
1301
   * view. */
1302
0
  if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
1303
0
    sc_ep_report_read_activity(sc);
1304
0
    sc->flags |= SC_FL_EOI;
1305
0
    ic->flags |= CF_READ_EVENT;
1306
0
    ret = 1;
1307
0
  }
1308
1309
0
  if (sc_ep_test(sc, SE_FL_EOS)) {
1310
    /* we received a shutdown */
1311
0
    if (ic->flags & CF_AUTO_CLOSE)
1312
0
      sc_schedule_shutdown(sc_opposite(sc));
1313
0
    sc_conn_eos(sc);
1314
0
    ret = 1;
1315
0
  }
1316
0
  if (sc_ep_test(sc, SE_FL_ERROR)) {
1317
0
    sc->flags |= SC_FL_ERROR;
1318
0
    if (!(sc->flags & SC_FL_EOS))
1319
0
      sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_rcv_err: strm_tevt_type_truncated_rcv_err));
1320
0
    ret = 1;
1321
0
  }
1322
1323
  /* Ensure sc_conn_process() is called if waiting on handshake. */
1324
0
  if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
1325
0
      sc_ep_test(sc, SE_FL_WAIT_FOR_HS)) {
1326
0
    ret = 1;
1327
0
  }
1328
1329
0
  if (sc->flags & (SC_FL_EOS|SC_FL_ERROR)) {
1330
    /* No more data are expected at this stage */
1331
0
    se_have_no_more_data(sc->sedesc);
1332
0
  }
1333
0
  else if (!cur_read &&
1334
0
     !(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) &&
1335
0
     !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))) {
1336
    /* Subscribe to receive events if we're blocking on I/O. Nothing
1337
     * was received and it was not because of a blocking
1338
     * condition.
1339
     */
1340
0
    conn->mux->subscribe(sc, SUB_RETRY_RECV, &sc->wait_event);
1341
0
    se_have_no_more_data(sc->sedesc);
1342
0
  }
1343
0
  else if (sc->flags & SC_FL_EOI) {
1344
    /* No more data are expected at this stage, except if abortonclose is enabled */
1345
0
    if (!(flags & CO_RFL_KEEP_RECV))
1346
0
      se_have_no_more_data(sc->sedesc);
1347
0
    else
1348
0
      se_have_more_data(sc->sedesc);
1349
0
  }
1350
0
  else {
1351
    /* The mux may have more data to deliver. Be sure to be able to
1352
     * ask it ASAP
1353
     */
1354
0
    se_have_more_data(sc->sedesc);
1355
0
    ret = 1;
1356
0
  }
1357
1358
0
  return ret;
1359
0
}
1360
1361
/* This tries to perform a synchronous receive on the stream connector to
1362
 * try to collect last arrived data. In practice it's only implemented on
1363
 * stconns. Returns 0 if nothing was done, non-zero if new data or a
1364
 * shutdown were collected. This may result on some delayed receive calls
1365
 * to be programmed and performed later, though it doesn't provide any
1366
 * such guarantee.
1367
 */
1368
int sc_conn_sync_recv(struct stconn *sc)
1369
0
{
1370
0
  if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST))
1371
0
    return 0;
1372
1373
0
  if (!sc_mux_ops(sc))
1374
0
    return 0; // only stconns are supported
1375
1376
0
  if (sc->wait_event.events & SUB_RETRY_RECV)
1377
0
    return 0; // already subscribed
1378
1379
0
  if (!sc_is_recv_allowed(sc))
1380
0
    return 0; // already failed
1381
1382
0
  return sc_conn_recv(sc);
1383
0
}
1384
1385
/*
1386
 * This function is called to send buffer data to a stream socket.
1387
 * It calls the mux layer's snd_buf function. It relies on the
1388
 * caller to commit polling changes. The caller should check conn->flags
1389
 * for errors. Please do not statify this function, it's often present in
1390
 * backtraces, it's useful to recognize it.
1391
 */
1392
int sc_conn_send(struct stconn *sc)
1393
0
{
1394
0
  struct connection *conn = __sc_conn(sc);
1395
0
  struct stconn *sco = sc_opposite(sc);
1396
0
  struct stream *s = __sc_strm(sc);
1397
0
  struct channel *oc = sc_oc(sc);
1398
0
  int ret;
1399
0
  int did_send = 0;
1400
1401
0
  if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING) || sc_is_conn_error(sc)) {
1402
    /* We're probably there because the tasklet was woken up,
1403
     * but process_stream() ran before, detected there were an
1404
     * error and put the SC back to SC_ST_TAR. There's still
1405
     * CO_FL_ERROR on the connection but we don't want to add
1406
     * SE_FL_ERROR back, so give up
1407
     */
1408
0
    if (sc->state < SC_ST_CON)
1409
0
      return 0;
1410
0
    BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
1411
0
    if (sc_ep_test(sc, SE_FL_ERROR))
1412
0
      sc->flags |= SC_FL_ERROR;
1413
0
    if (co_data(oc) || sc_ep_have_ff_data(sc))
1414
0
      sc_ep_report_blocked_send(sc, 0);
1415
0
    return 1;
1416
0
  }
1417
1418
  /* We're already waiting to be able to send, give up */
1419
0
  if (sc->wait_event.events & SUB_RETRY_SEND)
1420
0
    return 0;
1421
1422
  /* we might have been called just after an asynchronous shutw */
1423
0
  if (sc->flags & SC_FL_SHUT_DONE)
1424
0
    return 1;
1425
1426
  /* we must wait because the mux is not installed yet */
1427
0
  if (!conn->mux)
1428
0
    return 0;
1429
1430
0
  sc_ep_fwd_kip(sco, sc);
1431
1432
0
  if (sc_ep_have_ff_data(sc)) {
1433
0
    unsigned int send_flag = 0;
1434
1435
0
    if ((!(sc->flags & (SC_FL_SND_ASAP|SC_FL_SND_NEVERWAIT)) &&
1436
0
         ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
1437
0
          (sc->flags & SC_FL_SND_EXP_MORE) ||
1438
0
          (IS_HTX_STRM(s) &&
1439
0
           (!(sco->flags & (SC_FL_EOI|SC_FL_EOS|SC_FL_ABRT_DONE)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
1440
0
        ((oc->flags & CF_ISRESP) &&
1441
0
         (oc->flags & CF_AUTO_CLOSE) &&
1442
0
         (sc->flags & SC_FL_SHUT_WANTED)))
1443
0
      send_flag |= CO_SFL_MSG_MORE;
1444
1445
0
    if (oc->flags & CF_STREAMER)
1446
0
      send_flag |= CO_SFL_STREAMER;
1447
1448
0
    ret = CALL_MUX_WITH_RET(conn->mux, resume_fastfwd(sc, send_flag));
1449
0
    if (ret > 0) {
1450
0
      sc->bytes_out += ret;
1451
0
      did_send = 1;
1452
0
    }
1453
1454
0
    if (sc_ep_have_ff_data(sc))
1455
0
      goto end;
1456
0
  }
1457
1458
  /* At this point, the pipe is empty, but we may still have data pending
1459
   * in the normal buffer.
1460
   */
1461
0
  if (co_data(oc)) {
1462
    /* when we're here, we already know that there is no spliced
1463
     * data left, and that there are sendable buffered data.
1464
     */
1465
1466
    /* check if we want to inform the kernel that we're interested in
1467
     * sending more data after this call. We want this if :
1468
     *  - we're about to close after this last send and want to merge
1469
     *    the ongoing FIN with the last segment.
1470
     *  - we know we can't send everything at once and must get back
1471
     *    here because of unaligned data
1472
     *  - there is still a finite amount of data to forward
1473
     * The test is arranged so that the most common case does only 2
1474
     * tests.
1475
     */
1476
0
    unsigned int send_flag = 0;
1477
1478
0
    if ((!(sc->flags & (SC_FL_SND_ASAP|SC_FL_SND_NEVERWAIT)) &&
1479
0
         ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
1480
0
          (sc->flags & SC_FL_SND_EXP_MORE) ||
1481
0
          (IS_HTX_STRM(s) &&
1482
0
           (!(sco->flags & (SC_FL_EOI|SC_FL_EOS|SC_FL_ABRT_DONE)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
1483
0
        ((oc->flags & CF_ISRESP) &&
1484
0
         (oc->flags & CF_AUTO_CLOSE) &&
1485
0
         (sc->flags & SC_FL_SHUT_WANTED)))
1486
0
      send_flag |= CO_SFL_MSG_MORE;
1487
1488
0
    if (oc->flags & CF_STREAMER)
1489
0
      send_flag |= CO_SFL_STREAMER;
1490
1491
0
    if (s->txn && s->txn->flags & TX_L7_RETRY && !b_data(&s->txn->l7_buffer)) {
1492
      /* If we want to be able to do L7 retries, copy
1493
       * the data we're about to send, so that we are able
1494
       * to resend them if needed
1495
       */
1496
      /* Try to allocate a buffer if we had none.
1497
       * If it fails, the next test will just
1498
       * disable the l7 retries by setting
1499
       * l7_conn_retries to 0.
1500
       */
1501
0
      if (s->txn->req.msg_state != HTTP_MSG_DONE || b_is_large(&oc->buf))
1502
0
        s->txn->flags &= ~TX_L7_RETRY;
1503
0
      else {
1504
0
        if (!(s->be->options2 & PR_O2_USE_SBUF_L7_RETRY) ||
1505
0
            !htx_copy_to_small_buffer(&s->txn->l7_buffer, &oc->buf)) {
1506
0
          if (b_alloc(&s->txn->l7_buffer, DB_UNLIKELY) == NULL)
1507
0
            s->txn->flags &= ~TX_L7_RETRY;
1508
0
          else {
1509
0
            memcpy(b_orig(&s->txn->l7_buffer),
1510
0
                   b_orig(&oc->buf),
1511
0
                   b_size(&oc->buf));
1512
0
          }
1513
0
        }
1514
1515
0
        if (s->txn->flags & TX_L7_RETRY) {
1516
0
          s->txn->l7_buffer.head = co_data(oc);
1517
0
          b_set_data(&s->txn->l7_buffer, co_data(oc));
1518
0
        }
1519
0
      }
1520
0
    }
1521
1522
0
    if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc))
1523
0
      send_flag |= CO_SFL_LAST_DATA;
1524
1525
0
    ret = CALL_MUX_WITH_RET(conn->mux, snd_buf(sc, &oc->buf, co_data(oc), send_flag));
1526
0
    if (ret > 0) {
1527
0
      did_send = 1;
1528
0
      c_rew(oc, ret);
1529
0
      c_realign_if_empty(oc);
1530
0
      sc->bytes_out += ret;
1531
0
      if (!co_data(oc)) {
1532
        /* Always clear both flags once everything has been sent, they're one-shot */
1533
0
        sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE);
1534
0
      }
1535
      /* if some data remain in the buffer, it's only because the
1536
       * system buffers are full, we will try next time.
1537
       */
1538
0
    }
1539
0
  }
1540
1541
0
 end:
1542
0
  if (did_send) {
1543
0
    oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
1544
0
    if (sc->state == SC_ST_CON)
1545
0
      sc->state = SC_ST_RDY;
1546
0
  }
1547
1548
0
  if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)))
1549
0
    sc_have_room(sco);
1550
1551
0
  if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
1552
0
    oc->flags |= CF_WRITE_EVENT;
1553
0
    BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
1554
0
    sc_report_term_evt(sc, strm_tevt_type_snd_err);
1555
0
    if (sc_ep_test(sc, SE_FL_ERROR))
1556
0
      sc->flags |= SC_FL_ERROR;
1557
0
    return 1;
1558
0
  }
1559
1560
  /* FIXME: Must be reviewed for FF */
1561
0
  if (!co_data(oc) && !sc_ep_have_ff_data(sc)) {
1562
0
    if (did_send)
1563
0
      sc_ep_report_send_activity(sc);
1564
    /* If fast-forwarding is blocked, unblock it now to check for
1565
     * receive on the other side
1566
     */
1567
0
    if (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
1568
0
      sc->sedesc->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
1569
0
      sc_have_room(sco);
1570
0
      did_send = 1;
1571
0
    }
1572
0
  }
1573
0
  else {
1574
    /* We couldn't send all of our data, let the mux know we'd like to send more */
1575
0
    conn->mux->subscribe(sc, SUB_RETRY_SEND, &sc->wait_event);
1576
0
    if (sc_state_in(sc->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO))
1577
0
        sc_ep_report_blocked_send(sc, did_send);
1578
0
  }
1579
1580
0
  return did_send;
1581
0
}
1582
1583
/* perform a synchronous send() for the stream connector. The CF_WRITE_EVENT
1584
 * flag are cleared prior to the attempt, and will possibly be updated in case
1585
 * of success.
1586
 */
1587
int sc_conn_sync_send(struct stconn *sc)
1588
0
{
1589
0
  struct channel *oc = sc_oc(sc);
1590
0
  int did_send = 0;
1591
1592
0
  oc->flags &= ~CF_WRITE_EVENT;
1593
1594
0
  if (sc->flags & SC_FL_SHUT_DONE)
1595
0
    goto end;
1596
1597
0
  if (!co_data(oc))
1598
0
    goto end;
1599
1600
0
  if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
1601
0
    goto end;
1602
1603
0
  if (!sc_mux_ops(sc))
1604
0
    goto end;
1605
1606
0
  did_send = sc_conn_send(sc);
1607
0
  if (oc->flags & CF_WRITE_EVENT)
1608
0
    oc->flags |= CF_WAKE_ONCE;
1609
0
  end:
1610
0
  return did_send;
1611
0
}
1612
1613
/* Called by I/O handlers after completion.. It propagates
1614
 * connection flags to the stream connector, updates the stream (which may or
1615
 * may not take this opportunity to try to forward data), then update the
1616
 * connection's polling based on the channels and stream connector's final
1617
 * states. The function always returns 0. Please do not statify this function,
1618
 * it's often present in backtraces, it's useful to recognize it.
1619
 */
1620
int sc_conn_process(struct stconn *sc)
1621
0
{
1622
0
  struct connection *conn = __sc_conn(sc);
1623
0
  struct channel *ic = sc_ic(sc);
1624
0
  struct channel *oc = sc_oc(sc);
1625
1626
0
  BUG_ON(!conn);
1627
1628
  /* If we have data to send, try it now */
1629
0
  if ((co_data(oc) || sc_ep_have_ff_data(sc)) &&
1630
0
      !(sc->wait_event.events & SUB_RETRY_SEND))
1631
0
    sc_conn_send(sc);
1632
1633
  /* First step, report to the stream connector what was detected at the
1634
   * connection layer : errors and connection establishment.
1635
   * Only add SC_FL_ERROR if we're connected, or we're attempting to
1636
   * connect, we may get there because we got woken up, but only run
1637
   * after process_stream() noticed there were an error, and decided
1638
   * to retry to connect, the connection may still have CO_FL_ERROR,
1639
   * and we don't want to add SC_FL_ERROR back
1640
   *
1641
   * Note: This test is only required because sc_conn_process is also the SI
1642
   *       wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
1643
   *       care of it.
1644
   */
1645
1646
0
  if (sc->state >= SC_ST_CON) {
1647
0
    if (sc_is_conn_error(sc))
1648
0
      sc->flags |= SC_FL_ERROR;
1649
0
  }
1650
1651
  /* If we had early data, and the handshake ended, then
1652
   * we can remove the flag, and attempt to wake the task up,
1653
   * in the event there's an analyser waiting for the end of
1654
   * the handshake.
1655
   */
1656
0
  if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
1657
0
      sc_ep_test(sc, SE_FL_WAIT_FOR_HS)) {
1658
0
    sc_ep_clr(sc, SE_FL_WAIT_FOR_HS);
1659
0
    task_wakeup(sc_strm_task(sc), TASK_WOKEN_MSG);
1660
0
  }
1661
1662
0
  if (!sc_state_in(sc->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO) &&
1663
0
      (conn->flags & CO_FL_WAIT_XPRT) == 0) {
1664
0
    if (sc->flags & SC_FL_ISBACK)
1665
0
      __sc_strm(sc)->conn_exp = TICK_ETERNITY;
1666
0
    oc->flags |= CF_WRITE_EVENT;
1667
0
    if (sc->state == SC_ST_CON)
1668
0
      sc->state = SC_ST_RDY;
1669
0
  }
1670
1671
  /* Report EOI on the channel if it was reached from the mux point of
1672
   * view.
1673
   *
1674
   * Note: This test is only required because sc_conn_process is also the SI
1675
   *       wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
1676
   *       care of it.
1677
   */
1678
0
  if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
1679
0
    sc->flags |= SC_FL_EOI;
1680
0
    ic->flags |= CF_READ_EVENT;
1681
0
    sc_ep_report_read_activity(sc);
1682
0
  }
1683
1684
  /* Report EOS on the channel if it was reached from the mux point of
1685
   * view.
1686
   *
1687
   * Note: This test is only required because sc_conn_process is also the SI
1688
   *       wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
1689
   *       care of it.
1690
   */
1691
0
  if (sc_ep_test(sc, SE_FL_EOS) && !(sc->flags & SC_FL_EOS)) {
1692
    /* we received a shutdown */
1693
0
    if (ic->flags & CF_AUTO_CLOSE)
1694
0
      sc_schedule_shutdown(sc_opposite(sc));
1695
0
    sc_conn_eos(sc);
1696
0
  }
1697
1698
0
  if (sc_ep_test(sc, SE_FL_ERROR) && !(sc->flags & SC_FL_ERROR)) {
1699
0
    if (!(sc->flags & SC_FL_EOS))
1700
0
      sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_rcv_err: strm_tevt_type_truncated_rcv_err));
1701
0
    sc->flags |= SC_FL_ERROR;
1702
0
  }
1703
1704
  /* Second step : update the stream connector and channels, try to forward any
1705
   * pending data, then possibly wake the stream up based on the new
1706
   * stream connector status.
1707
   */
1708
0
  sc_notify(sc);
1709
0
  stream_release_buffers(__sc_strm(sc));
1710
0
  return 0;
1711
0
}
1712
1713
/* This is the ->process() function for any stream connector's wait_event task.
1714
 * It's assigned during the stream connector's initialization, for any type of
1715
 * stream connector. Thus it is always safe to perform a tasklet_wakeup() on a
1716
 * stream connector, as the presence of the SC is checked there.
1717
 */
1718
struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state)
1719
0
{
1720
0
  struct stconn *sc = ctx;
1721
0
  int ret = 0;
1722
1723
0
  if (!sc_conn(sc))
1724
0
    return t;
1725
1726
0
  if (!(sc->wait_event.events & SUB_RETRY_SEND) && (co_data(sc_oc(sc)) || sc_ep_have_ff_data(sc) || (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED)))
1727
0
    ret = sc_conn_send(sc);
1728
0
  if (!(sc->wait_event.events & SUB_RETRY_RECV))
1729
0
    ret |= sc_conn_recv(sc);
1730
0
  if (ret != 0 || (state & TASK_WOKEN_MSG))
1731
0
    sc_conn_process(sc);
1732
1733
0
  stream_release_buffers(__sc_strm(sc));
1734
0
  return t;
1735
0
}
1736
1737
/*
1738
 * This function propagates an end-of-stream received from an applet. It
1739
 * updates the stream connector. If it is is already shut, the applet is
1740
 * released. Otherwise, we try to forward the shutdown, immediately or ASAP.
1741
 */
1742
static void sc_applet_eos(struct stconn *sc)
1743
0
{
1744
0
  struct channel *ic = sc_ic(sc);
1745
1746
0
  BUG_ON(!sc_appctx(sc));
1747
1748
0
  if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
1749
0
    return;
1750
0
  sc->flags |= SC_FL_EOS;
1751
0
  ic->flags |= CF_READ_EVENT;
1752
0
  sc_ep_report_read_activity(sc);
1753
0
  sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_eos: strm_tevt_type_truncated_eos));
1754
1755
  /* Note: on abort, we don't call the applet */
1756
1757
0
  if (sc->state != SC_ST_EST)
1758
0
    return;
1759
1760
0
  if (sc->flags & SC_FL_SHUT_DONE) {
1761
0
    se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
1762
0
    sc->state = SC_ST_DIS;
1763
0
    if (sc->flags & SC_FL_ISBACK)
1764
0
      __sc_strm(sc)->conn_exp = TICK_ETERNITY;
1765
0
  }
1766
0
  else if (sc_cond_forward_shut(sc))
1767
0
    return sc_shutdown(sc);
1768
0
}
1769
1770
/*
1771
 * This is the callback which is called by the applet layer to receive data into
1772
 * the buffer from the appctx. It iterates over the applet's rcv_buf
1773
 * function. Please do not statify this function, it's often present in
1774
 * backtraces, it's useful to recognize it.
1775
 */
1776
int sc_applet_recv(struct stconn *sc)
1777
0
{
1778
0
  struct appctx *appctx = __sc_appctx(sc);
1779
0
  struct channel *ic = sc_ic(sc);
1780
0
  int ret, max, cur_read = 0;
1781
0
  int read_poll = MAX_READ_POLL_LOOPS;
1782
0
  int flags = 0;
1783
1784
1785
  /* If another call to sc_applet_recv() failed, give up now.
1786
   */
1787
0
  if (sc_waiting_room(sc))
1788
0
    return 0;
1789
1790
  /* maybe we were called immediately after an asynchronous abort */
1791
0
  if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
1792
0
    return 1;
1793
1794
  /* We must wait because the applet is not fully initialized */
1795
0
  if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
1796
0
    return 0;
1797
1798
  /* stop immediately on errors. */
1799
0
  if (!sc_ep_test(sc, SE_FL_RCV_MORE)) {
1800
    // TODO: be sure SE_FL_RCV_MORE may be set for applet ?
1801
0
    if (sc_ep_test(sc, SE_FL_ERROR))
1802
0
      goto end_recv;
1803
0
  }
1804
1805
  /* prepare to detect if the mux needs more room */
1806
0
  sc_ep_clr(sc, SE_FL_WANT_ROOM);
1807
1808
0
  channel_check_idletimer(ic);
1809
1810
  /* First, let's see if we may fast-forward data from a side to the other
1811
   * one without using the channel buffer.
1812
   */
1813
0
  if (sc_is_fastfwd_supported(sc)) {
1814
0
    if (channel_data(ic)) {
1815
      /* We're embarrassed, there are already data pending in
1816
       * the buffer and we don't want to have them at two
1817
       * locations at a time. Let's indicate we need some
1818
       * place and ask the consumer to hurry.
1819
       */
1820
0
      flags |= CO_RFL_BUF_FLUSH;
1821
0
      goto abort_fastfwd;
1822
0
    }
1823
0
    sc_ep_fwd_kip(sc, sc_opposite(sc));
1824
0
    ret = appctx_fastfwd(sc, ic->to_forward, flags);
1825
0
    if (ret < 0)
1826
0
      goto abort_fastfwd;
1827
0
    else if (ret > 0) {
1828
0
      if (ic->to_forward != CHN_INFINITE_FORWARD)
1829
0
        ic->to_forward -= ret;
1830
0
      sc->bytes_in += ret;
1831
0
      cur_read += ret;
1832
0
      ic->flags |= CF_READ_EVENT;
1833
0
    }
1834
1835
0
    if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
1836
0
      goto end_recv;
1837
1838
0
    if (sc_ep_test(sc, SE_FL_WANT_ROOM))
1839
0
      sc_need_room(sc, -1);
1840
1841
0
    if (sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) && ic->to_forward)
1842
0
      goto done_recv;
1843
0
  }
1844
1845
0
 abort_fastfwd:
1846
0
  if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
1847
0
    goto end_recv;
1848
1849
  /* For an HTX stream, if the buffer is stuck (no output data with some
1850
   * input data) and if the HTX message is fragmented or if its free space
1851
   * wraps, we force an HTX deframentation. It is a way to have a
1852
   * contiguous free space nad to let the mux to copy as much data as
1853
   * possible.
1854
   *
1855
   * NOTE: A possible optim may be to let the mux decides if defrag is
1856
   *       required or not, depending on amount of data to be xferred.
1857
   */
1858
0
  if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) {
1859
0
    struct htx *htx = htxbuf(&ic->buf);
1860
1861
0
    if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
1862
0
      htx_defrag(htx, NULL, 0);
1863
0
  }
1864
1865
  /* Compute transient CO_RFL_* flags */
1866
0
  if (co_data(ic)) {
1867
0
    flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
1868
0
  }
1869
1870
  /* <max> may be null. This is the mux responsibility to set
1871
   * SE_FL_RCV_MORE on the SC if more space is needed.
1872
   */
1873
0
  max = channel_recv_max(ic);
1874
0
  if (b_is_small(sc_ib(sc)) || ((ic->flags & CF_WROTE_DATA) && b_is_large(sc_ib(sc))))
1875
0
    max = 0;
1876
0
  ret = appctx_rcv_buf(sc, &ic->buf, max, flags);
1877
0
  if (sc_ep_test(sc, SE_FL_WANT_ROOM)) {
1878
    /* SE_FL_WANT_ROOM must not be reported if the channel's
1879
     * buffer is empty.
1880
     */
1881
0
    BUG_ON(c_empty(ic));
1882
1883
0
    sc_need_room(sc, channel_recv_max(ic) + 1);
1884
    /* Add READ_PARTIAL because some data are pending but
1885
     * cannot be xferred to the channel
1886
     */
1887
0
    ic->flags |= CF_READ_EVENT;
1888
0
    sc_ep_report_read_activity(sc);
1889
0
  }
1890
1891
0
  if (ret <= 0) {
1892
    /* if we refrained from reading because we asked for a flush to
1893
     * satisfy rcv_pipe(), report that there's not enough room here
1894
     * to proceed.
1895
     */
1896
0
    if (flags & CO_RFL_BUF_FLUSH)
1897
0
      sc_need_room(sc, -1);
1898
0
    goto done_recv;
1899
0
  }
1900
1901
0
  cur_read += ret;
1902
1903
  /* if we're allowed to directly forward data, we must update ->o */
1904
0
  if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
1905
0
    unsigned long fwd = ret;
1906
0
    if (ic->to_forward != CHN_INFINITE_FORWARD) {
1907
0
      if (fwd > ic->to_forward)
1908
0
        fwd = ic->to_forward;
1909
0
      ic->to_forward -= fwd;
1910
0
    }
1911
0
    c_adv(ic, fwd);
1912
0
  }
1913
1914
0
  ic->flags |= CF_READ_EVENT;
1915
0
  sc->bytes_in += ret;
1916
1917
  /* End-of-input reached, we can leave. In this case, it is
1918
   * important to break the loop to not block the SC because of
1919
   * the channel's policies.This way, we are still able to receive
1920
   * shutdowns.
1921
   */
1922
0
  if (sc_ep_test(sc, SE_FL_EOI))
1923
0
    goto done_recv;
1924
1925
0
  if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) {
1926
    /* we don't expect to read more data */
1927
0
    sc_wont_read(sc);
1928
0
    goto done_recv;
1929
0
  }
1930
1931
  /* if too many bytes were missing from last read, it means that
1932
   * it's pointless trying to read again because the system does
1933
   * not have them in buffers.
1934
   */
1935
0
  if (ret < max) {
1936
    /* if a streamer has read few data, it may be because we
1937
     * have exhausted system buffers. It's not worth trying
1938
     * again.
1939
     */
1940
0
    if (ic->flags & CF_STREAMER) {
1941
      /* we're stopped by the channel's policy */
1942
0
      sc_wont_read(sc);
1943
0
      goto done_recv;
1944
0
    }
1945
1946
    /* if we read a large block smaller than what we requested,
1947
     * it's almost certain we'll never get anything more.
1948
     */
1949
0
    if (ret >= global.tune.recv_enough) {
1950
      /* we're stopped by the channel's policy */
1951
0
      sc_wont_read(sc);
1952
0
    }
1953
0
  }
1954
1955
0
 done_recv:
1956
0
  if (cur_read) {
1957
0
    channel_check_xfer(ic, cur_read);
1958
0
    sc_ep_report_read_activity(sc);
1959
0
  }
1960
1961
0
 end_recv:
1962
0
  ret = (cur_read != 0);
1963
1964
  /* Report EOI on the channel if it was reached from the mux point of
1965
   * view. */
1966
0
  if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
1967
0
    sc_ep_report_read_activity(sc);
1968
0
    sc->flags |= SC_FL_EOI;
1969
0
    ic->flags |= CF_READ_EVENT;
1970
0
    ret = 1;
1971
0
  }
1972
1973
0
  if (sc_ep_test(sc, SE_FL_EOS)) {
1974
    /* we received a shutdown */
1975
0
    if (ic->flags & CF_AUTO_CLOSE)
1976
0
      sc_schedule_shutdown(sc_opposite(sc));
1977
0
    sc_applet_eos(sc);
1978
0
    ret = 1;
1979
0
  }
1980
1981
0
  if (sc_ep_test(sc, SE_FL_ERROR)) {
1982
0
    sc->flags |= SC_FL_ERROR;
1983
0
    ret = 1;
1984
0
  }
1985
0
  else if (cur_read || (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) {
1986
0
    se_have_more_data(sc->sedesc);
1987
0
    ret = 1;
1988
0
  }
1989
1990
0
  return ret;
1991
0
}
1992
1993
/* This tries to perform a synchronous receive on the stream connector to
1994
 * try to collect last arrived data. In practice it's only implemented on
1995
 * stconns. Returns 0 if nothing was done, non-zero if new data or a
1996
 * shutdown were collected. This may result on some delayed receive calls
1997
 * to be programmed and performed later, though it doesn't provide any
1998
 * such guarantee.
1999
 */
2000
int sc_applet_sync_recv(struct stconn *sc)
2001
0
{
2002
0
  if (!appctx_app_test(__sc_appctx(sc), APPLET_FL_NEW_API))
2003
0
    return 0;
2004
2005
0
  if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST))
2006
0
    return 0;
2007
2008
0
  if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
2009
0
    return 0;
2010
2011
0
  if (!sc_is_recv_allowed(sc))
2012
0
    return 0; // already failed
2013
2014
0
  return sc_applet_recv(sc);
2015
0
}
2016
2017
/*
2018
 * This function is called to send buffer data to an applet. It calls the
2019
 * applet's snd_buf function. Please do not statify this function, it's often
2020
 * present in backtraces, it's useful to recognize it.
2021
 */
2022
int sc_applet_send(struct stconn *sc)
2023
0
{
2024
0
  struct stconn *sco = sc_opposite(sc);
2025
0
  struct channel *oc = sc_oc(sc);
2026
0
  size_t ret;
2027
0
  int did_send = 0;
2028
2029
0
  if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
2030
0
    BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
2031
0
    if (co_data(oc))
2032
0
      sc_ep_report_blocked_send(sc, 0);
2033
0
    return 1;
2034
0
  }
2035
2036
0
  if (sc_ep_test(sc, SE_FL_WONT_CONSUME))
2037
0
    return 0;
2038
2039
  /* we might have been called just after an asynchronous shutw */
2040
0
  if (sc->flags & SC_FL_SHUT_DONE)
2041
0
    return 1;
2042
2043
  /* We must wait because the applet is not fully initialized */
2044
0
  if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
2045
0
    return 0;
2046
2047
0
  sc_ep_fwd_kip(sco, sc);
2048
2049
  /* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */
2050
0
  BUG_ON(sc_ep_have_ff_data(sc));
2051
2052
0
  if (co_data(oc)) {
2053
0
    unsigned int send_flag = 0;
2054
2055
0
    if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc))
2056
0
      send_flag |= CO_SFL_LAST_DATA;
2057
2058
0
    ret = appctx_snd_buf(sc, &oc->buf, co_data(oc), send_flag);
2059
0
    if (ret > 0) {
2060
0
      did_send = 1;
2061
0
      c_rew(oc, ret);
2062
0
      c_realign_if_empty(oc);
2063
0
      sc->bytes_out += ret;
2064
0
      if (!co_data(oc)) {
2065
        /* Always clear both flags once everything has been sent, they're one-shot */
2066
0
        sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE);
2067
0
      }
2068
      /* if some data remain in the buffer, it's only because the
2069
       * system buffers are full, we will try next time.
2070
       */
2071
0
    }
2072
0
  }
2073
2074
0
  if (did_send)
2075
0
    oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
2076
2077
0
  if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)))
2078
0
    sc_have_room(sco);
2079
2080
0
  if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
2081
0
    oc->flags |= CF_WRITE_EVENT;
2082
0
    BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
2083
0
    if (sc_ep_test(sc, SE_FL_ERROR))
2084
0
      sc->flags |= SC_FL_ERROR;
2085
0
    return 1;
2086
0
  }
2087
2088
0
  if (!co_data(oc)) {
2089
0
    if (did_send)
2090
0
      sc_ep_report_send_activity(sc);
2091
0
  }
2092
0
  else {
2093
0
    sc_ep_report_blocked_send(sc, did_send);
2094
0
  }
2095
2096
0
  return did_send;
2097
0
}
2098
2099
void sc_applet_sync_send(struct stconn *sc)
2100
0
{
2101
0
  struct channel *oc = sc_oc(sc);
2102
2103
0
  oc->flags &= ~CF_WRITE_EVENT;
2104
2105
0
  if (!appctx_app_test(__sc_appctx(sc), APPLET_FL_NEW_API))
2106
0
    return;
2107
2108
0
  if (sc->flags & SC_FL_SHUT_DONE)
2109
0
    return;
2110
2111
0
  if (!co_data(oc))
2112
0
    return;
2113
2114
0
  if (!sc_state_in(sc->state, SC_SB_EST))
2115
0
    return;
2116
2117
0
  if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
2118
0
    return;
2119
2120
0
  sc_applet_send(sc);
2121
0
  if (oc->flags & CF_WRITE_EVENT)
2122
0
    oc->flags |= CF_WAKE_ONCE;
2123
0
}
2124
2125
/* Callback to be used by applet handlers upon completion. It updates the stream
2126
 * (which may or may not take this opportunity to try to forward data), then
2127
 * may re-enable the applet's based on the channels and stream connector's final
2128
 * states. Please do not statify this function, it's often present in backtraces,
2129
 * it's useful to recognize it.
2130
 */
2131
int sc_applet_process(struct stconn *sc)
2132
0
{
2133
0
  struct channel *ic = sc_ic(sc);
2134
2135
0
  BUG_ON(!sc_appctx(sc));
2136
2137
  /* Report EOI on the channel if it was reached from the applet point of
2138
   * view. */
2139
0
  if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
2140
0
    sc_ep_report_read_activity(sc);
2141
0
    sc->flags |= SC_FL_EOI;
2142
0
    ic->flags |= CF_READ_EVENT;
2143
0
  }
2144
2145
0
  if (sc_ep_test(sc, SE_FL_ERROR))
2146
0
    sc->flags |= SC_FL_ERROR;
2147
2148
0
  if (sc_ep_test(sc, SE_FL_EOS)) {
2149
    /* we received a shutdown */
2150
0
    sc_applet_eos(sc);
2151
0
  }
2152
2153
0
  BUG_ON(sc_ep_test(sc, SE_FL_HAVE_NO_DATA|SE_FL_EOI) == SE_FL_EOI);
2154
2155
  /* If the applet wants to write and the channel is closed, it's a
2156
   * broken pipe and it must be reported.
2157
   */
2158
0
  if (!sc_ep_test(sc, SE_FL_HAVE_NO_DATA) && (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)))
2159
0
    sc_ep_set(sc, SE_FL_ERROR);
2160
2161
  /* automatically mark the applet having data available if it reported
2162
   * begin blocked by the channel.
2163
   */
2164
0
  if ((sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) ||
2165
0
      sc_ep_test(sc, SE_FL_APPLET_NEED_CONN))
2166
0
    applet_have_more_data(__sc_appctx(sc));
2167
2168
  /* update the stream connector, channels, and possibly wake the stream up */
2169
0
  sc_notify(sc);
2170
0
  stream_release_buffers(__sc_strm(sc));
2171
2172
  /* sc_notify may have passed through chk_snd and released some blocking
2173
   * flags. Process_stream will consider those flags to wake up the
2174
   * appctx but in the case the task is not in runqueue we may have to
2175
   * wakeup the appctx immediately.
2176
   */
2177
0
  if ((sc_is_recv_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_OUTBLK_ALLOC)) ||
2178
0
      (sc_is_send_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_INBLK_ALLOC)))
2179
0
    appctx_wakeup(__sc_appctx(sc));
2180
0
  return 0;
2181
0
}
2182
2183
2184
/* Prepares an endpoint upgrade. We don't now at this stage if the upgrade will
2185
 * succeed or not and if the stconn will be reused by the new endpoint. Thus,
2186
 * for now, only pretend the stconn is detached.
2187
 */
2188
void sc_conn_prepare_endp_upgrade(struct stconn *sc)
2189
0
{
2190
0
  BUG_ON(!sc_conn(sc) || !sc->app);
2191
0
  sc_ep_clr(sc, SE_FL_T_MUX);
2192
0
  sc_ep_set(sc, SE_FL_DETACHED);
2193
0
}
2194
2195
/* Endpoint upgrade failed. Restore the stconn state. */
2196
void sc_conn_abort_endp_upgrade(struct stconn *sc)
2197
0
{
2198
0
  sc_ep_set(sc, SE_FL_T_MUX);
2199
0
  sc_ep_clr(sc, SE_FL_DETACHED);
2200
0
}
2201
2202
/* Commit the endpoint upgrade. If stconn is attached, it means the new endpoint
2203
 * use it. So we do nothing. Otherwise, the stconn will be destroy with the
2204
 * overlying stream. So, it means we must commit the detach.
2205
*/
2206
void sc_conn_commit_endp_upgrade(struct stconn *sc)
2207
0
{
2208
0
  if (!sc_ep_test(sc, SE_FL_DETACHED))
2209
0
    return;
2210
0
  sc_detach_endp(&sc);
2211
  /* Because it was already set as detached, the sedesc must be preserved */
2212
0
  BUG_ON(!sc);
2213
0
  BUG_ON(!sc->sedesc);
2214
0
}
2215
2216
/* Return a debug string exposing the internals of the front or back
2217
 * stream/connection when supported. It will be protocol-dependent and will
2218
 * change over time like the output of "show fd" or "show sess all".
2219
 */
2220
static int smp_fetch_debug_str(const struct arg *args, struct sample *smp, const char *kw, void *private)
2221
0
{
2222
0
  struct connection *conn;
2223
0
  struct stconn *sc;
2224
0
  union mux_sctl_dbg_str_ctx sctl_ctx = { };
2225
2226
0
  if (!smp->strm)
2227
0
    return 0;
2228
2229
0
  sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb);
2230
0
  conn = sc_conn(sc);
2231
2232
0
  if (!conn)
2233
0
    return 0;
2234
2235
  /* a missing mux is necessarily on the backend, and may arrive later */
2236
0
  if (!conn->mux) {
2237
0
    smp->flags |= SMP_F_MAY_CHANGE;
2238
0
    return 0;
2239
0
  }
2240
2241
  /* Not implemented, return nothing */
2242
0
  if (!conn->mux->sctl)
2243
0
    return 0;
2244
2245
0
  sctl_ctx.arg.debug_flags = args->data.sint ? args->data.sint : ~0U;
2246
0
  if (conn->mux->sctl(sc, MUX_SCTL_DBG_STR, &sctl_ctx) == -1)
2247
0
    return 0;
2248
2249
0
  smp->data.type = SMP_T_STR;
2250
0
  smp->flags = SMP_F_VOL_TEST | SMP_F_MAY_CHANGE;
2251
0
  smp->data.u.str = sctl_ctx.ret.buf;
2252
0
  return 1;
2253
0
}
2254
2255
/* return the frontend or backend mux stream ID.
2256
 */
2257
static int
2258
smp_fetch_sid(const struct arg *args, struct sample *smp, const char *kw, void *private)
2259
0
{
2260
0
  struct connection *conn;
2261
0
  struct stconn *sc;
2262
0
  int64_t sid = 0;
2263
2264
0
  if (!smp->strm)
2265
0
    return 0;
2266
2267
0
  sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb);
2268
0
  conn = sc_conn(sc);
2269
2270
  /* No connection */
2271
0
  if (!conn)
2272
0
    return 0;
2273
2274
  /* No mux install, this may change */
2275
0
  if (!conn->mux) {
2276
0
    smp->flags |= SMP_F_MAY_CHANGE;
2277
0
    return 0;
2278
0
  }
2279
2280
  /* No sctl, report sid=0 in this case */
2281
0
  if (conn->mux->sctl) {
2282
0
    if (conn->mux->sctl(sc, MUX_SCTL_SID, &sid) == -1)
2283
0
      return 0;
2284
0
  }
2285
2286
0
  smp->flags = SMP_F_VOL_TXN;
2287
0
  smp->data.type = SMP_T_SINT;
2288
0
  smp->data.u.sint = sid;
2289
2290
0
  return 1;
2291
0
}
2292
2293
/* return 1 if the frontend or backend mux stream has received an abort and 0 otherwise.
2294
 */
2295
static int
2296
smp_fetch_strm_aborted(const struct arg *args, struct sample *smp, const char *kw, void *private)
2297
0
{
2298
0
  struct stconn *sc;
2299
0
  unsigned int aborted = 0;
2300
2301
0
  if (!smp->strm)
2302
0
    return 0;
2303
2304
0
  sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb);
2305
0
  if (sc->sedesc->abort_info.info)
2306
0
    aborted = 1;
2307
2308
0
  smp->flags = SMP_F_VOL_TXN;
2309
0
  smp->data.type = SMP_T_BOOL;
2310
0
  smp->data.u.sint = aborted;
2311
2312
0
  return 1;
2313
0
}
2314
2315
/* return the H2/QUIC RESET code of the frontend or backend mux stream. Any value
2316
 * means an a RST_STREAM was received on H2 and a STOP_SENDING on QUIC. Otherwise the sample fetch fails.
2317
 */
2318
static int
2319
smp_fetch_strm_rst_code(const struct arg *args, struct sample *smp, const char *kw, void *private)
2320
0
{
2321
0
  struct stconn *sc;
2322
0
  unsigned int source;
2323
0
  unsigned long long code = 0;
2324
2325
0
  if (!smp->strm)
2326
0
    return 0;
2327
2328
0
  sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb);
2329
0
  source = ((sc->sedesc->abort_info.info & SE_ABRT_SRC_MASK) >> SE_ABRT_SRC_SHIFT);
2330
0
  if (source != SE_ABRT_SRC_MUX_H2 && source != SE_ABRT_SRC_MUX_QUIC) {
2331
0
    if (!source)
2332
0
      smp->flags |= SMP_F_MAY_CHANGE;
2333
0
    return 0;
2334
0
  }
2335
0
  code = sc->sedesc->abort_info.code;
2336
2337
0
  smp->flags = SMP_F_VOL_TXN;
2338
0
  smp->data.type = SMP_T_SINT;
2339
0
  smp->data.u.sint = code;
2340
2341
0
  return 1;
2342
0
}
2343
2344
/* Note: must not be declared <const> as its list will be overwritten.
2345
 * Note: fetches that may return multiple types should be declared using the
2346
 * appropriate pseudo-type. If not available it must be declared as the lowest
2347
 * common denominator, the type that can be casted into all other ones.
2348
 */
2349
static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, {
2350
  { "bs.debug_str", smp_fetch_debug_str, ARG1(0,SINT), NULL, SMP_T_STR, SMP_USE_L5SRV },
2351
  { "bs.id", smp_fetch_sid, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV },
2352
  { "bs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV },
2353
  { "bs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV },
2354
  { "fs.debug_str", smp_fetch_debug_str, ARG1(0,SINT), NULL, SMP_T_STR, SMP_USE_L5CLI },
2355
  { "fs.id", smp_fetch_sid, 0, NULL, SMP_T_STR, SMP_USE_L5CLI },
2356
  { "fs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI },
2357
  { "fs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI },
2358
  { /* END */ },
2359
}};
2360
2361
INITCALL1(STG_REGISTER, sample_register_fetches, &sample_fetch_keywords);