Coverage Report

Created: 2023-11-19 07:16

/src/haproxy/src/stream.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Stream management functions.
3
 *
4
 * Copyright 2000-2012 Willy Tarreau <w@1wt.eu>
5
 *
6
 * This program is free software; you can redistribute it and/or
7
 * modify it under the terms of the GNU General Public License
8
 * as published by the Free Software Foundation; either version
9
 * 2 of the License, or (at your option) any later version.
10
 *
11
 */
12
13
#include <stdlib.h>
14
#include <unistd.h>
15
16
#include <import/ebistree.h>
17
18
#include <haproxy/acl.h>
19
#include <haproxy/action.h>
20
#include <haproxy/activity.h>
21
#include <haproxy/api.h>
22
#include <haproxy/applet.h>
23
#include <haproxy/arg.h>
24
#include <haproxy/backend.h>
25
#include <haproxy/capture.h>
26
#include <haproxy/cfgparse.h>
27
#include <haproxy/channel.h>
28
#include <haproxy/check.h>
29
#include <haproxy/cli.h>
30
#include <haproxy/connection.h>
31
#include <haproxy/dict.h>
32
#include <haproxy/dynbuf.h>
33
#include <haproxy/fd.h>
34
#include <haproxy/filters.h>
35
#include <haproxy/freq_ctr.h>
36
#include <haproxy/frontend.h>
37
#include <haproxy/global.h>
38
#include <haproxy/hlua.h>
39
#include <haproxy/http_ana.h>
40
#include <haproxy/http_rules.h>
41
#include <haproxy/htx.h>
42
#include <haproxy/istbuf.h>
43
#include <haproxy/log.h>
44
#include <haproxy/pipe.h>
45
#include <haproxy/pool.h>
46
#include <haproxy/proxy.h>
47
#include <haproxy/queue.h>
48
#include <haproxy/sc_strm.h>
49
#include <haproxy/server.h>
50
#include <haproxy/resolvers.h>
51
#include <haproxy/sample.h>
52
#include <haproxy/session.h>
53
#include <haproxy/stats-t.h>
54
#include <haproxy/stconn.h>
55
#include <haproxy/stick_table.h>
56
#include <haproxy/stream.h>
57
#include <haproxy/task.h>
58
#include <haproxy/tcp_rules.h>
59
#include <haproxy/thread.h>
60
#include <haproxy/tools.h>
61
#include <haproxy/trace.h>
62
#include <haproxy/vars.h>
63
64
65
DECLARE_POOL(pool_head_stream, "stream", sizeof(struct stream));
66
DECLARE_POOL(pool_head_uniqueid, "uniqueid", UNIQUEID_LEN);
67
68
/* incremented by each "show sess" to fix a delimiter between streams */
69
unsigned stream_epoch = 0;
70
71
/* List of all use-service keywords. */
72
static struct list service_keywords = LIST_HEAD_INIT(service_keywords);
73
74
75
/* Trace source and events.
 *
 * Forward declaration of the trace callback: it is referenced by the
 * trace_strm descriptor before being defined.
 */
static void strm_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
                       const struct ist where, const struct ist func,
                       const void *a1, const void *a2, const void *a3, const void *a4);
80
81
/* The event representation is split like this :
82
 *   strm  - stream
83
 *   sc    - stream connector
84
 *   http  - http analyzis
85
 *   tcp   - tcp analyzis
86
 *
87
 * STRM_EV_* macros are defined in <proto/stream.h>
88
 */
89
static const struct trace_event strm_trace_events[] = {
90
  { .mask = STRM_EV_STRM_NEW,     .name = "strm_new",     .desc = "new stream" },
91
  { .mask = STRM_EV_STRM_FREE,    .name = "strm_free",    .desc = "release stream" },
92
  { .mask = STRM_EV_STRM_ERR,     .name = "strm_err",     .desc = "error during stream processing" },
93
  { .mask = STRM_EV_STRM_ANA,     .name = "strm_ana",     .desc = "stream analyzers" },
94
  { .mask = STRM_EV_STRM_PROC,    .name = "strm_proc",    .desc = "stream processing" },
95
96
  { .mask = STRM_EV_CS_ST,        .name = "sc_state",     .desc = "processing connector states" },
97
98
  { .mask = STRM_EV_HTTP_ANA,     .name = "http_ana",     .desc = "HTTP analyzers" },
99
  { .mask = STRM_EV_HTTP_ERR,     .name = "http_err",     .desc = "error during HTTP analyzis" },
100
101
  { .mask = STRM_EV_TCP_ANA,      .name = "tcp_ana",      .desc = "TCP analyzers" },
102
  { .mask = STRM_EV_TCP_ERR,      .name = "tcp_err",      .desc = "error during TCP analyzis" },
103
104
  { .mask = STRM_EV_FLT_ANA,      .name = "flt_ana",      .desc = "Filter analyzers" },
105
  { .mask = STRM_EV_FLT_ERR,      .name = "flt_err",      .desc = "error during filter analyzis" },
106
  {}
107
};
108
109
/* Lock-on descriptors for the four trace arguments. All entries are left
 * empty: the first argument is already taken by the stream itself and no name
 * is assigned to the three others.
 */
static const struct name_desc strm_trace_lockon_args[4] = {
  { /* arg1: already used by the stream */ },
  { /* arg2 */ },
  { /* arg3 */ },
  { /* arg4 */ }
};
115
116
/* Verbosity levels understood by strm_trace(). Each value is the 1-based rank
 * of the matching entry in strm_trace_decoding[].
 */
#define STRM_VERB_CLEAN    1
#define STRM_VERB_MINIMAL  2
#define STRM_VERB_SIMPLE   3
#define STRM_VERB_ADVANCED 4
#define STRM_VERB_COMPLETE 5

/* Names and descriptions of the verbosity levels, in increasing order. */
static const struct name_desc strm_trace_decoding[] = {
  { .name = "clean",    .desc = "only user-friendly stuff, generally suitable for level \"user\"" },
  { .name = "minimal",  .desc = "report info on streams and connectors" },
  { .name = "simple",   .desc = "add info on request and response channels" },
  { .name = "advanced", .desc = "add info on channel's buffer for data and developer levels only" },
  { .name = "complete", .desc = "add info on channel's buffer" },
  { /* end */ }
};
129
130
struct trace_source trace_strm = {
131
  .name = IST("stream"),
132
  .desc = "Applicative stream",
133
  .arg_def = TRC_ARG1_STRM,  // TRACE()'s first argument is always a stream
134
  .default_cb = strm_trace,
135
  .known_events = strm_trace_events,
136
  .lockon_args = strm_trace_lockon_args,
137
  .decoding = strm_trace_decoding,
138
  .report_events = ~0,  // report everything by default
139
};
140
141
#define TRACE_SOURCE &trace_strm
142
INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
143
144
/* the stream traces always expect that arg1, if non-null, is of a stream (from
145
 * which we can derive everything), that arg2, if non-null, is an http
146
 * transaction, that arg3, if non-null, is an http message.
147
 */
148
static void strm_trace(enum trace_level level, uint64_t mask, const struct trace_source *src,
149
           const struct ist where, const struct ist func,
150
           const void *a1, const void *a2, const void *a3, const void *a4)
151
0
{
152
0
  const struct stream *s = a1;
153
0
  const struct http_txn *txn = a2;
154
0
  const struct http_msg *msg = a3;
155
0
  struct task *task;
156
0
  const struct channel *req, *res;
157
0
  struct htx *htx;
158
159
0
  if (!s || src->verbosity < STRM_VERB_CLEAN)
160
0
    return;
161
162
0
  task = s->task;
163
0
  req  = &s->req;
164
0
  res  = &s->res;
165
0
  htx  = (msg ? htxbuf(&msg->chn->buf) : NULL);
166
167
  /* General info about the stream (htx/tcp, id...) */
168
0
  chunk_appendf(&trace_buf, " : [%u,%s]",
169
0
          s->uniq_id, ((s->flags & SF_HTX) ? "HTX" : "TCP"));
170
0
  if (isttest(s->unique_id)) {
171
0
    chunk_appendf(&trace_buf, " id=");
172
0
    b_putist(&trace_buf, s->unique_id);
173
0
  }
174
175
  /* Front and back stream connector state */
176
0
  chunk_appendf(&trace_buf, " SC=(%s,%s)",
177
0
          sc_state_str(s->scf->state), sc_state_str(s->scb->state));
178
179
  /* If txn is defined, HTTP req/rep states */
180
0
  if (txn)
181
0
    chunk_appendf(&trace_buf, " HTTP=(%s,%s)",
182
0
            h1_msg_state_str(txn->req.msg_state), h1_msg_state_str(txn->rsp.msg_state));
183
0
  if (msg)
184
0
    chunk_appendf(&trace_buf, " %s", ((msg->chn->flags & CF_ISRESP) ? "RESPONSE" : "REQUEST"));
185
186
0
  if (src->verbosity == STRM_VERB_CLEAN)
187
0
    return;
188
189
  /* If msg defined, display status-line if possible (verbosity > MINIMAL) */
190
0
  if (src->verbosity > STRM_VERB_MINIMAL && htx && htx_nbblks(htx)) {
191
0
    const struct htx_blk *blk = __htx_get_head_blk(htx);
192
0
    const struct htx_sl  *sl  = htx_get_blk_ptr(htx, blk);
193
0
    enum htx_blk_type    type = htx_get_blk_type(blk);
194
195
0
    if (type == HTX_BLK_REQ_SL || type == HTX_BLK_RES_SL)
196
0
      chunk_appendf(&trace_buf, " - \"%.*s %.*s %.*s\"",
197
0
              HTX_SL_P1_LEN(sl), HTX_SL_P1_PTR(sl),
198
0
              HTX_SL_P2_LEN(sl), HTX_SL_P2_PTR(sl),
199
0
              HTX_SL_P3_LEN(sl), HTX_SL_P3_PTR(sl));
200
0
  }
201
202
0
    chunk_appendf(&trace_buf, " - t=%p t.exp=%d s=(%p,0x%08x,0x%x)",
203
0
            task, tick_isset(task->expire) ? TICKS_TO_MS(task->expire - now_ms) : TICK_ETERNITY, s, s->flags, s->conn_err_type);
204
205
  /* If txn defined info about HTTP msgs, otherwise info about SI. */
206
0
  if (txn) {
207
0
    chunk_appendf(&trace_buf, " txn.flags=0x%08x, http.flags=(0x%08x,0x%08x) status=%d",
208
0
            txn->flags, txn->req.flags, txn->rsp.flags, txn->status);
209
0
  }
210
0
  else {
211
0
    chunk_appendf(&trace_buf, " scf=(%p,%d,0x%08x,0x%x) scb=(%p,%d,0x%08x,0x%x) scf.exp(r,w)=(%d,%d) scb.exp(r,w)=(%d,%d) retries=%d",
212
0
            s->scf, s->scf->state, s->scf->flags, s->scf->sedesc->flags,
213
0
            s->scb, s->scb->state, s->scb->flags, s->scb->sedesc->flags,
214
0
            tick_isset(sc_ep_rcv_ex(s->scf)) ? TICKS_TO_MS(sc_ep_rcv_ex(s->scf) - now_ms) : TICK_ETERNITY,
215
0
            tick_isset(sc_ep_snd_ex(s->scf)) ? TICKS_TO_MS(sc_ep_snd_ex(s->scf) - now_ms) : TICK_ETERNITY,
216
0
            tick_isset(sc_ep_rcv_ex(s->scb)) ? TICKS_TO_MS(sc_ep_rcv_ex(s->scb) - now_ms) : TICK_ETERNITY,
217
0
            tick_isset(sc_ep_snd_ex(s->scb)) ? TICKS_TO_MS(sc_ep_snd_ex(s->scb) - now_ms) : TICK_ETERNITY,
218
0
            s->conn_retries);
219
0
  }
220
221
0
  if (src->verbosity == STRM_VERB_MINIMAL)
222
0
    return;
223
224
225
  /* If txn defined, don't display all channel info */
226
0
  if (src->verbosity == STRM_VERB_SIMPLE || txn) {
227
0
    chunk_appendf(&trace_buf, " req=(%p .fl=0x%08x .exp=%d)",
228
0
            req, req->flags, tick_isset(req->analyse_exp) ? TICKS_TO_MS(req->analyse_exp - now_ms) : TICK_ETERNITY);
229
0
    chunk_appendf(&trace_buf, " res=(%p .fl=0x%08x .exp=%d)",
230
0
            res, res->flags, tick_isset(res->analyse_exp) ? TICKS_TO_MS(res->analyse_exp - now_ms) : TICK_ETERNITY);
231
0
  }
232
0
  else {
233
0
    chunk_appendf(&trace_buf, " req=(%p .fl=0x%08x .ana=0x%08x .exp=%u .o=%lu .tot=%llu .to_fwd=%u)",
234
0
            req, req->flags, req->analysers, req->analyse_exp,
235
0
            (long)req->output, req->total, req->to_forward);
236
0
    chunk_appendf(&trace_buf, " res=(%p .fl=0x%08x .ana=0x%08x .exp=%u .o=%lu .tot=%llu .to_fwd=%u)",
237
0
            res, res->flags, res->analysers, res->analyse_exp,
238
0
            (long)res->output, res->total, res->to_forward);
239
0
  }
240
241
0
  if (src->verbosity == STRM_VERB_SIMPLE ||
242
0
      (src->verbosity == STRM_VERB_ADVANCED && src->level < TRACE_LEVEL_DATA))
243
0
    return;
244
245
  /* channels' buffer info */
246
0
  if (s->flags & SF_HTX) {
247
0
    struct htx *rqhtx = htxbuf(&req->buf);
248
0
    struct htx *rphtx = htxbuf(&res->buf);
249
250
0
    chunk_appendf(&trace_buf, " htx=(%u/%u#%u, %u/%u#%u)",
251
0
            rqhtx->data, rqhtx->size, htx_nbblks(rqhtx),
252
0
            rphtx->data, rphtx->size, htx_nbblks(rphtx));
253
0
  }
254
0
  else {
255
0
    chunk_appendf(&trace_buf, " buf=(%u@%p+%u/%u, %u@%p+%u/%u)",
256
0
            (unsigned int)b_data(&req->buf), b_orig(&req->buf),
257
0
            (unsigned int)b_head_ofs(&req->buf), (unsigned int)b_size(&req->buf),
258
0
            (unsigned int)b_data(&res->buf), b_orig(&res->buf),
259
0
            (unsigned int)b_head_ofs(&res->buf), (unsigned int)b_size(&res->buf));
260
0
  }
261
262
  /* If msg defined, display htx info if defined (level > USER) */
263
0
  if (src->level > TRACE_LEVEL_USER && htx && htx_nbblks(htx)) {
264
0
    int full = 0;
265
266
    /* Full htx info (level > STATE && verbosity > SIMPLE) */
267
0
    if (src->level > TRACE_LEVEL_STATE) {
268
0
      if (src->verbosity == STRM_VERB_COMPLETE)
269
0
        full = 1;
270
0
    }
271
272
0
    chunk_memcat(&trace_buf, "\n\t", 2);
273
0
    htx_dump(&trace_buf, htx, full);
274
0
  }
275
0
}
276
277
/* Upgrade an existing stream for stream connector <sc>. Return < 0 on error. This
278
 * is only valid right after a TCP to H1 upgrade. The stream should be
279
 * "reativated" by removing SF_IGNORE flag. And the right mode must be set.  On
280
 * success, <input> buffer is transferred to the stream and thus points to
281
 * BUF_NULL. On error, it is unchanged and it is the caller responsibility to
282
 * release it (this never happens for now).
283
 */
284
int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input)
285
0
{
286
0
  struct stream *s = __sc_strm(sc);
287
0
  const struct mux_ops *mux = sc_mux_ops(sc);
288
289
0
  if (mux) {
290
0
    if (mux->flags & MX_FL_HTX)
291
0
      s->flags |= SF_HTX;
292
0
  }
293
294
0
  if (!b_is_null(input)) {
295
    /* Xfer the input buffer to the request channel. <input> will
296
     * than point to BUF_NULL. From this point, it is the stream
297
     * responsibility to release it.
298
     */
299
0
    s->req.buf = *input;
300
0
    *input = BUF_NULL;
301
0
    s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
302
0
    sc_ep_report_read_activity(s->scf);
303
0
  }
304
305
0
  s->req.flags |= CF_READ_EVENT; /* Always report a read event */
306
0
  s->flags &= ~SF_IGNORE;
307
308
0
  task_wakeup(s->task, TASK_WOKEN_INIT);
309
0
  return 0;
310
0
}
311
312
/* Callback used to wake up a stream when an input buffer is available. The
313
 * stream <s>'s stream connectors are checked for a failed buffer allocation
314
 * as indicated by the presence of the SC_FL_NEED_BUFF flag and the lack of a
315
 * buffer, and and input buffer is assigned there (at most one). The function
316
 * returns 1 and wakes the stream up if a buffer was taken, otherwise zero.
317
 * It's designed to be called from __offer_buffer().
318
 */
319
int stream_buf_available(void *arg)
320
0
{
321
0
  struct stream *s = arg;
322
323
0
  if (!s->req.buf.size && !sc_ep_have_ff_data(s->scb) && s->scf->flags & SC_FL_NEED_BUFF &&
324
0
      b_alloc(&s->req.buf))
325
0
    sc_have_buff(s->scf);
326
0
  else if (!s->res.buf.size && !sc_ep_have_ff_data(s->scf) && s->scb->flags & SC_FL_NEED_BUFF &&
327
0
     b_alloc(&s->res.buf))
328
0
    sc_have_buff(s->scb);
329
0
  else
330
0
    return 0;
331
332
0
  task_wakeup(s->task, TASK_WOKEN_RES);
333
0
  return 1;
334
335
0
}
336
337
/* This function is called from the session handler which detects the end of
338
 * handshake, in order to complete initialization of a valid stream. It must be
339
 * called with a completely initialized session. It returns the pointer to
340
 * the newly created stream, or NULL in case of fatal error. The client-facing
341
 * end point is assigned to <origin>, which must be valid. The stream's task
342
 * is configured with a nice value inherited from the listener's nice if any.
343
 * The task's context is set to the new stream, and its function is set to
344
 * process_stream(). Target and analysers are null. <input> is used as input
345
 * buffer for the request channel and may contain data. On success, it is
346
 * transfer to the stream and <input> is set to BUF_NULL. On error, <input>
347
 * buffer is unchanged and it is the caller responsibility to release it.
348
 */
349
struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer *input)
350
0
{
351
0
  struct stream *s;
352
0
  struct task *t;
353
354
0
  DBG_TRACE_ENTER(STRM_EV_STRM_NEW);
355
0
  if (unlikely((s = pool_alloc(pool_head_stream)) == NULL))
356
0
    goto out_fail_alloc;
357
358
  /* minimum stream initialization required for an embryonic stream is
359
   * fairly low. We need very little to execute L4 ACLs, then we need a
360
   * task to make the client-side connection live on its own.
361
   *  - flags
362
   *  - stick-entry tracking
363
   */
364
0
  s->flags = 0;
365
0
  s->logs.logwait = sess->fe->to_log;
366
0
  s->logs.level = 0;
367
0
  s->logs.request_ts = 0;
368
0
  s->logs.t_queue = -1;
369
0
  s->logs.t_connect = -1;
370
0
  s->logs.t_data = -1;
371
0
  s->logs.t_close = 0;
372
0
  s->logs.bytes_in = s->logs.bytes_out = 0;
373
0
  s->logs.prx_queue_pos = 0;  /* we get the number of pending conns before us */
374
0
  s->logs.srv_queue_pos = 0; /* we will get this number soon */
375
0
  s->obj_type = OBJ_TYPE_STREAM;
376
377
0
  s->logs.accept_date = sess->accept_date;
378
0
  s->logs.accept_ts = sess->accept_ts;
379
0
  s->logs.t_handshake = sess->t_handshake;
380
0
  s->logs.t_idle = sess->t_idle;
381
382
  /* default logging function */
383
0
  s->do_log = strm_log;
384
385
  /* default error reporting function, may be changed by analysers */
386
0
  s->srv_error = default_srv_error;
387
388
  /* Initialise the current rule list pointer to NULL. We are sure that
389
   * any rulelist match the NULL pointer.
390
   */
391
0
  s->current_rule_list = NULL;
392
0
  s->current_rule = NULL;
393
0
  s->rules_exp = TICK_ETERNITY;
394
0
  s->last_rule_file = NULL;
395
0
  s->last_rule_line = 0;
396
397
0
  s->stkctr = NULL;
398
0
  if (pool_head_stk_ctr) {
399
0
    s->stkctr = pool_alloc(pool_head_stk_ctr);
400
0
    if (!s->stkctr)
401
0
      goto out_fail_alloc;
402
403
    /* Copy SC counters for the stream. We don't touch refcounts because
404
     * any reference we have is inherited from the session. Since the stream
405
     * doesn't exist without the session, the session's existence guarantees
406
     * we don't lose the entry. During the store operation, the stream won't
407
     * touch these ones.
408
     */
409
0
    memcpy(s->stkctr, sess->stkctr, sizeof(s->stkctr[0]) * global.tune.nb_stk_ctr);
410
0
  }
411
412
0
  s->sess = sess;
413
414
0
  s->stream_epoch = _HA_ATOMIC_LOAD(&stream_epoch);
415
0
  s->uniq_id = _HA_ATOMIC_FETCH_ADD(&global.req_count, 1);
416
417
  /* OK, we're keeping the stream, so let's properly initialize the stream */
418
0
  LIST_INIT(&s->back_refs);
419
420
0
  LIST_INIT(&s->buffer_wait.list);
421
0
  s->buffer_wait.target = s;
422
0
  s->buffer_wait.wakeup_cb = stream_buf_available;
423
424
0
  s->lat_time = s->cpu_time = 0;
425
0
  s->call_rate.curr_tick = s->call_rate.curr_ctr = s->call_rate.prev_ctr = 0;
426
0
  s->pcli_next_pid = 0;
427
0
  s->pcli_flags = 0;
428
0
  s->unique_id = IST_NULL;
429
430
0
  if ((t = task_new_here()) == NULL)
431
0
    goto out_fail_alloc;
432
433
0
  s->task = t;
434
0
  s->pending_events = 0;
435
0
  s->conn_retries = 0;
436
0
  s->conn_exp = TICK_ETERNITY;
437
0
  s->conn_err_type = STRM_ET_NONE;
438
0
  s->prev_conn_state = SC_ST_INI;
439
0
  t->process = process_stream;
440
0
  t->context = s;
441
0
  t->expire = TICK_ETERNITY;
442
0
  if (sess->listener)
443
0
    t->nice = sess->listener->bind_conf->nice;
444
445
  /* Note: initially, the stream's backend points to the frontend.
446
   * This changes later when switching rules are executed or
447
   * when the default backend is assigned.
448
   */
449
0
  s->be  = sess->fe;
450
0
  s->req_cap = NULL;
451
0
  s->res_cap = NULL;
452
453
  /* Initialize all the variables contexts even if not used.
454
   * This permits to prune these contexts without errors.
455
   *
456
   * We need to make sure that those lists are not re-initialized
457
   * by stream-dependant underlying code because we could lose
458
   * track of already defined variables, leading to data inconsistency
459
   * and memory leaks...
460
   *
461
   * For reference: we had a very old bug caused by vars_txn and
462
   * vars_reqres being accidentally re-initialized in http_create_txn()
463
   * (https://github.com/haproxy/haproxy/issues/1935)
464
   */
465
0
  vars_init_head(&s->vars_txn,    SCOPE_TXN);
466
0
  vars_init_head(&s->vars_reqres, SCOPE_REQ);
467
468
        /* Set SF_HTX flag for HTTP frontends. */
469
0
  if (sess->fe->mode == PR_MODE_HTTP)
470
0
    s->flags |= SF_HTX;
471
472
0
  s->scf = sc;
473
0
  if (sc_attach_strm(s->scf, s) < 0)
474
0
    goto out_fail_attach_scf;
475
476
0
  s->scb = sc_new_from_strm(s, SC_FL_ISBACK);
477
0
  if (!s->scb)
478
0
    goto out_fail_alloc_scb;
479
480
0
  sc_set_state(s->scf, SC_ST_EST);
481
482
0
  if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
483
0
    s->scf->flags |= SC_FL_INDEP_STR;
484
485
0
  if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
486
0
    s->scb->flags |= SC_FL_INDEP_STR;
487
488
0
  if (sc_ep_test(sc, SE_FL_WEBSOCKET))
489
0
    s->flags |= SF_WEBSOCKET;
490
0
  if (sc_conn(sc)) {
491
0
    const struct mux_ops *mux = sc_mux_ops(sc);
492
493
0
    if (mux && mux->flags & MX_FL_HTX)
494
0
      s->flags |= SF_HTX;
495
0
  }
496
497
0
  stream_init_srv_conn(s);
498
0
  s->target = sess->fe->default_target;
499
500
0
  s->pend_pos = NULL;
501
0
  s->priority_class = 0;
502
0
  s->priority_offset = 0;
503
504
  /* init store persistence */
505
0
  s->store_count = 0;
506
507
0
  channel_init(&s->req);
508
0
  s->req.flags |= CF_READ_EVENT; /* the producer is already connected */
509
0
  s->req.analysers = sess->listener ? sess->listener->bind_conf->analysers : sess->fe->fe_req_ana;
510
511
0
  if (IS_HTX_STRM(s)) {
512
    /* Be sure to have HTTP analysers because in case of
513
     * "destructive" stream upgrade, they may be missing (e.g
514
     * TCP>H2)
515
     */
516
0
    s->req.analysers |= AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE;
517
0
  }
518
519
0
  if (!sess->fe->fe_req_ana) {
520
0
    channel_auto_connect(&s->req);  /* don't wait to establish connection */
521
0
    channel_auto_close(&s->req);    /* let the producer forward close requests */
522
0
  }
523
524
0
  s->scf->ioto = sess->fe->timeout.client;
525
0
  s->req.analyse_exp = TICK_ETERNITY;
526
527
0
  channel_init(&s->res);
528
0
  s->res.flags |= CF_ISRESP;
529
0
  s->res.analysers = 0;
530
531
0
  if (sess->fe->options2 & PR_O2_NODELAY) {
532
0
    s->scf->flags |= SC_FL_SND_NEVERWAIT;
533
0
    s->scb->flags |= SC_FL_SND_NEVERWAIT;
534
0
  }
535
536
0
  s->scb->ioto = TICK_ETERNITY;
537
0
  s->res.analyse_exp = TICK_ETERNITY;
538
539
0
  s->txn = NULL;
540
0
  s->hlua = NULL;
541
542
0
  s->resolv_ctx.requester = NULL;
543
0
  s->resolv_ctx.hostname_dn = NULL;
544
0
  s->resolv_ctx.hostname_dn_len = 0;
545
0
  s->resolv_ctx.parent = NULL;
546
547
0
  s->tunnel_timeout = TICK_ETERNITY;
548
549
0
  LIST_APPEND(&th_ctx->streams, &s->list);
550
551
0
  if (flt_stream_init(s) < 0 || flt_stream_start(s) < 0)
552
0
    goto out_fail_accept;
553
554
  /* just in case the caller would have pre-disabled it */
555
0
  se_will_consume(s->scf->sedesc);
556
557
0
  if (sess->fe->accept && sess->fe->accept(s) < 0)
558
0
    goto out_fail_accept;
559
560
0
  if (!b_is_null(input)) {
561
    /* Xfer the input buffer to the request channel. <input> will
562
     * than point to BUF_NULL. From this point, it is the stream
563
     * responsibility to release it.
564
     */
565
0
    s->req.buf = *input;
566
0
    *input = BUF_NULL;
567
0
    s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
568
0
    sc_ep_report_read_activity(s->scf);
569
0
  }
570
571
  /* it is important not to call the wakeup function directly but to
572
   * pass through task_wakeup(), because this one knows how to apply
573
   * priorities to tasks. Using multi thread we must be sure that
574
   * stream is fully initialized before calling task_wakeup. So
575
   * the caller must handle the task_wakeup
576
   */
577
0
  DBG_TRACE_LEAVE(STRM_EV_STRM_NEW, s);
578
0
  task_wakeup(s->task, TASK_WOKEN_INIT);
579
0
  return s;
580
581
  /* Error unrolling */
582
0
 out_fail_accept:
583
0
  flt_stream_release(s, 0);
584
0
  LIST_DELETE(&s->list);
585
0
  sc_free(s->scb);
586
0
 out_fail_alloc_scb:
587
0
 out_fail_attach_scf:
588
0
  task_destroy(t);
589
0
 out_fail_alloc:
590
0
  if (s)
591
0
    pool_free(pool_head_stk_ctr, s->stkctr);
592
0
  pool_free(pool_head_stream, s);
593
0
  DBG_TRACE_DEVEL("leaving on error", STRM_EV_STRM_NEW|STRM_EV_STRM_ERR);
594
0
  return NULL;
595
0
}
596
597
/*
598
 * frees  the context associated to a stream. It must have been removed first.
599
 */
600
void stream_free(struct stream *s)
601
0
{
602
0
  struct session *sess = strm_sess(s);
603
0
  struct proxy *fe = sess->fe;
604
0
  struct bref *bref, *back;
605
0
  int i;
606
607
0
  DBG_TRACE_POINT(STRM_EV_STRM_FREE, s);
608
609
  /* detach the stream from its own task before even releasing it so
610
   * that walking over a task list never exhibits a dying stream.
611
   */
612
0
  s->task->context = NULL;
613
0
  __ha_barrier_store();
614
615
0
  pendconn_free(s);
616
617
0
  if (objt_server(s->target)) { /* there may be requests left pending in queue */
618
0
    if (s->flags & SF_CURR_SESS) {
619
0
      s->flags &= ~SF_CURR_SESS;
620
0
      _HA_ATOMIC_DEC(&__objt_server(s->target)->cur_sess);
621
0
    }
622
0
    if (may_dequeue_tasks(__objt_server(s->target), s->be))
623
0
      process_srv_queue(__objt_server(s->target));
624
0
  }
625
626
0
  if (unlikely(s->srv_conn)) {
627
    /* the stream still has a reserved slot on a server, but
628
     * it should normally be only the same as the one above,
629
     * so this should not happen in fact.
630
     */
631
0
    sess_change_server(s, NULL);
632
0
  }
633
634
  /* We may still be present in the buffer wait queue */
635
0
  if (LIST_INLIST(&s->buffer_wait.list))
636
0
    LIST_DEL_INIT(&s->buffer_wait.list);
637
638
0
  if (s->req.buf.size || s->res.buf.size) {
639
0
    int count = !!s->req.buf.size + !!s->res.buf.size;
640
641
0
    b_free(&s->req.buf);
642
0
    b_free(&s->res.buf);
643
0
    offer_buffers(NULL, count);
644
0
  }
645
646
0
  pool_free(pool_head_uniqueid, s->unique_id.ptr);
647
0
  s->unique_id = IST_NULL;
648
649
0
  flt_stream_stop(s);
650
0
  flt_stream_release(s, 0);
651
652
0
  hlua_ctx_destroy(s->hlua);
653
0
  s->hlua = NULL;
654
0
  if (s->txn)
655
0
    http_destroy_txn(s);
656
657
  /* ensure the client-side transport layer is destroyed */
658
  /* Be sure it is useless !! */
659
  /* if (cli_cs) */
660
  /*  cs_close(cli_cs); */
661
662
0
  for (i = 0; i < s->store_count; i++) {
663
0
    if (!s->store[i].ts)
664
0
      continue;
665
0
    stksess_free(s->store[i].table, s->store[i].ts);
666
0
    s->store[i].ts = NULL;
667
0
  }
668
669
0
  if (s->resolv_ctx.requester) {
670
0
    __decl_thread(struct resolvers *resolvers = s->resolv_ctx.parent->arg.resolv.resolvers);
671
672
0
    HA_SPIN_LOCK(DNS_LOCK, &resolvers->lock);
673
0
    ha_free(&s->resolv_ctx.hostname_dn);
674
0
    s->resolv_ctx.hostname_dn_len = 0;
675
0
    resolv_unlink_resolution(s->resolv_ctx.requester);
676
0
    HA_SPIN_UNLOCK(DNS_LOCK, &resolvers->lock);
677
678
0
    pool_free(resolv_requester_pool, s->resolv_ctx.requester);
679
0
    s->resolv_ctx.requester = NULL;
680
0
  }
681
682
0
  if (fe) {
683
0
    if (s->req_cap) {
684
0
      struct cap_hdr *h;
685
0
      for (h = fe->req_cap; h; h = h->next)
686
0
        pool_free(h->pool, s->req_cap[h->index]);
687
0
      pool_free(fe->req_cap_pool, s->req_cap);
688
0
    }
689
690
0
    if (s->res_cap) {
691
0
      struct cap_hdr *h;
692
0
      for (h = fe->rsp_cap; h; h = h->next)
693
0
        pool_free(h->pool, s->res_cap[h->index]);
694
0
      pool_free(fe->rsp_cap_pool, s->res_cap);
695
0
    }
696
0
  }
697
698
  /* Cleanup all variable contexts. */
699
0
  if (!LIST_ISEMPTY(&s->vars_txn.head))
700
0
    vars_prune(&s->vars_txn, s->sess, s);
701
0
  if (!LIST_ISEMPTY(&s->vars_reqres.head))
702
0
    vars_prune(&s->vars_reqres, s->sess, s);
703
704
0
  stream_store_counters(s);
705
0
  pool_free(pool_head_stk_ctr, s->stkctr);
706
707
0
  list_for_each_entry_safe(bref, back, &s->back_refs, users) {
708
    /* we have to unlink all watchers. We must not relink them if
709
     * this stream was the last one in the list. This is safe to do
710
     * here because we're touching our thread's list so we know
711
     * that other streams are not active, and the watchers will
712
     * only touch their node under thread isolation.
713
     */
714
0
    LIST_DEL_INIT(&bref->users);
715
0
    if (s->list.n != &th_ctx->streams)
716
0
      LIST_APPEND(&LIST_ELEM(s->list.n, struct stream *, list)->back_refs, &bref->users);
717
0
    bref->ref = s->list.n;
718
0
    __ha_barrier_store();
719
0
  }
720
0
  LIST_DELETE(&s->list);
721
722
0
  sc_destroy(s->scb);
723
0
  sc_destroy(s->scf);
724
725
0
  pool_free(pool_head_stream, s);
726
727
  /* We may want to free the maximum amount of pools if the proxy is stopping */
728
0
  if (fe && unlikely(fe->flags & (PR_FL_DISABLED|PR_FL_STOPPED))) {
729
0
    pool_flush(pool_head_buffer);
730
0
    pool_flush(pool_head_http_txn);
731
0
    pool_flush(pool_head_requri);
732
0
    pool_flush(pool_head_capture);
733
0
    pool_flush(pool_head_stream);
734
0
    pool_flush(pool_head_session);
735
0
    pool_flush(pool_head_connection);
736
0
    pool_flush(pool_head_pendconn);
737
0
    pool_flush(fe->req_cap_pool);
738
0
    pool_flush(fe->rsp_cap_pool);
739
0
  }
740
0
}
741
742
743
/* Allocates a work buffer for stream <s>. It is meant to be called inside
744
 * process_stream(). It will only allocate the side needed for the function
745
 * to work fine, which is the response buffer so that an error message may be
746
 * built and returned. Response buffers may be allocated from the reserve, this
747
 * is critical to ensure that a response may always flow and will never block a
748
 * server from releasing a connection. Returns 0 in case of failure, non-zero
749
 * otherwise.
750
 */
751
static int stream_alloc_work_buffer(struct stream *s)
752
0
{
753
0
  if (b_alloc(&s->res.buf))
754
0
    return 1;
755
0
  return 0;
756
0
}
757
758
/* releases unused buffers after processing. Typically used at the end of the
759
 * update() functions. It will try to wake up as many tasks/applets as the
760
 * number of buffers that it releases. In practice, most often streams are
761
 * blocked on a single buffer, so it makes sense to try to wake two up when two
762
 * buffers are released at once.
763
 */
764
void stream_release_buffers(struct stream *s)
765
0
{
766
0
  int offer = 0;
767
768
0
  if (c_size(&s->req) && c_empty(&s->req)) {
769
0
    offer++;
770
0
    b_free(&s->req.buf);
771
0
  }
772
0
  if (c_size(&s->res) && c_empty(&s->res)) {
773
0
    offer++;
774
0
    b_free(&s->res.buf);
775
0
  }
776
777
  /* if we're certain to have at least 1 buffer available, and there is
778
   * someone waiting, we can wake up a waiter and offer them.
779
   */
780
0
  if (offer)
781
0
    offer_buffers(s, offer);
782
0
}
783
784
void stream_process_counters(struct stream *s)
785
0
{
786
0
  struct session *sess = s->sess;
787
0
  unsigned long long bytes;
788
0
  int i;
789
790
0
  bytes = s->req.total - s->logs.bytes_in;
791
0
  s->logs.bytes_in = s->req.total;
792
0
  if (bytes) {
793
0
    _HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_in, bytes);
794
0
    _HA_ATOMIC_ADD(&s->be->be_counters.bytes_in,    bytes);
795
796
0
    if (objt_server(s->target))
797
0
      _HA_ATOMIC_ADD(&__objt_server(s->target)->counters.bytes_in, bytes);
798
799
0
    if (sess->listener && sess->listener->counters)
800
0
      _HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
801
802
0
    for (i = 0; i < global.tune.nb_stk_ctr; i++) {
803
0
      if (!stkctr_inc_bytes_in_ctr(&s->stkctr[i], bytes))
804
0
        stkctr_inc_bytes_in_ctr(&sess->stkctr[i], bytes);
805
0
    }
806
0
  }
807
808
0
  bytes = s->res.total - s->logs.bytes_out;
809
0
  s->logs.bytes_out = s->res.total;
810
0
  if (bytes) {
811
0
    _HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_out, bytes);
812
0
    _HA_ATOMIC_ADD(&s->be->be_counters.bytes_out,    bytes);
813
814
0
    if (objt_server(s->target))
815
0
      _HA_ATOMIC_ADD(&__objt_server(s->target)->counters.bytes_out, bytes);
816
817
0
    if (sess->listener && sess->listener->counters)
818
0
      _HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
819
820
0
    for (i = 0; i < global.tune.nb_stk_ctr; i++) {
821
0
      if (!stkctr_inc_bytes_out_ctr(&s->stkctr[i], bytes))
822
0
        stkctr_inc_bytes_out_ctr(&sess->stkctr[i], bytes);
823
0
    }
824
0
  }
825
0
}
826
827
/* Abort processing on the both channels in same time */
828
void stream_abort(struct stream *s)
829
0
{
830
0
  channel_abort(&s->req);
831
0
  channel_abort(&s->res);
832
0
}
833
834
/*
835
 * Returns a message to the client ; the connection is shut down for read,
836
 * and the request is cleared so that no server connection can be initiated.
837
 * The buffer is marked for read shutdown on the other side to protect the
838
 * message, and the buffer write is enabled. The message is contained in a
839
 * "chunk". If it is null, then an empty message is used. The reply buffer does
840
 * not need to be empty before this, and its contents will not be overwritten.
841
 * The primary goal of this function is to return error messages to a client.
842
 */
843
void stream_retnclose(struct stream *s, const struct buffer *msg)
844
0
{
845
0
  struct channel *ic = &s->req;
846
0
  struct channel *oc = &s->res;
847
848
0
  channel_auto_read(ic);
849
0
  channel_abort(ic);
850
0
  channel_erase(ic);
851
0
  channel_truncate(oc);
852
853
0
  if (likely(msg && msg->data))
854
0
    co_inject(oc, msg->area, msg->data);
855
856
0
  channel_auto_read(oc);
857
0
  channel_auto_close(oc);
858
0
  sc_schedule_abort(s->scb);
859
0
}
860
861
int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout)
862
0
{
863
0
  switch (name) {
864
0
  case ACT_TIMEOUT_CLIENT:
865
0
    s->scf->ioto = timeout;
866
0
    return 1;
867
868
0
  case ACT_TIMEOUT_SERVER:
869
0
    s->scb->ioto = timeout;
870
0
    return 1;
871
872
0
  case ACT_TIMEOUT_TUNNEL:
873
0
    s->tunnel_timeout = timeout;
874
0
    return 1;
875
876
0
  default:
877
0
    return 0;
878
0
  }
879
0
}
880
881
/*
882
 * This function handles the transition between the SC_ST_CON state and the
883
 * SC_ST_EST state. It must only be called after switching from SC_ST_CON (or
884
 * SC_ST_INI or SC_ST_RDY) to SC_ST_EST, but only when a ->proto is defined.
885
 * Note that it will switch the interface to SC_ST_DIS if we already have
886
 * the SC_FL_ABRT_DONE flag, it means we were able to forward the request, and
887
 * receive the response, before process_stream() had the opportunity to
888
 * make the switch from SC_ST_CON to SC_ST_EST. When that happens, we want
889
 * to go through back_establish() anyway, to make sure the analysers run.
890
 * Timeouts are cleared. Error are reported on the channel so that analysers
891
 * can handle them.
892
 */
893
static void back_establish(struct stream *s)
894
0
{
895
0
  struct connection *conn = sc_conn(s->scb);
896
0
  struct channel *req = &s->req;
897
0
  struct channel *rep = &s->res;
898
899
0
  DBG_TRACE_ENTER(STRM_EV_STRM_PROC|STRM_EV_CS_ST, s);
900
  /* First, centralize the timers information, and clear any irrelevant
901
   * timeout.
902
   */
903
0
  s->logs.t_connect = ns_to_ms(now_ns - s->logs.accept_ts);
904
0
  s->conn_exp = TICK_ETERNITY;
905
0
  s->flags &= ~SF_CONN_EXP;
906
907
  /* errors faced after sending data need to be reported */
908
0
  if ((s->scb->flags & SC_FL_ERROR) && req->flags & CF_WROTE_DATA) {
909
0
    s->req.flags |= CF_WRITE_EVENT;
910
0
    s->res.flags |= CF_READ_EVENT;
911
0
    s->conn_err_type = STRM_ET_DATA_ERR;
912
0
    DBG_TRACE_STATE("read/write error", STRM_EV_STRM_PROC|STRM_EV_CS_ST|STRM_EV_STRM_ERR, s);
913
0
  }
914
915
0
  if (objt_server(s->target))
916
0
    health_adjust(__objt_server(s->target), HANA_STATUS_L4_OK);
917
918
0
  if (!IS_HTX_STRM(s)) { /* let's allow immediate data connection in this case */
919
    /* if the user wants to log as soon as possible, without counting
920
     * bytes from the server, then this is the right moment. */
921
0
    if (!LIST_ISEMPTY(&strm_fe(s)->logformat) && !(s->logs.logwait & LW_BYTES)) {
922
      /* note: no pend_pos here, session is established */
923
0
      s->logs.t_close = s->logs.t_connect; /* to get a valid end date */
924
0
      s->do_log(s);
925
0
    }
926
0
  }
927
0
  else {
928
0
    s->scb->flags |= SC_FL_RCV_ONCE; /* a single read is enough to get response headers */
929
0
  }
930
931
0
  rep->analysers |= strm_fe(s)->fe_rsp_ana | s->be->be_rsp_ana;
932
933
0
  se_have_more_data(s->scb->sedesc);
934
0
  rep->flags |= CF_READ_EVENT; /* producer is now attached */
935
0
  sc_ep_report_read_activity(s->scb);
936
0
  if (conn) {
937
    /* real connections have timeouts
938
     * if already defined, it means that a set-timeout rule has
939
     * been executed so do not overwrite them
940
     */
941
0
    if (!tick_isset(s->scb->ioto))
942
0
      s->scb->ioto = s->be->timeout.server;
943
0
    if (!tick_isset(s->tunnel_timeout))
944
0
      s->tunnel_timeout = s->be->timeout.tunnel;
945
946
    /* The connection is now established, try to read data from the
947
     * underlying layer, and subscribe to recv events. We use a
948
     * delayed recv here to give a chance to the data to flow back
949
     * by the time we process other tasks.
950
     */
951
0
    sc_chk_rcv(s->scb);
952
0
  }
953
  /* If we managed to get the whole response, and we don't have anything
954
   * left to send, or can't, switch to SC_ST_DIS now. */
955
0
  if ((s->scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) || (s->scf->flags & SC_FL_SHUT_DONE)) {
956
0
    s->scb->state = SC_ST_DIS;
957
0
    DBG_TRACE_STATE("response channel shutdwn for read/write", STRM_EV_STRM_PROC|STRM_EV_CS_ST|STRM_EV_STRM_ERR, s);
958
0
  }
959
960
0
  DBG_TRACE_LEAVE(STRM_EV_STRM_PROC|STRM_EV_CS_ST, s);
961
0
}
962
963
/* Set correct stream termination flags in case no analyser has done it. It
964
 * also counts a failed request if the server state has not reached the request
965
 * stage.
966
 */
967
void sess_set_term_flags(struct stream *s)
968
0
{
969
0
  if (!(s->flags & SF_FINST_MASK)) {
970
0
    if (s->scb->state == SC_ST_INI) {
971
      /* anything before REQ in fact */
972
0
      _HA_ATOMIC_INC(&strm_fe(s)->fe_counters.failed_req);
973
0
      if (strm_li(s) && strm_li(s)->counters)
974
0
        _HA_ATOMIC_INC(&strm_li(s)->counters->failed_req);
975
976
0
      s->flags |= SF_FINST_R;
977
0
    }
978
0
    else if (s->scb->state == SC_ST_QUE)
979
0
      s->flags |= SF_FINST_Q;
980
0
    else if (sc_state_in(s->scb->state, SC_SB_REQ|SC_SB_TAR|SC_SB_ASS|SC_SB_CON|SC_SB_CER|SC_SB_RDY))
981
0
      s->flags |= SF_FINST_C;
982
0
    else if (s->scb->state == SC_ST_EST || s->prev_conn_state == SC_ST_EST)
983
0
      s->flags |= SF_FINST_D;
984
0
    else
985
0
      s->flags |= SF_FINST_L;
986
0
  }
987
0
}
988
989
/* This function parses the use-service action ruleset. It executes
990
 * the associated ACL and set an applet as a stream or txn final node.
991
 * it returns ACT_RET_ERR if an error occurs, the proxy left in
992
 * consistent state. It returns ACT_RET_STOP in success case because
993
 * use-service must be a terminal action. Returns ACT_RET_YIELD
994
 * if the initialisation function require more data.
995
 */
996
enum act_return process_use_service(struct act_rule *rule, struct proxy *px,
997
                                    struct session *sess, struct stream *s, int flags)
998
999
0
{
1000
0
  struct appctx *appctx;
1001
1002
  /* Initialises the applet if it is required. */
1003
0
  if (flags & ACT_OPT_FIRST) {
1004
    /* Register applet. this function schedules the applet. */
1005
0
    s->target = &rule->applet.obj_type;
1006
0
    appctx = sc_applet_create(s->scb, objt_applet(s->target));
1007
0
    if (unlikely(!appctx))
1008
0
      return ACT_RET_ERR;
1009
1010
    /* Finish initialisation of the context. */
1011
0
    appctx->rule = rule;
1012
0
    if (appctx_init(appctx) == -1)
1013
0
      return ACT_RET_ERR;
1014
0
  }
1015
0
  else
1016
0
    appctx = __sc_appctx(s->scb);
1017
1018
0
  if (rule->from != ACT_F_HTTP_REQ) {
1019
0
    if (sess->fe == s->be) /* report it if the request was intercepted by the frontend */
1020
0
      _HA_ATOMIC_INC(&sess->fe->fe_counters.intercepted_req);
1021
1022
    /* The flag SF_ASSIGNED prevent from server assignment. */
1023
0
    s->flags |= SF_ASSIGNED;
1024
0
  }
1025
1026
  /* Now we can schedule the applet. */
1027
0
  applet_need_more_data(appctx);
1028
0
  appctx_wakeup(appctx);
1029
0
  return ACT_RET_STOP;
1030
0
}
1031
1032
/* This stream analyser checks the switching rules and changes the backend
1033
 * if appropriate. The default_backend rule is also considered, then the
1034
 * target backend's forced persistence rules are also evaluated last if any.
1035
 * It returns 1 if the processing can continue on next analysers, or zero if it
1036
 * either needs more data or wants to immediately abort the request.
1037
 */
1038
static int process_switching_rules(struct stream *s, struct channel *req, int an_bit)
1039
0
{
1040
0
  struct persist_rule *prst_rule;
1041
0
  struct session *sess = s->sess;
1042
0
  struct proxy *fe = sess->fe;
1043
1044
0
  req->analysers &= ~an_bit;
1045
0
  req->analyse_exp = TICK_ETERNITY;
1046
1047
0
  DBG_TRACE_ENTER(STRM_EV_STRM_ANA, s);
1048
1049
  /* now check whether we have some switching rules for this request */
1050
0
  if (!(s->flags & SF_BE_ASSIGNED)) {
1051
0
    struct switching_rule *rule;
1052
1053
0
    list_for_each_entry(rule, &fe->switching_rules, list) {
1054
0
      int ret = 1;
1055
1056
0
      if (rule->cond) {
1057
0
        ret = acl_exec_cond(rule->cond, fe, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
1058
0
        ret = acl_pass(ret);
1059
0
        if (rule->cond->pol == ACL_COND_UNLESS)
1060
0
          ret = !ret;
1061
0
      }
1062
1063
0
      if (ret) {
1064
        /* If the backend name is dynamic, try to resolve the name.
1065
         * If we can't resolve the name, or if any error occurs, break
1066
         * the loop and fallback to the default backend.
1067
         */
1068
0
        struct proxy *backend = NULL;
1069
1070
0
        if (rule->dynamic) {
1071
0
          struct buffer *tmp;
1072
1073
0
          tmp = alloc_trash_chunk();
1074
0
          if (!tmp)
1075
0
            goto sw_failed;
1076
1077
0
          if (build_logline(s, tmp->area, tmp->size, &rule->be.expr))
1078
0
            backend = proxy_be_by_name(tmp->area);
1079
1080
0
          free_trash_chunk(tmp);
1081
0
          tmp = NULL;
1082
1083
0
          if (!backend)
1084
0
            break;
1085
0
        }
1086
0
        else
1087
0
          backend = rule->be.backend;
1088
1089
0
        if (!stream_set_backend(s, backend))
1090
0
          goto sw_failed;
1091
0
        break;
1092
0
      }
1093
0
    }
1094
1095
    /* To ensure correct connection accounting on the backend, we
1096
     * have to assign one if it was not set (eg: a listen). This
1097
     * measure also takes care of correctly setting the default
1098
     * backend if any. Don't do anything if an upgrade is already in
1099
     * progress.
1100
     */
1101
0
    if (!(s->flags & (SF_BE_ASSIGNED|SF_IGNORE)))
1102
0
      if (!stream_set_backend(s, fe->defbe.be ? fe->defbe.be : s->be))
1103
0
        goto sw_failed;
1104
1105
    /* No backend assigned but no error reported. It happens when a
1106
     * TCP stream is upgraded to HTTP/2.
1107
     */
1108
0
    if ((s->flags & (SF_BE_ASSIGNED|SF_IGNORE)) == SF_IGNORE) {
1109
0
      DBG_TRACE_DEVEL("leaving with no backend because of a destructive upgrade", STRM_EV_STRM_ANA, s);
1110
0
      return 0;
1111
0
    }
1112
1113
0
  }
1114
1115
  /* we don't want to run the TCP or HTTP filters again if the backend has not changed */
1116
0
  if (fe == s->be) {
1117
0
    s->req.analysers &= ~AN_REQ_INSPECT_BE;
1118
0
    s->req.analysers &= ~AN_REQ_HTTP_PROCESS_BE;
1119
0
    s->req.analysers &= ~AN_REQ_FLT_START_BE;
1120
0
  }
1121
1122
  /* as soon as we know the backend, we must check if we have a matching forced or ignored
1123
   * persistence rule, and report that in the stream.
1124
   */
1125
0
  list_for_each_entry(prst_rule, &s->be->persist_rules, list) {
1126
0
    int ret = 1;
1127
1128
0
    if (prst_rule->cond) {
1129
0
                  ret = acl_exec_cond(prst_rule->cond, s->be, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
1130
0
      ret = acl_pass(ret);
1131
0
      if (prst_rule->cond->pol == ACL_COND_UNLESS)
1132
0
        ret = !ret;
1133
0
    }
1134
1135
0
    if (ret) {
1136
      /* no rule, or the rule matches */
1137
0
      if (prst_rule->type == PERSIST_TYPE_FORCE) {
1138
0
        s->flags |= SF_FORCE_PRST;
1139
0
      } else {
1140
0
        s->flags |= SF_IGNORE_PRST;
1141
0
      }
1142
0
      break;
1143
0
    }
1144
0
  }
1145
1146
0
  DBG_TRACE_LEAVE(STRM_EV_STRM_ANA, s);
1147
0
  return 1;
1148
1149
0
 sw_failed:
1150
  /* immediately abort this request in case of allocation failure */
1151
0
  stream_abort(s);
1152
1153
0
  if (!(s->flags & SF_ERR_MASK))
1154
0
    s->flags |= SF_ERR_RESOURCE;
1155
0
  if (!(s->flags & SF_FINST_MASK))
1156
0
    s->flags |= SF_FINST_R;
1157
1158
0
  if (s->txn)
1159
0
    s->txn->status = 500;
1160
0
  s->req.analysers &= AN_REQ_FLT_END;
1161
0
  s->req.analyse_exp = TICK_ETERNITY;
1162
0
  DBG_TRACE_DEVEL("leaving on error", STRM_EV_STRM_ANA|STRM_EV_STRM_ERR, s);
1163
0
  return 0;
1164
0
}
1165
1166
/* This stream analyser works on a request. It applies all use-server rules on
1167
 * it then returns 1. The data must already be present in the buffer otherwise
1168
 * they won't match. It always returns 1.
1169
 */
1170
static int process_server_rules(struct stream *s, struct channel *req, int an_bit)
1171
0
{
1172
0
  struct proxy *px = s->be;
1173
0
  struct session *sess = s->sess;
1174
0
  struct server_rule *rule;
1175
1176
0
  DBG_TRACE_ENTER(STRM_EV_STRM_ANA, s);
1177
1178
0
  if (!(s->flags & SF_ASSIGNED)) {
1179
0
    list_for_each_entry(rule, &px->server_rules, list) {
1180
0
      int ret;
1181
1182
0
      ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
1183
0
      ret = acl_pass(ret);
1184
0
      if (rule->cond->pol == ACL_COND_UNLESS)
1185
0
        ret = !ret;
1186
1187
0
      if (ret) {
1188
0
        struct server *srv;
1189
1190
0
        if (rule->dynamic) {
1191
0
          struct buffer *tmp = get_trash_chunk();
1192
1193
0
          if (!build_logline(s, tmp->area, tmp->size, &rule->expr))
1194
0
            break;
1195
1196
0
          srv = findserver(s->be, tmp->area);
1197
0
          if (!srv)
1198
0
            break;
1199
0
        }
1200
0
        else
1201
0
          srv = rule->srv.ptr;
1202
1203
0
        if ((srv->cur_state != SRV_ST_STOPPED) ||
1204
0
            (px->options & PR_O_PERSIST) ||
1205
0
            (s->flags & SF_FORCE_PRST)) {
1206
0
          s->flags |= SF_DIRECT | SF_ASSIGNED;
1207
0
          s->target = &srv->obj_type;
1208
0
          break;
1209
0
        }
1210
        /* if the server is not UP, let's go on with next rules
1211
         * just in case another one is suited.
1212
         */
1213
0
      }
1214
0
    }
1215
0
  }
1216
1217
0
  req->analysers &= ~an_bit;
1218
0
  req->analyse_exp = TICK_ETERNITY;
1219
0
  DBG_TRACE_LEAVE(STRM_EV_STRM_ANA, s);
1220
0
  return 1;
1221
0
}
1222
1223
static inline void sticking_rule_find_target(struct stream *s,
1224
                                             struct stktable *t, struct stksess *ts)
1225
0
{
1226
0
  struct proxy *px = s->be;
1227
0
  struct eb32_node *node;
1228
0
  struct dict_entry *de;
1229
0
  void *ptr;
1230
0
  struct server *srv;
1231
1232
  /* Look for the server name previously stored in <t> stick-table */
1233
0
  HA_RWLOCK_RDLOCK(STK_SESS_LOCK, &ts->lock);
1234
0
  ptr = __stktable_data_ptr(t, ts, STKTABLE_DT_SERVER_KEY);
1235
0
  de = stktable_data_cast(ptr, std_t_dict);
1236
0
  HA_RWLOCK_RDUNLOCK(STK_SESS_LOCK, &ts->lock);
1237
1238
0
  if (de) {
1239
0
    struct ebpt_node *node;
1240
1241
0
    if (t->server_key_type == STKTABLE_SRV_NAME) {
1242
0
      node = ebis_lookup(&px->conf.used_server_name, de->value.key);
1243
0
      if (node) {
1244
0
        srv = container_of(node, struct server, conf.name);
1245
0
        goto found;
1246
0
      }
1247
0
    } else if (t->server_key_type == STKTABLE_SRV_ADDR) {
1248
0
      HA_RWLOCK_RDLOCK(PROXY_LOCK, &px->lock);
1249
0
      node = ebis_lookup(&px->used_server_addr, de->value.key);
1250
0
      HA_RWLOCK_RDUNLOCK(PROXY_LOCK, &px->lock);
1251
0
      if (node) {
1252
0
        srv = container_of(node, struct server, addr_node);
1253
0
        goto found;
1254
0
      }
1255
0
    }
1256
0
  }
1257
1258
  /* Look for the server ID */
1259
0
  HA_RWLOCK_RDLOCK(STK_SESS_LOCK, &ts->lock);
1260
0
  ptr = __stktable_data_ptr(t, ts, STKTABLE_DT_SERVER_ID);
1261
0
  node = eb32_lookup(&px->conf.used_server_id, stktable_data_cast(ptr, std_t_sint));
1262
0
  HA_RWLOCK_RDUNLOCK(STK_SESS_LOCK, &ts->lock);
1263
1264
0
  if (!node)
1265
0
    return;
1266
1267
0
  srv = container_of(node, struct server, conf.id);
1268
0
 found:
1269
0
  if ((srv->cur_state != SRV_ST_STOPPED) ||
1270
0
      (px->options & PR_O_PERSIST) || (s->flags & SF_FORCE_PRST)) {
1271
0
    s->flags |= SF_DIRECT | SF_ASSIGNED;
1272
0
    s->target = &srv->obj_type;
1273
0
  }
1274
0
}
1275
1276
/* This stream analyser works on a request. It applies all sticking rules on
1277
 * it then returns 1. The data must already be present in the buffer otherwise
1278
 * they won't match. It always returns 1.
1279
 */
1280
static int process_sticking_rules(struct stream *s, struct channel *req, int an_bit)
1281
0
{
1282
0
  struct proxy    *px   = s->be;
1283
0
  struct session *sess  = s->sess;
1284
0
  struct sticking_rule  *rule;
1285
1286
0
  DBG_TRACE_ENTER(STRM_EV_STRM_ANA, s);
1287
1288
0
  list_for_each_entry(rule, &px->sticking_rules, list) {
1289
0
    int ret = 1 ;
1290
0
    int i;
1291
1292
    /* Only the first stick store-request of each table is applied
1293
     * and other ones are ignored. The purpose is to allow complex
1294
     * configurations which look for multiple entries by decreasing
1295
     * order of precision and to stop at the first which matches.
1296
     * An example could be a store of the IP address from an HTTP
1297
     * header first, then from the source if not found.
1298
     */
1299
0
    if (rule->flags & STK_IS_STORE) {
1300
0
      for (i = 0; i < s->store_count; i++) {
1301
0
        if (rule->table.t == s->store[i].table)
1302
0
          break;
1303
0
      }
1304
1305
0
      if (i !=  s->store_count)
1306
0
        continue;
1307
0
    }
1308
1309
0
    if (rule->cond) {
1310
0
                  ret = acl_exec_cond(rule->cond, px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL);
1311
0
      ret = acl_pass(ret);
1312
0
      if (rule->cond->pol == ACL_COND_UNLESS)
1313
0
        ret = !ret;
1314
0
    }
1315
1316
0
    if (ret) {
1317
0
      struct stktable_key *key;
1318
1319
0
      key = stktable_fetch_key(rule->table.t, px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL, rule->expr, NULL);
1320
0
      if (!key)
1321
0
        continue;
1322
1323
0
      if (rule->flags & STK_IS_MATCH) {
1324
0
        struct stksess *ts;
1325
1326
0
        if ((ts = stktable_lookup_key(rule->table.t, key)) != NULL) {
1327
0
          if (!(s->flags & SF_ASSIGNED))
1328
0
            sticking_rule_find_target(s, rule->table.t, ts);
1329
0
          stktable_touch_local(rule->table.t, ts, 1);
1330
0
        }
1331
0
      }
1332
0
      if (rule->flags & STK_IS_STORE) {
1333
0
        if (s->store_count < (sizeof(s->store) / sizeof(s->store[0]))) {
1334
0
          struct stksess *ts;
1335
1336
0
          ts = stksess_new(rule->table.t, key);
1337
0
          if (ts) {
1338
0
            s->store[s->store_count].table = rule->table.t;
1339
0
            s->store[s->store_count++].ts = ts;
1340
0
          }
1341
0
        }
1342
0
      }
1343
0
    }
1344
0
  }
1345
1346
0
  req->analysers &= ~an_bit;
1347
0
  req->analyse_exp = TICK_ETERNITY;
1348
0
  DBG_TRACE_LEAVE(STRM_EV_STRM_ANA, s);
1349
0
  return 1;
1350
0
}
1351
1352
/* This stream analyser works on a response. It applies all store rules on it
1353
 * then returns 1. The data must already be present in the buffer otherwise
1354
 * they won't match. It always returns 1.
1355
 */
1356
static int process_store_rules(struct stream *s, struct channel *rep, int an_bit)
1357
0
{
1358
0
  struct proxy    *px   = s->be;
1359
0
  struct session *sess  = s->sess;
1360
0
  struct sticking_rule  *rule;
1361
0
  int i;
1362
0
  int nbreq = s->store_count;
1363
1364
0
  DBG_TRACE_ENTER(STRM_EV_STRM_ANA, s);
1365
1366
0
  list_for_each_entry(rule, &px->storersp_rules, list) {
1367
0
    int ret = 1 ;
1368
1369
    /* Only the first stick store-response of each table is applied
1370
     * and other ones are ignored. The purpose is to allow complex
1371
     * configurations which look for multiple entries by decreasing
1372
     * order of precision and to stop at the first which matches.
1373
     * An example could be a store of a set-cookie value, with a
1374
     * fallback to a parameter found in a 302 redirect.
1375
     *
1376
     * The store-response rules are not allowed to override the
1377
     * store-request rules for the same table, but they may coexist.
1378
     * Thus we can have up to one store-request entry and one store-
1379
     * response entry for the same table at any time.
1380
     */
1381
0
    for (i = nbreq; i < s->store_count; i++) {
1382
0
      if (rule->table.t == s->store[i].table)
1383
0
        break;
1384
0
    }
1385
1386
    /* skip existing entries for this table */
1387
0
    if (i < s->store_count)
1388
0
      continue;
1389
1390
0
    if (rule->cond) {
1391
0
                  ret = acl_exec_cond(rule->cond, px, sess, s, SMP_OPT_DIR_RES|SMP_OPT_FINAL);
1392
0
                  ret = acl_pass(ret);
1393
0
      if (rule->cond->pol == ACL_COND_UNLESS)
1394
0
        ret = !ret;
1395
0
    }
1396
1397
0
    if (ret) {
1398
0
      struct stktable_key *key;
1399
1400
0
      key = stktable_fetch_key(rule->table.t, px, sess, s, SMP_OPT_DIR_RES|SMP_OPT_FINAL, rule->expr, NULL);
1401
0
      if (!key)
1402
0
        continue;
1403
1404
0
      if (s->store_count < (sizeof(s->store) / sizeof(s->store[0]))) {
1405
0
        struct stksess *ts;
1406
1407
0
        ts = stksess_new(rule->table.t, key);
1408
0
        if (ts) {
1409
0
          s->store[s->store_count].table = rule->table.t;
1410
0
          s->store[s->store_count++].ts = ts;
1411
0
        }
1412
0
      }
1413
0
    }
1414
0
  }
1415
1416
  /* process store request and store response */
1417
0
  for (i = 0; i < s->store_count; i++) {
1418
0
    struct stksess *ts;
1419
0
    void *ptr;
1420
0
    char *key;
1421
0
    struct dict_entry *de;
1422
0
    struct stktable *t = s->store[i].table;
1423
1424
0
    if (!objt_server(s->target) || (__objt_server(s->target)->flags & SRV_F_NON_STICK)) {
1425
0
      stksess_free(s->store[i].table, s->store[i].ts);
1426
0
      s->store[i].ts = NULL;
1427
0
      continue;
1428
0
    }
1429
1430
0
    ts = stktable_set_entry(t, s->store[i].ts);
1431
0
    if (ts != s->store[i].ts) {
1432
      /* the entry already existed, we can free ours */
1433
0
      stksess_free(t, s->store[i].ts);
1434
0
    }
1435
0
    s->store[i].ts = NULL;
1436
1437
0
    if (t->server_key_type == STKTABLE_SRV_NAME)
1438
0
      key = __objt_server(s->target)->id;
1439
0
    else if (t->server_key_type == STKTABLE_SRV_ADDR)
1440
0
      key = __objt_server(s->target)->addr_node.key;
1441
0
    else
1442
0
      key = NULL;
1443
1444
0
    HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);
1445
0
    ptr = __stktable_data_ptr(t, ts, STKTABLE_DT_SERVER_ID);
1446
0
    stktable_data_cast(ptr, std_t_sint) = __objt_server(s->target)->puid;
1447
1448
0
    if (key) {
1449
0
      de = dict_insert(&server_key_dict, key);
1450
0
      if (de) {
1451
0
        ptr = __stktable_data_ptr(t, ts, STKTABLE_DT_SERVER_KEY);
1452
0
        stktable_data_cast(ptr, std_t_dict) = de;
1453
0
      }
1454
0
    }
1455
1456
0
    HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
1457
1458
0
    stktable_touch_local(t, ts, 1);
1459
0
  }
1460
0
  s->store_count = 0; /* everything is stored */
1461
1462
0
  rep->analysers &= ~an_bit;
1463
0
  rep->analyse_exp = TICK_ETERNITY;
1464
1465
0
  DBG_TRACE_LEAVE(STRM_EV_STRM_ANA, s);
1466
0
  return 1;
1467
0
}
1468
1469
/* Set the stream to HTTP mode, if necessary. The minimal request HTTP analysers
1470
 * are set and the client mux is upgraded. It returns 1 if the stream processing
1471
 * may continue or 0 if it should be stopped. It happens on error or if the
1472
 * upgrade required a new stream. The mux protocol may be specified.
1473
 */
1474
int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_proto)
1475
0
{
1476
0
  struct stconn *sc = s->scf;
1477
0
  struct connection  *conn;
1478
1479
  /* Already an HTTP stream */
1480
0
  if (IS_HTX_STRM(s))
1481
0
    return 1;
1482
1483
0
  s->req.analysers |= AN_REQ_WAIT_HTTP|AN_REQ_HTTP_PROCESS_FE;
1484
1485
0
  if (unlikely(!s->txn && !http_create_txn(s)))
1486
0
    return 0;
1487
1488
0
  conn = sc_conn(sc);
1489
0
  if (conn) {
1490
0
    se_have_more_data(s->scf->sedesc);
1491
    /* Make sure we're unsubscribed, the the new
1492
     * mux will probably want to subscribe to
1493
     * the underlying XPRT
1494
     */
1495
0
    if (s->scf->wait_event.events)
1496
0
      conn->mux->unsubscribe(sc, s->scf->wait_event.events, &(s->scf->wait_event));
1497
1498
0
    if (conn->mux->flags & MX_FL_NO_UPG)
1499
0
      return 0;
1500
1501
0
    sc_conn_prepare_endp_upgrade(sc);
1502
0
    if (conn_upgrade_mux_fe(conn, sc, &s->req.buf,
1503
0
          (mux_proto ? mux_proto->token : ist("")),
1504
0
          PROTO_MODE_HTTP)  == -1) {
1505
0
      sc_conn_abort_endp_upgrade(sc);
1506
0
      return 0;
1507
0
    }
1508
0
    sc_conn_commit_endp_upgrade(sc);
1509
1510
0
    s->req.flags &= ~(CF_READ_EVENT|CF_AUTO_CONNECT);
1511
0
    s->req.total = 0;
1512
0
    s->flags |= SF_IGNORE;
1513
0
    if (sc_ep_test(sc, SE_FL_DETACHED)) {
1514
      /* If stream connector is detached, it means it was not
1515
       * reused by the new mux. Son destroy it, disable
1516
       * logging, and abort the stream process. Thus the
1517
       * stream will be silently destroyed. The new mux will
1518
       * create new streams.
1519
       */
1520
0
      s->logs.logwait = 0;
1521
0
      s->logs.level = 0;
1522
0
      stream_abort(s);
1523
0
      s->req.analysers &= AN_REQ_FLT_END;
1524
0
      s->req.analyse_exp = TICK_ETERNITY;
1525
0
    }
1526
0
  }
1527
1528
0
  return 1;
1529
0
}
1530
1531
1532
/* Updates at once the channel flags, and timers of both stream connectors of a
1533
 * same stream, to complete the work after the analysers, then updates the data
1534
 * layer below. This will ensure that any synchronous update performed at the
1535
 * data layer will be reflected in the channel flags and/or stream connector.
1536
 * Note that this does not change the stream connector's current state, though
1537
 * it updates the previous state to the current one.
1538
 */
1539
static void stream_update_both_sc(struct stream *s)
1540
0
{
1541
0
  struct stconn *scf = s->scf;
1542
0
  struct stconn *scb = s->scb;
1543
0
  struct channel *req = &s->req;
1544
0
  struct channel *res = &s->res;
1545
1546
0
  req->flags &= ~(CF_READ_EVENT|CF_WRITE_EVENT);
1547
0
  res->flags &= ~(CF_READ_EVENT|CF_WRITE_EVENT);
1548
1549
0
  s->prev_conn_state = scb->state;
1550
1551
  /* let's recompute both sides states */
1552
0
  if (sc_state_in(scf->state, SC_SB_RDY|SC_SB_EST))
1553
0
    sc_update(scf);
1554
1555
0
  if (sc_state_in(scb->state, SC_SB_RDY|SC_SB_EST))
1556
0
    sc_update(scb);
1557
1558
  /* stream connectors are processed outside of process_stream() and must be
1559
   * handled at the latest moment.
1560
   */
1561
0
  if (sc_appctx(scf)) {
1562
0
    if (sc_is_recv_allowed(scf) || sc_is_send_allowed(scf))
1563
0
      appctx_wakeup(__sc_appctx(scf));
1564
0
  }
1565
0
  if (sc_appctx(scb)) {
1566
0
    if (sc_is_recv_allowed(scb) || sc_is_send_allowed(scb))
1567
0
      appctx_wakeup(__sc_appctx(scb));
1568
0
  }
1569
0
}
1570
1571
/* check SC and channel timeouts, and close the corresponding stream connectors
1572
 * for future reads or writes.
1573
 * Note: this will also concern upper layers but we do not touch any other
1574
 * flag. We must be careful and correctly detect state changes when calling
1575
 * them.
1576
 */
1577
static void stream_handle_timeouts(struct stream *s)
1578
0
{
1579
0
  stream_check_conn_timeout(s);
1580
1581
0
  sc_check_timeouts(s->scf);
1582
0
  channel_check_timeout(&s->req);
1583
0
  sc_check_timeouts(s->scb);
1584
0
  channel_check_timeout(&s->res);
1585
1586
0
  if (unlikely(!(s->scb->flags & SC_FL_SHUT_DONE) && (s->req.flags & CF_WRITE_TIMEOUT))) {
1587
0
    s->scb->flags |= SC_FL_NOLINGER;
1588
0
    sc_shutdown(s->scb);
1589
0
  }
1590
1591
0
  if (unlikely(!(s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->req.flags & CF_READ_TIMEOUT))) {
1592
0
    if (s->scf->flags & SC_FL_NOHALF)
1593
0
      s->scf->flags |= SC_FL_NOLINGER;
1594
0
    sc_abort(s->scf);
1595
0
  }
1596
0
  if (unlikely(!(s->scf->flags & SC_FL_SHUT_DONE) && (s->res.flags & CF_WRITE_TIMEOUT))) {
1597
0
    s->scf->flags |= SC_FL_NOLINGER;
1598
0
    sc_shutdown(s->scf);
1599
0
  }
1600
1601
0
  if (unlikely(!(s->scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && (s->res.flags & CF_READ_TIMEOUT))) {
1602
0
    if (s->scb->flags & SC_FL_NOHALF)
1603
0
      s->scb->flags |= SC_FL_NOLINGER;
1604
0
    sc_abort(s->scb);
1605
0
  }
1606
1607
0
  if (HAS_FILTERS(s))
1608
0
    flt_stream_check_timeouts(s);
1609
0
}
1610
1611
/* if the current task's wake_date was set, it's being profiled, thus we may
1612
 * report latencies and CPU usages in logs, so it's desirable to update the
1613
 * latency when entering process_stream().
1614
 */
1615
static void stream_cond_update_cpu_latency(struct stream *s)
1616
0
{
1617
0
  uint32_t lat = th_ctx->sched_call_date - th_ctx->sched_wake_date;
1618
1619
0
  s->lat_time += lat;
1620
0
}
1621
1622
/* if the current task's wake_date was set, it's being profiled, thus we may
1623
 * report latencies and CPU usages in logs, so it's desirable to do that before
1624
 * logging in order to report accurate CPU usage. In this case we count that
1625
 * final part and reset the wake date so that the scheduler doesn't do it a
1626
 * second time, and by doing so we also avoid an extra call to clock_gettime().
1627
 * The CPU usage will be off by the little time needed to run over stream_free()
1628
 * but that's only marginal.
1629
 */
1630
static void stream_cond_update_cpu_usage(struct stream *s)
1631
0
{
1632
0
  uint32_t cpu;
1633
1634
  /* stats are only registered for non-zero wake dates */
1635
0
  if (likely(!th_ctx->sched_wake_date))
1636
0
    return;
1637
1638
0
  cpu = (uint32_t)now_mono_time() - th_ctx->sched_call_date;
1639
0
  s->cpu_time += cpu;
1640
0
  HA_ATOMIC_ADD(&th_ctx->sched_profile_entry->cpu_time, cpu);
1641
0
  th_ctx->sched_wake_date = 0;
1642
0
}
1643
1644
/* this functions is called directly by the scheduler for tasks whose
1645
 * ->process points to process_stream(), and is used to keep latencies
1646
 * and CPU usage measurements accurate.
1647
 */
1648
void stream_update_timings(struct task *t, uint64_t lat, uint64_t cpu)
1649
0
{
1650
0
  struct stream *s = t->context;
1651
0
  s->lat_time += lat;
1652
0
  s->cpu_time += cpu;
1653
0
}
1654
1655
1656
/* This macro is very specific to the function below. See the comments in
 * process_stream() below to understand the logic and the tests. It must be
 * expanded inside a loop since it uses "break" and "continue":
 *   - <list> is recomputed from <real>: the bit <flag> which was just
 *     processed is dropped, and the bits of <real> which were not set in
 *     <back> are added;
 *   - <back> is resynchronised with <real>;
 *   - the loop is left when <list> is empty;
 *   - the loop is restarted when the lowest bit set in <list> is lower than
 *     <flag>.
 */
#define UPDATE_ANALYSERS(real, list, back, flag) {                  \
    list = (((list) & ~(flag)) | ~(back)) & (real);                 \
    back = (real);                                                  \
    if (!(list))                                                    \
      break;                                                        \
    if (((list) ^ ((list) & ((list) - 1))) < (flag))                \
      continue;                                                     \
}
1667
1668
/* The 2 following macros call an analyzer for the specified channel if the
 * right flag is set. The first one is used for "filterable" analyzers: if a
 * stream has some registered filters, the pre and post analyze callbacks are
 * called around the analyzer. The second one is used for the other analyzers
 * (AN_REQ/RES_FLT_* and AN_REQ/RES_HTTP_XFER_BODY). Both must be expanded
 * inside a loop: it is left with "break" as soon as a callback or the
 * analyzer returns zero, otherwise the list of analysers is refreshed using
 * UPDATE_ANALYSERS().
 */
#define FLT_ANALYZE(strm, chn, fun, list, back, flag, ...)          \
  {                                                                 \
    if ((list) & (flag)) {                                          \
      if (HAS_FILTERS(strm)) {                                      \
        if (!flt_pre_analyze((strm), (chn), (flag)))                \
          break;                                                    \
        if (!fun((strm), (chn), (flag), ##__VA_ARGS__))             \
          break;                                                    \
        if (!flt_post_analyze((strm), (chn), (flag)))               \
          break;                                                    \
      }                                                             \
      else {                                                        \
        if (!fun((strm), (chn), (flag), ##__VA_ARGS__))             \
          break;                                                    \
      }                                                             \
      UPDATE_ANALYSERS((chn)->analysers, (list),                    \
                       (back), (flag));                             \
    }                                                               \
  }
1692
1693
/* Calls analyzer <fun> for stream <strm> on channel <chn> when <flag> is set
 * in the work list <list>, without any filter callback. If the analyzer
 * returns zero, the macro breaks out of the loop it is expanded in. Otherwise
 * the work list is refreshed from (chn)->analysers using UPDATE_ANALYSERS(),
 * which may itself break out of that loop or restart it. Extra arguments are
 * passed to <fun> after the flag.
 */
#define ANALYZE(strm, chn, fun, list, back, flag, ...)                \
  {                                                                   \
    if ((list) & (flag)) {                                            \
      if (fun((strm), (chn), (flag), ##__VA_ARGS__)) {                \
        UPDATE_ANALYSERS((chn)->analysers, (list),                    \
                         (back), (flag));                             \
      }                                                               \
      else                                                            \
        break;                                                        \
    }                                                                 \
  }
1702
1703
/* Processes the client, server, request and response jobs of a stream task,
1704
 * then puts it back to the wait queue in a clean state, or cleans up its
1705
 * resources if it must be deleted. Returns in <next> the date the task wants
1706
 * to be woken up, or TICK_ETERNITY. In order not to call all functions for
1707
 * nothing too many times, the request and response buffers flags are monitored
1708
 * and each function is called only if at least another function has changed at
1709
 * least one flag it is interested in.
1710
 */
1711
struct task *process_stream(struct task *t, void *context, unsigned int state)
1712
0
{
1713
0
  struct server *srv;
1714
0
  struct stream *s = context;
1715
0
  struct session *sess = s->sess;
1716
0
  unsigned int scf_flags, scb_flags;
1717
0
  unsigned int rqf_last, rpf_last;
1718
0
  unsigned int rq_prod_last, rq_cons_last;
1719
0
  unsigned int rp_cons_last, rp_prod_last;
1720
0
  unsigned int req_ana_back, res_ana_back;
1721
0
  struct channel *req, *res;
1722
0
  struct stconn *scf, *scb;
1723
0
  unsigned int rate;
1724
1725
0
  DBG_TRACE_ENTER(STRM_EV_STRM_PROC, s);
1726
1727
0
  activity[tid].stream_calls++;
1728
0
  stream_cond_update_cpu_latency(s);
1729
1730
0
  req = &s->req;
1731
0
  res = &s->res;
1732
1733
0
  scf = s->scf;
1734
0
  scb = s->scb;
1735
1736
  /* First, attempt to receive pending data from I/O layers */
1737
0
  sc_conn_sync_recv(scf);
1738
0
  sc_conn_sync_recv(scb);
1739
1740
  /* Let's check if we're looping without making any progress, e.g. due
1741
   * to a bogus analyser or the fact that we're ignoring a read0. The
1742
   * call_rate counter only counts calls with no progress made.
1743
   */
1744
0
  if (!((req->flags | res->flags) & (CF_READ_EVENT|CF_WRITE_EVENT))) {
1745
0
    rate = update_freq_ctr(&s->call_rate, 1);
1746
0
    if (rate >= 100000 && s->call_rate.prev_ctr) // make sure to wait at least a full second
1747
0
      stream_dump_and_crash(&s->obj_type, read_freq_ctr(&s->call_rate));
1748
0
  }
1749
1750
  /* this data may be no longer valid, clear it */
1751
0
  if (s->txn)
1752
0
    memset(&s->txn->auth, 0, sizeof(s->txn->auth));
1753
1754
  /* This flag must explicitly be set every time */
1755
0
  req->flags &= ~CF_WAKE_WRITE;
1756
0
  res->flags &= ~CF_WAKE_WRITE;
1757
1758
  /* Keep a copy of req/rep flags so that we can detect shutdowns */
1759
0
  rqf_last = req->flags & ~CF_MASK_ANALYSER;
1760
0
  rpf_last = res->flags & ~CF_MASK_ANALYSER;
1761
1762
  /* we don't want the stream connector functions to recursively wake us up */
1763
0
  scf->flags |= SC_FL_DONT_WAKE;
1764
0
  scb->flags |= SC_FL_DONT_WAKE;
1765
1766
  /* Keep a copy of SC flags */
1767
0
  scf_flags = scf->flags;
1768
0
  scb_flags = scb->flags;
1769
1770
  /* update pending events */
1771
0
  s->pending_events |= (state & TASK_WOKEN_ANY);
1772
1773
  /* 1a: Check for low level timeouts if needed. We just set a flag on
1774
   * stream connectors when their timeouts have expired.
1775
   */
1776
0
  if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
1777
0
    stream_handle_timeouts(s);
1778
1779
    /* Once in a while we're woken up because the task expires. But
1780
     * this does not necessarily mean that a timeout has been reached.
1781
     * So let's not run a whole stream processing if only an expiration
1782
     * timeout needs to be refreshed.
1783
     */
1784
0
    if (!((scf->flags | scb->flags) & (SC_FL_ERROR|SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_SHUT_DONE)) &&
1785
0
        !((req->flags | res->flags) & (CF_READ_EVENT|CF_READ_TIMEOUT|CF_WRITE_EVENT|CF_WRITE_TIMEOUT)) &&
1786
0
        !(s->flags & SF_CONN_EXP) &&
1787
0
        ((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
1788
0
      scf->flags &= ~SC_FL_DONT_WAKE;
1789
0
      scb->flags &= ~SC_FL_DONT_WAKE;
1790
0
      goto update_exp_and_leave;
1791
0
    }
1792
0
  }
1793
1794
0
 resync_stconns:
1795
  /* below we may emit error messages so we have to ensure that we have
1796
   * our buffers properly allocated. If the allocation failed, an error is
1797
   * triggered.
1798
   *
1799
   * NOTE: An error is returned because the mechanism to queue entities
1800
   *       waiting for a buffer is totally broken for now. However, this
1801
   *       part must be refactored. When it will be handled, this part
1802
   *       must be reviewed too.
1803
   */
1804
0
  if (!stream_alloc_work_buffer(s)) {
1805
0
    scf->flags |= SC_FL_ERROR;
1806
0
    s->conn_err_type = STRM_ET_CONN_RES;
1807
1808
0
    scb->flags |= SC_FL_ERROR;
1809
0
    s->conn_err_type = STRM_ET_CONN_RES;
1810
1811
0
    if (!(s->flags & SF_ERR_MASK))
1812
0
      s->flags |= SF_ERR_RESOURCE;
1813
0
    sess_set_term_flags(s);
1814
0
  }
1815
1816
  /* 1b: check for low-level errors reported at the stream connector.
1817
   * First we check if it's a retryable error (in which case we don't
1818
   * want to tell the buffer). Otherwise we report the error one level
1819
   * upper by setting flags into the buffers. Note that the side towards
1820
   * the client cannot have connect (hence retryable) errors. Also, the
1821
   * connection setup code must be able to deal with any type of abort.
1822
   */
1823
0
  srv = objt_server(s->target);
1824
0
  if (unlikely(scf->flags & SC_FL_ERROR)) {
1825
0
    if (sc_state_in(scf->state, SC_SB_EST|SC_SB_DIS)) {
1826
0
      sc_abort(scf);
1827
0
      sc_shutdown(scf);
1828
      //sc_report_error(scf); TODO: Be sure it is useless
1829
0
      if (!(req->analysers) && !(res->analysers)) {
1830
0
        _HA_ATOMIC_INC(&s->be->be_counters.cli_aborts);
1831
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.cli_aborts);
1832
0
        if (sess->listener && sess->listener->counters)
1833
0
          _HA_ATOMIC_INC(&sess->listener->counters->cli_aborts);
1834
0
        if (srv)
1835
0
          _HA_ATOMIC_INC(&srv->counters.cli_aborts);
1836
0
        if (!(s->flags & SF_ERR_MASK))
1837
0
          s->flags |= SF_ERR_CLICL;
1838
0
        if (!(s->flags & SF_FINST_MASK))
1839
0
          s->flags |= SF_FINST_D;
1840
0
      }
1841
0
    }
1842
0
  }
1843
1844
0
  if (unlikely(scb->flags & SC_FL_ERROR)) {
1845
0
    if (sc_state_in(scb->state, SC_SB_EST|SC_SB_DIS)) {
1846
0
      sc_abort(scb);
1847
0
      sc_shutdown(scb);
1848
      //sc_report_error(scb); TODO: Be sure it is useless
1849
0
      _HA_ATOMIC_INC(&s->be->be_counters.failed_resp);
1850
0
      if (srv)
1851
0
        _HA_ATOMIC_INC(&srv->counters.failed_resp);
1852
0
      if (!(req->analysers) && !(res->analysers)) {
1853
0
        _HA_ATOMIC_INC(&s->be->be_counters.srv_aborts);
1854
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.srv_aborts);
1855
0
        if (sess->listener && sess->listener->counters)
1856
0
          _HA_ATOMIC_INC(&sess->listener->counters->srv_aborts);
1857
0
        if (srv)
1858
0
          _HA_ATOMIC_INC(&srv->counters.srv_aborts);
1859
0
        if (!(s->flags & SF_ERR_MASK))
1860
0
          s->flags |= SF_ERR_SRVCL;
1861
0
        if (!(s->flags & SF_FINST_MASK))
1862
0
          s->flags |= SF_FINST_D;
1863
0
      }
1864
0
    }
1865
    /* note: maybe we should process connection errors here ? */
1866
0
  }
1867
1868
0
  if (sc_state_in(scb->state, SC_SB_CON|SC_SB_RDY)) {
1869
    /* we were trying to establish a connection on the server side,
1870
     * maybe it succeeded, maybe it failed, maybe we timed out, ...
1871
     */
1872
0
    if (scb->state == SC_ST_RDY)
1873
0
      back_handle_st_rdy(s);
1874
0
    else if (s->scb->state == SC_ST_CON)
1875
0
      back_handle_st_con(s);
1876
1877
0
    if (scb->state == SC_ST_CER)
1878
0
      back_handle_st_cer(s);
1879
0
    else if (scb->state == SC_ST_EST)
1880
0
      back_establish(s);
1881
1882
    /* state is now one of SC_ST_CON (still in progress), SC_ST_EST
1883
     * (established), SC_ST_DIS (abort), SC_ST_CLO (last error),
1884
     * SC_ST_ASS/SC_ST_TAR/SC_ST_REQ for retryable errors.
1885
     */
1886
0
  }
1887
1888
0
  rq_prod_last = scf->state;
1889
0
  rq_cons_last = scb->state;
1890
0
  rp_cons_last = scf->state;
1891
0
  rp_prod_last = scb->state;
1892
1893
  /* Check for connection closure */
1894
0
  DBG_TRACE_POINT(STRM_EV_STRM_PROC, s);
1895
1896
  /* nothing special to be done on client side */
1897
0
  if (unlikely(scf->state == SC_ST_DIS)) {
1898
0
    scf->state = SC_ST_CLO;
1899
1900
    /* This is needed only when debugging is enabled, to indicate
1901
     * client-side close.
1902
     */
1903
0
    if (unlikely((global.mode & MODE_DEBUG) &&
1904
0
           (!(global.mode & MODE_QUIET) ||
1905
0
            (global.mode & MODE_VERBOSE)))) {
1906
0
      chunk_printf(&trash, "%08x:%s.clicls[%04x:%04x]\n",
1907
0
             s->uniq_id, s->be->id,
1908
0
             (unsigned short)conn_fd(sc_conn(scf)),
1909
0
             (unsigned short)conn_fd(sc_conn(scb)));
1910
0
      DISGUISE(write(1, trash.area, trash.data));
1911
0
    }
1912
0
  }
1913
1914
  /* When a server-side connection is released, we have to count it and
1915
   * check for pending connections on this server.
1916
   */
1917
0
  if (unlikely(scb->state == SC_ST_DIS)) {
1918
0
    scb->state = SC_ST_CLO;
1919
0
    srv = objt_server(s->target);
1920
0
    if (srv) {
1921
0
      if (s->flags & SF_CURR_SESS) {
1922
0
        s->flags &= ~SF_CURR_SESS;
1923
0
        _HA_ATOMIC_DEC(&srv->cur_sess);
1924
0
      }
1925
0
      sess_change_server(s, NULL);
1926
0
      if (may_dequeue_tasks(srv, s->be))
1927
0
        process_srv_queue(srv);
1928
0
    }
1929
1930
    /* This is needed only when debugging is enabled, to indicate
1931
     * server-side close.
1932
     */
1933
0
    if (unlikely((global.mode & MODE_DEBUG) &&
1934
0
           (!(global.mode & MODE_QUIET) ||
1935
0
            (global.mode & MODE_VERBOSE)))) {
1936
0
      if (s->prev_conn_state == SC_ST_EST) {
1937
0
        chunk_printf(&trash, "%08x:%s.srvcls[%04x:%04x]\n",
1938
0
               s->uniq_id, s->be->id,
1939
0
               (unsigned short)conn_fd(sc_conn(scf)),
1940
0
               (unsigned short)conn_fd(sc_conn(scb)));
1941
0
        DISGUISE(write(1, trash.area, trash.data));
1942
0
      }
1943
0
    }
1944
0
  }
1945
1946
  /*
1947
   * Note: of the transient states (REQ, CER, DIS), only REQ may remain
1948
   * at this point.
1949
   */
1950
1951
0
 resync_request:
1952
  /* Analyse request */
1953
0
  if (((req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
1954
0
      ((scf->flags ^ scf_flags) & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) ||
1955
0
      ((scb->flags ^ scb_flags) & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) ||
1956
0
      (req->analysers && (scb->flags & SC_FL_SHUT_DONE)) ||
1957
0
      scf->state != rq_prod_last ||
1958
0
      scb->state != rq_cons_last ||
1959
0
      s->pending_events & TASK_WOKEN_MSG) {
1960
0
    unsigned int scf_flags_ana = scf->flags;
1961
0
    unsigned int scb_flags_ana = scb->flags;
1962
1963
0
    if (sc_state_in(scf->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) {
1964
0
      int max_loops = global.tune.maxpollevents;
1965
0
      unsigned int ana_list;
1966
0
      unsigned int ana_back;
1967
1968
      /* it's up to the analysers to stop new connections,
1969
       * disable reading or closing. Note: if an analyser
1970
       * disables any of these bits, it is responsible for
1971
       * enabling them again when it disables itself, so
1972
       * that other analysers are called in similar conditions.
1973
       */
1974
0
      channel_auto_read(req);
1975
0
      channel_auto_connect(req);
1976
0
      channel_auto_close(req);
1977
1978
      /* We will call all analysers for which a bit is set in
1979
       * req->analysers, following the bit order from LSB
1980
       * to MSB. The analysers must remove themselves from
1981
       * the list when not needed. Any analyser may return 0
1982
       * to break out of the loop, either because of missing
1983
       * data to take a decision, or because it decides to
1984
       * kill the stream. We loop at least once through each
1985
       * analyser, and we may loop again if other analysers
1986
       * are added in the middle.
1987
       *
1988
       * We build a list of analysers to run. We evaluate all
1989
       * of these analysers in the order of the lower bit to
1990
       * the higher bit. This ordering is very important.
1991
       * An analyser will often add/remove other analysers,
1992
       * including itself. Any changes to itself have no effect
1993
       * on the loop. If it removes any other analysers, we
1994
       * want those analysers not to be called anymore during
1995
       * this loop. If it adds an analyser that is located
1996
       * after itself, we want it to be scheduled for being
1997
       * processed during the loop. If it adds an analyser
1998
       * which is located before it, we want it to switch to
1999
       * it immediately, even if it has already been called
2000
       * once but removed since.
2001
       *
2002
       * In order to achieve this, we compare the analyser
2003
       * list after the call with a copy of it before the
2004
       * call. The work list is fed with analyser bits that
2005
       * appeared during the call. Then we compare previous
2006
       * work list with the new one, and check the bits that
2007
       * appeared. If the lowest of these bits is lower than
2008
       * the current bit, it means we have enabled a previous
2009
       * analyser and must immediately loop again.
2010
       */
2011
2012
0
      ana_list = ana_back = req->analysers;
2013
0
      while (ana_list && max_loops--) {
2014
        /* Warning! ensure that analysers are always placed in ascending order! */
2015
0
        ANALYZE    (s, req, flt_start_analyze,          ana_list, ana_back, AN_REQ_FLT_START_FE);
2016
0
        FLT_ANALYZE(s, req, tcp_inspect_request,        ana_list, ana_back, AN_REQ_INSPECT_FE);
2017
0
        FLT_ANALYZE(s, req, http_wait_for_request,      ana_list, ana_back, AN_REQ_WAIT_HTTP);
2018
0
        FLT_ANALYZE(s, req, http_wait_for_request_body, ana_list, ana_back, AN_REQ_HTTP_BODY);
2019
0
        FLT_ANALYZE(s, req, http_process_req_common,    ana_list, ana_back, AN_REQ_HTTP_PROCESS_FE, sess->fe);
2020
0
        FLT_ANALYZE(s, req, process_switching_rules,    ana_list, ana_back, AN_REQ_SWITCHING_RULES);
2021
0
        ANALYZE    (s, req, flt_start_analyze,          ana_list, ana_back, AN_REQ_FLT_START_BE);
2022
0
        FLT_ANALYZE(s, req, tcp_inspect_request,        ana_list, ana_back, AN_REQ_INSPECT_BE);
2023
0
        FLT_ANALYZE(s, req, http_process_req_common,    ana_list, ana_back, AN_REQ_HTTP_PROCESS_BE, s->be);
2024
0
        FLT_ANALYZE(s, req, http_process_tarpit,        ana_list, ana_back, AN_REQ_HTTP_TARPIT);
2025
0
        FLT_ANALYZE(s, req, process_server_rules,       ana_list, ana_back, AN_REQ_SRV_RULES);
2026
0
        FLT_ANALYZE(s, req, http_process_request,       ana_list, ana_back, AN_REQ_HTTP_INNER);
2027
0
        FLT_ANALYZE(s, req, tcp_persist_rdp_cookie,     ana_list, ana_back, AN_REQ_PRST_RDP_COOKIE);
2028
0
        FLT_ANALYZE(s, req, process_sticking_rules,     ana_list, ana_back, AN_REQ_STICKING_RULES);
2029
0
        ANALYZE    (s, req, flt_analyze_http_headers,   ana_list, ana_back, AN_REQ_FLT_HTTP_HDRS);
2030
0
        ANALYZE    (s, req, http_request_forward_body,  ana_list, ana_back, AN_REQ_HTTP_XFER_BODY);
2031
0
        ANALYZE    (s, req, pcli_wait_for_request,      ana_list, ana_back, AN_REQ_WAIT_CLI);
2032
0
        ANALYZE    (s, req, flt_xfer_data,              ana_list, ana_back, AN_REQ_FLT_XFER_DATA);
2033
0
        ANALYZE    (s, req, flt_end_analyze,            ana_list, ana_back, AN_REQ_FLT_END);
2034
0
        break;
2035
0
      }
2036
0
    }
2037
2038
0
    rq_prod_last = scf->state;
2039
0
    rq_cons_last = scb->state;
2040
0
    req->flags &= ~CF_WAKE_ONCE;
2041
0
    rqf_last = req->flags;
2042
0
    scf_flags = (scf_flags & ~(SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) | (scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED));
2043
0
    scb_flags = (scb_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED));
2044
2045
0
    if (((scf->flags ^ scf_flags_ana) & (SC_FL_EOS|SC_FL_ABRT_DONE)) || ((scb->flags ^ scb_flags_ana) & SC_FL_SHUT_DONE))
2046
0
      goto resync_request;
2047
0
  }
2048
2049
  /* we'll monitor the request analysers while parsing the response,
2050
   * because some response analysers may indirectly enable new request
2051
   * analysers (eg: HTTP keep-alive).
2052
   */
2053
0
  req_ana_back = req->analysers;
2054
2055
0
 resync_response:
2056
  /* Analyse response */
2057
2058
0
  if (((res->flags & ~rpf_last) & CF_MASK_ANALYSER) ||
2059
0
      ((scb->flags ^ scb_flags) & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) ||
2060
0
      ((scf->flags ^ scf_flags) & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) ||
2061
0
      (res->analysers && (scf->flags & SC_FL_SHUT_DONE)) ||
2062
0
      scf->state != rp_cons_last ||
2063
0
      scb->state != rp_prod_last ||
2064
0
      s->pending_events & TASK_WOKEN_MSG) {
2065
0
    unsigned int scb_flags_ana = scb->flags;
2066
0
    unsigned int scf_flags_ana = scf->flags;
2067
2068
0
    if (sc_state_in(scb->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) {
2069
0
      int max_loops = global.tune.maxpollevents;
2070
0
      unsigned int ana_list;
2071
0
      unsigned int ana_back;
2072
2073
      /* it's up to the analysers to disable reading or
2074
       * closing. Note: if an analyser disables any of these
2075
       * bits, it is responsible for enabling them again when
2076
       * it disables itself, so that other analysers are called
2077
       * in similar conditions.
2078
       */
2079
0
      channel_auto_read(res);
2080
0
      channel_auto_close(res);
2081
2082
      /* We will call all analysers for which a bit is set in
2083
       * res->analysers, following the bit order from LSB
2084
       * to MSB. The analysers must remove themselves from
2085
       * the list when not needed. Any analyser may return 0
2086
       * to break out of the loop, either because of missing
2087
       * data to take a decision, or because it decides to
2088
       * kill the stream. We loop at least once through each
2089
       * analyser, and we may loop again if other analysers
2090
       * are added in the middle.
2091
       */
2092
2093
0
      ana_list = ana_back = res->analysers;
2094
0
      while (ana_list && max_loops--) {
2095
        /* Warning! ensure that analysers are always placed in ascending order! */
2096
0
        ANALYZE    (s, res, flt_start_analyze,          ana_list, ana_back, AN_RES_FLT_START_FE);
2097
0
        ANALYZE    (s, res, flt_start_analyze,          ana_list, ana_back, AN_RES_FLT_START_BE);
2098
0
        FLT_ANALYZE(s, res, tcp_inspect_response,       ana_list, ana_back, AN_RES_INSPECT);
2099
0
        FLT_ANALYZE(s, res, http_wait_for_response,     ana_list, ana_back, AN_RES_WAIT_HTTP);
2100
0
        FLT_ANALYZE(s, res, process_store_rules,        ana_list, ana_back, AN_RES_STORE_RULES);
2101
0
        FLT_ANALYZE(s, res, http_process_res_common,    ana_list, ana_back, AN_RES_HTTP_PROCESS_BE, s->be);
2102
0
        ANALYZE    (s, res, flt_analyze_http_headers,   ana_list, ana_back, AN_RES_FLT_HTTP_HDRS);
2103
0
        ANALYZE    (s, res, http_response_forward_body, ana_list, ana_back, AN_RES_HTTP_XFER_BODY);
2104
0
        ANALYZE    (s, res, pcli_wait_for_response,     ana_list, ana_back, AN_RES_WAIT_CLI);
2105
0
        ANALYZE    (s, res, flt_xfer_data,              ana_list, ana_back, AN_RES_FLT_XFER_DATA);
2106
0
        ANALYZE    (s, res, flt_end_analyze,            ana_list, ana_back, AN_RES_FLT_END);
2107
0
        break;
2108
0
      }
2109
0
    }
2110
2111
0
    rp_cons_last = scf->state;
2112
0
    rp_prod_last = scb->state;
2113
0
    res->flags &= ~CF_WAKE_ONCE;
2114
0
    rpf_last = res->flags;
2115
0
    scb_flags = (scb_flags & ~(SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) | (scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED));
2116
0
    scf_flags = (scf_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED));
2117
2118
0
    if (((scb->flags ^ scb_flags_ana) & (SC_FL_EOS|SC_FL_ABRT_DONE)) || ((scf->flags ^ scf_flags_ana) & SC_FL_SHUT_DONE))
2119
0
      goto resync_response;
2120
0
  }
2121
2122
  /* we'll monitor the response analysers because some response analysers
2123
   * may be enabled/disabled later
2124
   */
2125
0
  res_ana_back = res->analysers;
2126
2127
  /* maybe someone has added some request analysers, so we must check and loop */
2128
0
  if (req->analysers & ~req_ana_back)
2129
0
    goto resync_request;
2130
2131
0
  if ((req->flags & ~rqf_last) & CF_MASK_ANALYSER)
2132
0
    goto resync_request;
2133
2134
  /* FIXME: here we should call protocol handlers which rely on
2135
   * both buffers.
2136
   */
2137
2138
2139
  /*
2140
   * Now we propagate unhandled errors to the stream. Normally
2141
   * we're just in a data phase here since it means we have not
2142
   * seen any analyser who could set an error status.
2143
   */
2144
0
  srv = objt_server(s->target);
2145
0
  if (unlikely(!(s->flags & SF_ERR_MASK))) {
2146
0
    if ((scf->flags & SC_FL_ERROR) || req->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT)) {
2147
      /* Report it if the client got an error or a read timeout expired */
2148
0
      req->analysers &= AN_REQ_FLT_END;
2149
0
      channel_auto_close(req);
2150
0
      if (scf->flags & SC_FL_ERROR) {
2151
0
        _HA_ATOMIC_INC(&s->be->be_counters.cli_aborts);
2152
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.cli_aborts);
2153
0
        if (sess->listener && sess->listener->counters)
2154
0
          _HA_ATOMIC_INC(&sess->listener->counters->cli_aborts);
2155
0
        if (srv)
2156
0
          _HA_ATOMIC_INC(&srv->counters.cli_aborts);
2157
0
        s->flags |= SF_ERR_CLICL;
2158
0
      }
2159
0
      else if (req->flags & CF_READ_TIMEOUT) {
2160
0
        _HA_ATOMIC_INC(&s->be->be_counters.cli_aborts);
2161
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.cli_aborts);
2162
0
        if (sess->listener && sess->listener->counters)
2163
0
          _HA_ATOMIC_INC(&sess->listener->counters->cli_aborts);
2164
0
        if (srv)
2165
0
          _HA_ATOMIC_INC(&srv->counters.cli_aborts);
2166
0
        s->flags |= SF_ERR_CLITO;
2167
0
      }
2168
0
      else {
2169
0
        _HA_ATOMIC_INC(&s->be->be_counters.srv_aborts);
2170
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.srv_aborts);
2171
0
        if (sess->listener && sess->listener->counters)
2172
0
          _HA_ATOMIC_INC(&sess->listener->counters->srv_aborts);
2173
0
        if (srv)
2174
0
          _HA_ATOMIC_INC(&srv->counters.srv_aborts);
2175
0
        s->flags |= SF_ERR_SRVTO;
2176
0
      }
2177
0
      sess_set_term_flags(s);
2178
2179
      /* Abort the request if a client error occurred while
2180
       * the backend stream connector is in the SC_ST_INI
2181
       * state. It is switched into the SC_ST_CLO state and
2182
       * the request channel is erased. */
2183
0
      if (scb->state == SC_ST_INI) {
2184
0
        s->scb->state = SC_ST_CLO;
2185
0
        channel_abort(req);
2186
0
        if (IS_HTX_STRM(s))
2187
0
          channel_htx_erase(req, htxbuf(&req->buf));
2188
0
        else
2189
0
          channel_erase(req);
2190
0
      }
2191
0
    }
2192
0
    else if ((scb->flags & SC_FL_ERROR) || res->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT)) {
2193
      /* Report it if the server got an error or a read timeout expired */
2194
0
      res->analysers &= AN_RES_FLT_END;
2195
0
      channel_auto_close(res);
2196
0
      if (scb->flags & SC_FL_ERROR) {
2197
0
        _HA_ATOMIC_INC(&s->be->be_counters.srv_aborts);
2198
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.srv_aborts);
2199
0
        if (sess->listener && sess->listener->counters)
2200
0
          _HA_ATOMIC_INC(&sess->listener->counters->srv_aborts);
2201
0
        if (srv)
2202
0
          _HA_ATOMIC_INC(&srv->counters.srv_aborts);
2203
0
        s->flags |= SF_ERR_SRVCL;
2204
0
      }
2205
0
      else if (res->flags & CF_READ_TIMEOUT) {
2206
0
        _HA_ATOMIC_INC(&s->be->be_counters.srv_aborts);
2207
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.srv_aborts);
2208
0
        if (sess->listener && sess->listener->counters)
2209
0
          _HA_ATOMIC_INC(&sess->listener->counters->srv_aborts);
2210
0
        if (srv)
2211
0
          _HA_ATOMIC_INC(&srv->counters.srv_aborts);
2212
0
        s->flags |= SF_ERR_SRVTO;
2213
0
      }
2214
0
      else {
2215
0
        _HA_ATOMIC_INC(&s->be->be_counters.cli_aborts);
2216
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.cli_aborts);
2217
0
        if (sess->listener && sess->listener->counters)
2218
0
          _HA_ATOMIC_INC(&sess->listener->counters->cli_aborts);
2219
0
        if (srv)
2220
0
          _HA_ATOMIC_INC(&srv->counters.cli_aborts);
2221
0
        s->flags |= SF_ERR_CLITO;
2222
0
      }
2223
0
      sess_set_term_flags(s);
2224
0
    }
2225
0
  }
2226
2227
  /*
2228
   * Here we take care of forwarding unhandled data. This also includes
2229
   * connection establishments and shutdown requests.
2230
   */
2231
2232
2233
  /* If no one is interested in analysing data, it's time to forward
2234
   * everything. We configure the buffer to forward indefinitely.
2235
   * Note that we're checking SC_FL_ABRT_WANTED as an indication of a possible
2236
   * recent call to channel_abort().
2237
   */
2238
0
  if (unlikely((!req->analysers || (req->analysers == AN_REQ_FLT_END && !(req->flags & CF_FLT_ANALYZE))) &&
2239
0
         !(scf->flags & SC_FL_ABRT_WANTED) && !(scb->flags & SC_FL_SHUT_DONE) &&
2240
0
         (sc_state_in(scf->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO)) &&
2241
0
         (req->to_forward != CHN_INFINITE_FORWARD))) {
2242
    /* This buffer is freewheeling, there's no analyser
2243
     * attached to it. If any data are left in, we'll permit them to
2244
     * move.
2245
     */
2246
0
    channel_auto_read(req);
2247
0
    channel_auto_connect(req);
2248
0
    channel_auto_close(req);
2249
2250
0
    if (IS_HTX_STRM(s)) {
2251
0
      struct htx *htx = htxbuf(&req->buf);
2252
2253
      /* We'll let data flow between the producer (if still connected)
2254
       * to the consumer.
2255
       */
2256
0
      co_set_data(req, htx->data);
2257
0
      if ((global.tune.options & GTUNE_USE_FAST_FWD) &&
2258
0
          !(scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && !(scb->flags & SC_FL_SHUT_WANTED))
2259
0
        channel_htx_forward_forever(req, htx);
2260
0
    }
2261
0
    else {
2262
      /* We'll let data flow between the producer (if still connected)
2263
       * to the consumer (which might possibly not be connected yet).
2264
       */
2265
0
      c_adv(req, ci_data(req));
2266
0
      if ((global.tune.options & GTUNE_USE_FAST_FWD) &&
2267
0
          !(scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && !(scb->flags & SC_FL_SHUT_WANTED))
2268
0
        channel_forward_forever(req);
2269
0
    }
2270
0
  }
2271
2272
  /* reflect what the L7 analysers have seen last */
2273
0
  rqf_last = req->flags;
2274
0
  scf_flags = (scf_flags & ~(SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) | (scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED));
2275
0
  scb_flags = (scb_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED));
2276
2277
  /* it's possible that an upper layer has requested a connection setup or abort.
2278
   * There are 2 situations where we decide to establish a new connection :
2279
   *  - there are data scheduled for emission in the buffer
2280
   *  - the CF_AUTO_CONNECT flag is set (active connection)
2281
   */
2282
0
  if (scb->state == SC_ST_INI) {
2283
0
    if (!(scb->flags & SC_FL_SHUT_DONE)) {
2284
0
      if ((req->flags & CF_AUTO_CONNECT) || co_data(req)) {
2285
        /* If we have an appctx, there is no connect method, so we
2286
         * immediately switch to the connected state, otherwise we
2287
         * perform a connection request.
2288
         */
2289
0
        scb->state = SC_ST_REQ; /* new connection requested */
2290
0
        s->conn_retries = 0;
2291
0
        if ((s->be->retry_type &~ PR_RE_CONN_FAILED) &&
2292
0
            (s->be->mode == PR_MODE_HTTP) &&
2293
0
            !(s->txn->flags & TX_D_L7_RETRY))
2294
0
          s->txn->flags |= TX_L7_RETRY;
2295
2296
0
        if (s->be->options & PR_O_ABRT_CLOSE) {
2297
0
          struct connection *conn = sc_conn(scf);
2298
2299
0
          if (conn && conn->mux && conn->mux->ctl)
2300
0
            conn->mux->ctl(conn, MUX_SUBS_RECV, NULL);
2301
0
        }
2302
0
      }
2303
0
    }
2304
0
    else {
2305
0
      s->scb->state = SC_ST_CLO; /* shutw+ini = abort */
2306
0
      sc_schedule_shutdown(scb);
2307
0
      sc_schedule_abort(scb);
2308
0
    }
2309
0
  }
2310
2311
2312
  /* we may have a pending connection request, or a connection waiting
2313
   * for completion.
2314
   */
2315
0
  if (sc_state_in(scb->state, SC_SB_REQ|SC_SB_QUE|SC_SB_TAR|SC_SB_ASS)) {
2316
    /* prune the request variables and swap to the response variables. */
2317
0
    if (s->vars_reqres.scope != SCOPE_RES) {
2318
0
      if (!LIST_ISEMPTY(&s->vars_reqres.head))
2319
0
        vars_prune(&s->vars_reqres, s->sess, s);
2320
0
      vars_init_head(&s->vars_reqres, SCOPE_RES);
2321
0
    }
2322
2323
0
    do {
2324
      /* nb: step 1 might switch from QUE to ASS, but we first want
2325
       * to give a chance to step 2 to perform a redirect if needed.
2326
       */
2327
0
      if (scb->state != SC_ST_REQ)
2328
0
        back_try_conn_req(s);
2329
0
      if (scb->state == SC_ST_REQ)
2330
0
        back_handle_st_req(s);
2331
2332
      /* get a chance to complete an immediate connection setup */
2333
0
      if (scb->state == SC_ST_RDY)
2334
0
        goto resync_stconns;
2335
2336
      /* applets directly go to the ESTABLISHED state. Similarly,
2337
       * servers experience the same fate when their connection
2338
       * is reused.
2339
       */
2340
0
      if (unlikely(scb->state == SC_ST_EST))
2341
0
        back_establish(s);
2342
2343
0
      srv = objt_server(s->target);
2344
0
      if (scb->state == SC_ST_ASS && srv && srv->rdr_len && (s->flags & SF_REDIRECTABLE))
2345
0
        http_perform_server_redirect(s, scb);
2346
0
    } while (scb->state == SC_ST_ASS);
2347
0
  }
2348
2349
  /* Let's see if we can send the pending request now */
2350
0
  sc_conn_sync_send(scb);
2351
2352
  /*
2353
   * Now forward all shutdown requests between both sides of the request buffer
2354
   */
2355
2356
  /* first, let's check if the request buffer needs to shutdown(write), which may
2357
   * happen either because the input is closed or because we want to force a close
2358
   * once the server has begun to respond. If a half-closed timeout is set, we adjust
2359
   * the other side's timeout as well. However this doesn't have effect during the
2360
   * connection setup unless the backend has abortonclose set.
2361
   */
2362
0
  if (unlikely((req->flags & CF_AUTO_CLOSE) && (scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) &&
2363
0
         !(scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) &&
2364
0
         (scb->state != SC_ST_CON || (s->be->options & PR_O_ABRT_CLOSE)))) {
2365
0
    sc_schedule_shutdown(scb);
2366
0
  }
2367
2368
  /* shutdown(write) pending */
2369
0
  if (unlikely((scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED &&
2370
0
         (!co_data(req) || (req->flags & CF_WRITE_TIMEOUT)))) {
2371
0
    if (scf->flags & SC_FL_ERROR)
2372
0
      scb->flags |= SC_FL_NOLINGER;
2373
0
    sc_shutdown(scb);
2374
0
  }
2375
2376
  /* shutdown(write) done on server side, we must stop the client too */
2377
0
  if (unlikely((scb->flags & SC_FL_SHUT_DONE) && !(scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED))) &&
2378
0
      !req->analysers)
2379
0
    sc_schedule_abort(scf);
2380
2381
  /* shutdown(read) pending */
2382
0
  if (unlikely((scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) == SC_FL_ABRT_WANTED)) {
2383
0
    if (scf->flags & SC_FL_NOHALF)
2384
0
      scf->flags |= SC_FL_NOLINGER;
2385
0
    sc_abort(scf);
2386
0
  }
2387
2388
  /* Benchmarks have shown that it's optimal to do a full resync now */
2389
0
  if (scf->state == SC_ST_DIS ||
2390
0
      sc_state_in(scb->state, SC_SB_RDY|SC_SB_DIS) ||
2391
0
      ((scf->flags & SC_FL_ERROR) && scf->state != SC_ST_CLO) ||
2392
0
      ((scb->flags & SC_FL_ERROR) && scb->state != SC_ST_CLO))
2393
0
    goto resync_stconns;
2394
2395
  /* otherwise we want to check if we need to resync the req buffer or not */
2396
0
  if (((scf->flags ^ scf_flags) & (SC_FL_EOS|SC_FL_ABRT_DONE)) || ((scb->flags ^ scb_flags) & SC_FL_SHUT_DONE))
2397
0
    goto resync_request;
2398
2399
  /* perform output updates to the response buffer */
2400
2401
  /* If no one is interested in analysing data, it's time to forward
2402
   * everything. We configure the buffer to forward indefinitely.
2403
   * Note that we're checking SC_FL_ABRT_WANTED as an indication of a possible
2404
   * recent call to channel_abort().
2405
   */
2406
0
  if (unlikely((!res->analysers || (res->analysers == AN_RES_FLT_END && !(res->flags & CF_FLT_ANALYZE))) &&
2407
0
         !(scf->flags & SC_FL_ABRT_WANTED) && !(scb->flags & SC_FL_SHUT_WANTED) &&
2408
0
         sc_state_in(scb->state, SC_SB_EST|SC_SB_DIS|SC_SB_CLO) &&
2409
0
         (res->to_forward != CHN_INFINITE_FORWARD))) {
2410
    /* This buffer is freewheeling, there's no analyser
2411
     * attached to it. If any data are left in, we'll permit them to
2412
     * move.
2413
     */
2414
0
    channel_auto_read(res);
2415
0
    channel_auto_close(res);
2416
2417
0
    if (IS_HTX_STRM(s)) {
2418
0
      struct htx *htx = htxbuf(&res->buf);
2419
2420
      /* We'll let data flow between the producer (if still connected)
2421
       * to the consumer.
2422
       */
2423
0
      co_set_data(res, htx->data);
2424
0
      if ((global.tune.options & GTUNE_USE_FAST_FWD) &&
2425
0
          !(scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && !(scb->flags & SC_FL_SHUT_WANTED))
2426
0
        channel_htx_forward_forever(res, htx);
2427
0
    }
2428
0
    else {
2429
      /* We'll let data flow between the producer (if still connected)
2430
       * to the consumer.
2431
       */
2432
0
      c_adv(res, ci_data(res));
2433
0
      if ((global.tune.options & GTUNE_USE_FAST_FWD) &&
2434
0
          !(scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && !(scb->flags & SC_FL_SHUT_WANTED))
2435
0
        channel_forward_forever(res);
2436
0
    }
2437
2438
    /* if we have no analyser anymore in any direction and have a
2439
     * tunnel timeout set, use it now. Note that we must respect
2440
     * the half-closed timeouts as well.
2441
     */
2442
0
    if (!req->analysers && s->tunnel_timeout) {
2443
0
      scf->ioto = scb->ioto = s->tunnel_timeout;
2444
2445
0
      if (!IS_HTX_STRM(s)) {
2446
0
        if ((scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_SHUT_DONE)) && tick_isset(sess->fe->timeout.clientfin))
2447
0
          scf->ioto = sess->fe->timeout.clientfin;
2448
0
        if ((scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_SHUT_DONE)) && tick_isset(s->be->timeout.serverfin))
2449
0
          scb->ioto = s->be->timeout.serverfin;
2450
0
      }
2451
0
    }
2452
0
  }
2453
2454
  /* reflect what the L7 analysers have seen last */
2455
0
  rpf_last = res->flags;
2456
0
  scb_flags = (scb_flags & ~(SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) | (scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED));
2457
0
  scf_flags = (scf_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED));
2458
2459
  /* Let's see if we can send the pending response now */
2460
0
  sc_conn_sync_send(scf);
2461
2462
  /*
2463
   * Now forward all shutdown requests between both sides of the buffer
2464
   */
2465
2466
  /*
2467
   * FIXME: this is probably where we should produce error responses.
2468
   */
2469
2470
  /* first, let's check if the response buffer needs to shutdown(write) */
2471
0
  if (unlikely((res->flags & CF_AUTO_CLOSE) && (scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) &&
2472
0
         !(scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)))) {
2473
0
    sc_schedule_shutdown(scf);
2474
0
  }
2475
2476
  /* shutdown(write) pending */
2477
0
  if (unlikely((scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) == SC_FL_SHUT_WANTED &&
2478
0
         (!co_data(res) || (res->flags & CF_WRITE_TIMEOUT)))) {
2479
0
    sc_shutdown(scf);
2480
0
  }
2481
2482
  /* shutdown(write) done on the client side, we must stop the server too */
2483
0
  if (unlikely((scf->flags & SC_FL_SHUT_DONE) && !(scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED))) &&
2484
0
      !res->analysers)
2485
0
    sc_schedule_abort(scb);
2486
2487
  /* shutdown(read) pending */
2488
0
  if (unlikely((scb->flags & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) == SC_FL_ABRT_WANTED)) {
2489
0
    if (scb->flags & SC_FL_NOHALF)
2490
0
      scb->flags |= SC_FL_NOLINGER;
2491
0
    sc_abort(scb);
2492
0
  }
2493
2494
0
  if (scf->state == SC_ST_DIS ||
2495
0
      sc_state_in(scb->state, SC_SB_RDY|SC_SB_DIS) ||
2496
0
      ((scf->flags & SC_FL_ERROR) && scf->state != SC_ST_CLO) ||
2497
0
      ((scb->flags & SC_FL_ERROR) && scb->state != SC_ST_CLO))
2498
0
    goto resync_stconns;
2499
2500
0
  if ((req->flags & ~rqf_last) & CF_MASK_ANALYSER)
2501
0
    goto resync_request;
2502
2503
0
  if (((scb->flags ^ scb_flags) & (SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_ABRT_WANTED)) ||
2504
0
      ((scf->flags ^ scf_flags) & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) ||
2505
0
      (res->analysers ^ res_ana_back))
2506
0
    goto resync_response;
2507
2508
0
  if ((((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER) ||
2509
0
      (req->analysers ^ req_ana_back))
2510
0
    goto resync_request;
2511
2512
  /* we're interested in getting wakeups again */
2513
0
  scf->flags &= ~SC_FL_DONT_WAKE;
2514
0
  scb->flags &= ~SC_FL_DONT_WAKE;
2515
2516
0
  if (likely((scf->state != SC_ST_CLO) || !sc_state_in(scb->state, SC_SB_INI|SC_SB_CLO) ||
2517
0
       (req->analysers & AN_REQ_FLT_END) || (res->analysers & AN_RES_FLT_END))) {
2518
0
    if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) && !(s->flags & SF_IGNORE))
2519
0
      stream_process_counters(s);
2520
2521
0
    stream_update_both_sc(s);
2522
2523
    /* Reset pending events now */
2524
0
    s->pending_events = 0;
2525
2526
0
  update_exp_and_leave:
2527
    /* Note: please ensure that if you branch here you disable SC_FL_DONT_WAKE */
2528
0
    if (!req->analysers)
2529
0
      req->analyse_exp = TICK_ETERNITY;
2530
0
    if (!res->analysers)
2531
0
      res->analyse_exp = TICK_ETERNITY;
2532
2533
0
    if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED) &&
2534
0
              (!tick_isset(req->analyse_exp) || tick_is_expired(req->analyse_exp, now_ms)))
2535
0
      req->analyse_exp = tick_add(now_ms, 5000);
2536
2537
0
    t->expire = (tick_is_expired(t->expire, now_ms) ? 0 : t->expire);
2538
0
    t->expire = tick_first(t->expire, sc_ep_rcv_ex(scf));
2539
0
    t->expire = tick_first(t->expire, sc_ep_snd_ex(scf));
2540
0
    t->expire = tick_first(t->expire, sc_ep_rcv_ex(scb));
2541
0
    t->expire = tick_first(t->expire, sc_ep_snd_ex(scb));
2542
0
    t->expire = tick_first(t->expire, req->analyse_exp);
2543
0
    t->expire = tick_first(t->expire, res->analyse_exp);
2544
0
    t->expire = tick_first(t->expire, s->conn_exp);
2545
2546
0
    if (unlikely(tick_is_expired(t->expire, now_ms))) {
2547
      /* Some events prevented the timeouts to be handled but nothing evolved.
2548
         So do it now and resync the stconns
2549
       */
2550
0
      stream_handle_timeouts(s);
2551
0
      goto resync_stconns;
2552
0
    }
2553
2554
0
    s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
2555
0
    stream_release_buffers(s);
2556
2557
0
    DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);
2558
0
    return t; /* nothing more to do */
2559
0
  }
2560
2561
0
  DBG_TRACE_DEVEL("releasing", STRM_EV_STRM_PROC, s);
2562
2563
0
  if (s->flags & SF_BE_ASSIGNED)
2564
0
    _HA_ATOMIC_DEC(&s->be->beconn);
2565
2566
0
  if (unlikely((global.mode & MODE_DEBUG) &&
2567
0
         (!(global.mode & MODE_QUIET) || (global.mode & MODE_VERBOSE)))) {
2568
0
    chunk_printf(&trash, "%08x:%s.closed[%04x:%04x]\n",
2569
0
           s->uniq_id, s->be->id,
2570
0
           (unsigned short)conn_fd(sc_conn(scf)),
2571
0
           (unsigned short)conn_fd(sc_conn(scb)));
2572
0
    DISGUISE(write(1, trash.area, trash.data));
2573
0
  }
2574
2575
0
  if (!(s->flags & SF_IGNORE)) {
2576
0
    s->logs.t_close = ns_to_ms(now_ns - s->logs.accept_ts);
2577
2578
0
    stream_process_counters(s);
2579
2580
0
    if (s->txn && s->txn->status) {
2581
0
      int n;
2582
2583
0
      n = s->txn->status / 100;
2584
0
      if (n < 1 || n > 5)
2585
0
        n = 0;
2586
2587
0
      if (sess->fe->mode == PR_MODE_HTTP) {
2588
0
        _HA_ATOMIC_INC(&sess->fe->fe_counters.p.http.rsp[n]);
2589
0
      }
2590
0
      if ((s->flags & SF_BE_ASSIGNED) &&
2591
0
          (s->be->mode == PR_MODE_HTTP)) {
2592
0
        _HA_ATOMIC_INC(&s->be->be_counters.p.http.rsp[n]);
2593
0
        _HA_ATOMIC_INC(&s->be->be_counters.p.http.cum_req);
2594
0
      }
2595
0
    }
2596
2597
    /* let's do a final log if we need it */
2598
0
    if (!LIST_ISEMPTY(&sess->fe->logformat) && s->logs.logwait &&
2599
0
        !(s->flags & SF_MONITOR) &&
2600
0
        (!(sess->fe->options & PR_O_NULLNOLOG) || req->total)) {
2601
      /* we may need to know the position in the queue */
2602
0
      pendconn_free(s);
2603
2604
0
      stream_cond_update_cpu_usage(s);
2605
0
      s->do_log(s);
2606
0
    }
2607
2608
    /* update time stats for this stream */
2609
0
    stream_update_time_stats(s);
2610
0
  }
2611
2612
  /* the task MUST not be in the run queue anymore */
2613
0
  stream_free(s);
2614
0
  task_destroy(t);
2615
0
  return NULL;
2616
0
}
2617
2618
/* Update the stream's backend and server time stats */
2619
void stream_update_time_stats(struct stream *s)
2620
0
{
2621
0
  int t_request;
2622
0
  int t_queue;
2623
0
  int t_connect;
2624
0
  int t_data;
2625
0
  int t_close;
2626
0
  struct server *srv;
2627
0
  unsigned int samples_window;
2628
2629
0
  t_request = 0;
2630
0
  t_queue   = s->logs.t_queue;
2631
0
  t_connect = s->logs.t_connect;
2632
0
  t_close   = s->logs.t_close;
2633
0
  t_data    = s->logs.t_data;
2634
2635
0
  if (s->be->mode != PR_MODE_HTTP)
2636
0
    t_data = t_connect;
2637
2638
0
  if (t_connect < 0 || t_data < 0)
2639
0
    return;
2640
2641
0
  if ((llong)(s->logs.request_ts - s->logs.accept_ts) >= 0)
2642
0
    t_request = ns_to_ms(s->logs.request_ts - s->logs.accept_ts);
2643
2644
0
  t_data    -= t_connect;
2645
0
  t_connect -= t_queue;
2646
0
  t_queue   -= t_request;
2647
2648
0
  srv = objt_server(s->target);
2649
0
  if (srv) {
2650
0
    samples_window = (((s->be->mode == PR_MODE_HTTP) ?
2651
0
      srv->counters.p.http.cum_req : srv->counters.cum_lbconn) > TIME_STATS_SAMPLES) ? TIME_STATS_SAMPLES : 0;
2652
0
    swrate_add_dynamic(&srv->counters.q_time, samples_window, t_queue);
2653
0
    swrate_add_dynamic(&srv->counters.c_time, samples_window, t_connect);
2654
0
    swrate_add_dynamic(&srv->counters.d_time, samples_window, t_data);
2655
0
    swrate_add_dynamic(&srv->counters.t_time, samples_window, t_close);
2656
0
    HA_ATOMIC_UPDATE_MAX(&srv->counters.qtime_max, t_queue);
2657
0
    HA_ATOMIC_UPDATE_MAX(&srv->counters.ctime_max, t_connect);
2658
0
    HA_ATOMIC_UPDATE_MAX(&srv->counters.dtime_max, t_data);
2659
0
    HA_ATOMIC_UPDATE_MAX(&srv->counters.ttime_max, t_close);
2660
0
  }
2661
0
  samples_window = (((s->be->mode == PR_MODE_HTTP) ?
2662
0
    s->be->be_counters.p.http.cum_req : s->be->be_counters.cum_lbconn) > TIME_STATS_SAMPLES) ? TIME_STATS_SAMPLES : 0;
2663
0
  swrate_add_dynamic(&s->be->be_counters.q_time, samples_window, t_queue);
2664
0
  swrate_add_dynamic(&s->be->be_counters.c_time, samples_window, t_connect);
2665
0
  swrate_add_dynamic(&s->be->be_counters.d_time, samples_window, t_data);
2666
0
  swrate_add_dynamic(&s->be->be_counters.t_time, samples_window, t_close);
2667
0
  HA_ATOMIC_UPDATE_MAX(&s->be->be_counters.qtime_max, t_queue);
2668
0
  HA_ATOMIC_UPDATE_MAX(&s->be->be_counters.ctime_max, t_connect);
2669
0
  HA_ATOMIC_UPDATE_MAX(&s->be->be_counters.dtime_max, t_data);
2670
0
  HA_ATOMIC_UPDATE_MAX(&s->be->be_counters.ttime_max, t_close);
2671
0
}
2672
2673
/*
2674
 * This function adjusts sess->srv_conn and maintains the previous and new
2675
 * server's served stream counts. Setting newsrv to NULL is enough to release
2676
 * current connection slot. This function also notifies any LB algo which might
2677
 * expect to be informed about any change in the number of active streams on a
2678
 * server.
2679
 */
2680
void sess_change_server(struct stream *strm, struct server *newsrv)
2681
0
{
2682
0
  struct server *oldsrv = strm->srv_conn;
2683
2684
0
  if (oldsrv == newsrv)
2685
0
    return;
2686
2687
0
  if (oldsrv) {
2688
0
    _HA_ATOMIC_DEC(&oldsrv->served);
2689
0
    _HA_ATOMIC_DEC(&oldsrv->proxy->served);
2690
0
    __ha_barrier_atomic_store();
2691
0
    if (oldsrv->proxy->lbprm.server_drop_conn)
2692
0
      oldsrv->proxy->lbprm.server_drop_conn(oldsrv);
2693
0
    stream_del_srv_conn(strm);
2694
0
  }
2695
2696
0
  if (newsrv) {
2697
0
    _HA_ATOMIC_INC(&newsrv->served);
2698
0
    _HA_ATOMIC_INC(&newsrv->proxy->served);
2699
0
    __ha_barrier_atomic_store();
2700
0
    if (newsrv->proxy->lbprm.server_take_conn)
2701
0
      newsrv->proxy->lbprm.server_take_conn(newsrv);
2702
0
    stream_add_srv_conn(strm, newsrv);
2703
0
  }
2704
0
}
2705
2706
/* Handle server-side errors for default protocols. It is called whenever a a
2707
 * connection setup is aborted or a request is aborted in queue. It sets the
2708
 * stream termination flags so that the caller does not have to worry about
2709
 * them. It's installed as ->srv_error for the server-side stream connector.
2710
 */
2711
void default_srv_error(struct stream *s, struct stconn *sc)
2712
0
{
2713
0
  int err_type = s->conn_err_type;
2714
0
  int err = 0, fin = 0;
2715
2716
0
  if (err_type & STRM_ET_QUEUE_ABRT) {
2717
0
    err = SF_ERR_CLICL;
2718
0
    fin = SF_FINST_Q;
2719
0
  }
2720
0
  else if (err_type & STRM_ET_CONN_ABRT) {
2721
0
    err = SF_ERR_CLICL;
2722
0
    fin = SF_FINST_C;
2723
0
  }
2724
0
  else if (err_type & STRM_ET_QUEUE_TO) {
2725
0
    err = SF_ERR_SRVTO;
2726
0
    fin = SF_FINST_Q;
2727
0
  }
2728
0
  else if (err_type & STRM_ET_QUEUE_ERR) {
2729
0
    err = SF_ERR_SRVCL;
2730
0
    fin = SF_FINST_Q;
2731
0
  }
2732
0
  else if (err_type & STRM_ET_CONN_TO) {
2733
0
    err = SF_ERR_SRVTO;
2734
0
    fin = SF_FINST_C;
2735
0
  }
2736
0
  else if (err_type & STRM_ET_CONN_ERR) {
2737
0
    err = SF_ERR_SRVCL;
2738
0
    fin = SF_FINST_C;
2739
0
  }
2740
0
  else if (err_type & STRM_ET_CONN_RES) {
2741
0
    err = SF_ERR_RESOURCE;
2742
0
    fin = SF_FINST_C;
2743
0
  }
2744
0
  else /* STRM_ET_CONN_OTHER and others */ {
2745
0
    err = SF_ERR_INTERNAL;
2746
0
    fin = SF_FINST_C;
2747
0
  }
2748
2749
0
  if (!(s->flags & SF_ERR_MASK))
2750
0
    s->flags |= err;
2751
0
  if (!(s->flags & SF_FINST_MASK))
2752
0
    s->flags |= fin;
2753
0
}
2754
2755
/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
2756
void stream_shutdown(struct stream *stream, int why)
2757
0
{
2758
0
  if (stream->scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))
2759
0
    return;
2760
2761
0
  sc_schedule_shutdown(stream->scb);
2762
0
  sc_schedule_abort(stream->scb);
2763
0
  stream->task->nice = 1024;
2764
0
  if (!(stream->flags & SF_ERR_MASK))
2765
0
    stream->flags |= why;
2766
0
  task_wakeup(stream->task, TASK_WOKEN_OTHER);
2767
0
}
2768
2769
/* dumps an error message for type <type> at ptr <ptr> related to stream <s>,
2770
 * having reached loop rate <rate>, then aborts hoping to retrieve a core.
2771
 */
2772
void stream_dump_and_crash(enum obj_type *obj, int rate)
2773
0
{
2774
0
  struct stream *s;
2775
0
  char *msg = NULL;
2776
0
  const void *ptr;
2777
2778
0
  ptr = s = objt_stream(obj);
2779
0
  if (!s) {
2780
0
    const struct appctx *appctx = objt_appctx(obj);
2781
0
    if (!appctx)
2782
0
      return;
2783
0
    ptr = appctx;
2784
0
    s = appctx_strm(appctx);
2785
0
    if (!s)
2786
0
      return;
2787
0
  }
2788
2789
0
  chunk_reset(&trash);
2790
0
  chunk_printf(&trash, "  ");
2791
0
  strm_dump_to_buffer(&trash, s, " ", HA_ATOMIC_LOAD(&global.anon_key));
2792
2793
0
  if (ptr != s) { // that's an appctx
2794
0
    const struct appctx *appctx = ptr;
2795
2796
0
    chunk_appendf(&trash, " applet=%p(", appctx->applet);
2797
0
    resolve_sym_name(&trash, NULL, appctx->applet);
2798
0
    chunk_appendf(&trash, ")");
2799
2800
0
    chunk_appendf(&trash, " handler=%p(", appctx->applet->fct);
2801
0
    resolve_sym_name(&trash, NULL, appctx->applet->fct);
2802
0
    chunk_appendf(&trash, ")");
2803
0
  }
2804
2805
0
  memprintf(&msg,
2806
0
            "A bogus %s [%p] is spinning at %d calls per second and refuses to die, "
2807
0
            "aborting now! Please report this error to developers:\n"
2808
0
            "%s\n",
2809
0
            obj_type_name(obj), ptr, rate, trash.area);
2810
2811
0
  ha_alert("%s", msg);
2812
0
  send_log(NULL, LOG_EMERG, "%s", msg);
2813
0
  ABORT_NOW();
2814
0
}
2815
2816
/* initialize the require structures */
2817
static void init_stream()
2818
0
{
2819
0
  int thr;
2820
2821
0
  for (thr = 0; thr < MAX_THREADS; thr++)
2822
0
    LIST_INIT(&ha_thread_ctx[thr].streams);
2823
0
}
2824
INITCALL0(STG_INIT, init_stream);
2825
2826
/* Generates a unique ID based on the given <format>, stores it in the given <strm> and
2827
 * returns the unique ID.
2828
 *
2829
 * If this function fails to allocate memory IST_NULL is returned.
2830
 *
2831
 * If an ID is already stored within the stream nothing happens existing unique ID is
2832
 * returned.
2833
 */
2834
struct ist stream_generate_unique_id(struct stream *strm, struct list *format)
2835
0
{
2836
0
  if (isttest(strm->unique_id)) {
2837
0
    return strm->unique_id;
2838
0
  }
2839
0
  else {
2840
0
    char *unique_id;
2841
0
    int length;
2842
0
    if ((unique_id = pool_alloc(pool_head_uniqueid)) == NULL)
2843
0
      return IST_NULL;
2844
2845
0
    length = build_logline(strm, unique_id, UNIQUEID_LEN, format);
2846
0
    strm->unique_id = ist2(unique_id, length);
2847
2848
0
    return strm->unique_id;
2849
0
  }
2850
0
}
2851
2852
/************************************************************************/
2853
/*           All supported ACL keywords must be declared here.          */
2854
/************************************************************************/
2855
static enum act_return stream_action_set_log_level(struct act_rule *rule, struct proxy *px,
2856
               struct session *sess, struct stream *s, int flags)
2857
0
{
2858
0
  s->logs.level = (uintptr_t)rule->arg.act.p[0];
2859
0
  return ACT_RET_CONT;
2860
0
}
2861
2862
2863
/* Parse a "set-log-level" action. It takes the level value as argument. It
2864
 * returns ACT_RET_PRS_OK on success, ACT_RET_PRS_ERR on error.
2865
 */
2866
static enum act_parse_ret stream_parse_set_log_level(const char **args, int *cur_arg, struct proxy *px,
2867
                 struct act_rule *rule, char **err)
2868
0
{
2869
0
  int level;
2870
2871
0
  if (!*args[*cur_arg]) {
2872
0
    bad_log_level:
2873
0
    memprintf(err, "expects exactly 1 argument (log level name or 'silent')");
2874
0
    return ACT_RET_PRS_ERR;
2875
0
  }
2876
0
  if (strcmp(args[*cur_arg], "silent") == 0)
2877
0
    level = -1;
2878
0
  else if ((level = get_log_level(args[*cur_arg]) + 1) == 0)
2879
0
    goto bad_log_level;
2880
2881
0
  (*cur_arg)++;
2882
2883
  /* Register processing function. */
2884
0
  rule->action_ptr = stream_action_set_log_level;
2885
0
  rule->action = ACT_CUSTOM;
2886
0
  rule->arg.act.p[0] = (void *)(uintptr_t)level;
2887
0
  return ACT_RET_PRS_OK;
2888
0
}
2889
2890
static enum act_return stream_action_set_nice(struct act_rule *rule, struct proxy *px,
2891
                struct session *sess, struct stream *s, int flags)
2892
0
{
2893
0
  s->task->nice = (uintptr_t)rule->arg.act.p[0];
2894
0
  return ACT_RET_CONT;
2895
0
}
2896
2897
2898
/* Parse a "set-nice" action. It takes the nice value as argument. It returns
2899
 * ACT_RET_PRS_OK on success, ACT_RET_PRS_ERR on error.
2900
 */
2901
static enum act_parse_ret stream_parse_set_nice(const char **args, int *cur_arg, struct proxy *px,
2902
            struct act_rule *rule, char **err)
2903
0
{
2904
0
  int nice;
2905
2906
0
  if (!*args[*cur_arg]) {
2907
0
    bad_log_level:
2908
0
    memprintf(err, "expects exactly 1 argument (integer value)");
2909
0
    return ACT_RET_PRS_ERR;
2910
0
  }
2911
2912
0
  nice = atoi(args[*cur_arg]);
2913
0
  if (nice < -1024)
2914
0
    nice = -1024;
2915
0
  else if (nice > 1024)
2916
0
    nice = 1024;
2917
2918
0
  (*cur_arg)++;
2919
2920
  /* Register processing function. */
2921
0
  rule->action_ptr = stream_action_set_nice;
2922
0
  rule->action = ACT_CUSTOM;
2923
0
  rule->arg.act.p[0] = (void *)(uintptr_t)nice;
2924
0
  return ACT_RET_PRS_OK;
2925
0
}
2926
2927
2928
static enum act_return tcp_action_switch_stream_mode(struct act_rule *rule, struct proxy *px,
2929
              struct session *sess, struct stream *s, int flags)
2930
0
{
2931
0
  enum pr_mode mode = (uintptr_t)rule->arg.act.p[0];
2932
0
  const struct mux_proto_list *mux_proto = rule->arg.act.p[1];
2933
2934
0
  if (!IS_HTX_STRM(s) && mode == PR_MODE_HTTP) {
2935
0
    if (!stream_set_http_mode(s, mux_proto)) {
2936
0
      stream_abort(s);
2937
0
      return ACT_RET_ABRT;
2938
0
    }
2939
0
  }
2940
0
  return ACT_RET_STOP;
2941
0
}
2942
2943
2944
static int check_tcp_switch_stream_mode(struct act_rule *rule, struct proxy *px, char **err)
2945
0
{
2946
0
  const struct mux_proto_list *mux_ent;
2947
0
  const struct mux_proto_list *mux_proto = rule->arg.act.p[1];
2948
0
  enum pr_mode pr_mode = (uintptr_t)rule->arg.act.p[0];
2949
0
  enum proto_proxy_mode mode = conn_pr_mode_to_proto_mode(pr_mode);
2950
2951
0
  if (pr_mode == PR_MODE_HTTP)
2952
0
    px->options |= PR_O_HTTP_UPG;
2953
2954
0
  if (mux_proto) {
2955
0
    mux_ent = conn_get_best_mux_entry(mux_proto->token, PROTO_SIDE_FE, mode);
2956
0
    if (!mux_ent || !isteq(mux_ent->token, mux_proto->token)) {
2957
0
      memprintf(err, "MUX protocol '%.*s' is not compatible with the selected mode",
2958
0
          (int)mux_proto->token.len, mux_proto->token.ptr);
2959
0
      return 0;
2960
0
    }
2961
0
  }
2962
0
  else {
2963
0
    mux_ent = conn_get_best_mux_entry(IST_NULL, PROTO_SIDE_FE, mode);
2964
0
    if (!mux_ent) {
2965
0
      memprintf(err, "Unable to find compatible MUX protocol with the selected mode");
2966
0
      return 0;
2967
0
    }
2968
0
  }
2969
2970
  /* Update the mux */
2971
0
  rule->arg.act.p[1] = (void *)mux_ent;
2972
0
  return 1;
2973
2974
0
}
2975
2976
static enum act_parse_ret stream_parse_switch_mode(const char **args, int *cur_arg,
2977
               struct proxy *px, struct act_rule *rule,
2978
               char **err)
2979
0
{
2980
0
  const struct mux_proto_list *mux_proto = NULL;
2981
0
  struct ist proto;
2982
0
  enum pr_mode mode;
2983
2984
  /* must have at least the mode */
2985
0
  if (*(args[*cur_arg]) == 0) {
2986
0
    memprintf(err, "'%s %s' expects a mode as argument.", args[0], args[*cur_arg-1]);
2987
0
    return ACT_RET_PRS_ERR;
2988
0
  }
2989
2990
0
  if (!(px->cap & PR_CAP_FE)) {
2991
0
    memprintf(err, "'%s %s' not allowed because %s '%s' has no frontend capability",
2992
0
        args[0], args[*cur_arg-1], proxy_type_str(px), px->id);
2993
0
    return ACT_RET_PRS_ERR;
2994
0
  }
2995
  /* Check if the mode. For now "tcp" is disabled because downgrade is not
2996
   * supported and PT is the only TCP mux.
2997
   */
2998
0
  if (strcmp(args[*cur_arg], "http") == 0)
2999
0
    mode = PR_MODE_HTTP;
3000
0
  else {
3001
0
    memprintf(err, "'%s %s' expects a valid mode (got '%s').", args[0], args[*cur_arg-1], args[*cur_arg]);
3002
0
    return ACT_RET_PRS_ERR;
3003
0
  }
3004
3005
  /* check the proto, if specified */
3006
0
  if (*(args[*cur_arg+1]) && strcmp(args[*cur_arg+1], "proto") == 0) {
3007
0
    if (*(args[*cur_arg+2]) == 0) {
3008
0
      memprintf(err, "'%s %s': '%s' expects a protocol as argument.",
3009
0
          args[0], args[*cur_arg-1], args[*cur_arg+1]);
3010
0
      return ACT_RET_PRS_ERR;
3011
0
    }
3012
3013
0
    proto = ist(args[*cur_arg + 2]);
3014
0
    mux_proto = get_mux_proto(proto);
3015
0
    if (!mux_proto) {
3016
0
      memprintf(err, "'%s %s': '%s' expects a valid MUX protocol, if specified (got '%s')",
3017
0
          args[0], args[*cur_arg-1], args[*cur_arg+1], args[*cur_arg+2]);
3018
0
      return ACT_RET_PRS_ERR;
3019
0
    }
3020
0
    *cur_arg += 2;
3021
0
  }
3022
3023
0
  (*cur_arg)++;
3024
3025
  /* Register processing function. */
3026
0
  rule->action_ptr = tcp_action_switch_stream_mode;
3027
0
  rule->check_ptr  = check_tcp_switch_stream_mode;
3028
0
  rule->action = ACT_CUSTOM;
3029
0
  rule->arg.act.p[0] = (void *)(uintptr_t)mode;
3030
0
  rule->arg.act.p[1] = (void *)mux_proto;
3031
0
  return ACT_RET_PRS_OK;
3032
0
}
3033
3034
/* 0=OK, <0=Alert, >0=Warning */
3035
static enum act_parse_ret stream_parse_use_service(const char **args, int *cur_arg,
3036
                                                   struct proxy *px, struct act_rule *rule,
3037
                                                   char **err)
3038
0
{
3039
0
  struct action_kw *kw;
3040
3041
  /* Check if the service name exists. */
3042
0
  if (*(args[*cur_arg]) == 0) {
3043
0
    memprintf(err, "'%s' expects a service name.", args[0]);
3044
0
    return ACT_RET_PRS_ERR;
3045
0
  }
3046
3047
  /* lookup for keyword corresponding to a service. */
3048
0
  kw = action_lookup(&service_keywords, args[*cur_arg]);
3049
0
  if (!kw) {
3050
0
    memprintf(err, "'%s' unknown service name.", args[1]);
3051
0
    return ACT_RET_PRS_ERR;
3052
0
  }
3053
0
  (*cur_arg)++;
3054
3055
  /* executes specific rule parser. */
3056
0
  rule->kw = kw;
3057
0
  if (kw->parse((const char **)args, cur_arg, px, rule, err) == ACT_RET_PRS_ERR)
3058
0
    return ACT_RET_PRS_ERR;
3059
3060
  /* Register processing function. */
3061
0
  rule->action_ptr = process_use_service;
3062
0
  rule->action = ACT_CUSTOM;
3063
3064
0
  return ACT_RET_PRS_OK;
3065
0
}
3066
3067
void service_keywords_register(struct action_kw_list *kw_list)
3068
0
{
3069
0
  LIST_APPEND(&service_keywords, &kw_list->list);
3070
0
}
3071
3072
struct action_kw *service_find(const char *kw)
3073
0
{
3074
0
  return action_lookup(&service_keywords, kw);
3075
0
}
3076
3077
/* Lists the known services on <out>. If <out> is null, emit them on stdout one
3078
 * per line.
3079
 */
3080
void list_services(FILE *out)
3081
0
{
3082
0
  const struct action_kw *akwp, *akwn;
3083
0
  struct action_kw_list *kw_list;
3084
0
  int found = 0;
3085
0
  int i;
3086
3087
0
  if (out)
3088
0
    fprintf(out, "Available services :");
3089
3090
0
  for (akwn = akwp = NULL;; akwp = akwn) {
3091
0
    list_for_each_entry(kw_list, &service_keywords, list) {
3092
0
      for (i = 0; kw_list->kw[i].kw != NULL; i++) {
3093
0
        if (strordered(akwp ? akwp->kw : NULL,
3094
0
                 kw_list->kw[i].kw,
3095
0
                 akwn != akwp ? akwn->kw : NULL))
3096
0
          akwn = &kw_list->kw[i];
3097
0
        found = 1;
3098
0
      }
3099
0
    }
3100
0
    if (akwn == akwp)
3101
0
      break;
3102
0
    if (out)
3103
0
      fprintf(out, " %s", akwn->kw);
3104
0
    else
3105
0
      printf("%s\n", akwn->kw);
3106
0
  }
3107
0
  if (!found && out)
3108
0
    fprintf(out, " none\n");
3109
0
}
3110
3111
/* appctx context used by the "show sess" command */
3112
/* flags used for show_sess_ctx.flags */
3113
0
#define CLI_SHOWSESS_F_SUSP  0x00000001   /* show only suspicious streams */
3114
3115
struct show_sess_ctx {
3116
  struct bref bref; /* back-reference from the session being dumped */
3117
  void *target;   /* session we want to dump, or NULL for all */
3118
  unsigned int thr;       /* the thread number being explored (0..MAX_THREADS-1) */
3119
  unsigned int uid; /* if non-null, the uniq_id of the session being dumped */
3120
  unsigned int min_age;   /* minimum age of streams to dump */
3121
  unsigned int flags;     /* CLI_SHOWSESS_* */
3122
  int section;    /* section of the session being dumped */
3123
  int pos;    /* last position of the current session's buffer */
3124
};
3125
3126
/* This function appends a complete dump of a stream state onto the buffer,
3127
 * possibly anonymizing using the specified anon_key. The caller is responsible
3128
 * for ensuring that enough room remains in the buffer to dump a complete
3129
 * stream at once. Each new output line will be prefixed with <pfx> if non-null,
3130
 * which is used to preserve indenting.
3131
 */
3132
void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const char *pfx, uint32_t anon_key)
3133
0
{
3134
0
  struct stconn *scf, *scb;
3135
0
  struct tm tm;
3136
0
  extern const char *monthname[12];
3137
0
  char pn[INET6_ADDRSTRLEN];
3138
0
  struct connection *conn;
3139
0
  struct appctx *tmpctx;
3140
3141
0
  pfx = pfx ? pfx : "";
3142
3143
0
  get_localtime(strm->logs.accept_date.tv_sec, &tm);
3144
0
  chunk_appendf(buf,
3145
0
         "%p: [%02d/%s/%04d:%02d:%02d:%02d.%06d] id=%u proto=%s",
3146
0
         strm,
3147
0
         tm.tm_mday, monthname[tm.tm_mon], tm.tm_year+1900,
3148
0
         tm.tm_hour, tm.tm_min, tm.tm_sec, (int)(strm->logs.accept_date.tv_usec),
3149
0
         strm->uniq_id,
3150
0
         strm_li(strm) ? strm_li(strm)->rx.proto->name : "?");
3151
3152
0
  conn = objt_conn(strm_orig(strm));
3153
0
  switch (conn && conn_get_src(conn) ? addr_to_str(conn->src, pn, sizeof(pn)) : AF_UNSPEC) {
3154
0
  case AF_INET:
3155
0
  case AF_INET6:
3156
0
    chunk_appendf(buf, " source=%s:%d\n",
3157
0
                  HA_ANON_STR(anon_key, pn), get_host_port(conn->src));
3158
0
    break;
3159
0
  case AF_UNIX:
3160
0
    chunk_appendf(buf, " source=unix:%d\n", strm_li(strm)->luid);
3161
0
    break;
3162
0
  default:
3163
    /* no more information to print right now */
3164
0
    chunk_appendf(buf, "\n");
3165
0
    break;
3166
0
  }
3167
3168
0
  chunk_appendf(buf,
3169
0
         "%s  flags=0x%x, conn_retries=%d, conn_exp=%s conn_et=0x%03x srv_conn=%p, pend_pos=%p waiting=%d epoch=%#x\n", pfx,
3170
0
         strm->flags, strm->conn_retries,
3171
0
         strm->conn_exp ?
3172
0
                 tick_is_expired(strm->conn_exp, now_ms) ? "<PAST>" :
3173
0
                         human_time(TICKS_TO_MS(strm->conn_exp - now_ms),
3174
0
                         TICKS_TO_MS(1000)) : "<NEVER>",
3175
0
         strm->conn_err_type, strm->srv_conn, strm->pend_pos,
3176
0
         LIST_INLIST(&strm->buffer_wait.list), strm->stream_epoch);
3177
3178
0
  chunk_appendf(buf,
3179
0
         "%s  frontend=%s (id=%u mode=%s), listener=%s (id=%u)", pfx,
3180
0
         HA_ANON_STR(anon_key, strm_fe(strm)->id), strm_fe(strm)->uuid, proxy_mode_str(strm_fe(strm)->mode),
3181
0
         strm_li(strm) ? strm_li(strm)->name ? strm_li(strm)->name : "?" : "?",
3182
0
         strm_li(strm) ? strm_li(strm)->luid : 0);
3183
3184
0
  switch (conn && conn_get_dst(conn) ? addr_to_str(conn->dst, pn, sizeof(pn)) : AF_UNSPEC) {
3185
0
  case AF_INET:
3186
0
  case AF_INET6:
3187
0
    chunk_appendf(buf, " addr=%s:%d\n",
3188
0
           HA_ANON_STR(anon_key, pn), get_host_port(conn->dst));
3189
0
    break;
3190
0
  case AF_UNIX:
3191
0
    chunk_appendf(buf, " addr=unix:%d\n", strm_li(strm)->luid);
3192
0
    break;
3193
0
  default:
3194
    /* no more information to print right now */
3195
0
    chunk_appendf(buf, "\n");
3196
0
    break;
3197
0
  }
3198
3199
0
  if (strm->be->cap & PR_CAP_BE)
3200
0
    chunk_appendf(buf,
3201
0
           "%s  backend=%s (id=%u mode=%s)", pfx,
3202
0
           HA_ANON_STR(anon_key, strm->be->id),
3203
0
           strm->be->uuid, proxy_mode_str(strm->be->mode));
3204
0
  else
3205
0
    chunk_appendf(buf, "%s  backend=<NONE> (id=-1 mode=-)", pfx);
3206
3207
0
  conn = sc_conn(strm->scb);
3208
0
  switch (conn && conn_get_src(conn) ? addr_to_str(conn->src, pn, sizeof(pn)) : AF_UNSPEC) {
3209
0
  case AF_INET:
3210
0
  case AF_INET6:
3211
0
    chunk_appendf(buf, " addr=%s:%d\n",
3212
0
           HA_ANON_STR(anon_key, pn), get_host_port(conn->src));
3213
0
    break;
3214
0
  case AF_UNIX:
3215
0
    chunk_appendf(buf, " addr=unix\n");
3216
0
    break;
3217
0
  default:
3218
    /* no more information to print right now */
3219
0
    chunk_appendf(buf, "\n");
3220
0
    break;
3221
0
  }
3222
3223
0
  if (strm->be->cap & PR_CAP_BE)
3224
0
    chunk_appendf(buf,
3225
0
           "%s  server=%s (id=%u)", pfx,
3226
0
           objt_server(strm->target) ? HA_ANON_STR(anon_key, __objt_server(strm->target)->id) : "<none>",
3227
0
           objt_server(strm->target) ? __objt_server(strm->target)->puid : 0);
3228
0
  else
3229
0
    chunk_appendf(buf, "%s  server=<NONE> (id=-1)", pfx);
3230
3231
0
  switch (conn && conn_get_dst(conn) ? addr_to_str(conn->dst, pn, sizeof(pn)) : AF_UNSPEC) {
3232
0
  case AF_INET:
3233
0
  case AF_INET6:
3234
0
    chunk_appendf(buf, " addr=%s:%d\n",
3235
0
           HA_ANON_STR(anon_key, pn), get_host_port(conn->dst));
3236
0
    break;
3237
0
  case AF_UNIX:
3238
0
    chunk_appendf(buf, " addr=unix\n");
3239
0
    break;
3240
0
  default:
3241
    /* no more information to print right now */
3242
0
    chunk_appendf(buf, "\n");
3243
0
    break;
3244
0
  }
3245
3246
0
  chunk_appendf(buf,
3247
0
          "%s  task=%p (state=0x%02x nice=%d calls=%u rate=%u exp=%s tid=%d(%d/%d)%s", pfx,
3248
0
         strm->task,
3249
0
         strm->task->state,
3250
0
         strm->task->nice, strm->task->calls, read_freq_ctr(&strm->call_rate),
3251
0
         strm->task->expire ?
3252
0
                 tick_is_expired(strm->task->expire, now_ms) ? "<PAST>" :
3253
0
                         human_time(TICKS_TO_MS(strm->task->expire - now_ms),
3254
0
                         TICKS_TO_MS(1000)) : "<NEVER>",
3255
0
               strm->task->tid,
3256
0
               ha_thread_info[strm->task->tid].tgid,
3257
0
               ha_thread_info[strm->task->tid].ltid,
3258
0
         task_in_rq(strm->task) ? ", running" : "");
3259
3260
0
  chunk_appendf(buf,
3261
0
         " age=%s)\n",
3262
0
         human_time(ns_to_sec(now_ns) - ns_to_sec(strm->logs.request_ts), 1));
3263
3264
0
  if (strm->txn)
3265
0
    chunk_appendf(buf,
3266
0
          "%s  txn=%p flags=0x%x meth=%d status=%d req.st=%s rsp.st=%s req.f=0x%02x rsp.f=0x%02x\n", pfx,
3267
0
          strm->txn, strm->txn->flags, strm->txn->meth, strm->txn->status,
3268
0
          h1_msg_state_str(strm->txn->req.msg_state), h1_msg_state_str(strm->txn->rsp.msg_state),
3269
0
          strm->txn->req.flags, strm->txn->rsp.flags);
3270
3271
0
  scf = strm->scf;
3272
0
  chunk_appendf(buf, "%s  scf=%p flags=0x%08x state=%s endp=%s,%p,0x%08x sub=%d", pfx,
3273
0
          scf, scf->flags, sc_state_str(scf->state),
3274
0
          (sc_ep_test(scf, SE_FL_T_MUX) ? "CONN" : (sc_ep_test(scf, SE_FL_T_APPLET) ? "APPCTX" : "NONE")),
3275
0
          scf->sedesc->se, sc_ep_get(scf), scf->wait_event.events);
3276
0
  chunk_appendf(buf, " rex=%s",
3277
0
          sc_ep_rcv_ex(scf) ? human_time(TICKS_TO_MS(sc_ep_rcv_ex(scf) - now_ms), TICKS_TO_MS(1000)) : "<NEVER>");
3278
0
  chunk_appendf(buf, " wex=%s\n",
3279
0
          sc_ep_snd_ex(scf) ? human_time(TICKS_TO_MS(sc_ep_snd_ex(scf) - now_ms), TICKS_TO_MS(1000)) : "<NEVER>");
3280
3281
0
  chunk_appendf(&trash, "%s    iobuf.flags=0x%08x .pipe=%d .buf=%u@%p+%u/%u\n", pfx,
3282
0
          scf->sedesc->iobuf.flags,
3283
0
          scf->sedesc->iobuf.pipe ? scf->sedesc->iobuf.pipe->data : 0,
3284
0
          scf->sedesc->iobuf.buf ? (unsigned int)b_data(scf->sedesc->iobuf.buf): 0,
3285
0
          scf->sedesc->iobuf.buf ? b_orig(scf->sedesc->iobuf.buf): NULL,
3286
0
          scf->sedesc->iobuf.buf ? (unsigned int)b_head_ofs(scf->sedesc->iobuf.buf): 0,
3287
0
          scf->sedesc->iobuf.buf ? (unsigned int)b_size(scf->sedesc->iobuf.buf): 0);
3288
3289
0
  if ((conn = sc_conn(scf)) != NULL) {
3290
0
    if (conn->mux && conn->mux->show_sd) {
3291
0
      char muxpfx[100] = "";
3292
3293
0
      snprintf(muxpfx, sizeof(muxpfx), "%s      ", pfx);
3294
0
      chunk_appendf(buf, "%s     ", pfx);
3295
0
      conn->mux->show_sd(buf, scf->sedesc, muxpfx);
3296
0
      chunk_appendf(buf, "\n");
3297
0
    }
3298
3299
0
    chunk_appendf(buf,
3300
0
                  "%s      co0=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", pfx,
3301
0
            conn,
3302
0
            conn_get_ctrl_name(conn),
3303
0
            conn_get_xprt_name(conn),
3304
0
            conn_get_mux_name(conn),
3305
0
            sc_get_data_name(scf),
3306
0
                  obj_type_name(conn->target),
3307
0
                  obj_base_ptr(conn->target));
3308
3309
0
    chunk_appendf(buf,
3310
0
                  "%s      flags=0x%08x fd=%d fd.state=%02x updt=%d fd.tmask=0x%lx\n", pfx,
3311
0
                  conn->flags,
3312
0
                  conn_fd(conn),
3313
0
                  conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
3314
0
                  conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
3315
0
            conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
3316
0
  }
3317
0
  else if ((tmpctx = sc_appctx(scf)) != NULL) {
3318
0
    chunk_appendf(buf,
3319
0
                  "%s      app0=%p st0=%d st1=%d applet=%s tid=%d nice=%d calls=%u rate=%u\n", pfx,
3320
0
            tmpctx,
3321
0
            tmpctx->st0,
3322
0
            tmpctx->st1,
3323
0
                  tmpctx->applet->name,
3324
0
                  tmpctx->t->tid,
3325
0
                  tmpctx->t->nice, tmpctx->t->calls, read_freq_ctr(&tmpctx->call_rate));
3326
0
  }
3327
3328
0
  scb = strm->scb;
3329
0
  chunk_appendf(buf, "%s  scb=%p flags=0x%08x state=%s endp=%s,%p,0x%08x sub=%d", pfx,
3330
0
          scb, scb->flags, sc_state_str(scb->state),
3331
0
          (sc_ep_test(scb, SE_FL_T_MUX) ? "CONN" : (sc_ep_test(scb, SE_FL_T_APPLET) ? "APPCTX" : "NONE")),
3332
0
          scb->sedesc->se, sc_ep_get(scb), scb->wait_event.events);
3333
0
  chunk_appendf(buf, " rex=%s",
3334
0
          sc_ep_rcv_ex(scb) ? human_time(TICKS_TO_MS(sc_ep_rcv_ex(scb) - now_ms), TICKS_TO_MS(1000)) : "<NEVER>");
3335
0
  chunk_appendf(buf, " wex=%s\n",
3336
0
          sc_ep_snd_ex(scb) ? human_time(TICKS_TO_MS(sc_ep_snd_ex(scb) - now_ms), TICKS_TO_MS(1000)) : "<NEVER>");
3337
3338
0
  chunk_appendf(&trash, "%s    iobuf.flags=0x%08x .pipe=%d .buf=%u@%p+%u/%u\n", pfx,
3339
0
          scb->sedesc->iobuf.flags,
3340
0
          scb->sedesc->iobuf.pipe ? scb->sedesc->iobuf.pipe->data : 0,
3341
0
          scb->sedesc->iobuf.buf ? (unsigned int)b_data(scb->sedesc->iobuf.buf): 0,
3342
0
          scb->sedesc->iobuf.buf ? b_orig(scb->sedesc->iobuf.buf): NULL,
3343
0
          scb->sedesc->iobuf.buf ? (unsigned int)b_head_ofs(scb->sedesc->iobuf.buf): 0,
3344
0
          scb->sedesc->iobuf.buf ? (unsigned int)b_size(scb->sedesc->iobuf.buf): 0);
3345
3346
0
  if ((conn = sc_conn(scb)) != NULL) {
3347
0
    if (conn->mux && conn->mux->show_sd) {
3348
0
      char muxpfx[100] = "";
3349
3350
0
      snprintf(muxpfx, sizeof(muxpfx), "%s      ", pfx);
3351
0
      chunk_appendf(buf, "%s     ", pfx);
3352
0
      conn->mux->show_sd(buf, scb->sedesc, muxpfx);
3353
0
      chunk_appendf(buf, "\n");
3354
0
    }
3355
3356
0
    chunk_appendf(buf,
3357
0
                  "%s      co1=%p ctrl=%s xprt=%s mux=%s data=%s target=%s:%p\n", pfx,
3358
0
            conn,
3359
0
            conn_get_ctrl_name(conn),
3360
0
            conn_get_xprt_name(conn),
3361
0
            conn_get_mux_name(conn),
3362
0
            sc_get_data_name(scb),
3363
0
                  obj_type_name(conn->target),
3364
0
                  obj_base_ptr(conn->target));
3365
3366
0
    chunk_appendf(buf,
3367
0
                  "%s      flags=0x%08x fd=%d fd.state=%02x updt=%d fd.tmask=0x%lx\n", pfx,
3368
0
                  conn->flags,
3369
0
                  conn_fd(conn),
3370
0
                  conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].state : 0,
3371
0
                  conn_fd(conn) >= 0 ? !!(fdtab[conn->handle.fd].update_mask & ti->ltid_bit) : 0,
3372
0
            conn_fd(conn) >= 0 ? fdtab[conn->handle.fd].thread_mask: 0);
3373
0
  }
3374
0
  else if ((tmpctx = sc_appctx(scb)) != NULL) {
3375
0
    chunk_appendf(buf,
3376
0
                  "%s      app1=%p st0=%d st1=%d applet=%s tid=%d nice=%d calls=%u rate=%u\n", pfx,
3377
0
            tmpctx,
3378
0
            tmpctx->st0,
3379
0
            tmpctx->st1,
3380
0
                  tmpctx->applet->name,
3381
0
                  tmpctx->t->tid,
3382
0
                  tmpctx->t->nice, tmpctx->t->calls, read_freq_ctr(&tmpctx->call_rate));
3383
0
  }
3384
3385
0
  if (HAS_FILTERS(strm)) {
3386
0
    const struct filter *flt;
3387
3388
0
    chunk_appendf(buf, "%s  filters={", pfx);
3389
0
    list_for_each_entry(flt, &strm->strm_flt.filters, list) {
3390
0
      if (flt->list.p != &strm->strm_flt.filters)
3391
0
        chunk_appendf(buf, ", ");
3392
0
      chunk_appendf(buf, "%p=\"%s\"", flt, FLT_ID(flt));
3393
0
    }
3394
0
    chunk_appendf(buf, "}\n");
3395
0
  }
3396
3397
0
  chunk_appendf(buf,
3398
0
         "%s  req=%p (f=0x%06x an=0x%x tofwd=%d total=%lld)\n"
3399
0
         "%s      an_exp=%s buf=%p data=%p o=%u p=%u i=%u size=%u\n",
3400
0
         pfx,
3401
0
         &strm->req,
3402
0
         strm->req.flags, strm->req.analysers,
3403
0
         strm->req.to_forward, strm->req.total,
3404
0
         pfx,
3405
0
         strm->req.analyse_exp ?
3406
0
         human_time(TICKS_TO_MS(strm->req.analyse_exp - now_ms),
3407
0
        TICKS_TO_MS(1000)) : "<NEVER>",
3408
0
         &strm->req.buf,
3409
0
         b_orig(&strm->req.buf), (unsigned int)co_data(&strm->req),
3410
0
         (unsigned int)ci_head_ofs(&strm->req), (unsigned int)ci_data(&strm->req),
3411
0
         (unsigned int)strm->req.buf.size);
3412
3413
0
  if (IS_HTX_STRM(strm)) {
3414
0
    struct htx *htx = htxbuf(&strm->req.buf);
3415
3416
0
    chunk_appendf(buf,
3417
0
            "%s      htx=%p flags=0x%x size=%u data=%u used=%u wrap=%s extra=%llu\n", pfx,
3418
0
            htx, htx->flags, htx->size, htx->data, htx_nbblks(htx),
3419
0
            (htx->tail >= htx->head) ? "NO" : "YES",
3420
0
            (unsigned long long)htx->extra);
3421
0
  }
3422
0
  if (HAS_FILTERS(strm) && strm->strm_flt.current[0]) {
3423
0
    const struct filter *flt = strm->strm_flt.current[0];
3424
3425
0
    chunk_appendf(buf, "%s      current_filter=%p (id=\"%s\" flags=0x%x pre=0x%x post=0x%x) \n", pfx,
3426
0
            flt, flt->config->id, flt->flags, flt->pre_analyzers, flt->post_analyzers);
3427
0
  }
3428
3429
0
  chunk_appendf(buf,
3430
0
         "%s  res=%p (f=0x%06x an=0x%x tofwd=%d total=%lld)\n"
3431
0
         "%s      an_exp=%s buf=%p data=%p o=%u p=%u i=%u size=%u\n",
3432
0
         pfx,
3433
0
         &strm->res,
3434
0
         strm->res.flags, strm->res.analysers,
3435
0
         strm->res.to_forward, strm->res.total,
3436
0
         pfx,
3437
0
         strm->res.analyse_exp ?
3438
0
         human_time(TICKS_TO_MS(strm->res.analyse_exp - now_ms),
3439
0
        TICKS_TO_MS(1000)) : "<NEVER>",
3440
0
         &strm->res.buf,
3441
0
               b_orig(&strm->res.buf), (unsigned int)co_data(&strm->res),
3442
0
               (unsigned int)ci_head_ofs(&strm->res), (unsigned int)ci_data(&strm->res),
3443
0
         (unsigned int)strm->res.buf.size);
3444
3445
0
  if (IS_HTX_STRM(strm)) {
3446
0
    struct htx *htx = htxbuf(&strm->res.buf);
3447
3448
0
    chunk_appendf(buf,
3449
0
            "%s      htx=%p flags=0x%x size=%u data=%u used=%u wrap=%s extra=%llu\n", pfx,
3450
0
            htx, htx->flags, htx->size, htx->data, htx_nbblks(htx),
3451
0
            (htx->tail >= htx->head) ? "NO" : "YES",
3452
0
            (unsigned long long)htx->extra);
3453
0
  }
3454
3455
0
  if (HAS_FILTERS(strm) && strm->strm_flt.current[1]) {
3456
0
    const struct filter *flt = strm->strm_flt.current[1];
3457
3458
0
    chunk_appendf(buf, "%s      current_filter=%p (id=\"%s\" flags=0x%x pre=0x%x post=0x%x) \n", pfx,
3459
0
            flt, flt->config->id, flt->flags, flt->pre_analyzers, flt->post_analyzers);
3460
0
  }
3461
3462
0
  if (strm->current_rule_list && strm->current_rule) {
3463
0
    const struct act_rule *rule = strm->current_rule;
3464
0
    chunk_appendf(buf, "%s  current_rule=\"%s\" [%s:%d]\n", pfx, rule->kw->kw, rule->conf.file, rule->conf.line);
3465
0
  }
3466
0
}
3467
3468
/* This function dumps a complete stream state onto the stream connector's
3469
 * read buffer. The stream has to be set in strm. It returns 0 if the output
3470
 * buffer is full and it needs to be called again, otherwise non-zero. It is
3471
 * designed to be called from stats_dump_strm_to_buffer() below.
3472
 */
3473
static int stats_dump_full_strm_to_buffer(struct stconn *sc, struct stream *strm)
3474
0
{
3475
0
  struct appctx *appctx = __sc_appctx(sc);
3476
0
  struct show_sess_ctx *ctx = appctx->svcctx;
3477
3478
0
  chunk_reset(&trash);
3479
3480
0
  if (ctx->section > 0 && ctx->uid != strm->uniq_id) {
3481
    /* stream changed, no need to go any further */
3482
0
    chunk_appendf(&trash, "  *** session terminated while we were watching it ***\n");
3483
0
    if (applet_putchk(appctx, &trash) == -1)
3484
0
      goto full;
3485
0
    goto done;
3486
0
  }
3487
3488
0
  switch (ctx->section) {
3489
0
  case 0: /* main status of the stream */
3490
0
    ctx->uid = strm->uniq_id;
3491
0
    ctx->section = 1;
3492
0
    __fallthrough;
3493
3494
0
  case 1:
3495
0
    strm_dump_to_buffer(&trash, strm, "", appctx->cli_anon_key);
3496
0
    if (applet_putchk(appctx, &trash) == -1)
3497
0
      goto full;
3498
3499
    /* use other states to dump the contents */
3500
0
  }
3501
  /* end of dump */
3502
0
 done:
3503
0
  ctx->uid = 0;
3504
0
  ctx->section = 0;
3505
0
  return 1;
3506
0
 full:
3507
0
  return 0;
3508
0
}
3509
3510
static int cli_parse_show_sess(char **args, char *payload, struct appctx *appctx, void *private)
3511
0
{
3512
0
  struct show_sess_ctx *ctx = applet_reserve_svcctx(appctx, sizeof(*ctx));
3513
3514
0
  if (!cli_has_level(appctx, ACCESS_LVL_OPER))
3515
0
    return 1;
3516
3517
  /* now all sessions by default */
3518
0
  ctx->target = NULL;
3519
0
  ctx->min_age = 0;
3520
0
  ctx->section = 0; /* start with stream status */
3521
0
  ctx->pos = 0;
3522
0
  ctx->thr = 0;
3523
3524
0
  if (*args[2] && strcmp(args[2], "older") == 0) {
3525
0
    unsigned timeout;
3526
0
    const char *res;
3527
3528
0
    if (!*args[3])
3529
0
      return cli_err(appctx, "Expects a minimum age (in seconds by default).\n");
3530
3531
0
    res = parse_time_err(args[3], &timeout, TIME_UNIT_S);
3532
0
    if (res != 0)
3533
0
      return cli_err(appctx, "Invalid age.\n");
3534
3535
0
    ctx->min_age = timeout;
3536
0
    ctx->target = (void *)-1; /* show all matching entries */
3537
0
  }
3538
0
  else if (*args[2] && strcmp(args[2], "susp") == 0) {
3539
0
    ctx->flags |= CLI_SHOWSESS_F_SUSP;
3540
0
    ctx->target = (void *)-1; /* show all matching entries */
3541
0
  }
3542
0
  else if (*args[2] && strcmp(args[2], "all") == 0)
3543
0
    ctx->target = (void *)-1;
3544
0
  else if (*args[2])
3545
0
    ctx->target = (void *)strtoul(args[2], NULL, 0);
3546
3547
  /* The back-ref must be reset, it will be detected and set by
3548
   * the dump code upon first invocation.
3549
   */
3550
0
  LIST_INIT(&ctx->bref.users);
3551
3552
  /* let's set our own stream's epoch to the current one and increment
3553
   * it so that we know which streams were already there before us.
3554
   */
3555
0
  appctx_strm(appctx)->stream_epoch = _HA_ATOMIC_FETCH_ADD(&stream_epoch, 1);
3556
0
  return 0;
3557
0
}
3558
3559
/* This function dumps all streams' states onto the stream connector's
3560
 * read buffer. It returns 0 if the output buffer is full and it needs
3561
 * to be called again, otherwise non-zero. It proceeds in an isolated
3562
 * thread so there is no thread safety issue here.
3563
 */
3564
static int cli_io_handler_dump_sess(struct appctx *appctx)
3565
0
{
3566
0
  struct show_sess_ctx *ctx = appctx->svcctx;
3567
0
  struct stconn *sc = appctx_sc(appctx);
3568
0
  struct connection *conn;
3569
3570
0
  thread_isolate();
3571
3572
0
  if (ctx->thr >= global.nbthread) {
3573
    /* already terminated */
3574
0
    goto done;
3575
0
  }
3576
3577
  /* FIXME: Don't watch the other side !*/
3578
0
  if (unlikely(sc_opposite(sc)->flags & SC_FL_SHUT_DONE)) {
3579
    /* If we're forced to shut down, we might have to remove our
3580
     * reference to the last stream being dumped.
3581
     */
3582
0
    if (!LIST_ISEMPTY(&ctx->bref.users)) {
3583
0
      LIST_DELETE(&ctx->bref.users);
3584
0
      LIST_INIT(&ctx->bref.users);
3585
0
    }
3586
0
    goto done;
3587
0
  }
3588
3589
0
  chunk_reset(&trash);
3590
3591
  /* first, let's detach the back-ref from a possible previous stream */
3592
0
  if (!LIST_ISEMPTY(&ctx->bref.users)) {
3593
0
    LIST_DELETE(&ctx->bref.users);
3594
0
    LIST_INIT(&ctx->bref.users);
3595
0
  } else if (!ctx->bref.ref) {
3596
    /* first call, start with first stream */
3597
0
    ctx->bref.ref = ha_thread_ctx[ctx->thr].streams.n;
3598
0
  }
3599
3600
  /* and start from where we stopped */
3601
0
  while (1) {
3602
0
    char pn[INET6_ADDRSTRLEN];
3603
0
    struct stream *curr_strm;
3604
0
    int done= 0;
3605
3606
0
    if (ctx->bref.ref == &ha_thread_ctx[ctx->thr].streams)
3607
0
      done = 1;
3608
0
    else {
3609
      /* check if we've found a stream created after issuing the "show sess" */
3610
0
      curr_strm = LIST_ELEM(ctx->bref.ref, struct stream *, list);
3611
0
      if ((int)(curr_strm->stream_epoch - appctx_strm(appctx)->stream_epoch) > 0)
3612
0
        done = 1;
3613
0
    }
3614
3615
0
    if (done) {
3616
0
      ctx->thr++;
3617
0
      if (ctx->thr >= global.nbthread)
3618
0
        break;
3619
0
      ctx->bref.ref = ha_thread_ctx[ctx->thr].streams.n;
3620
0
      continue;
3621
0
    }
3622
3623
0
    if (ctx->min_age) {
3624
0
      uint age = ns_to_sec(now_ns) - ns_to_sec(curr_strm->logs.request_ts);
3625
0
      if (age < ctx->min_age)
3626
0
        goto next_sess;
3627
0
    }
3628
3629
0
    if (ctx->flags & CLI_SHOWSESS_F_SUSP) {
3630
      /* only show suspicious streams. Non-suspicious ones have a valid
3631
       * expiration date in the future and a valid front endpoint.
3632
       */
3633
0
      if (curr_strm->task->expire &&
3634
0
          !tick_is_expired(curr_strm->task->expire, now_ms) &&
3635
0
          curr_strm->scf && curr_strm->scf->sedesc && curr_strm->scf->sedesc->se)
3636
0
        goto next_sess;
3637
0
    }
3638
3639
0
    if (ctx->target) {
3640
0
      if (ctx->target != (void *)-1 && ctx->target != curr_strm)
3641
0
        goto next_sess;
3642
3643
0
      LIST_APPEND(&curr_strm->back_refs, &ctx->bref.users);
3644
      /* call the proper dump() function and return if we're missing space */
3645
0
      if (!stats_dump_full_strm_to_buffer(sc, curr_strm))
3646
0
        goto full;
3647
3648
      /* stream dump complete */
3649
0
      LIST_DELETE(&ctx->bref.users);
3650
0
      LIST_INIT(&ctx->bref.users);
3651
0
      if (ctx->target != (void *)-1) {
3652
0
        ctx->target = NULL;
3653
0
        break;
3654
0
      }
3655
0
      else
3656
0
        goto next_sess;
3657
0
    }
3658
3659
0
    chunk_appendf(&trash,
3660
0
           "%p: proto=%s",
3661
0
           curr_strm,
3662
0
           strm_li(curr_strm) ? strm_li(curr_strm)->rx.proto->name : "?");
3663
3664
0
    conn = objt_conn(strm_orig(curr_strm));
3665
0
    switch (conn && conn_get_src(conn) ? addr_to_str(conn->src, pn, sizeof(pn)) : AF_UNSPEC) {
3666
0
    case AF_INET:
3667
0
    case AF_INET6:
3668
0
      chunk_appendf(&trash,
3669
0
             " src=%s:%d fe=%s be=%s srv=%s",
3670
0
             HA_ANON_CLI(pn),
3671
0
             get_host_port(conn->src),
3672
0
             HA_ANON_CLI(strm_fe(curr_strm)->id),
3673
0
             (curr_strm->be->cap & PR_CAP_BE) ? HA_ANON_CLI(curr_strm->be->id) : "<NONE>",
3674
0
             objt_server(curr_strm->target) ? HA_ANON_CLI(__objt_server(curr_strm->target)->id) : "<none>"
3675
0
             );
3676
0
      break;
3677
0
    case AF_UNIX:
3678
0
      chunk_appendf(&trash,
3679
0
             " src=unix:%d fe=%s be=%s srv=%s",
3680
0
             strm_li(curr_strm)->luid,
3681
0
             HA_ANON_CLI(strm_fe(curr_strm)->id),
3682
0
             (curr_strm->be->cap & PR_CAP_BE) ? HA_ANON_CLI(curr_strm->be->id) : "<NONE>",
3683
0
             objt_server(curr_strm->target) ? HA_ANON_CLI(__objt_server(curr_strm->target)->id) : "<none>"
3684
0
             );
3685
0
      break;
3686
0
    }
3687
3688
0
    chunk_appendf(&trash,
3689
0
           " ts=%02x epoch=%#x age=%s calls=%u rate=%u cpu=%llu lat=%llu",
3690
0
                 curr_strm->task->state, curr_strm->stream_epoch,
3691
0
                 human_time(ns_to_sec(now_ns) - ns_to_sec(curr_strm->logs.request_ts), 1),
3692
0
                 curr_strm->task->calls, read_freq_ctr(&curr_strm->call_rate),
3693
0
                 (unsigned long long)curr_strm->cpu_time, (unsigned long long)curr_strm->lat_time);
3694
3695
0
    chunk_appendf(&trash,
3696
0
           " rq[f=%06xh,i=%u,an=%02xh",
3697
0
           curr_strm->req.flags,
3698
0
                 (unsigned int)ci_data(&curr_strm->req),
3699
0
           curr_strm->req.analysers);
3700
3701
0
    chunk_appendf(&trash,
3702
0
           ",ax=%s]",
3703
0
           curr_strm->req.analyse_exp ?
3704
0
           human_time(TICKS_TO_MS(curr_strm->req.analyse_exp - now_ms),
3705
0
          TICKS_TO_MS(1000)) : "");
3706
3707
0
    chunk_appendf(&trash,
3708
0
           " rp[f=%06xh,i=%u,an=%02xh",
3709
0
           curr_strm->res.flags,
3710
0
                 (unsigned int)ci_data(&curr_strm->res),
3711
0
           curr_strm->res.analysers);
3712
0
    chunk_appendf(&trash,
3713
0
           ",ax=%s]",
3714
0
           curr_strm->res.analyse_exp ?
3715
0
           human_time(TICKS_TO_MS(curr_strm->res.analyse_exp - now_ms),
3716
0
          TICKS_TO_MS(1000)) : "");
3717
3718
0
    conn = sc_conn(curr_strm->scf);
3719
0
    chunk_appendf(&trash," scf=[%d,%1xh,fd=%d",
3720
0
            curr_strm->scf->state, curr_strm->scf->flags, conn_fd(conn));
3721
0
    chunk_appendf(&trash, ",rex=%s",
3722
0
            sc_ep_rcv_ex(curr_strm->scf) ?
3723
0
            human_time(TICKS_TO_MS(sc_ep_rcv_ex(curr_strm->scf) - now_ms),
3724
0
           TICKS_TO_MS(1000)) : "");
3725
0
    chunk_appendf(&trash,",wex=%s]",
3726
0
            sc_ep_snd_ex(curr_strm->scf) ?
3727
0
            human_time(TICKS_TO_MS(sc_ep_snd_ex(curr_strm->scf) - now_ms),
3728
0
           TICKS_TO_MS(1000)) : "");
3729
3730
0
    conn = sc_conn(curr_strm->scb);
3731
0
    chunk_appendf(&trash, " scb=[%d,%1xh,fd=%d",
3732
0
            curr_strm->scb->state, curr_strm->scb->flags, conn_fd(conn));
3733
0
    chunk_appendf(&trash, ",rex=%s",
3734
0
            sc_ep_rcv_ex(curr_strm->scb) ?
3735
0
            human_time(TICKS_TO_MS(sc_ep_rcv_ex(curr_strm->scb) - now_ms),
3736
0
           TICKS_TO_MS(1000)) : "");
3737
0
    chunk_appendf(&trash, ",wex=%s]",
3738
0
            sc_ep_snd_ex(curr_strm->scb) ?
3739
0
            human_time(TICKS_TO_MS(sc_ep_snd_ex(curr_strm->scb) - now_ms),
3740
0
           TICKS_TO_MS(1000)) : "");
3741
3742
0
    chunk_appendf(&trash,
3743
0
           " exp=%s rc=%d c_exp=%s",
3744
0
           curr_strm->task->expire ?
3745
0
           human_time(TICKS_TO_MS(curr_strm->task->expire - now_ms),
3746
0
          TICKS_TO_MS(1000)) : "",
3747
0
           curr_strm->conn_retries,
3748
0
           curr_strm->conn_exp ?
3749
0
           human_time(TICKS_TO_MS(curr_strm->conn_exp - now_ms),
3750
0
          TICKS_TO_MS(1000)) : "");
3751
0
    if (task_in_rq(curr_strm->task))
3752
0
      chunk_appendf(&trash, " run(nice=%d)", curr_strm->task->nice);
3753
3754
0
    chunk_appendf(&trash, "\n");
3755
3756
0
    if (applet_putchk(appctx, &trash) == -1) {
3757
      /* let's try again later from this stream. We add ourselves into
3758
       * this stream's users so that it can remove us upon termination.
3759
       */
3760
0
      LIST_APPEND(&curr_strm->back_refs, &ctx->bref.users);
3761
0
      goto full;
3762
0
    }
3763
3764
0
  next_sess:
3765
0
    ctx->bref.ref = curr_strm->list.n;
3766
0
  }
3767
3768
0
  if (ctx->target && ctx->target != (void *)-1) {
3769
    /* specified stream not found */
3770
0
    if (ctx->section > 0)
3771
0
      chunk_appendf(&trash, "  *** session terminated while we were watching it ***\n");
3772
0
    else
3773
0
      chunk_appendf(&trash, "Session not found.\n");
3774
3775
0
    if (applet_putchk(appctx, &trash) == -1)
3776
0
      goto full;
3777
3778
0
    ctx->target = NULL;
3779
0
    ctx->uid = 0;
3780
0
    goto done;
3781
0
  }
3782
3783
0
 done:
3784
0
  thread_release();
3785
0
  return 1;
3786
0
 full:
3787
0
  thread_release();
3788
0
  return 0;
3789
0
}
3790
3791
static void cli_release_show_sess(struct appctx *appctx)
3792
0
{
3793
0
  struct show_sess_ctx *ctx = appctx->svcctx;
3794
3795
0
  if (ctx->thr < global.nbthread) {
3796
    /* a dump was aborted, either in error or timeout. We need to
3797
     * safely detach from the target stream's list. It's mandatory
3798
     * to lock because a stream on the target thread could be moving
3799
     * our node.
3800
     */
3801
0
    thread_isolate();
3802
0
    if (!LIST_ISEMPTY(&ctx->bref.users))
3803
0
      LIST_DELETE(&ctx->bref.users);
3804
0
    thread_release();
3805
0
  }
3806
0
}
3807
3808
/* Parses the "shutdown session" directive, it always returns 1 */
3809
static int cli_parse_shutdown_session(char **args, char *payload, struct appctx *appctx, void *private)
3810
0
{
3811
0
  struct stream *strm, *ptr;
3812
0
  int thr;
3813
3814
0
  if (!cli_has_level(appctx, ACCESS_LVL_ADMIN))
3815
0
    return 1;
3816
3817
0
  ptr = (void *)strtoul(args[2], NULL, 0);
3818
0
  if (!ptr)
3819
0
    return cli_err(appctx, "Session pointer expected (use 'show sess').\n");
3820
3821
0
  strm = NULL;
3822
3823
0
  thread_isolate();
3824
3825
  /* first, look for the requested stream in the stream table */
3826
0
  for (thr = 0; strm != ptr && thr < global.nbthread; thr++) {
3827
0
    list_for_each_entry(strm, &ha_thread_ctx[thr].streams, list) {
3828
0
      if (strm == ptr) {
3829
0
        stream_shutdown(strm, SF_ERR_KILLED);
3830
0
        break;
3831
0
      }
3832
0
    }
3833
0
  }
3834
3835
0
  thread_release();
3836
3837
  /* do we have the stream ? */
3838
0
  if (strm != ptr)
3839
0
    return cli_err(appctx, "No such session (use 'show sess').\n");
3840
3841
0
  return 1;
3842
0
}
3843
3844
/* Parses the "shutdown session server" directive, it always returns 1 */
3845
static int cli_parse_shutdown_sessions_server(char **args, char *payload, struct appctx *appctx, void *private)
3846
0
{
3847
0
  struct server *sv;
3848
3849
0
  if (!cli_has_level(appctx, ACCESS_LVL_ADMIN))
3850
0
    return 1;
3851
3852
0
  sv = cli_find_server(appctx, args[3]);
3853
0
  if (!sv)
3854
0
    return 1;
3855
3856
  /* kill all the stream that are on this server */
3857
0
  HA_SPIN_LOCK(SERVER_LOCK, &sv->lock);
3858
0
  srv_shutdown_streams(sv, SF_ERR_KILLED);
3859
0
  HA_SPIN_UNLOCK(SERVER_LOCK, &sv->lock);
3860
0
  return 1;
3861
0
}
3862
3863
/* CLI keywords provided by this file. Each entry lists the keyword words, the
 * help message, the parsing function, and for "show sess" the I/O handler and
 * the release function as well. The list is registered through
 * cli_register_kw() at the STG_REGISTER init stage.
 * NOTE(review): the help text shows "shutdown session [id]" as optional while
 * cli_parse_shutdown_session() rejects a missing pointer -- confirm wording.
 */
3864
static struct cli_kw_list cli_kws = {{ },{
3865
  { { "show", "sess",  NULL },             "show sess [<id>|all|susp|older <age>]   : report the list of current sessions or dump this exact session", cli_parse_show_sess, cli_io_handler_dump_sess, cli_release_show_sess },
3866
  { { "shutdown", "session",  NULL },      "shutdown session [id]                   : kill a specific session",                                        cli_parse_shutdown_session, NULL, NULL },
3867
  { { "shutdown", "sessions",  "server" }, "shutdown sessions server <bk>/<srv>     : kill sessions on a server",                                      cli_parse_shutdown_sessions_server, NULL, NULL },
3868
  {{},}
3869
}};
3870
3871
INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
3872
3873
/* Action keywords and their parsers, registered at the STG_REGISTER init
 * stage through tcp_req_cont_keywords_register() (presumably the
 * "tcp-request content" rule set -- confirm against that function).
 */
3874
static struct action_kw_list stream_tcp_req_keywords = { ILH, {
3875
  { "set-log-level", stream_parse_set_log_level },
3876
  { "set-nice",      stream_parse_set_nice },
3877
  { "switch-mode",   stream_parse_switch_mode },
3878
  { "use-service",   stream_parse_use_service },
3879
  { /* END */ }
3880
}};
3881
3882
INITCALL1(STG_REGISTER, tcp_req_cont_keywords_register, &stream_tcp_req_keywords);
3883
3884
/* main configuration keyword registration. */
3885
static struct action_kw_list stream_tcp_res_keywords = { ILH, {
3886
  { "set-log-level", stream_parse_set_log_level },
3887
  { "set-nice",     stream_parse_set_nice },
3888
  { /* END */ }
3889
}};
3890
3891
INITCALL1(STG_REGISTER, tcp_res_cont_keywords_register, &stream_tcp_res_keywords);
3892
3893
static struct action_kw_list stream_http_req_keywords = { ILH, {
3894
  { "set-log-level", stream_parse_set_log_level },
3895
  { "set-nice",      stream_parse_set_nice },
3896
  { "use-service",   stream_parse_use_service },
3897
  { /* END */ }
3898
}};
3899
3900
INITCALL1(STG_REGISTER, http_req_keywords_register, &stream_http_req_keywords);
3901
3902
static struct action_kw_list stream_http_res_keywords = { ILH, {
3903
  { "set-log-level", stream_parse_set_log_level },
3904
  { "set-nice",      stream_parse_set_nice },
3905
  { /* END */ }
3906
}};
3907
3908
INITCALL1(STG_REGISTER, http_res_keywords_register, &stream_http_res_keywords);
3909
3910
static struct action_kw_list stream_http_after_res_actions =  { ILH, {
3911
  { "set-log-level", stream_parse_set_log_level },
3912
  { /* END */ }
3913
}};
3914
3915
INITCALL1(STG_REGISTER, http_after_res_keywords_register, &stream_http_after_res_actions);
3916
3917
static int smp_fetch_cur_client_timeout(const struct arg *args, struct sample *smp, const char *km, void *private)
3918
0
{
3919
0
  smp->flags = SMP_F_VOL_TXN;
3920
0
  smp->data.type = SMP_T_SINT;
3921
0
  if (!smp->strm)
3922
0
    return 0;
3923
3924
0
  smp->data.u.sint = TICKS_TO_MS(smp->strm->scf->ioto);
3925
0
  return 1;
3926
0
}
3927
3928
static int smp_fetch_cur_server_timeout(const struct arg *args, struct sample *smp, const char *km, void *private)
3929
0
{
3930
0
  smp->flags = SMP_F_VOL_TXN;
3931
0
  smp->data.type = SMP_T_SINT;
3932
0
  if (!smp->strm)
3933
0
    return 0;
3934
3935
0
  smp->data.u.sint = TICKS_TO_MS(smp->strm->scb->ioto);
3936
0
  return 1;
3937
0
}
3938
3939
static int smp_fetch_cur_tunnel_timeout(const struct arg *args, struct sample *smp, const char *km, void *private)
3940
0
{
3941
0
  smp->flags = SMP_F_VOL_TXN;
3942
0
  smp->data.type = SMP_T_SINT;
3943
0
  if (!smp->strm)
3944
0
    return 0;
3945
3946
0
  smp->data.u.sint = TICKS_TO_MS(smp->strm->tunnel_timeout);
3947
0
  return 1;
3948
0
}
3949
3950
static int smp_fetch_last_rule_file(const struct arg *args, struct sample *smp, const char *km, void *private)
3951
0
{
3952
0
  smp->flags = SMP_F_VOL_TXN;
3953
0
  smp->data.type = SMP_T_STR;
3954
0
  if (!smp->strm || !smp->strm->last_rule_file)
3955
0
    return 0;
3956
3957
0
  smp->flags |= SMP_F_CONST;
3958
0
  smp->data.u.str.area = (char *)smp->strm->last_rule_file;
3959
0
  smp->data.u.str.data = strlen(smp->strm->last_rule_file);
3960
0
  return 1;
3961
0
}
3962
3963
static int smp_fetch_last_rule_line(const struct arg *args, struct sample *smp, const char *km, void *private)
3964
0
{
3965
0
  smp->flags = SMP_F_VOL_TXN;
3966
0
  smp->data.type = SMP_T_SINT;
3967
0
  if (!smp->strm || !smp->strm->last_rule_line)
3968
0
    return 0;
3969
3970
0
  smp->data.u.sint = smp->strm->last_rule_line;
3971
0
  return 1;
3972
0
}
3973
3974
/* Note: must not be declared <const> as its list will be overwritten.
3975
 * Please take care of keeping this list alphabetically sorted.
3976
 */
3977
static struct sample_fetch_kw_list smp_kws = {ILH, {
3978
  { "cur_client_timeout", smp_fetch_cur_client_timeout, 0, NULL, SMP_T_SINT, SMP_USE_FTEND, },
3979
  { "cur_server_timeout", smp_fetch_cur_server_timeout, 0, NULL, SMP_T_SINT, SMP_USE_BKEND, },
3980
  { "cur_tunnel_timeout", smp_fetch_cur_tunnel_timeout, 0, NULL, SMP_T_SINT, SMP_USE_BKEND, },
3981
  { "last_rule_file",     smp_fetch_last_rule_file,     0, NULL, SMP_T_STR,  SMP_USE_INTRN, },
3982
  { "last_rule_line",     smp_fetch_last_rule_line,     0, NULL, SMP_T_SINT, SMP_USE_INTRN, },
3983
  { NULL, NULL, 0, 0, 0 },
3984
}};
3985
3986
INITCALL1(STG_REGISTER, sample_register_fetches, &smp_kws);
3987
3988
/*
3989
 * Local variables:
3990
 *  c-indent-level: 8
3991
 *  c-basic-offset: 8
3992
 * End:
3993
 */