Coverage Report

Created: 2026-01-17 06:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/haproxy/src/sink.c
Line
Count
Source
1
/*
2
 * Event sink management
3
 *
4
 * Copyright (C) 2000-2019 Willy Tarreau - w@1wt.eu
5
 *
6
 * This library is free software; you can redistribute it and/or
7
 * modify it under the terms of the GNU Lesser General Public
8
 * License as published by the Free Software Foundation, version 2.1
9
 * exclusively.
10
 *
11
 * This library is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
 * Lesser General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU Lesser General Public
17
 * License along with this library; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19
 */
20
21
#include <sys/mman.h>
22
#include <errno.h>
23
#include <fcntl.h>
24
25
#include <import/ist.h>
26
#include <haproxy/api.h>
27
#include <haproxy/applet.h>
28
#include <haproxy/cfgparse.h>
29
#include <haproxy/cli.h>
30
#include <haproxy/errors.h>
31
#include <haproxy/list.h>
32
#include <haproxy/log.h>
33
#include <haproxy/proxy.h>
34
#include <haproxy/ring.h>
35
#include <haproxy/sc_strm.h>
36
#include <haproxy/signal.h>
37
#include <haproxy/sink.h>
38
#include <haproxy/stconn.h>
39
#include <haproxy/time.h>
40
#include <haproxy/tools.h>
41
42
struct list sink_list = LIST_HEAD_INIT(sink_list);
43
44
/* sink proxies list */
45
struct proxy *sink_proxies_list;
46
47
struct sink *cfg_sink;
48
49
static struct sink *_sink_find(const char *name)
50
0
{
51
0
  struct sink *sink;
52
53
0
  list_for_each_entry(sink, &sink_list, sink_list)
54
0
    if (strcmp(sink->name, name) == 0)
55
0
      return sink;
56
0
  return NULL;
57
0
}
58
59
/* returns sink if it really exists */
60
struct sink *sink_find(const char *name)
61
0
{
62
0
  struct sink *sink;
63
64
0
  sink = _sink_find(name);
65
0
  if (sink && sink->type != SINK_TYPE_FORWARD_DECLARED)
66
0
    return sink;
67
0
  return NULL;
68
0
}
69
70
/* Similar to sink_find(), but intended to be used during config parsing:
71
 * tries to resolve sink name, if it fails, creates the sink and marks
72
 * it as forward-declared and hope that it will be defined later.
73
 *
74
 * The caller has to identify itself using <from>, <file> and <line> in
75
 * order to report precise error messages in the event that the sink is
76
 * never defined later (only the first misuse will be considered).
77
 *
78
 * It returns the sink on success and NULL on failure (memory error)
79
 */
80
struct sink *sink_find_early(const char *name, const char *from, const char *file, int line)
81
0
{
82
0
  struct sink *sink;
83
84
  /* not expected to be used during runtime */
85
0
  BUG_ON(!(global.mode & MODE_STARTING));
86
87
0
  sink = _sink_find(name);
88
0
  if (sink)
89
0
    return sink;
90
91
  /* not found, try to forward-declare it */
92
0
  sink = calloc(1, sizeof(*sink));
93
0
  if (!sink)
94
0
    return NULL;
95
96
0
  sink->name = strdup(name);
97
0
  if (!sink->name)
98
0
    goto err;
99
100
0
  memprintf(&sink->desc, "parsing [%s:%d] : %s", file, line, from);
101
0
  if (!sink->desc)
102
0
    goto err;
103
104
0
  sink->type = SINK_TYPE_FORWARD_DECLARED;
105
0
  LIST_APPEND(&sink_list, &sink->sink_list);
106
107
0
  return sink;
108
109
0
 err:
110
0
  ha_free(&sink->name);
111
0
  ha_free(&sink->desc);
112
0
  ha_free(&sink);
113
0
  return NULL;
114
0
}
115
116
/* creates a new sink and adds it to the list, it's still generic and not fully
117
 * initialized. Returns NULL on allocation failure. If another one already
118
 * exists with the same name, it will be returned. The caller can detect it as
119
 * a newly created one has type SINK_TYPE_NEW.
120
 */
121
static struct sink *__sink_new(const char *name, const char *desc, int fmt)
122
0
{
123
0
  struct sink *sink;
124
0
  uint8_t _new = 0;
125
126
0
  sink = _sink_find(name);
127
0
  if (sink) {
128
0
    if (sink->type == SINK_TYPE_FORWARD_DECLARED) {
129
0
      ha_free(&sink->desc); // free previous desc
130
0
      goto forward_declared;
131
0
    }
132
0
    goto end;
133
0
  }
134
135
0
  sink = calloc(1, sizeof(*sink));
136
0
  _new = 1;
137
0
  if (!sink)
138
0
    goto end;
139
140
0
  sink->name = strdup(name);
141
0
  if (!sink->name)
142
0
    goto err;
143
144
0
 forward_declared:
145
0
  sink->desc = strdup(desc);
146
0
  if (!sink->desc)
147
0
    goto err;
148
149
0
  sink->fmt  = fmt;
150
0
  sink->type = SINK_TYPE_NEW;
151
0
  sink->maxlen = BUFSIZE;
152
  /* address will be filled by the caller if needed */
153
0
  sink->ctx.fd = -1;
154
0
  sink->ctx.dropped = 0;
155
0
  if (_new)
156
0
    LIST_APPEND(&sink_list, &sink->sink_list);
157
0
 end:
158
0
  return sink;
159
160
0
 err:
161
0
  ha_free(&sink->name);
162
0
  ha_free(&sink->desc);
163
0
  ha_free(&sink);
164
165
0
  return NULL;
166
0
}
167
168
/* creates a sink called <name> of type FD associated to fd <fd>, format <fmt>,
169
 * and description <desc>. Returns NULL on allocation failure or conflict.
170
 * Perfect duplicates are merged (same type, fd, and name).
171
 */
172
struct sink *sink_new_fd(const char *name, const char *desc, enum log_fmt fmt, int fd)
173
0
{
174
0
  struct sink *sink;
175
176
0
  sink = __sink_new(name, desc, fmt);
177
0
  if (!sink || (sink->type == SINK_TYPE_FD && sink->ctx.fd == fd))
178
0
    goto end;
179
180
0
  if (sink->type != SINK_TYPE_NEW) {
181
0
    sink = NULL;
182
0
    goto end;
183
0
  }
184
185
0
  sink->type = SINK_TYPE_FD;
186
0
  sink->ctx.fd = fd;
187
0
 end:
188
0
  return sink;
189
0
}
190
191
/* creates a sink called <name> of type BUF of size <size>, format <fmt>,
192
 * and description <desc>. Returns NULL on allocation failure or conflict.
193
 * Perfect duplicates are merged (same type and name). If sizes differ, the
194
 * largest one is kept.
195
 */
196
struct sink *sink_new_buf(const char *name, const char *desc, enum log_fmt fmt, size_t size)
197
0
{
198
0
  struct sink *sink;
199
200
0
  sink = __sink_new(name, desc, fmt);
201
0
  if (!sink)
202
0
    goto fail;
203
204
0
  if (sink->type == SINK_TYPE_BUFFER) {
205
    /* such a buffer already exists, we may have to resize it */
206
0
    if (!ring_resize(sink->ctx.ring, size))
207
0
      goto fail;
208
0
    goto end;
209
0
  }
210
211
0
  if (sink->type != SINK_TYPE_NEW) {
212
    /* already exists of another type */
213
0
    goto fail;
214
0
  }
215
216
0
  sink->ctx.ring = ring_new(size);
217
0
  if (!sink->ctx.ring) {
218
0
    LIST_DELETE(&sink->sink_list);
219
0
    free(sink->name);
220
0
    free(sink->desc);
221
0
    free(sink);
222
0
    goto fail;
223
0
  }
224
225
0
  sink->type = SINK_TYPE_BUFFER;
226
0
 end:
227
0
  return sink;
228
0
 fail:
229
0
  return NULL;
230
0
}
231
232
/* tries to send <nmsg> message parts from message array <msg> to sink <sink>.
233
 * Formatting according to the sink's preference is done here, unless sink->fmt
234
 * is unspecified, in which case the caller formatting will be used instead.
235
 * Lost messages are NOT accounted for. It is preferable to call sink_write()
236
 * instead which will also try to emit the number of dropped messages when there
237
 * are any.
238
 *
239
 * It will stop writing at <maxlen> instead of sink->maxlen if <maxlen> is
240
 * positive and inferior to sink->maxlen.
241
 *
242
 * It returns >0 if it could write anything, <=0 otherwise.
243
 */
244
 ssize_t __sink_write(struct sink *sink, struct log_header hdr,
245
                      size_t maxlen, const struct ist msg[], size_t nmsg)
246
0
 {
247
0
  struct ist *pfx = NULL;
248
0
  size_t npfx = 0;
249
250
0
  if (sink->fmt == LOG_FORMAT_RAW)
251
0
    goto send;
252
253
0
  if (sink->fmt != LOG_FORMAT_UNSPEC)
254
0
    hdr.format = sink->fmt; /* sink format prevails over log one */
255
0
  pfx = build_log_header(hdr, &npfx);
256
257
0
send:
258
0
  if (!maxlen)
259
0
    maxlen = ~0;
260
0
  if (sink->type == SINK_TYPE_FD) {
261
0
    return fd_write_frag_line(sink->ctx.fd, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg, 1);
262
0
  }
263
0
  else if (sink->type == SINK_TYPE_BUFFER) {
264
0
    return ring_write(sink->ctx.ring, MIN(maxlen, sink->maxlen), pfx, npfx, msg, nmsg);
265
0
  }
266
0
  return 0;
267
0
}
268
269
/* Tries to emit a message indicating the number of dropped events.
270
 * The log header of the original message that we tried to emit is reused
271
 * here with the only difference that we override the log level. This is
272
 * possible since the announce message will be sent from the same context.
273
 *
274
 * In case of success, the amount of drops is reduced by as much.
275
 * The function ensures that a single thread will do that work at once, other
276
 * ones will only report a failure if a thread is dumping, so that no thread
277
 * waits. A pair od atomic OR and AND is performed around the code so the
278
 * caller would be advised to only call this function AFTER having verified
279
 * that sink->ctx.dropped is not zero in order to avoid a memory write. On
280
 * success, >0 is returned, otherwise <=0 on failure, indicating that it could
281
 * not eliminate the pending drop counter. It may loop up to 10 times trying
282
 * to catch up with failing competing threads.
283
 */
284
int sink_announce_dropped(struct sink *sink, struct log_header hdr)
285
0
{
286
0
  static THREAD_LOCAL char msg_dropped1[] = "1 event dropped";
287
0
  static THREAD_LOCAL char msg_dropped2[] = "0000000000 events dropped";
288
0
  uint dropped, last_dropped;
289
0
  struct ist msgvec[1];
290
0
  uint retries = 10;
291
0
  int ret = 0;
292
293
  /* Explanation. ctx.dropped is made of:
294
   *     bit0     = 1 if dropped dump in progress
295
   *     bit1..31 = dropped counter
296
   * If non-zero there have been some drops. If not &1, it means
297
   * nobody's taking care of them and we'll have to, otherwise
298
   * another thread is already on them and we can just pass and
299
   * count another drop (hence add 2).
300
   */
301
0
  dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped);
302
0
  do {
303
0
    if (dropped & 1) {
304
      /* another thread was already on it */
305
0
      goto leave;
306
0
    }
307
0
  } while (!_HA_ATOMIC_CAS(&sink->ctx.dropped, &dropped, dropped | 1));
308
309
0
  last_dropped = 0;
310
0
  dropped >>= 1;
311
0
  while (1) {
312
0
    while (unlikely(dropped > last_dropped) && retries-- > 0) {
313
      /* try to aggregate multiple messages if other threads arrive while
314
       * we're producing the dropped message.
315
       */
316
0
      uint msglen = sizeof(msg_dropped1);
317
0
      const char *msg = msg_dropped1;
318
319
0
      last_dropped = dropped;
320
0
      if (dropped > 1) {
321
0
        msg = ultoa_r(dropped, msg_dropped2, 11);
322
0
        msg_dropped2[10] = ' ';
323
0
        msglen = msg_dropped2 + sizeof(msg_dropped2) - msg;
324
0
      }
325
0
      msgvec[0] = ist2(msg, msglen);
326
0
      dropped = HA_ATOMIC_LOAD(&sink->ctx.dropped) >> 1;
327
0
    }
328
329
0
    if (!dropped)
330
0
      break;
331
332
0
    last_dropped = 0;
333
0
    hdr.level = LOG_NOTICE; /* override level but keep original log header data */
334
335
0
    if (__sink_write(sink, hdr, 0, msgvec, 1) <= 0)
336
0
      goto done;
337
338
    /* success! */
339
0
    HA_ATOMIC_SUB(&sink->ctx.dropped, dropped << 1);
340
0
  }
341
342
  /* done! */
343
0
  ret = 1;
344
0
done:
345
  /* unlock the counter */
346
0
  HA_ATOMIC_AND(&sink->ctx.dropped, ~1);
347
0
leave:
348
0
  return ret;
349
0
}
350
351
/* parse the "show events" command, returns 1 if a message is returned, otherwise zero */
352
static int cli_parse_show_events(char **args, char *payload, struct appctx *appctx, void *private)
353
0
{
354
0
  struct sink *sink;
355
0
  uint ring_flags;
356
0
  int arg;
357
358
0
  args++; // make args[1] the 1st arg
359
360
0
  if (!*args[1]) {
361
    /* no arg => report the list of supported sink */
362
0
    chunk_printf(&trash, "Supported events sinks are listed below. Add -0(zero), -w(wait), -n(new). Any key to stop.\n");
363
0
    list_for_each_entry(sink, &sink_list, sink_list) {
364
0
      chunk_appendf(&trash, "    %-10s : type=%s, %u dropped, %s\n",
365
0
              sink->name,
366
0
              sink->type == SINK_TYPE_NEW ? "init" :
367
0
              sink->type == SINK_TYPE_FD ? "fd" :
368
0
              sink->type == SINK_TYPE_BUFFER ? "buffer" : "?",
369
0
              sink->ctx.dropped, sink->desc);
370
0
    }
371
372
0
    trash.area[trash.data] = 0;
373
0
    return cli_msg(appctx, LOG_WARNING, trash.area);
374
0
  }
375
376
0
  if (!cli_has_level(appctx, ACCESS_LVL_OPER))
377
0
    return 1;
378
379
0
  sink = sink_find(args[1]);
380
0
  if (!sink)
381
0
    return cli_err(appctx, "No such event sink");
382
383
0
  if (sink->type != SINK_TYPE_BUFFER)
384
0
    return cli_msg(appctx, LOG_NOTICE, "Nothing to report for this sink");
385
386
0
  ring_flags = 0;
387
0
  for (arg = 2; *args[arg]; arg++) {
388
0
    if (strcmp(args[arg], "-w") == 0)
389
0
      ring_flags |= RING_WF_WAIT_MODE;
390
0
    else if (strcmp(args[arg], "-n") == 0)
391
0
      ring_flags |= RING_WF_SEEK_NEW;
392
0
    else if (strcmp(args[arg], "-0") == 0)
393
0
      ring_flags |= RING_WF_END_ZERO;
394
0
    else if (strcmp(args[arg], "-nw") == 0 || strcmp(args[arg], "-wn") == 0)
395
0
      ring_flags |= RING_WF_WAIT_MODE | RING_WF_SEEK_NEW;
396
0
    else
397
0
      return cli_err(appctx, "unknown option");
398
0
  }
399
0
  return ring_attach_cli(sink->ctx.ring, appctx, ring_flags);
400
0
}
401
402
/* Pre-configures a ring proxy to emit connections */
403
void sink_setup_proxy(struct proxy *px)
404
0
{
405
0
  px->mode = PR_MODE_SYSLOG;
406
0
  px->maxconn = 0;
407
0
  px->conn_retries = 1; /* FIXME ignored since 91e785ed
408
                         * ("MINOR: stream: Rely on a per-stream max connection retries value")
409
                         * If this is really expected this should be set on the stream directly
410
                         * because the proxy lacks the CAP_FE so this setting is not considered
411
                         */
412
0
  px->timeout.server = TICK_ETERNITY;
413
0
  px->timeout.client = TICK_ETERNITY;
414
0
  px->timeout.connect = TICK_ETERNITY;
415
0
  px->accept = NULL;
416
0
  px->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
417
0
  px->next = sink_proxies_list;
418
0
  sink_proxies_list = px;
419
0
}
420
421
static void _sink_forward_io_handler(struct appctx *appctx,
422
                                     ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len, char delim))
423
0
{
424
0
  struct sink_forward_target *sft = appctx->svcctx;
425
0
  struct sink *sink = sft->sink;
426
0
  struct ring *ring = sink->ctx.ring;
427
0
  size_t ofs, last_ofs;
428
0
  size_t processed;
429
0
  int ret = 0;
430
431
0
  if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR)))
432
0
    goto out;
433
434
  /* if stopping was requested, close immediately */
435
0
  if (unlikely(stopping))
436
0
    goto soft_close;
437
438
  /* if the connection is not established, inform the stream that we want
439
   * to be notified whenever the connection completes.
440
   */
441
0
  if (se_fl_test(appctx->sedesc, SE_FL_APPLET_NEED_CONN)) {
442
0
    applet_need_more_data(appctx);
443
0
    applet_have_more_data(appctx);
444
0
    goto out;
445
0
  }
446
447
0
  if (!applet_get_outbuf(appctx)) {
448
0
    applet_have_more_data(appctx);
449
0
    goto out;
450
0
  }
451
452
0
  HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
453
0
  BUG_ON(appctx != sft->appctx);
454
455
0
  MT_LIST_DELETE(&appctx->wait_entry);
456
457
0
  ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0,
458
0
                               msg_handler, '\n', &processed);
459
0
  sft->e_processed += processed;
460
461
  /* if server's max-reuse is set (>= 0), destroy the applet once the
462
   * connection has been reused at least 'max-reuse' times, which means
463
   * it has processed at least 'max-reuse + 1' events (applet will
464
   * perform a new connection attempt)
465
   */
466
0
  if (sft->srv->max_reuse >= 0) {
467
0
    uint max_reuse = sft->srv->max_reuse + 1;
468
469
0
    if (max_reuse < sft->srv->max_reuse)
470
0
      max_reuse = sft->srv->max_reuse; // overflow, cap to max value
471
472
0
    if (sft->e_processed / max_reuse !=
473
0
        (sft->e_processed - processed) / max_reuse) {
474
0
      HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
475
0
      goto soft_close;
476
0
    }
477
0
  }
478
479
0
  if (ret) {
480
    /* let's be woken up once new data arrive */
481
0
    MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
482
0
    ofs = ring_tail(ring);
483
0
    if (ofs != last_ofs) {
484
      /* more data was added into the ring between the
485
       * unlock and the lock, and the writer might not
486
       * have seen us. We need to reschedule a read.
487
       */
488
0
      applet_have_more_data(appctx);
489
0
    } else
490
0
      applet_have_no_more_data(appctx);
491
0
  }
492
0
  HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
493
494
0
out:
495
  /* always drain data from server */
496
0
  applet_reset_input(appctx);
497
0
  return;
498
499
0
soft_close:
500
  /* be careful: since the socket lacks the NOLINGER flag (on purpose)
501
   * soft_close will result in the port staying in TIME_WAIT state:
502
   * don't abuse from soft_close!
503
   */
504
0
  applet_set_eos(appctx);
505
506
  /* if required, hard_close could be achieve by using SE_FL_EOS|SE_FL_ERROR
507
   * flag combination: RST will be sent, TIME_WAIT will be avoided as if
508
   * we performed a normal close with NOLINGER flag set
509
   */
510
0
}
511
512
/*
513
 * IO Handler to handle message push to syslog tcp server.
514
 * It takes its context from appctx->svcctx.
515
 */
516
static inline void sink_forward_io_handler(struct appctx *appctx)
517
0
{
518
0
  _sink_forward_io_handler(appctx, applet_append_line);
519
0
}
520
521
/*
522
 * IO Handler to handle message push to syslog tcp server
523
 * using octet counting frames
524
 * It takes its context from appctx->svcctx.
525
 */
526
static inline void sink_forward_oc_io_handler(struct appctx *appctx)
527
0
{
528
0
  _sink_forward_io_handler(appctx, syslog_applet_append_event);
529
0
}
530
531
void __sink_forward_session_deinit(struct sink_forward_target *sft)
532
0
{
533
0
  struct sink *sink;
534
535
0
  sink = sft->sink;
536
0
  if (!sink)
537
0
    return;
538
539
0
  MT_LIST_DELETE(&sft->appctx->wait_entry);
540
541
0
  sft->appctx = NULL;
542
0
  task_wakeup(sink->forward_task, TASK_WOKEN_MSG);
543
0
}
544
545
static int sink_forward_session_init(struct appctx *appctx)
546
0
{
547
0
  struct sink_forward_target *sft = appctx->svcctx;
548
0
  struct stream *s;
549
0
  struct sockaddr_storage *addr = NULL;
550
551
  /* sft init is performed asynchronously so <sft> must be manipulated
552
   * under the lock
553
   */
554
0
  HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
555
556
0
  BUG_ON(sft->appctx != appctx);
557
558
0
  if (!sockaddr_alloc(&addr, &sft->srv->addr, sizeof(sft->srv->addr)))
559
0
    goto out_error;
560
  /* srv port should be learned from srv->svc_port not from srv->addr */
561
0
  set_host_port(addr, sft->srv->svc_port);
562
563
0
  if (appctx_finalize_startup(appctx, sft->srv->proxy, &BUF_NULL) == -1)
564
0
    goto out_free_addr;
565
566
0
  s = appctx_strm(appctx);
567
0
  s->scb->dst = addr;
568
0
  s->scb->flags |= (SC_FL_RCV_ONCE);
569
570
0
  stream_set_srv_target(s, sft->srv);
571
0
  s->flags = SF_ASSIGNED;
572
573
0
  s->do_log = NULL;
574
0
  s->uniq_id = 0;
575
576
0
  se_need_remote_conn(appctx->sedesc);
577
0
  applet_expect_no_data(appctx);
578
579
0
  HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
580
581
0
  return 0;
582
583
0
 out_free_addr:
584
0
  sockaddr_free(&addr);
585
0
 out_error:
586
0
  HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
587
0
  return -1;
588
0
}
589
590
static void sink_forward_session_release(struct appctx *appctx)
591
0
{
592
0
  struct sink_forward_target *sft = appctx->svcctx;
593
594
0
  if (!sft)
595
0
    return;
596
597
0
  HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
598
0
  BUG_ON(sft->appctx != appctx);
599
0
  __sink_forward_session_deinit(sft);
600
0
  HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
601
0
}
602
603
static struct applet sink_forward_applet = {
604
  .obj_type = OBJ_TYPE_APPLET,
605
  .flags = APPLET_FL_NEW_API,
606
  .name = "<SINKFWD>", /* used for logging */
607
  .fct = sink_forward_io_handler,
608
  .rcv_buf = appctx_raw_rcv_buf,
609
  .snd_buf = appctx_raw_snd_buf,
610
  .init = sink_forward_session_init,
611
  .release = sink_forward_session_release,
612
};
613
614
static struct applet sink_forward_oc_applet = {
615
  .obj_type = OBJ_TYPE_APPLET,
616
  .flags = APPLET_FL_NEW_API,
617
  .name = "<SINKFWDOC>", /* used for logging */
618
  .fct = sink_forward_oc_io_handler,
619
  .rcv_buf = appctx_raw_rcv_buf,
620
  .snd_buf = appctx_raw_snd_buf,
621
  .init = sink_forward_session_init,
622
  .release = sink_forward_session_release,
623
};
624
625
/*
626
 * Create a new peer session in assigned state (connect will start automatically)
627
 * It sets its context into appctx->svcctx.
628
 */
629
static struct appctx *sink_forward_session_create(struct sink *sink, struct sink_forward_target *sft)
630
0
{
631
0
  struct appctx *appctx;
632
0
  struct applet *applet = &sink_forward_applet;
633
0
  uint best_tid, best_load;
634
0
  int attempts, first;
635
636
0
  if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
637
0
    applet = &sink_forward_oc_applet;
638
639
0
  BUG_ON(!global.nbthread);
640
0
  attempts = MIN(global.nbthread, 3);
641
0
  first = 1;
642
643
  /* to shut gcc warning */
644
0
  best_tid = best_load = 0;
645
646
  /* to help spread the load over multiple threads, try to find a
647
   * non-overloaded thread by picking a random thread and checking
648
   * its load. If we fail to find a non-overloaded thread after 3
649
   * attempts, let's pick the least overloaded one.
650
   */
651
0
  while (attempts-- > 0) {
652
0
    uint cur_tid;
653
0
    uint cur_load;
654
655
0
    cur_tid = statistical_prng_range(global.nbthread);
656
0
    cur_load = HA_ATOMIC_LOAD(&ha_thread_ctx[cur_tid].rq_total);
657
658
0
    if (first || cur_load < best_load) {
659
0
      best_tid = cur_tid;
660
0
      best_load = cur_load;
661
0
    }
662
0
    first = 0;
663
664
    /* if we already found a non-overloaded thread, stop now */
665
0
    if (HA_ATOMIC_LOAD(&ha_thread_ctx[best_tid].rq_total) < 3)
666
0
      break;
667
0
  }
668
669
0
  appctx = appctx_new_on(applet, NULL, best_tid);
670
0
  if (!appctx)
671
0
    goto out_close;
672
0
  appctx->svcctx = (void *)sft;
673
0
  appctx_wakeup(appctx);
674
0
  sft->last_conn = now_ms;
675
0
  return appctx;
676
677
  /* Error unrolling */
678
0
 out_close:
679
0
  return NULL;
680
0
}
681
682
/*
683
 * Task to handle connections to forward servers
684
 */
685
static struct task *process_sink_forward(struct task * task, void *context, unsigned int state)
686
0
{
687
0
  struct sink *sink = (struct sink *)context;
688
0
  struct sink_forward_target *sft = sink->sft;
689
690
0
  task->expire = TICK_ETERNITY;
691
692
0
  if (!stopping) {
693
0
    while (sft) {
694
0
      HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
695
      /* If appctx is NULL, start a new session and perform the appctx
696
       * assignment right away since the applet is not supposed to change
697
       * during the session lifetime. By doing the assignment now we
698
       * make sure to start the session exactly once.
699
       *
700
       * We enforce a tempo to ensure we don't perform more than 1 session
701
       * establishment attempt per second.
702
       */
703
0
      if (!sft->appctx) {
704
0
        int tempo = tick_add(sft->last_conn, MS_TO_TICKS(1000));
705
706
0
        if (sft->last_conn == TICK_ETERNITY || tick_is_expired(tempo, now_ms))
707
0
          sft->appctx = sink_forward_session_create(sink, sft);
708
0
        else if (task->expire == TICK_ETERNITY)
709
0
          task->expire = tempo;
710
0
        else
711
0
          task->expire = tick_first(task->expire, tempo);
712
0
      }
713
0
      HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
714
0
      sft = sft->next;
715
0
    }
716
0
  }
717
0
  else {
718
0
    while (sft) {
719
0
      HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
720
      /* awake applet to perform a clean close */
721
0
      if (sft->appctx)
722
0
        appctx_wakeup(sft->appctx);
723
0
      HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
724
0
      sft = sft->next;
725
0
    }
726
0
  }
727
728
0
  return task;
729
0
}
730
/*
731
 * Init task to manage connections to forward servers
732
 *
733
 * returns 0 in case of error.
734
 */
735
int sink_init_forward(struct sink *sink)
736
0
{
737
0
  sink->forward_task = task_new_anywhere();
738
0
  if (!sink->forward_task)
739
0
    return 0;
740
741
0
  sink->forward_task->process = process_sink_forward;
742
0
  sink->forward_task->context = (void *)sink;
743
0
  sink->forward_sighandler = signal_register_task(0, sink->forward_task, 0);
744
0
  task_wakeup(sink->forward_task, TASK_WOKEN_INIT);
745
0
  return 1;
746
0
}
747
748
/* This tries to rotate a file-backed ring, but only if it contains contents.
749
 * This way empty rings will not cause backups to be overwritten and it's safe
750
 * to reload multiple times. That's only best effort, failures are silently
751
 * ignored.
752
 */
753
void sink_rotate_file_backed_ring(const char *name)
754
0
{
755
0
  struct ring_storage storage;
756
0
  char *oldback;
757
0
  int ret;
758
0
  int fd;
759
760
0
  fd = open(name, O_RDONLY);
761
0
  if (fd < 0)
762
0
    return;
763
764
  /* check for contents validity */
765
0
  ret = read(fd, &storage, sizeof(storage));
766
0
  close(fd);
767
768
0
  if (ret != sizeof(storage))
769
0
    goto rotate;
770
771
  /* check that it's the expected format before touching it */
772
0
  if (storage.rsvd != sizeof(storage))
773
0
    return;
774
775
  /* contents are present, we want to keep them => rotate. Note that
776
   * an empty ring buffer has one byte (the marker).
777
   */
778
0
  if (storage.head != 0 || storage.tail != 1)
779
0
    goto rotate;
780
781
  /* nothing to keep, let's scratch the file and preserve the backup */
782
0
  return;
783
784
0
 rotate:
785
0
  oldback = NULL;
786
0
  memprintf(&oldback, "%s.bak", name);
787
0
  if (oldback) {
788
    /* try to rename any possibly existing ring file to
789
     * ".bak" and delete remains of older ones. This will
790
     * ensure we don't wipe useful debug info upon restart.
791
     */
792
0
    unlink(oldback);
793
0
    if (rename(name, oldback) < 0)
794
0
      unlink(oldback);
795
0
    ha_free(&oldback);
796
0
  }
797
0
}
798
799
800
/* helper function to completely deallocate a sink struct
801
 */
802
static void sink_free(struct sink *sink)
803
0
{
804
0
  struct sink_forward_target *sft_next;
805
806
0
  if (!sink)
807
0
    return;
808
0
  if (sink->type == SINK_TYPE_BUFFER) {
809
0
    if (sink->store) {
810
0
      size_t size = (ring_allocated_size(sink->ctx.ring) + 4095UL) & -4096UL;
811
0
      void *area = ring_allocated_area(sink->ctx.ring);
812
813
0
      msync(area, size, MS_SYNC);
814
0
      munmap(area, size);
815
0
      ha_free(&sink->store);
816
0
    }
817
0
    ring_free(sink->ctx.ring);
818
0
  }
819
0
  LIST_DEL_INIT(&sink->sink_list); // remove from parent list
820
0
  task_destroy(sink->forward_task);
821
0
  free_proxy(sink->forward_px);
822
0
  ha_free(&sink->name);
823
0
  ha_free(&sink->desc);
824
0
  while (sink->sft) {
825
0
    sft_next = sink->sft->next;
826
0
    ha_free(&sink->sft);
827
0
    sink->sft = sft_next;
828
0
  }
829
0
  ha_free(&sink);
830
0
}
831
832
/* Helper function to create new high-level ring buffer (as in ring section from
833
 * the config): will create a new sink of buf type, and a new forward proxy,
834
 * which will be stored in forward_px to know that the sink is responsible for
835
 * it.
836
 *
837
 * Returns NULL on failure
838
 */
839
static struct sink *sink_new_ringbuf(const char *id, const char *description,
840
                                     const char *file, int linenum, char **err_msg)
841
0
{
842
0
  struct sink *sink;
843
0
  struct proxy *p = NULL; // forward_px
844
845
  /* allocate new proxy to handle forwards, mark it as internal proxy
846
   * because we don't want haproxy to do the automatic syslog backend
847
   * init, instead we will manage it by hand
848
   */
849
0
  p = alloc_new_proxy(id, PR_CAP_BE|PR_CAP_INT, err_msg);
850
0
  if (!p)
851
0
    goto err;
852
853
0
  sink_setup_proxy(p);
854
0
  p->conf.args.file = p->conf.file = copy_file_name(file);
855
0
  p->conf.args.line = p->conf.line = linenum;
856
857
0
  sink = sink_new_buf(id, description, LOG_FORMAT_RAW, BUFSIZE);
858
0
  if (!sink) {
859
0
    memprintf(err_msg, "unable to create a new sink buffer for ring '%s'", id);
860
0
    goto err;
861
0
  }
862
863
  /* link sink to proxy */
864
0
  sink->forward_px = p;
865
866
0
  return sink;
867
868
0
 err:
869
0
  free_proxy(p);
870
0
  return NULL;
871
0
}
872
873
/* helper function: add a new server to an existing sink
874
 *
875
 * Returns 1 on success and 0 on failure
876
 */
877
static int sink_add_srv(struct sink *sink, struct server *srv)
878
0
{
879
0
  struct sink_forward_target *sft;
880
881
  /* allocate new sink_forward_target descriptor */
882
0
  sft = calloc(1, sizeof(*sft));
883
0
  if (!sft) {
884
0
    ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n", srv->id, sink->name);
885
0
    return 0;
886
0
  }
887
0
  sft->srv = srv;
888
0
  sft->appctx = NULL;
889
0
  sft->ofs = ~0; /* init ring offset */
890
0
  sft->sink = sink;
891
0
  sft->next = sink->sft;
892
0
  HA_SPIN_INIT(&sft->lock);
893
894
  /* mark server attached to the ring */
895
0
  if (!ring_attach(sink->ctx.ring)) {
896
0
    ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, sink->name);
897
0
    ha_free(&sft);
898
0
    return 0;
899
0
  }
900
0
  sink->sft = sft;
901
0
  return 1;
902
0
}
903
904
/* Finalize sink struct to ensure configuration consistency and
905
 * allocate final struct members
906
 *
907
 * Returns ERR_NONE on success, ERR_WARN on warning
908
 * Returns a composition of ERR_ALERT, ERR_ABORT, ERR_FATAL on failure
909
 */
910
static int sink_finalize(struct sink *sink)
911
0
{
912
0
  int err_code = ERR_NONE;
913
0
  struct server *srv;
914
915
0
  if (sink && (sink->type == SINK_TYPE_BUFFER)) {
916
0
    if (!sink->maxlen)
917
0
      sink->maxlen = ~0; // maxlen not set: no implicit truncation
918
0
    else if (sink->maxlen > ring_max_payload(sink->ctx.ring)) {
919
      /* maxlen set by user however it doesn't fit: set to max value */
920
0
      ha_warning("ring '%s' event max length '%u' exceeds max payload size, forced to '%lu'.\n",
921
0
                 sink->name, sink->maxlen, (unsigned long)ring_max_payload(sink->ctx.ring));
922
0
      sink->maxlen = ring_max_payload(sink->ctx.ring);
923
0
      err_code |= ERR_WARN;
924
0
    }
925
926
    /* prepare forward server descriptors */
927
0
    if (sink->forward_px) {
928
      /* sink proxy is set: register all servers from the proxy */
929
0
      srv = sink->forward_px->srv;
930
0
      while (srv) {
931
0
        if (!sink_add_srv(sink, srv)) {
932
0
          err_code |= ERR_ALERT | ERR_FATAL;
933
0
          break;
934
0
        }
935
0
        srv = srv->next;
936
0
      }
937
0
    }
938
    /* init forwarding if at least one sft is registered */
939
0
    if (sink->sft && sink_init_forward(sink) == 0) {
940
0
      ha_alert("error when trying to initialize sink buffer forwarding.\n");
941
0
      err_code |= ERR_ALERT | ERR_FATAL;
942
0
    }
943
0
    if (!sink->store) {
944
      /* virtual memory backed sink */
945
0
      vma_set_name(ring_allocated_area(sink->ctx.ring),
946
0
                   ring_allocated_size(sink->ctx.ring),
947
0
                   "ring", sink->name);
948
0
    }
949
0
  }
950
0
  return err_code;
951
0
}
952
953
/*
954
 * Parse "ring" section and create corresponding sink buffer.
955
 *
956
 * The function returns 0 in success case, otherwise, it returns error
957
 * flags.
958
 */
959
int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
960
0
{
961
0
  int err_code = 0;
962
0
  char *err_msg = NULL;
963
0
  const char *inv;
964
965
0
  if (strcmp(args[0], "ring") == 0) { /* new ring section */
966
0
    if (!*args[1]) {
967
0
      ha_alert("parsing [%s:%d] : missing ring name.\n", file, linenum);
968
0
      err_code |= ERR_ALERT | ERR_FATAL;
969
0
      goto err;
970
0
    }
971
972
0
    inv = invalid_char(args[1]);
973
0
    if (inv) {
974
0
      ha_alert("parsing [%s:%d] : invalid ring name '%s' (character '%c' is not permitted).\n", file, linenum, args[1], *inv);
975
0
      err_code |= ERR_ALERT | ERR_FATAL;
976
0
      goto err;
977
0
    }
978
979
0
    if (sink_find(args[1])) {
980
0
      ha_alert("parsing [%s:%d] : sink named '%s' already exists.\n", file, linenum, args[1]);
981
0
      err_code |= ERR_ALERT | ERR_FATAL;
982
0
      goto err;
983
0
    }
984
985
0
    cfg_sink = sink_new_ringbuf(args[1], args[1], file, linenum, &err_msg);
986
0
    if (!cfg_sink) {
987
0
      ha_alert("parsing [%s:%d] : %s.\n", file, linenum, err_msg);
988
0
      ha_free(&err_msg);
989
0
      err_code |= ERR_ALERT | ERR_FATAL;
990
0
      goto err;
991
0
    }
992
993
    /* set maxlen value to 0 for now, we rely on this in postparsing
994
     * to know if it was explicitly set using the "maxlen" parameter
995
     */
996
0
    cfg_sink->maxlen = 0;
997
0
  }
998
0
  else if (strcmp(args[0], "size") == 0) {
999
0
    size_t size;
1000
1001
0
    if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
1002
0
      ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
1003
0
      err_code |= ERR_ALERT | ERR_FATAL;
1004
0
      goto err;
1005
0
    }
1006
1007
0
    if (parse_size_err(args[1], &size) != NULL || !size) {
1008
0
      ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
1009
0
      err_code |= ERR_ALERT | ERR_FATAL;
1010
0
      goto err;
1011
0
    }
1012
1013
0
    if (size > RING_TAIL_LOCK) {
1014
0
      ha_alert("parsing [%s:%d] : too large size '%llu' for new sink buffer, the limit on this platform is %llu bytes.\n", file, linenum, (ullong)size, (ullong)RING_TAIL_LOCK);
1015
0
      err_code |= ERR_ALERT | ERR_FATAL;
1016
0
      goto err;
1017
0
    }
1018
1019
0
    if (cfg_sink->store) {
1020
0
      ha_alert("parsing [%s:%d] : cannot resize an already mapped file, please specify 'size' before 'backing-file'.\n", file, linenum);
1021
0
      err_code |= ERR_ALERT | ERR_FATAL;
1022
0
      goto err;
1023
0
    }
1024
1025
0
    if (size < ring_data(cfg_sink->ctx.ring)) {
1026
0
      ha_warning("parsing [%s:%d] : ignoring new size '%llu' that is smaller than contents '%llu' for ring '%s'.\n",
1027
0
           file, linenum, (ullong)size, (ullong)ring_data(cfg_sink->ctx.ring), cfg_sink->name);
1028
0
      err_code |= ERR_WARN;
1029
0
      goto err;
1030
0
    }
1031
1032
0
    if (!ring_resize(cfg_sink->ctx.ring, size)) {
1033
0
      ha_alert("parsing [%s:%d] : fail to set sink buffer size '%llu' for ring '%s'.\n", file, linenum,
1034
0
         (ullong)ring_size(cfg_sink->ctx.ring), cfg_sink->name);
1035
0
      err_code |= ERR_ALERT | ERR_FATAL;
1036
0
      goto err;
1037
0
    }
1038
0
  }
1039
0
  else if (strcmp(args[0], "backing-file") == 0) {
1040
    /* This tries to mmap file <file> for size <size> and to use it as a backing store
1041
     * for ring <ring>. Existing data are delete. NULL is returned on error.
1042
     */
1043
0
    const char *backing = args[1];
1044
0
    size_t size;
1045
0
    void *area;
1046
0
    int fd;
1047
1048
0
    if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
1049
0
      ha_alert("parsing [%s:%d] : 'backing-file' only usable with existing rings.\n", file, linenum);
1050
0
      err_code |= ERR_ALERT | ERR_FATAL;
1051
0
      goto err;
1052
0
    }
1053
1054
0
    if (cfg_sink->store) {
1055
0
      ha_alert("parsing [%s:%d] : 'backing-file' already specified for ring '%s' (was '%s').\n", file, linenum, cfg_sink->name, cfg_sink->store);
1056
0
      err_code |= ERR_ALERT | ERR_FATAL;
1057
0
      goto err;
1058
0
    }
1059
1060
    /* let's check if the file exists and is not empty. That's the
1061
     * only condition under which we'll trigger a rotate, so that
1062
     * config checks, reloads, or restarts that don't emit anything
1063
     * do not rotate it again.
1064
     */
1065
0
    sink_rotate_file_backed_ring(backing);
1066
1067
0
    fd = open(backing, O_RDWR | O_CREAT, 0600);
1068
0
    if (fd < 0) {
1069
0
      ha_alert("parsing [%s:%d] : cannot open backing-file '%s' for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno));
1070
0
      err_code |= ERR_ALERT | ERR_FATAL;
1071
0
      goto err;
1072
0
    }
1073
1074
0
    size = (ring_size(cfg_sink->ctx.ring) + 4095UL) & -4096UL;
1075
0
    if (ftruncate(fd, size) != 0) {
1076
0
      close(fd);
1077
0
      ha_alert("parsing [%s:%d] : could not adjust size of backing-file for ring '%s': %s.\n", file, linenum, cfg_sink->name, strerror(errno));
1078
0
      err_code |= ERR_ALERT | ERR_FATAL;
1079
0
      goto err;
1080
0
    }
1081
1082
0
    area = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1083
0
    if (area == MAP_FAILED) {
1084
0
      close(fd);
1085
0
      ha_alert("parsing [%s:%d] : failed to use '%s' as a backing file for ring '%s': %s.\n", file, linenum, backing, cfg_sink->name, strerror(errno));
1086
0
      err_code |= ERR_ALERT | ERR_FATAL;
1087
0
      goto err;
1088
0
    }
1089
1090
    /* we don't need the file anymore */
1091
0
    close(fd);
1092
0
    cfg_sink->store = strdup(backing);
1093
1094
    /* never fails */
1095
0
    ring_free(cfg_sink->ctx.ring);
1096
0
    cfg_sink->ctx.ring = ring_make_from_area(area, size, 1);
1097
0
  }
1098
0
  else if (strcmp(args[0],"server") == 0) {
1099
0
    if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
1100
0
      ha_alert("parsing [%s:%d] : unable to create server '%s'.\n", file, linenum, args[1]);
1101
0
      err_code |= ERR_ALERT | ERR_FATAL;
1102
0
      goto err;
1103
0
    }
1104
1105
0
    err_code |= parse_server(file, linenum, args, cfg_sink->forward_px, NULL,
1106
0
                             SRV_PARSE_PARSE_ADDR|SRV_PARSE_INITIAL_RESOLVE);
1107
1108
0
    if (err_code & ERR_CODE)
1109
0
      goto err;
1110
0
  }
1111
0
  else if (strcmp(args[0],"timeout") == 0) {
1112
0
    if (!cfg_sink || !cfg_sink->forward_px) {
1113
0
      ha_alert("parsing [%s:%d] : unable to set timeout '%s'.\n", file, linenum, args[1]);
1114
0
      err_code |= ERR_ALERT | ERR_FATAL;
1115
0
      goto err;
1116
0
    }
1117
1118
0
                if (strcmp(args[1], "connect") == 0 ||
1119
0
        strcmp(args[1], "server") == 0) {
1120
0
      const char *res;
1121
0
      unsigned int tout;
1122
1123
0
      if (!*args[2]) {
1124
0
        ha_alert("parsing [%s:%d] : '%s %s' expects <time> as argument.\n",
1125
0
           file, linenum, args[0], args[1]);
1126
0
        err_code |= ERR_ALERT | ERR_FATAL;
1127
0
        goto err;
1128
0
      }
1129
0
      res = parse_time_err(args[2], &tout, TIME_UNIT_MS);
1130
0
      if (res == PARSE_TIME_OVER) {
1131
0
        ha_alert("parsing [%s:%d]: timer overflow in argument <%s> to <%s %s>, maximum value is 2147483647 ms (~24.8 days).\n",
1132
0
           file, linenum, args[2], args[0], args[1]);
1133
0
        err_code |= ERR_ALERT | ERR_FATAL;
1134
0
        goto err;
1135
0
      }
1136
0
      else if (res == PARSE_TIME_UNDER) {
1137
0
        ha_alert("parsing [%s:%d]: timer underflow in argument <%s> to <%s %s>, minimum non-null value is 1 ms.\n",
1138
0
           file, linenum, args[2], args[0], args[1]);
1139
0
        err_code |= ERR_ALERT | ERR_FATAL;
1140
0
        goto err;
1141
0
      }
1142
0
      else if (res) {
1143
0
        ha_alert("parsing [%s:%d]: unexpected character '%c' in argument to <%s %s>.\n",
1144
0
           file, linenum, *res, args[0], args[1]);
1145
0
        err_code |= ERR_ALERT | ERR_FATAL;
1146
0
        goto err;
1147
0
      }
1148
0
                        if (args[1][0] == 'c')
1149
0
                                cfg_sink->forward_px->timeout.connect = tout;
1150
0
                        else
1151
0
                                cfg_sink->forward_px->timeout.server = tout;
1152
0
    }
1153
0
  }
1154
0
  else if (strcmp(args[0],"format") == 0) {
1155
0
    if (!cfg_sink) {
1156
0
      ha_alert("parsing [%s:%d] : unable to set format '%s'.\n", file, linenum, args[1]);
1157
0
      err_code |= ERR_ALERT | ERR_FATAL;
1158
0
      goto err;
1159
0
    }
1160
1161
0
    cfg_sink->fmt = get_log_format(args[1]);
1162
0
    if (cfg_sink->fmt == LOG_FORMAT_UNSPEC) {
1163
0
      ha_alert("parsing [%s:%d] : unknown format '%s'.\n", file, linenum, args[1]);
1164
0
      err_code |= ERR_ALERT | ERR_FATAL;
1165
0
      goto err;
1166
0
    }
1167
0
  }
1168
0
  else if (strcmp(args[0],"maxlen") == 0) {
1169
0
    if (!cfg_sink) {
1170
0
      ha_alert("parsing [%s:%d] : unable to set event max length '%s'.\n", file, linenum, args[1]);
1171
0
      err_code |= ERR_ALERT | ERR_FATAL;
1172
0
      goto err;
1173
0
    }
1174
1175
0
    cfg_sink->maxlen = atol(args[1]);
1176
0
    if (!cfg_sink->maxlen) {
1177
0
      ha_alert("parsing [%s:%d] : invalid size '%s' for new sink buffer.\n", file, linenum, args[1]);
1178
0
      err_code |= ERR_ALERT | ERR_FATAL;
1179
0
      goto err;
1180
0
    }
1181
0
  }
1182
0
  else if (strcmp(args[0],"description") == 0) {
1183
0
    if (!cfg_sink) {
1184
0
      ha_alert("parsing [%s:%d] : unable to set description '%s'.\n", file, linenum, args[1]);
1185
0
      err_code |= ERR_ALERT | ERR_FATAL;
1186
0
      goto err;
1187
0
    }
1188
1189
0
    if (!*args[1]) {
1190
0
      ha_alert("parsing [%s:%d] : missing ring description text.\n", file, linenum);
1191
0
      err_code |= ERR_ALERT | ERR_FATAL;
1192
0
      goto err;
1193
0
    }
1194
1195
0
    free(cfg_sink->desc);
1196
1197
0
    cfg_sink->desc = strdup(args[1]);
1198
0
    if (!cfg_sink->desc) {
1199
0
      ha_alert("parsing [%s:%d] : fail to set description '%s'.\n", file, linenum, args[1]);
1200
0
      err_code |= ERR_ALERT | ERR_FATAL;
1201
0
      goto err;
1202
0
    }
1203
0
  }
1204
0
  else {
1205
0
    ha_alert("parsing [%s:%d] : unknown statement '%s'.\n", file, linenum, args[0]);
1206
0
    err_code |= ERR_ALERT | ERR_FATAL;
1207
0
    goto err;
1208
0
  }
1209
1210
0
err:
1211
0
  return err_code;
1212
0
}
1213
1214
/* Creates a new sink buffer from a logger.
1215
 *
1216
 * It uses the logger's address to declare a forward
1217
 * server for this buffer. And it initializes the
1218
 * forwarding.
1219
 *
1220
 * The function returns a pointer on the
1221
 * allocated struct sink if allocate
1222
 * and initialize succeed, else if it fails
1223
 * it returns NULL.
1224
 *
1225
 * Note: the sink is created using the name
1226
 *       specified into logger->target.ring_name
1227
 */
1228
struct sink *sink_new_from_logger(struct logger *logger)
1229
0
{
1230
0
  struct sink *sink = NULL;
1231
0
  struct server *srv = NULL;
1232
0
  char *err_msg = NULL;
1233
1234
  /* prepare description for the sink */
1235
0
  chunk_reset(&trash);
1236
0
  chunk_printf(&trash, "created from log directive declared into '%s' at line %d", logger->conf.file, logger->conf.line);
1237
1238
  /* allocate a new sink buffer */
1239
0
  sink = sink_new_ringbuf(logger->target.ring_name, trash.area, logger->conf.file, logger->conf.line, &err_msg);
1240
0
  if (!sink) {
1241
0
    ha_alert("%s.\n", err_msg);
1242
0
    ha_free(&err_msg);
1243
0
    goto error;
1244
0
  }
1245
1246
  /* ring format normally defaults to RAW, but here we set ring format
1247
   * to UNSPEC to inherit from caller format in sink_write() since we
1248
   * cannot customize implicit ring settings
1249
   */
1250
0
  sink->fmt = LOG_FORMAT_UNSPEC;
1251
1252
  /* for the same reason, we disable sink->maxlen to inherit from caller
1253
   * maxlen in sink_write()
1254
   */
1255
0
  sink->maxlen = 0;
1256
1257
  /* Set default connect and server timeout for sink forward proxy */
1258
0
  sink->forward_px->timeout.connect = MS_TO_TICKS(1000);
1259
0
  sink->forward_px->timeout.server = MS_TO_TICKS(5000);
1260
1261
  /* allocate a new server to forward messages
1262
   * from ring buffer
1263
   */
1264
0
  srv = new_server(sink->forward_px);
1265
0
  if (!srv)
1266
0
    goto error;
1267
1268
  /* init server */
1269
0
  srv->id = strdup(logger->target.ring_name);
1270
0
  srv->conf.file = strdup(logger->conf.file);
1271
0
  srv->conf.line = logger->conf.line;
1272
0
  srv->addr = *logger->target.addr;
1273
0
  srv->svc_port = get_host_port(logger->target.addr);
1274
0
  HA_SPIN_INIT(&srv->lock);
1275
1276
0
  if (sink_finalize(sink) & ERR_CODE)
1277
0
    goto error;
1278
1279
0
  return sink;
1280
1281
0
 error:
1282
0
  sink_free(sink);
1283
1284
0
  return NULL;
1285
0
}
1286
1287
/* This function is pretty similar to sink_from_logger():
1288
 * But instead of creating a forward proxy and server from a logger struct
1289
 * it uses already existing srv to create the forwarding sink, so most of
1290
 * the initialization is bypassed.
1291
 *
1292
 * The function returns a pointer on the
1293
 * allocated struct sink if allocate
1294
 * and initialize succeed, else if it fails
1295
 * it returns NULL.
1296
 *
1297
 * <from> allows to specify a string that will be inserted into the sink
1298
 * description to describe where it was created from.
1299
1300
 * Note: the sink is created using the name
1301
 *       specified into srv->id
1302
 */
1303
struct sink *sink_new_from_srv(struct server *srv, const char *from)
1304
0
{
1305
0
  struct sink *sink = NULL;
1306
0
  int bufsize = (srv->log_bufsize) ? srv->log_bufsize : BUFSIZE;
1307
0
  char *sink_name = NULL;
1308
1309
  /* prepare description for the sink */
1310
0
  chunk_reset(&trash);
1311
0
  chunk_printf(&trash, "created from %s declared into '%s' at line %d", from, srv->conf.file, srv->conf.line);
1312
1313
0
  memprintf(&sink_name, "%s/%s", srv->proxy->id, srv->id);
1314
0
  if (!sink_name) {
1315
0
    ha_alert("memory error while creating ring buffer for server '%s/%s'.\n", srv->proxy->id, srv->id);
1316
0
    goto error;
1317
0
  }
1318
1319
  /* directly create a sink of BUF type, and use UNSPEC log format to
1320
   * inherit from caller fmt in sink_write()
1321
   *
1322
   * sink_name must be unique to prevent existing sink from being reused
1323
   */
1324
0
  sink = sink_new_buf(sink_name, trash.area, LOG_FORMAT_UNSPEC, bufsize);
1325
0
  ha_free(&sink_name); // no longer needed
1326
1327
0
  if (!sink) {
1328
0
    ha_alert("unable to create a new sink buffer for server '%s/%s'.\n", srv->proxy->id, srv->id);
1329
0
    goto error;
1330
0
  }
1331
1332
  /* we disable sink->maxlen to inherit from caller
1333
   * maxlen in sink_write()
1334
   */
1335
0
  sink->maxlen = 0;
1336
1337
  /* add server to sink */
1338
0
  if (!sink_add_srv(sink, srv))
1339
0
    goto error;
1340
1341
0
  if (sink_finalize(sink) & ERR_CODE)
1342
0
    goto error;
1343
1344
0
  return sink;
1345
1346
0
 error:
1347
0
  sink_free(sink);
1348
1349
0
  return NULL;
1350
0
}
1351
1352
/*
1353
 * Post parsing "ring" section.
1354
 *
1355
 * The function returns 0 in success case, otherwise, it returns error
1356
 * flags.
1357
 */
1358
int cfg_post_parse_ring()
1359
0
{
1360
0
  int err_code;
1361
1362
0
  err_code = sink_finalize(cfg_sink);
1363
0
  cfg_sink = NULL;
1364
1365
0
  return err_code;
1366
0
}
1367
1368
/* function: resolve a single logger target of BUFFER type
1369
 *
1370
 * Returns err_code which defaults to ERR_NONE and can be set to a combination
1371
 * of ERR_WARN, ERR_ALERT, ERR_FATAL and ERR_ABORT in case of errors.
1372
 * <msg> could be set at any time (it will usually be set on error, but
1373
 * could also be set when no error occurred to report a diag warning), thus is
1374
 * up to the caller to check it and to free it.
1375
 */
1376
int sink_resolve_logger_buffer(struct logger *logger, char **msg)
1377
0
{
1378
0
  struct log_target *target = &logger->target;
1379
0
  int err_code = ERR_NONE;
1380
0
  struct sink *sink;
1381
1382
0
  BUG_ON(target->type != LOG_TARGET_BUFFER || (target->flags & LOG_TARGET_FL_RESOLVED));
1383
0
  if (target->addr) {
1384
0
    sink = sink_new_from_logger(logger);
1385
0
    if (!sink) {
1386
0
      memprintf(msg, "cannot be initialized (failed to create implicit ring)");
1387
0
      err_code |= ERR_ALERT | ERR_FATAL;
1388
0
      goto end;
1389
0
    }
1390
0
    ha_free(&target->addr); /* we no longer need this */
1391
0
  }
1392
0
  else {
1393
0
    sink = sink_find(target->ring_name);
1394
0
    if (!sink) {
1395
0
      memprintf(msg, "uses unknown ring named '%s'", target->ring_name);
1396
0
      err_code |= ERR_ALERT | ERR_FATAL;
1397
0
      goto end;
1398
0
    }
1399
0
    else if (sink->type != SINK_TYPE_BUFFER) {
1400
0
      memprintf(msg, "uses incompatible ring '%s'", target->ring_name);
1401
0
      err_code |= ERR_ALERT | ERR_FATAL;
1402
0
      goto end;
1403
0
    }
1404
0
  }
1405
  /* consistency checks */
1406
0
  if (sink && logger->maxlen > ring_max_payload(sink->ctx.ring)) {
1407
0
    memprintf(msg, "uses a max length which exceeds ring capacity ('%s' supports %lu bytes at most)",
1408
0
              target->ring_name, (unsigned long)ring_max_payload(sink->ctx.ring));
1409
0
  }
1410
0
  else if (sink && logger->maxlen > sink->maxlen) {
1411
0
    memprintf(msg, "uses a ring with a smaller maxlen than the one specified on the log directive ('%s' has maxlen = %d), logs will be truncated according to the lowest maxlen between the two",
1412
0
              target->ring_name, sink->maxlen);
1413
0
  }
1414
0
 end:
1415
0
  ha_free(&target->ring_name); /* sink is resolved and will replace ring_name hint */
1416
0
  target->sink = sink;
1417
0
  return err_code;
1418
0
}
1419
1420
static void sink_init()
1421
0
{
1422
0
  sink_new_fd("stdout", "standard output (fd#1)", LOG_FORMAT_RAW, 1);
1423
0
  sink_new_fd("stderr", "standard output (fd#2)", LOG_FORMAT_RAW, 2);
1424
0
  sink_new_buf("buf0",  "in-memory ring buffer", LOG_FORMAT_TIMED, 1048576);
1425
0
  sink_new_buf("dpapi",  "DPAPI ring buffer", LOG_FORMAT_TIMED, 1048576);
1426
0
}
1427
1428
static int sink_postcheck()
1429
0
{
1430
0
  struct sink *sink;
1431
1432
0
  list_for_each_entry(sink, &sink_list, sink_list) {
1433
0
    if (sink->type == SINK_TYPE_FORWARD_DECLARED) {
1434
      /* sink wasn't upgraded to actual sink despite being
1435
       * forward-declared: it is an error (the sink doesn't
1436
       * really exist)
1437
       */
1438
0
      ha_alert("%s: sink '%s' doesn't exist.\n", sink->desc, sink->name);
1439
0
      return ERR_ALERT | ERR_FATAL;
1440
0
    }
1441
0
  }
1442
0
  return ERR_NONE;
1443
0
}
1444
1445
static void sink_deinit()
1446
0
{
1447
0
  struct sink *sink, *sb;
1448
1449
0
  list_for_each_entry_safe(sink, sb, &sink_list, sink_list)
1450
0
    sink_free(sink);
1451
0
}
1452
1453
INITCALL0(STG_REGISTER, sink_init);
1454
REGISTER_POST_CHECK(sink_postcheck);
1455
REGISTER_POST_DEINIT(sink_deinit);
1456
1457
static struct cli_kw_list cli_kws = {{ },{
1458
  { { "show", "events", NULL }, "show events [<sink>] [-w] [-n] [-0]     : show event sink state", cli_parse_show_events, NULL, NULL },
1459
  {{},}
1460
}};
1461
1462
INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);
1463
1464
/* config parsers for this section */
1465
REGISTER_CONFIG_SECTION("ring", cfg_parse_ring, cfg_post_parse_ring);
1466
1467
/*
1468
 * Local variables:
1469
 *  c-indent-level: 8
1470
 *  c-basic-offset: 8
1471
 * End:
1472
 */