Coverage Report

Created: 2026-01-17 06:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/haproxy/src/peers.c
Line
Count
Source
1
/*
2
 * Peer synchro management.
3
 *
4
 * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr>
5
 *
6
 * This program is free software; you can redistribute it and/or
7
 * modify it under the terms of the GNU General Public License
8
 * as published by the Free Software Foundation; either version
9
 * 2 of the License, or (at your option) any later version.
10
 *
11
 */
12
13
#include <errno.h>
14
#include <stdio.h>
15
#include <stdlib.h>
16
#include <string.h>
17
18
#include <sys/socket.h>
19
#include <sys/stat.h>
20
#include <sys/types.h>
21
22
#include <import/eb32tree.h>
23
#include <import/ebmbtree.h>
24
#include <import/ebpttree.h>
25
26
#include <haproxy/api.h>
27
#include <haproxy/applet.h>
28
#include <haproxy/cfgparse.h>
29
#include <haproxy/channel.h>
30
#include <haproxy/cli.h>
31
#include <haproxy/dict.h>
32
#include <haproxy/errors.h>
33
#include <haproxy/fd.h>
34
#include <haproxy/frontend.h>
35
#include <haproxy/net_helper.h>
36
#include <haproxy/obj_type-t.h>
37
#include <haproxy/peers.h>
38
#include <haproxy/proxy.h>
39
#include <haproxy/sc_strm.h>
40
#include <haproxy/session-t.h>
41
#include <haproxy/signal.h>
42
#include <haproxy/stats-t.h>
43
#include <haproxy/stconn.h>
44
#include <haproxy/stick_table.h>
45
#include <haproxy/stream.h>
46
#include <haproxy/task.h>
47
#include <haproxy/thread.h>
48
#include <haproxy/time.h>
49
#include <haproxy/tools.h>
50
#include <haproxy/trace.h>
51
52
/***********************************/
53
/* Current shared table sync state */
54
/***********************************/
55
0
#define SHTABLE_F_TEACH_STAGE1      0x00000001 /* Teach state 1 complete */
56
0
#define SHTABLE_F_TEACH_STAGE2      0x00000002 /* Teach state 2 complete */
57
58
59
#define PEER_RESYNC_TIMEOUT         5000 /* 5 seconds */
60
#define PEER_RECONNECT_TIMEOUT      5000 /* 5 seconds */
61
#define PEER_LOCAL_RECONNECT_TIMEOUT 500 /* 500ms */
62
#define PEER_HEARTBEAT_TIMEOUT      3000 /* 3 seconds */
63
64
/* default maximum of updates sent at once */
65
#define PEER_DEF_MAX_UPDATES_AT_ONCE      200
66
67
/* flags for "show peers" */
68
0
#define PEERS_SHOW_F_DICT           0x00000001 /* also show the contents of the dictionary */
69
70
/*****************************/
71
/* Sync message class        */
72
/*****************************/
73
enum {
74
  PEER_MSG_CLASS_CONTROL = 0,
75
  PEER_MSG_CLASS_ERROR,
76
  PEER_MSG_CLASS_STICKTABLE = 10,
77
  PEER_MSG_CLASS_RESERVED = 255,
78
};
79
80
/*****************************/
81
/* control message types     */
82
/*****************************/
83
enum {
84
  PEER_MSG_CTRL_RESYNCREQ = 0,
85
  PEER_MSG_CTRL_RESYNCFINISHED,
86
  PEER_MSG_CTRL_RESYNCPARTIAL,
87
  PEER_MSG_CTRL_RESYNCCONFIRM,
88
  PEER_MSG_CTRL_HEARTBEAT,
89
};
90
91
/*****************************/
92
/* error message types       */
93
/*****************************/
94
enum {
95
  PEER_MSG_ERR_PROTOCOL = 0,
96
  PEER_MSG_ERR_SIZELIMIT,
97
};
98
99
/* network key types;
100
 * network types were directly and mistakenly
101
 * mapped on sample types, to keep backward
102
 * compatibility we keep those values but
103
 * we now use an internal/network mapping
104
 * to avoid further mistakes adding or
105
 * modifying internals types
106
 */
107
enum {
108
        PEER_KT_ANY = 0,  /* any type */
109
        PEER_KT_RESV1,    /* UNUSED */
110
        PEER_KT_SINT,     /* signed 64bits integer type */
111
        PEER_KT_RESV3,    /* UNUSED */
112
        PEER_KT_IPV4,     /* ipv4 type */
113
        PEER_KT_IPV6,     /* ipv6 type */
114
        PEER_KT_STR,      /* char string type */
115
        PEER_KT_BIN,      /* buffer type */
116
        PEER_KT_TYPES     /* number of types, must always be last */
117
};
118
119
/* Map used to retrieve network type from internal type
120
 * Note: Undeclared mapping maps entry to PEER_KT_ANY == 0
121
 */
122
static int peer_net_key_type[SMP_TYPES] = {
123
  [SMP_T_SINT] = PEER_KT_SINT,
124
  [SMP_T_IPV4] = PEER_KT_IPV4,
125
  [SMP_T_IPV6] = PEER_KT_IPV6,
126
  [SMP_T_STR]  = PEER_KT_STR,
127
  [SMP_T_BIN]  = PEER_KT_BIN,
128
};
129
130
/* Map used to retrieve internal type from external type
131
 * Note: Undeclared mapping maps entry to SMP_T_ANY == 0
132
 */
133
static int peer_int_key_type[PEER_KT_TYPES] = {
134
  [PEER_KT_SINT] = SMP_T_SINT,
135
  [PEER_KT_IPV4] = SMP_T_IPV4,
136
  [PEER_KT_IPV6] = SMP_T_IPV6,
137
  [PEER_KT_STR]  = SMP_T_STR,
138
  [PEER_KT_BIN]  = SMP_T_BIN,
139
};
140
141
/*
142
 * Parameters used by functions to build peer protocol messages. */
143
struct peer_prep_params {
144
  struct {
145
    struct peer *peer;
146
  } hello;
147
  struct {
148
    unsigned int st1;
149
  } error_status;
150
  struct {
151
    struct stksess *stksess;
152
    struct shared_table *shared_table;
153
    unsigned int updateid;
154
    int use_identifier;
155
    int use_timed;
156
    struct peer *peer;
157
  } updt;
158
  struct {
159
    struct shared_table *shared_table;
160
  } swtch;
161
  struct {
162
    struct shared_table *shared_table;
163
  } ack;
164
  struct {
165
    unsigned char head[2];
166
  } control;
167
  struct {
168
    unsigned char head[2];
169
  } error;
170
};
171
172
/*******************************/
173
/* stick table sync mesg types */
174
/* Note: ids >= 128 contain    */
175
/* id message contains data     */
176
/*******************************/
177
0
#define PEER_MSG_STKT_UPDATE           0x80
178
0
#define PEER_MSG_STKT_INCUPDATE        0x81
179
0
#define PEER_MSG_STKT_DEFINE           0x82
180
0
#define PEER_MSG_STKT_SWITCH           0x83
181
0
#define PEER_MSG_STKT_ACK              0x84
182
0
#define PEER_MSG_STKT_UPDATE_TIMED     0x85
183
0
#define PEER_MSG_STKT_INCUPDATE_TIMED  0x86
184
/* All the stick-table message identifiers above have the #7 bit set */
185
0
#define PEER_MSG_STKT_BIT                 7
186
0
#define PEER_MSG_STKT_BIT_MASK         (1 << PEER_MSG_STKT_BIT)
187
188
/* The maximum length of an encoded data length. */
189
0
#define PEER_MSG_ENC_LENGTH_MAXLEN    5
190
191
/* Minimum 64-bits value encoded with 2 bytes */
192
0
#define PEER_ENC_2BYTES_MIN                                  0xf0 /*               0xf0 (or 240) */
193
/* 3 bytes */
194
#define PEER_ENC_3BYTES_MIN  ((1ULL << 11) | PEER_ENC_2BYTES_MIN) /*              0x8f0 (or 2288) */
195
/* 4 bytes */
196
#define PEER_ENC_4BYTES_MIN  ((1ULL << 18) | PEER_ENC_3BYTES_MIN) /*            0x408f0 (or 264432) */
197
/* 5 bytes */
198
#define PEER_ENC_5BYTES_MIN  ((1ULL << 25) | PEER_ENC_4BYTES_MIN) /*          0x20408f0 (or 33818864) */
199
/* 6 bytes */
200
#define PEER_ENC_6BYTES_MIN  ((1ULL << 32) | PEER_ENC_5BYTES_MIN) /*        0x1020408f0 (or 4328786160) */
201
/* 7 bytes */
202
#define PEER_ENC_7BYTES_MIN  ((1ULL << 39) | PEER_ENC_6BYTES_MIN) /*       0x81020408f0 (or 554084600048) */
203
/* 8 bytes */
204
#define PEER_ENC_8BYTES_MIN  ((1ULL << 46) | PEER_ENC_7BYTES_MIN) /*     0x4081020408f0 (or 70922828777712) */
205
/* 9 bytes */
206
#define PEER_ENC_9BYTES_MIN  ((1ULL << 53) | PEER_ENC_8BYTES_MIN) /*   0x204081020408f0 (or 9078122083518704) */
207
/* 10 bytes */
208
#define PEER_ENC_10BYTES_MIN ((1ULL << 60) | PEER_ENC_9BYTES_MIN) /* 0x10204081020408f0 (or 1161999626690365680) */
209
210
/* #7 bit used to detect the last byte to be encoded */
211
0
#define PEER_ENC_STOP_BIT         7
212
/* The byte minimum value with #7 bit set */
213
0
#define PEER_ENC_STOP_BYTE        (1 << PEER_ENC_STOP_BIT)
214
/* The left most number of bits set for PEER_ENC_2BYTES_MIN */
215
0
#define PEER_ENC_2BYTES_MIN_BITS  4
216
217
0
#define PEER_MSG_HEADER_LEN               2
218
219
0
#define PEER_STKT_CACHE_MAX_ENTRIES       128
220
221
/**********************************/
222
/* Peer Session IO handler states */
223
/**********************************/
224
225
enum {
226
  PEER_SESS_ST_ACCEPT = 0,     /* Initial state for session create by an accept, must be zero! */
227
  PEER_SESS_ST_GETVERSION,     /* Validate supported protocol version */
228
  PEER_SESS_ST_GETHOST,        /* Validate host ID correspond to local host id */
229
  PEER_SESS_ST_GETPEER,        /* Validate peer ID correspond to a known remote peer id */
230
  /* after this point, data were possibly exchanged */
231
  PEER_SESS_ST_SENDSUCCESS,    /* Send ret code 200 (success) and wait for message */
232
  PEER_SESS_ST_CONNECT,        /* Initial state for session create on a connect, push presentation into buffer */
233
  PEER_SESS_ST_GETSTATUS,      /* Wait for the welcome message */
234
  PEER_SESS_ST_WAITMSG,        /* Wait for data messages */
235
  PEER_SESS_ST_EXIT,           /* Exit with status code */
236
  PEER_SESS_ST_ERRPROTO,       /* Send error proto message before exit */
237
  PEER_SESS_ST_ERRSIZE,        /* Send error size message before exit */
238
  PEER_SESS_ST_END,            /* Killed session */
239
};
240
241
/***************************************************/
242
/* Peer Session status code - part of the protocol */
243
/***************************************************/
244
245
0
#define PEER_SESS_SC_CONNECTCODE    100 /* connect in progress */
246
0
#define PEER_SESS_SC_CONNECTEDCODE  110 /* tcp connect success */
247
248
0
#define PEER_SESS_SC_SUCCESSCODE    200 /* accept or connect successful */
249
250
0
#define PEER_SESS_SC_TRYAGAIN       300 /* try again later */
251
252
0
#define PEER_SESS_SC_ERRPROTO       501 /* error protocol */
253
0
#define PEER_SESS_SC_ERRVERSION     502 /* unknown protocol version */
254
0
#define PEER_SESS_SC_ERRHOST        503 /* bad host name */
255
0
#define PEER_SESS_SC_ERRPEER        504 /* unknown peer */
256
257
0
#define PEER_SESSION_PROTO_NAME         "HAProxyS"
258
0
#define PEER_MAJOR_VER        2
259
0
#define PEER_MINOR_VER        1
260
0
#define PEER_DWNGRD_MINOR_VER 0
261
262
static size_t proto_len = sizeof(PEER_SESSION_PROTO_NAME) - 1;
263
struct peers *cfg_peers = NULL;
264
static int peers_max_updates_at_once = PEER_DEF_MAX_UPDATES_AT_ONCE;
265
static void peer_session_forceshutdown(struct peer *peer);
266
267
static struct ebpt_node *dcache_tx_insert(struct dcache *dc,
268
                                          struct dcache_tx_entry *i);
269
static inline void flush_dcache(struct peer *peer);
270
271
/* trace source and events */
272
static void peers_trace(enum trace_level level, uint64_t mask,
273
                        const struct trace_source *src,
274
                        const struct ist where, const struct ist func,
275
                        const void *a1, const void *a2, const void *a3, const void *a4);
276
277
static const char *statuscode_str(int statuscode);
278
static const char *peer_app_state_str(enum peer_app_state appstate);
279
static const char *peer_learn_state_str(enum peer_learn_state learnstate);
280
static const char *peer_applet_state_str(int state);
281
282
static const struct trace_event peers_trace_events[] = {
283
#define PEERS_EV_SESS_NEW      (1ULL << 0)
284
  { .mask = PEERS_EV_SESS_NEW,     .name = "sess_new",       .desc = "create new peer session" },
285
#define PEERS_EV_SESS_END      (1ULL << 1)
286
  { .mask = PEERS_EV_SESS_END,     .name = "sess_end",       .desc = "peer session terminated" },
287
#define PEERS_EV_SESS_ERR      (1ULL << 2)
288
  { .mask = PEERS_EV_SESS_ERR,     .name = "sess_err",       .desc = "error on peer session" },
289
#define PEERS_EV_SESS_SHUT     (1ULL << 3)
290
  { .mask = PEERS_EV_SESS_SHUT,    .name = "sess_shut",      .desc = "peer session shutdown" },
291
#define PEERS_EV_SESS_WAKE     (1ULL << 4)
292
  { .mask = PEERS_EV_SESS_WAKE,    .name = "sess_wakeup",    .desc = "peer session wakeup" },
293
#define PEERS_EV_SESS_RESYNC   (1ULL << 5)
294
  { .mask = PEERS_EV_SESS_RESYNC,  .name = "sess_resync",    .desc = "peer session resync" },
295
#define PEERS_EV_SESS_IO       (1ULL << 6)
296
  { .mask = PEERS_EV_SESS_IO,      .name = "sess_io",        .desc = "peer session I/O" },
297
298
#define PEERS_EV_RX_MSG        (1ULL << 7)
299
  { .mask = PEERS_EV_RX_MSG,       .name = "rx_msg",         .desc = "message received" },
300
#define PEERS_EV_RX_BLK        (1ULL << 8)
301
  { .mask = PEERS_EV_RX_BLK,       .name = "rx_blocked",     .desc = "receive blocked" },
302
#define PEERS_EV_RX_ERR        (1ULL << 9)
303
  { .mask = PEERS_EV_RX_ERR,       .name = "rx_error",       .desc = "receive error" },
304
305
#define PEERS_EV_TX_MSG        (1ULL << 10)
306
  { .mask = PEERS_EV_TX_MSG,       .name = "tx_msg",         .desc = "message sent" },
307
#define PEERS_EV_TX_BLK        (1ULL << 11)
308
  { .mask = PEERS_EV_TX_BLK,       .name = "tx_blocked",     .desc = "send blocked" },
309
#define PEERS_EV_TX_ERR        (1ULL << 12)
310
  { .mask = PEERS_EV_TX_ERR,       .name = "tx_error",       .desc = "send error" },
311
312
313
#define PEERS_EV_PROTO_ERR     (1ULL << 13)
314
  { .mask = PEERS_EV_PROTO_ERR,    .name = "proto_error",    .desc = "protocol error" },
315
#define PEERS_EV_PROTO_HELLO   (1ULL << 14)
316
  { .mask = PEERS_EV_PROTO_HELLO,   .name = "proto_hello",   .desc = "protocol hello message" },
317
#define PEERS_EV_PROTO_SUCCESS (1ULL << 15)
318
  { .mask = PEERS_EV_PROTO_SUCCESS, .name = "proto_success", .desc = "protocol success message" },
319
#define PEERS_EV_PROTO_UPDATE  (1ULL << 16)
320
  { .mask = PEERS_EV_PROTO_UPDATE,  .name = "proto_update",  .desc = "protocol UPDATE message" },
321
#define PEERS_EV_PROTO_ACK     (1ULL << 17)
322
  { .mask = PEERS_EV_PROTO_ACK,     .name = "proto_ack",     .desc = "protocol ACK message" },
323
#define PEERS_EV_PROTO_SWITCH  (1ULL << 18)
324
  { .mask = PEERS_EV_PROTO_SWITCH,  .name = "proto_switch",  .desc = "protocol TABLE SWITCH message" },
325
#define PEERS_EV_PROTO_DEF     (1ULL << 19)
326
  { .mask = PEERS_EV_PROTO_DEF,     .name = "proto_def",     .desc = "protocol TABLE DEFINITION message" },
327
#define PEERS_EV_PROTO_CTRL    (1ULL << 20)
328
  { .mask = PEERS_EV_PROTO_CTRL,    .name = "proto_ctrl",    .desc = "protocol control message" },
329
  { }
330
};
331
332
static const struct name_desc peers_trace_lockon_args[4] = {
333
  /* arg1 */ { /* already used by the appctx */ },
334
  /* arg2 */ { .name="peer", .desc="Peer" },
335
  /* arg3 */ { .name="peers",  .desc="Peers" },
336
  /* arg4 */ { }
337
};
338
339
static const struct name_desc peers_trace_decoding[] = {
340
0
#define PEERS_VERB_CLEAN    1
341
  { .name="clean",    .desc="only user-friendly stuff, generally suitable for level \"user\"" },
342
0
#define PEERS_VERB_MINIMAL  2
343
  { .name="minimal",  .desc="report only peer state and flags, no real decoding" },
344
0
#define PEERS_VERB_SIMPLE   3
345
  { .name="simple",   .desc="add simple info about messages when available" },
346
#define PEERS_VERB_ADVANCED 4
347
  { .name="advanced", .desc="add more info about messages when available" },
348
#define PEERS_VERB_COMPLETE 5
349
  { .name="complete", .desc="add full data dump when available" },
350
  { /* end */ }
351
};
352
353
354
struct trace_source trace_peers = {
355
  .name = IST("peers"),
356
  .desc = "Peers protocol",
357
  .arg_def = TRC_ARG1_APPCTX,
358
  .default_cb = peers_trace,
359
  .known_events = peers_trace_events,
360
  .lockon_args = peers_trace_lockon_args,
361
  .decoding = peers_trace_decoding,
362
  .report_events = ~0,  /* report everything by default */
363
};
364
365
/* Return peer control message types as strings (only for debugging purpose). */
366
static inline __maybe_unused char *ctrl_msg_type_str(unsigned int type)
367
0
{
368
0
  switch (type) {
369
0
  case PEER_MSG_CTRL_RESYNCREQ:
370
0
    return "RESYNCREQ";
371
0
  case PEER_MSG_CTRL_RESYNCFINISHED:
372
0
    return "RESYNCFINISHED";
373
0
  case PEER_MSG_CTRL_RESYNCPARTIAL:
374
0
    return "RESYNCPARTIAL";
375
0
  case PEER_MSG_CTRL_RESYNCCONFIRM:
376
0
    return "RESYNCCONFIRM";
377
0
  case PEER_MSG_CTRL_HEARTBEAT:
378
0
    return "HEARTBEAT";
379
0
  default:
380
0
    return "???";
381
0
  }
382
0
}
383
384
#define TRACE_SOURCE    &trace_peers
385
INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
386
387
static void peers_trace(enum trace_level level, uint64_t mask,
388
                        const struct trace_source *src,
389
                        const struct ist where, const struct ist func,
390
                        const void *a1, const void *a2, const void *a3, const void *a4)
391
0
{
392
0
  const struct appctx *appctx = a1;
393
0
  const struct peer *peer = a2;
394
0
  const struct peers *peers = NULL;
395
0
  const struct shared_table *st = a3;
396
397
0
  if (!peer && appctx)
398
0
    peer = appctx->svcctx;
399
0
  if (!peer || src->verbosity < PEERS_VERB_CLEAN)
400
0
    return;
401
0
  if (!peers)
402
0
    peers = peer->peers;
403
0
  if (!appctx)
404
0
    appctx = peer->appctx;
405
406
0
  chunk_appendf(&trace_buf, " : [%c,%s] <%s/%s> ",
407
0
          (appctx ? (appctx_is_back(appctx) ? 'B' : 'F') : '-'),
408
0
          (appctx ? peer_applet_state_str(appctx->st0) : "-"),
409
0
          peers->id, peer->id);
410
411
0
  if (peer->local)
412
0
    chunk_appendf(&trace_buf, "RELOADING(%s) ", stopping ? "old" : "new");
413
414
0
  if (src->verbosity == PEERS_VERB_CLEAN)
415
0
    return;
416
417
0
  chunk_appendf(&trace_buf, "peer=(.fl=0x%08x, .app=%s, .learn=%s, .teach=%s, status=%s, ",
418
0
          peer->flags, peer_app_state_str(peer->appstate), peer_learn_state_str(peer->learnstate),
419
0
          ((peer->flags & PEER_TEACH_FLAGS) == PEER_F_TEACH_PROCESS ? "PROCESS" :
420
0
           ((peer->flags & PEER_F_TEACH_FINISHED) ? "FINISHED" : "NONE")),
421
0
          statuscode_str(peer->statuscode));
422
423
0
  chunk_appendf(&trace_buf, ".reco=%s, ", (peer->reconnect
424
0
             ? (tick_is_expired(peer->reconnect, now_ms)
425
0
                ? "<PAST>"
426
0
                : human_time(TICKS_TO_MS(peer->reconnect - now_ms), TICKS_TO_MS(1000)))
427
0
             : "<NEVER>"));
428
429
0
  chunk_appendf(&trace_buf, ".heart=%s, ", (peer->heartbeat
430
0
              ? (tick_is_expired(peer->heartbeat, now_ms)
431
0
                 ? "<PAST>"
432
0
                 : human_time(TICKS_TO_MS(peer->heartbeat - now_ms), TICKS_TO_MS(1000)))
433
0
              : "<NEVER>"));
434
435
0
  chunk_appendf(&trace_buf, ".last_hdshk=%s) ", (peer->last_hdshk
436
0
                   ? (tick_is_expired(peer->last_hdshk, now_ms)
437
0
                ? "<PAST>"
438
0
                : human_time(TICKS_TO_MS(peer->last_hdshk - now_ms), TICKS_TO_MS(1000)))
439
0
                   : "<NEVER>"));
440
441
0
  if (st)
442
0
    chunk_appendf(&trace_buf, "st=(.id=%s, .fl=0x%08x, .pushed=%u, .acked=%u) ",
443
0
            st->table->id, st->flags, st->last_pushed, st->last_acked);
444
445
0
  if (src->verbosity == PEERS_VERB_MINIMAL)
446
0
    return;
447
448
0
  if (appctx)
449
0
    chunk_appendf(&trace_buf, "appctx=(%p, .fl=0x%08x, .st0=%d, .st1=%d) ",
450
0
            appctx, appctx->flags, appctx->st0, appctx->st1);
451
452
0
  chunk_appendf(&trace_buf, "peers=(.fl=0x%08x, local=%s) ",
453
0
                peers->flags, peers->local->id);
454
455
0
  if (src->verbosity == PEERS_VERB_SIMPLE)
456
0
    return;
457
0
}
458
459
static const char *statuscode_str(int statuscode)
460
0
{
461
0
  switch (statuscode) {
462
0
  case PEER_SESS_SC_CONNECTCODE:
463
0
    return "CONN";
464
0
  case PEER_SESS_SC_CONNECTEDCODE:
465
0
    return "HSHK";
466
0
  case PEER_SESS_SC_SUCCESSCODE:
467
0
    return "ESTA";
468
0
  case PEER_SESS_SC_TRYAGAIN:
469
0
    return "RETR";
470
0
  case PEER_SESS_SC_ERRPROTO:
471
0
    return "PROT";
472
0
  case PEER_SESS_SC_ERRVERSION:
473
0
    return "VERS";
474
0
  case PEER_SESS_SC_ERRHOST:
475
0
    return "NAME";
476
0
  case PEER_SESS_SC_ERRPEER:
477
0
    return "UNKN";
478
0
  default:
479
0
    return "NONE";
480
0
  }
481
0
}
482
483
static const char *peer_app_state_str(enum peer_app_state appstate)
484
0
{
485
0
  switch (appstate) {
486
0
  case PEER_APP_ST_STOPPED:
487
0
    return "STOPPED";
488
0
  case PEER_APP_ST_STARTING:
489
0
    return "STARTING";
490
0
  case PEER_APP_ST_RUNNING:
491
0
    return "RUNNING";
492
0
  case PEER_APP_ST_STOPPING:
493
0
    return "STOPPING";
494
0
  default:
495
0
    return "UNKNOWN";
496
0
  }
497
0
}
498
499
static const char *peer_learn_state_str(enum peer_learn_state learnstate)
500
0
{
501
0
  switch (learnstate) {
502
0
  case PEER_LR_ST_NOTASSIGNED:
503
0
    return "NOTASSIGNED";
504
0
  case PEER_LR_ST_ASSIGNED:
505
0
    return "ASSIGNED";
506
0
  case PEER_LR_ST_PROCESSING:
507
0
    return "PROCESSING";
508
0
  case PEER_LR_ST_FINISHED:
509
0
    return "FINISHED";
510
0
  default:
511
0
    return "UNKNOWN";
512
0
  }
513
0
}
514
515
static const char *peer_applet_state_str(int state)
516
0
{
517
0
  switch (state) {
518
0
  case PEER_SESS_ST_ACCEPT:       return "ACCEPT";
519
0
  case PEER_SESS_ST_GETVERSION:   return "GETVERSION";
520
0
  case PEER_SESS_ST_GETHOST:      return "GETHOST";
521
0
  case PEER_SESS_ST_GETPEER:      return "GETPEER";
522
0
  case PEER_SESS_ST_SENDSUCCESS:  return "SENDSUCCESS";
523
0
  case PEER_SESS_ST_CONNECT:      return "CONNECT";
524
0
  case PEER_SESS_ST_GETSTATUS:    return "GETSTATUS";
525
0
  case PEER_SESS_ST_WAITMSG:      return "WAITMSG";
526
0
  case PEER_SESS_ST_EXIT:         return "EXIT";
527
0
  case PEER_SESS_ST_ERRPROTO:     return "ERRPROTO";
528
0
  case PEER_SESS_ST_ERRSIZE:      return "ERRSIZE";
529
0
  case PEER_SESS_ST_END:          return "END";
530
0
  default:                        return "UNKNOWN";
531
0
  }
532
0
}
533
534
/* This function encodes a uint64 to 'dynamic' length format.
535
   The encoded value is written at address *str, and the
536
   caller must assure that size after *str is large enough.
537
   At return, the *str is set at the next Byte after the
538
   encoded integer. The function returns the length of the
539
   encoded integer in Bytes */
540
0
int intencode(uint64_t i, char **str) {
541
0
  int idx = 0;
542
0
  unsigned char *msg;
543
544
0
  msg = (unsigned char *)*str;
545
0
  if (i < PEER_ENC_2BYTES_MIN) {
546
0
    msg[0] = (unsigned char)i;
547
0
    *str = (char *)&msg[idx+1];
548
0
    return (idx+1);
549
0
  }
550
551
0
  msg[idx] =(unsigned char)i | PEER_ENC_2BYTES_MIN;
552
0
  i = (i - PEER_ENC_2BYTES_MIN) >> PEER_ENC_2BYTES_MIN_BITS;
553
0
  while (i >= PEER_ENC_STOP_BYTE) {
554
0
    msg[++idx] = (unsigned char)i | PEER_ENC_STOP_BYTE;
555
0
    i = (i - PEER_ENC_STOP_BYTE) >> PEER_ENC_STOP_BIT;
556
0
  }
557
0
  msg[++idx] = (unsigned char)i;
558
0
  *str = (char *)&msg[idx+1];
559
0
  return (idx+1);
560
0
}
561
562
563
/* This function returns a decoded 64bits unsigned integer
564
 * from a varint
565
 *
566
 * Calling:
567
 * - *str must point on the first byte of the buffer to decode.
568
 * - end must point on the next byte after the end of the buffer
569
 *   we are authorized to parse (buf + buflen)
570
 *
571
 * At return:
572
 *
573
 * On success *str will point at the byte following
574
 * the fully decoded integer into the buffer. and
575
 * the decoded value is returned.
576
 *
577
 * If end is reached before the integer was fully decoded,
578
 * *str is set to NULL and the caller has to check this
579
 * to know there is a decoding error. In this case
580
 * the returned integer is also forced to 0
581
 */
582
uint64_t intdecode(char **str, char *end)
583
0
{
584
0
  unsigned char *msg;
585
0
  uint64_t i;
586
0
  int shift;
587
588
0
  if (!*str)
589
0
    return 0;
590
591
0
  msg = (unsigned char *)*str;
592
0
  if (msg >= (unsigned char *)end)
593
0
    goto fail;
594
595
0
  i = *(msg++);
596
0
  if (i >= PEER_ENC_2BYTES_MIN) {
597
0
    shift = PEER_ENC_2BYTES_MIN_BITS;
598
0
    do {
599
0
      if (msg >= (unsigned char *)end)
600
0
        goto fail;
601
0
      i += (uint64_t)*msg << shift;
602
0
      shift += PEER_ENC_STOP_BIT;
603
0
    } while (*(msg++) >= PEER_ENC_STOP_BYTE);
604
0
  }
605
0
  *str = (char *)msg;
606
0
  return i;
607
608
0
 fail:
609
0
  *str = NULL;
610
0
  return 0;
611
0
}
612
613
/*
614
 * Build a "hello" peer protocol message.
615
 * Return the number of bytes written to build this message if it succeeded,
616
 * 0 if not.
617
 */
618
static int peer_prepare_hellomsg(char *msg, size_t size, struct peer_prep_params *p)
619
0
{
620
0
  int min_ver, ret;
621
0
  struct peer *peer;
622
623
0
  peer = p->hello.peer;
624
0
  min_ver = (peer->flags & PEER_F_DWNGRD) ? PEER_DWNGRD_MINOR_VER : PEER_MINOR_VER;
625
  /* Prepare headers */
626
0
  ret = snprintf(msg, size, PEER_SESSION_PROTO_NAME " %d.%d\n%s\n%s %d %d\n",
627
0
           (int)PEER_MAJOR_VER, min_ver, peer->id, localpeer, (int)getpid(), (int)1);
628
0
  if (ret >= size)
629
0
    return 0;
630
631
0
  return ret;
632
0
}
633
634
/*
635
 * Build a "handshake succeeded" status message.
636
 * Return the number of bytes written to build this message if it succeeded,
637
 * 0 if not.
638
 */
639
static int peer_prepare_status_successmsg(char *msg, size_t size, struct peer_prep_params *p)
640
0
{
641
0
  int ret;
642
643
0
  ret = snprintf(msg, size, "%d\n", (int)PEER_SESS_SC_SUCCESSCODE);
644
0
  if (ret >= size)
645
0
    return 0;
646
647
0
  return ret;
648
0
}
649
650
/*
651
 * Build an error status message.
652
 * Return the number of bytes written to build this message if it succeeded,
653
 * 0 if not.
654
 */
655
static int peer_prepare_status_errormsg(char *msg, size_t size, struct peer_prep_params *p)
656
0
{
657
0
  int ret;
658
0
  unsigned int st1;
659
660
0
  st1 = p->error_status.st1;
661
0
  ret = snprintf(msg, size, "%u\n", st1);
662
0
  if (ret >= size)
663
0
    return 0;
664
665
0
  return ret;
666
0
}
667
668
/* Set the stick-table UPDATE message type byte at <msg_type> address,
669
 * depending on <use_identifier> and <use_timed> boolean parameters.
670
 * Always successful.
671
 */
672
static inline void peer_set_update_msg_type(char *msg_type, int use_identifier, int use_timed)
673
0
{
674
0
  if (use_timed) {
675
0
    if (use_identifier)
676
0
      *msg_type = PEER_MSG_STKT_UPDATE_TIMED;
677
0
    else
678
0
      *msg_type = PEER_MSG_STKT_INCUPDATE_TIMED;
679
0
  }
680
0
  else {
681
0
    if (use_identifier)
682
0
      *msg_type = PEER_MSG_STKT_UPDATE;
683
0
    else
684
0
      *msg_type = PEER_MSG_STKT_INCUPDATE;
685
0
  }
686
0
}
687
/*
688
 * This prepares the data update message on the stick session <ts>, <st> is the considered
689
 * stick table.
690
 *  <msg> is a buffer of <size> to receive data message content
691
 * If function returns 0, the caller should consider we were unable to encode this message (TODO:
692
 * check size)
693
 */
694
int peer_prepare_updatemsg(char *msg, size_t size, struct peer_prep_params *p)
695
0
{
696
0
  uint32_t netinteger;
697
0
  unsigned short datalen;
698
0
  char *cursor, *datamsg;
699
0
  unsigned int data_type;
700
0
  void *data_ptr;
701
0
  struct stksess *ts;
702
0
  struct shared_table *st;
703
0
  unsigned int updateid;
704
0
  int use_identifier;
705
0
  int use_timed;
706
0
  struct peer *peer;
707
708
0
  ts = p->updt.stksess;
709
0
  st = p->updt.shared_table;
710
0
  updateid = p->updt.updateid;
711
0
  use_identifier = p->updt.use_identifier;
712
0
  use_timed = p->updt.use_timed;
713
0
  peer = p->updt.peer;
714
715
0
  cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN;
716
717
  /* construct message */
718
719
  /* check if we need to send the update identifier */
720
0
  if (!st->last_pushed || updateid < st->last_pushed || ((updateid - st->last_pushed) != 1)) {
721
0
    use_identifier = 1;
722
0
  }
723
724
  /* encode update identifier if needed */
725
0
  if (use_identifier)  {
726
0
    netinteger = htonl(updateid);
727
0
    memcpy(cursor, &netinteger, sizeof(netinteger));
728
0
    cursor += sizeof(netinteger);
729
0
  }
730
731
0
  if (use_timed) {
732
0
    netinteger = htonl(tick_remain(now_ms, ts->expire));
733
0
    memcpy(cursor, &netinteger, sizeof(netinteger));
734
0
    cursor += sizeof(netinteger);
735
0
  }
736
737
  /* encode the key */
738
0
  if (st->table->type == SMP_T_STR) {
739
0
    int stlen = strlen((char *)ts->key.key);
740
741
0
    intencode(stlen, &cursor);
742
0
    memcpy(cursor, ts->key.key, stlen);
743
0
    cursor += stlen;
744
0
  }
745
0
  else if (st->table->type == SMP_T_SINT) {
746
0
    netinteger = htonl(read_u32(ts->key.key));
747
0
    memcpy(cursor, &netinteger, sizeof(netinteger));
748
0
    cursor += sizeof(netinteger);
749
0
  }
750
0
  else {
751
0
    memcpy(cursor, ts->key.key, st->table->key_size);
752
0
    cursor += st->table->key_size;
753
0
  }
754
755
0
  HA_RWLOCK_RDLOCK(STK_SESS_LOCK, &ts->lock);
756
  /* encode values */
757
0
  for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
758
759
0
    data_ptr = stktable_data_ptr(st->table, ts, data_type);
760
0
    if (data_ptr) {
761
      /* in case of array all elements use
762
       * the same std_type and they are linearly
763
       * encoded.
764
       */
765
0
      if (stktable_data_types[data_type].is_array) {
766
0
        unsigned int idx = 0;
767
768
0
        switch (stktable_data_types[data_type].std_type) {
769
0
        case STD_T_SINT: {
770
0
          int data;
771
772
0
          do {
773
0
            data = stktable_data_cast(data_ptr, std_t_sint);
774
0
            intencode(data, &cursor);
775
776
0
            data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
777
0
          } while(data_ptr);
778
0
          break;
779
0
        }
780
0
        case STD_T_UINT: {
781
0
          unsigned int data;
782
783
0
          do {
784
0
            data = stktable_data_cast(data_ptr, std_t_uint);
785
0
            intencode(data, &cursor);
786
787
0
            data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
788
0
          } while(data_ptr);
789
0
          break;
790
0
        }
791
0
        case STD_T_ULL: {
792
0
          unsigned long long data;
793
794
0
          do {
795
0
            data = stktable_data_cast(data_ptr, std_t_ull);
796
0
            intencode(data, &cursor);
797
798
0
            data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
799
0
          } while(data_ptr);
800
0
          break;
801
0
        }
802
0
        case STD_T_FRQP: {
803
0
          struct freq_ctr *frqp;
804
805
0
          do {
806
0
            frqp = &stktable_data_cast(data_ptr, std_t_frqp);
807
0
            intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor);
808
0
            intencode(frqp->curr_ctr, &cursor);
809
0
            intencode(frqp->prev_ctr, &cursor);
810
811
0
            data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx);
812
0
          } while(data_ptr);
813
0
          break;
814
0
        }
815
0
        }
816
817
        /* array elements fully encoded
818
         * proceed next data_type.
819
         */
820
0
        continue;
821
0
      }
822
0
      switch (stktable_data_types[data_type].std_type) {
823
0
        case STD_T_SINT: {
824
0
          int data;
825
826
0
          data = stktable_data_cast(data_ptr, std_t_sint);
827
0
          intencode(data, &cursor);
828
0
          break;
829
0
        }
830
0
        case STD_T_UINT: {
831
0
          unsigned int data;
832
833
0
          data = stktable_data_cast(data_ptr, std_t_uint);
834
0
          intencode(data, &cursor);
835
0
          break;
836
0
        }
837
0
        case STD_T_ULL: {
838
0
          unsigned long long data;
839
840
0
          data = stktable_data_cast(data_ptr, std_t_ull);
841
0
          intencode(data, &cursor);
842
0
          break;
843
0
        }
844
0
        case STD_T_FRQP: {
845
0
          struct freq_ctr *frqp;
846
847
0
          frqp = &stktable_data_cast(data_ptr, std_t_frqp);
848
0
          intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor);
849
0
          intencode(frqp->curr_ctr, &cursor);
850
0
          intencode(frqp->prev_ctr, &cursor);
851
0
          break;
852
0
        }
853
0
        case STD_T_DICT: {
854
0
          struct dict_entry *de;
855
0
          struct ebpt_node *cached_de;
856
0
          struct dcache_tx_entry cde = { };
857
0
          char *beg, *end;
858
0
          size_t value_len, data_len;
859
0
          struct dcache *dc;
860
861
0
          de = stktable_data_cast(data_ptr, std_t_dict);
862
0
          if (!de) {
863
            /* No entry */
864
0
            intencode(0, &cursor);
865
0
            break;
866
0
          }
867
868
0
          dc = peer->dcache;
869
0
          cde.entry.key = de;
870
0
          cached_de = dcache_tx_insert(dc, &cde);
871
0
          if (cached_de == &cde.entry) {
872
0
            if (cde.id + 1 >= PEER_ENC_2BYTES_MIN)
873
0
              break;
874
            /* Encode the length of the remaining data -> 1 */
875
0
            intencode(1, &cursor);
876
            /* Encode the cache entry ID */
877
0
            intencode(cde.id + 1, &cursor);
878
0
          }
879
0
          else {
880
            /* Leave enough room to encode the remaining data length. */
881
0
            end = beg = cursor + PEER_MSG_ENC_LENGTH_MAXLEN;
882
            /* Encode the dictionary entry key */
883
0
            intencode(cde.id + 1, &end);
884
            /* Encode the length of the dictionary entry data */
885
0
            value_len = de->len;
886
0
            intencode(value_len, &end);
887
            /* Copy the data */
888
0
            memcpy(end, de->value.key, value_len);
889
0
            end += value_len;
890
            /* Encode the length of the data */
891
0
            data_len = end - beg;
892
0
            intencode(data_len, &cursor);
893
0
            memmove(cursor, beg, data_len);
894
0
            cursor += data_len;
895
0
          }
896
0
          break;
897
0
        }
898
0
      }
899
0
    }
900
0
  }
901
0
  HA_RWLOCK_RDUNLOCK(STK_SESS_LOCK, &ts->lock);
902
903
  /* Compute datalen */
904
0
  datalen = (cursor - datamsg);
905
906
  /*  prepare message header */
907
0
  msg[0] = PEER_MSG_CLASS_STICKTABLE;
908
0
  peer_set_update_msg_type(&msg[1], use_identifier, use_timed);
909
0
  cursor = &msg[2];
910
0
  intencode(datalen, &cursor);
911
912
  /* move data after header */
913
0
  memmove(cursor, datamsg, datalen);
914
915
  /* return header size + data_len */
916
0
  return (cursor - msg) + datalen;
917
0
}
918
919
/*
920
 * This prepares the switch table message to targeted share table <st>.
921
 *  <msg> is a buffer of <size> to receive data message content
922
 * If function returns 0, the caller should consider we were unable to encode this message (TODO:
923
 * check size)
924
 */
925
static int peer_prepare_switchmsg(char *msg, size_t size, struct peer_prep_params *params)
{
  int len;
  unsigned short datalen;
  struct buffer *chunk;
  char *cursor, *datamsg, *chunkp, *chunkq;
  uint64_t data = 0; /* bitfield of the data types stored in the table */
  unsigned int data_type;
  struct shared_table *st;

  st = params->swtch.shared_table;
  /* Build the payload first, leaving room at the front of <msg> for the
   * 2-byte message header plus the varint-encoded payload length; the
   * payload is moved back against the header once its length is known.
   * NOTE(review): <size> is not checked while encoding (see the TODO in
   * the function's header comment); the caller must pass a buffer large
   * enough for the whole message.
   */
  cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN;

  /* Encode data */

  /* encode local id */
  intencode(st->local_id, &cursor);

  /* encode table name */
  len = strlen(st->table->nid);
  intencode(len, &cursor);
  memcpy(cursor, st->table->nid, len);
  cursor += len;

  /* encode table type */

  intencode(peer_net_key_type[st->table->type], &cursor);

  /* encode table key size */
  intencode(st->table->key_size, &cursor);

  /* data-type parameters are staged into a trash chunk and appended
   * after the data-type bitfield below
   */
  chunk = get_trash_chunk();
  chunkp = chunkq = chunk->area;
  /* encode available known data types in table */
  for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
    if (st->table->data_ofs[data_type]) {
      /* stored data types parameters are all linearly encoded
       * at the end of the 'table definition' message.
       *
       * Currently only array data_types and data_types
       * using freq_counter base type have parameters:
       *
       * - array has always at least one parameter set to the
       *   number of elements.
       *
       * - array of base-type freq_counters has an additional
       *  parameter set to the period used to compute those
       *  freq_counters.
       *
       * - simple freq counter has a parameter set to the period
       *   used to compute
       *
       *  A set of parameter for a datatype MUST BE prefixed
       *  by the data-type id itself:
       *  This is useless because the data_types are ordered and
       *  the data_type bitfield already gives the information of
       *  stored types, but it was designed this way when the
       *  push of period parameter was added for freq counters
       *  and we don't want to break the compatibility.
       *
       */
      if (stktable_data_types[data_type].is_array) {
        /* This is an array type so we first encode
         * the data_type itself to prefix parameters
         */
        intencode(data_type, &chunkq);

        /* We encode the first parameter which is
         * the number of elements of this array
         */
        intencode(st->table->data_nbelem[data_type], &chunkq);

        /* for array of freq counters, there is an additional
         * period parameter to encode
         */
        if (stktable_data_types[data_type].std_type == STD_T_FRQP)
          intencode(st->table->data_arg[data_type].u, &chunkq);
      }
      else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
        /* this datatype is a simple freq counter not part
         * of an array. We encode the data_type itself
         * to prefix the 'period' parameter
         */
        intencode(data_type, &chunkq);
        intencode(st->table->data_arg[data_type].u, &chunkq);
      }
      /* set the bit corresponding to stored data type */
      data |= 1ULL << data_type;
    }
  }
  intencode(data, &cursor);

  /* Encode stick-table entries duration. */
  intencode(st->table->expire, &cursor);

  /* append the staged data-type parameters, if any were produced */
  if (chunkq > chunkp) {
    chunk->data = chunkq - chunkp;
    memcpy(cursor, chunk->area, chunk->data);
    cursor += chunk->data;
  }

  /* Compute datalen */
  datalen = (cursor - datamsg);

  /*  prepare message header */
  msg[0] = PEER_MSG_CLASS_STICKTABLE;
  msg[1] = PEER_MSG_STKT_DEFINE;
  cursor = &msg[2];
  intencode(datalen, &cursor);

  /* move data after header */
  memmove(cursor, datamsg, datalen);

  /* return header size + data_len */
  return (cursor - msg) + datalen;
}
1041
1042
/*
1043
 * This prepare the acknowledge message on the stick session <ts>, <st> is the considered
1044
 * stick table.
1045
 *  <msg> is a buffer of <size> to receive data message content
1046
 * If function returns 0, the caller should consider we were unable to encode this message (TODO:
1047
 * check size)
1048
 */
1049
static int peer_prepare_ackmsg(char *msg, size_t size, struct peer_prep_params *p)
1050
0
{
1051
0
  unsigned short datalen;
1052
0
  char *cursor, *datamsg;
1053
0
  uint32_t netinteger;
1054
0
  struct shared_table *st;
1055
1056
0
  cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN;
1057
1058
0
  st = p->ack.shared_table;
1059
0
  intencode(st->remote_id, &cursor);
1060
0
  netinteger = htonl(st->last_get);
1061
0
  memcpy(cursor, &netinteger, sizeof(netinteger));
1062
0
  cursor += sizeof(netinteger);
1063
1064
  /* Compute datalen */
1065
0
  datalen = (cursor - datamsg);
1066
1067
  /*  prepare message header */
1068
0
  msg[0] = PEER_MSG_CLASS_STICKTABLE;
1069
0
  msg[1] = PEER_MSG_STKT_ACK;
1070
0
  cursor = &msg[2];
1071
0
  intencode(datalen, &cursor);
1072
1073
  /* move data after header */
1074
0
  memmove(cursor, datamsg, datalen);
1075
1076
  /* return header size + data_len */
1077
0
  return (cursor - msg) + datalen;
1078
0
}
1079
1080
/*
1081
 * Function to deinit connected peer
1082
 */
1083
void __peer_session_deinit(struct peer *peer)
{
  struct peers *peers = peer->peers;
  int thr;

  /* nothing to deinit when the peer has no section or no running applet */
  if (!peers || !peer->appctx)
    return;

  /* drop the applet accounting on the thread the applet was bound to */
  thr = peer->appctx->t->tid;
  HA_ATOMIC_DEC(&peers->applet_count[thr]);

  /* a peer in WAITMSG state was counted as a fully connected peer */
  if (peer->appctx->st0 == PEER_SESS_ST_WAITMSG)
    HA_ATOMIC_DEC(&connected_peers);

  HA_ATOMIC_DEC(&active_peers);

  flush_dcache(peer);

  /* Re-init current table pointers to force announcement on re-connect */
  peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL;
  peer->appctx = NULL;

  /* reset teaching flags to 0 */
  peer->flags &= ~PEER_TEACH_FLAGS;

  /* Mark the peer as stopping and wait for the sync task */
  peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
  peer->appstate = PEER_APP_ST_STOPPING;
  /* NOTE(review): peer->appctx was reset to NULL just above, so this
   * trace is emitted with a NULL appctx argument — confirm intended.
   */
  TRACE_STATE("peer session stopping", PEERS_EV_SESS_END, peer->appctx, peer);
  task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
1114
1115
/* Initialize the applet side of a peer session: allocate the destination
 * address from the peer's server, finalize the applet startup and configure
 * the outgoing stream towards that server.
 * Returns 0 on success, -1 on error (address allocation or startup failure).
 */
static int peer_session_init(struct appctx *appctx)
{
  struct peer *peer = appctx->svcctx;
  struct stream *s;
  struct sockaddr_storage *addr = NULL;

  TRACE_ENTER(PEERS_EV_SESS_NEW, appctx, peer);
  if (!sockaddr_alloc(&addr, &peer->srv->addr, sizeof(peer->srv->addr)))
    goto out_error;
  set_host_port(addr, peer->srv->svc_port);

  if (appctx_finalize_startup(appctx, peer->peers->peers_fe, &BUF_NULL) == -1)
    goto out_free_addr;

  s = appctx_strm(appctx);
  /* applet is waiting for data */
  applet_need_more_data(appctx);
  appctx_wakeup(appctx);

  /* initiate an outgoing connection */
  s->scb->dst = addr;
  s->scb->flags |= (SC_FL_RCV_ONCE|SC_FL_NOLINGER);
  s->flags = SF_ASSIGNED;
  stream_set_srv_target(s, peer->srv);

  /* peer streams are not logged and carry no unique id */
  s->do_log = NULL;
  s->uniq_id = 0;
  _HA_ATOMIC_INC(&active_peers);
  TRACE_LEAVE(PEERS_EV_SESS_NEW, appctx, peer);
  return 0;

 out_free_addr:
  sockaddr_free(&addr);
 out_error:
  TRACE_ERROR("peer session init failed", PEERS_EV_SESS_NEW|PEERS_EV_SESS_END|PEERS_EV_SESS_ERR, NULL, peer);
  return -1;
}
1152
1153
/*
1154
 * Callback to release a session with a peer
1155
 */
1156
static void peer_session_release(struct appctx *appctx)
{
  struct peer *peer = appctx->svcctx;

  /* appctx->svcctx is not a peer session */
  if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS)
    return;

  /* peer session identified */
  if (peer) {
    HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
    /* only deinit when this applet is still the one attached to the
     * peer: a reconnect may already have replaced it
     */
    if (peer->appctx == appctx)
      __peer_session_deinit(peer);
    peer->flags &= ~PEER_F_ALIVE;
    HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
    TRACE_STATE("peer session released", PEERS_EV_SESS_END, appctx, peer);
  }
}
1174
1175
/* Retrieve the major and minor versions of peers protocol
1176
 * announced by a remote peer. <str> is a null-terminated
1177
 * string with the following format: "<maj_ver>.<min_ver>".
1178
 */
1179
static int peer_get_version(const char *str,
                            unsigned int *maj_ver, unsigned int *min_ver)
{
  const char *p = str;
  const char *stop = str + strlen(str);
  const char *mark;
  unsigned int major, minor;

  /* major number: at least one digit, immediately followed by a dot */
  mark = p;
  major = read_uint(&p, stop);
  if (p == mark || *p++ != '.')
    return -1;

  /* minor number: at least one digit, consuming the remaining bytes */
  mark = p;
  minor = read_uint(&p, stop);
  if (p == mark || p != stop)
    return -1;

  /* only report the parsed values once the whole string validated */
  *maj_ver = major;
  *min_ver = minor;

  return 0;
}
1203
1204
/*
1205
 * Parse a line terminated by an optional '\r' character, followed by a mandatory
1206
 * '\n' character.
1207
 * Returns 1 if succeeded or 0 if a '\n' character could not be found, and -1 if
1208
 * a line could not be read because the communication channel is closed.
1209
 */
1210
static inline int peer_getline(struct appctx  *appctx)
{
  int n = 0;

  TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
  /* no input buffer available yet: ask to be called back with data */
  if (applet_get_inbuf(appctx) == NULL) {
    applet_need_more_data(appctx);
    goto out;
  }

  /* copy one line into the trash buffer */
  n = applet_getline(appctx, trash.area, trash.size);
  if (!n) {
    /* no complete line yet */
    applet_need_more_data(appctx);
    goto out;
  }

  /* error, or a "line" not ending with '\n' (closed or full channel) */
  if (n < 0 || trash.area[n - 1] != '\n') {
    appctx->st0 = PEER_SESS_ST_END;
    TRACE_ERROR("failed to receive data (channel closed or full)", PEERS_EV_SESS_IO|PEERS_EV_RX_ERR, appctx);
    return -1;
  }

  /* strip the trailing "\r\n" or "\n" by NUL-terminating the line */
  if (n > 1 && (trash.area[n - 2] == '\r'))
    trash.area[n - 2] = 0;
  else
    trash.area[n - 1] = 0;

  /* consume the line (including its terminator) from the input buffer */
  applet_skip_input(appctx, n);
  out:
  TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
  return n;
}
1242
1243
/*
1244
 * Send a message after having called <peer_prepare_msg> to build it.
1245
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1246
 * Returns -1 if there was not enough room left to send the message,
1247
 * any other negative returned value must  be considered as an error with an appcxt st0
1248
 * returned value equal to PEER_SESS_ST_END.
1249
 */
1250
static inline int peer_send_msg(struct appctx *appctx,
                                int (*peer_prepare_msg)(char *, size_t, struct peer_prep_params *),
                                struct peer_prep_params *params)
{
  int ret, msglen;

  TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
  /* build the message into the trash buffer */
  msglen = peer_prepare_msg(trash.area, trash.size, params);
  if (!msglen) {
    /* internal error: message does not fit in trash */
    appctx->st0 = PEER_SESS_ST_END;
    TRACE_ERROR("failed to send data (message too long)", PEERS_EV_SESS_IO|PEERS_EV_TX_ERR, appctx);
    return 0;
  }

  /* message to buffer */
  ret = applet_putblk(appctx, trash.area, msglen);
  if (ret <= 0) {
    /* -1 means "not enough room, retry later"; any other failure
     * is fatal and terminates the session
     */
    if (ret != -1) {
      TRACE_ERROR("failed to send data (channel closed)", PEERS_EV_SESS_IO|PEERS_EV_TX_ERR, appctx);
      appctx->st0 = PEER_SESS_ST_END;
    }
  }

  TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx);
  return ret;
}
1277
1278
/*
1279
 * Send a hello message.
1280
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1281
 * Returns -1 if there was not enough room left to send the message,
1282
 * any other negative returned value must  be considered as an error with an appcxt st0
1283
 * returned value equal to PEER_SESS_ST_END.
1284
 */
1285
static inline int peer_send_hellomsg(struct appctx *appctx, struct peer *peer)
1286
0
{
1287
0
  struct peer_prep_params p = {
1288
0
    .hello.peer = peer,
1289
0
  };
1290
1291
0
  TRACE_PROTO("send hello message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_HELLO, appctx, peer);
1292
0
  return peer_send_msg(appctx, peer_prepare_hellomsg, &p);
1293
0
}
1294
1295
/*
1296
 * Send a success peer handshake status message.
1297
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1298
 * Returns -1 if there was not enough room left to send the message,
1299
 * any other negative returned value must  be considered as an error with an appcxt st0
1300
 * returned value equal to PEER_SESS_ST_END.
1301
 */
1302
static inline int peer_send_status_successmsg(struct appctx *appctx)
1303
0
{
1304
0
  TRACE_PROTO("send status success message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_SUCCESS, appctx);
1305
0
  return peer_send_msg(appctx, peer_prepare_status_successmsg, NULL);
1306
0
}
1307
1308
/*
1309
 * Send a peer handshake status error message.
1310
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1311
 * Returns -1 if there was not enough room left to send the message,
1312
 * any other negative returned value must  be considered as an error with an appcxt st0
1313
 * returned value equal to PEER_SESS_ST_END.
1314
 */
1315
static inline int peer_send_status_errormsg(struct appctx *appctx)
1316
0
{
1317
0
  struct peer_prep_params p = {
1318
0
    .error_status.st1 = appctx->st1,
1319
0
  };
1320
1321
0
  TRACE_PROTO("send status error message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx);
1322
0
  return peer_send_msg(appctx, peer_prepare_status_errormsg, &p);
1323
0
}
1324
1325
/*
1326
 * Send a stick-table switch message.
1327
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1328
 * Returns -1 if there was not enough room left to send the message,
1329
 * any other negative returned value must  be considered as an error with an appcxt st0
1330
 * returned value equal to PEER_SESS_ST_END.
1331
 */
1332
static inline int peer_send_switchmsg(struct shared_table *st, struct appctx *appctx)
1333
0
{
1334
0
  struct peer_prep_params p = {
1335
0
    .swtch.shared_table = st,
1336
0
  };
1337
1338
0
  TRACE_PROTO("send table switch message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_SWITCH, appctx, NULL, st);
1339
0
  return peer_send_msg(appctx, peer_prepare_switchmsg, &p);
1340
0
}
1341
1342
/*
1343
 * Send a stick-table update acknowledgement message.
1344
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1345
 * Returns -1 if there was not enough room left to send the message,
1346
 * any other negative returned value must  be considered as an error with an appcxt st0
1347
 * returned value equal to PEER_SESS_ST_END.
1348
 */
1349
static inline int peer_send_ackmsg(struct shared_table *st, struct appctx *appctx)
1350
0
{
1351
0
  struct peer_prep_params p = {
1352
0
    .ack.shared_table = st,
1353
0
  };
1354
1355
0
  TRACE_PROTO("send ack message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ACK, appctx, NULL, st);
1356
0
  return peer_send_msg(appctx, peer_prepare_ackmsg, &p);
1357
0
}
1358
1359
/*
1360
 * Send a stick-table update message.
1361
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1362
 * Returns -1 if there was not enough room left to send the message,
1363
 * any other negative returned value must  be considered as an error with an appcxt st0
1364
 * returned value equal to PEER_SESS_ST_END.
1365
 */
1366
static inline int peer_send_updatemsg(struct shared_table *st, struct appctx *appctx, struct stksess *ts,
1367
                                      unsigned int updateid, int use_identifier, int use_timed)
1368
0
{
1369
0
  struct peer_prep_params p = {
1370
0
    .updt = {
1371
0
      .stksess = ts,
1372
0
      .shared_table = st,
1373
0
      .updateid = updateid,
1374
0
      .use_identifier = use_identifier,
1375
0
      .use_timed = use_timed,
1376
0
      .peer = appctx->svcctx,
1377
0
    },
1378
0
  };
1379
1380
0
  TRACE_PROTO("send update message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_UPDATE, appctx, NULL, st);
1381
0
  return peer_send_msg(appctx, peer_prepare_updatemsg, &p);
1382
0
}
1383
1384
/*
1385
 * Build a peer protocol control class message.
1386
 * Returns the number of written bytes used to build the message if succeeded,
1387
 * 0 if not.
1388
 */
1389
static int peer_prepare_control_msg(char *msg, size_t size, struct peer_prep_params *p)
{
  const char *head = p->control.head;

  /* refuse to emit anything if the 2-byte header does not fit */
  if (size < sizeof p->control.head)
    return 0;

  /* a control message is just its class and type bytes */
  msg[0] = head[0];
  msg[1] = head[1];

  return 2;
}
1399
1400
/*
1401
 * Send a stick-table synchronization request message.
1402
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1403
 * Returns -1 if there was not enough room left to send the message,
1404
 * any other negative returned value must  be considered as an error with an appctx st0
1405
 * returned value equal to PEER_SESS_ST_END.
1406
 */
1407
static inline int peer_send_resync_reqmsg(struct appctx *appctx,
1408
                                          struct peer *peer, struct peers *peers)
1409
0
{
1410
0
  struct peer_prep_params p = {
1411
0
    .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCREQ, },
1412
0
  };
1413
1414
0
  TRACE_PROTO("send resync request message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
1415
0
  return peer_send_msg(appctx, peer_prepare_control_msg, &p);
1416
0
}
1417
1418
/*
1419
 * Send a stick-table synchronization confirmation message.
1420
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1421
 * Returns -1 if there was not enough room left to send the message,
1422
 * any other negative returned value must  be considered as an error with an appctx st0
1423
 * returned value equal to PEER_SESS_ST_END.
1424
 */
1425
static inline int peer_send_resync_confirmsg(struct appctx *appctx,
1426
                                             struct peer *peer, struct peers *peers)
1427
0
{
1428
0
  struct peer_prep_params p = {
1429
0
    .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCCONFIRM, },
1430
0
  };
1431
1432
0
  TRACE_PROTO("send resync confirm message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
1433
0
  return peer_send_msg(appctx, peer_prepare_control_msg, &p);
1434
0
}
1435
1436
/*
1437
 * Send a stick-table synchronization finished message.
1438
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1439
 * Returns -1 if there was not enough room left to send the message,
1440
 * any other negative returned value must  be considered as an error with an appctx st0
1441
 * returned value equal to PEER_SESS_ST_END.
1442
 */
1443
static inline int peer_send_resync_finishedmsg(struct appctx *appctx,
1444
                                               struct peer *peer, struct peers *peers)
1445
0
{
1446
0
  struct peer_prep_params p = {
1447
0
    .control.head = { PEER_MSG_CLASS_CONTROL, },
1448
0
  };
1449
1450
0
  p.control.head[1] = (HA_ATOMIC_LOAD(&peers->flags) & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ?
1451
0
    PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL;
1452
1453
0
  TRACE_PROTO("send full resync finish message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
1454
0
  return peer_send_msg(appctx, peer_prepare_control_msg, &p);
1455
0
}
1456
1457
/*
1458
 * Send a heartbeat message.
1459
 * Return 0 if the message could not be built modifying the appctx st0 to PEER_SESS_ST_END value.
1460
 * Returns -1 if there was not enough room left to send the message,
1461
 * any other negative returned value must  be considered as an error with an appctx st0
1462
 * returned value equal to PEER_SESS_ST_END.
1463
 */
1464
static inline int peer_send_heartbeatmsg(struct appctx *appctx,
1465
                                         struct peer *peer, struct peers *peers)
1466
0
{
1467
0
  struct peer_prep_params p = {
1468
0
    .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, },
1469
0
  };
1470
1471
0
  TRACE_PROTO("send heartbeat message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
1472
0
  return peer_send_msg(appctx, peer_prepare_control_msg, &p);
1473
0
}
1474
1475
/*
1476
 * Build a peer protocol error class message.
1477
 * Returns the number of written bytes used to build the message if succeeded,
1478
 * 0 if not.
1479
 */
1480
static int peer_prepare_error_msg(char *msg, size_t size, struct peer_prep_params *p)
{
  const char *head = p->error.head;

  /* refuse to emit anything if the 2-byte header does not fit */
  if (size < sizeof p->error.head)
    return 0;

  /* an error message is just its class and type bytes */
  msg[0] = head[0];
  msg[1] = head[1];

  return 2;
}
1490
1491
/*
1492
 * Send a "size limit reached" error message.
1493
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1494
 * Returns -1 if there was not enough room left to send the message,
1495
 * any other negative returned value must  be considered as an error with an appctx st0
1496
 * returned value equal to PEER_SESS_ST_END.
1497
 */
1498
static inline int peer_send_error_size_limitmsg(struct appctx *appctx)
1499
0
{
1500
0
  struct peer_prep_params p = {
1501
0
    .error.head = { PEER_MSG_CLASS_ERROR, PEER_MSG_ERR_SIZELIMIT, },
1502
0
  };
1503
1504
0
  TRACE_PROTO("send error size limit message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx);
1505
0
  return peer_send_msg(appctx, peer_prepare_error_msg, &p);
1506
0
}
1507
1508
/*
1509
 * Send a "peer protocol" error message.
1510
 * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1511
 * Returns -1 if there was not enough room left to send the message,
1512
 * any other negative returned value must  be considered as an error with an appctx st0
1513
 * returned value equal to PEER_SESS_ST_END.
1514
 */
1515
static inline int peer_send_error_protomsg(struct appctx *appctx)
1516
0
{
1517
0
  struct peer_prep_params p = {
1518
0
    .error.head = { PEER_MSG_CLASS_ERROR, PEER_MSG_ERR_PROTOCOL, },
1519
0
  };
1520
1521
0
  TRACE_PROTO("send protocol error message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx);
1522
0
  return peer_send_msg(appctx, peer_prepare_error_msg, &p);
1523
0
}
1524
1525
/*
1526
 * Function used to lookup for recent stick-table updates associated with
1527
 * <st> shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED).
1528
 */
1529
static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
{
  struct eb32_node *eb;
  struct stksess *ret;

  /* look up the first update strictly above the last one pushed */
  eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
  if (!eb) {
    /* no key above last_pushed: wrap to the start of the tree; an
     * empty tree, or a first key equal to what we just pushed, means
     * we are in sync with the local updates
     */
    eb = eb32_first(&st->table->updates);
    if (!eb || (eb->key == st->last_pushed)) {
      st->last_pushed = st->table->localupdate;
      return NULL;
    }
  }

  /* if distance between the last pushed and the retrieved key
   * is greater than the distance last_pushed and the local_update
   * this means we are beyond localupdate.
   * (this comparison relies on wraparound of the key arithmetic)
   */
  if ((eb->key - st->last_pushed) > (st->table->localupdate - st->last_pushed)) {
    st->last_pushed = st->table->localupdate;
    return NULL;
  }

  ret = eb32_entry(eb, struct stksess, upd);
  /* mark the entry as having been seen by at least one peer */
  if (!_HA_ATOMIC_LOAD(&ret->seen))
    _HA_ATOMIC_STORE(&ret->seen, 1);
  return ret;
}
1557
1558
/*
1559
 * Function used to lookup for recent stick-table updates associated with
1560
 * <st> shared stick-table during teach state 1 step.
1561
 */
1562
static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
{
  struct eb32_node *eb;
  struct stksess *ret;

  /* look up the first update strictly above the last one pushed */
  eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
  if (!eb) {
    /* end of tree reached: stage 1 is complete; rewind last_pushed to
     * just before the first key so stage 2 restarts from the beginning
     */
    st->flags |= SHTABLE_F_TEACH_STAGE1;
    eb = eb32_first(&st->table->updates);
    if (eb)
      st->last_pushed = eb->key - 1;
    return NULL;
  }

  ret = eb32_entry(eb, struct stksess, upd);
  /* mark the entry as having been seen by at least one peer */
  if (!_HA_ATOMIC_LOAD(&ret->seen))
    _HA_ATOMIC_STORE(&ret->seen, 1);
  return ret;
}
1581
1582
/*
1583
 * Function used to lookup for recent stick-table updates associated with
1584
 * <st> shared stick-table during teach state 2 step.
1585
 */
1586
static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st)
{
  struct eb32_node *eb;
  struct stksess *ret;

  /* look up the first update strictly above the last one pushed */
  eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
  if (!eb || eb->key > st->teaching_origin) {
    /* no more updates, or we went past the point where the teaching
     * started: stage 2 is complete
     */
    st->flags |= SHTABLE_F_TEACH_STAGE2;
    return NULL;
  }

  ret = eb32_entry(eb, struct stksess, upd);
  /* mark the entry as having been seen by at least one peer */
  if (!_HA_ATOMIC_LOAD(&ret->seen))
    _HA_ATOMIC_STORE(&ret->seen, 1);
  return ret;
}
1602
1603
/*
1604
 * Generic function to emit update messages for <st> stick-table when a lesson must
1605
 * be taught to the peer <p>.
1606
 *
1607
 * This function temporary unlock/lock <st> when it sends stick-table updates or
1608
 * when decrementing its refcount in case of any error when it sends this updates.
1609
 * It must be called with the stick-table lock released.
1610
 *
1611
 * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1612
 * Returns -1 if there was not enough room left to send the message,
1613
 * any other negative returned value must  be considered as an error with an appcxt st0
1614
 * returned value equal to PEER_SESS_ST_END.
1615
 * If it returns 0 or -1, this function leave <st> locked if already locked when entering this function
1616
 * unlocked if not already locked when entering this function.
1617
 */
1618
int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
                        struct stksess *(*peer_stksess_lookup)(struct shared_table *),
                        struct shared_table *st)
{
  int ret, new_pushed, use_timed;
  int updates_sent = 0;
  int failed_once = 0;

  TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);

  ret = 1;
  use_timed = 0;
  /* announce the table first if it differs from the last one we taught */
  if (st != p->last_local_table) {
    ret = peer_send_switchmsg(st, appctx);
    if (ret <= 0)
      goto out_unlocked;

    p->last_local_table = st;
    TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_SWITCH, appctx, NULL, st, NULL,
           "table switch message sent (table=%s)", st->table->id);
  }

  /* timed updates are used for teach lookups (unless the peer was
   * downgraded), not for the regular "process" lookup
   */
  if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
    use_timed = !(p->flags & PEER_F_DWNGRD);

  /* We force new pushed to 1 to force identifier in update message */
  new_pushed = 1;

  if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
    /* just don't engage here if there is any contention */
    applet_have_more_data(appctx);
    ret = -1;
    goto out_unlocked;
  }

  while (1) {
    struct stksess *ts;
    unsigned updateid;

    /* push local updates */
    ts = peer_stksess_lookup(st);
    if (!ts) {
      ret = 1; // done
      break;
    }

    updateid = ts->upd.key;
    /* when the server is sharded, only teach entries of our shard */
    if (p->srv->shard && ts->shard != p->srv->shard) {
      /* Skip this entry */
      st->last_pushed = updateid;
      new_pushed = 1;
      continue;
    }

    /* hold a reference on the entry while the update lock is dropped
     * for the duration of the message emission
     */
    HA_ATOMIC_INC(&ts->ref_cnt);
    HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);

    ret = peer_send_updatemsg(st, appctx, ts, updateid, new_pushed, use_timed);

    if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
      if (failed_once) {
        /* we've already faced contention twice in this
         * loop, this is getting serious, do not insist
         * anymore and come back later
         */
        HA_ATOMIC_DEC(&ts->ref_cnt);
        applet_have_more_data(appctx);
        ret = -1;
        goto out_unlocked;
      }
      /* OK contention happens, for this one we'll wait on the
       * lock, but only once.
       */
      failed_once++;
      HA_RWLOCK_RDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
    }

    HA_ATOMIC_DEC(&ts->ref_cnt);
    if (ret <= 0)
      break;

    st->last_pushed = updateid;
    TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_UPDATE, appctx, NULL, st, NULL,
           "update message sent (table=%s, updateid=%u)", st->table->id, st->last_pushed);

    /* identifier may not needed in next update message */
    new_pushed = 0;

    /* yield after a batch to avoid monopolizing the applet */
    updates_sent++;
    if (updates_sent >= peers_max_updates_at_once) {
      applet_have_more_data(appctx);
      ret = -1;
      break;
    }
  }

  /* fallen through from the loop with the update lock still held */
 out:
  HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
 out_unlocked:
  TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
  return ret;
}
1720
1721
/*
1722
 * Function to emit update messages for <st> stick-table when a lesson must
1723
 * be taught to the peer <p> (learn state is not PEER_LR_ST_NOTASSIGNED).
1724
 *
1725
 * Note that <st> shared stick-table is locked when calling this function, and
1726
 * the lock is dropped then re-acquired.
1727
 *
1728
 * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1729
 * Returns -1 if there was not enough room left to send the message,
1730
 * any other negative returned value must  be considered as an error with an appcxt st0
1731
 * returned value equal to PEER_SESS_ST_END.
1732
 */
1733
static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p,
1734
                                               struct shared_table *st)
1735
0
{
1736
0
  TRACE_PROTO("send teach process messages", PEERS_EV_SESS_IO, appctx, p, st);
1737
0
  return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st);
1738
0
}
1739
1740
/*
1741
 * Function to emit update messages for <st> stick-table when a lesson must
1742
 * be taught to the peer <p> during teach state 1 step. It must be called with
1743
 * the stick-table lock released.
1744
 *
1745
 * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value.
1746
 * Returns -1 if there was not enough room left to send the message,
1747
 * any other negative returned value must  be considered as an error with an appcxt st0
1748
 * returned value equal to PEER_SESS_ST_END.
1749
 */
1750
static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p,
1751
                                              struct shared_table *st)
1752
0
{
1753
0
  TRACE_PROTO("send teach stage1 messages", PEERS_EV_SESS_IO, appctx, p, st);
1754
0
  return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st);
1755
0
}
1756
1757
/*
 * Function to emit update messages for <st> stick-table when a lesson must
 * be taught to the peer <p> during teach state 2 step. It must be called with
 * the stick-table lock released.
 *
 * Return 0 if any message could not be built modifying the appctx st0 to PEER_SESS_ST_END value.
 * Returns -1 if there was not enough room left to send the message,
 * any other negative returned value must be considered as an error with an appctx st0
 * returned value equal to PEER_SESS_ST_END.
 */
1767
static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p,
1768
                                              struct shared_table *st)
1769
0
{
1770
0
  TRACE_PROTO("send teach stage2 messages", PEERS_EV_SESS_IO, appctx, p, st);
1771
0
  return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st);
1772
0
}
1773
1774
1775
/*
1776
 * Function used to parse a stick-table update message after it has been received
1777
 * by <p> peer with <msg_cur> as address of the pointer to the position in the
1778
 * receipt buffer with <msg_end> being position of the end of the stick-table message.
1779
 * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error
1780
 * was encountered.
1781
 * <exp> must be set if the stick-table entry expires.
1782
 * <updt> must be set for  PEER_MSG_STKT_UPDATE or PEER_MSG_STKT_UPDATE_TIMED stick-table
1783
 * messages, in this case the stick-table update message is received with a stick-table
1784
 * update ID.
1785
 * <totl> is the length of the stick-table update message computed upon receipt.
1786
 */
1787
int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp,
                         char **msg_cur, char *msg_end, int msg_len, int totl)
{
	struct shared_table *st = p->remote_table;
	struct stktable *table;
	struct stksess *ts, *newts;
	struct stksess *wts = NULL; /* write_to stksess */
	uint32_t update;
	int expire;
	unsigned int data_type;
	size_t keylen;
	void *data_ptr;
	char *msg_save; /* position right after the key, replayed for the write_to table */

	TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st);

	/* Here we have data message */
	if (!st) {
		/* no table definition was received/matched before this update:
		 * silently ignore the message (still returns success).
		 */
		TRACE_PROTO("ignore update message: no remote table", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p);
		goto ignore_msg;
	}

	table = st->table;

	/* default expiration (used when <exp> is unset and no expire field follows) */
	expire = MS_TO_TICKS(table->expire);

	if (updt) {
		/* the message carries an explicit 4-byte update ID */
		if (msg_len < sizeof(update)) {
			TRACE_ERROR("malformed update message: message too small", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
			goto malformed_exit;
		}

		memcpy(&update, *msg_cur, sizeof(update));
		*msg_cur += sizeof(update);
	}
	else
		update = st->last_get + 1;

	/* NOTE(review): last_get is assigned the htonl()-converted ID here while
	 * the non-updt branch above derives <update> from last_get directly —
	 * confirm the intended byte order against the readers of last_get.
	 */
	if (p->learnstate != PEER_LR_ST_PROCESSING)
		st->last_get = htonl(update);

	if (exp) {
		size_t expire_sz = sizeof expire;

		if (*msg_cur + expire_sz > msg_end) {
			TRACE_ERROR("malformed update message: wrong expiration size", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
			goto malformed_exit;
		}

		memcpy(&expire, *msg_cur, expire_sz);
		*msg_cur += expire_sz;
		expire = ntohl(expire);
		/* Protocol contains expire in MS, check if value is less than table config */
		if (expire > table->expire)
			expire = table->expire;
		/* the rest of the code considers expire as ticks and not MS */
		expire = MS_TO_TICKS(expire);
	}

	/* allocate a fresh session to receive the decoded key */
	newts = stksess_new(table, NULL);
	if (!newts) {
		TRACE_PROTO("ignore update message: failed to get a new sticky session", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st);
		goto ignore_msg;
	}

	/* decode the key according to the table's key type */
	if (table->type == SMP_T_STR) {
		unsigned int to_read, to_store;

		to_read = intdecode(msg_cur, msg_end);
		if (!*msg_cur) {
			TRACE_ERROR("malformed update message: invalid string length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
			goto malformed_free_newts;
		}

		/* a key longer than ours is truncated to key_size-1 bytes but
		 * the full <to_read> bytes are consumed from the buffer below
		 */
		to_store = MIN(to_read, table->key_size - 1);
		if (*msg_cur + to_store > msg_end) {
			TRACE_ERROR("malformed update message: invalid string (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
			goto malformed_free_newts;
		}

		keylen = to_store;
		memcpy(newts->key.key, *msg_cur, keylen);
		newts->key.key[keylen] = 0;
		*msg_cur += to_read;
	}
	else if (table->type == SMP_T_SINT) {
		unsigned int netinteger;

		if (*msg_cur + sizeof(netinteger) > msg_end) {
			TRACE_ERROR("malformed update message: invalid integer (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
			goto malformed_free_newts;
		}

		/* integer keys travel in network byte order */
		keylen = sizeof(netinteger);
		memcpy(&netinteger, *msg_cur, keylen);
		netinteger = ntohl(netinteger);
		memcpy(newts->key.key, &netinteger, keylen);
		*msg_cur += keylen;
	}
	else {
		/* fixed-size binary key: exactly key_size bytes */
		if (*msg_cur + table->key_size > msg_end) {
			TRACE_ERROR("malformed update message: invalid key (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
			goto malformed_free_newts;
		}

		keylen = table->key_size;
		memcpy(newts->key.key, *msg_cur, keylen);
		*msg_cur += keylen;
	}

	newts->shard = stktable_get_key_shard(table, newts->key.key, keylen);

	/* lookup for existing entry */
	ts = stktable_set_entry(table, newts);
	if (ts != newts) {
		/* an entry with this key already existed: drop the new one */
		stksess_free(table, newts);
		newts = NULL;
	}

	/* save the position of the data part so that it can be decoded a
	 * second time for the write_to table (see <wts> handling below)
	 */
	msg_save = *msg_cur;

 update_wts:

	HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock);

	/* decode every data type advertised in the remote table definition */
	for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
		uint64_t decoded_int;
		unsigned int idx;
		int ignore = 0;

		if (!((1ULL << data_type) & st->remote_data))
			continue;

		/* We shouldn't learn local-only values unless the table is
		 * considered as "recv-only". Also, when handling the write_to
		 * table we must ignore types that can be processed so we don't
		 * interfere with any potential arithmetic logic performed on
		 * them (ie: cumulative counters).
		 */
		if ((stktable_data_types[data_type].is_local &&
		     !(table->flags & STK_FL_RECV_ONLY)) ||
		    (table != st->table && !stktable_data_types[data_type].as_is))
			ignore = 1;

		/* note: even when <ignore> is set the fields are still decoded
		 * so that the read position stays in sync with the message.
		 */
		if (stktable_data_types[data_type].is_array) {
			/* in case of array all elements
			 * use the same std_type and they
			 * are linearly encoded.
			 * The number of elements was provided
			 * by table definition message
			 */
			switch (stktable_data_types[data_type].std_type) {
			case STD_T_SINT:
				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
					decoded_int = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						TRACE_ERROR("malformed update message: invalid integer data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
						goto malformed_unlock;
					}

					data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
					if (data_ptr && !ignore)
						stktable_data_cast(data_ptr, std_t_sint) = decoded_int;
				}
				break;
			case STD_T_UINT:
				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
					decoded_int = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						TRACE_ERROR("malformed update message: invalid unsigned integer data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
						goto malformed_unlock;
					}

					data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
					if (data_ptr && !ignore)
						stktable_data_cast(data_ptr, std_t_uint) = decoded_int;
				}
				break;
			case STD_T_ULL:
				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
					decoded_int = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						TRACE_ERROR("malformed update message: invalid unsigned long data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
						goto malformed_unlock;
					}

					data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
					if (data_ptr && !ignore)
						stktable_data_cast(data_ptr, std_t_ull) = decoded_int;
				}
				break;
			case STD_T_FRQP:
				for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) {
					struct freq_ctr data;

					/* First bit is reserved for the freq_ctr lock
					 * Note: here we're still protected by the stksess lock
					 * so we don't need to update the freq_ctr
					 * using its internal lock.
					 */

					/* first varint is the age of the counter, subtracted from now_ms */
					decoded_int = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
						/* TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p); */
						goto malformed_unlock;
					}

					data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
					data.curr_ctr = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
						goto malformed_unlock;
					}

					data.prev_ctr = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
						goto malformed_unlock;
					}

					data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx);
					if (data_ptr && !ignore)
						stktable_data_cast(data_ptr, std_t_frqp) = data;
				}
				break;
			}

			/* array is fully decoded
			 * proceed next data_type.
			 */
			continue;
		}
		decoded_int = intdecode(msg_cur, msg_end);
		if (!*msg_cur) {
			TRACE_ERROR("malformed update message: invalid data value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
			goto malformed_unlock;
		}

		switch (stktable_data_types[data_type].std_type) {
		case STD_T_SINT:
			data_ptr = stktable_data_ptr(table, ts, data_type);
			if (data_ptr && !ignore)
				stktable_data_cast(data_ptr, std_t_sint) = decoded_int;
			break;

		case STD_T_UINT:
			data_ptr = stktable_data_ptr(table, ts, data_type);
			if (data_ptr && !ignore)
				stktable_data_cast(data_ptr, std_t_uint) = decoded_int;
			break;

		case STD_T_ULL:
			data_ptr = stktable_data_ptr(table, ts, data_type);
			if (data_ptr && !ignore)
				stktable_data_cast(data_ptr, std_t_ull) = decoded_int;
			break;

		case STD_T_FRQP: {
			struct freq_ctr data;

			/* First bit is reserved for the freq_ctr lock
			 * Note: here we're still protected by the stksess lock
			 * so we don't need to update the freq_ctr
			 * using its internal lock.
			 */

			data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1;
			data.curr_ctr = intdecode(msg_cur, msg_end);
			if (!*msg_cur) {
				TRACE_ERROR("malformed update message: invalid freq_ctr value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
				goto malformed_unlock;
			}

			data.prev_ctr = intdecode(msg_cur, msg_end);
			if (!*msg_cur) {
				TRACE_ERROR("malformed update message: invalid freq_ctr value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
				goto malformed_unlock;
			}

			data_ptr = stktable_data_ptr(table, ts, data_type);
			if (data_ptr && !ignore)
				stktable_data_cast(data_ptr, std_t_frqp) = data;
			break;
		}
		case STD_T_DICT: {
			struct buffer *chunk;
			size_t data_len, value_len;
			unsigned int id;
			struct dict_entry *de;
			struct dcache *dc;
			char *end;

			/* <decoded_int> is the total length of the dict payload */
			if (!decoded_int) {
				/* No entry. */
				break;
			}
			data_len = decoded_int;
			if (*msg_cur + data_len > msg_end) {
				TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
				goto malformed_unlock;
			}

			/* Compute the end of the current data, <msg_end> being at the end of
			 * the entire message.
			 */
			end = *msg_cur + data_len;
			id = intdecode(msg_cur, end);
			if (!*msg_cur || !id) {
				TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
				goto malformed_unlock;
			}

			dc = p->dcache;
			if (*msg_cur == end) {
				/* Dictionary entry key without value. */
				if (id > dc->max_entries) {
					TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_PROTO_ERR, appctx, p, st);
					goto malformed_unlock;
				}
				/* IDs sent over the network are numbered from 1. */
				de = dc->rx[id - 1].de;
			}
			else {
				/* key + value: copy the value in the trash chunk,
				 * NUL-terminate it, then (re)register it in the
				 * per-peer rx dict cache under this ID.
				 */
				chunk = get_trash_chunk();
				value_len = intdecode(msg_cur, end);
				if (!*msg_cur || *msg_cur + value_len > end ||
				  unlikely(value_len + 1 >= chunk->size)) {
					TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
					goto malformed_unlock;
				}

				chunk_memcpy(chunk, *msg_cur, value_len);
				chunk->area[chunk->data] = '\0';
				*msg_cur += value_len;

				de = dict_insert(&server_key_dict, chunk->area);
				dict_entry_unref(&server_key_dict, dc->rx[id - 1].de);
				dc->rx[id - 1].de = de;
			}
			if (de) {
				data_ptr = stktable_data_ptr(table, ts, data_type);
				if (data_ptr && !ignore) {
					HA_ATOMIC_INC(&de->refcount);
					stktable_data_cast(data_ptr, std_t_dict) = de;
				}
			}
			break;
		}
		}
	}

	if (st->table->write_to.t && table != st->table->write_to.t) {
		struct stktable_key stkey = { .key = ts->key.key, .key_len = keylen };

		/* While we're still under the main ts lock, try to get related
		 * write_to stksess with main ts key
		 */
		wts = stktable_get_entry(st->table->write_to.t, &stkey);
	}

	/* Force new expiration */
	ts->expire = tick_add(now_ms, expire);

	HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);

	/* we MUST NOT dec the refcnt yet because stktable_trash_oldest() or
	 * process_table_expire() could execute between the two next lines.
	 */
	stktable_touch_remote(table, ts, 0);

	/* Entry was just learned from a peer, we want to notify this peer
	 * if we happen to modify it. Thus let's consider at least one
	 * peer has seen the update (ie: the peer that sent us the update)
	 */
	HA_ATOMIC_STORE(&ts->seen, 1);

	/* only now we can decrement the refcnt */
	HA_ATOMIC_DEC(&ts->ref_cnt);

	if (wts) {
		/* Start over the message decoding for wts as we got a valid stksess
		 * for write_to table, so we need to refresh the entry with supported
		 * values.
		 *
		 * We prefer to do the decoding a second time even though it might
		 * cost a bit more than copying from main ts to wts, but doing so
		 * enables us to get rid of main ts lock: we only need the wts lock
		 * since upstream data is still available in msg_cur
		 */
		ts = wts;
		table = st->table->write_to.t;
		wts = NULL; /* so we don't get back here */
		*msg_cur = msg_save;
		goto update_wts;
	}

	TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_UPDATE, appctx, p, NULL, NULL,
	       "Update message successfully processed (table=%s, updateid=%u)", st->table->id, st->last_get);

 ignore_msg:
	TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st);
	return 1;

 malformed_unlock:
	/* malformed message: entry is locked and registered, release it and
	 * still notify remote peers (expired flag set) before failing.
	 */
	HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock);
	stktable_touch_remote(st->table, ts, 1);
	goto malformed_exit;

 malformed_free_newts:
	/* malformed message: the freshly allocated session was never registered */
	stksess_free(st->table, newts);
 malformed_exit:
	appctx->st0 = PEER_SESS_ST_ERRPROTO;
	TRACE_DEVEL("leaving in error", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st);
	return 0;
}
2205
2206
/*
2207
 * Function used to parse a stick-table update acknowledgement message after it
2208
 * has been received by <p> peer with <msg_cur> as address of the pointer to the position in the
2209
 * receipt buffer with <msg_end> being the position of the end of the stick-table message.
2210
 * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error
2211
 * was encountered.
2212
 * Return 1 if succeeded, 0 if not with the appctx state st0 set to PEER_SESS_ST_ERRPROTO.
2213
 */
2214
static inline int peer_treat_ackmsg(struct appctx *appctx, struct peer *p,
2215
                                    char **msg_cur, char *msg_end)
2216
0
{
2217
  /* ack message */
2218
0
  uint32_t table_id ;
2219
0
  uint32_t update;
2220
0
  struct shared_table *st = NULL;
2221
0
  int ret = 1;
2222
2223
0
  TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p);
2224
  /* ignore ack during teaching process */
2225
0
  if (p->flags & PEER_F_TEACH_PROCESS) {
2226
0
    TRACE_DEVEL("Ignore ack during teaching process", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p);
2227
0
    goto end;
2228
0
  }
2229
2230
0
  table_id = intdecode(msg_cur, msg_end);
2231
0
  if (!*msg_cur || (*msg_cur + sizeof(update) > msg_end)) {
2232
0
    TRACE_ERROR("malformed ackk message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
2233
0
    appctx->st0 = PEER_SESS_ST_ERRPROTO;
2234
0
    ret = 0;
2235
0
    goto end;
2236
0
  }
2237
2238
0
  memcpy(&update, *msg_cur, sizeof(update));
2239
0
  update = ntohl(update);
2240
0
  for (st = p->tables; st; st = st->next) {
2241
0
    if (st->local_id == table_id) {
2242
0
      st->update = update;
2243
0
      TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_ACK, appctx, p, NULL, NULL,
2244
0
             "Ack message successfully process (table=%s, updateid=%u)", st->table->id, st->update);
2245
0
      break;
2246
0
    }
2247
0
  }
2248
2249
0
  end:
2250
0
  TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p, st);
2251
0
  return ret;
2252
0
}
2253
2254
/*
2255
 * Function used to parse a stick-table switch message after it has been received
2256
 * by <p> peer with <msg_cur> as address of the pointer to the position in the
2257
 * receipt buffer with <msg_end> being the position of the end of the stick-table message.
2258
 * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error
2259
 * was encountered.
2260
 * Return 1 if succeeded, 0 if not with the appctx state st0 set to PEER_SESS_ST_ERRPROTO.
2261
 */
2262
static inline int peer_treat_switchmsg(struct appctx *appctx, struct peer *p,
2263
                                      char **msg_cur, char *msg_end)
2264
0
{
2265
0
  struct shared_table *st;
2266
0
  int table_id;
2267
2268
0
  TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_SWITCH, appctx, p);
2269
0
  table_id = intdecode(msg_cur, msg_end);
2270
0
  if (!*msg_cur) {
2271
0
    TRACE_ERROR("malformed table switch message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
2272
0
    appctx->st0 = PEER_SESS_ST_ERRPROTO;
2273
0
    return 0;
2274
0
  }
2275
2276
0
  p->remote_table = NULL;
2277
0
  for (st = p->tables; st; st = st->next) {
2278
0
    if (st->remote_id == table_id) {
2279
0
      p->remote_table = st;
2280
0
      TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_SWITCH, appctx, p, NULL, NULL,
2281
0
             "table switch message successfully process (table=%s)", st->table->id);
2282
0
      break;
2283
0
    }
2284
0
  }
2285
2286
0
  TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_SWITCH, appctx, p, st);
2287
0
  return 1;
2288
0
}
2289
2290
/*
2291
 * Function used to parse a stick-table definition message after it has been received
2292
 * by <p> peer with <msg_cur> as address of the pointer to the position in the
2293
 * receipt buffer with <msg_end> being the position of the end of the stick-table message.
2294
 * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error
2295
 * was encountered.
2296
 * <totl> is the length of the stick-table update message computed upon receipt.
2297
 * Return 1 if succeeded, 0 if not with the appctx state st0 set to PEER_SESS_ST_ERRPROTO.
2298
 */
2299
static inline int peer_treat_definemsg(struct appctx *appctx, struct peer *p,
                                      char **msg_cur, char *msg_end, int totl)
{
	/* NOTE(review): <totl> is not used by this function — confirm whether
	 * it is kept only for signature symmetry with the other parsers.
	 */
	int table_id_len;
	struct shared_table *st;
	int table_type;
	int table_keylen;
	int table_id;
	uint64_t table_data;

	TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
	/* remote table id (varint) */
	table_id = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* length of the table name that follows */
	table_id_len = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no table name length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	p->remote_table = NULL;
	if (!table_id_len || (*msg_cur + table_id_len) >= msg_end) {
		TRACE_ERROR("malformed table definition message: no table name", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* match the announced name against our shared tables' network ids,
	 * and drop any stale binding of this remote id to another table.
	 */
	for (st = p->tables; st; st = st->next) {
		/* Reset IDs */
		if (st->remote_id == table_id)
			st->remote_id = 0;

		if (!p->remote_table && (table_id_len == strlen(st->table->nid)) &&
		    (memcmp(st->table->nid, *msg_cur, table_id_len) == 0))
			p->remote_table = st;
	}

	if (!p->remote_table) {
		TRACE_PROTO("ignore table definition message: table not found", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
		goto ignore_msg;
	}

	*msg_cur += table_id_len;
	if (*msg_cur >= msg_end) {
		TRACE_ERROR("malformed table definition message: truncated message", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* key type announced by the remote peer */
	table_type = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no table type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* key size announced by the remote peer */
	table_keylen = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no key length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* bitfield of the data types carried in update messages */
	table_data = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no data type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* the remote definition must agree with our local table on both key
	 * type and key size, otherwise updates cannot be applied.
	 */
	if (p->remote_table->table->type != peer_int_key_type[table_type]
	  || p->remote_table->table->key_size != table_keylen) {
		p->remote_table = NULL;
		TRACE_PROTO("ignore table definition message: no key/type match", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
		goto ignore_msg;
	}

	/* Check if there is the additional expire data */
	intdecode(msg_cur, msg_end);
	if (*msg_cur) {
		uint64_t data_type;
		uint64_t type;

		/* This define contains the expire data so we consider
		 * it also contain all data_types parameters.
		 */
		for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
			if (table_data & (1ULL << data_type)) {
				if (stktable_data_types[data_type].is_array) {
					/* This should be an array
					 * so we parse the data_type prefix
					 * because we must have parameters.
					 */
					type = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: missing meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* check if the data_type match the current from the bitfield */
					if (type != data_type) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: meta data mismatch type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* decode the nbelem of the array */
					p->remote_table->remote_data_nbelem[type] = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: missing array size meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* if it is an array of frqp, we must also have the period to decode */
					if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
						intdecode(msg_cur, msg_end);
						if (!*msg_cur) {
							p->remote_table = NULL;
							TRACE_PROTO("ignore table definition message: missing period for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
							goto ignore_msg;
						}
					}
				}
				else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
					/* This should be a std freq counter data_type
					 * so we parse the data_type prefix
					 * because we must have parameters.
					 */
					type = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: missing data for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* check if the data_type match the current from the bitfield */
					if (type != data_type) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: meta data mismatch", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* decode the period */
					intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: mismatch period for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}
				}
			}
		}
	}
	else {
		uint64_t data_type;

		/* There is not additional data but
		 * array size parameter is mandatory to parse array
		 * so we consider an error if an array data_type is define
		 * but there is no additional data.
		 */
		for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
			if (table_data & (1ULL << data_type)) {
				if (stktable_data_types[data_type].is_array) {
					p->remote_table = NULL;
					TRACE_PROTO("ignore table definition message: missing array size meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
					goto ignore_msg;
				}
			}
		}
	}

	/* definition accepted: bind the remote id and data bitfield */
	p->remote_table->remote_data = table_data;
	p->remote_table->remote_id = table_id;

	TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_DEF, appctx, p, NULL, NULL,
	       "table definition message successfully process (table=%s)", p->remote_table->table->id);

 ignore_msg:
	TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
	return 1;

 malformed_exit:
	/* malformed message */
	appctx->st0 = PEER_SESS_ST_ERRPROTO;
	TRACE_DEVEL("leaving in error", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
	return 0;
}
2487
2488
/*
2489
 * Receive a stick-table message or pre-parse any other message.
2490
 * The message's header will be sent into <msg_head> which must be at least
2491
 * <msg_head_sz> bytes long (at least 7 to store 32-bit variable lengths).
2492
 * The first two bytes are always read, and the rest is only read if the
2493
 * first bytes indicate a stick-table message. If the message is a stick-table
2494
 * message, the varint is decoded and the equivalent number of bytes will be
2495
 * copied into the trash at trash.area. <totl> is incremented by the number of
2496
 * bytes read EVEN IN CASE OF INCOMPLETE MESSAGES.
2497
 * Returns 1 if there was no error, if not, returns 0 if not enough data were available,
2498
 * -1 if there was an error updating the appctx state st0 accordingly.
2499
 */
2500
static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t msg_head_sz,
2501
                                uint32_t *msg_len, int *totl)
2502
0
{
2503
0
  int reql;
2504
0
  char *cur;
2505
2506
0
  TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
2507
2508
0
  reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl);
2509
0
  if (reql <= 0) /* closed or EOL not found */
2510
0
    goto incomplete;
2511
2512
0
  *totl += reql;
2513
2514
0
  if (!(msg_head[1] & PEER_MSG_STKT_BIT_MASK))
2515
0
    return 1;
2516
2517
  /* This is a stick-table message, let's go on */
2518
2519
  /* Read and Decode message length */
2520
0
  msg_head    += *totl;
2521
0
  msg_head_sz -= *totl;
2522
0
  reql = applet_input_data(appctx) - *totl;
2523
0
  if (reql > msg_head_sz)
2524
0
    reql = msg_head_sz;
2525
2526
0
  reql = applet_getblk(appctx, msg_head, reql, *totl);
2527
0
  if (reql <= 0) /* closed */
2528
0
    goto incomplete;
2529
2530
0
  cur = msg_head;
2531
0
  *msg_len = intdecode(&cur, cur + reql);
2532
0
  if (!cur) {
2533
    /* the number is truncated, did we read enough ? */
2534
0
    if (reql < msg_head_sz)
2535
0
      goto incomplete;
2536
2537
    /* malformed message */
2538
0
    TRACE_PROTO("malformed message: bad message length encoding", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
2539
0
    appctx->st0 = PEER_SESS_ST_ERRPROTO;
2540
0
    return -1;
2541
0
  }
2542
0
  *totl += cur - msg_head;
2543
2544
  /* Read message content */
2545
0
  if (*msg_len) {
2546
0
    if (*msg_len > trash.size) {
2547
      /* Status code is not success, abort */
2548
0
      appctx->st0 = PEER_SESS_ST_ERRSIZE;
2549
0
      TRACE_PROTO("malformed message: too large length encoding", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
2550
0
      return -1;
2551
0
    }
2552
2553
0
    reql = applet_getblk(appctx, trash.area, *msg_len, *totl);
2554
0
    if (reql <= 0) /* closed */
2555
0
      goto incomplete;
2556
0
    *totl += reql;
2557
0
  }
2558
2559
0
  TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
2560
0
  return 1;
2561
2562
0
 incomplete:
2563
0
  if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) {
2564
    /* there was an error or the message was truncated */
2565
0
    appctx->st0 = PEER_SESS_ST_END;
2566
0
    TRACE_ERROR("error or messafe truncated", PEERS_EV_SESS_IO|PEERS_EV_RX_ERR, appctx);
2567
0
    return -1;
2568
0
  }
2569
2570
0
  TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
2571
0
  return 0;
2572
0
}
2573
2574
/*
2575
 * Treat the awaited message with <msg_head> as header.*
2576
 * Return 1 if succeeded, 0 if not.
2577
 */
2578
static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *peer, unsigned char *msg_head,
2579
                                         char **msg_cur, char *msg_end, int msg_len, int totl)
2580
0
{
2581
0
  struct peers *peers = peer->peers;
2582
2583
0
  TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx, peer);
2584
2585
0
  if (msg_head[0] == PEER_MSG_CLASS_CONTROL) {
2586
0
    if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) {
2587
0
      struct shared_table *st;
2588
      /* Reset message: remote need resync */
2589
0
      TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
2590
      /* prepare tables for a global push */
2591
0
      for (st = peer->tables; st; st = st->next) {
2592
0
        st->teaching_origin = st->last_pushed = st->update;
2593
0
        st->flags = 0;
2594
0
      }
2595
2596
      /* reset teaching flags to 0 */
2597
0
      peer->flags &= ~PEER_TEACH_FLAGS;
2598
2599
      /* flag to start to teach lesson */
2600
0
      peer->flags |= (PEER_F_TEACH_PROCESS|PEER_F_DBG_RESYNC_REQUESTED);
2601
0
      TRACE_STATE("peer elected to teach leasson to remote peer", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
2602
0
    }
2603
0
    else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) {
2604
0
      TRACE_PROTO("Full resync finished message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
2605
0
      if (peer->learnstate == PEER_LR_ST_PROCESSING) {
2606
0
        peer->learnstate = PEER_LR_ST_FINISHED;
2607
0
        peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
2608
0
        task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
2609
0
        TRACE_STATE("Full resync finished", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
2610
0
      }
2611
0
      peer->confirm++;
2612
0
    }
2613
0
    else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) {
2614
0
      TRACE_PROTO("Partial resync finished message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
2615
0
      if (peer->learnstate == PEER_LR_ST_PROCESSING) {
2616
0
        peer->learnstate = PEER_LR_ST_FINISHED;
2617
0
        peer->flags |= (PEER_F_LEARN_NOTUP2DATE|PEER_F_WAIT_SYNCTASK_ACK);
2618
0
        task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
2619
0
        TRACE_STATE("partial resync finished", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
2620
0
      }
2621
0
      peer->confirm++;
2622
0
    }
2623
0
    else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM)  {
2624
0
      struct shared_table *st;
2625
2626
0
      TRACE_PROTO("Resync confirm message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
2627
      /* If stopping state */
2628
0
      if (stopping) {
2629
        /* Close session, push resync no more needed */
2630
0
        peer->flags |= PEER_F_LOCAL_TEACH_COMPLETE;
2631
0
        appctx->st0 = PEER_SESS_ST_END;
2632
0
        TRACE_STATE("process stopping, stop any resync", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
2633
0
        return 0;
2634
0
      }
2635
0
      for (st = peer->tables; st; st = st->next) {
2636
0
        st->update = st->last_pushed = st->teaching_origin;
2637
0
        st->flags = 0;
2638
0
      }
2639
2640
      /* reset teaching flags to 0 */
2641
0
      peer->flags &= ~PEER_TEACH_FLAGS;
2642
0
      TRACE_STATE("Stop teaching", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer);
2643
0
    }
2644
0
    else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) {
2645
0
      TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
2646
0
      peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
2647
0
      peer->rx_hbt++;
2648
0
    }
2649
0
  }
2650
0
  else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) {
2651
0
    if (msg_head[1] == PEER_MSG_STKT_DEFINE) {
2652
0
      TRACE_PROTO("Table definition message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
2653
0
      if (!peer_treat_definemsg(appctx, peer, msg_cur, msg_end, totl))
2654
0
        return 0;
2655
0
    }
2656
0
    else if (msg_head[1] == PEER_MSG_STKT_SWITCH) {
2657
0
      TRACE_PROTO("Table switch message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer);
2658
0
      if (!peer_treat_switchmsg(appctx, peer, msg_cur, msg_end))
2659
0
        return 0;
2660
0
    }
2661
0
    else if (msg_head[1] == PEER_MSG_STKT_UPDATE ||
2662
0
             msg_head[1] == PEER_MSG_STKT_INCUPDATE ||
2663
0
             msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED ||
2664
0
             msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) {
2665
0
      int update, expire;
2666
2667
0
      TRACE_PROTO("Update message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, peer);
2668
0
      update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED;
2669
0
      expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED;
2670
0
      if (!peer_treat_updatemsg(appctx, peer, update, expire,
2671
0
                                msg_cur, msg_end, msg_len, totl))
2672
0
        return 0;
2673
2674
0
    }
2675
0
    else if (msg_head[1] == PEER_MSG_STKT_ACK) {
2676
0
      TRACE_PROTO("Ack message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, peer);
2677
0
      if (!peer_treat_ackmsg(appctx, peer, msg_cur, msg_end))
2678
0
        return 0;
2679
0
    }
2680
0
  }
2681
0
  else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) {
2682
0
    appctx->st0 = PEER_SESS_ST_ERRPROTO;
2683
0
    TRACE_PROTO("malformed message: reserved", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, peer);
2684
0
    return 0;
2685
0
  }
2686
2687
0
  TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx, peer);
2688
0
  return 1;
2689
0
}
2690
2691
2692
/*
 * Send any message to <peer> peer.
 * Returns 1 if succeeded, or -1 or 0 if failed.
 * -1 means an internal error occurred, 0 is for a peer protocol error leading
 * to a peer state change (from the peer I/O handler point of view).
 *
 *   - peer->last_local_table is the last table for which we send an update
 *                            messages.
 *
 *   - peer->stop_local_table is the last evaluated table. It is unset when the
 *                            teaching process starts. But we use it as a
 *                            restart point when the loop is interrupted. It is
 *                            especially useful when the number of tables exceeds
 *                            peers_max_updates_at_once value.
 *
 * When a teaching loop is started, the peer's last_local_table is saved in a
 * local variable. This variable is used as a finish point. When the current
 * table is equal to it, it means all tables were evaluated, all updates were
 * sent and the teaching process is finished.
 *
 * peer->stop_local_table is always NULL when the teaching process begins. It is
 * only reset at the end. In the mean time, it always points on a table.
 */
int peer_send_msgs(struct appctx *appctx,
                   struct peer *peer, struct peers *peers)
{
	int repl = 1;

	TRACE_ENTER(PEERS_EV_SESS_IO, appctx, peer);

	/* Need to request a resync (only possible for a remote peer at this stage) */
	if (peer->learnstate == PEER_LR_ST_ASSIGNED) {
		BUG_ON(peer->local);
		repl = peer_send_resync_reqmsg(appctx, peer, peers);
		if (repl <= 0)
			goto end;
		peer->learnstate = PEER_LR_ST_PROCESSING;
		TRACE_STATE("Start processing resync", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
	}

	/* Nothing to read, now we start to write */
	if (peer->tables) {
		struct shared_table *st;
		struct shared_table *last_local_table;
		int updates = 0;

		/* restart from the saved cursor if a previous loop was interrupted */
		last_local_table = peer->last_local_table;
		if (!last_local_table)
			last_local_table = peer->tables;
		if (!peer->stop_local_table)
			peer->stop_local_table = last_local_table;
		st = peer->stop_local_table->next;

		while (1) {
			if (!st)
				st = peer->tables;
			/* It remains some updates to ack */
			if (st->last_get != st->last_acked) {
				repl = peer_send_ackmsg(st, appctx);
				if (repl <= 0)
					goto end;

				st->last_acked = st->last_get;
				TRACE_PRINTF(TRACE_LEVEL_PROTO, PEERS_EV_PROTO_ACK, appctx, NULL, st, NULL,
				       "ack message sent (table=%s, updateid=%u)", st->table->id, st->last_acked);
			}

			if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
				int must_send;

				/* non-blocking read lock: if contended, yield and retry later */
				if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock)) {
					applet_have_more_data(appctx);
					repl = -1;
					goto end;
				}
				must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
				HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);

				if (must_send) {
					repl = peer_send_teach_process_msgs(appctx, peer, st);
					if (repl <= 0) {
						peer->stop_local_table = peer->last_local_table;
						goto end;
					}
				}
			}
			else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
				/* full teaching is done in two stages per table */
				if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
					repl = peer_send_teach_stage1_msgs(appctx, peer, st);
					if (repl <= 0) {
						peer->stop_local_table = peer->last_local_table;
						goto end;
					}
				}

				if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
					repl = peer_send_teach_stage2_msgs(appctx, peer, st);
					if (repl <= 0) {
						peer->stop_local_table = peer->last_local_table;
						goto end;
					}
				}
			}

			if (st == last_local_table) {
				peer->stop_local_table = NULL;
				break;
			}

			/* This one is to be sure to restart from <st->next> if we are interrupted
			 * because of peer_send_teach_stage2_msgs or because buffer is full
			 * when sending an ackmsg. In both cases current <st> was evaluated and
			 * we must restart from <st->next>
			 */
			peer->stop_local_table = st;

			updates++;
			if (updates >= peers_max_updates_at_once) {
				/* budget exhausted: reschedule ourselves and yield */
				applet_have_more_data(appctx);
				repl = -1;
				goto end;
			}

			st = st->next;
		}
	}

	if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) {
		repl = peer_send_resync_finishedmsg(appctx, peer, peers);
		if (repl <= 0)
			goto end;

		/* flag finished message sent */
		peer->flags |= PEER_F_TEACH_FINISHED;
		TRACE_STATE("full/partial resync finished", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
	}

	/* Confirm finished or partial messages */
	while (peer->confirm) {
		repl = peer_send_resync_confirmsg(appctx, peer, peers);
		if (repl <= 0)
			goto end;
		TRACE_STATE("Confirm resync is finished", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
		peer->confirm--;
	}

	repl = 1;
 end:
	TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, peer);
	return repl;
}
2844
2845
/*
2846
 * Read and parse a first line of a "hello" peer protocol message.
2847
 * Returns 0 if could not read a line, -1 if there was a read error or
2848
 * the line is malformed, 1 if succeeded.
2849
 */
2850
static inline int peer_getline_version(struct appctx *appctx,
2851
                                       unsigned int *maj_ver, unsigned int *min_ver)
2852
0
{
2853
0
  int reql;
2854
2855
0
  reql = peer_getline(appctx);
2856
0
  if (!reql)
2857
0
    return 0;
2858
2859
0
  if (reql < 0)
2860
0
    return -1;
2861
2862
  /* test protocol */
2863
0
  if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.area, proto_len + 1) != 0) {
2864
0
    appctx->st0 = PEER_SESS_ST_EXIT;
2865
0
    appctx->st1 = PEER_SESS_SC_ERRPROTO;
2866
0
    TRACE_ERROR("protocol error: invalid version line", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
2867
0
    return -1;
2868
0
  }
2869
0
  if (peer_get_version(trash.area + proto_len + 1, maj_ver, min_ver) == -1 ||
2870
0
    *maj_ver != PEER_MAJOR_VER || *min_ver > PEER_MINOR_VER) {
2871
0
    appctx->st0 = PEER_SESS_ST_EXIT;
2872
0
    appctx->st1 = PEER_SESS_SC_ERRVERSION;
2873
0
    TRACE_ERROR("protocol error: invalid version", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
2874
0
    return -1;
2875
0
  }
2876
2877
0
  TRACE_DATA("version line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx);
2878
0
  return 1;
2879
0
}
2880
2881
/*
2882
 * Read and parse a second line of a "hello" peer protocol message.
2883
 * Returns 0 if could not read a line, -1 if there was a read error or
2884
 * the line is malformed, 1 if succeeded.
2885
 */
2886
static inline int peer_getline_host(struct appctx *appctx)
2887
0
{
2888
0
  int reql;
2889
2890
0
  reql = peer_getline(appctx);
2891
0
  if (!reql)
2892
0
    return 0;
2893
2894
0
  if (reql < 0)
2895
0
    return -1;
2896
2897
  /* test hostname match */
2898
0
  if (strcmp(localpeer, trash.area) != 0) {
2899
0
    appctx->st0 = PEER_SESS_ST_EXIT;
2900
0
    appctx->st1 = PEER_SESS_SC_ERRHOST;
2901
0
    TRACE_ERROR("protocol error: wrong host", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
2902
0
    return -1;
2903
0
  }
2904
2905
0
  TRACE_DATA("host line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx);
2906
0
  return 1;
2907
0
}
2908
2909
/*
2910
 * Read and parse a last line of a "hello" peer protocol message.
2911
 * Returns 0 if could not read a character, -1 if there was a read error or
2912
 * the line is malformed, 1 if succeeded.
2913
 * Set <curpeer> accordingly (the remote peer sending the "hello" message).
2914
 */
2915
static inline int peer_getline_last(struct appctx *appctx, struct peer **curpeer)
2916
0
{
2917
0
  char *p;
2918
0
  int reql;
2919
0
  struct peer *peer;
2920
0
  struct peers *peers = strm_fe(appctx_strm(appctx))->parent;
2921
2922
0
  reql = peer_getline(appctx);
2923
0
  if (!reql)
2924
0
    return 0;
2925
2926
0
  if (reql < 0)
2927
0
    return -1;
2928
2929
  /* parse line "<peer name> <pid> <relative_pid>" */
2930
0
  p = strchr(trash.area, ' ');
2931
0
  if (!p) {
2932
0
    appctx->st0 = PEER_SESS_ST_EXIT;
2933
0
    appctx->st1 = PEER_SESS_SC_ERRPROTO;
2934
0
    TRACE_ERROR("protocol error: invalid peer line", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
2935
0
    return -1;
2936
0
  }
2937
0
  *p = 0;
2938
2939
  /* lookup known peer */
2940
0
  for (peer = peers->remote; peer; peer = peer->next) {
2941
0
    if (strcmp(peer->id, trash.area) == 0)
2942
0
      break;
2943
0
  }
2944
2945
  /* if unknown peer */
2946
0
  if (!peer) {
2947
0
    appctx->st0 = PEER_SESS_ST_EXIT;
2948
0
    appctx->st1 = PEER_SESS_SC_ERRPEER;
2949
0
    TRACE_ERROR("protocol error: unknown peer", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx);
2950
0
    return -1;
2951
0
  }
2952
0
  *curpeer = peer;
2953
2954
0
  TRACE_DATA("peer line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx, peer);
2955
0
  return 1;
2956
0
}
2957
2958
/*
 * Init <peer> peer after validating a connection at peer protocol level. It may
 * be an incoming or outgoing connection. The peer init must be acknowledged by
 * the sync task. Message processing is blocked in the meanwhile.
 */
static inline void init_connected_peer(struct peer *peer, struct peers *peers)
{
	struct shared_table *st;

	TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
	peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));

	/* Init cursors */
	for (st = peer->tables; st ; st = st->next) {
		st->last_get = st->last_acked = 0;
		HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
		/* if st->update appears to be in future it means
		 * that the last acked value is very old and we
		 * remain unconnected a too long time to use this
		 * acknowledgement as a reset.
		 * We should update the protocol to be able to
		 * signal the remote peer that it needs a full resync.
		 * Here a partial fix consist to set st->update at
		 * the max past value.
		 */
		if ((int)(st->table->localupdate - st->update) < 0)
			st->update = st->table->localupdate + (2147483648U);
		st->teaching_origin = st->last_pushed = st->update;
		st->flags = 0;

		HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
	}

	/* Awake main task to ack the new peer state */
	task_wakeup(peers->sync_task, TASK_WOKEN_MSG);

	/* Init confirm counter */
	peer->confirm = 0;

	/* reset teaching flags to 0 */
	peer->flags &= ~PEER_TEACH_FLAGS;

	if (peer->local && !(appctx_is_back(peer->appctx))) {
		/* If the local peer has established the connection (appctx is
		 * on the frontend side), flag it to start to teach lesson.
		 */
		peer->flags |= PEER_F_TEACH_PROCESS;
		TRACE_STATE("peer elected to teach lesson to local peer", PEERS_EV_SESS_NEW|PEERS_EV_SESS_RESYNC, NULL, peer);
	}

	/* Mark the peer as starting and wait the sync task */
	peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
	peer->appstate = PEER_APP_ST_STARTING;
	TRACE_STATE("peer session starting", PEERS_EV_SESS_NEW, NULL, peer);
	TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
}
3014
3015
/*
 * IO Handler to handle message exchange with a peer.
 * Implements the peer session state machine: handshake (version/host/peer
 * lines), then the message exchange loop (PEER_SESS_ST_WAITMSG), then the
 * terminal error/end states.
 */
void peer_io_handler(struct appctx *appctx)
{
	struct peer *curpeer = NULL;
	int reql = 0;
	int repl = 0;
	unsigned int maj_ver, min_ver;
	int prev_state;
	int msg_done = 0;

	TRACE_ENTER(PEERS_EV_SESS_IO, appctx);

	if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
		applet_reset_input(appctx);
		goto out;
	}

	/* Check if the out buffer is available. */
	if (!applet_get_outbuf(appctx)) {
		applet_have_more_data(appctx);
		goto out;
	}

	while (1) {
		prev_state = appctx->st0;
switchstate:
		maj_ver = min_ver = (unsigned int)-1;
		switch(appctx->st0) {
			case PEER_SESS_ST_ACCEPT:
				prev_state = appctx->st0;
				appctx->svcctx = NULL;
				appctx->st0 = PEER_SESS_ST_GETVERSION;
				__fallthrough;
			case PEER_SESS_ST_GETVERSION:
				prev_state = appctx->st0;
				TRACE_STATE("get version line", PEERS_EV_SESS_IO, appctx);
				reql = peer_getline_version(appctx, &maj_ver, &min_ver);
				if (reql <= 0) {
					if (!reql)
						goto out;
					goto switchstate;
				}

				appctx->st0 = PEER_SESS_ST_GETHOST;
				__fallthrough;
			case PEER_SESS_ST_GETHOST:
				prev_state = appctx->st0;
				TRACE_STATE("get host line", PEERS_EV_SESS_IO, appctx);
				reql = peer_getline_host(appctx);
				if (reql <= 0) {
					if (!reql)
						goto out;
					goto switchstate;
				}

				appctx->st0 = PEER_SESS_ST_GETPEER;
				__fallthrough;
			case PEER_SESS_ST_GETPEER: {
				prev_state = appctx->st0;
				TRACE_STATE("get peer line", PEERS_EV_SESS_IO, appctx);
				reql = peer_getline_last(appctx, &curpeer);
				if (reql <= 0) {
					if (!reql)
						goto out;
					goto switchstate;
				}

				HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
				if (curpeer->appctx && curpeer->appctx != appctx) {
					if (curpeer->local) {
						/* Local connection, reply a retry */
						appctx->st0 = PEER_SESS_ST_EXIT;
						appctx->st1 = PEER_SESS_SC_TRYAGAIN;
						TRACE_STATE("local connection, retry", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						goto switchstate;
					}

					TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
					/* we're killing a connection, we must apply a random delay before
					 * retrying otherwise the other end will do the same and we can loop
					 * for a while.
					 */
					curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
					peer_session_forceshutdown(curpeer);

					curpeer->heartbeat = TICK_ETERNITY;
					curpeer->coll++;
				}
				/* record whether the remote asked for the downgraded protocol version */
				if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
					if (min_ver == PEER_DWNGRD_MINOR_VER) {
						curpeer->flags |= PEER_F_DWNGRD;
					}
					else {
						curpeer->flags &= ~PEER_F_DWNGRD;
					}
				}
				curpeer->appctx = appctx;
				curpeer->flags |= PEER_F_ALIVE;
				appctx->svcctx = curpeer;
				appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
				_HA_ATOMIC_INC(&active_peers);
			}
			__fallthrough;
			case PEER_SESS_ST_SENDSUCCESS: {
				prev_state = appctx->st0;
				if (!curpeer) {
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}

				TRACE_STATE("send success", PEERS_EV_SESS_IO, appctx, curpeer);
				repl = peer_send_status_successmsg(appctx);
				if (repl <= 0) {
					if (repl == -1)
						goto out;
					goto switchstate;
				}

				/* Register status code */
				curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
				curpeer->last_hdshk = now_ms;

				init_connected_peer(curpeer, curpeer->peers);

				/* switch to waiting message state */
				_HA_ATOMIC_INC(&connected_peers);
				appctx->st0 = PEER_SESS_ST_WAITMSG;
				TRACE_STATE("connected, now wait for messages", PEERS_EV_SESS_IO, appctx, curpeer);
				goto switchstate;
			}
			case PEER_SESS_ST_CONNECT: {
				prev_state = appctx->st0;
				if (!curpeer) {
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}

				TRACE_STATE("send hello message", PEERS_EV_SESS_IO, appctx, curpeer);
				repl = peer_send_hellomsg(appctx, curpeer);
				if (repl <= 0) {
					if (repl == -1)
						goto out;
					goto switchstate;
				}

				/* switch to the waiting statuscode state */
				appctx->st0 = PEER_SESS_ST_GETSTATUS;
			}
			__fallthrough;
			case PEER_SESS_ST_GETSTATUS: {
				prev_state = appctx->st0;
				if (!curpeer) {
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}
				curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
				TRACE_STATE("get status", PEERS_EV_SESS_IO, appctx, curpeer);

				reql = peer_getline(appctx);
				if (!reql)
					goto out;

				if (reql < 0)
					goto switchstate;

				/* Register status code */
				curpeer->statuscode = atoi(trash.area);
				curpeer->last_hdshk = now_ms;

				/* Awake main task */
				task_wakeup(curpeer->peers->sync_task, TASK_WOKEN_MSG);

				/* If status code is success */
				if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
					init_connected_peer(curpeer, curpeer->peers);
				}
				else {
					if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
						curpeer->flags |= PEER_F_DWNGRD;
					/* Status code is not success, abort */
					appctx->st0 = PEER_SESS_ST_END;
					goto switchstate;
				}
				_HA_ATOMIC_INC(&connected_peers);
				appctx->st0 = PEER_SESS_ST_WAITMSG;
				TRACE_STATE("connected, now wait for messages", PEERS_EV_SESS_IO, appctx, curpeer);
			}
			__fallthrough;
			case PEER_SESS_ST_WAITMSG: {
				uint32_t msg_len = 0;
				char *msg_cur = trash.area;
				char *msg_end = trash.area;
				unsigned char msg_head[7]; // 2 + 5 for varint32
				int totl = 0;

				prev_state = appctx->st0;
				if (!curpeer) {
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}

				if (curpeer->flags & PEER_F_WAIT_SYNCTASK_ACK) {
					applet_wont_consume(appctx);
					TRACE_STATE("peer is waiting for sync task", PEERS_EV_SESS_IO, appctx, curpeer);
					goto out;
				}

				/* check if we've already hit the rx limit (i.e. we've
				 * already gone through send_msgs and we don't want to
				 * process input messages again). We must absolutely
				 * leave via send_msgs otherwise we can leave the
				 * connection in a stuck state if acks are missing for
				 * example.
				 */
				if (msg_done >= peers_max_updates_at_once) {
					applet_have_more_data(appctx); // make sure to come back here
					goto send_msgs;
				}

				applet_will_consume(appctx);

				/* local peer is assigned of a lesson, start it */
				if (curpeer->learnstate == PEER_LR_ST_ASSIGNED && curpeer->local) {
					curpeer->learnstate = PEER_LR_ST_PROCESSING;
					TRACE_STATE("peer starts to learn", PEERS_EV_SESS_IO, appctx, curpeer);
				}

				reql = peer_recv_msg(appctx, (char *)msg_head, sizeof msg_head, &msg_len, &totl);
				if (reql <= 0) {
					if (reql == -1)
						goto switchstate;
					goto send_msgs;
				}

				msg_end += msg_len;
				if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl))
					goto switchstate;

				curpeer->flags |= PEER_F_ALIVE;

				/* skip consumed message */
				applet_skip_input(appctx, totl);

				/* make sure we don't process too many at once */
				if (msg_done >= peers_max_updates_at_once)
					goto send_msgs;
				msg_done++;

				/* loop on that state to peek next message */
				goto switchstate;

send_msgs:
				if (curpeer->flags & PEER_F_HEARTBEAT) {
					curpeer->flags &= ~PEER_F_HEARTBEAT;
					repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
					if (repl <= 0) {
						if (repl == -1)
							goto out;
						goto switchstate;
					}
					curpeer->tx_hbt++;
				}
				/* we get here when a peer_recv_msg() returns 0 in reql */
				repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
				if (repl <= 0) {
					if (repl == -1)
						goto out;
					goto switchstate;
				}

				/* nothing more to do */
				goto out;
			}
			case PEER_SESS_ST_EXIT:
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("send status error message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
				if (peer_send_status_errormsg(appctx) == -1)
					goto out;
				appctx->st0 = PEER_SESS_ST_END;
				goto switchstate;
			case PEER_SESS_ST_ERRSIZE: {
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("send error size message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
				if (peer_send_error_size_limitmsg(appctx) == -1)
					goto out;
				appctx->st0 = PEER_SESS_ST_END;
				goto switchstate;
			}
			case PEER_SESS_ST_ERRPROTO: {
				if (curpeer)
					curpeer->proto_err++;
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("send proto error message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
				if (peer_send_error_protomsg(appctx) == -1)
					goto out;
				appctx->st0 = PEER_SESS_ST_END;
				prev_state = appctx->st0;
			}
			__fallthrough;
			case PEER_SESS_ST_END: {
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("Terminate peer session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
				if (curpeer) {
					HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
					curpeer = NULL;
				}
				applet_set_eos(appctx);
				applet_reset_input(appctx);
				goto out;
			}
		}
	}
out:
	/* sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; */

	if (curpeer)
		HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);

	TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, curpeer);
	return;
}
3366
3367
/* Applet descriptor for peer sessions: binds the generic applet layer to the
 * peer protocol handlers defined in this file.
 */
static struct applet peer_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.flags = APPLET_FL_NEW_API,
	.name = "<PEER>", /* used for logging */
	.fct = peer_io_handler,          /* main I/O state machine */
	.rcv_buf = appctx_raw_rcv_buf,   /* raw buffer exchange helpers */
	.snd_buf = appctx_raw_snd_buf,
	.init = peer_session_init,       /* run when the applet is instantiated */
	.release = peer_session_release, /* run when the applet terminates */
};
3377
3378
3379
/*
3380
 * Use this function to force a close of a peer session
3381
 */
3382
static void peer_session_forceshutdown(struct peer *peer)
3383
0
{
3384
0
  struct appctx *appctx = peer->appctx;
3385
3386
  /* Note that the peer sessions which have just been created
3387
   * (->st0 == PEER_SESS_ST_CONNECT) must not
3388
   * be shutdown, if not, the TCP session will never be closed
3389
   * and stay in CLOSE_WAIT state after having been closed by
3390
   * the remote side.
3391
   */
3392
0
  if (!appctx || appctx->st0 == PEER_SESS_ST_CONNECT)
3393
0
    return;
3394
3395
0
  if (appctx->applet != &peer_applet)
3396
0
    return;
3397
3398
0
  TRACE_STATE("peer session shutdown", PEERS_EV_SESS_SHUT|PEERS_EV_SESS_END, appctx, peer);
3399
0
  __peer_session_deinit(peer);
3400
3401
0
  appctx->st0 = PEER_SESS_ST_END;
3402
0
  appctx_wakeup(appctx);
3403
0
}
3404
3405
/* Pre-configures a peers frontend to accept incoming connections */
3406
void peers_setup_frontend(struct proxy *fe)
3407
0
{
3408
0
  fe->mode = PR_MODE_PEERS;
3409
0
  fe->maxconn = 0;
3410
0
  fe->conn_retries = CONN_RETRIES; /* FIXME ignored since 91e785ed
3411
                                    * ("MINOR: stream: Rely on a per-stream max connection retries value")
3412
                                    * If this is really expected this should be set on the stream directly
3413
                                    * because the proxy is not part of the main proxy list and thus
3414
                                    * lacks the required post init for this setting to be considered
3415
                                    */
3416
0
  fe->timeout.connect = MS_TO_TICKS(1000);
3417
0
  fe->timeout.client = MS_TO_TICKS(5000);
3418
0
  fe->timeout.server = MS_TO_TICKS(5000);
3419
0
  fe->accept = frontend_accept;
3420
0
  fe->default_target = &peer_applet.obj_type;
3421
0
  fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC;
3422
0
}
3423
3424
/*
3425
 * Create a new peer session in assigned state (connect will start automatically)
3426
 */
3427
static struct appctx *peer_session_create(struct peers *peers, struct peer *peer)
3428
0
{
3429
0
  struct appctx *appctx;
3430
0
  unsigned int thr = 0;
3431
0
  int idx;
3432
3433
0
  TRACE_ENTER(PEERS_EV_SESS_NEW, NULL, peer);
3434
3435
0
  peer->new_conn++;
3436
0
  peer->reconnect = tick_add(now_ms, (stopping ? MS_TO_TICKS(PEER_LOCAL_RECONNECT_TIMEOUT) : MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)));
3437
0
  peer->heartbeat = TICK_ETERNITY;
3438
0
  peer->statuscode = PEER_SESS_SC_CONNECTCODE;
3439
0
  peer->last_hdshk = now_ms;
3440
3441
0
  for (idx = 0; idx < global.nbthread; idx++)
3442
0
    thr = peers->applet_count[idx] < peers->applet_count[thr] ? idx : thr;
3443
0
  appctx = appctx_new_on(&peer_applet, NULL, thr);
3444
0
  if (!appctx) {
3445
0
    TRACE_ERROR("peer APPCTX creation failed", PEERS_EV_SESS_NEW|PEERS_EV_SESS_END|PEERS_EV_SESS_ERR, NULL, peer);
3446
0
    goto out_close;
3447
0
  }
3448
0
  appctx->svcctx = (void *)peer;
3449
3450
0
  appctx->st0 = PEER_SESS_ST_CONNECT;
3451
0
  peer->appctx = appctx;
3452
3453
0
  HA_ATOMIC_INC(&peers->applet_count[thr]);
3454
0
  appctx_wakeup(appctx);
3455
3456
0
  TRACE_LEAVE(PEERS_EV_SESS_NEW, appctx, peer);
3457
0
  return appctx;
3458
3459
0
 out_close:
3460
0
  return NULL;
3461
0
}
3462
3463
/* Clear LEARN flags to a given peer, dealing with aborts if it was assigned for
3464
 * learning. In this case, the resync timeout is re-armed.
3465
 */
3466
static void clear_peer_learning_status(struct peer *peer)
3467
0
{
3468
0
  if (peer->learnstate != PEER_LR_ST_NOTASSIGNED) {
3469
0
    struct peers *peers = peer->peers;
3470
3471
    /* unassign current peer for learning */
3472
0
    HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN);
3473
0
    HA_ATOMIC_OR(&peers->flags, (peer->local ? PEERS_F_DBG_RESYNC_LOCALABORT : PEERS_F_DBG_RESYNC_REMOTEABORT));
3474
3475
    /* reschedule a resync */
3476
0
    peer->peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000));
3477
0
    peer->learnstate = PEER_LR_ST_NOTASSIGNED;
3478
0
  }
3479
0
  peer->flags &= ~PEER_F_LEARN_NOTUP2DATE;
3480
0
}
3481
3482
/* Commit the end of a learning phase for <peer>. Called by the sync task with
 * <peer>'s lock held. Does nothing unless the peer reached
 * PEER_LR_ST_FINISHED. Depending on whether the resync was partial
 * (PEER_F_LEARN_NOTUP2DATE) or full, it updates the section-level resync
 * flags accordingly, handles the sharded case, re-arms the resync timeout
 * when more work remains, then resets the peer's learn state and wakes its
 * applet if any.
 */
static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
{
	unsigned int flags = 0;

	if (peer->learnstate != PEER_LR_ST_FINISHED)
		return;

	/* The learning process is now finished */
	if (peer->flags & PEER_F_LEARN_NOTUP2DATE) {
		/* Partial resync */
		flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALPARTIAL : PEERS_F_DBG_RESYNC_REMOTEPARTIAL);
		peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
		TRACE_STATE("learning finished, peer session partially resync", PEERS_EV_SESS_RESYNC, NULL, peer);
	}
	else {
		/* Full resync */
		struct peer *rem_peer;
		int commit_a_finish = 1;

		if (peer->srv->shard) {
			flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL;
			peer->flags |= PEER_F_LEARN_NOTUP2DATE;
			for (rem_peer = peers->remote; rem_peer; rem_peer = rem_peer->next) {
				if (rem_peer->srv->shard && rem_peer != peer) {
					HA_SPIN_LOCK(PEER_LOCK, &rem_peer->lock);
					if (rem_peer->srv->shard == peer->srv->shard) {
						/* flag all peers from same shard
						 * notup2date to disable request
						 * of a resync from them
						 */
						rem_peer->flags |= PEER_F_LEARN_NOTUP2DATE;
					}
					else if (!(rem_peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
						/* it remains some other shards not requested
						 * we don't commit a resync finish to request
						 * the other shards
						 */
						commit_a_finish = 0;
					}
					HA_SPIN_UNLOCK(PEER_LOCK, &rem_peer->lock);
				}
			}

			if (!commit_a_finish) {
				/* it remains some shard to request, we schedule a new request */
				peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
				TRACE_STATE("Resync in progress, some shard not resync yet", PEERS_EV_SESS_RESYNC, NULL, peer);
			}
		}

		if (commit_a_finish) {
			flags |= (PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_RESYNC_REMOTE_FINISHED);
			flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALFINISHED : PEERS_F_DBG_RESYNC_REMOTEFINISHED);
			TRACE_STATE("learning finished, peer session fully resync", PEERS_EV_SESS_RESYNC, NULL, peer);
		}
	}
	/* back to the idle learn state; release the section-level assignment
	 * and publish the flags accumulated above in a single atomic OR
	 */
	peer->learnstate = PEER_LR_ST_NOTASSIGNED;
	HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN);
	HA_ATOMIC_OR(&peers->flags, flags);

	if (peer->appctx)
		appctx_wakeup(peer->appctx);
}
3545
3546
/* Synchronise the peer applet state with its associated peers section. This
3547
 * function handles STARTING->RUNNING and STOPPING->STOPPED transitions.
3548
 */
3549
static void sync_peer_app_state(struct peers *peers, struct peer *peer)
3550
0
{
3551
0
  if (peer->appstate == PEER_APP_ST_STOPPING) {
3552
0
    clear_peer_learning_status(peer);
3553
0
    peer->appstate = PEER_APP_ST_STOPPED;
3554
0
    TRACE_STATE("peer session now stopped", PEERS_EV_SESS_END, NULL, peer);
3555
0
  }
3556
0
  else if (peer->appstate == PEER_APP_ST_STARTING) {
3557
0
    clear_peer_learning_status(peer);
3558
0
    if (peer->local & appctx_is_back(peer->appctx)) {
3559
      /* if local peer has accepted the connection (appctx is
3560
       * on the backend side), flag it to learn a lesson and
3561
       * be sure it will start immediately. This only happens
3562
       * if no resync is in progress and if the lacal resync
3563
       * was not already performed.
3564
       */
3565
0
      if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL &&
3566
0
          !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
3567
        /* assign local peer for a lesson */
3568
0
        peer->learnstate = PEER_LR_ST_ASSIGNED;
3569
0
        HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_LOCALASSIGN);
3570
0
        TRACE_STATE("peer session assigned for a local resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer);
3571
0
      }
3572
0
    }
3573
0
    else if (!peer->local) {
3574
      /* If a connection was validated for a remote peer, flag
3575
       * it to learn a lesson but don't start it yet. The peer
3576
       * must request it explicitly.  This only happens if no
3577
       * resync is in progress and if the remote resync was
3578
       * not already performed.
3579
       */
3580
0
      if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE &&
3581
0
          !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
3582
        /* assign remote peer for a lesson */
3583
0
        peer->learnstate = PEER_LR_ST_ASSIGNED;
3584
0
        HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
3585
0
        TRACE_STATE("peer session assigned for a remote resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer);
3586
0
      }
3587
0
    }
3588
0
    peer->appstate = PEER_APP_ST_RUNNING;
3589
0
    TRACE_STATE("peer session running", PEERS_EV_SESS_NEW|PEERS_EV_SESS_WAKE, NULL, peer);
3590
0
    appctx_wakeup(peer->appctx);
3591
0
  }
3592
0
}
3593
3594
/* Process the sync task for a running process.  It is called from process_peer_sync() only */
3595
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{
	struct peer *peer;
	struct shared_table *st;
	int must_resched = 0; /* set when a peer lock could not be taken; forces a re-wakeup */

	/* resync timeout set to TICK_ETERNITY means we just start
	 * a new process and timer was not initialized.
	 * We must arm this timer to switch to a request to a remote
	 * node if incoming connection from old local process never
	 * comes.
	 */
	if (peers->resync_timeout == TICK_ETERNITY)
		peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));

	if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
	    (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
	    !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
		/* Resync from local peer needed
		   no peer was assigned for the lesson
		   and no old local peer found
		   or resync timeout expire */

		/* flag no more resync from local, to try resync from remotes */
		HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_DBG_RESYNC_LOCALTIMEOUT);

		/* reschedule a resync */
		peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
	}

	/* For each session */
	for (peer = peers->remote; peer; peer = peer->next) {
		/* non-blocking lock attempt: skip this peer and retry later
		 * rather than stalling the whole sync task
		 */
		if (HA_SPIN_TRYLOCK(PEER_LOCK, &peer->lock) != 0) {
			must_resched = 1;
			continue;
		}

		sync_peer_learn_state(peers, peer);
		sync_peer_app_state(peers, peer);

		/* Peer changes, if any, were now ack by the sync task. Unblock
		 * the peer (any wakeup should already be performed, no need to
		 * do it here)
		 */
		peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;

		/* For each remote peers */
		if (!peer->local) {
			if (!peer->appctx) {
				/* no active peer connection */
				if (peer->statuscode == 0 ||
				    ((peer->statuscode == PEER_SESS_SC_CONNECTCODE ||
				      peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
				      peer->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
				     tick_is_expired(peer->reconnect, now_ms))) {
					/* connection never tried
					 * or previous peer connection established with success
					 * or previous peer connection failed while connecting
					 * and reconnection timer is expired */

					/* retry a connect */
					peer->appctx = peer_session_create(peers, peer);
				}
				else if (!tick_is_expired(peer->reconnect, now_ms)) {
					/* If previous session failed during connection
					 * but reconnection timer is not expired */

					/* reschedule task for reconnect */
					task->expire = tick_first(task->expire, peer->reconnect);
				}
				/* else do nothing */
			} /* !peer->appctx */
			else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
				/* current peer connection is active and established */
				if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
				    !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
				    !(peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
					/* Resync from a remote is needed
					 * and no peer was assigned for lesson
					 * and current peer may be up2date */

					/* assign peer for the lesson */
					peer->learnstate = PEER_LR_ST_ASSIGNED;
					HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
					TRACE_STATE("peer session assigned for a remote resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer);

					/* wake up peer handler to handle a request of resync */
					appctx_wakeup(peer->appctx);
				}
				else {
					int update_to_push = 0;

					/* Awake session if there is data to push */
					for (st = peer->tables; st ; st = st->next) {
						if (st->last_pushed != st->table->localupdate) {
							/* wake up the peer handler to push local updates */
							update_to_push = 1;
							/* There is no need to send a heartbeat message
							 * when some updates must be pushed. The remote
							 * peer will consider <peer> peer as alive when it will
							 * receive these updates.
							 */
							peer->flags &= ~PEER_F_HEARTBEAT;
							/* Re-schedule another one later. */
							peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
							/* Refresh reconnect if necessary */
							if (tick_is_expired(peer->reconnect, now_ms))
								peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
							/* We are going to send updates, let's ensure we will
							 * come back to send heartbeat messages or to reconnect.
							 */
							TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer);
							task->expire = tick_first(peer->reconnect, peer->heartbeat);
							appctx_wakeup(peer->appctx);
							break;
						}
					}
					/* When there are updates to send we do not reconnect
					 * and do not send heartbeat message either.
					 */
					if (!update_to_push) {
						if (tick_is_expired(peer->reconnect, now_ms)) {
							if (peer->flags & PEER_F_ALIVE) {
								/* This peer was alive during a 'reconnect' period.
								 * Flag it as not alive again for the next period.
								 */
								peer->flags &= ~PEER_F_ALIVE;
								TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer);
								peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
							}
							else  {
								/* dead peer: random reconnect delay avoids both
								 * ends reconnecting in lockstep
								 */
								peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
								peer->heartbeat = TICK_ETERNITY;
								TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer);
								peer_session_forceshutdown(peer);
								sync_peer_app_state(peers, peer);
								peer->no_hbt++;
							}
						}
						else if (tick_is_expired(peer->heartbeat, now_ms)) {
							peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
							peer->flags |= PEER_F_HEARTBEAT;
							TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer);
							appctx_wakeup(peer->appctx);
						}
						task->expire = tick_first(peer->reconnect, peer->heartbeat);
					}
				}
				/* else do nothing */
			} /* SUCCESSCODE */
		} /* !peer->peer->local */

		HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
	} /* for */

	/* Resync from remotes expired or no remote peer: consider resync is finished */
	if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
	    !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
	    (tick_is_expired(peers->resync_timeout, now_ms) || !peers->remote->next)) {
		/* Resync from remote peer needed
		 * no peer was assigned for the lesson
		 * and resync timeout expire */

		/* flag no more resync from remote, consider resync is finished */
		HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_REMOTE_FINISHED|PEERS_F_DBG_RESYNC_REMOTETIMEOUT);
	}

	if (!must_resched && (peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
		/* Resync not finished*/
		/* reschedule task to resync timeout if not expired, to ended resync if needed */
		if (!tick_is_expired(peers->resync_timeout, now_ms))
			task->expire = tick_first(task->expire, peers->resync_timeout);
	} else if (must_resched)
		task_wakeup(task, TASK_WOKEN_OTHER);
}
3770
3771
/* Process the sync task for a stopping process. It is called from process_peer_sync() only */
3772
static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{
	struct peer *peer;
	struct shared_table *st;
	static int dont_stop = 0; /* holds a job reference (via <jobs>) while the new process resyncs */

	/* For each peer */
	for (peer = peers->remote; peer; peer = peer->next) {
		HA_SPIN_LOCK(PEER_LOCK, &peer->lock);

		sync_peer_learn_state(peers, peer);
		sync_peer_app_state(peers, peer);

		/* Peer changes, if any, were now ack by the sync task. Unblock
		 * the peer (any wakeup should already be performed, no need to
		 * do it here)
		 */
		peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;

		if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) {
			/* we're killing a connection, we must apply a random delay before
			 * retrying otherwise the other end will do the same and we can loop
			 * for a while.
			 */
			peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
			if (peer->appctx) {
				peer_session_forceshutdown(peer);
				sync_peer_app_state(peers, peer);
			}
		}

		HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
	}

	/* We've just received the signal */
	if (state & TASK_WOKEN_SIGNAL) {
		if (!dont_stop) {
			/* add DO NOT STOP flag if not present */
			_HA_ATOMIC_INC(&jobs);
			dont_stop = 1;

			/* Set resync timeout for the local peer and request an immediate reconnect */
			peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
			peers->local->reconnect = tick_add(now_ms, 0);
		}
	}

	/* from here on, only the local peer (i.e. the new process) is handled */
	peer = peers->local;
	HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
	if (peer->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
		if (dont_stop) {
			/* resync of new process was complete, current process can die now */
			_HA_ATOMIC_DEC(&jobs);
			dont_stop = 0;
			for (st = peer->tables; st ; st = st->next)
				HA_ATOMIC_DEC(&st->table->refcnt);
		}
	}
	else if (!peer->appctx) {
		/* Re-arm resync timeout if necessary */
		if (!tick_isset(peers->resync_timeout))
			peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));

		/* If there's no active peer connection */
		if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
		    !tick_is_expired(peers->resync_timeout, now_ms) &&
		    (peer->statuscode == 0 ||
		     peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
		     peer->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
		     peer->statuscode == PEER_SESS_SC_TRYAGAIN)) {
			/* The resync is finished for the local peer and
			 *   the resync timeout is not expired and
			 *   connection never tried
			 *   or previous peer connection was successfully established
			 *   or previous tcp connect succeeded but init state incomplete
			 *   or during previous connect, peer replies a try again statuscode */

			if (!tick_is_expired(peer->reconnect, now_ms)) {
				/* reconnection timer is not expired. reschedule task for reconnect */
				task->expire = tick_first(task->expire, peer->reconnect);
			}
			else  {
				/* connect to the local peer if we must push a local sync */
				if (dont_stop) {
					peer_session_create(peers, peer);
				}
			}
		}
		else {
			/* Other error cases */
			if (dont_stop) {
				/* unable to resync new process, current process can die now */
				_HA_ATOMIC_DEC(&jobs);
				dont_stop = 0;
				for (st = peer->tables; st ; st = st->next)
					HA_ATOMIC_DEC(&st->table->refcnt);
			}
		}
	}
	else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
		/* Reset resync timeout during a resync */
		peers->resync_timeout = TICK_ETERNITY;

		/* current peer connection is active and established
		 * wake up all peer handlers to push remaining local updates */
		for (st = peer->tables; st ; st = st->next) {
			if (st->last_pushed != st->table->localupdate) {
				appctx_wakeup(peer->appctx);
				break;
			}
		}
	}
	HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
}
3886
3887
/*
3888
 * Task processing function to manage re-connect, peer session
3889
 * tasks wakeup on local update and heartbeat. Let's keep it exported so that it
3890
 * resolves in stack traces and "show tasks".
3891
 */
3892
struct task *process_peer_sync(struct task * task, void *context, unsigned int state)
3893
0
{
3894
0
  struct peers *peers = context;
3895
3896
0
  task->expire = TICK_ETERNITY;
3897
3898
0
  if (!stopping) {
3899
    /* Normal case (not soft stop)*/
3900
0
    __process_running_peer_sync(task, peers, state);
3901
3902
0
  }
3903
0
  else {
3904
    /* soft stop case */
3905
0
    __process_stopping_peer_sync(task, peers, state);
3906
0
  } /* stopping */
3907
3908
  /* Wakeup for re-connect */
3909
0
  return task;
3910
0
}
3911
3912
3913
/*
3914
 * returns 0 in case of error.
3915
 */
3916
int peers_init_sync(struct peers *peers)
3917
0
{
3918
0
  static uint operating_thread = 0;
3919
0
  struct peer * curpeer;
3920
3921
0
  for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
3922
0
    peers->peers_fe->maxconn += 3;
3923
0
  }
3924
3925
  /* go backwards so as to distribute the load to other threads
3926
   * than the ones operating the stick-tables for small confs.
3927
   */
3928
0
  operating_thread = (operating_thread - 1) % global.nbthread;
3929
0
  peers->sync_task = task_new_on(operating_thread);
3930
0
  if (!peers->sync_task)
3931
0
    return 0;
3932
3933
0
  memset(peers->applet_count, 0, sizeof(peers->applet_count));
3934
0
  peers->sync_task->process = process_peer_sync;
3935
0
  peers->sync_task->context = (void *)peers;
3936
0
  peers->sighandler = signal_register_task(0, peers->sync_task, 0);
3937
0
  task_wakeup(peers->sync_task, TASK_WOKEN_INIT);
3938
0
  return 1;
3939
0
}
3940
3941
/*
3942
 * Allocate a cache a dictionary entries used upon transmission.
3943
 */
3944
static struct dcache_tx *new_dcache_tx(size_t max_entries)
3945
0
{
3946
0
  struct dcache_tx *d;
3947
0
  struct ebpt_node *entries;
3948
3949
0
  d = malloc(sizeof *d);
3950
0
  entries = calloc(max_entries, sizeof *entries);
3951
0
  if (!d || !entries)
3952
0
    goto err;
3953
3954
0
  d->lru_key = 0;
3955
0
  d->prev_lookup = NULL;
3956
0
  d->cached_entries = EB_ROOT_UNIQUE;
3957
0
  d->entries = entries;
3958
3959
0
  return d;
3960
3961
0
 err:
3962
0
  free(d);
3963
0
  free(entries);
3964
0
  return NULL;
3965
0
}
3966
3967
/*
3968
 * Allocate a cache of dictionary entries with <name> as name and <max_entries>
3969
 * as maximum of entries.
3970
 * Return the dictionary cache if succeeded, NULL if not.
3971
 * Must be deallocated calling free_dcache().
3972
 */
3973
static struct dcache *new_dcache(size_t max_entries)
3974
0
{
3975
0
  struct dcache_tx *dc_tx;
3976
0
  struct dcache *dc;
3977
0
  struct dcache_rx *dc_rx;
3978
3979
0
  dc = calloc(1, sizeof *dc);
3980
0
  dc_tx = new_dcache_tx(max_entries);
3981
0
  dc_rx = calloc(max_entries, sizeof *dc_rx);
3982
0
  if (!dc || !dc_tx || !dc_rx)
3983
0
    goto err;
3984
3985
0
  dc->tx = dc_tx;
3986
0
  dc->rx = dc_rx;
3987
0
  dc->max_entries = max_entries;
3988
3989
0
  return dc;
3990
3991
0
 err:
3992
0
  free(dc);
3993
0
  free(dc_tx);
3994
0
  free(dc_rx);
3995
0
  return NULL;
3996
0
}
3997
3998
/*
3999
 * Look for the dictionary entry with the value of <i> in <d> cache of dictionary
4000
 * entries used upon transmission.
4001
 * Return the entry if found, NULL if not.
4002
 */
4003
static struct ebpt_node *dcache_tx_lookup_value(struct dcache_tx *d,
                                                struct dcache_tx_entry *i)
{
	/* thin wrapper: look up <i>'s key in the tree of cached TX entries */
	return ebpt_lookup(&d->cached_entries, i->entry.key);
}
4008
4009
/*
4010
 * Flush <dc> cache.
4011
 * Always succeeds.
4012
 */
4013
static inline void flush_dcache(struct peer *peer)
4014
0
{
4015
0
  int i;
4016
0
  struct dcache *dc = peer->dcache;
4017
4018
0
  for (i = 0; i < dc->max_entries; i++) {
4019
0
    ebpt_delete(&dc->tx->entries[i]);
4020
0
    dc->tx->entries[i].key = NULL;
4021
0
    dict_entry_unref(&server_key_dict, dc->rx[i].de);
4022
0
    dc->rx[i].de = NULL;
4023
0
  }
4024
0
  dc->tx->prev_lookup = NULL;
4025
0
  dc->tx->lru_key = 0;
4026
4027
0
  memset(dc->rx, 0, dc->max_entries * sizeof *dc->rx);
4028
0
}
4029
4030
/*
4031
 * Insert a dictionary entry in <dc> cache part used upon transmission (->tx)
4032
 * with information provided by <i> dictionary cache entry (especially the value
4033
 * to be inserted if not already). Return <i> if already present in the cache
4034
 * or something different of <i> if not.
4035
 */
4036
static struct ebpt_node *dcache_tx_insert(struct dcache *dc, struct dcache_tx_entry *i)
{
	struct dcache_tx *dc_tx;
	struct ebpt_node *o;

	dc_tx = dc->tx;

	/* fast path: reuse the result of the previous lookup when the same
	 * key is presented again
	 */
	if (dc_tx->prev_lookup && dc_tx->prev_lookup->key == i->entry.key) {
		o = dc_tx->prev_lookup;
	} else {
		o = dcache_tx_lookup_value(dc_tx, i);
		if (o) {
			/* Save it */
			dc_tx->prev_lookup = o;
		}
	}

	if (o) {
		/* Copy the ID. */
		i->id = o - dc->tx->entries;
		return &i->entry;
	}

	/* The new entry to put in cache */
	dc_tx->prev_lookup = o = &dc_tx->entries[dc_tx->lru_key];

	/* recycle the slot: detach it from the tree before re-keying it */
	ebpt_delete(o);
	o->key = i->entry.key;
	ebpt_insert(&dc_tx->cached_entries, o);
	i->id = dc_tx->lru_key;

	/* Update the index for the next entry to put in cache.
	 * NOTE(review): masking with (max_entries - 1) assumes max_entries
	 * is a power of two — TODO confirm PEER_STKT_CACHE_MAX_ENTRIES is.
	 */
	dc_tx->lru_key = (dc_tx->lru_key + 1) & (dc->max_entries - 1);

	return o;
}
4072
4073
/*
4074
 * Allocate a dictionary cache for each peer of <peers> section.
4075
 * Return 1 if succeeded, 0 if not.
4076
 */
4077
int peers_alloc_dcache(struct peers *peers)
4078
0
{
4079
0
  struct peer *p;
4080
4081
0
  for (p = peers->remote; p; p = p->next) {
4082
0
    p->dcache = new_dcache(PEER_STKT_CACHE_MAX_ENTRIES);
4083
0
    if (!p->dcache)
4084
0
      return 0;
4085
0
  }
4086
4087
0
  return 1;
4088
0
}
4089
4090
/*
4091
 * Function used to register a table for sync on a group of peers
4092
 * Returns 0 in case of success.
4093
 */
4094
int peers_register_table(struct peers *peers, struct stktable *table)
{
	struct shared_table *st;
	struct peer * curpeer;
	int id = 0;
	int retval = 0; /* 1 means at least one per-peer allocation failed */

	/* prepend one shared_table descriptor per peer of the section */
	for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
		st = calloc(1,sizeof(*st));
		if (!st) {
			/* allocation failure: stop here; peers already handled
			 * keep their descriptor, the caller sees retval != 0
			 */
			retval = 1;
			break;
		}
		st->table = table;
		st->next = curpeer->tables;
		/* local_id is one more than the head of the list (the most
		 * recently registered table for this peer)
		 */
		if (curpeer->tables)
			id = curpeer->tables->local_id;
		st->local_id = id + 1;

		/* If peer is local we inc table
		 * refcnt to protect against flush
		 * until this process pushed all
		 * table content to the new one
		 */
		if (curpeer->local)
			HA_ATOMIC_INC(&st->table->refcnt);
		curpeer->tables = st;
	}

	table->sync_task = peers->sync_task;

	return retval;
}
4127
4128
/* context used by a "show peers" command */
4129
struct show_peers_ctx {
	void *target;           /* if non-null, dump only this section and stop */
	struct peers *peers;    /* "peers" section being currently dumped. */
	struct peer *peer;      /* "peer" being currently dumped. */
	int flags;              /* PEERS_SHOW_F_DICT set when the "dict" dump is requested */
	enum {
		STATE_HEAD = 0, /* dump the section's header */
		STATE_PEER,     /* dump the whole peer */
		STATE_DONE,     /* finished */
	} state;                /* parser's state */
};
4140
4141
/*
4142
 * Parse the "show peers" command arguments.
4143
 * Returns 0 if succeeded, 1 if not with the ->msg of the appctx set as
4144
 * error message.
4145
 */
4146
static int cli_parse_show_peers(char **args, char *payload, struct appctx *appctx, void *private)
4147
0
{
4148
0
  struct show_peers_ctx *ctx = applet_reserve_svcctx(appctx, sizeof(*ctx));
4149
4150
0
  if (strcmp(args[2], "dict") == 0) {
4151
    /* show the dictionaries (large dump) */
4152
0
    ctx->flags |= PEERS_SHOW_F_DICT;
4153
0
    args++;
4154
0
  } else if (strcmp(args[2], "-") == 0)
4155
0
    args++; // allows to show a section called "dict"
4156
4157
0
  if (*args[2]) {
4158
0
    struct peers *p;
4159
4160
0
    for (p = cfg_peers; p; p = p->next) {
4161
0
      if (strcmp(p->id, args[2]) == 0) {
4162
0
        ctx->target = p;
4163
0
        break;
4164
0
      }
4165
0
    }
4166
4167
0
    if (!p)
4168
0
      return cli_err(appctx, "No such peers\n");
4169
0
  }
4170
4171
  /* where to start from */
4172
0
  ctx->peers = ctx->target ? ctx->target : cfg_peers;
4173
0
  return 0;
4174
0
}
4175
4176
/*
4177
 * This function dumps the peer state information of <peers> "peers" section.
4178
 * Returns 0 if the output buffer is full and needs to be called again, non-zero if not.
4179
 * Dedicated to be called by cli_io_handler_show_peers() cli I/O handler.
4180
 */
4181
static int peers_dump_head(struct buffer *msg, struct appctx *appctx, struct peers *peers)
4182
0
{
4183
0
  struct tm tm;
4184
4185
0
  get_localtime(peers->last_change, &tm);
4186
0
  chunk_appendf(msg, "%p: [%02d/%s/%04d:%02d:%02d:%02d] id=%s disabled=%d flags=0x%x resync_timeout=%s task_calls=%u\n",
4187
0
                peers,
4188
0
                tm.tm_mday, monthname[tm.tm_mon], tm.tm_year+1900,
4189
0
                tm.tm_hour, tm.tm_min, tm.tm_sec,
4190
0
                peers->id, peers->disabled, HA_ATOMIC_LOAD(&peers->flags),
4191
0
                peers->resync_timeout ?
4192
0
                   tick_is_expired(peers->resync_timeout, now_ms) ? "<PAST>" :
4193
0
                           human_time(TICKS_TO_MS(peers->resync_timeout - now_ms),
4194
0
                           TICKS_TO_MS(1000)) : "<NEVER>",
4195
0
                peers->sync_task ? peers->sync_task->calls : 0);
4196
4197
0
  if (applet_putchk(appctx, msg) == -1)
4198
0
    return 0;
4199
4200
0
  return 1;
4201
0
}
4202
4203
/*
4204
 * This function dumps <peer> state information.
4205
 * Returns 0 if the output buffer is full and needs to be called again, non-zero
4206
 * if not. Dedicated to be called by cli_io_handler_show_peers() cli I/O handler.
4207
 */
4208
static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct peer *peer, int flags)
4209
0
{
4210
0
  struct connection *conn;
4211
0
  char pn[INET6_ADDRSTRLEN];
4212
0
  struct stconn *peer_cs;
4213
0
  struct stream *peer_s;
4214
0
  struct shared_table *st;
4215
4216
0
  addr_to_str(&peer->srv->addr, pn, sizeof pn);
4217
0
  chunk_appendf(msg, "  %p: id=%s(%s,%s) addr=%s:%d app_state=%s learn_state=%s last_status=%s",
4218
0
                peer, peer->id,
4219
0
                peer->local ? "local" : "remote",
4220
0
                peer->appctx ? "active" : "inactive",
4221
0
                pn, peer->srv->svc_port,
4222
0
          peer_app_state_str(peer->appstate),
4223
0
          peer_learn_state_str(peer->learnstate),
4224
0
                statuscode_str(peer->statuscode));
4225
4226
0
  chunk_appendf(msg, " last_hdshk=%s\n",
4227
0
                peer->last_hdshk ? human_time(TICKS_TO_MS(now_ms - peer->last_hdshk),
4228
0
                                              TICKS_TO_MS(1000)) : "<NEVER>");
4229
4230
0
  chunk_appendf(msg, "        reconnect=%s",
4231
0
                peer->reconnect ?
4232
0
                   tick_is_expired(peer->reconnect, now_ms) ? "<PAST>" :
4233
0
                           human_time(TICKS_TO_MS(peer->reconnect - now_ms),
4234
0
                           TICKS_TO_MS(1000)) : "<NEVER>");
4235
4236
0
  chunk_appendf(msg, " heartbeat=%s",
4237
0
                peer->heartbeat ?
4238
0
                   tick_is_expired(peer->heartbeat, now_ms) ? "<PAST>" :
4239
0
                           human_time(TICKS_TO_MS(peer->heartbeat - now_ms),
4240
0
                           TICKS_TO_MS(1000)) : "<NEVER>");
4241
4242
0
  chunk_appendf(msg, " confirm=%u tx_hbt=%u rx_hbt=%u no_hbt=%u new_conn=%u proto_err=%u coll=%u\n",
4243
0
                peer->confirm, peer->tx_hbt, peer->rx_hbt,
4244
0
                peer->no_hbt, peer->new_conn, peer->proto_err, peer->coll);
4245
4246
0
  chunk_appendf(&trash, "        flags=0x%x", peer->flags);
4247
4248
0
  if (!peer->appctx)
4249
0
    goto table_info;
4250
4251
0
  chunk_appendf(&trash, " appctx:%p st0=%d st1=%d task_calls=%u",
4252
0
                peer->appctx, peer->appctx->st0, peer->appctx->st1,
4253
0
                peer->appctx->t ? peer->appctx->t->calls : 0);
4254
4255
0
  peer_cs = appctx_sc(peer->appctx);
4256
0
  if (!peer_cs) {
4257
    /* the appctx might exist but not yet be initialized due to
4258
     * deferred initialization used to balance applets across
4259
     * threads.
4260
     */
4261
0
    goto table_info;
4262
0
  }
4263
4264
0
  peer_s = __sc_strm(peer_cs);
4265
4266
0
  chunk_appendf(&trash, " state=%s", sc_state_str(sc_opposite(peer_cs)->state));
4267
4268
0
  conn = objt_conn(strm_orig(peer_s));
4269
0
  if (conn)
4270
0
    chunk_appendf(&trash, "\n        xprt=%s", conn_get_xprt_name(conn));
4271
4272
0
  switch (conn && conn_get_src(conn) ? addr_to_str(conn->src, pn, sizeof(pn)) : AF_UNSPEC) {
4273
0
  case AF_INET:
4274
0
  case AF_INET6:
4275
0
    chunk_appendf(&trash, " src=%s:%d", pn, get_host_port(conn->src));
4276
0
    break;
4277
0
  case AF_UNIX:
4278
0
  case AF_CUST_ABNS:
4279
0
  case AF_CUST_ABNSZ:
4280
0
    chunk_appendf(&trash, " src=unix:%d", strm_li(peer_s)->luid);
4281
0
    break;
4282
0
  }
4283
4284
0
  switch (conn && conn_get_dst(conn) ? addr_to_str(conn->dst, pn, sizeof(pn)) : AF_UNSPEC) {
4285
0
  case AF_INET:
4286
0
  case AF_INET6:
4287
0
    chunk_appendf(&trash, " addr=%s:%d", pn, get_host_port(conn->dst));
4288
0
    break;
4289
0
  case AF_UNIX:
4290
0
  case AF_CUST_ABNS:
4291
0
  case AF_CUST_ABNSZ:
4292
0
    chunk_appendf(&trash, " addr=unix:%d", strm_li(peer_s)->luid);
4293
0
    break;
4294
0
  }
4295
4296
0
 table_info:
4297
0
  if (peer->remote_table)
4298
0
    chunk_appendf(&trash, "\n        remote_table:%p id=%s local_id=%d remote_id=%d",
4299
0
                  peer->remote_table,
4300
0
                  peer->remote_table->table->id,
4301
0
                  peer->remote_table->local_id,
4302
0
                  peer->remote_table->remote_id);
4303
4304
0
  if (peer->last_local_table)
4305
0
    chunk_appendf(&trash, "\n        last_local_table:%p id=%s local_id=%d remote_id=%d",
4306
0
                  peer->last_local_table,
4307
0
                  peer->last_local_table->table->id,
4308
0
                  peer->last_local_table->local_id,
4309
0
                  peer->last_local_table->remote_id);
4310
4311
0
  if (peer->tables) {
4312
0
    chunk_appendf(&trash, "\n        shared tables:");
4313
0
    for (st = peer->tables; st; st = st->next) {
4314
0
      int i, count;
4315
0
      struct stktable *t;
4316
0
      struct dcache *dcache;
4317
4318
0
      t = st->table;
4319
0
      dcache = peer->dcache;
4320
4321
0
      chunk_appendf(&trash, "\n          %p local_id=%d remote_id=%d "
4322
0
                    "flags=0x%x remote_data=0x%llx",
4323
0
                    st, st->local_id, st->remote_id,
4324
0
                    st->flags, (unsigned long long)st->remote_data);
4325
0
      chunk_appendf(&trash, "\n              last_acked=%u last_pushed=%u last_get=%u"
4326
0
                    " teaching_origin=%u update=%u",
4327
0
                    st->last_acked, st->last_pushed, st->last_get, st->teaching_origin, st->update);
4328
0
      chunk_appendf(&trash, "\n              table:%p id=%s update=%u localupdate=%u refcnt=%u",
4329
0
                    t, t->id, t->update, t->localupdate, t->refcnt);
4330
0
      if (flags & PEERS_SHOW_F_DICT) {
4331
0
        chunk_appendf(&trash, "\n        TX dictionary cache:");
4332
0
        count = 0;
4333
0
        for (i = 0; i < dcache->max_entries; i++) {
4334
0
          struct ebpt_node *node;
4335
0
          struct dict_entry *de;
4336
4337
0
          node = &dcache->tx->entries[i];
4338
0
          if (!node->key)
4339
0
            break;
4340
4341
0
          if (!count++)
4342
0
            chunk_appendf(&trash, "\n        ");
4343
0
          de = node->key;
4344
0
          chunk_appendf(&trash, "  %3u -> %s", i, (char *)de->value.key);
4345
0
          count &= 0x3;
4346
0
        }
4347
0
        chunk_appendf(&trash, "\n        RX dictionary cache:");
4348
0
        count = 0;
4349
0
        for (i = 0; i < dcache->max_entries; i++) {
4350
0
          if (!count++)
4351
0
            chunk_appendf(&trash, "\n        ");
4352
0
          chunk_appendf(&trash, "  %3u -> %s", i,
4353
0
                  dcache->rx[i].de ?
4354
0
                  (char *)dcache->rx[i].de->value.key : "-");
4355
0
          count &= 0x3;
4356
0
        }
4357
0
      } else {
4358
0
        chunk_appendf(&trash, "\n        Dictionary cache not dumped (use \"show peers dict\")");
4359
0
      }
4360
0
    }
4361
0
  }
4362
4363
0
 end:
4364
0
  chunk_appendf(&trash, "\n");
4365
0
  if (applet_putchk(appctx, msg) == -1)
4366
0
    return 0;
4367
4368
0
  return 1;
4369
0
}
4370
4371
/*
4372
 * This function dumps all the peers of "peers" section.
4373
 * Returns 0 if the output buffer is full and needs to be called
4374
 * again, non-zero if not. It proceeds in an isolated thread, so
4375
 * there is no thread safety issue here.
4376
 */
4377
static int cli_io_handler_show_peers(struct appctx *appctx)
4378
0
{
4379
0
  struct show_peers_ctx *ctx = appctx->svcctx;
4380
0
  int ret = 0, first_peers = 1;
4381
4382
0
  thread_isolate();
4383
4384
0
  chunk_reset(&trash);
4385
4386
0
  while (ctx->state != STATE_DONE) {
4387
0
    switch (ctx->state) {
4388
0
    case STATE_HEAD:
4389
0
      if (!ctx->peers) {
4390
        /* No more peers list. */
4391
0
        ctx->state = STATE_DONE;
4392
0
      }
4393
0
      else {
4394
0
        if (!first_peers)
4395
0
          chunk_appendf(&trash, "\n");
4396
0
        else
4397
0
          first_peers = 0;
4398
0
        if (!peers_dump_head(&trash, appctx, ctx->peers))
4399
0
          goto out;
4400
4401
0
        ctx->peer = ctx->peers->remote;
4402
0
        ctx->peers = ctx->peers->next;
4403
0
        ctx->state = STATE_PEER;
4404
0
      }
4405
0
      break;
4406
4407
0
    case STATE_PEER:
4408
0
      if (!ctx->peer) {
4409
        /* End of peer list */
4410
0
        if (!ctx->target)
4411
0
          ctx->state = STATE_HEAD; // next one
4412
0
          else
4413
0
          ctx->state = STATE_DONE;
4414
0
      }
4415
0
      else {
4416
0
        if (!peers_dump_peer(&trash, appctx, ctx->peer, ctx->flags))
4417
0
          goto out;
4418
4419
0
        ctx->peer = ctx->peer->next;
4420
0
      }
4421
0
      break;
4422
4423
0
    default:
4424
0
      break;
4425
0
    }
4426
0
  }
4427
0
  ret = 1;
4428
0
 out:
4429
0
  thread_release();
4430
0
  return ret;
4431
0
}
4432
4433
4434
/* head of the global list of "peers" section keywords; entries are appended
 * via peers_register_keywords().
 */
struct peers_kw_list peers_keywords = {
	.list = LIST_HEAD_INIT(peers_keywords.list)
};
4437
4438
/* Append the keyword list <pkwl> to the global list of "peers" section
 * keywords (peers_keywords).
 */
void peers_register_keywords(struct peers_kw_list *pkwl)
{
	LIST_APPEND(&peers_keywords.list, &pkwl->list);
}
4442
4443
/* config parser for global "tune.peers.max-updates-at-once" */
4444
static int cfg_parse_max_updt_at_once(char **args, int section_type, struct proxy *curpx,
4445
                                      const struct proxy *defpx, const char *file, int line,
4446
                                      char **err)
4447
0
{
4448
0
  int arg = -1;
4449
4450
0
  if (too_many_args(1, args, err, NULL))
4451
0
    return -1;
4452
4453
0
  if (*(args[1]) != 0)
4454
0
    arg = atoi(args[1]);
4455
4456
0
  if (arg < 1) {
4457
0
    memprintf(err, "'%s' expects an integer argument greater than 0.", args[0]);
4458
0
    return -1;
4459
0
  }
4460
4461
0
  peers_max_updates_at_once = arg;
4462
0
  return 0;
4463
0
}
4464
4465
/* config keyword parsers; registered at startup via the INITCALL below */
static struct cfg_kw_list cfg_kws = {ILH, {
	{ CFG_GLOBAL, "tune.peers.max-updates-at-once",  cfg_parse_max_updt_at_once },
	{ 0, NULL, NULL }
}};

INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
4472
4473
/*
 * CLI keywords.
 */
static struct cli_kw_list cli_kws = {{ }, {
	{ { "show", "peers", NULL }, "show peers [dict|-] [section]           : dump some information about all the peers or this peers section", cli_parse_show_peers, cli_io_handler_show_peers, },
	{},
}};

/* Register cli keywords */
INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);