/src/haproxy/src/stconn.c
Line | Count | Source |
1 | | /* |
2 | | * stream connector management functions |
3 | | * |
4 | | * Copyright 2021 Christopher Faulet <cfaulet@haproxy.com> |
5 | | * |
6 | | * This program is free software; you can redistribute it and/or |
7 | | * modify it under the terms of the GNU General Public License |
8 | | * as published by the Free Software Foundation; either version |
9 | | * 2 of the License, or (at your option) any later version. |
10 | | * |
11 | | */ |
12 | | |
13 | | #include <haproxy/api.h> |
14 | | #include <haproxy/applet.h> |
15 | | #include <haproxy/arg.h> |
16 | | #include <haproxy/connection.h> |
17 | | #include <haproxy/check.h> |
18 | | #include <haproxy/filters.h> |
19 | | #include <haproxy/hstream.h> |
20 | | #include <haproxy/http_ana.h> |
21 | | #include <haproxy/pipe.h> |
22 | | #include <haproxy/pool.h> |
23 | | #include <haproxy/proxy.h> |
24 | | #include <haproxy/sample.h> |
25 | | #include <haproxy/sc_strm.h> |
26 | | #include <haproxy/stconn.h> |
27 | | #include <haproxy/xref.h> |
28 | | |
29 | | DECLARE_TYPED_POOL(pool_head_connstream, "stconn", struct stconn); |
30 | | DECLARE_TYPED_POOL(pool_head_sedesc, "sedesc", struct sedesc); |
31 | | |
32 | | static int sc_conn_recv(struct stconn *sc); |
33 | | static int sc_conn_send(struct stconn *sc); |
34 | | |
35 | | /* Initializes an endpoint */ |
void sedesc_init(struct sedesc *sedesc)
{
	/* no endpoint, connection or stream connector attached yet */
	sedesc->se = NULL;
	sedesc->conn = NULL;
	sedesc->sc = NULL;
	/* activity timestamps (lra/fsb) are not set yet */
	sedesc->lra = TICK_ETERNITY;
	sedesc->fsb = TICK_ETERNITY;
	/* not cross-referenced with an opposite descriptor yet */
	sedesc->xref.peer = NULL;
	se_fl_setall(sedesc, SE_FL_NONE);
	/* no termination event nor abort reason recorded yet */
	sedesc->term_evts_log = 0;
	sedesc->abort_info.info = 0;
	sedesc->abort_info.code = 0;

	/* the zero-copy forwarding iobuf starts empty */
	sedesc->iobuf.pipe = NULL;
	sedesc->iobuf.buf = NULL;
	sedesc->iobuf.offset = sedesc->iobuf.data = 0;
	sedesc->iobuf.flags = IOBUF_FL_NONE;

	sedesc->kip = 0;
	sedesc->kop = 0;
}
57 | | |
58 | | /* Tries to alloc an endpoint and initialize it. Returns NULL on failure. */ |
59 | | struct sedesc *sedesc_new() |
60 | 0 | { |
61 | 0 | struct sedesc *sedesc; |
62 | |
|
63 | 0 | sedesc = pool_alloc(pool_head_sedesc); |
64 | 0 | if (unlikely(!sedesc)) |
65 | 0 | return NULL; |
66 | | |
67 | 0 | sedesc_init(sedesc); |
68 | 0 | return sedesc; |
69 | 0 | } |
70 | | |
71 | | /* Releases an endpoint. It is the caller responsibility to be sure it is safe |
72 | | * and it is not shared with another entity |
73 | | */ |
74 | | void sedesc_free(struct sedesc *sedesc) |
75 | 0 | { |
76 | 0 | if (sedesc) { |
77 | 0 | if (sedesc->iobuf.pipe) |
78 | 0 | put_pipe(sedesc->iobuf.pipe); |
79 | 0 | pool_free(pool_head_sedesc, sedesc); |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | | /* Performs a shutdown on the endpoint. This function deals with connection and |
84 | | * applet endpoints. It is responsible to set SE flags corresponding to the |
85 | | * given shut modes and to call right shutdown functions of the endpoint. It is |
86 | | * called from the sc_abort and sc_shutdown functions at the SC level. |
87 | | */ |
void se_shutdown(struct sedesc *sedesc, enum se_shut_mode mode)
{
	struct sedesc *sdo;
	struct se_abort_info *reason = NULL;
	unsigned int flags = 0;

	/* Translate the requested shut modes into SE flags, skipping any
	 * direction already shut. A write shutdown is also recorded in the
	 * termination events log.
	 */
	if ((mode & (SE_SHW_SILENT|SE_SHW_NORMAL)) && !se_fl_test(sedesc, SE_FL_SHW)) {
		se_report_term_evt(sedesc, se_tevt_type_shutw);
		flags |= (mode & SE_SHW_NORMAL) ? SE_FL_SHWN : SE_FL_SHWS;
	}
	if ((mode & (SE_SHR_RESET|SE_SHR_DRAIN)) && !se_fl_test(sedesc, SE_FL_SHR))
		flags |= (mode & SE_SHR_DRAIN) ? SE_FL_SHRD : SE_FL_SHRR;

	if (se_fl_test(sedesc, SE_FL_T_MUX)) {
		const struct mux_ops *mux = (sedesc->conn ? sedesc->conn->mux : NULL);

		if (flags) {
			if (mux && mux->shut) {
				/* when the opposite side is known, pass its
				 * abort info as the shutdown reason
				 */
				sdo = se_opposite(sedesc);
				if (sdo)
					reason = &sdo->abort_info;
				CALL_MUX_NO_RET(mux, shut(sedesc->sc, mode, reason));
			}
			se_fl_set(sedesc, flags);
		}
	}
	else if (se_fl_test(sedesc, SE_FL_T_APPLET)) {
		struct appctx *appctx = sedesc->se;

		if (flags) {
			if (appctx->applet->shut) {
				/* same as above: report the peer's abort info
				 * to the applet's shut callback if available
				 */
				sdo = se_opposite(sedesc);
				if (sdo)
					reason = &sdo->abort_info;
				appctx->applet->shut(appctx, mode, reason);
			}
			se_fl_set(sedesc, flags);
		}

		/* once both directions are shut, shut the applet itself down */
		if (se_fl_test(sedesc, SE_FL_SHR) && se_fl_test(sedesc, SE_FL_SHW))
			appctx_shut(appctx);
	}
}
131 | | |
132 | | /* Tries to allocate a new stconn and initialize its main fields. On |
133 | | * failure, nothing is allocated and NULL is returned. It is an internal |
134 | | * function. The caller must, at least, set the SE_FL_ORPHAN or SE_FL_DETACHED |
135 | | * flag. |
136 | | */ |
137 | | static struct stconn *sc_new(struct sedesc *sedesc) |
138 | 0 | { |
139 | 0 | struct stconn *sc; |
140 | |
|
141 | 0 | sc = pool_alloc(pool_head_connstream); |
142 | |
|
143 | 0 | if (unlikely(!sc)) |
144 | 0 | goto alloc_error; |
145 | | |
146 | 0 | sc->obj_type = OBJ_TYPE_SC; |
147 | 0 | sc->flags = SC_FL_NONE; |
148 | 0 | sc->state = SC_ST_INI; |
149 | 0 | sc->ioto = TICK_ETERNITY; |
150 | 0 | sc->room_needed = 0; |
151 | 0 | sc->app = NULL; |
152 | 0 | sc->src = NULL; |
153 | 0 | sc->dst = NULL; |
154 | 0 | sc->bytes_in = sc->bytes_out = 0; |
155 | 0 | sc->wait_event.tasklet = NULL; |
156 | 0 | sc->wait_event.events = 0; |
157 | |
|
158 | 0 | sc->term_evts_log = 0; |
159 | | |
160 | | /* If there is no endpoint, allocate a new one now */ |
161 | 0 | if (!sedesc) { |
162 | 0 | sedesc = sedesc_new(); |
163 | 0 | if (unlikely(!sedesc)) |
164 | 0 | goto alloc_error; |
165 | 0 | } |
166 | 0 | sc->sedesc = sedesc; |
167 | 0 | sedesc->sc = sc; |
168 | |
|
169 | 0 | return sc; |
170 | | |
171 | 0 | alloc_error: |
172 | 0 | pool_free(pool_head_connstream, sc); |
173 | 0 | return NULL; |
174 | 0 | } |
175 | | |
176 | | /* Creates a new stream connector and its associated stream from a mux. <sd> must |
177 | | * be defined. It returns NULL on error. On success, the new stream connector is |
178 | | * returned. In this case, SE_FL_ORPHAN flag is removed. |
179 | | */ |
struct stconn *sc_new_from_endp(struct sedesc *sd, struct session *sess, struct buffer *input)
{
	struct stconn *sc;

	sc = sc_new(sd);
	if (unlikely(!sc))
		return NULL;
	if (unlikely(!sess->fe->stream_new_from_sc(sess, sc, input))) {
		/* stream creation failed: undo the attachment performed by
		 * sc_new() before releasing the SC
		 */
		sd->sc = NULL;
		if (sc->sedesc != sd) {
			/* none was provided so sc_new() allocated one */
			sedesc_free(sc->sedesc);
		}
		pool_free(pool_head_connstream, sc);
		/* the endpoint descriptor is on its own again */
		se_fl_set(sd, SE_FL_ORPHAN);
		return NULL;
	}
	se_fl_clr(sd, SE_FL_ORPHAN);
	return sc;
}
200 | | |
/* Creates a new stream connector from a stream. There is no endpoint here, thus
 * it will be created by sc_new(). So the SE_FL_DETACHED flag is set. It returns
 * NULL on error. On success, the new stream connector is returned.
 */
205 | | struct stconn *sc_new_from_strm(struct stream *strm, unsigned int flags) |
206 | 0 | { |
207 | 0 | struct stconn *sc; |
208 | |
|
209 | 0 | sc = sc_new(NULL); |
210 | 0 | if (unlikely(!sc)) |
211 | 0 | return NULL; |
212 | 0 | sc->flags |= flags; |
213 | |
|
214 | 0 | if (flags & SC_FL_ISBACK) |
215 | 0 | sc_ep_set(sc, SE_FL_APP_STARTED); |
216 | |
|
217 | 0 | sc_ep_set(sc, SE_FL_DETACHED); |
218 | 0 | sc->app = &strm->obj_type; |
219 | 0 | return sc; |
220 | 0 | } |
221 | | |
222 | | /* Creates a new stream connector from an health-check. There is no endpoint here, |
223 | | * thus it will be created by sc_new(). So the SE_FL_DETACHED flag is set. It |
224 | | * returns NULL on error. On success, the new stream connector is returned. |
225 | | */ |
226 | | struct stconn *sc_new_from_check(struct check *check) |
227 | 0 | { |
228 | 0 | struct stconn *sc; |
229 | |
|
230 | 0 | sc = sc_new(NULL); |
231 | 0 | if (unlikely(!sc)) |
232 | 0 | return NULL; |
233 | 0 | sc->flags = SC_FL_ISBACK; |
234 | 0 | sc_ep_set(sc, SE_FL_APP_STARTED); |
235 | 0 | sc_ep_set(sc, SE_FL_DETACHED); |
236 | 0 | sc->app = &check->obj_type; |
237 | 0 | return sc; |
238 | 0 | } |
239 | | |
240 | | /* Releases a stconn previously allocated by sc_new(), as well as its |
241 | | * endpoint, if it exists. This function is called internally or on error path. |
242 | | */ |
void sc_free(struct stconn *sc)
{
	/* release copied addresses, if any */
	sockaddr_free(&sc->src);
	sockaddr_free(&sc->dst);
	if (sc->sedesc) {
		/* the descriptor must have been detached from any endpoint
		 * before the SC may be released
		 */
		BUG_ON(!sc_ep_test(sc, SE_FL_DETACHED));
		sedesc_free(sc->sedesc);
	}
	tasklet_free(sc->wait_event.tasklet);
	pool_free(pool_head_connstream, sc);
}
254 | | |
255 | | /* Conditionally removes a stream connector if it is detached and if there is no app |
256 | | * layer defined. Except on error path, this one must be used. if release, the |
257 | | * pointer on the SC is set to NULL. |
258 | | */ |
259 | | static void sc_free_cond(struct stconn **scp) |
260 | 0 | { |
261 | 0 | struct stconn *sc = *scp; |
262 | |
|
263 | 0 | if (!sc->app && (!sc->sedesc || sc_ep_test(sc, SE_FL_DETACHED))) { |
264 | 0 | sc_free(sc); |
265 | 0 | *scp = NULL; |
266 | 0 | } |
267 | 0 | } |
268 | | |
269 | | |
270 | | /* Attaches a stconn to a mux endpoint and sets the endpoint ctx. Returns |
271 | | * -1 on error and 0 on success. SE_FL_DETACHED flag is removed. This function is |
272 | | * called from a mux when it is attached to a stream or a health-check. |
273 | | */ |
274 | | int sc_attach_mux(struct stconn *sc, void *sd, void *ctx) |
275 | 0 | { |
276 | 0 | struct connection *conn = ctx; |
277 | 0 | struct sedesc *sedesc = sc->sedesc; |
278 | |
|
279 | 0 | if (sc_strm(sc)) { |
280 | 0 | if (!sc->wait_event.tasklet) { |
281 | 0 | sc->wait_event.tasklet = tasklet_new(); |
282 | 0 | if (!sc->wait_event.tasklet) |
283 | 0 | return -1; |
284 | 0 | sc->wait_event.tasklet->process = sc_conn_io_cb; |
285 | 0 | sc->wait_event.tasklet->context = sc; |
286 | 0 | sc->wait_event.events = 0; |
287 | 0 | } |
288 | | |
289 | 0 | xref_create(&sc->sedesc->xref, &sc_opposite(sc)->sedesc->xref); |
290 | 0 | } |
291 | 0 | else if (sc_check(sc)) { |
292 | 0 | if (!sc->wait_event.tasklet) { |
293 | 0 | sc->wait_event.tasklet = tasklet_new(); |
294 | 0 | if (!sc->wait_event.tasklet) |
295 | 0 | return -1; |
296 | 0 | sc->wait_event.tasklet->process = srv_chk_io_cb; |
297 | 0 | sc->wait_event.tasklet->context = sc; |
298 | 0 | sc->wait_event.events = 0; |
299 | 0 | } |
300 | 0 | } |
301 | | |
302 | 0 | sedesc->se = sd; |
303 | 0 | sedesc->conn = ctx; |
304 | 0 | se_fl_set(sedesc, SE_FL_T_MUX); |
305 | 0 | se_fl_clr(sedesc, SE_FL_DETACHED); |
306 | 0 | if (!conn->ctx) |
307 | 0 | conn->ctx = sc; |
308 | 0 | return 0; |
309 | 0 | } |
310 | | |
311 | | /* Attaches a stconn to an applet endpoint and sets the endpoint |
312 | | * ctx. Returns -1 on error and 0 on success. SE_FL_DETACHED flag is |
313 | | * removed. This function is called by a stream when a backend applet is |
314 | | * registered. |
315 | | */ |
static int sc_attach_applet(struct stconn *sc, struct appctx *appctx)
{
	sc->sedesc->se = appctx;
	sc_ep_set(sc, SE_FL_T_APPLET);
	sc_ep_clr(sc, SE_FL_DETACHED);
	/* cross-link both sides of the stream via their descriptors */
	if (sc_strm(sc))
		xref_create(&sc->sedesc->xref, &sc_opposite(sc)->sedesc->xref);

	return 0;
}
326 | | |
327 | | /* Attaches a stconn to a app layer and sets the relevant |
328 | | * callbacks. Returns -1 on error and 0 on success. SE_FL_ORPHAN flag is |
329 | | * removed. This function is called by a stream when it is created to attach it |
330 | | * on the stream connector on the client side. |
331 | | */ |
int sc_attach_strm(struct stconn *sc, struct stream *strm)
{
	sc->app = &strm->obj_type;
	sc_ep_clr(sc, SE_FL_ORPHAN);
	sc_ep_report_read_activity(sc);
	/* only a mux endpoint needs an I/O tasklet here */
	if (sc_ep_test(sc, SE_FL_T_MUX)) {
		sc->wait_event.tasklet = tasklet_new();
		if (!sc->wait_event.tasklet)
			return -1;
		sc->wait_event.tasklet->process = sc_conn_io_cb;
		sc->wait_event.tasklet->context = sc;
		sc->wait_event.events = 0;
	}
	return 0;
}
347 | | |
348 | | /* Attach a stconn to a haterm layer and sets the relevant |
349 | | * callbacks. Returns -1 on error and 0 on success. SE_FL_ORPHAN flag is |
350 | | * removed. This function is called by a haterm stream when it is created |
351 | | * to attach it on the stream connector on the client side. |
352 | | */ |
int sc_attach_hstream(struct stconn *sc, struct hstream *hs)
{
	/* haterm streams are only expected on top of a mux endpoint */
	BUG_ON(!sc_ep_test(sc, SE_FL_T_MUX));

	sc->app = &hs->obj_type;
	sc_ep_clr(sc, SE_FL_ORPHAN);
	sc_ep_report_read_activity(sc);
	/* I/O events for this SC are handled by sc_hstream_io_cb */
	sc->wait_event.tasklet = tasklet_new();
	if (!sc->wait_event.tasklet)
		return -1;

	sc->wait_event.tasklet->process = sc_hstream_io_cb;
	sc->wait_event.tasklet->context = sc;
	sc->wait_event.events = 0;
	return 0;
}
369 | | |
/* Detaches the stconn from the endpoint, if any. For a connection, if a
 * mux owns the connection, its ->detach() callback is called. Otherwise, it
 * means the stream connector owns the connection. In this case the connection
 * is closed and released. For an applet, the appctx is released. If still
 * allocated, the endpoint is reset and flagged as detached. If the app layer
 * is also detached, the stream connector is released.
 */
static void sc_detach_endp(struct stconn **scp)
{
	struct stconn *sc = *scp;
	struct xref *peer;

	if (!sc)
		return;

	/* Remove my link in the original objects. */
	peer = xref_get_peer_and_lock(&sc->sedesc->xref);
	if (peer)
		xref_disconnect(&sc->sedesc->xref, peer);

	if (sc_ep_test(sc, SE_FL_T_MUX)) {
		struct connection *conn = __sc_conn(sc);
		struct sedesc *sedesc = sc->sedesc;

		if (conn->mux) {
			/* the mux owns the connection: unsubscribe from any
			 * pending I/O events, orphan the descriptor and let
			 * the mux release it via its ->detach() callback
			 */
			if (sc->wait_event.events != 0)
				conn->mux->unsubscribe(sc, sc->wait_event.events, &sc->wait_event);
			se_fl_set(sedesc, SE_FL_ORPHAN);
			sedesc->sc = NULL;
			sc->sedesc = NULL;
			CALL_MUX_NO_RET(conn->mux, detach(sedesc));
		}
		else {
			/* It's too early to have a mux, let's just destroy
			 * the connection
			 */
			conn_stop_tracking(conn);
			conn_full_close(conn);
			if (conn->destroy_cb)
				conn->destroy_cb(conn);
			conn_free(conn);
		}
	}
	else if (sc_ep_test(sc, SE_FL_T_APPLET)) {
		struct appctx *appctx = __sc_appctx(sc);

		/* orphan the descriptor, then shut and release the applet */
		sc_ep_set(sc, SE_FL_ORPHAN);
		sc->sedesc->sc = NULL;
		sc->sedesc = NULL;
		se_shutdown(appctx->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
		appctx_free(appctx);
	}

	if (sc->sedesc) {
		/* the SD wasn't used and can be recycled */
		sc->sedesc->se = NULL;
		sc->sedesc->conn = NULL;
		sc->sedesc->flags = 0;
		sc_ep_set(sc, SE_FL_DETACHED);
	}

	/* FIXME: Rest SC for now but must be reviewed. SC flags are only
	 * connection related for now but this will evolved
	 */
	sc->flags &= SC_FL_ISBACK;
	sc_free_cond(scp);
}
438 | | |
/* Detaches the stconn from the app layer. If there is no endpoint attached
 * to the stconn, it is also released.
 */
static void sc_detach_app(struct stconn **scp)
{
	struct stconn *sc = *scp;

	if (!sc)
		return;

	/* drop the app layer reference and any copied addresses */
	sc->app = NULL;
	sockaddr_free(&sc->src);
	sockaddr_free(&sc->dst);

	/* release the I/O tasklet, if any, and clear pending subscriptions */
	tasklet_free(sc->wait_event.tasklet);
	sc->wait_event.tasklet = NULL;
	sc->wait_event.events = 0;
	/* the SC itself is released here if it is also endpoint-detached */
	sc_free_cond(scp);
}
458 | | |
459 | | /* Destroy the stconn. It is detached from its endpoint and its |
460 | | * application. After this call, the stconn must be considered as released. |
461 | | */ |
void sc_destroy(struct stconn *sc)
{
	sc_detach_endp(&sc);
	sc_detach_app(&sc);
	/* the second detach must have freed the SC and cleared the pointer */
	BUG_ON_HOT(sc);
}
468 | | |
469 | | /* Resets the stream connector endpoint. It happens when the app layer want to renew |
470 | | * its endpoint. For a connection retry for instance. If a mux or an applet is |
471 | | * attached, a new endpoint is created. Returns -1 on error and 0 on success. |
472 | | */ |
473 | | int sc_reset_endp(struct stconn *sc) |
474 | 0 | { |
475 | 0 | struct sedesc *new_sd; |
476 | |
|
477 | 0 | BUG_ON(!sc->app); |
478 | |
|
479 | 0 | if (!__sc_endp(sc)) { |
480 | | /* endpoint not attached or attached to a mux with no |
481 | | * target. Thus the endpoint will not be release but just |
482 | | * reset. The app is still attached, the sc will not be |
483 | | * released. |
484 | | */ |
485 | 0 | sc_detach_endp(&sc); |
486 | 0 | return 0; |
487 | 0 | } |
488 | | |
489 | | /* allocate the new endpoint first to be able to set error if it |
490 | | * fails */ |
491 | 0 | new_sd = sedesc_new(); |
492 | 0 | if (!unlikely(new_sd)) |
493 | 0 | return -1; |
494 | | |
495 | | /* The app is still attached, the sc will not be released */ |
496 | 0 | sc_detach_endp(&sc); |
497 | 0 | BUG_ON(!sc); |
498 | 0 | BUG_ON(sc->sedesc); |
499 | 0 | sc->sedesc = new_sd; |
500 | 0 | sc->sedesc->sc = sc; |
501 | 0 | sc->bytes_in = sc->bytes_out = 0; |
502 | 0 | sc_ep_set(sc, SE_FL_DETACHED); |
503 | 0 | return 0; |
504 | 0 | } |
505 | | |
506 | | |
507 | | /* Create an applet to handle a stream connector as a new appctx. The SC will |
508 | | * wake it up every time it is solicited. The appctx must be deleted by the task |
509 | | * handler using sc_detach_endp(), possibly from within the function itself. |
510 | | * It also pre-initializes the applet's context and returns it (or NULL in case |
511 | | * it could not be allocated). |
512 | | */ |
struct appctx *sc_applet_create(struct stconn *sc, struct applet *app)
{
	struct appctx *appctx;

	appctx = appctx_new_here(app, sc->sedesc);
	if (!appctx)
		return NULL;
	if (sc_attach_applet(sc, appctx) == -1) {
		appctx_free_on_early_error(appctx);
		return NULL;
	}
	/* inherit the stream task's nice value, then schedule a first run */
	appctx->t->nice = __sc_strm(sc)->task->nice;
	applet_need_more_data(appctx);
	appctx_wakeup(appctx);

	sc->state = SC_ST_RDY;
	return appctx;
}
531 | | |
532 | | /* Conditionally forward the close to the write side. It return 1 if it can be |
533 | | * forwarded. It is the caller responsibility to forward the close to the write |
534 | | * side. Otherwise, 0 is returned. In this case, SC_FL_SHUT_WANTED flag may be set on |
535 | | * the consumer SC if we are only waiting for the outgoing data to be flushed. |
536 | | */ |
static inline int sc_cond_forward_shut(struct stconn *sc)
{
	/* The close must not be forwarded: either the read side is not
	 * closed/aborted, or half-close forwarding (SC_FL_NOHALF) is off.
	 */
	if (!(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) || !(sc->flags & SC_FL_NOHALF))
		return 0;

	if ((co_data(sc_ic(sc)) || sc_ep_have_ff_data(sc_opposite(sc))) && !(sc_ic(sc)->flags & CF_WRITE_TIMEOUT)) {
		/* the shutdown cannot be forwarded now because
		 * we should flush outgoing data first. But instruct the output
		 * channel it should be done ASAP.
		 */
		sc_schedule_shutdown(sc);
		return 0;
	}

	/* the close can be immediately forwarded to the write side */
	return 1;
}
555 | | |
556 | | |
557 | | static inline int sc_is_fastfwd_supported(struct stconn *sc) |
558 | 0 | { |
559 | 0 | return (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) && |
560 | 0 | !(sc->flags & SC_FL_NO_FASTFWD) && |
561 | 0 | sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) && |
562 | 0 | sc_ep_test(sc_opposite(sc), SE_FL_MAY_FASTFWD_CONS) && |
563 | 0 | sc_ic(sc)->to_forward); |
564 | 0 | } |
565 | | |
566 | | /* |
567 | | * This function performs a shutdown-read on a detached stream connector in a |
568 | | * connected or init state (it does nothing for other states). It either shuts |
569 | | * the read side or marks itself as closed. The buffer flags are updated to |
570 | | * reflect the new state. If the stream connector has SC_FL_NOHALF, we also |
571 | | * forward the close to the write side. The owner task is woken up if it exists. |
572 | | */ |
void sc_abort(struct stconn *sc)
{
	struct channel *ic = sc_ic(sc);

	BUG_ON(!sc_strm(sc));

	/* nothing to do if the read side is already closed or aborted */
	if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
		return;

	sc->flags |= SC_FL_ABRT_DONE;
	ic->flags |= CF_READ_EVENT;

	/* only act further in connected or connecting states */
	if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
		return;

	if (sc->flags & SC_FL_SHUT_DONE) {
		/* write side already shut: fully shut the endpoint and
		 * disconnect
		 */
		if (sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET))
			se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);

		sc->state = SC_ST_DIS;
		if (sc->flags & SC_FL_ISBACK)
			__sc_strm(sc)->conn_exp = TICK_ETERNITY;
	}
	else if (sc_cond_forward_shut(sc))
		return sc_shutdown(sc);

	if (!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET)) {
		/* note that if the task exists, it must unregister itself once it runs */
		if (!(sc->flags & SC_FL_DONT_WAKE))
			task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
	}
}
605 | | |
606 | | /* |
607 | | * This function performs a shutdown-write on a detached stream connector in a |
608 | | * connected or init state (it does nothing for other states). It either shuts |
609 | | * the write side or marks itself as closed. The buffer flags are updated to |
610 | | * reflect the new state. It does also close everything if the SC was marked as |
611 | | * being in error state. The owner task is woken up if it exists. |
612 | | */ |
void sc_shutdown(struct stconn *sc)
{
	struct channel *ic = sc_ic(sc);
	struct channel *oc = sc_oc(sc);

	BUG_ON(!sc_strm(sc));

	/* the shutdown is no longer pending once it is performed */
	sc->flags &= ~SC_FL_SHUT_WANTED;
	if (sc->flags & SC_FL_SHUT_DONE)
		return;
	sc->flags |= SC_FL_SHUT_DONE;
	oc->flags |= CF_WRITE_EVENT;
	sc_set_hcto(sc);
	sc_report_term_evt(sc, strm_tevt_type_shutw);

	if (sc_ep_test(sc, SE_FL_T_APPLET)) {
		/* on shutw we always wake the applet up */
		appctx_wakeup(__sc_appctx(sc));
	}

	switch (sc->state) {
	case SC_ST_RDY:
	case SC_ST_EST:
		/* we have to shut before closing, otherwise some short messages
		 * may never leave the system, especially when there are remaining
		 * unread data in the socket input buffer, or when nolinger is set.
		 * However, if SC_FL_NOLINGER is explicitly set, we know there is
		 * no risk so we close both sides immediately.
		 */
		if (!(sc->flags & (SC_FL_ERROR|SC_FL_NOLINGER|SC_FL_EOS|SC_FL_ABRT_DONE)) &&
		    !(ic->flags & CF_DONT_READ)) {
			/* write-only shutdown: keep the read side open */
			if (sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET))
				se_shutdown(sc->sedesc, SE_SHW_NORMAL);
			return;
		}

		/* otherwise shut both directions at once */
		if (sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET))
			se_shutdown(sc->sedesc, SE_SHR_RESET|((sc->flags & SC_FL_NOLINGER) ? SE_SHW_SILENT : SE_SHW_NORMAL));

		sc->state = SC_ST_DIS;
		break;

	case SC_ST_CON:
		if (sc_ep_test(sc, SE_FL_T_MUX)) {
			/* we may have to close a pending connection, and mark the
			 * response buffer as abort
			 */
			se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);
		}
		__fallthrough;
	case SC_ST_CER:
	case SC_ST_QUE:
	case SC_ST_TAR:
		/* Note that none of these states may happen with applets */
		sc->state = SC_ST_DIS;
		break;
	default:
		break;
	}

	sc->flags &= ~SC_FL_NOLINGER;
	if (!(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)))
		sc->flags |= SC_FL_ABRT_DONE;
	if (sc->flags & SC_FL_ISBACK)
		__sc_strm(sc)->conn_exp = TICK_ETERNITY;

	if (!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET)) {
		/* note that if the task exists, it must unregister itself once it runs */
		if (!(sc->flags & SC_FL_DONT_WAKE))
			task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
	}
}
686 | | |
/* This is to be used after making some room available in a channel. It will
 * return without doing anything if the stream connector's RX path is blocked.
 * It will automatically mark the stream connector as busy processing the end
 * point in order to avoid useless repeated wakeups.
 * It will then wake up the right entity to enable receipt of new data.
 */
void sc_chk_rcv(struct stconn *sc)
{
	BUG_ON(!sc_strm(sc));

	/* an applet waiting for the opposite connection may be released from
	 * that wait once the other side reached a ready or final state
	 */
	if (sc_ep_test(sc, SE_FL_APPLET_NEED_CONN) &&
	    sc_state_in(sc_opposite(sc)->state, SC_SB_RDY|SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) {
		sc_ep_clr(sc, SE_FL_APPLET_NEED_CONN);
		sc_ep_report_read_activity(sc);
	}

	if (!sc_is_recv_allowed(sc))
		return;

	if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST))
		return;

	/* mark the endpoint as busy processing data to avoid useless repeated
	 * wakeups (see function comment above)
	 */
	sc_ep_set(sc, SE_FL_HAVE_NO_DATA);

	/* (re)start reading */
	if (sc_ep_test(sc, SE_FL_T_MUX)) {
		if (sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
			tasklet_wakeup(sc->wait_event.tasklet, TASK_WOKEN_IO);
	}
	else if (sc_ep_test(sc, SE_FL_T_APPLET)) {
		if (!sc_ep_have_ff_data(sc_opposite(sc)))
			appctx_wakeup(__sc_appctx(sc));
	}
	else {
		/* In theory, it should not happen. This CHECK_IF will be used to validate it (or not...) */
		CHECK_IF(!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET));
		if (!(sc->flags & SC_FL_DONT_WAKE))
			task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
	}
}
727 | | |
728 | | |
729 | | /* This function is used for inter-stream connector calls. It is called by the |
730 | | * producer to inform the consumer side that it may be interested in checking |
731 | | * for data in the buffer. Note that it intentionally does not update timeouts, |
732 | | * so that we can still check them later at wake-up. |
733 | | */ |
static inline void sc_chk_snd(struct stconn *sc)
{
	struct channel *oc = sc_oc(sc);

	BUG_ON(!sc_strm(sc));

	if (sc_ep_test(sc, SE_FL_T_MUX)) {
		/* mux endpoint: try to push data through the connection */
		if (unlikely(!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST) ||
			     (sc->flags & SC_FL_SHUT_DONE)))
			return;

		if (unlikely(!co_data(oc) && !sc_ep_have_ff_data(sc))) /* called with nothing to send ! */
			return;

		if (!sc_ep_have_ff_data(sc) && /* data wants to be fast-forwarded ASAP */
		    !sc_ep_test(sc, SE_FL_WAIT_DATA)) /* not waiting for data */
			return;

		/* only send directly if not already subscribed for a retry */
		if (!(sc->wait_event.events & SUB_RETRY_SEND))
			sc_conn_send(sc);

		if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING) || sc_is_conn_error(sc)) {
			/* Write error on the file descriptor */
			BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
			goto out_wakeup;
		}

		/* OK, so now we know that some data might have been sent, and that we may
		 * have to poll first. We have to do that too if the buffer is not empty.
		 */
		if (!co_data(oc) && !sc_ep_have_ff_data(sc)) {
			/* the connection is established but we can't write. Either the
			 * buffer is empty, or we just refrain from sending because the
			 * ->o limit was reached. Maybe we just wrote the last
			 * chunk and need to close.
			 */
			if ((oc->flags & CF_AUTO_CLOSE) &&
			    ((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED) &&
			    sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST)) {
				sc_shutdown(sc);
				goto out_wakeup;
			}

			if ((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == 0)
				sc_ep_set(sc, SE_FL_WAIT_DATA);
		}
		else {
			/* Otherwise there are remaining data to be sent in the buffer,
			 * which means we have to poll before doing so.
			 */
			sc_ep_clr(sc, SE_FL_WAIT_DATA);
		}

		/* in case of special condition (error, shutdown, end of write...), we
		 * have to notify the task.
		 */
		if (likely((sc->flags & SC_FL_SHUT_DONE) ||
			   ((oc->flags & CF_WRITE_EVENT) && sc->state < SC_ST_EST) ||
			   ((oc->flags & CF_WAKE_WRITE) &&
			    ((!co_data(oc) && !oc->to_forward) ||
			     !sc_state_in(sc->state, SC_SB_EST))))) {
		out_wakeup:
			if (!(sc->flags & SC_FL_DONT_WAKE))
				task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
		}
	}
	else if (sc_ep_test(sc, SE_FL_T_APPLET)) {
		/* applet endpoint: sending is performed by the applet itself */
		if (unlikely(sc->state != SC_ST_EST || (sc->flags & SC_FL_SHUT_DONE)))
			return;

		/* we only wake the applet up if it was waiting for some data and is ready to consume it */
		if (!sc_ep_test(sc, SE_FL_WAIT_DATA|SE_FL_WONT_CONSUME))
			return;

		if (co_data(oc) || sc_ep_have_ff_data(sc)) {
			/* (re)start sending */
			appctx_wakeup(__sc_appctx(sc));
		}
	}
	else {
		/* In theory, it should not happen. This CHECK_IF will be used to validate it (or not...) */
		CHECK_IF(!sc_ep_test(sc, SE_FL_T_MUX|SE_FL_T_APPLET));

		if (unlikely(sc->state != SC_ST_EST || (sc->flags & SC_FL_SHUT_DONE)))
			return;

		if (!sc_ep_test(sc, SE_FL_WAIT_DATA) || /* not waiting for data */
		    (!co_data(oc) && !sc_ep_have_ff_data(sc))) /* called with nothing to send ! */
			return;

		/* Otherwise there are remaining data to be sent in the buffer,
		 * so we tell the handler.
		 */
		sc_ep_clr(sc, SE_FL_WAIT_DATA);
		if (!(sc->flags & SC_FL_DONT_WAKE))
			task_wakeup(sc_strm_task(sc), TASK_WOKEN_IO);
	}
}
832 | | |
833 | | /* This function is designed to be called from within the stream handler to |
834 | | * update the input channel's expiration timer and the stream connector's |
835 | | * Rx flags based on the channel's flags. It needs to be called only once |
836 | | * after the channel's flags have settled down, and before they are cleared, |
837 | | * though it doesn't harm to call it as often as desired (it just slightly |
838 | | * hurts performance). It must not be called from outside of the stream |
839 | | * handler, as what it does will be used to compute the stream task's |
840 | | * expiration. |
841 | | */ |
842 | | void sc_update_rx(struct stconn *sc) |
843 | 0 | { |
844 | 0 | struct channel *ic = sc_ic(sc); |
845 | |
|
846 | 0 | if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) |
847 | 0 | return; |
848 | | |
849 | | /* Unblock the SC if it needs room and the free space is large enough (0 |
850 | | * means it can always be unblocked). Do not unblock it if -1 was |
851 | | * specified. |
852 | | */ |
853 | 0 | if (!sc->room_needed || (sc->room_needed > 0 && channel_recv_max(ic) >= sc->room_needed)) |
854 | 0 | sc_have_room(sc); |
855 | | |
856 | | /* Read not closed, update FD status and timeout for reads */ |
857 | 0 | if (ic->flags & CF_DONT_READ) |
858 | 0 | sc_wont_read(sc); |
859 | 0 | else |
860 | 0 | sc_will_read(sc); |
861 | |
|
862 | 0 | sc_chk_rcv(sc); |
863 | 0 | } |
864 | | |
865 | | /* This function is designed to be called from within the stream handler to |
866 | | * update the output channel's expiration timer and the stream connector's |
867 | | * Tx flags based on the channel's flags. It needs to be called only once |
868 | | * after the channel's flags have settled down, and before they are cleared, |
869 | | * though it doesn't harm to call it as often as desired (it just slightly |
870 | | * hurts performance). It must not be called from outside of the stream |
871 | | * handler, as what it does will be used to compute the stream task's |
872 | | * expiration. |
873 | | */ |
874 | | void sc_update_tx(struct stconn *sc) |
875 | 0 | { |
876 | 0 | struct channel *oc = sc_oc(sc); |
877 | |
|
878 | 0 | if (sc->flags & SC_FL_SHUT_DONE) |
879 | 0 | return; |
880 | | |
881 | | /* Write not closed, update FD status and timeout for writes */ |
882 | 0 | if (!co_data(oc)) { |
883 | | /* stop writing */ |
884 | 0 | if (!sc_ep_test(sc, SE_FL_WAIT_DATA)) { |
885 | 0 | if ((sc->flags & SC_FL_SHUT_WANTED) == 0) |
886 | 0 | sc_ep_set(sc, SE_FL_WAIT_DATA); |
887 | 0 | } |
888 | 0 | return; |
889 | 0 | } |
890 | | |
891 | | /* (re)start writing */ |
892 | 0 | sc_ep_clr(sc, SE_FL_WAIT_DATA); |
893 | 0 | } |
894 | | |
/* This function is the equivalent to sc_update() except that it's
 * designed to be called from outside the stream handlers, typically the lower
 * layers (applets, connections) after I/O completion. After updating the stream
 * interface and timeouts, it will try to forward what can be forwarded, then to
 * wake the associated task up if an important event requires special handling.
 * It may update SE_FL_WAIT_DATA and/or SC_FL_NEED_ROOM, that the callers are
 * encouraged to watch to take appropriate action.
 * It should not be called from within the stream itself, sc_update()
 * is designed for this. Please do not statify this function, it's often
 * present in backtraces, it's useful to recognize it.
 */
void sc_notify(struct stconn *sc)
{
	struct channel *ic = sc_ic(sc);
	struct channel *oc = sc_oc(sc);
	struct stconn *sco = sc_opposite(sc);
	struct task *task = sc_strm_task(sc);

	/* process consumer side: if everything was sent (no buffered nor
	 * fast-forwarded data left) and a shutdown is pending (SHUT_WANTED
	 * without SHUT_DONE) on an established SC whose connection is not
	 * waiting for a transport handshake, perform the shutdown now.
	 */
	if (!co_data(oc) && !sc_ep_have_ff_data(sc)) {
		struct connection *conn = sc_conn(sc);

		if (((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED) &&
		    (sc->state == SC_ST_EST) && (!conn || !(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS))))
			sc_shutdown(sc);
	}

	/* indicate that we may be waiting for data from the output channel or
	 * we're about to close and can't expect more data if SC_FL_SHUT_WANTED is there.
	 */
	if (!(sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)))
		sc_ep_set(sc, SE_FL_WAIT_DATA);
	else if ((sc->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED)
		sc_ep_clr(sc, SE_FL_WAIT_DATA);

	/* propagate the output channel's read policy to the opposite SC */
	if (oc->flags & CF_DONT_READ)
		sc_wont_read(sco);
	else
		sc_will_read(sco);

	/* Notify the other side when we've injected data into the IC that
	 * needs to be forwarded. We can do fast-forwarding as soon as there
	 * are output data, but we avoid doing this if some of the data are
	 * not yet scheduled for being forwarded, because it is very likely
	 * that it will be done again immediately afterwards once the following
	 * data are parsed (eg: HTTP chunking). We only clear SC_FL_NEED_ROOM
	 * once we've emptied *some* of the output buffer, and not just when
	 * there is available room, because applets are often forced to stop
	 * before the buffer is full. We must not stop based on input data
	 * alone because an HTTP parser might need more data to complete the
	 * parsing.
	 */
	if (sc_ep_have_ff_data(sc_opposite(sc)) ||
	    (co_data(ic) && sc_ep_test(sco, SE_FL_WAIT_DATA) &&
	     (!HAS_DATA_FILTERS(__sc_strm(sc), ic) || channel_input_data(ic) == 0) &&
	     (!(sc->flags & SC_FL_SND_EXP_MORE) || channel_full(ic, co_data(ic)) || channel_input_data(ic) == 0))) {
		int new_len, last_len;

		/* measure pending output (buffered + fast-forwarded) before
		 * and after the send attempt to detect consumed data.
		 */
		last_len = co_data(ic) + sc_ep_ff_data(sco);
		sc_chk_snd(sco);
		new_len = co_data(ic) + sc_ep_ff_data(sco);

		/* check if the consumer has freed some space either in the
		 * buffer or in the pipe.
		 */
		if (!sc->room_needed || (new_len < last_len && (sc->room_needed < 0 || channel_recv_max(ic) >= sc->room_needed)))
			sc_have_room(sc);
	}

	if (!(ic->flags & CF_DONT_READ))
		sc_will_read(sc);

	sc_chk_rcv(sc);
	sc_chk_rcv(sco);

	/* wake the task up only when needed */
	if (/* changes on the production side that must be handled:
	     *  - An error on receipt: SC_FL_ERROR
	     *  - A read event: shutdown for reads (CF_READ_EVENT + EOS/ABRT_DONE)
	     *                  end of input (CF_READ_EVENT + SC_FL_EOI)
	     *                  data received and no fast-forwarding (CF_READ_EVENT + !to_forward)
	     *                  read event while consumer side is not established (CF_READ_EVENT + sco->state != SC_ST_EST)
	     */
	    ((ic->flags & CF_READ_EVENT) && ((sc->flags & SC_FL_EOI) || (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) || !ic->to_forward || sco->state != SC_ST_EST)) ||
	    (sc->flags & SC_FL_ERROR) ||

	    /* changes on the consumption side */
	    sc_ep_test(sc, SE_FL_ERR_PENDING) ||
	    ((oc->flags & CF_WRITE_EVENT) &&
	     ((sc->state < SC_ST_EST) ||
	      (sc->flags & SC_FL_SHUT_DONE) ||
	      (((oc->flags & CF_WAKE_WRITE) ||
		(!(oc->flags & CF_AUTO_CLOSE) &&
		 !(sc->flags & (SC_FL_SHUT_WANTED|SC_FL_SHUT_DONE)))) &&
	       (sco->state != SC_ST_EST ||
		(!co_data(oc) && !oc->to_forward)))))) {
		task_wakeup(task, TASK_WOKEN_IO);
	}
	else {
		/* Update expiration date for the task and requeue it if not already expired.
		 * Only I/O timeouts are evaluated. The stream is responsible of others.
		 */
		if (!tick_is_expired(task->expire, now_ms)) {
			/* take the earliest of both sides' rx/tx expirations */
			task->expire = tick_first(task->expire, sc_ep_rcv_ex(sc));
			task->expire = tick_first(task->expire, sc_ep_snd_ex(sc));
			task->expire = tick_first(task->expire, sc_ep_rcv_ex(sco));
			task->expire = tick_first(task->expire, sc_ep_snd_ex(sco));

			BUG_ON(tick_is_expired(task->expire, now_ms));
			task_queue(task);
		}
	}

	/* a read event consumes the one-shot "receive once" request */
	if (ic->flags & CF_READ_EVENT)
		sc->flags &= ~SC_FL_RCV_ONCE;
}
1011 | | |
/*
 * This function propagates an end-of-stream received on a socket-based connection.
 * It updates the stream connector. If the stream connector has SC_FL_NOHALF,
 * the close is also forwarded to the write side as an abort.
 */
static void sc_conn_eos(struct stconn *sc)
{
	struct channel *ic = sc_ic(sc);

	/* only valid for connection-backed stream connectors */
	BUG_ON(!sc_conn(sc));

	/* EOS/abort already recorded: nothing more to do */
	if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
		return;
	sc->flags |= SC_FL_EOS;
	ic->flags |= CF_READ_EVENT;
	sc_ep_report_read_activity(sc);
	/* a shutdown without prior EOI means the input was truncated */
	sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_eos: strm_tevt_type_truncated_eos));
	if (sc->state != SC_ST_EST)
		return;

	/* write side already shut: finish closing both directions */
	if (sc->flags & SC_FL_SHUT_DONE)
		goto do_close;

	if (sc_cond_forward_shut(sc)) {
		/* we want to immediately forward this close to the write side */
		/* force flag on ssl to keep stream in cache */
		goto do_close;
	}

	/* otherwise that's just a normal read shutdown */
	return;

 do_close:
	/* OK we completely close the socket here just as if we went through sc_shut[rw]() */
	se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_SILENT);

	sc->flags &= ~SC_FL_SHUT_WANTED;
	sc->flags |= SC_FL_SHUT_DONE;

	sc->state = SC_ST_DIS;
	/* a backend SC losing its connection no longer has a connect timeout */
	if (sc->flags & SC_FL_ISBACK)
		__sc_strm(sc)->conn_exp = TICK_ETERNITY;
	return;
}
1056 | | |
/*
 * This is the callback which is called by the connection layer to receive data
 * into the buffer from the connection. It iterates over the mux layer's
 * rcv_buf function. Please do not statify this function, it's often present in
 * backtraces, it's useful to recognize it.
 *
 * Returns 0 when nothing was done (not established, already subscribed, mux
 * not installed...), non-zero when data were received or a notable event
 * (EOI/EOS/error, end of handshake) must be reported to the stream.
 */
int sc_conn_recv(struct stconn *sc)
{
	struct connection *conn = __sc_conn(sc);
	struct channel *ic = sc_ic(sc);
	int ret, max, cur_read = 0;
	int read_poll = MAX_READ_POLL_LOOPS;
	int flags = 0;

	/* If not established yet, do nothing. */
	if (sc->state != SC_ST_EST)
		return 0;

	/* If another call to sc_conn_recv() failed, and we subscribed to
	 * recv events already, give up now.
	 */
	if ((sc->wait_event.events & SUB_RETRY_RECV) || sc_waiting_room(sc))
		return 0;

	/* maybe we were called immediately after an asynchronous abort */
	if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
		return 1;

	/* we must wait because the mux is not installed yet */
	if (!conn->mux)
		return 0;

	/* stop immediately on errors. Note that we DON'T want to stop on
	 * POLL_ERR, as the poller might report a write error while there
	 * are still data available in the recv buffer. This typically
	 * happens when we send too large a request to a backend server
	 * which rejects it before reading it all.
	 */
	if (!sc_ep_test(sc, SE_FL_RCV_MORE)) {
		if (!conn_xprt_ready(conn))
			return 0;
		if (sc_ep_test(sc, SE_FL_ERROR))
			goto end_recv;
	}

	/* prepare to detect if the mux needs more room */
	sc_ep_clr(sc, SE_FL_WANT_ROOM);

	channel_check_idletimer(ic);

#if defined(USE_LINUX_SPLICE)
	/* Detect if the splicing is possible depending on the stream policy */
	if ((global.tune.options & GTUNE_USE_SPLICE) &&
	    (ic->to_forward >= MIN_SPLICE_FORWARD) &&
	    ((!(sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_REQ)) ||
	     ((sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_RTR)) ||
	     ((ic->flags & CF_STREAMER_FAST) && ((strm_sess(__sc_strm(sc))->fe->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_AUT))))
		flags |= CO_RFL_MAY_SPLICE;
#endif

	/* First, let's see if we may fast-forward data from a side to the other
	 * one without using the channel buffer.
	 */
	if (sc_is_fastfwd_supported(sc)) {
		if (channel_data(ic)) {
			/* We're embarrassed, there are already data pending in
			 * the buffer and we don't want to have them at two
			 * locations at a time. Let's indicate we need some
			 * place and ask the consumer to hurry.
			 */
			flags |= CO_RFL_BUF_FLUSH;
			goto abort_fastfwd;
		}
		sc_ep_fwd_kip(sc, sc_opposite(sc));
		ret = CALL_MUX_WITH_RET(conn->mux, fastfwd(sc, ic->to_forward, flags));
		if (ret < 0)
			goto abort_fastfwd;
		else if (ret > 0) {
			/* account for the fast-forwarded bytes */
			if (ic->to_forward != CHN_INFINITE_FORWARD)
				ic->to_forward -= ret;
			sc->bytes_in += ret;
			cur_read += ret;
			ic->flags |= CF_READ_EVENT;
		}

		if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
			goto end_recv;

		if (sc_ep_test(sc, SE_FL_WANT_ROOM))
			sc_need_room(sc, -1);

		/* fast-forwarding may continue: skip the buffered-read loop */
		if (sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) && ic->to_forward)
			goto done_recv;
	}

 abort_fastfwd:
	/* now we'll need an input buffer for the stream */
	if (!sc_alloc_ibuf(sc, &(__sc_strm(sc)->buffer_wait)))
		goto end_recv;

	/* For an HTX stream, if the buffer is stuck (no output data with some
	 * input data) and if the HTX message is fragmented or if its free space
	 * wraps, we force an HTX defragmentation. It is a way to have a
	 * contiguous free space and to let the mux copy as much data as
	 * possible.
	 *
	 * NOTE: A possible optim may be to let the mux decides if defrag is
	 *       required or not, depending on amount of data to be xferred.
	 */
	if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) {
		struct htx *htx = htxbuf(&ic->buf);

		if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
			htx_defrag(htx, NULL, 0);
	}

	/* Instruct the mux it must subscribe for read events */
	if (!(sc->flags & SC_FL_ISBACK) &&           /* for frontend conns only */
	    (sc_opposite(sc)->state != SC_ST_INI) && /* before backend connection setup */
	    proxy_abrt_close(__sc_strm(sc)->be))     /* if abortonclose option is set for the current backend */
		flags |= CO_RFL_KEEP_RECV;

	/* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
	 * was enabled, which implies that the recv buffer was not full. So we have a guarantee
	 * that if such an event is not handled above in splice, it will be handled here by
	 * recv().
	 */
	while (sc_ep_test(sc, SE_FL_RCV_MORE) ||
	       (!(conn->flags & CO_FL_HANDSHAKE) &&
		(!sc_ep_test(sc, SE_FL_ERROR | SE_FL_EOS)) && !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)))) {
		int cur_flags = flags;

		/* Compute transient CO_RFL_* flags */
		if (co_data(ic)) {
			cur_flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
		}

		/* <max> may be null. This is the mux responsibility to set
		 * SE_FL_RCV_MORE on the SC if more space is needed.
		 */
		max = channel_recv_max(ic);
		if (b_is_small(sc_ib(sc)) || ((ic->flags & CF_WROTE_DATA) && b_is_large(sc_ib(sc))))
			max = 0;
		ret = CALL_MUX_WITH_RET(conn->mux, rcv_buf(sc, &ic->buf, max, cur_flags));

		if (sc_ep_test(sc, SE_FL_WANT_ROOM)) {
			/* SE_FL_WANT_ROOM must not be reported if the channel's
			 * buffer is empty.
			 */
			BUG_ON(c_empty(ic));

			sc_need_room(sc, channel_recv_max(ic) + 1);
			/* Add READ_PARTIAL because some data are pending but
			 * cannot be xferred to the channel
			 */
			ic->flags |= CF_READ_EVENT;
			sc_ep_report_read_activity(sc);
		}

		if (ret <= 0) {
			/* if we refrained from reading because we asked for a
			 * flush to satisfy rcv_pipe(), we must not subscribe
			 * and instead report that there's not enough room
			 * here to proceed.
			 */
			if (flags & CO_RFL_BUF_FLUSH)
				sc_need_room(sc, -1);
			break;
		}

		cur_read += ret;

		/* if we're allowed to directly forward data, we must update ->o */
		if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
			unsigned long fwd = ret;
			if (ic->to_forward != CHN_INFINITE_FORWARD) {
				if (fwd > ic->to_forward)
					fwd = ic->to_forward;
				ic->to_forward -= fwd;
			}
			c_adv(ic, fwd);
		}

		ic->flags |= CF_READ_EVENT;
		sc->bytes_in += ret;

		/* End-of-input reached, we can leave. In this case, it is
		 * important to break the loop to not block the SC because of
		 * the channel's policies. This way, we are still able to receive
		 * shutdowns.
		 */
		if (sc_ep_test(sc, SE_FL_EOI))
			break;

		if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) {
			/* we don't expect to read more data */
			sc_wont_read(sc);
			break;
		}

		/* if too many bytes were missing from last read, it means that
		 * it's pointless trying to read again because the system does
		 * not have them in buffers.
		 */
		if (ret < max) {
			/* if a streamer has read few data, it may be because we
			 * have exhausted system buffers. It's not worth trying
			 * again.
			 */
			if (ic->flags & CF_STREAMER) {
				/* we're stopped by the channel's policy */
				sc_wont_read(sc);
				break;
			}

			/* if we read a large block smaller than what we requested,
			 * it's almost certain we'll never get anything more.
			 */
			if (ret >= global.tune.recv_enough) {
				/* we're stopped by the channel's policy */
				sc_wont_read(sc);
				break;
			}
		}

		/* if we are waiting for more space, don't try to read more data
		 * right now.
		 */
		if (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))
			break;
	} /* while !flags */

 done_recv:
	if (!cur_read)
		se_have_no_more_data(sc->sedesc);
	else {
		channel_check_xfer(ic, cur_read);
		sc_ep_report_read_activity(sc);
	}

 end_recv:
	ret = (cur_read != 0);

	/* Report EOI on the channel if it was reached from the mux point of
	 * view. */
	if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
		sc_ep_report_read_activity(sc);
		sc->flags |= SC_FL_EOI;
		ic->flags |= CF_READ_EVENT;
		ret = 1;
	}

	if (sc_ep_test(sc, SE_FL_EOS)) {
		/* we received a shutdown */
		if (ic->flags & CF_AUTO_CLOSE)
			sc_schedule_shutdown(sc_opposite(sc));
		sc_conn_eos(sc);
		ret = 1;
	}
	if (sc_ep_test(sc, SE_FL_ERROR)) {
		sc->flags |= SC_FL_ERROR;
		/* an error without prior EOI means the input was truncated */
		if (!(sc->flags & SC_FL_EOS))
			sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_rcv_err: strm_tevt_type_truncated_rcv_err));
		ret = 1;
	}

	/* Ensure sc_conn_process() is called if waiting on handshake. */
	if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
	    sc_ep_test(sc, SE_FL_WAIT_FOR_HS)) {
		ret = 1;
	}

	if (sc->flags & (SC_FL_EOS|SC_FL_ERROR)) {
		/* No more data are expected at this stage */
		se_have_no_more_data(sc->sedesc);
	}
	else if (!cur_read &&
		 !(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) &&
		 !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))) {
		/* Subscribe to receive events if we're blocking on I/O. Nothing
		 * was received and it was not because of a blocking
		 * condition.
		 */
		conn->mux->subscribe(sc, SUB_RETRY_RECV, &sc->wait_event);
		se_have_no_more_data(sc->sedesc);
	}
	else if (sc->flags & SC_FL_EOI) {
		/* No more data are expected at this stage, except if abortonclose is enabled */
		if (!(flags & CO_RFL_KEEP_RECV))
			se_have_no_more_data(sc->sedesc);
		else
			se_have_more_data(sc->sedesc);
	}
	else {
		/* The mux may have more data to deliver. Be sure to be able to
		 * ask it ASAP
		 */
		se_have_more_data(sc->sedesc);
		ret = 1;
	}

	return ret;
}
1360 | | |
1361 | | /* This tries to perform a synchronous receive on the stream connector to |
1362 | | * try to collect last arrived data. In practice it's only implemented on |
1363 | | * stconns. Returns 0 if nothing was done, non-zero if new data or a |
1364 | | * shutdown were collected. This may result on some delayed receive calls |
1365 | | * to be programmed and performed later, though it doesn't provide any |
1366 | | * such guarantee. |
1367 | | */ |
1368 | | int sc_conn_sync_recv(struct stconn *sc) |
1369 | 0 | { |
1370 | 0 | if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST)) |
1371 | 0 | return 0; |
1372 | | |
1373 | 0 | if (!sc_mux_ops(sc)) |
1374 | 0 | return 0; // only stconns are supported |
1375 | | |
1376 | 0 | if (sc->wait_event.events & SUB_RETRY_RECV) |
1377 | 0 | return 0; // already subscribed |
1378 | | |
1379 | 0 | if (!sc_is_recv_allowed(sc)) |
1380 | 0 | return 0; // already failed |
1381 | | |
1382 | 0 | return sc_conn_recv(sc); |
1383 | 0 | } |
1384 | | |
/*
 * This function is called to send buffer data to a stream socket.
 * It calls the mux layer's snd_buf function. It relies on the
 * caller to commit polling changes. The caller should check conn->flags
 * for errors. Please do not statify this function, it's often present in
 * backtraces, it's useful to recognize it.
 *
 * Returns non-zero when data were sent or an error/shutdown condition must
 * be reported to the stream, zero when nothing could be done yet.
 */
int sc_conn_send(struct stconn *sc)
{
	struct connection *conn = __sc_conn(sc);
	struct stconn *sco = sc_opposite(sc);
	struct stream *s = __sc_strm(sc);
	struct channel *oc = sc_oc(sc);
	int ret;
	int did_send = 0;

	if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING) || sc_is_conn_error(sc)) {
		/* We're probably there because the tasklet was woken up,
		 * but process_stream() ran before, detected there was an
		 * error and put the SC back to SC_ST_TAR. There's still
		 * CO_FL_ERROR on the connection but we don't want to add
		 * SE_FL_ERROR back, so give up
		 */
		if (sc->state < SC_ST_CON)
			return 0;
		BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
		if (sc_ep_test(sc, SE_FL_ERROR))
			sc->flags |= SC_FL_ERROR;
		/* data remained unsent because of the error */
		if (co_data(oc) || sc_ep_have_ff_data(sc))
			sc_ep_report_blocked_send(sc, 0);
		return 1;
	}

	/* We're already waiting to be able to send, give up */
	if (sc->wait_event.events & SUB_RETRY_SEND)
		return 0;

	/* we might have been called just after an asynchronous shutw */
	if (sc->flags & SC_FL_SHUT_DONE)
		return 1;

	/* we must wait because the mux is not installed yet */
	if (!conn->mux)
		return 0;

	sc_ep_fwd_kip(sco, sc);

	/* First flush any data pending in the fast-forward path */
	if (sc_ep_have_ff_data(sc)) {
		unsigned int send_flag = 0;

		/* same MSG_MORE policy as for the buffered path below:
		 * announce more data to come unless an immediate send was
		 * requested, or when a response with auto-close has a pending
		 * shutdown (to merge the FIN with the data).
		 */
		if ((!(sc->flags & (SC_FL_SND_ASAP|SC_FL_SND_NEVERWAIT)) &&
		     ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
		      (sc->flags & SC_FL_SND_EXP_MORE) ||
		      (IS_HTX_STRM(s) &&
		       (!(sco->flags & (SC_FL_EOI|SC_FL_EOS|SC_FL_ABRT_DONE)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
		    ((oc->flags & CF_ISRESP) &&
		     (oc->flags & CF_AUTO_CLOSE) &&
		     (sc->flags & SC_FL_SHUT_WANTED)))
			send_flag |= CO_SFL_MSG_MORE;

		if (oc->flags & CF_STREAMER)
			send_flag |= CO_SFL_STREAMER;

		ret = CALL_MUX_WITH_RET(conn->mux, resume_fastfwd(sc, send_flag));
		if (ret > 0) {
			sc->bytes_out += ret;
			did_send = 1;
		}

		/* fast-forwarded data remain: don't mix with buffered sends */
		if (sc_ep_have_ff_data(sc))
			goto end;
	}

	/* At this point, the pipe is empty, but we may still have data pending
	 * in the normal buffer.
	 */
	if (co_data(oc)) {
		/* when we're here, we already know that there is no spliced
		 * data left, and that there are sendable buffered data.
		 */

		/* check if we want to inform the kernel that we're interested in
		 * sending more data after this call. We want this if :
		 *  - we're about to close after this last send and want to merge
		 *    the ongoing FIN with the last segment.
		 *  - we know we can't send everything at once and must get back
		 *    here because of unaligned data
		 *  - there is still a finite amount of data to forward
		 * The test is arranged so that the most common case does only 2
		 * tests.
		 */
		unsigned int send_flag = 0;

		if ((!(sc->flags & (SC_FL_SND_ASAP|SC_FL_SND_NEVERWAIT)) &&
		     ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
		      (sc->flags & SC_FL_SND_EXP_MORE) ||
		      (IS_HTX_STRM(s) &&
		       (!(sco->flags & (SC_FL_EOI|SC_FL_EOS|SC_FL_ABRT_DONE)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
		    ((oc->flags & CF_ISRESP) &&
		     (oc->flags & CF_AUTO_CLOSE) &&
		     (sc->flags & SC_FL_SHUT_WANTED)))
			send_flag |= CO_SFL_MSG_MORE;

		if (oc->flags & CF_STREAMER)
			send_flag |= CO_SFL_STREAMER;

		if (s->txn && s->txn->flags & TX_L7_RETRY && !b_data(&s->txn->l7_buffer)) {
			/* If we want to be able to do L7 retries, copy
			 * the data we're about to send, so that we are able
			 * to resend them if needed
			 */
			/* Try to allocate a buffer if we had none.
			 * If it fails, the next test will just
			 * disable the l7 retries by setting
			 * l7_conn_retries to 0.
			 */
			if (s->txn->req.msg_state != HTTP_MSG_DONE || b_is_large(&oc->buf))
				s->txn->flags &= ~TX_L7_RETRY;
			else {
				if (!(s->be->options2 & PR_O2_USE_SBUF_L7_RETRY) ||
				    !htx_copy_to_small_buffer(&s->txn->l7_buffer, &oc->buf)) {
					if (b_alloc(&s->txn->l7_buffer, DB_UNLIKELY) == NULL)
						s->txn->flags &= ~TX_L7_RETRY;
					else {
						memcpy(b_orig(&s->txn->l7_buffer),
						       b_orig(&oc->buf),
						       b_size(&oc->buf));
					}
				}

				if (s->txn->flags & TX_L7_RETRY) {
					s->txn->l7_buffer.head = co_data(oc);
					b_set_data(&s->txn->l7_buffer, co_data(oc));
				}
			}
		}

		/* everything left to send fits in this call and a shutdown is
		 * pending: let the mux merge the FIN with the last data.
		 */
		if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc))
			send_flag |= CO_SFL_LAST_DATA;

		ret = CALL_MUX_WITH_RET(conn->mux, snd_buf(sc, &oc->buf, co_data(oc), send_flag));
		if (ret > 0) {
			did_send = 1;
			c_rew(oc, ret);
			c_realign_if_empty(oc);
			sc->bytes_out += ret;
			if (!co_data(oc)) {
				/* Always clear both flags once everything has been sent, they're one-shot */
				sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE);
			}
			/* if some data remain in the buffer, it's only because the
			 * system buffers are full, we will try next time.
			 */
		}
	}

 end:
	if (did_send) {
		oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;
		/* first successful send on a connecting SC: it's now ready */
		if (sc->state == SC_ST_CON)
			sc->state = SC_ST_RDY;
	}

	/* unblock the opposite side if the send freed enough room for it */
	if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)))
		sc_have_room(sco);

	if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
		oc->flags |= CF_WRITE_EVENT;
		BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
		sc_report_term_evt(sc, strm_tevt_type_snd_err);
		if (sc_ep_test(sc, SE_FL_ERROR))
			sc->flags |= SC_FL_ERROR;
		return 1;
	}

	/* FIXME: Must be reviewed for FF */
	if (!co_data(oc) && !sc_ep_have_ff_data(sc)) {
		if (did_send)
			sc_ep_report_send_activity(sc);
		/* If fast-forwarding is blocked, unblock it now to check for
		 * receive on the other side
		 */
		if (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
			sc->sedesc->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
			sc_have_room(sco);
			did_send = 1;
		}
	}
	else {
		/* We couldn't send all of our data, let the mux know we'd like to send more */
		conn->mux->subscribe(sc, SUB_RETRY_SEND, &sc->wait_event);
		if (sc_state_in(sc->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO))
			sc_ep_report_blocked_send(sc, did_send);
	}

	return did_send;
}
1582 | | |
1583 | | /* perform a synchronous send() for the stream connector. The CF_WRITE_EVENT |
1584 | | * flag are cleared prior to the attempt, and will possibly be updated in case |
1585 | | * of success. |
1586 | | */ |
1587 | | int sc_conn_sync_send(struct stconn *sc) |
1588 | 0 | { |
1589 | 0 | struct channel *oc = sc_oc(sc); |
1590 | 0 | int did_send = 0; |
1591 | |
|
1592 | 0 | oc->flags &= ~CF_WRITE_EVENT; |
1593 | |
|
1594 | 0 | if (sc->flags & SC_FL_SHUT_DONE) |
1595 | 0 | goto end; |
1596 | | |
1597 | 0 | if (!co_data(oc)) |
1598 | 0 | goto end; |
1599 | | |
1600 | 0 | if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST)) |
1601 | 0 | goto end; |
1602 | | |
1603 | 0 | if (!sc_mux_ops(sc)) |
1604 | 0 | goto end; |
1605 | | |
1606 | 0 | did_send = sc_conn_send(sc); |
1607 | 0 | if (oc->flags & CF_WRITE_EVENT) |
1608 | 0 | oc->flags |= CF_WAKE_ONCE; |
1609 | 0 | end: |
1610 | 0 | return did_send; |
1611 | 0 | } |
1612 | | |
/* Called by I/O handlers after completion. It propagates
 * connection flags to the stream connector, updates the stream (which may or
 * may not take this opportunity to try to forward data), then update the
 * connection's polling based on the channels and stream connector's final
 * states. The function always returns 0. Please do not statify this function,
 * it's often present in backtraces, it's useful to recognize it.
 */
int sc_conn_process(struct stconn *sc)
{
	struct connection *conn = __sc_conn(sc);
	struct channel *ic = sc_ic(sc);
	struct channel *oc = sc_oc(sc);

	BUG_ON(!conn);

	/* If we have data to send, try it now */
	if ((co_data(oc) || sc_ep_have_ff_data(sc)) &&
	    !(sc->wait_event.events & SUB_RETRY_SEND))
		sc_conn_send(sc);

	/* First step, report to the stream connector what was detected at the
	 * connection layer : errors and connection establishment.
	 * Only add SC_FL_ERROR if we're connected, or we're attempting to
	 * connect, we may get there because we got woken up, but only run
	 * after process_stream() noticed there was an error, and decided
	 * to retry to connect, the connection may still have CO_FL_ERROR,
	 * and we don't want to add SC_FL_ERROR back
	 *
	 * Note: This test is only required because sc_conn_process is also the SI
	 * wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
	 * care of it.
	 */

	if (sc->state >= SC_ST_CON) {
		if (sc_is_conn_error(sc))
			sc->flags |= SC_FL_ERROR;
	}

	/* If we had early data, and the handshake ended, then
	 * we can remove the flag, and attempt to wake the task up,
	 * in the event there's an analyser waiting for the end of
	 * the handshake.
	 */
	if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
	    sc_ep_test(sc, SE_FL_WAIT_FOR_HS)) {
		sc_ep_clr(sc, SE_FL_WAIT_FOR_HS);
		task_wakeup(sc_strm_task(sc), TASK_WOKEN_MSG);
	}

	/* The transport layer is ready but the SC is not in a final/established
	 * state yet: report the write event and promote SC_ST_CON to SC_ST_RDY.
	 */
	if (!sc_state_in(sc->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO) &&
	    (conn->flags & CO_FL_WAIT_XPRT) == 0) {
		if (sc->flags & SC_FL_ISBACK)
			__sc_strm(sc)->conn_exp = TICK_ETERNITY;
		oc->flags |= CF_WRITE_EVENT;
		if (sc->state == SC_ST_CON)
			sc->state = SC_ST_RDY;
	}

	/* Report EOI on the channel if it was reached from the mux point of
	 * view.
	 *
	 * Note: This test is only required because sc_conn_process is also the SI
	 * wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
	 * care of it.
	 */
	if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
		sc->flags |= SC_FL_EOI;
		ic->flags |= CF_READ_EVENT;
		sc_ep_report_read_activity(sc);
	}

	/* Report EOS on the channel if it was reached from the mux point of
	 * view.
	 *
	 * Note: This test is only required because sc_conn_process is also the SI
	 * wake callback. Otherwise sc_conn_recv()/sc_conn_send() already take
	 * care of it.
	 */
	if (sc_ep_test(sc, SE_FL_EOS) && !(sc->flags & SC_FL_EOS)) {
		/* we received a shutdown */
		if (ic->flags & CF_AUTO_CLOSE)
			sc_schedule_shutdown(sc_opposite(sc));
		sc_conn_eos(sc);
	}

	/* Propagate an endpoint error once; report the termination event only
	 * if no EOS was seen (EOI decides between a clean and a truncated
	 * receive error).
	 */
	if (sc_ep_test(sc, SE_FL_ERROR) && !(sc->flags & SC_FL_ERROR)) {
		if (!(sc->flags & SC_FL_EOS))
			sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_rcv_err: strm_tevt_type_truncated_rcv_err));
		sc->flags |= SC_FL_ERROR;
	}

	/* Second step : update the stream connector and channels, try to forward any
	 * pending data, then possibly wake the stream up based on the new
	 * stream connector status.
	 */
	sc_notify(sc);
	stream_release_buffers(__sc_strm(sc));
	return 0;
}
1712 | | |
1713 | | /* This is the ->process() function for any stream connector's wait_event task. |
1714 | | * It's assigned during the stream connector's initialization, for any type of |
1715 | | * stream connector. Thus it is always safe to perform a tasklet_wakeup() on a |
1716 | | * stream connector, as the presence of the SC is checked there. |
1717 | | */ |
1718 | | struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state) |
1719 | 0 | { |
1720 | 0 | struct stconn *sc = ctx; |
1721 | 0 | int ret = 0; |
1722 | |
|
1723 | 0 | if (!sc_conn(sc)) |
1724 | 0 | return t; |
1725 | | |
1726 | 0 | if (!(sc->wait_event.events & SUB_RETRY_SEND) && (co_data(sc_oc(sc)) || sc_ep_have_ff_data(sc) || (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED))) |
1727 | 0 | ret = sc_conn_send(sc); |
1728 | 0 | if (!(sc->wait_event.events & SUB_RETRY_RECV)) |
1729 | 0 | ret |= sc_conn_recv(sc); |
1730 | 0 | if (ret != 0 || (state & TASK_WOKEN_MSG)) |
1731 | 0 | sc_conn_process(sc); |
1732 | |
|
1733 | 0 | stream_release_buffers(__sc_strm(sc)); |
1734 | 0 | return t; |
1735 | 0 | } |
1736 | | |
/*
 * This function propagates an end-of-stream received from an applet. It
 * updates the stream connector. If it is already shut, the applet is
 * released. Otherwise, we try to forward the shutdown, immediately or ASAP.
 */
static void sc_applet_eos(struct stconn *sc)
{
	struct channel *ic = sc_ic(sc);

	BUG_ON(!sc_appctx(sc));

	/* Nothing to do if EOS or an abort was already handled */
	if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
		return;
	sc->flags |= SC_FL_EOS;
	ic->flags |= CF_READ_EVENT;
	sc_ep_report_read_activity(sc);
	/* EOI decides between a clean and a truncated end-of-stream event */
	sc_report_term_evt(sc, (sc->flags & SC_FL_EOI ? strm_tevt_type_eos: strm_tevt_type_truncated_eos));

	/* Note: on abort, we don't call the applet */

	if (sc->state != SC_ST_EST)
		return;

	if (sc->flags & SC_FL_SHUT_DONE) {
		/* both sides are shut: reset the read side, close the write
		 * side normally and move to DISconnected state.
		 */
		se_shutdown(sc->sedesc, SE_SHR_RESET|SE_SHW_NORMAL);
		sc->state = SC_ST_DIS;
		if (sc->flags & SC_FL_ISBACK)
			__sc_strm(sc)->conn_exp = TICK_ETERNITY;
	}
	else if (sc_cond_forward_shut(sc))
		return sc_shutdown(sc);
}
1769 | | |
/*
 * This is the callback which is called by the applet layer to receive data into
 * the buffer from the appctx. It iterates over the applet's rcv_buf
 * function. Please do not statify this function, it's often present in
 * backtraces, it's useful to recognize it.
 *
 * Returns non-zero if data were received or if an event (EOI/EOS/error) was
 * reported, zero otherwise.
 */
int sc_applet_recv(struct stconn *sc)
{
	struct appctx *appctx = __sc_appctx(sc);
	struct channel *ic = sc_ic(sc);
	int ret, max, cur_read = 0;
	int read_poll = MAX_READ_POLL_LOOPS;
	int flags = 0;


	/* If another call to sc_applet_recv() failed, give up now.
	 */
	if (sc_waiting_room(sc))
		return 0;

	/* maybe we were called immediately after an asynchronous abort */
	if (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))
		return 1;

	/* We must wait because the applet is not fully initialized */
	if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
		return 0;

	/* stop immediately on errors. */
	if (!sc_ep_test(sc, SE_FL_RCV_MORE)) {
		// TODO: be sure SE_FL_RCV_MORE may be set for applet ?
		if (sc_ep_test(sc, SE_FL_ERROR))
			goto end_recv;
	}

	/* prepare to detect if the mux needs more room */
	sc_ep_clr(sc, SE_FL_WANT_ROOM);

	channel_check_idletimer(ic);

	/* First, let's see if we may fast-forward data from a side to the other
	 * one without using the channel buffer.
	 */
	if (sc_is_fastfwd_supported(sc)) {
		if (channel_data(ic)) {
			/* We're embarrassed, there are already data pending in
			 * the buffer and we don't want to have them at two
			 * locations at a time. Let's indicate we need some
			 * place and ask the consumer to hurry.
			 */
			flags |= CO_RFL_BUF_FLUSH;
			goto abort_fastfwd;
		}
		sc_ep_fwd_kip(sc, sc_opposite(sc));
		ret = appctx_fastfwd(sc, ic->to_forward, flags);
		if (ret < 0)
			goto abort_fastfwd;
		else if (ret > 0) {
			/* account for fast-forwarded bytes */
			if (ic->to_forward != CHN_INFINITE_FORWARD)
				ic->to_forward -= ret;
			sc->bytes_in += ret;
			cur_read += ret;
			ic->flags |= CF_READ_EVENT;
		}

		if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
			goto end_recv;

		if (sc_ep_test(sc, SE_FL_WANT_ROOM))
			sc_need_room(sc, -1);

		if (sc_ep_test(sc, SE_FL_MAY_FASTFWD_PROD) && ic->to_forward)
			goto done_recv;
	}

 abort_fastfwd:
	if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
		goto end_recv;

	/* For an HTX stream, if the buffer is stuck (no output data with some
	 * input data) and if the HTX message is fragmented or if its free space
	 * wraps, we force an HTX defragmentation. It is a way to have a
	 * contiguous free space and to let the mux to copy as much data as
	 * possible.
	 *
	 * NOTE: A possible optim may be to let the mux decides if defrag is
	 * required or not, depending on amount of data to be xferred.
	 */
	if (IS_HTX_STRM(__sc_strm(sc)) && !co_data(ic)) {
		struct htx *htx = htxbuf(&ic->buf);

		if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
			htx_defrag(htx, NULL, 0);
	}

	/* Compute transient CO_RFL_* flags */
	if (co_data(ic)) {
		flags |= (CO_RFL_BUF_WET | CO_RFL_BUF_NOT_STUCK);
	}

	/* <max> may be null. This is the mux responsibility to set
	 * SE_FL_RCV_MORE on the SC if more space is needed.
	 */
	max = channel_recv_max(ic);
	/* NOTE(review): forcing <max> to zero for a small input buffer, or for
	 * a large one that already received writes, prevents any transfer on
	 * this pass — confirm this clamp is intentional and not a leftover.
	 */
	if (b_is_small(sc_ib(sc)) || ((ic->flags & CF_WROTE_DATA) && b_is_large(sc_ib(sc))))
		max = 0;
	ret = appctx_rcv_buf(sc, &ic->buf, max, flags);
	if (sc_ep_test(sc, SE_FL_WANT_ROOM)) {
		/* SE_FL_WANT_ROOM must not be reported if the channel's
		 * buffer is empty.
		 */
		BUG_ON(c_empty(ic));

		sc_need_room(sc, channel_recv_max(ic) + 1);
		/* Add READ_PARTIAL because some data are pending but
		 * cannot be xferred to the channel
		 */
		ic->flags |= CF_READ_EVENT;
		sc_ep_report_read_activity(sc);
	}

	if (ret <= 0) {
		/* if we refrained from reading because we asked for a flush to
		 * satisfy rcv_pipe(), report that there's not enough room here
		 * to proceed.
		 */
		if (flags & CO_RFL_BUF_FLUSH)
			sc_need_room(sc, -1);
		goto done_recv;
	}

	cur_read += ret;

	/* if we're allowed to directly forward data, we must update ->o */
	if (ic->to_forward && !(sc_opposite(sc)->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))) {
		unsigned long fwd = ret;
		if (ic->to_forward != CHN_INFINITE_FORWARD) {
			if (fwd > ic->to_forward)
				fwd = ic->to_forward;
			ic->to_forward -= fwd;
		}
		c_adv(ic, fwd);
	}

	ic->flags |= CF_READ_EVENT;
	sc->bytes_in += ret;

	/* End-of-input reached, we can leave. In this case, it is
	 * important to break the loop to not block the SC because of
	 * the channel's policies.This way, we are still able to receive
	 * shutdowns.
	 */
	if (sc_ep_test(sc, SE_FL_EOI))
		goto done_recv;

	if ((sc->flags & SC_FL_RCV_ONCE) || --read_poll <= 0) {
		/* we don't expect to read more data */
		sc_wont_read(sc);
		goto done_recv;
	}

	/* if too many bytes were missing from last read, it means that
	 * it's pointless trying to read again because the system does
	 * not have them in buffers.
	 */
	if (ret < max) {
		/* if a streamer has read few data, it may be because we
		 * have exhausted system buffers. It's not worth trying
		 * again.
		 */
		if (ic->flags & CF_STREAMER) {
			/* we're stopped by the channel's policy */
			sc_wont_read(sc);
			goto done_recv;
		}

		/* if we read a large block smaller than what we requested,
		 * it's almost certain we'll never get anything more.
		 */
		if (ret >= global.tune.recv_enough) {
			/* we're stopped by the channel's policy */
			sc_wont_read(sc);
		}
	}

 done_recv:
	if (cur_read) {
		channel_check_xfer(ic, cur_read);
		sc_ep_report_read_activity(sc);
	}

 end_recv:
	ret = (cur_read != 0);

	/* Report EOI on the channel if it was reached from the mux point of
	 * view. */
	if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
		sc_ep_report_read_activity(sc);
		sc->flags |= SC_FL_EOI;
		ic->flags |= CF_READ_EVENT;
		ret = 1;
	}

	if (sc_ep_test(sc, SE_FL_EOS)) {
		/* we received a shutdown */
		if (ic->flags & CF_AUTO_CLOSE)
			sc_schedule_shutdown(sc_opposite(sc));
		sc_applet_eos(sc);
		ret = 1;
	}

	if (sc_ep_test(sc, SE_FL_ERROR)) {
		sc->flags |= SC_FL_ERROR;
		ret = 1;
	}
	else if (cur_read || (sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) {
		se_have_more_data(sc->sedesc);
		ret = 1;
	}

	return ret;
}
1992 | | |
1993 | | /* This tries to perform a synchronous receive on the stream connector to |
1994 | | * try to collect last arrived data. In practice it's only implemented on |
1995 | | * stconns. Returns 0 if nothing was done, non-zero if new data or a |
1996 | | * shutdown were collected. This may result on some delayed receive calls |
1997 | | * to be programmed and performed later, though it doesn't provide any |
1998 | | * such guarantee. |
1999 | | */ |
2000 | | int sc_applet_sync_recv(struct stconn *sc) |
2001 | 0 | { |
2002 | 0 | if (!appctx_app_test(__sc_appctx(sc), APPLET_FL_NEW_API)) |
2003 | 0 | return 0; |
2004 | | |
2005 | 0 | if (!sc_state_in(sc->state, SC_SB_RDY|SC_SB_EST)) |
2006 | 0 | return 0; |
2007 | | |
2008 | 0 | if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) |
2009 | 0 | return 0; |
2010 | | |
2011 | 0 | if (!sc_is_recv_allowed(sc)) |
2012 | 0 | return 0; // already failed |
2013 | | |
2014 | 0 | return sc_applet_recv(sc); |
2015 | 0 | } |
2016 | | |
/*
 * This function is called to send buffer data to an applet. It calls the
 * applet's snd_buf function. Please do not statify this function, it's often
 * present in backtraces, it's useful to recognize it.
 *
 * Returns non-zero when something happened (data sent, shutdown or error
 * observed), zero when nothing could be done.
 */
int sc_applet_send(struct stconn *sc)
{
	struct stconn *sco = sc_opposite(sc);
	struct channel *oc = sc_oc(sc);
	size_t ret;
	int did_send = 0;

	/* a pending or established error ends the send side right away */
	if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
		BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
		if (co_data(oc))
			sc_ep_report_blocked_send(sc, 0);
		return 1;
	}

	if (sc_ep_test(sc, SE_FL_WONT_CONSUME))
		return 0;

	/* we might have been called just after an asynchronous shutw */
	if (sc->flags & SC_FL_SHUT_DONE)
		return 1;

	/* We must wait because the applet is not fully initialized */
	if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
		return 0;

	sc_ep_fwd_kip(sco, sc);

	/* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */
	BUG_ON(sc_ep_have_ff_data(sc));

	if (co_data(oc)) {
		unsigned int send_flag = 0;

		/* whole remaining payload is being sent and a shutdown is
		 * wanted: tell the applet this is the last data.
		 */
		if ((sc->flags & SC_FL_SHUT_WANTED) && co_data(oc) == c_data(oc))
			send_flag |= CO_SFL_LAST_DATA;

		ret = appctx_snd_buf(sc, &oc->buf, co_data(oc), send_flag);
		if (ret > 0) {
			did_send = 1;
			c_rew(oc, ret);
			c_realign_if_empty(oc);
			sc->bytes_out += ret;
			if (!co_data(oc)) {
				/* Always clear both flags once everything has been sent, they're one-shot */
				sc->flags &= ~(SC_FL_SND_ASAP|SC_FL_SND_EXP_MORE);
			}
			/* if some data remain in the buffer, it's only because the
			 * system buffers are full, we will try next time.
			 */
		}
	}

	if (did_send)
		oc->flags |= CF_WRITE_EVENT | CF_WROTE_DATA;

	/* unblock the opposite side if its room requirement is now satisfied */
	if (!sco->room_needed || (did_send && (sco->room_needed < 0 || channel_recv_max(sc_oc(sc)) >= sco->room_needed)))
		sc_have_room(sco);

	/* an error may have been raised by the applet during the send above */
	if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING)) {
		oc->flags |= CF_WRITE_EVENT;
		BUG_ON(sc_ep_test(sc, SE_FL_EOS|SE_FL_ERROR|SE_FL_ERR_PENDING) == (SE_FL_EOS|SE_FL_ERR_PENDING));
		if (sc_ep_test(sc, SE_FL_ERROR))
			sc->flags |= SC_FL_ERROR;
		return 1;
	}

	if (!co_data(oc)) {
		if (did_send)
			sc_ep_report_send_activity(sc);
	}
	else {
		sc_ep_report_blocked_send(sc, did_send);
	}

	return did_send;
}
2098 | | |
2099 | | void sc_applet_sync_send(struct stconn *sc) |
2100 | 0 | { |
2101 | 0 | struct channel *oc = sc_oc(sc); |
2102 | |
|
2103 | 0 | oc->flags &= ~CF_WRITE_EVENT; |
2104 | |
|
2105 | 0 | if (!appctx_app_test(__sc_appctx(sc), APPLET_FL_NEW_API)) |
2106 | 0 | return; |
2107 | | |
2108 | 0 | if (sc->flags & SC_FL_SHUT_DONE) |
2109 | 0 | return; |
2110 | | |
2111 | 0 | if (!co_data(oc)) |
2112 | 0 | return; |
2113 | | |
2114 | 0 | if (!sc_state_in(sc->state, SC_SB_EST)) |
2115 | 0 | return; |
2116 | | |
2117 | 0 | if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) |
2118 | 0 | return; |
2119 | | |
2120 | 0 | sc_applet_send(sc); |
2121 | 0 | if (oc->flags & CF_WRITE_EVENT) |
2122 | 0 | oc->flags |= CF_WAKE_ONCE; |
2123 | 0 | } |
2124 | | |
/* Callback to be used by applet handlers upon completion. It updates the stream
 * (which may or may not take this opportunity to try to forward data), then
 * may re-enable the applet's based on the channels and stream connector's final
 * states. Please do not statify this function, it's often present in backtraces,
 * it's useful to recognize it. Always returns 0.
 */
int sc_applet_process(struct stconn *sc)
{
	struct channel *ic = sc_ic(sc);

	BUG_ON(!sc_appctx(sc));

	/* Report EOI on the channel if it was reached from the applet point of
	 * view. */
	if (sc_ep_test(sc, SE_FL_EOI) && !(sc->flags & SC_FL_EOI)) {
		sc_ep_report_read_activity(sc);
		sc->flags |= SC_FL_EOI;
		ic->flags |= CF_READ_EVENT;
	}

	/* propagate endpoint error to the stream connector */
	if (sc_ep_test(sc, SE_FL_ERROR))
		sc->flags |= SC_FL_ERROR;

	if (sc_ep_test(sc, SE_FL_EOS)) {
		/* we received a shutdown */
		sc_applet_eos(sc);
	}

	/* an applet reporting EOI must also report it has no more data */
	BUG_ON(sc_ep_test(sc, SE_FL_HAVE_NO_DATA|SE_FL_EOI) == SE_FL_EOI);

	/* If the applet wants to write and the channel is closed, it's a
	 * broken pipe and it must be reported.
	 */
	if (!sc_ep_test(sc, SE_FL_HAVE_NO_DATA) && (sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)))
		sc_ep_set(sc, SE_FL_ERROR);

	/* automatically mark the applet having data available if it reported
	 * being blocked by the channel.
	 */
	if ((sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) ||
	    sc_ep_test(sc, SE_FL_APPLET_NEED_CONN))
		applet_have_more_data(__sc_appctx(sc));

	/* update the stream connector, channels, and possibly wake the stream up */
	sc_notify(sc);
	stream_release_buffers(__sc_strm(sc));

	/* sc_notify may have passed through chk_snd and released some blocking
	 * flags. Process_stream will consider those flags to wake up the
	 * appctx but in the case the task is not in runqueue we may have to
	 * wakeup the appctx immediately.
	 */
	if ((sc_is_recv_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_OUTBLK_ALLOC)) ||
	    (sc_is_send_allowed(sc) && !applet_fl_test(__sc_appctx(sc), APPCTX_FL_INBLK_ALLOC)))
		appctx_wakeup(__sc_appctx(sc));
	return 0;
}
2182 | | |
2183 | | |
2184 | | /* Prepares an endpoint upgrade. We don't now at this stage if the upgrade will |
2185 | | * succeed or not and if the stconn will be reused by the new endpoint. Thus, |
2186 | | * for now, only pretend the stconn is detached. |
2187 | | */ |
2188 | | void sc_conn_prepare_endp_upgrade(struct stconn *sc) |
2189 | 0 | { |
2190 | 0 | BUG_ON(!sc_conn(sc) || !sc->app); |
2191 | 0 | sc_ep_clr(sc, SE_FL_T_MUX); |
2192 | 0 | sc_ep_set(sc, SE_FL_DETACHED); |
2193 | 0 | } |
2194 | | |
2195 | | /* Endpoint upgrade failed. Restore the stconn state. */ |
2196 | | void sc_conn_abort_endp_upgrade(struct stconn *sc) |
2197 | 0 | { |
2198 | 0 | sc_ep_set(sc, SE_FL_T_MUX); |
2199 | 0 | sc_ep_clr(sc, SE_FL_DETACHED); |
2200 | 0 | } |
2201 | | |
2202 | | /* Commit the endpoint upgrade. If stconn is attached, it means the new endpoint |
2203 | | * use it. So we do nothing. Otherwise, the stconn will be destroy with the |
2204 | | * overlying stream. So, it means we must commit the detach. |
2205 | | */ |
2206 | | void sc_conn_commit_endp_upgrade(struct stconn *sc) |
2207 | 0 | { |
2208 | 0 | if (!sc_ep_test(sc, SE_FL_DETACHED)) |
2209 | 0 | return; |
2210 | 0 | sc_detach_endp(&sc); |
2211 | | /* Because it was already set as detached, the sedesc must be preserved */ |
2212 | 0 | BUG_ON(!sc); |
2213 | 0 | BUG_ON(!sc->sedesc); |
2214 | 0 | } |
2215 | | |
2216 | | /* Return a debug string exposing the internals of the front or back |
2217 | | * stream/connection when supported. It will be protocol-dependent and will |
2218 | | * change over time like the output of "show fd" or "show sess all". |
2219 | | */ |
2220 | | static int smp_fetch_debug_str(const struct arg *args, struct sample *smp, const char *kw, void *private) |
2221 | 0 | { |
2222 | 0 | struct connection *conn; |
2223 | 0 | struct stconn *sc; |
2224 | 0 | union mux_sctl_dbg_str_ctx sctl_ctx = { }; |
2225 | |
|
2226 | 0 | if (!smp->strm) |
2227 | 0 | return 0; |
2228 | | |
2229 | 0 | sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb); |
2230 | 0 | conn = sc_conn(sc); |
2231 | |
|
2232 | 0 | if (!conn) |
2233 | 0 | return 0; |
2234 | | |
2235 | | /* a missing mux is necessarily on the backend, and may arrive later */ |
2236 | 0 | if (!conn->mux) { |
2237 | 0 | smp->flags |= SMP_F_MAY_CHANGE; |
2238 | 0 | return 0; |
2239 | 0 | } |
2240 | | |
2241 | | /* Not implemented, return nothing */ |
2242 | 0 | if (!conn->mux->sctl) |
2243 | 0 | return 0; |
2244 | | |
2245 | 0 | sctl_ctx.arg.debug_flags = args->data.sint ? args->data.sint : ~0U; |
2246 | 0 | if (conn->mux->sctl(sc, MUX_SCTL_DBG_STR, &sctl_ctx) == -1) |
2247 | 0 | return 0; |
2248 | | |
2249 | 0 | smp->data.type = SMP_T_STR; |
2250 | 0 | smp->flags = SMP_F_VOL_TEST | SMP_F_MAY_CHANGE; |
2251 | 0 | smp->data.u.str = sctl_ctx.ret.buf; |
2252 | 0 | return 1; |
2253 | 0 | } |
2254 | | |
2255 | | /* return the frontend or backend mux stream ID. |
2256 | | */ |
2257 | | static int |
2258 | | smp_fetch_sid(const struct arg *args, struct sample *smp, const char *kw, void *private) |
2259 | 0 | { |
2260 | 0 | struct connection *conn; |
2261 | 0 | struct stconn *sc; |
2262 | 0 | int64_t sid = 0; |
2263 | |
|
2264 | 0 | if (!smp->strm) |
2265 | 0 | return 0; |
2266 | | |
2267 | 0 | sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb); |
2268 | 0 | conn = sc_conn(sc); |
2269 | | |
2270 | | /* No connection */ |
2271 | 0 | if (!conn) |
2272 | 0 | return 0; |
2273 | | |
2274 | | /* No mux install, this may change */ |
2275 | 0 | if (!conn->mux) { |
2276 | 0 | smp->flags |= SMP_F_MAY_CHANGE; |
2277 | 0 | return 0; |
2278 | 0 | } |
2279 | | |
2280 | | /* No sctl, report sid=0 in this case */ |
2281 | 0 | if (conn->mux->sctl) { |
2282 | 0 | if (conn->mux->sctl(sc, MUX_SCTL_SID, &sid) == -1) |
2283 | 0 | return 0; |
2284 | 0 | } |
2285 | | |
2286 | 0 | smp->flags = SMP_F_VOL_TXN; |
2287 | 0 | smp->data.type = SMP_T_SINT; |
2288 | 0 | smp->data.u.sint = sid; |
2289 | |
|
2290 | 0 | return 1; |
2291 | 0 | } |
2292 | | |
2293 | | /* return 1 if the frontend or backend mux stream has received an abort and 0 otherwise. |
2294 | | */ |
2295 | | static int |
2296 | | smp_fetch_strm_aborted(const struct arg *args, struct sample *smp, const char *kw, void *private) |
2297 | 0 | { |
2298 | 0 | struct stconn *sc; |
2299 | 0 | unsigned int aborted = 0; |
2300 | |
|
2301 | 0 | if (!smp->strm) |
2302 | 0 | return 0; |
2303 | | |
2304 | 0 | sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb); |
2305 | 0 | if (sc->sedesc->abort_info.info) |
2306 | 0 | aborted = 1; |
2307 | |
|
2308 | 0 | smp->flags = SMP_F_VOL_TXN; |
2309 | 0 | smp->data.type = SMP_T_BOOL; |
2310 | 0 | smp->data.u.sint = aborted; |
2311 | |
|
2312 | 0 | return 1; |
2313 | 0 | } |
2314 | | |
2315 | | /* return the H2/QUIC RESET code of the frontend or backend mux stream. Any value |
2316 | | * means an a RST_STREAM was received on H2 and a STOP_SENDING on QUIC. Otherwise the sample fetch fails. |
2317 | | */ |
2318 | | static int |
2319 | | smp_fetch_strm_rst_code(const struct arg *args, struct sample *smp, const char *kw, void *private) |
2320 | 0 | { |
2321 | 0 | struct stconn *sc; |
2322 | 0 | unsigned int source; |
2323 | 0 | unsigned long long code = 0; |
2324 | |
|
2325 | 0 | if (!smp->strm) |
2326 | 0 | return 0; |
2327 | | |
2328 | 0 | sc = (kw[0] == 'f' ? smp->strm->scf : smp->strm->scb); |
2329 | 0 | source = ((sc->sedesc->abort_info.info & SE_ABRT_SRC_MASK) >> SE_ABRT_SRC_SHIFT); |
2330 | 0 | if (source != SE_ABRT_SRC_MUX_H2 && source != SE_ABRT_SRC_MUX_QUIC) { |
2331 | 0 | if (!source) |
2332 | 0 | smp->flags |= SMP_F_MAY_CHANGE; |
2333 | 0 | return 0; |
2334 | 0 | } |
2335 | 0 | code = sc->sedesc->abort_info.code; |
2336 | |
|
2337 | 0 | smp->flags = SMP_F_VOL_TXN; |
2338 | 0 | smp->data.type = SMP_T_SINT; |
2339 | 0 | smp->data.u.sint = code; |
2340 | |
|
2341 | 0 | return 1; |
2342 | 0 | } |
2343 | | |
2344 | | /* Note: must not be declared <const> as its list will be overwritten. |
2345 | | * Note: fetches that may return multiple types should be declared using the |
2346 | | * appropriate pseudo-type. If not available it must be declared as the lowest |
2347 | | * common denominator, the type that can be casted into all other ones. |
2348 | | */ |
2349 | | static struct sample_fetch_kw_list sample_fetch_keywords = {ILH, { |
2350 | | { "bs.debug_str", smp_fetch_debug_str, ARG1(0,SINT), NULL, SMP_T_STR, SMP_USE_L5SRV }, |
2351 | | { "bs.id", smp_fetch_sid, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV }, |
2352 | | { "bs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV }, |
2353 | | { "bs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5SRV }, |
2354 | | { "fs.debug_str", smp_fetch_debug_str, ARG1(0,SINT), NULL, SMP_T_STR, SMP_USE_L5CLI }, |
2355 | | { "fs.id", smp_fetch_sid, 0, NULL, SMP_T_STR, SMP_USE_L5CLI }, |
2356 | | { "fs.aborted", smp_fetch_strm_aborted, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI }, |
2357 | | { "fs.rst_code", smp_fetch_strm_rst_code, 0, NULL, SMP_T_SINT, SMP_USE_L5CLI }, |
2358 | | { /* END */ }, |
2359 | | }}; |
2360 | | |
2361 | | INITCALL1(STG_REGISTER, sample_register_fetches, &sample_fetch_keywords); |