Line | Count | Source |
1 | | /* |
2 | | * Peer synchro management. |
3 | | * |
4 | | * Copyright 2010 EXCELIANCE, Emeric Brun <ebrun@exceliance.fr> |
5 | | * |
6 | | * This program is free software; you can redistribute it and/or |
7 | | * modify it under the terms of the GNU General Public License |
8 | | * as published by the Free Software Foundation; either version |
9 | | * 2 of the License, or (at your option) any later version. |
10 | | * |
11 | | */ |
12 | | |
13 | | #include <errno.h> |
14 | | #include <stdio.h> |
15 | | #include <stdlib.h> |
16 | | #include <string.h> |
17 | | |
18 | | #include <sys/socket.h> |
19 | | #include <sys/stat.h> |
20 | | #include <sys/types.h> |
21 | | |
22 | | #include <import/eb32tree.h> |
23 | | #include <import/ebmbtree.h> |
24 | | #include <import/ebpttree.h> |
25 | | |
26 | | #include <haproxy/api.h> |
27 | | #include <haproxy/applet.h> |
28 | | #include <haproxy/cfgparse.h> |
29 | | #include <haproxy/channel.h> |
30 | | #include <haproxy/cli.h> |
31 | | #include <haproxy/dict.h> |
32 | | #include <haproxy/errors.h> |
33 | | #include <haproxy/fd.h> |
34 | | #include <haproxy/frontend.h> |
35 | | #include <haproxy/net_helper.h> |
36 | | #include <haproxy/obj_type-t.h> |
37 | | #include <haproxy/peers.h> |
38 | | #include <haproxy/proxy.h> |
39 | | #include <haproxy/sc_strm.h> |
40 | | #include <haproxy/session-t.h> |
41 | | #include <haproxy/signal.h> |
42 | | #include <haproxy/stats-t.h> |
43 | | #include <haproxy/stconn.h> |
44 | | #include <haproxy/stick_table.h> |
45 | | #include <haproxy/stream.h> |
46 | | #include <haproxy/task.h> |
47 | | #include <haproxy/thread.h> |
48 | | #include <haproxy/time.h> |
49 | | #include <haproxy/tools.h> |
50 | | #include <haproxy/trace.h> |
51 | | |
52 | | /***********************************/ |
53 | | /* Current shared table sync state */ |
54 | | /***********************************/ |
55 | 0 | #define SHTABLE_F_TEACH_STAGE1 0x00000001 /* Teach state 1 complete */ |
56 | 0 | #define SHTABLE_F_TEACH_STAGE2 0x00000002 /* Teach state 2 complete */ |
57 | | |
58 | | |
59 | | #define PEER_RESYNC_TIMEOUT 5000 /* 5 seconds */ |
60 | | #define PEER_RECONNECT_TIMEOUT 5000 /* 5 seconds */ |
61 | | #define PEER_LOCAL_RECONNECT_TIMEOUT 500 /* 500ms */ |
62 | | #define PEER_HEARTBEAT_TIMEOUT 3000 /* 3 seconds */ |
63 | | |
64 | | /* default maximum of updates sent at once */ |
65 | | #define PEER_DEF_MAX_UPDATES_AT_ONCE 200 |
66 | | |
67 | | /* flags for "show peers" */ |
68 | 0 | #define PEERS_SHOW_F_DICT 0x00000001 /* also show the contents of the dictionary */ |
69 | | |
70 | | /*****************************/ |
71 | | /* Sync message class */ |
72 | | /*****************************/ |
enum {
	PEER_MSG_CLASS_CONTROL = 0,     /* control messages (resync, heartbeat, ...) */
	PEER_MSG_CLASS_ERROR,           /* error reporting messages */
	PEER_MSG_CLASS_STICKTABLE = 10, /* stick-table synchronization messages */
	PEER_MSG_CLASS_RESERVED = 255,  /* reserved, not used */
};
79 | | |
80 | | /*****************************/ |
81 | | /* control message types */ |
82 | | /*****************************/ |
enum {
	PEER_MSG_CTRL_RESYNCREQ = 0,  /* resync request */
	PEER_MSG_CTRL_RESYNCFINISHED, /* resync finished */
	PEER_MSG_CTRL_RESYNCPARTIAL,  /* resync partial */
	PEER_MSG_CTRL_RESYNCCONFIRM,  /* resync confirm */
	PEER_MSG_CTRL_HEARTBEAT,      /* heartbeat (keep-alive) */
};
90 | | |
91 | | /*****************************/ |
92 | | /* error message types */ |
93 | | /*****************************/ |
enum {
	PEER_MSG_ERR_PROTOCOL = 0, /* protocol error */
	PEER_MSG_ERR_SIZELIMIT,    /* message exceeds the size limit */
};
98 | | |
/* network key types;
 * network types were directly and mistakenly
 * mapped on sample types; to keep backward
 * compatibility we keep those values, but
 * we now use an internal/network mapping
 * to avoid further mistakes when adding or
 * modifying internal types
 */
/* External (on-the-wire) key types; see the mapping tables below. */
enum {
	PEER_KT_ANY = 0, /* any type */
	PEER_KT_RESV1,   /* UNUSED */
	PEER_KT_SINT,    /* signed 64bits integer type */
	PEER_KT_RESV3,   /* UNUSED */
	PEER_KT_IPV4,    /* ipv4 type */
	PEER_KT_IPV6,    /* ipv6 type */
	PEER_KT_STR,     /* char string type */
	PEER_KT_BIN,     /* buffer type */
	PEER_KT_TYPES    /* number of types, must always be last */
};
118 | | |
/* Map used to retrieve the network key type from an internal sample type.
 * Note: an undeclared mapping maps the entry to PEER_KT_ANY == 0.
 */
static int peer_net_key_type[SMP_TYPES] = {
	[SMP_T_SINT] = PEER_KT_SINT,
	[SMP_T_IPV4] = PEER_KT_IPV4,
	[SMP_T_IPV6] = PEER_KT_IPV6,
	[SMP_T_STR]  = PEER_KT_STR,
	[SMP_T_BIN]  = PEER_KT_BIN,
};
129 | | |
/* Map used to retrieve the internal sample type from a network key type.
 * Note: an undeclared mapping maps the entry to SMP_T_ANY == 0.
 */
static int peer_int_key_type[PEER_KT_TYPES] = {
	[PEER_KT_SINT] = SMP_T_SINT,
	[PEER_KT_IPV4] = SMP_T_IPV4,
	[PEER_KT_IPV6] = SMP_T_IPV6,
	[PEER_KT_STR]  = SMP_T_STR,
	[PEER_KT_BIN]  = SMP_T_BIN,
};
140 | | |
/*
 * Parameters used by the functions which build peer protocol messages.
 * Only the sub-structure relevant to the message being built needs to
 * be filled in by the caller.
 */
struct peer_prep_params {
	struct {
		struct peer *peer;                 /* peer the "hello" message is addressed to */
	} hello;
	struct {
		unsigned int st1;                  /* status code to send in the error message */
	} error_status;
	struct {
		struct stksess *stksess;           /* stick-table session to encode */
		struct shared_table *shared_table; /* table the entry belongs to */
		unsigned int updateid;             /* identifier of this update */
		int use_identifier;                /* non-zero to encode the update identifier */
		int use_timed;                     /* non-zero to encode the entry expiry */
		struct peer *peer;                 /* destination peer */
	} updt;
	struct {
		struct shared_table *shared_table; /* table the switch message refers to */
	} swtch;
	struct {
		struct shared_table *shared_table; /* table the ack message refers to */
	} ack;
	struct {
		/* 2-byte message header; presumably [class, type] — confirm with the builders */
		unsigned char head[2];
	} control;
	struct {
		/* 2-byte message header; presumably [class, type] — confirm with the builders */
		unsigned char head[2];
	} error;
};
171 | | |
/**********************************************/
/* stick table sync message types             */
/* Note: messages with ids >= 128 carry data  */
/**********************************************/
177 | 0 | #define PEER_MSG_STKT_UPDATE 0x80 |
178 | 0 | #define PEER_MSG_STKT_INCUPDATE 0x81 |
179 | 0 | #define PEER_MSG_STKT_DEFINE 0x82 |
180 | 0 | #define PEER_MSG_STKT_SWITCH 0x83 |
181 | 0 | #define PEER_MSG_STKT_ACK 0x84 |
182 | 0 | #define PEER_MSG_STKT_UPDATE_TIMED 0x85 |
183 | 0 | #define PEER_MSG_STKT_INCUPDATE_TIMED 0x86 |
/* All the stick-table message identifiers above have the #7 bit set */
185 | 0 | #define PEER_MSG_STKT_BIT 7 |
186 | 0 | #define PEER_MSG_STKT_BIT_MASK (1 << PEER_MSG_STKT_BIT) |
187 | | |
188 | | /* The maximum length of an encoded data length. */ |
189 | 0 | #define PEER_MSG_ENC_LENGTH_MAXLEN 5 |
190 | | |
191 | | /* Minimum 64-bits value encoded with 2 bytes */ |
192 | 0 | #define PEER_ENC_2BYTES_MIN 0xf0 /* 0xf0 (or 240) */ |
193 | | /* 3 bytes */ |
194 | | #define PEER_ENC_3BYTES_MIN ((1ULL << 11) | PEER_ENC_2BYTES_MIN) /* 0x8f0 (or 2288) */ |
195 | | /* 4 bytes */ |
196 | | #define PEER_ENC_4BYTES_MIN ((1ULL << 18) | PEER_ENC_3BYTES_MIN) /* 0x408f0 (or 264432) */ |
197 | | /* 5 bytes */ |
198 | | #define PEER_ENC_5BYTES_MIN ((1ULL << 25) | PEER_ENC_4BYTES_MIN) /* 0x20408f0 (or 33818864) */ |
199 | | /* 6 bytes */ |
200 | | #define PEER_ENC_6BYTES_MIN ((1ULL << 32) | PEER_ENC_5BYTES_MIN) /* 0x1020408f0 (or 4328786160) */ |
201 | | /* 7 bytes */ |
202 | | #define PEER_ENC_7BYTES_MIN ((1ULL << 39) | PEER_ENC_6BYTES_MIN) /* 0x81020408f0 (or 554084600048) */ |
203 | | /* 8 bytes */ |
204 | | #define PEER_ENC_8BYTES_MIN ((1ULL << 46) | PEER_ENC_7BYTES_MIN) /* 0x4081020408f0 (or 70922828777712) */ |
205 | | /* 9 bytes */ |
206 | | #define PEER_ENC_9BYTES_MIN ((1ULL << 53) | PEER_ENC_8BYTES_MIN) /* 0x204081020408f0 (or 9078122083518704) */ |
207 | | /* 10 bytes */ |
208 | | #define PEER_ENC_10BYTES_MIN ((1ULL << 60) | PEER_ENC_9BYTES_MIN) /* 0x10204081020408f0 (or 1161999626690365680) */ |
209 | | |
210 | | /* #7 bit used to detect the last byte to be encoded */ |
211 | 0 | #define PEER_ENC_STOP_BIT 7 |
212 | | /* The byte minimum value with #7 bit set */ |
213 | 0 | #define PEER_ENC_STOP_BYTE (1 << PEER_ENC_STOP_BIT) |
214 | | /* The left most number of bits set for PEER_ENC_2BYTES_MIN */ |
215 | 0 | #define PEER_ENC_2BYTES_MIN_BITS 4 |
216 | | |
217 | 0 | #define PEER_MSG_HEADER_LEN 2 |
218 | | |
219 | 0 | #define PEER_STKT_CACHE_MAX_ENTRIES 128 |
220 | | |
221 | | /**********************************/ |
222 | | /* Peer Session IO handler states */ |
223 | | /**********************************/ |
224 | | |
enum {
	PEER_SESS_ST_ACCEPT = 0,  /* Initial state for a session created by an accept, must be zero! */
	PEER_SESS_ST_GETVERSION,  /* Validate supported protocol version */
	PEER_SESS_ST_GETHOST,     /* Validate that the host ID corresponds to the local host id */
	PEER_SESS_ST_GETPEER,     /* Validate that the peer ID corresponds to a known remote peer id */
	/* after this point, data were possibly exchanged */
	PEER_SESS_ST_SENDSUCCESS, /* Send ret code 200 (success) and wait for message */
	PEER_SESS_ST_CONNECT,     /* Initial state for a session created on a connect, push presentation into buffer */
	PEER_SESS_ST_GETSTATUS,   /* Wait for the welcome message */
	PEER_SESS_ST_WAITMSG,     /* Wait for data messages */
	PEER_SESS_ST_EXIT,        /* Exit with status code */
	PEER_SESS_ST_ERRPROTO,    /* Send error proto message before exit */
	PEER_SESS_ST_ERRSIZE,     /* Send error size message before exit */
	PEER_SESS_ST_END,         /* Killed session */
};
240 | | |
241 | | /***************************************************/ |
242 | | /* Peer Session status code - part of the protocol */ |
243 | | /***************************************************/ |
244 | | |
245 | 0 | #define PEER_SESS_SC_CONNECTCODE 100 /* connect in progress */ |
246 | 0 | #define PEER_SESS_SC_CONNECTEDCODE 110 /* tcp connect success */ |
247 | | |
248 | 0 | #define PEER_SESS_SC_SUCCESSCODE 200 /* accept or connect successful */ |
249 | | |
250 | 0 | #define PEER_SESS_SC_TRYAGAIN 300 /* try again later */ |
251 | | |
252 | 0 | #define PEER_SESS_SC_ERRPROTO 501 /* error protocol */ |
253 | 0 | #define PEER_SESS_SC_ERRVERSION 502 /* unknown protocol version */ |
254 | 0 | #define PEER_SESS_SC_ERRHOST 503 /* bad host name */ |
255 | 0 | #define PEER_SESS_SC_ERRPEER 504 /* unknown peer */ |
256 | | |
257 | 0 | #define PEER_SESSION_PROTO_NAME "HAProxyS" |
258 | 0 | #define PEER_MAJOR_VER 2 |
259 | 0 | #define PEER_MINOR_VER 1 |
260 | 0 | #define PEER_DWNGRD_MINOR_VER 0 |
261 | | |
/* length of the protocol name, not counting the trailing NUL */
static size_t proto_len = sizeof(PEER_SESSION_PROTO_NAME) - 1;
/* "peers" sections found in the configuration; NULL when none */
struct peers *cfg_peers = NULL;
/* maximum number of updates sent at once (defaults to PEER_DEF_MAX_UPDATES_AT_ONCE) */
static int peers_max_updates_at_once = PEER_DEF_MAX_UPDATES_AT_ONCE;
static void peer_session_forceshutdown(struct peer *peer);

static struct ebpt_node *dcache_tx_insert(struct dcache *dc,
                                          struct dcache_tx_entry *i);
static inline void flush_dcache(struct peer *peer);

/* trace source and events */
static void peers_trace(enum trace_level level, uint64_t mask,
                        const struct trace_source *src,
                        const struct ist where, const struct ist func,
                        const void *a1, const void *a2, const void *a3, const void *a4);

static const char *statuscode_str(int statuscode);
static const char *peer_app_state_str(enum peer_app_state appstate);
static const char *peer_learn_state_str(enum peer_learn_state learnstate);
static const char *peer_applet_state_str(int state);
281 | | |
/* Events reportable by the "peers" trace source. Each PEERS_EV_* flag
 * selects the matching event in "trace peers event ..." directives.
 */
static const struct trace_event peers_trace_events[] = {
#define PEERS_EV_SESS_NEW (1ULL << 0)
	{ .mask = PEERS_EV_SESS_NEW, .name = "sess_new", .desc = "create new peer session" },
#define PEERS_EV_SESS_END (1ULL << 1)
	{ .mask = PEERS_EV_SESS_END, .name = "sess_end", .desc = "peer session terminated" },
#define PEERS_EV_SESS_ERR (1ULL << 2)
	{ .mask = PEERS_EV_SESS_ERR, .name = "sess_err", .desc = "error on peer session" },
#define PEERS_EV_SESS_SHUT (1ULL << 3)
	{ .mask = PEERS_EV_SESS_SHUT, .name = "sess_shut", .desc = "peer session shutdown" },
#define PEERS_EV_SESS_WAKE (1ULL << 4)
	{ .mask = PEERS_EV_SESS_WAKE, .name = "sess_wakeup", .desc = "peer session wakeup" },
#define PEERS_EV_SESS_RESYNC (1ULL << 5)
	{ .mask = PEERS_EV_SESS_RESYNC, .name = "sess_resync", .desc = "peer session resync" },
#define PEERS_EV_SESS_IO (1ULL << 6)
	{ .mask = PEERS_EV_SESS_IO, .name = "sess_io", .desc = "peer session I/O" },

#define PEERS_EV_RX_MSG (1ULL << 7)
	{ .mask = PEERS_EV_RX_MSG, .name = "rx_msg", .desc = "message received" },
#define PEERS_EV_RX_BLK (1ULL << 8)
	{ .mask = PEERS_EV_RX_BLK, .name = "rx_blocked", .desc = "receive blocked" },
#define PEERS_EV_RX_ERR (1ULL << 9)
	{ .mask = PEERS_EV_RX_ERR, .name = "rx_error", .desc = "receive error" },

#define PEERS_EV_TX_MSG (1ULL << 10)
	{ .mask = PEERS_EV_TX_MSG, .name = "tx_msg", .desc = "message sent" },
#define PEERS_EV_TX_BLK (1ULL << 11)
	{ .mask = PEERS_EV_TX_BLK, .name = "tx_blocked", .desc = "send blocked" },
#define PEERS_EV_TX_ERR (1ULL << 12)
	{ .mask = PEERS_EV_TX_ERR, .name = "tx_error", .desc = "send error" },


#define PEERS_EV_PROTO_ERR (1ULL << 13)
	{ .mask = PEERS_EV_PROTO_ERR, .name = "proto_error", .desc = "protocol error" },
#define PEERS_EV_PROTO_HELLO (1ULL << 14)
	{ .mask = PEERS_EV_PROTO_HELLO, .name = "proto_hello", .desc = "protocol hello message" },
#define PEERS_EV_PROTO_SUCCESS (1ULL << 15)
	{ .mask = PEERS_EV_PROTO_SUCCESS, .name = "proto_success", .desc = "protocol success message" },
#define PEERS_EV_PROTO_UPDATE (1ULL << 16)
	{ .mask = PEERS_EV_PROTO_UPDATE, .name = "proto_update", .desc = "protocol UPDATE message" },
#define PEERS_EV_PROTO_ACK (1ULL << 17)
	{ .mask = PEERS_EV_PROTO_ACK, .name = "proto_ack", .desc = "protocol ACK message" },
#define PEERS_EV_PROTO_SWITCH (1ULL << 18)
	{ .mask = PEERS_EV_PROTO_SWITCH, .name = "proto_switch", .desc = "protocol TABLE SWITCH message" },
#define PEERS_EV_PROTO_DEF (1ULL << 19)
	{ .mask = PEERS_EV_PROTO_DEF, .name = "proto_def", .desc = "protocol TABLE DEFINITION message" },
#define PEERS_EV_PROTO_CTRL (1ULL << 20)
	{ .mask = PEERS_EV_PROTO_CTRL, .name = "proto_ctrl", .desc = "protocol control message" },
	{ }
};
331 | | |
/* Arguments the "peers" trace source accepts for lock-on filtering. */
static const struct name_desc peers_trace_lockon_args[4] = {
	/* arg1 */ { /* already used by the appctx */ },
	/* arg2 */ { .name="peer", .desc="Peer" },
	/* arg3 */ { .name="peers", .desc="Peers" },
	/* arg4 */ { }
};
338 | | |
/* Verbosity levels for the "peers" trace source, from least to most verbose. */
static const struct name_desc peers_trace_decoding[] = {
#define PEERS_VERB_CLEAN 1
	{ .name="clean", .desc="only user-friendly stuff, generally suitable for level \"user\"" },
#define PEERS_VERB_MINIMAL 2
	{ .name="minimal", .desc="report only peer state and flags, no real decoding" },
#define PEERS_VERB_SIMPLE 3
	{ .name="simple", .desc="add simple info about messages when available" },
#define PEERS_VERB_ADVANCED 4
	{ .name="advanced", .desc="add more info about messages when available" },
#define PEERS_VERB_COMPLETE 5
	{ .name="complete", .desc="add full data dump when available" },
	{ /* end */ }
};
352 | | |
353 | | |
/* The "peers" trace source definition, wired to the tables above. */
struct trace_source trace_peers = {
	.name = IST("peers"),
	.desc = "Peers protocol",
	.arg_def = TRC_ARG1_APPCTX,  /* first trace argument is the appctx */
	.default_cb = peers_trace,
	.known_events = peers_trace_events,
	.lockon_args = peers_trace_lockon_args,
	.decoding = peers_trace_decoding,
	.report_events = ~0, /* report everything by default */
};
364 | | |
365 | | /* Return peer control message types as strings (only for debugging purpose). */ |
366 | | static inline __maybe_unused char *ctrl_msg_type_str(unsigned int type) |
367 | 0 | { |
368 | 0 | switch (type) { |
369 | 0 | case PEER_MSG_CTRL_RESYNCREQ: |
370 | 0 | return "RESYNCREQ"; |
371 | 0 | case PEER_MSG_CTRL_RESYNCFINISHED: |
372 | 0 | return "RESYNCFINISHED"; |
373 | 0 | case PEER_MSG_CTRL_RESYNCPARTIAL: |
374 | 0 | return "RESYNCPARTIAL"; |
375 | 0 | case PEER_MSG_CTRL_RESYNCCONFIRM: |
376 | 0 | return "RESYNCCONFIRM"; |
377 | 0 | case PEER_MSG_CTRL_HEARTBEAT: |
378 | 0 | return "HEARTBEAT"; |
379 | 0 | default: |
380 | 0 | return "???"; |
381 | 0 | } |
382 | 0 | } |
383 | | |
384 | | #define TRACE_SOURCE &trace_peers |
385 | | INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE); |
386 | | |
/* Default trace callback for the "peers" trace source. Appends peer, peers
 * section and shared-table state to <trace_buf>, with a level of detail
 * growing with the source's configured verbosity (CLEAN < MINIMAL < SIMPLE).
 * a1 is the appctx, a2 the peer, a3 the shared table; all may be NULL.
 */
static void peers_trace(enum trace_level level, uint64_t mask,
                        const struct trace_source *src,
                        const struct ist where, const struct ist func,
                        const void *a1, const void *a2, const void *a3, const void *a4)
{
	const struct appctx *appctx = a1;
	const struct peer *peer = a2;
	const struct peers *peers = NULL;
	const struct shared_table *st = a3;

	/* fall back to the appctx's service context when no peer was passed */
	if (!peer && appctx)
		peer = appctx->svcctx;
	/* nothing to report without a peer or below the minimum verbosity */
	if (!peer || src->verbosity < PEERS_VERB_CLEAN)
		return;
	if (!peers)
		peers = peer->peers;
	if (!appctx)
		appctx = peer->appctx;

	/* NOTE(review): peers is assumed non-NULL past this point (peer->peers
	 * set) — confirm with the peer initialization code.
	 */
	chunk_appendf(&trace_buf, " : [%c,%s] <%s/%s> ",
	              (appctx ? (appctx_is_back(appctx) ? 'B' : 'F') : '-'),
	              (appctx ? peer_applet_state_str(appctx->st0) : "-"),
	              peers->id, peer->id);

	if (peer->local)
		chunk_appendf(&trace_buf, "RELOADING(%s) ", stopping ? "old" : "new");

	if (src->verbosity == PEERS_VERB_CLEAN)
		return;

	/* MINIMAL level and above: peer flags, app/learn/teach states, status */
	chunk_appendf(&trace_buf, "peer=(.fl=0x%08x, .app=%s, .learn=%s, .teach=%s, status=%s, ",
	              peer->flags, peer_app_state_str(peer->appstate), peer_learn_state_str(peer->learnstate),
	              ((peer->flags & PEER_TEACH_FLAGS) == PEER_F_TEACH_PROCESS ? "PROCESS" :
	               ((peer->flags & PEER_F_TEACH_FINISHED) ? "FINISHED" : "NONE")),
	              statuscode_str(peer->statuscode));

	/* reconnect / heartbeat / last handshake timers, printed as remaining
	 * time, "<PAST>" when expired, or "<NEVER>" when unset
	 */
	chunk_appendf(&trace_buf, ".reco=%s, ", (peer->reconnect
	                                         ? (tick_is_expired(peer->reconnect, now_ms)
	                                            ? "<PAST>"
	                                            : human_time(TICKS_TO_MS(peer->reconnect - now_ms), TICKS_TO_MS(1000)))
	                                         : "<NEVER>"));

	chunk_appendf(&trace_buf, ".heart=%s, ", (peer->heartbeat
	                                          ? (tick_is_expired(peer->heartbeat, now_ms)
	                                             ? "<PAST>"
	                                             : human_time(TICKS_TO_MS(peer->heartbeat - now_ms), TICKS_TO_MS(1000)))
	                                          : "<NEVER>"));

	chunk_appendf(&trace_buf, ".last_hdshk=%s) ", (peer->last_hdshk
	                                               ? (tick_is_expired(peer->last_hdshk, now_ms)
	                                                  ? "<PAST>"
	                                                  : human_time(TICKS_TO_MS(peer->last_hdshk - now_ms), TICKS_TO_MS(1000)))
	                                               : "<NEVER>"));

	if (st)
		chunk_appendf(&trace_buf, "st=(.id=%s, .fl=0x%08x, .pushed=%u, .acked=%u) ",
		              st->table->id, st->flags, st->last_pushed, st->last_acked);

	if (src->verbosity == PEERS_VERB_MINIMAL)
		return;

	/* SIMPLE level and above: applet context and peers section state */
	if (appctx)
		chunk_appendf(&trace_buf, "appctx=(%p, .fl=0x%08x, .st0=%d, .st1=%d) ",
		              appctx, appctx->flags, appctx->st0, appctx->st1);

	chunk_appendf(&trace_buf, "peers=(.fl=0x%08x, local=%s) ",
	              peers->flags, peers->local->id);

	if (src->verbosity == PEERS_VERB_SIMPLE)
		return;
}
458 | | |
459 | | static const char *statuscode_str(int statuscode) |
460 | 0 | { |
461 | 0 | switch (statuscode) { |
462 | 0 | case PEER_SESS_SC_CONNECTCODE: |
463 | 0 | return "CONN"; |
464 | 0 | case PEER_SESS_SC_CONNECTEDCODE: |
465 | 0 | return "HSHK"; |
466 | 0 | case PEER_SESS_SC_SUCCESSCODE: |
467 | 0 | return "ESTA"; |
468 | 0 | case PEER_SESS_SC_TRYAGAIN: |
469 | 0 | return "RETR"; |
470 | 0 | case PEER_SESS_SC_ERRPROTO: |
471 | 0 | return "PROT"; |
472 | 0 | case PEER_SESS_SC_ERRVERSION: |
473 | 0 | return "VERS"; |
474 | 0 | case PEER_SESS_SC_ERRHOST: |
475 | 0 | return "NAME"; |
476 | 0 | case PEER_SESS_SC_ERRPEER: |
477 | 0 | return "UNKN"; |
478 | 0 | default: |
479 | 0 | return "NONE"; |
480 | 0 | } |
481 | 0 | } |
482 | | |
483 | | static const char *peer_app_state_str(enum peer_app_state appstate) |
484 | 0 | { |
485 | 0 | switch (appstate) { |
486 | 0 | case PEER_APP_ST_STOPPED: |
487 | 0 | return "STOPPED"; |
488 | 0 | case PEER_APP_ST_STARTING: |
489 | 0 | return "STARTING"; |
490 | 0 | case PEER_APP_ST_RUNNING: |
491 | 0 | return "RUNNING"; |
492 | 0 | case PEER_APP_ST_STOPPING: |
493 | 0 | return "STOPPING"; |
494 | 0 | default: |
495 | 0 | return "UNKNOWN"; |
496 | 0 | } |
497 | 0 | } |
498 | | |
499 | | static const char *peer_learn_state_str(enum peer_learn_state learnstate) |
500 | 0 | { |
501 | 0 | switch (learnstate) { |
502 | 0 | case PEER_LR_ST_NOTASSIGNED: |
503 | 0 | return "NOTASSIGNED"; |
504 | 0 | case PEER_LR_ST_ASSIGNED: |
505 | 0 | return "ASSIGNED"; |
506 | 0 | case PEER_LR_ST_PROCESSING: |
507 | 0 | return "PROCESSING"; |
508 | 0 | case PEER_LR_ST_FINISHED: |
509 | 0 | return "FINISHED"; |
510 | 0 | default: |
511 | 0 | return "UNKNOWN"; |
512 | 0 | } |
513 | 0 | } |
514 | | |
515 | | static const char *peer_applet_state_str(int state) |
516 | 0 | { |
517 | 0 | switch (state) { |
518 | 0 | case PEER_SESS_ST_ACCEPT: return "ACCEPT"; |
519 | 0 | case PEER_SESS_ST_GETVERSION: return "GETVERSION"; |
520 | 0 | case PEER_SESS_ST_GETHOST: return "GETHOST"; |
521 | 0 | case PEER_SESS_ST_GETPEER: return "GETPEER"; |
522 | 0 | case PEER_SESS_ST_SENDSUCCESS: return "SENDSUCCESS"; |
523 | 0 | case PEER_SESS_ST_CONNECT: return "CONNECT"; |
524 | 0 | case PEER_SESS_ST_GETSTATUS: return "GETSTATUS"; |
525 | 0 | case PEER_SESS_ST_WAITMSG: return "WAITMSG"; |
526 | 0 | case PEER_SESS_ST_EXIT: return "EXIT"; |
527 | 0 | case PEER_SESS_ST_ERRPROTO: return "ERRPROTO"; |
528 | 0 | case PEER_SESS_ST_ERRSIZE: return "ERRSIZE"; |
529 | 0 | case PEER_SESS_ST_END: return "END"; |
530 | 0 | default: return "UNKNOWN"; |
531 | 0 | } |
532 | 0 | } |
533 | | |
/* Encode the 64-bit unsigned integer <i> using the peer protocol's
 * variable-length ("dynamic") format. The encoded value is written at
 * address *str; the caller must ensure that the area after *str is
 * large enough (the PEER_ENC_*BYTES_MIN thresholds above imply at most
 * 10 bytes for a 64-bit value). On return, *str points to the next byte
 * after the encoded integer. Returns the length of the encoded integer
 * in bytes.
 */
int intencode(uint64_t i, char **str) {
	int idx = 0;
	unsigned char *msg;

	msg = (unsigned char *)*str;
	if (i < PEER_ENC_2BYTES_MIN) {
		/* small values fit on a single byte */
		msg[0] = (unsigned char)i;
		*str = (char *)&msg[idx+1];
		return (idx+1);
	}

	/* first byte: low bits of <i> with the 4 upper marker bits all set */
	msg[idx] =(unsigned char)i | PEER_ENC_2BYTES_MIN;
	i = (i - PEER_ENC_2BYTES_MIN) >> PEER_ENC_2BYTES_MIN_BITS;
	while (i >= PEER_ENC_STOP_BYTE) {
		/* intermediate bytes carry 7 bits each, bit #7 set to signal
		 * that more bytes follow
		 */
		msg[++idx] = (unsigned char)i | PEER_ENC_STOP_BYTE;
		i = (i - PEER_ENC_STOP_BYTE) >> PEER_ENC_STOP_BIT;
	}
	/* last byte has bit #7 clear, marking the end of the sequence */
	msg[++idx] = (unsigned char)i;
	*str = (char *)&msg[idx+1];
	return (idx+1);
}
561 | | |
562 | | |
/* This function returns a decoded 64-bit unsigned integer
 * from a varint.
 *
 * Calling:
 *  - *str must point to the first byte of the buffer to decode.
 *  - end must point to the next byte after the end of the buffer
 *    we are authorized to parse (buf + buflen).
 *
 * At return:
 *
 *   On success *str will point at the byte following
 *   the fully decoded integer in the buffer, and
 *   the decoded value is returned.
 *
 *   If end is reached before the integer was fully decoded,
 *   *str is set to NULL and the caller has to check this
 *   to know there was a decoding error. In this case
 *   the returned integer is also forced to 0.
 */
uint64_t intdecode(char **str, char *end)
{
	unsigned char *msg;
	uint64_t i;
	int shift;

	/* a NULL *str means a previous decoding already failed */
	if (!*str)
		return 0;

	msg = (unsigned char *)*str;
	if (msg >= (unsigned char *)end)
		goto fail;

	i = *(msg++);
	if (i >= PEER_ENC_2BYTES_MIN) {
		/* multi-byte encoding: accumulate 7 bits per following byte
		 * until one with bit #7 clear is found (see intencode())
		 */
		shift = PEER_ENC_2BYTES_MIN_BITS;
		do {
			if (msg >= (unsigned char *)end)
				goto fail;
			i += (uint64_t)*msg << shift;
			shift += PEER_ENC_STOP_BIT;
		} while (*(msg++) >= PEER_ENC_STOP_BYTE);
	}
	*str = (char *)msg;
	return i;

 fail:
	*str = NULL;
	return 0;
}
612 | | |
613 | | /* |
614 | | * Build a "hello" peer protocol message. |
615 | | * Return the number of written bytes written to build this messages if succeeded, |
616 | | * 0 if not. |
617 | | */ |
618 | | static int peer_prepare_hellomsg(char *msg, size_t size, struct peer_prep_params *p) |
619 | 0 | { |
620 | 0 | int min_ver, ret; |
621 | 0 | struct peer *peer; |
622 | |
|
623 | 0 | peer = p->hello.peer; |
624 | 0 | min_ver = (peer->flags & PEER_F_DWNGRD) ? PEER_DWNGRD_MINOR_VER : PEER_MINOR_VER; |
625 | | /* Prepare headers */ |
626 | 0 | ret = snprintf(msg, size, PEER_SESSION_PROTO_NAME " %d.%d\n%s\n%s %d %d\n", |
627 | 0 | (int)PEER_MAJOR_VER, min_ver, peer->id, localpeer, (int)getpid(), (int)1); |
628 | 0 | if (ret >= size) |
629 | 0 | return 0; |
630 | | |
631 | 0 | return ret; |
632 | 0 | } |
633 | | |
634 | | /* |
635 | | * Build a "handshake succeeded" status message. |
636 | | * Return the number of written bytes written to build this messages if succeeded, |
637 | | * 0 if not. |
638 | | */ |
639 | | static int peer_prepare_status_successmsg(char *msg, size_t size, struct peer_prep_params *p) |
640 | 0 | { |
641 | 0 | int ret; |
642 | |
|
643 | 0 | ret = snprintf(msg, size, "%d\n", (int)PEER_SESS_SC_SUCCESSCODE); |
644 | 0 | if (ret >= size) |
645 | 0 | return 0; |
646 | | |
647 | 0 | return ret; |
648 | 0 | } |
649 | | |
650 | | /* |
651 | | * Build an error status message. |
652 | | * Return the number of written bytes written to build this messages if succeeded, |
653 | | * 0 if not. |
654 | | */ |
655 | | static int peer_prepare_status_errormsg(char *msg, size_t size, struct peer_prep_params *p) |
656 | 0 | { |
657 | 0 | int ret; |
658 | 0 | unsigned int st1; |
659 | |
|
660 | 0 | st1 = p->error_status.st1; |
661 | 0 | ret = snprintf(msg, size, "%u\n", st1); |
662 | 0 | if (ret >= size) |
663 | 0 | return 0; |
664 | | |
665 | 0 | return ret; |
666 | 0 | } |
667 | | |
668 | | /* Set the stick-table UPDATE message type byte at <msg_type> address, |
669 | | * depending on <use_identifier> and <use_timed> boolean parameters. |
670 | | * Always successful. |
671 | | */ |
672 | | static inline void peer_set_update_msg_type(char *msg_type, int use_identifier, int use_timed) |
673 | 0 | { |
674 | 0 | if (use_timed) { |
675 | 0 | if (use_identifier) |
676 | 0 | *msg_type = PEER_MSG_STKT_UPDATE_TIMED; |
677 | 0 | else |
678 | 0 | *msg_type = PEER_MSG_STKT_INCUPDATE_TIMED; |
679 | 0 | } |
680 | 0 | else { |
681 | 0 | if (use_identifier) |
682 | 0 | *msg_type = PEER_MSG_STKT_UPDATE; |
683 | 0 | else |
684 | 0 | *msg_type = PEER_MSG_STKT_INCUPDATE; |
685 | 0 | } |
686 | 0 | } |
687 | | /* |
688 | | * This prepare the data update message on the stick session <ts>, <st> is the considered |
689 | | * stick table. |
690 | | * <msg> is a buffer of <size> to receive data message content |
691 | | * If function returns 0, the caller should consider we were unable to encode this message (TODO: |
692 | | * check size) |
693 | | */ |
694 | | int peer_prepare_updatemsg(char *msg, size_t size, struct peer_prep_params *p) |
695 | 0 | { |
696 | 0 | uint32_t netinteger; |
697 | 0 | unsigned short datalen; |
698 | 0 | char *cursor, *datamsg; |
699 | 0 | unsigned int data_type; |
700 | 0 | void *data_ptr; |
701 | 0 | struct stksess *ts; |
702 | 0 | struct shared_table *st; |
703 | 0 | unsigned int updateid; |
704 | 0 | int use_identifier; |
705 | 0 | int use_timed; |
706 | 0 | struct peer *peer; |
707 | |
|
708 | 0 | ts = p->updt.stksess; |
709 | 0 | st = p->updt.shared_table; |
710 | 0 | updateid = p->updt.updateid; |
711 | 0 | use_identifier = p->updt.use_identifier; |
712 | 0 | use_timed = p->updt.use_timed; |
713 | 0 | peer = p->updt.peer; |
714 | |
|
715 | 0 | cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN; |
716 | | |
717 | | /* construct message */ |
718 | | |
719 | | /* check if we need to send the update identifier */ |
720 | 0 | if (!st->last_pushed || updateid < st->last_pushed || ((updateid - st->last_pushed) != 1)) { |
721 | 0 | use_identifier = 1; |
722 | 0 | } |
723 | | |
724 | | /* encode update identifier if needed */ |
725 | 0 | if (use_identifier) { |
726 | 0 | netinteger = htonl(updateid); |
727 | 0 | memcpy(cursor, &netinteger, sizeof(netinteger)); |
728 | 0 | cursor += sizeof(netinteger); |
729 | 0 | } |
730 | |
|
731 | 0 | if (use_timed) { |
732 | 0 | netinteger = htonl(tick_remain(now_ms, ts->expire)); |
733 | 0 | memcpy(cursor, &netinteger, sizeof(netinteger)); |
734 | 0 | cursor += sizeof(netinteger); |
735 | 0 | } |
736 | | |
737 | | /* encode the key */ |
738 | 0 | if (st->table->type == SMP_T_STR) { |
739 | 0 | int stlen = strlen((char *)ts->key.key); |
740 | |
|
741 | 0 | intencode(stlen, &cursor); |
742 | 0 | memcpy(cursor, ts->key.key, stlen); |
743 | 0 | cursor += stlen; |
744 | 0 | } |
745 | 0 | else if (st->table->type == SMP_T_SINT) { |
746 | 0 | netinteger = htonl(read_u32(ts->key.key)); |
747 | 0 | memcpy(cursor, &netinteger, sizeof(netinteger)); |
748 | 0 | cursor += sizeof(netinteger); |
749 | 0 | } |
750 | 0 | else { |
751 | 0 | memcpy(cursor, ts->key.key, st->table->key_size); |
752 | 0 | cursor += st->table->key_size; |
753 | 0 | } |
754 | |
|
755 | 0 | HA_RWLOCK_RDLOCK(STK_SESS_LOCK, &ts->lock); |
756 | | /* encode values */ |
757 | 0 | for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) { |
758 | |
|
759 | 0 | data_ptr = stktable_data_ptr(st->table, ts, data_type); |
760 | 0 | if (data_ptr) { |
761 | | /* in case of array all elements use |
762 | | * the same std_type and they are linearly |
763 | | * encoded. |
764 | | */ |
765 | 0 | if (stktable_data_types[data_type].is_array) { |
766 | 0 | unsigned int idx = 0; |
767 | |
|
768 | 0 | switch (stktable_data_types[data_type].std_type) { |
769 | 0 | case STD_T_SINT: { |
770 | 0 | int data; |
771 | |
|
772 | 0 | do { |
773 | 0 | data = stktable_data_cast(data_ptr, std_t_sint); |
774 | 0 | intencode(data, &cursor); |
775 | |
|
776 | 0 | data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx); |
777 | 0 | } while(data_ptr); |
778 | 0 | break; |
779 | 0 | } |
780 | 0 | case STD_T_UINT: { |
781 | 0 | unsigned int data; |
782 | |
|
783 | 0 | do { |
784 | 0 | data = stktable_data_cast(data_ptr, std_t_uint); |
785 | 0 | intencode(data, &cursor); |
786 | |
|
787 | 0 | data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx); |
788 | 0 | } while(data_ptr); |
789 | 0 | break; |
790 | 0 | } |
791 | 0 | case STD_T_ULL: { |
792 | 0 | unsigned long long data; |
793 | |
|
794 | 0 | do { |
795 | 0 | data = stktable_data_cast(data_ptr, std_t_ull); |
796 | 0 | intencode(data, &cursor); |
797 | |
|
798 | 0 | data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx); |
799 | 0 | } while(data_ptr); |
800 | 0 | break; |
801 | 0 | } |
802 | 0 | case STD_T_FRQP: { |
803 | 0 | struct freq_ctr *frqp; |
804 | |
|
805 | 0 | do { |
806 | 0 | frqp = &stktable_data_cast(data_ptr, std_t_frqp); |
807 | 0 | intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor); |
808 | 0 | intencode(frqp->curr_ctr, &cursor); |
809 | 0 | intencode(frqp->prev_ctr, &cursor); |
810 | |
|
811 | 0 | data_ptr = stktable_data_ptr_idx(st->table, ts, data_type, ++idx); |
812 | 0 | } while(data_ptr); |
813 | 0 | break; |
814 | 0 | } |
815 | 0 | } |
816 | | |
817 | | /* array elements fully encoded |
818 | | * proceed next data_type. |
819 | | */ |
820 | 0 | continue; |
821 | 0 | } |
822 | 0 | switch (stktable_data_types[data_type].std_type) { |
823 | 0 | case STD_T_SINT: { |
824 | 0 | int data; |
825 | |
|
826 | 0 | data = stktable_data_cast(data_ptr, std_t_sint); |
827 | 0 | intencode(data, &cursor); |
828 | 0 | break; |
829 | 0 | } |
830 | 0 | case STD_T_UINT: { |
831 | 0 | unsigned int data; |
832 | |
|
833 | 0 | data = stktable_data_cast(data_ptr, std_t_uint); |
834 | 0 | intencode(data, &cursor); |
835 | 0 | break; |
836 | 0 | } |
837 | 0 | case STD_T_ULL: { |
838 | 0 | unsigned long long data; |
839 | |
|
840 | 0 | data = stktable_data_cast(data_ptr, std_t_ull); |
841 | 0 | intencode(data, &cursor); |
842 | 0 | break; |
843 | 0 | } |
844 | 0 | case STD_T_FRQP: { |
845 | 0 | struct freq_ctr *frqp; |
846 | |
|
847 | 0 | frqp = &stktable_data_cast(data_ptr, std_t_frqp); |
848 | 0 | intencode((unsigned int)(now_ms - frqp->curr_tick), &cursor); |
849 | 0 | intencode(frqp->curr_ctr, &cursor); |
850 | 0 | intencode(frqp->prev_ctr, &cursor); |
851 | 0 | break; |
852 | 0 | } |
853 | 0 | case STD_T_DICT: { |
854 | 0 | struct dict_entry *de; |
855 | 0 | struct ebpt_node *cached_de; |
856 | 0 | struct dcache_tx_entry cde = { }; |
857 | 0 | char *beg, *end; |
858 | 0 | size_t value_len, data_len; |
859 | 0 | struct dcache *dc; |
860 | |
|
861 | 0 | de = stktable_data_cast(data_ptr, std_t_dict); |
862 | 0 | if (!de) { |
863 | | /* No entry */ |
864 | 0 | intencode(0, &cursor); |
865 | 0 | break; |
866 | 0 | } |
867 | | |
868 | 0 | dc = peer->dcache; |
869 | 0 | cde.entry.key = de; |
870 | 0 | cached_de = dcache_tx_insert(dc, &cde); |
871 | 0 | if (cached_de == &cde.entry) { |
872 | 0 | if (cde.id + 1 >= PEER_ENC_2BYTES_MIN) |
873 | 0 | break; |
874 | | /* Encode the length of the remaining data -> 1 */ |
875 | 0 | intencode(1, &cursor); |
876 | | /* Encode the cache entry ID */ |
877 | 0 | intencode(cde.id + 1, &cursor); |
878 | 0 | } |
879 | 0 | else { |
880 | | /* Leave enough room to encode the remaining data length. */ |
881 | 0 | end = beg = cursor + PEER_MSG_ENC_LENGTH_MAXLEN; |
882 | | /* Encode the dictionary entry key */ |
883 | 0 | intencode(cde.id + 1, &end); |
884 | | /* Encode the length of the dictionary entry data */ |
885 | 0 | value_len = de->len; |
886 | 0 | intencode(value_len, &end); |
887 | | /* Copy the data */ |
888 | 0 | memcpy(end, de->value.key, value_len); |
889 | 0 | end += value_len; |
890 | | /* Encode the length of the data */ |
891 | 0 | data_len = end - beg; |
892 | 0 | intencode(data_len, &cursor); |
893 | 0 | memmove(cursor, beg, data_len); |
894 | 0 | cursor += data_len; |
895 | 0 | } |
896 | 0 | break; |
897 | 0 | } |
898 | 0 | } |
899 | 0 | } |
900 | 0 | } |
901 | 0 | HA_RWLOCK_RDUNLOCK(STK_SESS_LOCK, &ts->lock); |
902 | | |
903 | | /* Compute datalen */ |
904 | 0 | datalen = (cursor - datamsg); |
905 | | |
906 | | /* prepare message header */ |
907 | 0 | msg[0] = PEER_MSG_CLASS_STICKTABLE; |
908 | 0 | peer_set_update_msg_type(&msg[1], use_identifier, use_timed); |
909 | 0 | cursor = &msg[2]; |
910 | 0 | intencode(datalen, &cursor); |
911 | | |
912 | | /* move data after header */ |
913 | 0 | memmove(cursor, datamsg, datalen); |
914 | | |
915 | | /* return header size + data_len */ |
916 | 0 | return (cursor - msg) + datalen; |
917 | 0 | } |
918 | | |
919 | | /* |
920 | | * This prepare the switch table message to targeted share table <st>. |
921 | | * <msg> is a buffer of <size> to receive data message content |
922 | | * If function returns 0, the caller should consider we were unable to encode this message (TODO: |
923 | | * check size) |
924 | | */ |
static int peer_prepare_switchmsg(char *msg, size_t size, struct peer_prep_params *params)
{
	int len;
	unsigned short datalen;
	struct buffer *chunk;
	char *cursor, *datamsg, *chunkp, *chunkq;
	uint64_t data = 0; /* bitfield of the data types stored in the table */
	unsigned int data_type;
	struct shared_table *st;

	st = params->swtch.shared_table;
	/* Reserve room at the start of <msg> for the 2-byte header and the
	 * varint-encoded payload length: the payload is built first, then
	 * moved just behind the header once its length is known.
	 * NOTE(review): <size> is not checked against the encoded length
	 * (see the TODO in the comment above this function).
	 */
	cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN;

	/* Encode data */

	/* encode local id */
	intencode(st->local_id, &cursor);

	/* encode table name */
	len = strlen(st->table->nid);
	intencode(len, &cursor);
	memcpy(cursor, st->table->nid, len);
	cursor += len;

	/* encode table type */

	intencode(peer_net_key_type[st->table->type], &cursor);

	/* encode table key size */
	intencode(st->table->key_size, &cursor);

	/* Data-type parameters are accumulated into a trash chunk and appended
	 * after the data-type bitfield and the expire value below.
	 */
	chunk = get_trash_chunk();
	chunkp = chunkq = chunk->area;
	/* encode available known data types in table */
	for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) {
		if (st->table->data_ofs[data_type]) {
			/* stored data types parameters are all linearly encoded
			 * at the end of the 'table definition' message.
			 *
			 * Currently only array data_types and data_types
			 * using freq_counter base type have parameters:
			 *
			 * - array has always at least one parameter set to the
			 *   number of elements.
			 *
			 * - array of base-type freq_counters has an additional
			 *   parameter set to the period used to compute those
			 *   freq_counters.
			 *
			 * - simple freq counter has a parameter set to the period
			 *   used to compute
			 *
			 * A set of parameter for a datatype MUST BE prefixed
			 * by the data-type id itself:
			 * This is useless because the data_types are ordered and
			 * the data_type bitfield already gives the information of
			 * stored types, but it was designed this way when the
			 * push of period parameter was added for freq counters
			 * and we don't want to break the compatibility.
			 *
			 */
			if (stktable_data_types[data_type].is_array) {
				/* This is an array type so we first encode
				 * the data_type itself to prefix parameters
				 */
				intencode(data_type, &chunkq);

				/* We encode the first parameter which is
				 * the number of elements of this array
				 */
				intencode(st->table->data_nbelem[data_type], &chunkq);

				/* for array of freq counters, there is an additional
				 * period parameter to encode
				 */
				if (stktable_data_types[data_type].std_type == STD_T_FRQP)
					intencode(st->table->data_arg[data_type].u, &chunkq);
			}
			else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
				/* this datatype is a simple freq counter not part
				 * of an array. We encode the data_type itself
				 * to prefix the 'period' parameter
				 */
				intencode(data_type, &chunkq);
				intencode(st->table->data_arg[data_type].u, &chunkq);
			}
			/* set the bit corresponding to stored data type */
			data |= 1ULL << data_type;
		}
	}
	intencode(data, &cursor);

	/* Encode stick-table entries duration. */
	intencode(st->table->expire, &cursor);

	/* append the data-type parameters accumulated above, if any */
	if (chunkq > chunkp) {
		chunk->data = chunkq - chunkp;
		memcpy(cursor, chunk->area, chunk->data);
		cursor += chunk->data;
	}

	/* Compute datalen */
	datalen = (cursor - datamsg);

	/* prepare message header */
	msg[0] = PEER_MSG_CLASS_STICKTABLE;
	msg[1] = PEER_MSG_STKT_DEFINE;
	cursor = &msg[2];
	intencode(datalen, &cursor);

	/* move data after header */
	memmove(cursor, datamsg, datalen);

	/* return header size + data_len */
	return (cursor - msg) + datalen;
}
1041 | | |
1042 | | /* |
1043 | | * This prepare the acknowledge message on the stick session <ts>, <st> is the considered |
1044 | | * stick table. |
1045 | | * <msg> is a buffer of <size> to receive data message content |
1046 | | * If function returns 0, the caller should consider we were unable to encode this message (TODO: |
1047 | | * check size) |
1048 | | */ |
1049 | | static int peer_prepare_ackmsg(char *msg, size_t size, struct peer_prep_params *p) |
1050 | 0 | { |
1051 | 0 | unsigned short datalen; |
1052 | 0 | char *cursor, *datamsg; |
1053 | 0 | uint32_t netinteger; |
1054 | 0 | struct shared_table *st; |
1055 | |
|
1056 | 0 | cursor = datamsg = msg + PEER_MSG_HEADER_LEN + PEER_MSG_ENC_LENGTH_MAXLEN; |
1057 | |
|
1058 | 0 | st = p->ack.shared_table; |
1059 | 0 | intencode(st->remote_id, &cursor); |
1060 | 0 | netinteger = htonl(st->last_get); |
1061 | 0 | memcpy(cursor, &netinteger, sizeof(netinteger)); |
1062 | 0 | cursor += sizeof(netinteger); |
1063 | | |
1064 | | /* Compute datalen */ |
1065 | 0 | datalen = (cursor - datamsg); |
1066 | | |
1067 | | /* prepare message header */ |
1068 | 0 | msg[0] = PEER_MSG_CLASS_STICKTABLE; |
1069 | 0 | msg[1] = PEER_MSG_STKT_ACK; |
1070 | 0 | cursor = &msg[2]; |
1071 | 0 | intencode(datalen, &cursor); |
1072 | | |
1073 | | /* move data after header */ |
1074 | 0 | memmove(cursor, datamsg, datalen); |
1075 | | |
1076 | | /* return header size + data_len */ |
1077 | 0 | return (cursor - msg) + datalen; |
1078 | 0 | } |
1079 | | |
1080 | | /* |
1081 | | * Function to deinit connected peer |
1082 | | */ |
void __peer_session_deinit(struct peer *peer)
{
	struct peers *peers = peer->peers;
	int thr;

	/* nothing to do when the peer has no section or no running applet */
	if (!peers || !peer->appctx)
		return;

	/* account for the applet leaving its thread */
	thr = peer->appctx->t->tid;
	HA_ATOMIC_DEC(&peers->applet_count[thr]);

	/* only sessions which reached the WAITMSG state were counted as
	 * connected peers
	 */
	if (peer->appctx->st0 == PEER_SESS_ST_WAITMSG)
		HA_ATOMIC_DEC(&connected_peers);

	HA_ATOMIC_DEC(&active_peers);

	/* release the TX dictionary cache entries used by this session */
	flush_dcache(peer);

	/* Re-init current table pointers to force announcement on re-connect */
	peer->remote_table = peer->last_local_table = peer->stop_local_table = NULL;
	peer->appctx = NULL;

	/* reset teaching flags to 0 */
	peer->flags &= ~PEER_TEACH_FLAGS;

	/* Mark the peer as stopping and wait for the sync task */
	peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
	peer->appstate = PEER_APP_ST_STOPPING;
	/* NOTE(review): peer->appctx was reset to NULL just above, so this
	 * trace is emitted with a NULL appctx — confirm this is intended.
	 */
	TRACE_STATE("peer session stopping", PEERS_EV_SESS_END, peer->appctx, peer);
	task_wakeup(peers->sync_task, TASK_WOKEN_MSG);
}
1114 | | |
static int peer_session_init(struct appctx *appctx)
{
	struct peer *peer = appctx->svcctx;
	struct stream *s;
	struct sockaddr_storage *addr = NULL;

	/* Initialize an outgoing peer session: allocate the destination
	 * address from the peer's server settings, attach the applet to a
	 * stream and set up the outgoing connection parameters.
	 * Returns 0 on success, -1 on allocation or startup failure.
	 */
	TRACE_ENTER(PEERS_EV_SESS_NEW, appctx, peer);
	if (!sockaddr_alloc(&addr, &peer->srv->addr, sizeof(peer->srv->addr)))
		goto out_error;
	set_host_port(addr, peer->srv->svc_port);

	if (appctx_finalize_startup(appctx, peer->peers->peers_fe, &BUF_NULL) == -1)
		goto out_free_addr;

	s = appctx_strm(appctx);
	/* applet is waiting for data */
	applet_need_more_data(appctx);
	appctx_wakeup(appctx);

	/* initiate an outgoing connection */
	s->scb->dst = addr;
	s->scb->flags |= (SC_FL_RCV_ONCE|SC_FL_NOLINGER);
	s->flags = SF_ASSIGNED;
	stream_set_srv_target(s, peer->srv);

	/* no logging and no stream id for internal peer sessions */
	s->do_log = NULL;
	s->uniq_id = 0;
	_HA_ATOMIC_INC(&active_peers);
	TRACE_LEAVE(PEERS_EV_SESS_NEW, appctx, peer);
	return 0;

 out_free_addr:
	sockaddr_free(&addr);
 out_error:
	TRACE_ERROR("peer session init failed", PEERS_EV_SESS_NEW|PEERS_EV_SESS_END|PEERS_EV_SESS_ERR, NULL, peer);
	return -1;
}
1152 | | |
1153 | | /* |
1154 | | * Callback to release a session with a peer |
1155 | | */ |
1156 | | static void peer_session_release(struct appctx *appctx) |
1157 | 0 | { |
1158 | 0 | struct peer *peer = appctx->svcctx; |
1159 | | |
1160 | | /* appctx->svcctx is not a peer session */ |
1161 | 0 | if (appctx->st0 < PEER_SESS_ST_SENDSUCCESS) |
1162 | 0 | return; |
1163 | | |
1164 | | /* peer session identified */ |
1165 | 0 | if (peer) { |
1166 | 0 | HA_SPIN_LOCK(PEER_LOCK, &peer->lock); |
1167 | 0 | if (peer->appctx == appctx) |
1168 | 0 | __peer_session_deinit(peer); |
1169 | 0 | peer->flags &= ~PEER_F_ALIVE; |
1170 | 0 | HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock); |
1171 | 0 | TRACE_STATE("peer session released", PEERS_EV_SESS_END, appctx, peer); |
1172 | 0 | } |
1173 | 0 | } |
1174 | | |
1175 | | /* Retrieve the major and minor versions of peers protocol |
1176 | | * announced by a remote peer. <str> is a null-terminated |
1177 | | * string with the following format: "<maj_ver>.<min_ver>". |
1178 | | */ |
static int peer_get_version(const char *str,
                            unsigned int *maj_ver, unsigned int *min_ver)
{
	const char *p = str;
	const char *limit = str + strlen(str);
	const char *start;
	unsigned int major, minor;

	/* parse the major version: at least one digit, followed by a dot */
	start = p;
	major = read_uint(&p, limit);
	if (p == start || *p != '.')
		return -1;
	p++;

	/* parse the minor version: at least one digit, consuming the whole
	 * remaining string
	 */
	start = p;
	minor = read_uint(&p, limit);
	if (p == start || p != limit)
		return -1;

	*maj_ver = major;
	*min_ver = minor;
	return 0;
}
1203 | | |
1204 | | /* |
1205 | | * Parse a line terminated by an optional '\r' character, followed by a mandatory |
1206 | | * '\n' character. |
1207 | | * Returns 1 if succeeded or 0 if a '\n' character could not be found, and -1 if |
1208 | | * a line could not be read because the communication channel is closed. |
1209 | | */ |
static inline int peer_getline(struct appctx *appctx)
{
	int n = 0;

	TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
	/* no input buffer available yet: ask for more data and report 0 */
	if (applet_get_inbuf(appctx) == NULL) {
		applet_need_more_data(appctx);
		goto out;
	}

	/* copy one line into the trash buffer; 0 means no full line yet */
	n = applet_getline(appctx, trash.area, trash.size);
	if (!n) {
		applet_need_more_data(appctx);
		goto out;
	}

	/* error, or the received data does not end with '\n' (closed/full) */
	if (n < 0 || trash.area[n - 1] != '\n') {
		appctx->st0 = PEER_SESS_ST_END;
		TRACE_ERROR("failed to receive data (channel closed or full)", PEERS_EV_SESS_IO|PEERS_EV_RX_ERR, appctx);
		return -1;
	}

	/* strip the trailing "\r\n" or "\n" by NUL-terminating the line */
	if (n > 1 && (trash.area[n - 2] == '\r'))
		trash.area[n - 2] = 0;
	else
		trash.area[n - 1] = 0;

	/* consume the line (including its terminator) from the input */
	applet_skip_input(appctx, n);
 out:
	TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx);
	return n;
}
1242 | | |
1243 | | /* |
1244 | | * Send a message after having called <peer_prepare_msg> to build it. |
1245 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1246 | | * Returns -1 if there was not enough room left to send the message, |
1247 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1248 | | * returned value equal to PEER_SESS_ST_END. |
1249 | | */ |
1250 | | static inline int peer_send_msg(struct appctx *appctx, |
1251 | | int (*peer_prepare_msg)(char *, size_t, struct peer_prep_params *), |
1252 | | struct peer_prep_params *params) |
1253 | 0 | { |
1254 | 0 | int ret, msglen; |
1255 | |
|
1256 | 0 | TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); |
1257 | 0 | msglen = peer_prepare_msg(trash.area, trash.size, params); |
1258 | 0 | if (!msglen) { |
1259 | | /* internal error: message does not fit in trash */ |
1260 | 0 | appctx->st0 = PEER_SESS_ST_END; |
1261 | 0 | TRACE_ERROR("failed to send data (message too long)", PEERS_EV_SESS_IO|PEERS_EV_TX_ERR, appctx); |
1262 | 0 | return 0; |
1263 | 0 | } |
1264 | | |
1265 | | /* message to buffer */ |
1266 | 0 | ret = applet_putblk(appctx, trash.area, msglen); |
1267 | 0 | if (ret <= 0) { |
1268 | 0 | if (ret != -1) { |
1269 | 0 | TRACE_ERROR("failed to send data (channel closed)", PEERS_EV_SESS_IO|PEERS_EV_TX_ERR, appctx); |
1270 | 0 | appctx->st0 = PEER_SESS_ST_END; |
1271 | 0 | } |
1272 | 0 | } |
1273 | |
|
1274 | 0 | TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_TX_MSG, appctx); |
1275 | 0 | return ret; |
1276 | 0 | } |
1277 | | |
1278 | | /* |
1279 | | * Send a hello message. |
1280 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1281 | | * Returns -1 if there was not enough room left to send the message, |
1282 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1283 | | * returned value equal to PEER_SESS_ST_END. |
1284 | | */ |
1285 | | static inline int peer_send_hellomsg(struct appctx *appctx, struct peer *peer) |
1286 | 0 | { |
1287 | 0 | struct peer_prep_params p = { |
1288 | 0 | .hello.peer = peer, |
1289 | 0 | }; |
1290 | |
|
1291 | 0 | TRACE_PROTO("send hello message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_HELLO, appctx, peer); |
1292 | 0 | return peer_send_msg(appctx, peer_prepare_hellomsg, &p); |
1293 | 0 | } |
1294 | | |
1295 | | /* |
1296 | | * Send a success peer handshake status message. |
1297 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1298 | | * Returns -1 if there was not enough room left to send the message, |
1299 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1300 | | * returned value equal to PEER_SESS_ST_END. |
1301 | | */ |
1302 | | static inline int peer_send_status_successmsg(struct appctx *appctx) |
1303 | 0 | { |
1304 | 0 | TRACE_PROTO("send status success message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_SUCCESS, appctx); |
1305 | 0 | return peer_send_msg(appctx, peer_prepare_status_successmsg, NULL); |
1306 | 0 | } |
1307 | | |
1308 | | /* |
1309 | | * Send a peer handshake status error message. |
1310 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1311 | | * Returns -1 if there was not enough room left to send the message, |
1312 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1313 | | * returned value equal to PEER_SESS_ST_END. |
1314 | | */ |
1315 | | static inline int peer_send_status_errormsg(struct appctx *appctx) |
1316 | 0 | { |
1317 | 0 | struct peer_prep_params p = { |
1318 | 0 | .error_status.st1 = appctx->st1, |
1319 | 0 | }; |
1320 | |
|
1321 | 0 | TRACE_PROTO("send status error message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx); |
1322 | 0 | return peer_send_msg(appctx, peer_prepare_status_errormsg, &p); |
1323 | 0 | } |
1324 | | |
1325 | | /* |
1326 | | * Send a stick-table switch message. |
1327 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1328 | | * Returns -1 if there was not enough room left to send the message, |
1329 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1330 | | * returned value equal to PEER_SESS_ST_END. |
1331 | | */ |
1332 | | static inline int peer_send_switchmsg(struct shared_table *st, struct appctx *appctx) |
1333 | 0 | { |
1334 | 0 | struct peer_prep_params p = { |
1335 | 0 | .swtch.shared_table = st, |
1336 | 0 | }; |
1337 | |
|
1338 | 0 | TRACE_PROTO("send table switch message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_SWITCH, appctx, NULL, st); |
1339 | 0 | return peer_send_msg(appctx, peer_prepare_switchmsg, &p); |
1340 | 0 | } |
1341 | | |
1342 | | /* |
1343 | | * Send a stick-table update acknowledgement message. |
1344 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1345 | | * Returns -1 if there was not enough room left to send the message, |
1346 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1347 | | * returned value equal to PEER_SESS_ST_END. |
1348 | | */ |
1349 | | static inline int peer_send_ackmsg(struct shared_table *st, struct appctx *appctx) |
1350 | 0 | { |
1351 | 0 | struct peer_prep_params p = { |
1352 | 0 | .ack.shared_table = st, |
1353 | 0 | }; |
1354 | |
|
1355 | 0 | TRACE_PROTO("send ack message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ACK, appctx, NULL, st); |
1356 | 0 | return peer_send_msg(appctx, peer_prepare_ackmsg, &p); |
1357 | 0 | } |
1358 | | |
1359 | | /* |
1360 | | * Send a stick-table update message. |
1361 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1362 | | * Returns -1 if there was not enough room left to send the message, |
1363 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1364 | | * returned value equal to PEER_SESS_ST_END. |
1365 | | */ |
1366 | | static inline int peer_send_updatemsg(struct shared_table *st, struct appctx *appctx, struct stksess *ts, |
1367 | | unsigned int updateid, int use_identifier, int use_timed) |
1368 | 0 | { |
1369 | 0 | struct peer_prep_params p = { |
1370 | 0 | .updt = { |
1371 | 0 | .stksess = ts, |
1372 | 0 | .shared_table = st, |
1373 | 0 | .updateid = updateid, |
1374 | 0 | .use_identifier = use_identifier, |
1375 | 0 | .use_timed = use_timed, |
1376 | 0 | .peer = appctx->svcctx, |
1377 | 0 | }, |
1378 | 0 | }; |
1379 | |
|
1380 | 0 | TRACE_PROTO("send update message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_UPDATE, appctx, NULL, st); |
1381 | 0 | return peer_send_msg(appctx, peer_prepare_updatemsg, &p); |
1382 | 0 | } |
1383 | | |
1384 | | /* |
1385 | | * Build a peer protocol control class message. |
1386 | | * Returns the number of written bytes used to build the message if succeeded, |
1387 | | * 0 if not. |
1388 | | */ |
1389 | | static int peer_prepare_control_msg(char *msg, size_t size, struct peer_prep_params *p) |
1390 | 0 | { |
1391 | 0 | if (size < sizeof p->control.head) |
1392 | 0 | return 0; |
1393 | | |
1394 | 0 | msg[0] = p->control.head[0]; |
1395 | 0 | msg[1] = p->control.head[1]; |
1396 | |
|
1397 | 0 | return 2; |
1398 | 0 | } |
1399 | | |
1400 | | /* |
1401 | | * Send a stick-table synchronization request message. |
1402 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1403 | | * Returns -1 if there was not enough room left to send the message, |
1404 | | * any other negative returned value must be considered as an error with an appctx st0 |
1405 | | * returned value equal to PEER_SESS_ST_END. |
1406 | | */ |
1407 | | static inline int peer_send_resync_reqmsg(struct appctx *appctx, |
1408 | | struct peer *peer, struct peers *peers) |
1409 | 0 | { |
1410 | 0 | struct peer_prep_params p = { |
1411 | 0 | .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCREQ, }, |
1412 | 0 | }; |
1413 | |
|
1414 | 0 | TRACE_PROTO("send resync request message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
1415 | 0 | return peer_send_msg(appctx, peer_prepare_control_msg, &p); |
1416 | 0 | } |
1417 | | |
1418 | | /* |
1419 | | * Send a stick-table synchronization confirmation message. |
1420 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1421 | | * Returns -1 if there was not enough room left to send the message, |
1422 | | * any other negative returned value must be considered as an error with an appctx st0 |
1423 | | * returned value equal to PEER_SESS_ST_END. |
1424 | | */ |
1425 | | static inline int peer_send_resync_confirmsg(struct appctx *appctx, |
1426 | | struct peer *peer, struct peers *peers) |
1427 | 0 | { |
1428 | 0 | struct peer_prep_params p = { |
1429 | 0 | .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_RESYNCCONFIRM, }, |
1430 | 0 | }; |
1431 | |
|
1432 | 0 | TRACE_PROTO("send resync confirm message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
1433 | 0 | return peer_send_msg(appctx, peer_prepare_control_msg, &p); |
1434 | 0 | } |
1435 | | |
1436 | | /* |
1437 | | * Send a stick-table synchronization finished message. |
1438 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1439 | | * Returns -1 if there was not enough room left to send the message, |
1440 | | * any other negative returned value must be considered as an error with an appctx st0 |
1441 | | * returned value equal to PEER_SESS_ST_END. |
1442 | | */ |
1443 | | static inline int peer_send_resync_finishedmsg(struct appctx *appctx, |
1444 | | struct peer *peer, struct peers *peers) |
1445 | 0 | { |
1446 | 0 | struct peer_prep_params p = { |
1447 | 0 | .control.head = { PEER_MSG_CLASS_CONTROL, }, |
1448 | 0 | }; |
1449 | |
|
1450 | 0 | p.control.head[1] = (HA_ATOMIC_LOAD(&peers->flags) & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED ? |
1451 | 0 | PEER_MSG_CTRL_RESYNCFINISHED : PEER_MSG_CTRL_RESYNCPARTIAL; |
1452 | |
|
1453 | 0 | TRACE_PROTO("send full resync finish message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
1454 | 0 | return peer_send_msg(appctx, peer_prepare_control_msg, &p); |
1455 | 0 | } |
1456 | | |
1457 | | /* |
1458 | | * Send a heartbeat message. |
1459 | | * Return 0 if the message could not be built modifying the appctx st0 to PEER_SESS_ST_END value. |
1460 | | * Returns -1 if there was not enough room left to send the message, |
1461 | | * any other negative returned value must be considered as an error with an appctx st0 |
1462 | | * returned value equal to PEER_SESS_ST_END. |
1463 | | */ |
1464 | | static inline int peer_send_heartbeatmsg(struct appctx *appctx, |
1465 | | struct peer *peer, struct peers *peers) |
1466 | 0 | { |
1467 | 0 | struct peer_prep_params p = { |
1468 | 0 | .control.head = { PEER_MSG_CLASS_CONTROL, PEER_MSG_CTRL_HEARTBEAT, }, |
1469 | 0 | }; |
1470 | |
|
1471 | 0 | TRACE_PROTO("send heartbeat message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
1472 | 0 | return peer_send_msg(appctx, peer_prepare_control_msg, &p); |
1473 | 0 | } |
1474 | | |
1475 | | /* |
1476 | | * Build a peer protocol error class message. |
1477 | | * Returns the number of written bytes used to build the message if succeeded, |
1478 | | * 0 if not. |
1479 | | */ |
1480 | | static int peer_prepare_error_msg(char *msg, size_t size, struct peer_prep_params *p) |
1481 | 0 | { |
1482 | 0 | if (size < sizeof p->error.head) |
1483 | 0 | return 0; |
1484 | | |
1485 | 0 | msg[0] = p->error.head[0]; |
1486 | 0 | msg[1] = p->error.head[1]; |
1487 | |
|
1488 | 0 | return 2; |
1489 | 0 | } |
1490 | | |
1491 | | /* |
1492 | | * Send a "size limit reached" error message. |
1493 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1494 | | * Returns -1 if there was not enough room left to send the message, |
1495 | | * any other negative returned value must be considered as an error with an appctx st0 |
1496 | | * returned value equal to PEER_SESS_ST_END. |
1497 | | */ |
1498 | | static inline int peer_send_error_size_limitmsg(struct appctx *appctx) |
1499 | 0 | { |
1500 | 0 | struct peer_prep_params p = { |
1501 | 0 | .error.head = { PEER_MSG_CLASS_ERROR, PEER_MSG_ERR_SIZELIMIT, }, |
1502 | 0 | }; |
1503 | |
|
1504 | 0 | TRACE_PROTO("send error size limit message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx); |
1505 | 0 | return peer_send_msg(appctx, peer_prepare_error_msg, &p); |
1506 | 0 | } |
1507 | | |
1508 | | /* |
1509 | | * Send a "peer protocol" error message. |
1510 | | * Return 0 if the message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1511 | | * Returns -1 if there was not enough room left to send the message, |
1512 | | * any other negative returned value must be considered as an error with an appctx st0 |
1513 | | * returned value equal to PEER_SESS_ST_END. |
1514 | | */ |
1515 | | static inline int peer_send_error_protomsg(struct appctx *appctx) |
1516 | 0 | { |
1517 | 0 | struct peer_prep_params p = { |
1518 | 0 | .error.head = { PEER_MSG_CLASS_ERROR, PEER_MSG_ERR_PROTOCOL, }, |
1519 | 0 | }; |
1520 | |
|
1521 | 0 | TRACE_PROTO("send protocol error message", PEERS_EV_SESS_IO|PEERS_EV_TX_MSG|PEERS_EV_PROTO_ERR, appctx); |
1522 | 0 | return peer_send_msg(appctx, peer_prepare_error_msg, &p); |
1523 | 0 | } |
1524 | | |
1525 | | /* |
1526 | | * Function used to lookup for recent stick-table updates associated with |
1527 | | * <st> shared stick-table when a lesson must be taught a peer (learn state is not PEER_LR_ST_NOTASSIGNED). |
1528 | | */ |
static inline struct stksess *peer_teach_process_stksess_lookup(struct shared_table *st)
{
	struct eb32_node *eb;
	struct stksess *ret;

	/* look up the first update strictly newer than the last one pushed */
	eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
	if (!eb) {
		/* past the end of the tree: restart from the first update. If
		 * the tree is empty or wraps back to the last pushed key,
		 * everything was sent; resync with the local update counter.
		 */
		eb = eb32_first(&st->table->updates);
		if (!eb || (eb->key == st->last_pushed)) {
			st->last_pushed = st->table->localupdate;
			return NULL;
		}
	}

	/* if distance between the last pushed and the retrieved key
	 * is greater than the distance last_pushed and the local_update
	 * this means we are beyond localupdate.
	 * (The subtractions rely on unsigned 32-bit wraparound so the
	 * comparison remains valid when the update counters wrap.)
	 */
	if ((eb->key - st->last_pushed) > (st->table->localupdate - st->last_pushed)) {
		st->last_pushed = st->table->localupdate;
		return NULL;
	}

	ret = eb32_entry(eb, struct stksess, upd);
	/* mark the entry as seen by at least one peer before shipping it */
	if (!_HA_ATOMIC_LOAD(&ret->seen))
		_HA_ATOMIC_STORE(&ret->seen, 1);
	return ret;
}
1557 | | |
1558 | | /* |
1559 | | * Function used to lookup for recent stick-table updates associated with |
1560 | | * <st> shared stick-table during teach state 1 step. |
1561 | | */ |
static inline struct stksess *peer_teach_stage1_stksess_lookup(struct shared_table *st)
{
	struct eb32_node *eb;
	struct stksess *ret;

	/* look up the first update strictly newer than the last one pushed */
	eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1);
	if (!eb) {
		/* End of the tree reached: stage 1 is complete. Rewind
		 * last_pushed just before the first key so that the next
		 * stage can walk the beginning of the tree.
		 */
		st->flags |= SHTABLE_F_TEACH_STAGE1;
		eb = eb32_first(&st->table->updates);
		if (eb)
			st->last_pushed = eb->key - 1;
		return NULL;
	}

	ret = eb32_entry(eb, struct stksess, upd);
	/* mark the entry as seen by at least one peer before shipping it */
	if (!_HA_ATOMIC_LOAD(&ret->seen))
		_HA_ATOMIC_STORE(&ret->seen, 1);
	return ret;
}
1581 | | |
1582 | | /* |
1583 | | * Function used to lookup for recent stick-table updates associated with |
1584 | | * <st> shared stick-table during teach state 2 step. |
1585 | | */ |
1586 | | static inline struct stksess *peer_teach_stage2_stksess_lookup(struct shared_table *st) |
1587 | 0 | { |
1588 | 0 | struct eb32_node *eb; |
1589 | 0 | struct stksess *ret; |
1590 | |
|
1591 | 0 | eb = eb32_lookup_ge(&st->table->updates, st->last_pushed+1); |
1592 | 0 | if (!eb || eb->key > st->teaching_origin) { |
1593 | 0 | st->flags |= SHTABLE_F_TEACH_STAGE2; |
1594 | 0 | return NULL; |
1595 | 0 | } |
1596 | | |
1597 | 0 | ret = eb32_entry(eb, struct stksess, upd); |
1598 | 0 | if (!_HA_ATOMIC_LOAD(&ret->seen)) |
1599 | 0 | _HA_ATOMIC_STORE(&ret->seen, 1); |
1600 | 0 | return ret; |
1601 | 0 | } |
1602 | | |
1603 | | /* |
1604 | | * Generic function to emit update messages for <st> stick-table when a lesson must |
1605 | | * be taught to the peer <p>. |
1606 | | * |
 * This function temporarily unlocks/locks <st> when it sends stick-table updates
 * or when decrementing their refcount in case of any error while sending these updates.
1609 | | * It must be called with the stick-table lock released. |
1610 | | * |
1611 | | * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1612 | | * Returns -1 if there was not enough room left to send the message, |
1613 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1614 | | * returned value equal to PEER_SESS_ST_END. |
1615 | | * If it returns 0 or -1, this function leave <st> locked if already locked when entering this function |
1616 | | * unlocked if not already locked when entering this function. |
1617 | | */ |
int peer_send_teachmsgs(struct appctx *appctx, struct peer *p,
                        struct stksess *(*peer_stksess_lookup)(struct shared_table *),
                        struct shared_table *st)
{
	int ret, new_pushed, use_timed;
	int updates_sent = 0;  /* number of update messages emitted in this call */
	int failed_once = 0;   /* set after the first lock contention in the loop */

	TRACE_ENTER(PEERS_EV_SESS_IO, appctx, p, st);

	ret = 1;
	use_timed = 0;
	if (st != p->last_local_table) {
		/* the table being taught changed: a switch message must precede
		 * the updates so the peer knows which table they apply to.
		 */
		ret = peer_send_switchmsg(st, appctx);
		if (ret <= 0)
			goto out_unlocked;

		p->last_local_table = st;
		TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_SWITCH, appctx, NULL, st, NULL,
		             "table switch message sent (table=%s)", st->table->id);
	}

	/* timed update messages are only used for teach lessons (not for the
	 * regular process lookup), and only if the peer was not downgraded.
	 */
	if (peer_stksess_lookup != peer_teach_process_stksess_lookup)
		use_timed = !(p->flags & PEER_F_DWNGRD);

	/* We force new pushed to 1 to force identifier in update message */
	new_pushed = 1;

	if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
		/* just don't engage here if there is any contention */
		applet_have_more_data(appctx);
		ret = -1;
		goto out_unlocked;
	}

	while (1) {
		struct stksess *ts;
		unsigned updateid;

		/* push local updates */
		ts = peer_stksess_lookup(st);
		if (!ts) {
			ret = 1; // done
			break;
		}

		updateid = ts->upd.key;
		if (p->srv->shard && ts->shard != p->srv->shard) {
			/* Skip this entry */
			st->last_pushed = updateid;
			new_pushed = 1;
			continue;
		}

		/* hold a reference on <ts> so it cannot be purged while we drop
		 * the update lock to build and send the message.
		 */
		HA_ATOMIC_INC(&ts->ref_cnt);
		HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);

		ret = peer_send_updatemsg(st, appctx, ts, updateid, new_pushed, use_timed);

		if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock) != 0) {
			if (failed_once) {
				/* we've already faced contention twice in this
				 * loop, this is getting serious, do not insist
				 * anymore and come back later
				 */
				HA_ATOMIC_DEC(&ts->ref_cnt);
				applet_have_more_data(appctx);
				ret = -1;
				goto out_unlocked;
			}
			/* OK contention happens, for this one we'll wait on the
			 * lock, but only once.
			 */
			failed_once++;
			HA_RWLOCK_RDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
		}

		/* message emitted (or failed): the reference is no longer needed */
		HA_ATOMIC_DEC(&ts->ref_cnt);
		if (ret <= 0)
			break;

		st->last_pushed = updateid;
		TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_UPDATE, appctx, NULL, st, NULL,
		             "update message sent (table=%s, updateid=%u)", st->table->id, st->last_pushed);

		/* identifier may not be needed in next update message */
		new_pushed = 0;

		updates_sent++;
		if (updates_sent >= peers_max_updates_at_once) {
			/* batch limit reached: yield and come back later */
			applet_have_more_data(appctx);
			ret = -1;
			break;
		}
	}

 out:
	HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
 out_unlocked:
	TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, p, st);
	return ret;
}
1720 | | |
1721 | | /* |
1722 | | * Function to emit update messages for <st> stick-table when a lesson must |
1723 | | * be taught to the peer <p> (learn state is not PEER_LR_ST_NOTASSIGNED). |
1724 | | * |
1725 | | * Note that <st> shared stick-table is locked when calling this function, and |
1726 | | * the lock is dropped then re-acquired. |
1727 | | * |
1728 | | * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1729 | | * Returns -1 if there was not enough room left to send the message, |
1730 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1731 | | * returned value equal to PEER_SESS_ST_END. |
1732 | | */ |
1733 | | static inline int peer_send_teach_process_msgs(struct appctx *appctx, struct peer *p, |
1734 | | struct shared_table *st) |
1735 | 0 | { |
1736 | 0 | TRACE_PROTO("send teach process messages", PEERS_EV_SESS_IO, appctx, p, st); |
1737 | 0 | return peer_send_teachmsgs(appctx, p, peer_teach_process_stksess_lookup, st); |
1738 | 0 | } |
1739 | | |
1740 | | /* |
1741 | | * Function to emit update messages for <st> stick-table when a lesson must |
1742 | | * be taught to the peer <p> during teach state 1 step. It must be called with |
1743 | | * the stick-table lock released. |
1744 | | * |
1745 | | * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1746 | | * Returns -1 if there was not enough room left to send the message, |
1747 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1748 | | * returned value equal to PEER_SESS_ST_END. |
1749 | | */ |
1750 | | static inline int peer_send_teach_stage1_msgs(struct appctx *appctx, struct peer *p, |
1751 | | struct shared_table *st) |
1752 | 0 | { |
1753 | 0 | TRACE_PROTO("send teach stage1 messages", PEERS_EV_SESS_IO, appctx, p, st); |
1754 | 0 | return peer_send_teachmsgs(appctx, p, peer_teach_stage1_stksess_lookup, st); |
1755 | 0 | } |
1756 | | |
1757 | | /* |
1758 | | * Function to emit update messages for <st> stick-table when a lesson must |
 * be taught to the peer <p> during teach state 2 step. It must be called with
1760 | | * the stick-table lock released. |
1761 | | * |
1762 | | * Return 0 if any message could not be built modifying the appcxt st0 to PEER_SESS_ST_END value. |
1763 | | * Returns -1 if there was not enough room left to send the message, |
1764 | | * any other negative returned value must be considered as an error with an appcxt st0 |
1765 | | * returned value equal to PEER_SESS_ST_END. |
1766 | | */ |
1767 | | static inline int peer_send_teach_stage2_msgs(struct appctx *appctx, struct peer *p, |
1768 | | struct shared_table *st) |
1769 | 0 | { |
1770 | 0 | TRACE_PROTO("send teach stage2 messages", PEERS_EV_SESS_IO, appctx, p, st); |
1771 | 0 | return peer_send_teachmsgs(appctx, p, peer_teach_stage2_stksess_lookup, st); |
1772 | 0 | } |
1773 | | |
1774 | | |
1775 | | /* |
1776 | | * Function used to parse a stick-table update message after it has been received |
1777 | | * by <p> peer with <msg_cur> as address of the pointer to the position in the |
1778 | | * receipt buffer with <msg_end> being position of the end of the stick-table message. |
 * Update <msg_cur> according to the peer protocol specs if no peer protocol error
1780 | | * was encountered. |
1781 | | * <exp> must be set if the stick-table entry expires. |
1782 | | * <updt> must be set for PEER_MSG_STKT_UPDATE or PEER_MSG_STKT_UPDATE_TIMED stick-table |
1783 | | * messages, in this case the stick-table update message is received with a stick-table |
1784 | | * update ID. |
1785 | | * <totl> is the length of the stick-table update message computed upon receipt. |
1786 | | */ |
1787 | | int peer_treat_updatemsg(struct appctx *appctx, struct peer *p, int updt, int exp, |
1788 | | char **msg_cur, char *msg_end, int msg_len, int totl) |
1789 | 0 | { |
1790 | 0 | struct shared_table *st = p->remote_table; |
1791 | 0 | struct stktable *table; |
1792 | 0 | struct stksess *ts, *newts; |
1793 | 0 | struct stksess *wts = NULL; /* write_to stksess */ |
1794 | 0 | uint32_t update; |
1795 | 0 | int expire; |
1796 | 0 | unsigned int data_type; |
1797 | 0 | size_t keylen; |
1798 | 0 | void *data_ptr; |
1799 | 0 | char *msg_save; |
1800 | |
|
1801 | 0 | TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st); |
1802 | | |
1803 | | /* Here we have data message */ |
1804 | 0 | if (!st) { |
1805 | 0 | TRACE_PROTO("ignore update message: no remote table", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p); |
1806 | 0 | goto ignore_msg; |
1807 | 0 | } |
1808 | | |
1809 | 0 | table = st->table; |
1810 | |
|
1811 | 0 | expire = MS_TO_TICKS(table->expire); |
1812 | |
|
1813 | 0 | if (updt) { |
1814 | 0 | if (msg_len < sizeof(update)) { |
1815 | 0 | TRACE_ERROR("malformed update message: message too small", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1816 | 0 | goto malformed_exit; |
1817 | 0 | } |
1818 | | |
1819 | 0 | memcpy(&update, *msg_cur, sizeof(update)); |
1820 | 0 | *msg_cur += sizeof(update); |
1821 | 0 | } |
1822 | 0 | else |
1823 | 0 | update = st->last_get + 1; |
1824 | | |
1825 | 0 | if (p->learnstate != PEER_LR_ST_PROCESSING) |
1826 | 0 | st->last_get = htonl(update); |
1827 | |
|
1828 | 0 | if (exp) { |
1829 | 0 | size_t expire_sz = sizeof expire; |
1830 | |
|
1831 | 0 | if (*msg_cur + expire_sz > msg_end) { |
1832 | 0 | TRACE_ERROR("malformed update message: wrong expiration size", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1833 | 0 | goto malformed_exit; |
1834 | 0 | } |
1835 | | |
1836 | 0 | memcpy(&expire, *msg_cur, expire_sz); |
1837 | 0 | *msg_cur += expire_sz; |
1838 | 0 | expire = ntohl(expire); |
1839 | | /* Protocol contains expire in MS, check if value is less than table config */ |
1840 | 0 | if (expire > table->expire) |
1841 | 0 | expire = table->expire; |
1842 | | /* the rest of the code considers expire as ticks and not MS */ |
1843 | 0 | expire = MS_TO_TICKS(expire); |
1844 | 0 | } |
1845 | | |
1846 | 0 | newts = stksess_new(table, NULL); |
1847 | 0 | if (!newts) { |
1848 | 0 | TRACE_PROTO("ignore update message: failed to get a new sticky session", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st); |
1849 | 0 | goto ignore_msg; |
1850 | 0 | } |
1851 | | |
1852 | 0 | if (table->type == SMP_T_STR) { |
1853 | 0 | unsigned int to_read, to_store; |
1854 | |
|
1855 | 0 | to_read = intdecode(msg_cur, msg_end); |
1856 | 0 | if (!*msg_cur) { |
1857 | 0 | TRACE_ERROR("malformed update message: invalid string length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1858 | 0 | goto malformed_free_newts; |
1859 | 0 | } |
1860 | | |
1861 | 0 | to_store = MIN(to_read, table->key_size - 1); |
1862 | 0 | if (*msg_cur + to_store > msg_end) { |
1863 | 0 | TRACE_ERROR("malformed update message: invalid string (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1864 | 0 | goto malformed_free_newts; |
1865 | 0 | } |
1866 | | |
1867 | 0 | keylen = to_store; |
1868 | 0 | memcpy(newts->key.key, *msg_cur, keylen); |
1869 | 0 | newts->key.key[keylen] = 0; |
1870 | 0 | *msg_cur += to_read; |
1871 | 0 | } |
1872 | 0 | else if (table->type == SMP_T_SINT) { |
1873 | 0 | unsigned int netinteger; |
1874 | |
|
1875 | 0 | if (*msg_cur + sizeof(netinteger) > msg_end) { |
1876 | 0 | TRACE_ERROR("malformed update message: invalid integer (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1877 | 0 | goto malformed_free_newts; |
1878 | 0 | } |
1879 | | |
1880 | 0 | keylen = sizeof(netinteger); |
1881 | 0 | memcpy(&netinteger, *msg_cur, keylen); |
1882 | 0 | netinteger = ntohl(netinteger); |
1883 | 0 | memcpy(newts->key.key, &netinteger, keylen); |
1884 | 0 | *msg_cur += keylen; |
1885 | 0 | } |
1886 | 0 | else { |
1887 | 0 | if (*msg_cur + table->key_size > msg_end) { |
1888 | 0 | TRACE_ERROR("malformed update message: invalid key (too big)", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1889 | 0 | goto malformed_free_newts; |
1890 | 0 | } |
1891 | | |
1892 | 0 | keylen = table->key_size; |
1893 | 0 | memcpy(newts->key.key, *msg_cur, keylen); |
1894 | 0 | *msg_cur += keylen; |
1895 | 0 | } |
1896 | | |
1897 | 0 | newts->shard = stktable_get_key_shard(table, newts->key.key, keylen); |
1898 | | |
1899 | | /* lookup for existing entry */ |
1900 | 0 | ts = stktable_set_entry(table, newts); |
1901 | 0 | if (ts != newts) { |
1902 | 0 | stksess_free(table, newts); |
1903 | 0 | newts = NULL; |
1904 | 0 | } |
1905 | |
|
1906 | 0 | msg_save = *msg_cur; |
1907 | |
|
1908 | 0 | update_wts: |
1909 | |
|
1910 | 0 | HA_RWLOCK_WRLOCK(STK_SESS_LOCK, &ts->lock); |
1911 | |
|
1912 | 0 | for (data_type = 0 ; data_type < STKTABLE_DATA_TYPES ; data_type++) { |
1913 | 0 | uint64_t decoded_int; |
1914 | 0 | unsigned int idx; |
1915 | 0 | int ignore = 0; |
1916 | |
|
1917 | 0 | if (!((1ULL << data_type) & st->remote_data)) |
1918 | 0 | continue; |
1919 | | |
1920 | | /* We shouldn't learn local-only values unless the table is |
1921 | | * considered as "recv-only". Also, when handling the write_to |
1922 | | * table we must ignore types that can be processed so we don't |
1923 | | * interfere with any potential arithmetic logic performed on |
1924 | | * them (ie: cumulative counters). |
1925 | | */ |
1926 | 0 | if ((stktable_data_types[data_type].is_local && |
1927 | 0 | !(table->flags & STK_FL_RECV_ONLY)) || |
1928 | 0 | (table != st->table && !stktable_data_types[data_type].as_is)) |
1929 | 0 | ignore = 1; |
1930 | |
|
1931 | 0 | if (stktable_data_types[data_type].is_array) { |
1932 | | /* in case of array all elements |
1933 | | * use the same std_type and they |
1934 | | * are linearly encoded. |
1935 | | * The number of elements was provided |
1936 | | * by table definition message |
1937 | | */ |
1938 | 0 | switch (stktable_data_types[data_type].std_type) { |
1939 | 0 | case STD_T_SINT: |
1940 | 0 | for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) { |
1941 | 0 | decoded_int = intdecode(msg_cur, msg_end); |
1942 | 0 | if (!*msg_cur) { |
1943 | 0 | TRACE_ERROR("malformed update message: invalid integer data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1944 | 0 | goto malformed_unlock; |
1945 | 0 | } |
1946 | | |
1947 | 0 | data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx); |
1948 | 0 | if (data_ptr && !ignore) |
1949 | 0 | stktable_data_cast(data_ptr, std_t_sint) = decoded_int; |
1950 | 0 | } |
1951 | 0 | break; |
1952 | 0 | case STD_T_UINT: |
1953 | 0 | for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) { |
1954 | 0 | decoded_int = intdecode(msg_cur, msg_end); |
1955 | 0 | if (!*msg_cur) { |
1956 | 0 | TRACE_ERROR("malformed update message: invalid unsigned integer data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1957 | 0 | goto malformed_unlock; |
1958 | 0 | } |
1959 | | |
1960 | 0 | data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx); |
1961 | 0 | if (data_ptr && !ignore) |
1962 | 0 | stktable_data_cast(data_ptr, std_t_uint) = decoded_int; |
1963 | 0 | } |
1964 | 0 | break; |
1965 | 0 | case STD_T_ULL: |
1966 | 0 | for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) { |
1967 | 0 | decoded_int = intdecode(msg_cur, msg_end); |
1968 | 0 | if (!*msg_cur) { |
1969 | 0 | TRACE_ERROR("malformed update message: invalid unsigned long data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1970 | 0 | goto malformed_unlock; |
1971 | 0 | } |
1972 | | |
1973 | 0 | data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx); |
1974 | 0 | if (data_ptr && !ignore) |
1975 | 0 | stktable_data_cast(data_ptr, std_t_ull) = decoded_int; |
1976 | 0 | } |
1977 | 0 | break; |
1978 | 0 | case STD_T_FRQP: |
1979 | 0 | for (idx = 0; idx < st->remote_data_nbelem[data_type]; idx++) { |
1980 | 0 | struct freq_ctr data; |
1981 | | |
1982 | | /* First bit is reserved for the freq_ctr lock |
1983 | | * Note: here we're still protected by the stksess lock |
1984 | | * so we don't need to update the update the freq_ctr |
1985 | | * using its internal lock. |
1986 | | */ |
1987 | |
|
1988 | 0 | decoded_int = intdecode(msg_cur, msg_end); |
1989 | 0 | if (!*msg_cur) { |
1990 | 0 | TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1991 | | /* TRACE_PROTO("malformed message", PEERS_EV_UPDTMSG, NULL, p); */ |
1992 | 0 | goto malformed_unlock; |
1993 | 0 | } |
1994 | | |
1995 | 0 | data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1; |
1996 | 0 | data.curr_ctr = intdecode(msg_cur, msg_end); |
1997 | 0 | if (!*msg_cur) { |
1998 | 0 | TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
1999 | 0 | goto malformed_unlock; |
2000 | 0 | } |
2001 | | |
2002 | 0 | data.prev_ctr = intdecode(msg_cur, msg_end); |
2003 | 0 | if (!*msg_cur) { |
2004 | 0 | TRACE_ERROR("malformed update message: invalid freq_ctr data", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2005 | 0 | goto malformed_unlock; |
2006 | 0 | } |
2007 | | |
2008 | 0 | data_ptr = stktable_data_ptr_idx(table, ts, data_type, idx); |
2009 | 0 | if (data_ptr && !ignore) |
2010 | 0 | stktable_data_cast(data_ptr, std_t_frqp) = data; |
2011 | 0 | } |
2012 | 0 | break; |
2013 | 0 | } |
2014 | | |
2015 | | /* array is fully decoded |
2016 | | * proceed next data_type. |
2017 | | */ |
2018 | 0 | continue; |
2019 | 0 | } |
2020 | 0 | decoded_int = intdecode(msg_cur, msg_end); |
2021 | 0 | if (!*msg_cur) { |
2022 | 0 | TRACE_ERROR("malformed update message: invalid data value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2023 | 0 | goto malformed_unlock; |
2024 | 0 | } |
2025 | | |
2026 | 0 | switch (stktable_data_types[data_type].std_type) { |
2027 | 0 | case STD_T_SINT: |
2028 | 0 | data_ptr = stktable_data_ptr(table, ts, data_type); |
2029 | 0 | if (data_ptr && !ignore) |
2030 | 0 | stktable_data_cast(data_ptr, std_t_sint) = decoded_int; |
2031 | 0 | break; |
2032 | | |
2033 | 0 | case STD_T_UINT: |
2034 | 0 | data_ptr = stktable_data_ptr(table, ts, data_type); |
2035 | 0 | if (data_ptr && !ignore) |
2036 | 0 | stktable_data_cast(data_ptr, std_t_uint) = decoded_int; |
2037 | 0 | break; |
2038 | | |
2039 | 0 | case STD_T_ULL: |
2040 | 0 | data_ptr = stktable_data_ptr(table, ts, data_type); |
2041 | 0 | if (data_ptr && !ignore) |
2042 | 0 | stktable_data_cast(data_ptr, std_t_ull) = decoded_int; |
2043 | 0 | break; |
2044 | | |
2045 | 0 | case STD_T_FRQP: { |
2046 | 0 | struct freq_ctr data; |
2047 | | |
2048 | | /* First bit is reserved for the freq_ctr lock |
2049 | | Note: here we're still protected by the stksess lock |
2050 | | so we don't need to update the update the freq_ctr |
2051 | | using its internal lock. |
2052 | | */ |
2053 | |
|
2054 | 0 | data.curr_tick = tick_add(now_ms, -decoded_int) & ~0x1; |
2055 | 0 | data.curr_ctr = intdecode(msg_cur, msg_end); |
2056 | 0 | if (!*msg_cur) { |
2057 | 0 | TRACE_ERROR("malformed update message: invalid freq_ctr value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2058 | 0 | goto malformed_unlock; |
2059 | 0 | } |
2060 | | |
2061 | 0 | data.prev_ctr = intdecode(msg_cur, msg_end); |
2062 | 0 | if (!*msg_cur) { |
2063 | 0 | TRACE_ERROR("malformed update message: invalid freq_ctr value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2064 | 0 | goto malformed_unlock; |
2065 | 0 | } |
2066 | | |
2067 | 0 | data_ptr = stktable_data_ptr(table, ts, data_type); |
2068 | 0 | if (data_ptr && !ignore) |
2069 | 0 | stktable_data_cast(data_ptr, std_t_frqp) = data; |
2070 | 0 | break; |
2071 | 0 | } |
2072 | 0 | case STD_T_DICT: { |
2073 | 0 | struct buffer *chunk; |
2074 | 0 | size_t data_len, value_len; |
2075 | 0 | unsigned int id; |
2076 | 0 | struct dict_entry *de; |
2077 | 0 | struct dcache *dc; |
2078 | 0 | char *end; |
2079 | |
|
2080 | 0 | if (!decoded_int) { |
2081 | | /* No entry. */ |
2082 | 0 | break; |
2083 | 0 | } |
2084 | 0 | data_len = decoded_int; |
2085 | 0 | if (*msg_cur + data_len > msg_end) { |
2086 | 0 | TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2087 | 0 | goto malformed_unlock; |
2088 | 0 | } |
2089 | | |
2090 | | /* Compute the end of the current data, <msg_end> being at the end of |
2091 | | * the entire message. |
2092 | | */ |
2093 | 0 | end = *msg_cur + data_len; |
2094 | 0 | id = intdecode(msg_cur, end); |
2095 | 0 | if (!*msg_cur || !id) { |
2096 | 0 | TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2097 | 0 | goto malformed_unlock; |
2098 | 0 | } |
2099 | | |
2100 | 0 | dc = p->dcache; |
2101 | 0 | if (*msg_cur == end) { |
2102 | | /* Dictionary entry key without value. */ |
2103 | 0 | if (id > dc->max_entries) { |
2104 | 0 | TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_PROTO_ERR, appctx, p, st); |
2105 | 0 | goto malformed_unlock; |
2106 | 0 | } |
2107 | | /* IDs sent over the network are numbered from 1. */ |
2108 | 0 | de = dc->rx[id - 1].de; |
2109 | 0 | } |
2110 | 0 | else { |
2111 | 0 | chunk = get_trash_chunk(); |
2112 | 0 | value_len = intdecode(msg_cur, end); |
2113 | 0 | if (!*msg_cur || *msg_cur + value_len > end || |
2114 | 0 | unlikely(value_len + 1 >= chunk->size)) { |
2115 | 0 | TRACE_ERROR("malformed update message: invalid dict value", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2116 | 0 | goto malformed_unlock; |
2117 | 0 | } |
2118 | | |
2119 | 0 | chunk_memcpy(chunk, *msg_cur, value_len); |
2120 | 0 | chunk->area[chunk->data] = '\0'; |
2121 | 0 | *msg_cur += value_len; |
2122 | |
|
2123 | 0 | de = dict_insert(&server_key_dict, chunk->area); |
2124 | 0 | dict_entry_unref(&server_key_dict, dc->rx[id - 1].de); |
2125 | 0 | dc->rx[id - 1].de = de; |
2126 | 0 | } |
2127 | 0 | if (de) { |
2128 | 0 | data_ptr = stktable_data_ptr(table, ts, data_type); |
2129 | 0 | if (data_ptr && !ignore) { |
2130 | 0 | HA_ATOMIC_INC(&de->refcount); |
2131 | 0 | stktable_data_cast(data_ptr, std_t_dict) = de; |
2132 | 0 | } |
2133 | 0 | } |
2134 | 0 | break; |
2135 | 0 | } |
2136 | 0 | } |
2137 | 0 | } |
2138 | | |
2139 | 0 | if (st->table->write_to.t && table != st->table->write_to.t) { |
2140 | 0 | struct stktable_key stkey = { .key = ts->key.key, .key_len = keylen }; |
2141 | | |
2142 | | /* While we're still under the main ts lock, try to get related |
2143 | | * write_to stksess with main ts key |
2144 | | */ |
2145 | 0 | wts = stktable_get_entry(st->table->write_to.t, &stkey); |
2146 | 0 | } |
2147 | | |
2148 | | /* Force new expiration */ |
2149 | 0 | ts->expire = tick_add(now_ms, expire); |
2150 | |
|
2151 | 0 | HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock); |
2152 | | |
2153 | | /* we MUST NOT dec the refcnt yet because stktable_trash_oldest() or |
2154 | | * process_table_expire() could execute between the two next lines. |
2155 | | */ |
2156 | 0 | stktable_touch_remote(table, ts, 0); |
2157 | | |
2158 | | /* Entry was just learned from a peer, we want to notify this peer |
2159 | | * if we happen to modify it. Thus let's consider at least one |
2160 | | * peer has seen the update (ie: the peer that sent us the update) |
2161 | | */ |
2162 | 0 | HA_ATOMIC_STORE(&ts->seen, 1); |
2163 | | |
2164 | | /* only now we can decrement the refcnt */ |
2165 | 0 | HA_ATOMIC_DEC(&ts->ref_cnt); |
2166 | |
|
2167 | 0 | if (wts) { |
2168 | | /* Start over the message decoding for wts as we got a valid stksess |
2169 | | * for write_to table, so we need to refresh the entry with supported |
2170 | | * values. |
2171 | | * |
2172 | | * We prefer to do the decoding a second time even though it might |
2173 | | * cost a bit more than copying from main ts to wts, but doing so |
2174 | | * enables us to get rid of main ts lock: we only need the wts lock |
2175 | | * since upstream data is still available in msg_cur |
2176 | | */ |
2177 | 0 | ts = wts; |
2178 | 0 | table = st->table->write_to.t; |
2179 | 0 | wts = NULL; /* so we don't get back here */ |
2180 | 0 | *msg_cur = msg_save; |
2181 | 0 | goto update_wts; |
2182 | 0 | } |
2183 | | |
2184 | 0 | TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_UPDATE, appctx, p, NULL, NULL, |
2185 | 0 | "Update message successfully processed (table=%s, updateid=%u)", st->table->id, st->last_get); |
2186 | |
|
2187 | 0 | ignore_msg: |
2188 | 0 | TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, p, st); |
2189 | 0 | return 1; |
2190 | | |
2191 | 0 | malformed_unlock: |
2192 | | /* malformed message */ |
2193 | 0 | HA_RWLOCK_WRUNLOCK(STK_SESS_LOCK, &ts->lock); |
2194 | 0 | stktable_touch_remote(st->table, ts, 1); |
2195 | 0 | goto malformed_exit; |
2196 | | |
2197 | 0 | malformed_free_newts: |
2198 | | /* malformed message */ |
2199 | 0 | stksess_free(st->table, newts); |
2200 | 0 | malformed_exit: |
2201 | 0 | appctx->st0 = PEER_SESS_ST_ERRPROTO; |
2202 | 0 | TRACE_DEVEL("leaving in error", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p, st); |
2203 | 0 | return 0; |
2204 | 0 | } |
2205 | | |
2206 | | /* |
2207 | | * Function used to parse a stick-table update acknowledgement message after it |
2208 | | * has been received by <p> peer with <msg_cur> as address of the pointer to the position in the |
2209 | | * receipt buffer with <msg_end> being the position of the end of the stick-table message. |
2210 | | * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error |
2211 | | * was encountered. |
2212 | | * Return 1 if succeeded, 0 if not with the appctx state st0 set to PEER_SESS_ST_ERRPROTO. |
2213 | | */ |
2214 | | static inline int peer_treat_ackmsg(struct appctx *appctx, struct peer *p, |
2215 | | char **msg_cur, char *msg_end) |
2216 | 0 | { |
2217 | | /* ack message */ |
2218 | 0 | uint32_t table_id ; |
2219 | 0 | uint32_t update; |
2220 | 0 | struct shared_table *st = NULL; |
2221 | 0 | int ret = 1; |
2222 | |
|
2223 | 0 | TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p); |
2224 | | /* ignore ack during teaching process */ |
2225 | 0 | if (p->flags & PEER_F_TEACH_PROCESS) { |
2226 | 0 | TRACE_DEVEL("Ignore ack during teaching process", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p); |
2227 | 0 | goto end; |
2228 | 0 | } |
2229 | | |
2230 | 0 | table_id = intdecode(msg_cur, msg_end); |
2231 | 0 | if (!*msg_cur || (*msg_cur + sizeof(update) > msg_end)) { |
2232 | 0 | TRACE_ERROR("malformed ackk message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p); |
2233 | 0 | appctx->st0 = PEER_SESS_ST_ERRPROTO; |
2234 | 0 | ret = 0; |
2235 | 0 | goto end; |
2236 | 0 | } |
2237 | | |
2238 | 0 | memcpy(&update, *msg_cur, sizeof(update)); |
2239 | 0 | update = ntohl(update); |
2240 | 0 | for (st = p->tables; st; st = st->next) { |
2241 | 0 | if (st->local_id == table_id) { |
2242 | 0 | st->update = update; |
2243 | 0 | TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_ACK, appctx, p, NULL, NULL, |
2244 | 0 | "Ack message successfully process (table=%s, updateid=%u)", st->table->id, st->update); |
2245 | 0 | break; |
2246 | 0 | } |
2247 | 0 | } |
2248 | |
|
2249 | 0 | end: |
2250 | 0 | TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, p, st); |
2251 | 0 | return ret; |
2252 | 0 | } |
2253 | | |
2254 | | /* |
2255 | | * Function used to parse a stick-table switch message after it has been received |
2256 | | * by <p> peer with <msg_cur> as address of the pointer to the position in the |
2257 | | * receipt buffer with <msg_end> being the position of the end of the stick-table message. |
2258 | | * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error |
2259 | | * was encountered. |
2260 | | * Return 1 if succeeded, 0 if not with the appctx state st0 set to PEER_SESS_ST_ERRPROTO. |
2261 | | */ |
2262 | | static inline int peer_treat_switchmsg(struct appctx *appctx, struct peer *p, |
2263 | | char **msg_cur, char *msg_end) |
2264 | 0 | { |
2265 | 0 | struct shared_table *st; |
2266 | 0 | int table_id; |
2267 | |
|
2268 | 0 | TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_SWITCH, appctx, p); |
2269 | 0 | table_id = intdecode(msg_cur, msg_end); |
2270 | 0 | if (!*msg_cur) { |
2271 | 0 | TRACE_ERROR("malformed table switch message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p); |
2272 | 0 | appctx->st0 = PEER_SESS_ST_ERRPROTO; |
2273 | 0 | return 0; |
2274 | 0 | } |
2275 | | |
2276 | 0 | p->remote_table = NULL; |
2277 | 0 | for (st = p->tables; st; st = st->next) { |
2278 | 0 | if (st->remote_id == table_id) { |
2279 | 0 | p->remote_table = st; |
2280 | 0 | TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_SWITCH, appctx, p, NULL, NULL, |
2281 | 0 | "table switch message successfully process (table=%s)", st->table->id); |
2282 | 0 | break; |
2283 | 0 | } |
2284 | 0 | } |
2285 | |
|
2286 | 0 | TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_SWITCH, appctx, p, st); |
2287 | 0 | return 1; |
2288 | 0 | } |
2289 | | |
2290 | | /* |
2291 | | * Function used to parse a stick-table definition message after it has been received |
2292 | | * by <p> peer with <msg_cur> as address of the pointer to the position in the |
2293 | | * receipt buffer with <msg_end> being the position of the end of the stick-table message. |
2294 | | * Update <msg_curr> accordingly to the peer protocol specs if no peer protocol error |
2295 | | * was encountered. |
2296 | | * <totl> is the length of the stick-table update message computed upon receipt. |
2297 | | * Return 1 if succeeded, 0 if not with the appctx state st0 set to PEER_SESS_ST_ERRPROTO. |
2298 | | */ |
static inline int peer_treat_definemsg(struct appctx *appctx, struct peer *p,
                                       char **msg_cur, char *msg_end, int totl)
{
	int table_id_len;
	struct shared_table *st;
	int table_type;
	int table_keylen;
	int table_id;
	uint64_t table_data;

	TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
	/* <table_id> is the sender's (remote) numeric id for this table */
	table_id = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no table id", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	table_id_len = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no table name length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	p->remote_table = NULL;
	/* the table name must be non-empty and fully contained in the message */
	if (!table_id_len || (*msg_cur + table_id_len) >= msg_end) {
		TRACE_ERROR("malformed table definition message: no table name", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	for (st = p->tables; st; st = st->next) {
		/* Reset IDs: any table previously bound to this remote id loses
		 * the binding; it will be re-established below on a name match.
		 */
		if (st->remote_id == table_id)
			st->remote_id = 0;

		/* pick the first local table whose network id matches the
		 * announced table name (length + bytes)
		 */
		if (!p->remote_table && (table_id_len == strlen(st->table->nid)) &&
		    (memcmp(st->table->nid, *msg_cur, table_id_len) == 0))
			p->remote_table = st;
	}

	if (!p->remote_table) {
		TRACE_PROTO("ignore table definition message: table not found", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
		goto ignore_msg;
	}

	*msg_cur += table_id_len;
	if (*msg_cur >= msg_end) {
		TRACE_ERROR("malformed table definition message: truncated message", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	table_type = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no table type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	table_keylen = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no key length", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* <table_data> is a bitfield describing the data types carried by
	 * updates for this table
	 */
	table_data = intdecode(msg_cur, msg_end);
	if (!*msg_cur) {
		TRACE_ERROR("malformed table definition message: no data type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
		goto malformed_exit;
	}

	/* both key type and key length must match the local table definition,
	 * otherwise updates could not be applied
	 */
	if (p->remote_table->table->type != peer_int_key_type[table_type]
	    || p->remote_table->table->key_size != table_keylen) {
		p->remote_table = NULL;
		TRACE_PROTO("ignore table definition message: no key/type match", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
		goto ignore_msg;
	}

	/* Check if there is the additional expire data */
	intdecode(msg_cur, msg_end);
	if (*msg_cur) {
		uint64_t data_type;
		uint64_t type;

		/* This define contains the expire data so we consider
		 * it also contains all data_types parameters.
		 */
		for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
			if (table_data & (1ULL << data_type)) {
				if (stktable_data_types[data_type].is_array) {
					/* This should be an array
					 * so we parse the data_type prefix
					 * because we must have parameters.
					 */
					type = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: missing meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* check if the data_type match the current from the bitfield */
					if (type != data_type) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: meta data mismatch type", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* decode the nbelem of the array */
					p->remote_table->remote_data_nbelem[type] = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: missing array size meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* if it is an array of frqp, we must also have the period to decode */
					if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
						intdecode(msg_cur, msg_end);
						if (!*msg_cur) {
							p->remote_table = NULL;
							TRACE_PROTO("ignore table definition message: missing period for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
							goto ignore_msg;
						}
					}
				}
				else if (stktable_data_types[data_type].std_type == STD_T_FRQP) {
					/* This should be a std freq counter data_type
					 * so we parse the data_type prefix
					 * because we must have parameters.
					 */
					type = intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: missing data for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* check if the data_type match the current from the bitfield */
					if (type != data_type) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: meta data mismatch", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}

					/* decode the period */
					intdecode(msg_cur, msg_end);
					if (!*msg_cur) {
						p->remote_table = NULL;
						TRACE_PROTO("ignore table definition message: mismatch period for frqp", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
						goto ignore_msg;
					}
				}
			}
		}
	}
	else {
		uint64_t data_type;

		/* There is no additional data but the
		 * array size parameter is mandatory to parse an array,
		 * so we consider it an error if an array data_type is defined
		 * but there is no additional data.
		 */
		for (data_type = 0; data_type < STKTABLE_DATA_TYPES; data_type++) {
			if (table_data & (1ULL << data_type)) {
				if (stktable_data_types[data_type].is_array) {
					p->remote_table = NULL;
					TRACE_PROTO("ignore table definition message: missing array size meta data for array", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
					goto ignore_msg;
				}
			}
		}
	}

	/* Everything matched: bind the remote id and data bitfield to the table */
	p->remote_table->remote_data = table_data;
	p->remote_table->remote_id = table_id;

	TRACE_PRINTF(TRACE_LEVEL_DEVELOPER, PEERS_EV_PROTO_DEF, appctx, p, NULL, NULL,
	             "table definition message successfully process (table=%s)", p->remote_table->table->id);

 ignore_msg:
	/* ignored messages are not protocol errors: report success */
	TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_DEF, appctx, p);
	return 1;

 malformed_exit:
	/* malformed message */
	appctx->st0 = PEER_SESS_ST_ERRPROTO;
	TRACE_DEVEL("leaving in error", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, p);
	return 0;
}
2487 | | |
2488 | | /* |
2489 | | * Receive a stick-table message or pre-parse any other message. |
2490 | | * The message's header will be sent into <msg_head> which must be at least |
2491 | | * <msg_head_sz> bytes long (at least 7 to store 32-bit variable lengths). |
2492 | | * The first two bytes are always read, and the rest is only read if the |
2493 | | * first bytes indicate a stick-table message. If the message is a stick-table |
2494 | | * message, the varint is decoded and the equivalent number of bytes will be |
2495 | | * copied into the trash at trash.area. <totl> is incremented by the number of |
2496 | | * bytes read EVEN IN CASE OF INCOMPLETE MESSAGES. |
2497 | | * Returns 1 if there was no error, if not, returns 0 if not enough data were available, |
2498 | | * -1 if there was an error updating the appctx state st0 accordingly. |
2499 | | */ |
2500 | | static inline int peer_recv_msg(struct appctx *appctx, char *msg_head, size_t msg_head_sz, |
2501 | | uint32_t *msg_len, int *totl) |
2502 | 0 | { |
2503 | 0 | int reql; |
2504 | 0 | char *cur; |
2505 | |
|
2506 | 0 | TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx); |
2507 | |
|
2508 | 0 | reql = applet_getblk(appctx, msg_head, 2 * sizeof(char), *totl); |
2509 | 0 | if (reql <= 0) /* closed or EOL not found */ |
2510 | 0 | goto incomplete; |
2511 | | |
2512 | 0 | *totl += reql; |
2513 | |
|
2514 | 0 | if (!(msg_head[1] & PEER_MSG_STKT_BIT_MASK)) |
2515 | 0 | return 1; |
2516 | | |
2517 | | /* This is a stick-table message, let's go on */ |
2518 | | |
2519 | | /* Read and Decode message length */ |
2520 | 0 | msg_head += *totl; |
2521 | 0 | msg_head_sz -= *totl; |
2522 | 0 | reql = applet_input_data(appctx) - *totl; |
2523 | 0 | if (reql > msg_head_sz) |
2524 | 0 | reql = msg_head_sz; |
2525 | |
|
2526 | 0 | reql = applet_getblk(appctx, msg_head, reql, *totl); |
2527 | 0 | if (reql <= 0) /* closed */ |
2528 | 0 | goto incomplete; |
2529 | | |
2530 | 0 | cur = msg_head; |
2531 | 0 | *msg_len = intdecode(&cur, cur + reql); |
2532 | 0 | if (!cur) { |
2533 | | /* the number is truncated, did we read enough ? */ |
2534 | 0 | if (reql < msg_head_sz) |
2535 | 0 | goto incomplete; |
2536 | | |
2537 | | /* malformed message */ |
2538 | 0 | TRACE_PROTO("malformed message: bad message length encoding", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx); |
2539 | 0 | appctx->st0 = PEER_SESS_ST_ERRPROTO; |
2540 | 0 | return -1; |
2541 | 0 | } |
2542 | 0 | *totl += cur - msg_head; |
2543 | | |
2544 | | /* Read message content */ |
2545 | 0 | if (*msg_len) { |
2546 | 0 | if (*msg_len > trash.size) { |
2547 | | /* Status code is not success, abort */ |
2548 | 0 | appctx->st0 = PEER_SESS_ST_ERRSIZE; |
2549 | 0 | TRACE_PROTO("malformed message: too large length encoding", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx); |
2550 | 0 | return -1; |
2551 | 0 | } |
2552 | | |
2553 | 0 | reql = applet_getblk(appctx, trash.area, *msg_len, *totl); |
2554 | 0 | if (reql <= 0) /* closed */ |
2555 | 0 | goto incomplete; |
2556 | 0 | *totl += reql; |
2557 | 0 | } |
2558 | | |
2559 | 0 | TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx); |
2560 | 0 | return 1; |
2561 | | |
2562 | 0 | incomplete: |
2563 | 0 | if (reql < 0 || se_fl_test(appctx->sedesc, SE_FL_SHW)) { |
2564 | | /* there was an error or the message was truncated */ |
2565 | 0 | appctx->st0 = PEER_SESS_ST_END; |
2566 | 0 | TRACE_ERROR("error or messafe truncated", PEERS_EV_SESS_IO|PEERS_EV_RX_ERR, appctx); |
2567 | 0 | return -1; |
2568 | 0 | } |
2569 | | |
2570 | 0 | TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx); |
2571 | 0 | return 0; |
2572 | 0 | } |
2573 | | |
2574 | | /* |
 * Treat the awaited message with <msg_head> as header.
2576 | | * Return 1 if succeeded, 0 if not. |
2577 | | */ |
2578 | | static inline int peer_treat_awaited_msg(struct appctx *appctx, struct peer *peer, unsigned char *msg_head, |
2579 | | char **msg_cur, char *msg_end, int msg_len, int totl) |
2580 | 0 | { |
2581 | 0 | struct peers *peers = peer->peers; |
2582 | |
|
2583 | 0 | TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx, peer); |
2584 | |
|
2585 | 0 | if (msg_head[0] == PEER_MSG_CLASS_CONTROL) { |
2586 | 0 | if (msg_head[1] == PEER_MSG_CTRL_RESYNCREQ) { |
2587 | 0 | struct shared_table *st; |
2588 | | /* Reset message: remote need resync */ |
2589 | 0 | TRACE_PROTO("Resync request message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
2590 | | /* prepare tables for a global push */ |
2591 | 0 | for (st = peer->tables; st; st = st->next) { |
2592 | 0 | st->teaching_origin = st->last_pushed = st->update; |
2593 | 0 | st->flags = 0; |
2594 | 0 | } |
2595 | | |
2596 | | /* reset teaching flags to 0 */ |
2597 | 0 | peer->flags &= ~PEER_TEACH_FLAGS; |
2598 | | |
2599 | | /* flag to start to teach lesson */ |
2600 | 0 | peer->flags |= (PEER_F_TEACH_PROCESS|PEER_F_DBG_RESYNC_REQUESTED); |
2601 | 0 | TRACE_STATE("peer elected to teach leasson to remote peer", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer); |
2602 | 0 | } |
2603 | 0 | else if (msg_head[1] == PEER_MSG_CTRL_RESYNCFINISHED) { |
2604 | 0 | TRACE_PROTO("Full resync finished message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
2605 | 0 | if (peer->learnstate == PEER_LR_ST_PROCESSING) { |
2606 | 0 | peer->learnstate = PEER_LR_ST_FINISHED; |
2607 | 0 | peer->flags |= PEER_F_WAIT_SYNCTASK_ACK; |
2608 | 0 | task_wakeup(peers->sync_task, TASK_WOKEN_MSG); |
2609 | 0 | TRACE_STATE("Full resync finished", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer); |
2610 | 0 | } |
2611 | 0 | peer->confirm++; |
2612 | 0 | } |
2613 | 0 | else if (msg_head[1] == PEER_MSG_CTRL_RESYNCPARTIAL) { |
2614 | 0 | TRACE_PROTO("Partial resync finished message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
2615 | 0 | if (peer->learnstate == PEER_LR_ST_PROCESSING) { |
2616 | 0 | peer->learnstate = PEER_LR_ST_FINISHED; |
2617 | 0 | peer->flags |= (PEER_F_LEARN_NOTUP2DATE|PEER_F_WAIT_SYNCTASK_ACK); |
2618 | 0 | task_wakeup(peers->sync_task, TASK_WOKEN_MSG); |
2619 | 0 | TRACE_STATE("partial resync finished", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer); |
2620 | 0 | } |
2621 | 0 | peer->confirm++; |
2622 | 0 | } |
2623 | 0 | else if (msg_head[1] == PEER_MSG_CTRL_RESYNCCONFIRM) { |
2624 | 0 | struct shared_table *st; |
2625 | |
|
2626 | 0 | TRACE_PROTO("Resync confirm message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
2627 | | /* If stopping state */ |
2628 | 0 | if (stopping) { |
2629 | | /* Close session, push resync no more needed */ |
2630 | 0 | peer->flags |= PEER_F_LOCAL_TEACH_COMPLETE; |
2631 | 0 | appctx->st0 = PEER_SESS_ST_END; |
2632 | 0 | TRACE_STATE("process stopping, stop any resync", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer); |
2633 | 0 | return 0; |
2634 | 0 | } |
2635 | 0 | for (st = peer->tables; st; st = st->next) { |
2636 | 0 | st->update = st->last_pushed = st->teaching_origin; |
2637 | 0 | st->flags = 0; |
2638 | 0 | } |
2639 | | |
2640 | | /* reset teaching flags to 0 */ |
2641 | 0 | peer->flags &= ~PEER_TEACH_FLAGS; |
2642 | 0 | TRACE_STATE("Stop teaching", PEERS_EV_SESS_RESYNC|PEERS_EV_PROTO_CTRL, appctx, peer); |
2643 | 0 | } |
2644 | 0 | else if (msg_head[1] == PEER_MSG_CTRL_HEARTBEAT) { |
2645 | 0 | TRACE_PROTO("Heartbeat message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
2646 | 0 | peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT)); |
2647 | 0 | peer->rx_hbt++; |
2648 | 0 | } |
2649 | 0 | } |
2650 | 0 | else if (msg_head[0] == PEER_MSG_CLASS_STICKTABLE) { |
2651 | 0 | if (msg_head[1] == PEER_MSG_STKT_DEFINE) { |
2652 | 0 | TRACE_PROTO("Table definition message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
2653 | 0 | if (!peer_treat_definemsg(appctx, peer, msg_cur, msg_end, totl)) |
2654 | 0 | return 0; |
2655 | 0 | } |
2656 | 0 | else if (msg_head[1] == PEER_MSG_STKT_SWITCH) { |
2657 | 0 | TRACE_PROTO("Table switch message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_CTRL, appctx, peer); |
2658 | 0 | if (!peer_treat_switchmsg(appctx, peer, msg_cur, msg_end)) |
2659 | 0 | return 0; |
2660 | 0 | } |
2661 | 0 | else if (msg_head[1] == PEER_MSG_STKT_UPDATE || |
2662 | 0 | msg_head[1] == PEER_MSG_STKT_INCUPDATE || |
2663 | 0 | msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || |
2664 | 0 | msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED) { |
2665 | 0 | int update, expire; |
2666 | |
|
2667 | 0 | TRACE_PROTO("Update message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_UPDATE, appctx, peer); |
2668 | 0 | update = msg_head[1] == PEER_MSG_STKT_UPDATE || msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED; |
2669 | 0 | expire = msg_head[1] == PEER_MSG_STKT_UPDATE_TIMED || msg_head[1] == PEER_MSG_STKT_INCUPDATE_TIMED; |
2670 | 0 | if (!peer_treat_updatemsg(appctx, peer, update, expire, |
2671 | 0 | msg_cur, msg_end, msg_len, totl)) |
2672 | 0 | return 0; |
2673 | |
|
2674 | 0 | } |
2675 | 0 | else if (msg_head[1] == PEER_MSG_STKT_ACK) { |
2676 | 0 | TRACE_PROTO("Ack message received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ACK, appctx, peer); |
2677 | 0 | if (!peer_treat_ackmsg(appctx, peer, msg_cur, msg_end)) |
2678 | 0 | return 0; |
2679 | 0 | } |
2680 | 0 | } |
2681 | 0 | else if (msg_head[0] == PEER_MSG_CLASS_RESERVED) { |
2682 | 0 | appctx->st0 = PEER_SESS_ST_ERRPROTO; |
2683 | 0 | TRACE_PROTO("malformed message: reserved", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx, peer); |
2684 | 0 | return 0; |
2685 | 0 | } |
2686 | | |
2687 | 0 | TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_RX_MSG, appctx, peer); |
2688 | 0 | return 1; |
2689 | 0 | } |
2690 | | |
2691 | | |
2692 | | /* |
2693 | | * Send any message to <peer> peer. |
2694 | | * Returns 1 if succeeded, or -1 or 0 if failed. |
2695 | | * -1 means an internal error occurred, 0 is for a peer protocol error leading |
2696 | | * to a peer state change (from the peer I/O handler point of view). |
2697 | | * |
2698 | | * - peer->last_local_table is the last table for which we send an update |
2699 | | * messages. |
2700 | | * |
2701 | | * - peer->stop_local_table is the last evaluated table. It is unset when the |
2702 | | * teaching process starts. But we use it as a |
2703 | | * restart point when the loop is interrupted. It is |
2704 | | * especially useful when the number of tables exceeds |
2705 | | * peers_max_updates_at_once value. |
2706 | | * |
 * When a teaching loop is started, the peer's last_local_table is saved in a
 * local variable. This variable is used as a finish point. When the current
 * table is equal to it, it means all tables were evaluated, all updates were
 * sent and the teaching process is finished.
2711 | | * |
2712 | | * peer->stop_local_table is always NULL when the teaching process begins. It is |
2713 | | * only reset at the end. In the mean time, it always point on a table. |
2714 | | */ |
2715 | | |
int peer_send_msgs(struct appctx *appctx,
                   struct peer *peer, struct peers *peers)
{
	int repl = 1;

	TRACE_ENTER(PEERS_EV_SESS_IO, appctx, peer);

	/* Need to request a resync (only possible for a remote peer at this stage) */
	if (peer->learnstate == PEER_LR_ST_ASSIGNED) {
		BUG_ON(peer->local);
		repl = peer_send_resync_reqmsg(appctx, peer, peers);
		if (repl <= 0)
			goto end;
		peer->learnstate = PEER_LR_ST_PROCESSING;
		TRACE_STATE("Start processing resync", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
	}

	/* Nothing to read, now we start to write */
	if (peer->tables) {
		struct shared_table *st;
		struct shared_table *last_local_table;
		int updates = 0;

		/* <last_local_table> is the finish point of the loop below: once
		 * <st> reaches it again, all tables were evaluated. <stop_local_table>
		 * is the restart cursor when a previous pass was interrupted.
		 */
		last_local_table = peer->last_local_table;
		if (!last_local_table)
			last_local_table = peer->tables;
		if (!peer->stop_local_table)
			peer->stop_local_table = last_local_table;
		st = peer->stop_local_table->next;

		while (1) {
			/* circular walk over the table list */
			if (!st)
				st = peer->tables;
			/* It remains some updates to ack */
			if (st->last_get != st->last_acked) {
				repl = peer_send_ackmsg(st, appctx);
				if (repl <= 0)
					goto end;

				st->last_acked = st->last_get;
				TRACE_PRINTF(TRACE_LEVEL_PROTO, PEERS_EV_PROTO_ACK, appctx, NULL, st, NULL,
				             "ack message sent (table=%s, updateid=%u)", st->table->id, st->last_acked);
			}

			if (!(peer->flags & PEER_F_TEACH_PROCESS)) {
				int must_send;

				/* try-lock only: if the update lock is contended, yield
				 * and let the applet be woken up again rather than block
				 */
				if (HA_RWLOCK_TRYRDLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock)) {
					applet_have_more_data(appctx);
					repl = -1;
					goto end;
				}
				must_send = (peer->learnstate == PEER_LR_ST_NOTASSIGNED) && (st->last_pushed != st->table->localupdate);
				HA_RWLOCK_RDUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);

				if (must_send) {
					repl = peer_send_teach_process_msgs(appctx, peer, st);
					if (repl <= 0) {
						/* interrupted: restart from the last table an update was sent for */
						peer->stop_local_table = peer->last_local_table;
						goto end;
					}
				}
			}
			else if (!(peer->flags & PEER_F_TEACH_FINISHED)) {
				/* full teaching runs in two stages, each flagged per table */
				if (!(st->flags & SHTABLE_F_TEACH_STAGE1)) {
					repl = peer_send_teach_stage1_msgs(appctx, peer, st);
					if (repl <= 0) {
						peer->stop_local_table = peer->last_local_table;
						goto end;
					}
				}

				if (!(st->flags & SHTABLE_F_TEACH_STAGE2)) {
					repl = peer_send_teach_stage2_msgs(appctx, peer, st);
					if (repl <= 0) {
						peer->stop_local_table = peer->last_local_table;
						goto end;
					}
				}
			}

			if (st == last_local_table) {
				/* all tables were evaluated: clear the restart cursor */
				peer->stop_local_table = NULL;
				break;
			}

			/* This one is to be sure to restart from <st->next> if we are interrupted
			 * because of peer_send_teach_stage2_msgs or because buffer is full
			 * when sending an ackmsg. In both cases current <st> was evaluated and
			 * we must restart from <st->next>
			 */
			peer->stop_local_table = st;

			/* bound the work done in a single applet wakeup */
			updates++;
			if (updates >= peers_max_updates_at_once) {
				applet_have_more_data(appctx);
				repl = -1;
				goto end;
			}

			st = st->next;
		}
	}

	if ((peer->flags & PEER_F_TEACH_PROCESS) && !(peer->flags & PEER_F_TEACH_FINISHED)) {
		repl = peer_send_resync_finishedmsg(appctx, peer, peers);
		if (repl <= 0)
			goto end;

		/* flag finished message sent */
		peer->flags |= PEER_F_TEACH_FINISHED;
		TRACE_STATE("full/partial resync finished", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
	}

	/* Confirm finished or partial messages */
	while (peer->confirm) {
		repl = peer_send_resync_confirmsg(appctx, peer, peers);
		if (repl <= 0)
			goto end;
		TRACE_STATE("Confirm resync is finished", PEERS_EV_SESS_IO|PEERS_EV_SESS_RESYNC, appctx, peer);
		peer->confirm--;
	}

	repl = 1;
 end:
	TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, peer);
	return repl;
}
2844 | | |
2845 | | /* |
2846 | | * Read and parse a first line of a "hello" peer protocol message. |
2847 | | * Returns 0 if could not read a line, -1 if there was a read error or |
2848 | | * the line is malformed, 1 if succeeded. |
2849 | | */ |
2850 | | static inline int peer_getline_version(struct appctx *appctx, |
2851 | | unsigned int *maj_ver, unsigned int *min_ver) |
2852 | 0 | { |
2853 | 0 | int reql; |
2854 | |
|
2855 | 0 | reql = peer_getline(appctx); |
2856 | 0 | if (!reql) |
2857 | 0 | return 0; |
2858 | | |
2859 | 0 | if (reql < 0) |
2860 | 0 | return -1; |
2861 | | |
2862 | | /* test protocol */ |
2863 | 0 | if (strncmp(PEER_SESSION_PROTO_NAME " ", trash.area, proto_len + 1) != 0) { |
2864 | 0 | appctx->st0 = PEER_SESS_ST_EXIT; |
2865 | 0 | appctx->st1 = PEER_SESS_SC_ERRPROTO; |
2866 | 0 | TRACE_ERROR("protocol error: invalid version line", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx); |
2867 | 0 | return -1; |
2868 | 0 | } |
2869 | 0 | if (peer_get_version(trash.area + proto_len + 1, maj_ver, min_ver) == -1 || |
2870 | 0 | *maj_ver != PEER_MAJOR_VER || *min_ver > PEER_MINOR_VER) { |
2871 | 0 | appctx->st0 = PEER_SESS_ST_EXIT; |
2872 | 0 | appctx->st1 = PEER_SESS_SC_ERRVERSION; |
2873 | 0 | TRACE_ERROR("protocol error: invalid version", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx); |
2874 | 0 | return -1; |
2875 | 0 | } |
2876 | | |
2877 | 0 | TRACE_DATA("version line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx); |
2878 | 0 | return 1; |
2879 | 0 | } |
2880 | | |
2881 | | /* |
2882 | | * Read and parse a second line of a "hello" peer protocol message. |
2883 | | * Returns 0 if could not read a line, -1 if there was a read error or |
2884 | | * the line is malformed, 1 if succeeded. |
2885 | | */ |
2886 | | static inline int peer_getline_host(struct appctx *appctx) |
2887 | 0 | { |
2888 | 0 | int reql; |
2889 | |
|
2890 | 0 | reql = peer_getline(appctx); |
2891 | 0 | if (!reql) |
2892 | 0 | return 0; |
2893 | | |
2894 | 0 | if (reql < 0) |
2895 | 0 | return -1; |
2896 | | |
2897 | | /* test hostname match */ |
2898 | 0 | if (strcmp(localpeer, trash.area) != 0) { |
2899 | 0 | appctx->st0 = PEER_SESS_ST_EXIT; |
2900 | 0 | appctx->st1 = PEER_SESS_SC_ERRHOST; |
2901 | 0 | TRACE_ERROR("protocol error: wrong host", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx); |
2902 | 0 | return -1; |
2903 | 0 | } |
2904 | | |
2905 | 0 | TRACE_DATA("host line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx); |
2906 | 0 | return 1; |
2907 | 0 | } |
2908 | | |
2909 | | /* |
2910 | | * Read and parse a last line of a "hello" peer protocol message. |
2911 | | * Returns 0 if could not read a character, -1 if there was a read error or |
2912 | | * the line is malformed, 1 if succeeded. |
2913 | | * Set <curpeer> accordingly (the remote peer sending the "hello" message). |
2914 | | */ |
2915 | | static inline int peer_getline_last(struct appctx *appctx, struct peer **curpeer) |
2916 | 0 | { |
2917 | 0 | char *p; |
2918 | 0 | int reql; |
2919 | 0 | struct peer *peer; |
2920 | 0 | struct peers *peers = strm_fe(appctx_strm(appctx))->parent; |
2921 | |
|
2922 | 0 | reql = peer_getline(appctx); |
2923 | 0 | if (!reql) |
2924 | 0 | return 0; |
2925 | | |
2926 | 0 | if (reql < 0) |
2927 | 0 | return -1; |
2928 | | |
2929 | | /* parse line "<peer name> <pid> <relative_pid>" */ |
2930 | 0 | p = strchr(trash.area, ' '); |
2931 | 0 | if (!p) { |
2932 | 0 | appctx->st0 = PEER_SESS_ST_EXIT; |
2933 | 0 | appctx->st1 = PEER_SESS_SC_ERRPROTO; |
2934 | 0 | TRACE_ERROR("protocol error: invalid peer line", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx); |
2935 | 0 | return -1; |
2936 | 0 | } |
2937 | 0 | *p = 0; |
2938 | | |
2939 | | /* lookup known peer */ |
2940 | 0 | for (peer = peers->remote; peer; peer = peer->next) { |
2941 | 0 | if (strcmp(peer->id, trash.area) == 0) |
2942 | 0 | break; |
2943 | 0 | } |
2944 | | |
2945 | | /* if unknown peer */ |
2946 | 0 | if (!peer) { |
2947 | 0 | appctx->st0 = PEER_SESS_ST_EXIT; |
2948 | 0 | appctx->st1 = PEER_SESS_SC_ERRPEER; |
2949 | 0 | TRACE_ERROR("protocol error: unknown peer", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_ERR, appctx); |
2950 | 0 | return -1; |
2951 | 0 | } |
2952 | 0 | *curpeer = peer; |
2953 | |
|
2954 | 0 | TRACE_DATA("peer line received", PEERS_EV_SESS_IO|PEERS_EV_RX_MSG|PEERS_EV_PROTO_HELLO, appctx, peer); |
2955 | 0 | return 1; |
2956 | 0 | } |
2957 | | |
2958 | | /* |
 * Init <peer> peer after validating a connection at peer protocol level. It may
 * be an incoming or outgoing connection. The peer init must be acknowledged by
 * the sync task. Message processing is blocked in the meantime.
2962 | | */ |
static inline void init_connected_peer(struct peer *peer, struct peers *peers)
{
	struct shared_table *st;

	TRACE_ENTER(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
	peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));

	/* Init cursors */
	for (st = peer->tables; st ; st = st->next) {
		st->last_get = st->last_acked = 0;
		/* write lock: <update> and <localupdate> must be read/written consistently */
		HA_RWLOCK_WRLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
		/* if st->update appears to be in future it means
		 * that the last acked value is very old and we
		 * remain unconnected a too long time to use this
		 * acknowledgement as a reset.
		 * We should update the protocol to be able to
		 * signal the remote peer that it needs a full resync.
		 * Here a partial fix consist to set st->update at
		 * the max past value.
		 */
		/* signed difference implements serial-number comparison on the
		 * wrapping 32-bit update counter; 2147483648U is 2^31, i.e. the
		 * farthest possible "past" value
		 */
		if ((int)(st->table->localupdate - st->update) < 0)
			st->update = st->table->localupdate + (2147483648U);
		st->teaching_origin = st->last_pushed = st->update;
		st->flags = 0;

		HA_RWLOCK_WRUNLOCK(STK_TABLE_UPDT_LOCK, &st->table->updt_lock);
	}

	/* Awake main task to ack the new peer state */
	task_wakeup(peers->sync_task, TASK_WOKEN_MSG);

	/* Init confirm counter */
	peer->confirm = 0;

	/* reset teaching flags to 0 */
	peer->flags &= ~PEER_TEACH_FLAGS;

	if (peer->local && !(appctx_is_back(peer->appctx))) {
		/* If the local peer has established the connection (appctx is
		 * on the frontend side), flag it to start to teach lesson.
		 */
		peer->flags |= PEER_F_TEACH_PROCESS;
		TRACE_STATE("peer elected to teach lesson to local peer", PEERS_EV_SESS_NEW|PEERS_EV_SESS_RESYNC, NULL, peer);
	}

	/* Mark the peer as starting and wait the sync task */
	peer->flags |= PEER_F_WAIT_SYNCTASK_ACK;
	peer->appstate = PEER_APP_ST_STARTING;
	TRACE_STATE("peer session starting", PEERS_EV_SESS_NEW, NULL, peer);
	TRACE_LEAVE(PEERS_EV_SESS_IO|PEERS_EV_SESS_NEW, NULL, peer);
}
3014 | | |
/*
 * IO Handler to handle message exchange with a peer
 *
 * This applet callback drives the peers protocol state machine held in
 * appctx->st0 (PEER_SESS_ST_*). Accepted (incoming) sessions walk
 * ACCEPT -> GETVERSION -> GETHOST -> GETPEER -> SENDSUCCESS; outgoing
 * sessions walk CONNECT -> GETSTATUS. Both then loop in WAITMSG to
 * exchange update/ack/heartbeat messages until an error or shutdown
 * routes them through EXIT/ERRSIZE/ERRPROTO to END.
 *
 * Lock convention: <curpeer> is looked up (or recovered from
 * appctx->svcctx) and its PEER_LOCK is taken inside the state machine;
 * it is released either in the END state or at the common "out" exit.
 */
void peer_io_handler(struct appctx *appctx)
{
	struct peer *curpeer = NULL;
	int reql = 0;
	int repl = 0;
	unsigned int maj_ver, min_ver;
	int prev_state;
	int msg_done = 0; /* number of input messages processed in this call */

	TRACE_ENTER(PEERS_EV_SESS_IO, appctx);

	/* nothing to do on a closed or broken applet: just flush input */
	if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR))) {
		applet_reset_input(appctx);
		goto out;
	}

	/* Check if the out buffer is available. */
	if (!applet_get_outbuf(appctx)) {
		applet_have_more_data(appctx);
		goto out;
	}

	while (1) {
		prev_state = appctx->st0;
switchstate:
		/* version numbers are only meaningful right after GETVERSION */
		maj_ver = min_ver = (unsigned int)-1;
		switch(appctx->st0) {
			case PEER_SESS_ST_ACCEPT:
				prev_state = appctx->st0;
				appctx->svcctx = NULL;
				appctx->st0 = PEER_SESS_ST_GETVERSION;
				__fallthrough;
			case PEER_SESS_ST_GETVERSION:
				prev_state = appctx->st0;
				TRACE_STATE("get version line", PEERS_EV_SESS_IO, appctx);
				reql = peer_getline_version(appctx, &maj_ver, &min_ver);
				if (reql <= 0) {
					/* 0: incomplete line, wait for more; <0: st0 was changed to an error state */
					if (!reql)
						goto out;
					goto switchstate;
				}

				appctx->st0 = PEER_SESS_ST_GETHOST;
				__fallthrough;
			case PEER_SESS_ST_GETHOST:
				prev_state = appctx->st0;
				TRACE_STATE("get host line", PEERS_EV_SESS_IO, appctx);
				reql = peer_getline_host(appctx);
				if (reql <= 0) {
					if (!reql)
						goto out;
					goto switchstate;
				}

				appctx->st0 = PEER_SESS_ST_GETPEER;
				__fallthrough;
			case PEER_SESS_ST_GETPEER: {
				prev_state = appctx->st0;
				TRACE_STATE("get peer line", PEERS_EV_SESS_IO, appctx);
				reql = peer_getline_last(appctx, &curpeer);
				if (reql <= 0) {
					if (!reql)
						goto out;
					goto switchstate;
				}

				/* from here <curpeer> is locked until END or "out" */
				HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
				if (curpeer->appctx && curpeer->appctx != appctx) {
					/* another session already exists for this peer */
					if (curpeer->local) {
						/* Local connection, reply a retry */
						appctx->st0 = PEER_SESS_ST_EXIT;
						appctx->st1 = PEER_SESS_SC_TRYAGAIN;
						TRACE_STATE("local connection, retry", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						goto switchstate;
					}

					TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
					/* we're killing a connection, we must apply a random delay before
					 * retrying otherwise the other end will do the same and we can loop
					 * for a while.
					 */
					curpeer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
					peer_session_forceshutdown(curpeer);

					curpeer->heartbeat = TICK_ETERNITY;
					curpeer->coll++;
				}
				/* record the negotiated protocol version downgrade, if any */
				if (maj_ver != (unsigned int)-1 && min_ver != (unsigned int)-1) {
					if (min_ver == PEER_DWNGRD_MINOR_VER) {
						curpeer->flags |= PEER_F_DWNGRD;
					}
					else {
						curpeer->flags &= ~PEER_F_DWNGRD;
					}
				}
				curpeer->appctx = appctx;
				curpeer->flags |= PEER_F_ALIVE;
				appctx->svcctx = curpeer;
				appctx->st0 = PEER_SESS_ST_SENDSUCCESS;
				_HA_ATOMIC_INC(&active_peers);
			}
			__fallthrough;
			case PEER_SESS_ST_SENDSUCCESS: {
				prev_state = appctx->st0;
				if (!curpeer) {
					/* re-entered after a yield: recover the locked peer */
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}

				TRACE_STATE("send success", PEERS_EV_SESS_IO, appctx, curpeer);
				repl = peer_send_status_successmsg(appctx);
				if (repl <= 0) {
					/* -1: output full, retry later; otherwise st0 holds an error state */
					if (repl == -1)
						goto out;
					goto switchstate;
				}

				/* Register status code */
				curpeer->statuscode = PEER_SESS_SC_SUCCESSCODE;
				curpeer->last_hdshk = now_ms;

				init_connected_peer(curpeer, curpeer->peers);

				/* switch to waiting message state */
				_HA_ATOMIC_INC(&connected_peers);
				appctx->st0 = PEER_SESS_ST_WAITMSG;
				TRACE_STATE("connected, now wait for messages", PEERS_EV_SESS_IO, appctx, curpeer);
				goto switchstate;
			}
			case PEER_SESS_ST_CONNECT: {
				/* outgoing session: send our hello banner */
				prev_state = appctx->st0;
				if (!curpeer) {
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}

				TRACE_STATE("send hello message", PEERS_EV_SESS_IO, appctx, curpeer);
				repl = peer_send_hellomsg(appctx, curpeer);
				if (repl <= 0) {
					if (repl == -1)
						goto out;
					goto switchstate;
				}

				/* switch to the waiting statuscode state */
				appctx->st0 = PEER_SESS_ST_GETSTATUS;
			}
			__fallthrough;
			case PEER_SESS_ST_GETSTATUS: {
				prev_state = appctx->st0;
				if (!curpeer) {
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}
				curpeer->statuscode = PEER_SESS_SC_CONNECTEDCODE;
				TRACE_STATE("get status", PEERS_EV_SESS_IO, appctx, curpeer);

				reql = peer_getline(appctx);
				if (!reql)
					goto out;

				if (reql < 0)
					goto switchstate;

				/* Register status code */
				curpeer->statuscode = atoi(trash.area);
				curpeer->last_hdshk = now_ms;

				/* Awake main task */
				task_wakeup(curpeer->peers->sync_task, TASK_WOKEN_MSG);

				/* If status code is success */
				if (curpeer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
					init_connected_peer(curpeer, curpeer->peers);
				}
				else {
					/* remember a version error so the next attempt downgrades */
					if (curpeer->statuscode == PEER_SESS_SC_ERRVERSION)
						curpeer->flags |= PEER_F_DWNGRD;
					/* Status code is not success, abort */
					appctx->st0 = PEER_SESS_ST_END;
					goto switchstate;
				}
				_HA_ATOMIC_INC(&connected_peers);
				appctx->st0 = PEER_SESS_ST_WAITMSG;
				TRACE_STATE("connected, now wait for messages", PEERS_EV_SESS_IO, appctx, curpeer);
			}
			__fallthrough;
			case PEER_SESS_ST_WAITMSG: {
				/* steady state: read incoming messages, then emit pending ones */
				uint32_t msg_len = 0;
				char *msg_cur = trash.area;
				char *msg_end = trash.area;
				unsigned char msg_head[7]; // 2 + 5 for varint32
				int totl = 0;

				prev_state = appctx->st0;
				if (!curpeer) {
					curpeer = appctx->svcctx;
					HA_SPIN_LOCK(PEER_LOCK, &curpeer->lock);
					if (curpeer->appctx != appctx) {
						TRACE_STATE("release old session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
						appctx->st0 = PEER_SESS_ST_END;
						goto switchstate;
					}
				}

				/* the sync task must acknowledge our state change first */
				if (curpeer->flags & PEER_F_WAIT_SYNCTASK_ACK) {
					applet_wont_consume(appctx);
					TRACE_STATE("peer is waiting for sync task", PEERS_EV_SESS_IO, appctx, curpeer);
					goto out;
				}

				/* check if we've already hit the rx limit (i.e. we've
				 * already gone through send_msgs and we don't want to
				 * process input messages again). We must absolutely
				 * leave via send_msgs otherwise we can leave the
				 * connection in a stuck state if acks are missing for
				 * example.
				 */
				if (msg_done >= peers_max_updates_at_once) {
					applet_have_more_data(appctx); // make sure to come back here
					goto send_msgs;
				}

				applet_will_consume(appctx);

				/* local peer is assigned of a lesson, start it */
				if (curpeer->learnstate == PEER_LR_ST_ASSIGNED && curpeer->local) {
					curpeer->learnstate = PEER_LR_ST_PROCESSING;
					TRACE_STATE("peer starts to learn", PEERS_EV_SESS_IO, appctx, curpeer);
				}

				reql = peer_recv_msg(appctx, (char *)msg_head, sizeof msg_head, &msg_len, &totl);
				if (reql <= 0) {
					/* -1: protocol/state error; 0: no full message yet, switch to sending */
					if (reql == -1)
						goto switchstate;
					goto send_msgs;
				}

				msg_end += msg_len;
				if (!peer_treat_awaited_msg(appctx, curpeer, msg_head, &msg_cur, msg_end, msg_len, totl))
					goto switchstate;

				/* any valid message proves the peer is alive */
				curpeer->flags |= PEER_F_ALIVE;

				/* skip consumed message */
				applet_skip_input(appctx, totl);

				/* make sure we don't process too many at once */
				if (msg_done >= peers_max_updates_at_once)
					goto send_msgs;
				msg_done++;

				/* loop on that state to peek next message */
				goto switchstate;

			send_msgs:
				if (curpeer->flags & PEER_F_HEARTBEAT) {
					curpeer->flags &= ~PEER_F_HEARTBEAT;
					repl = peer_send_heartbeatmsg(appctx, curpeer, curpeer->peers);
					if (repl <= 0) {
						if (repl == -1)
							goto out;
						goto switchstate;
					}
					curpeer->tx_hbt++;
				}
				/* we get here when a peer_recv_msg() returns 0 in reql */
				repl = peer_send_msgs(appctx, curpeer, curpeer->peers);
				if (repl <= 0) {
					if (repl == -1)
						goto out;
					goto switchstate;
				}

				/* nothing more to do */
				goto out;
			}
			case PEER_SESS_ST_EXIT:
				/* leaving WAITMSG means one fewer connected peer */
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("send status error message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
				if (peer_send_status_errormsg(appctx) == -1)
					goto out;
				appctx->st0 = PEER_SESS_ST_END;
				goto switchstate;
			case PEER_SESS_ST_ERRSIZE: {
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("send error size message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
				if (peer_send_error_size_limitmsg(appctx) == -1)
					goto out;
				appctx->st0 = PEER_SESS_ST_END;
				goto switchstate;
			}
			case PEER_SESS_ST_ERRPROTO: {
				if (curpeer)
					curpeer->proto_err++;
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("send proto error message", PEERS_EV_SESS_IO|PEERS_EV_SESS_ERR, appctx, curpeer);
				if (peer_send_error_protomsg(appctx) == -1)
					goto out;
				appctx->st0 = PEER_SESS_ST_END;
				/* keep END from double-decrementing connected_peers below */
				prev_state = appctx->st0;
			}
			__fallthrough;
			case PEER_SESS_ST_END: {
				if (prev_state == PEER_SESS_ST_WAITMSG)
					_HA_ATOMIC_DEC(&connected_peers);
				prev_state = appctx->st0;
				TRACE_STATE("Terminate peer session", PEERS_EV_SESS_IO|PEERS_EV_SESS_END, appctx, curpeer);
				if (curpeer) {
					HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);
					curpeer = NULL;
				}
				applet_set_eos(appctx);
				applet_reset_input(appctx);
				goto out;
			}
		}
	}
out:
	/* sc_opposite(sc)->flags |= SC_FL_RCV_ONCE; */

	/* common exit: release the peer lock if still held */
	if (curpeer)
		HA_SPIN_UNLOCK(PEER_LOCK, &curpeer->lock);

	TRACE_LEAVE(PEERS_EV_SESS_IO, appctx, curpeer);
	return;
}
3366 | | |
/* Applet descriptor shared by incoming and outgoing peer sessions. The
 * handshake/message exchange is driven by peer_io_handler(); session
 * setup and teardown are delegated to peer_session_init() and
 * peer_session_release().
 */
static struct applet peer_applet = {
	.obj_type = OBJ_TYPE_APPLET,
	.flags = APPLET_FL_NEW_API,
	.name = "<PEER>", /* used for logging */
	.fct = peer_io_handler,          /* main I/O state machine */
	.rcv_buf = appctx_raw_rcv_buf,   /* raw (non-HTX) buffer exchange */
	.snd_buf = appctx_raw_snd_buf,
	.init = peer_session_init,
	.release = peer_session_release,
};
3377 | | |
3378 | | |
3379 | | /* |
3380 | | * Use this function to force a close of a peer session |
3381 | | */ |
3382 | | static void peer_session_forceshutdown(struct peer *peer) |
3383 | 0 | { |
3384 | 0 | struct appctx *appctx = peer->appctx; |
3385 | | |
3386 | | /* Note that the peer sessions which have just been created |
3387 | | * (->st0 == PEER_SESS_ST_CONNECT) must not |
3388 | | * be shutdown, if not, the TCP session will never be closed |
3389 | | * and stay in CLOSE_WAIT state after having been closed by |
3390 | | * the remote side. |
3391 | | */ |
3392 | 0 | if (!appctx || appctx->st0 == PEER_SESS_ST_CONNECT) |
3393 | 0 | return; |
3394 | | |
3395 | 0 | if (appctx->applet != &peer_applet) |
3396 | 0 | return; |
3397 | | |
3398 | 0 | TRACE_STATE("peer session shutdown", PEERS_EV_SESS_SHUT|PEERS_EV_SESS_END, appctx, peer); |
3399 | 0 | __peer_session_deinit(peer); |
3400 | |
|
3401 | 0 | appctx->st0 = PEER_SESS_ST_END; |
3402 | 0 | appctx_wakeup(appctx); |
3403 | 0 | } |
3404 | | |
3405 | | /* Pre-configures a peers frontend to accept incoming connections */ |
3406 | | void peers_setup_frontend(struct proxy *fe) |
3407 | 0 | { |
3408 | 0 | fe->mode = PR_MODE_PEERS; |
3409 | 0 | fe->maxconn = 0; |
3410 | 0 | fe->conn_retries = CONN_RETRIES; /* FIXME ignored since 91e785ed |
3411 | | * ("MINOR: stream: Rely on a per-stream max connection retries value") |
3412 | | * If this is really expected this should be set on the stream directly |
3413 | | * because the proxy is not part of the main proxy list and thus |
3414 | | * lacks the required post init for this setting to be considered |
3415 | | */ |
3416 | 0 | fe->timeout.connect = MS_TO_TICKS(1000); |
3417 | 0 | fe->timeout.client = MS_TO_TICKS(5000); |
3418 | 0 | fe->timeout.server = MS_TO_TICKS(5000); |
3419 | 0 | fe->accept = frontend_accept; |
3420 | 0 | fe->default_target = &peer_applet.obj_type; |
3421 | 0 | fe->options2 |= PR_O2_INDEPSTR | PR_O2_SMARTCON | PR_O2_SMARTACC; |
3422 | 0 | } |
3423 | | |
3424 | | /* |
3425 | | * Create a new peer session in assigned state (connect will start automatically) |
3426 | | */ |
3427 | | static struct appctx *peer_session_create(struct peers *peers, struct peer *peer) |
3428 | 0 | { |
3429 | 0 | struct appctx *appctx; |
3430 | 0 | unsigned int thr = 0; |
3431 | 0 | int idx; |
3432 | |
|
3433 | 0 | TRACE_ENTER(PEERS_EV_SESS_NEW, NULL, peer); |
3434 | |
|
3435 | 0 | peer->new_conn++; |
3436 | 0 | peer->reconnect = tick_add(now_ms, (stopping ? MS_TO_TICKS(PEER_LOCAL_RECONNECT_TIMEOUT) : MS_TO_TICKS(PEER_RECONNECT_TIMEOUT))); |
3437 | 0 | peer->heartbeat = TICK_ETERNITY; |
3438 | 0 | peer->statuscode = PEER_SESS_SC_CONNECTCODE; |
3439 | 0 | peer->last_hdshk = now_ms; |
3440 | |
|
3441 | 0 | for (idx = 0; idx < global.nbthread; idx++) |
3442 | 0 | thr = peers->applet_count[idx] < peers->applet_count[thr] ? idx : thr; |
3443 | 0 | appctx = appctx_new_on(&peer_applet, NULL, thr); |
3444 | 0 | if (!appctx) { |
3445 | 0 | TRACE_ERROR("peer APPCTX creation failed", PEERS_EV_SESS_NEW|PEERS_EV_SESS_END|PEERS_EV_SESS_ERR, NULL, peer); |
3446 | 0 | goto out_close; |
3447 | 0 | } |
3448 | 0 | appctx->svcctx = (void *)peer; |
3449 | |
|
3450 | 0 | appctx->st0 = PEER_SESS_ST_CONNECT; |
3451 | 0 | peer->appctx = appctx; |
3452 | |
|
3453 | 0 | HA_ATOMIC_INC(&peers->applet_count[thr]); |
3454 | 0 | appctx_wakeup(appctx); |
3455 | |
|
3456 | 0 | TRACE_LEAVE(PEERS_EV_SESS_NEW, appctx, peer); |
3457 | 0 | return appctx; |
3458 | | |
3459 | 0 | out_close: |
3460 | 0 | return NULL; |
3461 | 0 | } |
3462 | | |
3463 | | /* Clear LEARN flags to a given peer, dealing with aborts if it was assigned for |
3464 | | * learning. In this case, the resync timeout is re-armed. |
3465 | | */ |
3466 | | static void clear_peer_learning_status(struct peer *peer) |
3467 | 0 | { |
3468 | 0 | if (peer->learnstate != PEER_LR_ST_NOTASSIGNED) { |
3469 | 0 | struct peers *peers = peer->peers; |
3470 | | |
3471 | | /* unassign current peer for learning */ |
3472 | 0 | HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN); |
3473 | 0 | HA_ATOMIC_OR(&peers->flags, (peer->local ? PEERS_F_DBG_RESYNC_LOCALABORT : PEERS_F_DBG_RESYNC_REMOTEABORT)); |
3474 | | |
3475 | | /* reschedule a resync */ |
3476 | 0 | peer->peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(5000)); |
3477 | 0 | peer->learnstate = PEER_LR_ST_NOTASSIGNED; |
3478 | 0 | } |
3479 | 0 | peer->flags &= ~PEER_F_LEARN_NOTUP2DATE; |
3480 | 0 | } |
3481 | | |
/* Commit the end of a learning (resync) phase for <peer> into the <peers>
 * section state. Does nothing unless the peer's learn state is FINISHED.
 * Decides between a partial resync (PEER_F_LEARN_NOTUP2DATE set, or sharded
 * peers with other shards still to request) and a full resync, updates the
 * section flags atomically, and wakes the peer's applet if present.
 *
 * NOTE(review): expected to be called with <peer>'s lock held (callers in
 * the sync task hold it); it additionally takes the lock of sibling sharded
 * peers while scanning them.
 */
static void sync_peer_learn_state(struct peers *peers, struct peer *peer)
{
	unsigned int flags = 0;

	if (peer->learnstate != PEER_LR_ST_FINISHED)
		return;

	/* The learning process is now finished */
	if (peer->flags & PEER_F_LEARN_NOTUP2DATE) {
		/* Partial resync */
		flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALPARTIAL : PEERS_F_DBG_RESYNC_REMOTEPARTIAL);
		peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
		TRACE_STATE("learning finished, peer session partially resync", PEERS_EV_SESS_RESYNC, NULL, peer);
	}
	else {
		/* Full resync */
		struct peer *rem_peer;
		int commit_a_finish = 1;

		if (peer->srv->shard) {
			/* sharded setup: a single peer only covers its own shard,
			 * so this is at best a partial resync for the section
			 */
			flags |= PEERS_F_DBG_RESYNC_REMOTEPARTIAL;
			peer->flags |= PEER_F_LEARN_NOTUP2DATE;
			for (rem_peer = peers->remote; rem_peer; rem_peer = rem_peer->next) {
				if (rem_peer->srv->shard && rem_peer != peer) {
					HA_SPIN_LOCK(PEER_LOCK, &rem_peer->lock);
					if (rem_peer->srv->shard == peer->srv->shard) {
						/* flag all peers from same shard
						 * notup2date to disable request
						 * of a resync from them
						 */
						rem_peer->flags |= PEER_F_LEARN_NOTUP2DATE;
					}
					else if (!(rem_peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
						/* it remains some other shards not requested
						 * we don't commit a resync finish to request
						 * the other shards
						 */
						commit_a_finish = 0;
					}
					HA_SPIN_UNLOCK(PEER_LOCK, &rem_peer->lock);
				}
			}

			if (!commit_a_finish) {
				/* it remains some shard to request, we schedule a new request */
				peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
				TRACE_STATE("Resync in progress, some shard not resync yet", PEERS_EV_SESS_RESYNC, NULL, peer);
			}
		}

		if (commit_a_finish) {
			/* every shard (or the single unsharded peer) answered:
			 * mark both local and remote resyncs as done
			 */
			flags |= (PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_RESYNC_REMOTE_FINISHED);
			flags |= (peer->local ? PEERS_F_DBG_RESYNC_LOCALFINISHED : PEERS_F_DBG_RESYNC_REMOTEFINISHED);
			TRACE_STATE("learning finished, peer session fully resync", PEERS_EV_SESS_RESYNC, NULL, peer);
		}
	}
	/* lesson over: release the assignment and publish the result flags */
	peer->learnstate = PEER_LR_ST_NOTASSIGNED;
	HA_ATOMIC_AND(&peers->flags, ~PEERS_F_RESYNC_ASSIGN);
	HA_ATOMIC_OR(&peers->flags, flags);

	if (peer->appctx)
		appctx_wakeup(peer->appctx);
}
3545 | | |
3546 | | /* Synchronise the peer applet state with its associated peers section. This |
3547 | | * function handles STARTING->RUNNING and STOPPING->STOPPED transitions. |
3548 | | */ |
3549 | | static void sync_peer_app_state(struct peers *peers, struct peer *peer) |
3550 | 0 | { |
3551 | 0 | if (peer->appstate == PEER_APP_ST_STOPPING) { |
3552 | 0 | clear_peer_learning_status(peer); |
3553 | 0 | peer->appstate = PEER_APP_ST_STOPPED; |
3554 | 0 | TRACE_STATE("peer session now stopped", PEERS_EV_SESS_END, NULL, peer); |
3555 | 0 | } |
3556 | 0 | else if (peer->appstate == PEER_APP_ST_STARTING) { |
3557 | 0 | clear_peer_learning_status(peer); |
3558 | 0 | if (peer->local & appctx_is_back(peer->appctx)) { |
3559 | | /* if local peer has accepted the connection (appctx is |
3560 | | * on the backend side), flag it to learn a lesson and |
3561 | | * be sure it will start immediately. This only happens |
3562 | | * if no resync is in progress and if the lacal resync |
3563 | | * was not already performed. |
3564 | | */ |
3565 | 0 | if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL && |
3566 | 0 | !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { |
3567 | | /* assign local peer for a lesson */ |
3568 | 0 | peer->learnstate = PEER_LR_ST_ASSIGNED; |
3569 | 0 | HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_LOCALASSIGN); |
3570 | 0 | TRACE_STATE("peer session assigned for a local resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer); |
3571 | 0 | } |
3572 | 0 | } |
3573 | 0 | else if (!peer->local) { |
3574 | | /* If a connection was validated for a remote peer, flag |
3575 | | * it to learn a lesson but don't start it yet. The peer |
3576 | | * must request it explicitly. This only happens if no |
3577 | | * resync is in progress and if the remote resync was |
3578 | | * not already performed. |
3579 | | */ |
3580 | 0 | if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE && |
3581 | 0 | !(peers->flags & PEERS_F_RESYNC_ASSIGN)) { |
3582 | | /* assign remote peer for a lesson */ |
3583 | 0 | peer->learnstate = PEER_LR_ST_ASSIGNED; |
3584 | 0 | HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN); |
3585 | 0 | TRACE_STATE("peer session assigned for a remote resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer); |
3586 | 0 | } |
3587 | 0 | } |
3588 | 0 | peer->appstate = PEER_APP_ST_RUNNING; |
3589 | 0 | TRACE_STATE("peer session running", PEERS_EV_SESS_NEW|PEERS_EV_SESS_WAKE, NULL, peer); |
3590 | 0 | appctx_wakeup(peer->appctx); |
3591 | 0 | } |
3592 | 0 | } |
3593 | | |
/* Process the sync task for a running process. It is called from process_peer_sync() only.
 *
 * Walks every peer of the section: commits finished learning phases, syncs
 * applet states, (re)creates sessions for disconnected remote peers, assigns
 * a remote peer for resync when needed, wakes sessions that have updates to
 * push, and handles heartbeat / dead-peer-reconnect timers. Finally it
 * decides whether the resync phase is over and reschedules the task.
 */
static void __process_running_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{
	struct peer *peer;
	struct shared_table *st;
	int must_resched = 0; /* set when a busy peer lock forces a re-run */

	/* resync timeout set to TICK_ETERNITY means we just start
	 * a new process and timer was not initialized.
	 * We must arm this timer to switch to a request to a remote
	 * node if incoming connection from old local process never
	 * comes.
	 */
	if (peers->resync_timeout == TICK_ETERNITY)
		peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));

	if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMLOCAL) &&
	    (!nb_oldpids || tick_is_expired(peers->resync_timeout, now_ms)) &&
	    !(peers->flags & PEERS_F_RESYNC_ASSIGN)) {
		/* Resync from local peer needed
		   no peer was assigned for the lesson
		   and no old local peer found
		   or resync timeout expire */

		/* flag no more resync from local, to try resync from remotes */
		HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_LOCAL_FINISHED|PEERS_F_DBG_RESYNC_LOCALTIMEOUT);

		/* reschedule a resync */
		peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
	}

	/* For each session */
	for (peer = peers->remote; peer; peer = peer->next) {
		/* never block the sync task on a busy peer: skip it and
		 * wake ourselves up again at the end
		 */
		if (HA_SPIN_TRYLOCK(PEER_LOCK, &peer->lock) != 0) {
			must_resched = 1;
			continue;
		}

		sync_peer_learn_state(peers, peer);
		sync_peer_app_state(peers, peer);

		/* Peer changes, if any, were now ack by the sync task. Unblock
		 * the peer (any wakeup should already be performed, no need to
		 * do it here)
		 */
		peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;

		/* For each remote peers */
		if (!peer->local) {
			if (!peer->appctx) {
				/* no active peer connection */
				if (peer->statuscode == 0 ||
				    ((peer->statuscode == PEER_SESS_SC_CONNECTCODE ||
				      peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
				      peer->statuscode == PEER_SESS_SC_CONNECTEDCODE) &&
				     tick_is_expired(peer->reconnect, now_ms))) {
					/* connection never tried
					 * or previous peer connection established with success
					 * or previous peer connection failed while connecting
					 * and reconnection timer is expired */

					/* retry a connect */
					peer->appctx = peer_session_create(peers, peer);
				}
				else if (!tick_is_expired(peer->reconnect, now_ms)) {
					/* If previous session failed during connection
					 * but reconnection timer is not expired */

					/* reschedule task for reconnect */
					task->expire = tick_first(task->expire, peer->reconnect);
				}
				/* else do nothing */
			} /* !peer->appctx */
			else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE) {
				/* current peer connection is active and established */
				if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
				    !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
				    !(peer->flags & PEER_F_LEARN_NOTUP2DATE)) {
					/* Resync from a remote is needed
					 * and no peer was assigned for lesson
					 * and current peer may be up2date */

					/* assign peer for the lesson */
					peer->learnstate = PEER_LR_ST_ASSIGNED;
					HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_ASSIGN|PEERS_F_DBG_RESYNC_REMOTEASSIGN);
					TRACE_STATE("peer session assigned for a remote resync", PEERS_EV_SESS_RESYNC|PEERS_EV_SESS_WAKE, NULL, peer);

					/* wake up peer handler to handle a request of resync */
					appctx_wakeup(peer->appctx);
				}
				else {
					int update_to_push = 0;

					/* Awake session if there is data to push */
					for (st = peer->tables; st ; st = st->next) {
						if (st->last_pushed != st->table->localupdate) {
							/* wake up the peer handler to push local updates */
							update_to_push = 1;
							/* There is no need to send a heartbeat message
							 * when some updates must be pushed. The remote
							 * peer will consider <peer> peer as alive when it will
							 * receive these updates.
							 */
							peer->flags &= ~PEER_F_HEARTBEAT;
							/* Re-schedule another one later. */
							peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
							/* Refresh reconnect if necessary */
							if (tick_is_expired(peer->reconnect, now_ms))
								peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
							/* We are going to send updates, let's ensure we will
							 * come back to send heartbeat messages or to reconnect.
							 */
							TRACE_DEVEL("wakeup peer session to send update", PEERS_EV_SESS_WAKE, NULL, peer);
							task->expire = tick_first(peer->reconnect, peer->heartbeat);
							appctx_wakeup(peer->appctx);
							break;
						}
					}
					/* When there are updates to send we do not reconnect
					 * and do not send heartbeat message either.
					 */
					if (!update_to_push) {
						if (tick_is_expired(peer->reconnect, now_ms)) {
							if (peer->flags & PEER_F_ALIVE) {
								/* This peer was alive during a 'reconnect' period.
								 * Flag it as not alive again for the next period.
								 */
								peer->flags &= ~PEER_F_ALIVE;
								TRACE_STATE("unresponsive peer session detected", PEERS_EV_SESS_SHUT, NULL, peer);
								peer->reconnect = tick_add(now_ms, MS_TO_TICKS(PEER_RECONNECT_TIMEOUT));
							}
							else {
								/* no sign of life for a whole period: kill the
								 * session and retry after a random delay to
								 * avoid lockstep reconnections with the remote
								 */
								peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
								peer->heartbeat = TICK_ETERNITY;
								TRACE_STATE("dead peer session, force shutdown", PEERS_EV_SESS_SHUT, NULL, peer);
								peer_session_forceshutdown(peer);
								sync_peer_app_state(peers, peer);
								peer->no_hbt++;
							}
						}
						else if (tick_is_expired(peer->heartbeat, now_ms)) {
							peer->heartbeat = tick_add(now_ms, MS_TO_TICKS(PEER_HEARTBEAT_TIMEOUT));
							peer->flags |= PEER_F_HEARTBEAT;
							TRACE_DEVEL("wakeup peer session to send heartbeat message", PEERS_EV_SESS_WAKE, NULL, peer);
							appctx_wakeup(peer->appctx);
						}
						task->expire = tick_first(peer->reconnect, peer->heartbeat);
					}
				}
				/* else do nothing */
			} /* SUCCESSCODE */
		} /* !peer->peer->local */

		HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
	} /* for */

	/* Resync from remotes expired or no remote peer: consider resync is finished */
	if (((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FROMREMOTE) &&
	    !(peers->flags & PEERS_F_RESYNC_ASSIGN) &&
	    (tick_is_expired(peers->resync_timeout, now_ms) || !peers->remote->next)) {
		/* Resync from remote peer needed
		 * no peer was assigned for the lesson
		 * and resync timeout expire */

		/* flag no more resync from remote, consider resync is finished */
		HA_ATOMIC_OR(&peers->flags, PEERS_F_RESYNC_REMOTE_FINISHED|PEERS_F_DBG_RESYNC_REMOTETIMEOUT);
	}

	if (!must_resched && (peers->flags & PEERS_RESYNC_STATEMASK) != PEERS_RESYNC_FINISHED) {
		/* Resync not finished*/
		/* reschedule task to resync timeout if not expired, to ended resync if needed */
		if (!tick_is_expired(peers->resync_timeout, now_ms))
			task->expire = tick_first(task->expire, peers->resync_timeout);
	} else if (must_resched)
		task_wakeup(task, TASK_WOKEN_OTHER);
}
3770 | | |
/* Process the sync task for a stopping process. It is called from process_peer_sync() only.
 *
 * During soft-stop this task keeps the old process alive (via the global
 * <jobs> counter) until the new process has been taught the full content of
 * the shared tables, then releases the table refcounts so the tables may be
 * flushed and the process can exit.
 *
 * <task>  is the peers sync task (its ->expire may be re-armed here).
 * <peers> is the peers section this task manages.
 * <state> is the TASK_WOKEN_* wakeup cause; TASK_WOKEN_SIGNAL means the
 *         soft-stop signal was just delivered.
 */
static void __process_stopping_peer_sync(struct task *task, struct peers *peers, unsigned int state)
{
	struct peer *peer;
	struct shared_table *st;
	/* NOTE(review): function-local static, so this flag is shared by every
	 * peers section whose sync task runs through here, and is not under any
	 * per-peers lock. The matching _HA_ATOMIC_INC/_HA_ATOMIC_DEC on <jobs>
	 * pair through it — presumably only one section drives the stopping
	 * sequence at a time; confirm if multiple peers sections are configured.
	 */
	static int dont_stop = 0;

	/* For each peer: fold in any pending learn/app state change, then
	 * unblock it. */
	for (peer = peers->remote; peer; peer = peer->next) {
		HA_SPIN_LOCK(PEER_LOCK, &peer->lock);

		sync_peer_learn_state(peers, peer);
		sync_peer_app_state(peers, peer);

		/* Peer changes, if any, were now ack by the sync task. Unblock
		 * the peer (any wakeup should already be performed, no need to
		 * do it here)
		 */
		peer->flags &= ~PEER_F_WAIT_SYNCTASK_ACK;

		if ((state & TASK_WOKEN_SIGNAL) && !dont_stop) {
			/* we're killing a connection, we must apply a random delay before
			 * retrying otherwise the other end will do the same and we can loop
			 * for a while.
			 */
			peer->reconnect = tick_add(now_ms, MS_TO_TICKS(50 + ha_random() % 2000));
			if (peer->appctx) {
				peer_session_forceshutdown(peer);
				sync_peer_app_state(peers, peer);
			}
		}

		HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
	}

	/* We've just received the soft-stop signal: take a job reference so the
	 * process does not exit before the local resync completed, and request
	 * an immediate reconnect towards the new process. */
	if (state & TASK_WOKEN_SIGNAL) {
		if (!dont_stop) {
			/* add DO NOT STOP flag if not present */
			_HA_ATOMIC_INC(&jobs);
			dont_stop = 1;

			/* Set resync timeout for the local peer and request a immediate reconnect */
			peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));
			peers->local->reconnect = tick_add(now_ms, 0);
		}
	}

	/* From here on only the local peer (the new process) matters. */
	peer = peers->local;
	HA_SPIN_LOCK(PEER_LOCK, &peer->lock);
	if (peer->flags & PEER_F_LOCAL_TEACH_COMPLETE) {
		if (dont_stop) {
			/* resync of new process was complete, current process can die now */
			_HA_ATOMIC_DEC(&jobs);
			dont_stop = 0;
			/* drop the protection refs taken in peers_register_table() */
			for (st = peer->tables; st ; st = st->next)
				HA_ATOMIC_DEC(&st->table->refcnt);
		}
	}
	else if (!peer->appctx) {
		/* Re-arm resync timeout if necessary */
		if (!tick_isset(peers->resync_timeout))
			peers->resync_timeout = tick_add(now_ms, MS_TO_TICKS(PEER_RESYNC_TIMEOUT));

		/* If there's no active peer connection */
		if ((peers->flags & PEERS_RESYNC_STATEMASK) == PEERS_RESYNC_FINISHED &&
		    !tick_is_expired(peers->resync_timeout, now_ms) &&
		    (peer->statuscode == 0 ||
		     peer->statuscode == PEER_SESS_SC_SUCCESSCODE ||
		     peer->statuscode == PEER_SESS_SC_CONNECTEDCODE ||
		     peer->statuscode == PEER_SESS_SC_TRYAGAIN)) {
			/* The resync is finished for the local peer and
			 * the resync timeout is not expired and
			 * connection never tried
			 * or previous peer connection was successfully established
			 * or previous tcp connect succeeded but init state incomplete
			 * or during previous connect, peer replies a try again statuscode */

			if (!tick_is_expired(peer->reconnect, now_ms)) {
				/* reconnection timer is not expired. reschedule task for reconnect */
				task->expire = tick_first(task->expire, peer->reconnect);
			}
			else {
				/* connect to the local peer if we must push a local sync */
				if (dont_stop) {
					peer_session_create(peers, peer);
				}
			}
		}
		else {
			/* Other error cases */
			if (dont_stop) {
				/* unable to resync new process, current process can die now */
				_HA_ATOMIC_DEC(&jobs);
				dont_stop = 0;
				for (st = peer->tables; st ; st = st->next)
					HA_ATOMIC_DEC(&st->table->refcnt);
			}
		}
	}
	else if (peer->statuscode == PEER_SESS_SC_SUCCESSCODE ) {
		/* Reset resync timeout during a resync */
		peers->resync_timeout = TICK_ETERNITY;

		/* current peer connection is active and established
		 * wake up all peer handlers to push remaining local updates */
		for (st = peer->tables; st ; st = st->next) {
			if (st->last_pushed != st->table->localupdate) {
				appctx_wakeup(peer->appctx);
				break;
			}
		}
	}
	HA_SPIN_UNLOCK(PEER_LOCK, &peer->lock);
}
3886 | | |
3887 | | /* |
3888 | | * Task processing function to manage re-connect, peer session |
3889 | | * tasks wakeup on local update and heartbeat. Let's keep it exported so that it |
3890 | | * resolves in stack traces and "show tasks". |
3891 | | */ |
3892 | | struct task *process_peer_sync(struct task * task, void *context, unsigned int state) |
3893 | 0 | { |
3894 | 0 | struct peers *peers = context; |
3895 | |
|
3896 | 0 | task->expire = TICK_ETERNITY; |
3897 | |
|
3898 | 0 | if (!stopping) { |
3899 | | /* Normal case (not soft stop)*/ |
3900 | 0 | __process_running_peer_sync(task, peers, state); |
3901 | |
|
3902 | 0 | } |
3903 | 0 | else { |
3904 | | /* soft stop case */ |
3905 | 0 | __process_stopping_peer_sync(task, peers, state); |
3906 | 0 | } /* stopping */ |
3907 | | |
3908 | | /* Wakeup for re-connect */ |
3909 | 0 | return task; |
3910 | 0 | } |
3911 | | |
3912 | | |
3913 | | /* |
3914 | | * returns 0 in case of error. |
3915 | | */ |
3916 | | int peers_init_sync(struct peers *peers) |
3917 | 0 | { |
3918 | 0 | static uint operating_thread = 0; |
3919 | 0 | struct peer * curpeer; |
3920 | |
|
3921 | 0 | for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) { |
3922 | 0 | peers->peers_fe->maxconn += 3; |
3923 | 0 | } |
3924 | | |
3925 | | /* go backwards so as to distribute the load to other threads |
3926 | | * than the ones operating the stick-tables for small confs. |
3927 | | */ |
3928 | 0 | operating_thread = (operating_thread - 1) % global.nbthread; |
3929 | 0 | peers->sync_task = task_new_on(operating_thread); |
3930 | 0 | if (!peers->sync_task) |
3931 | 0 | return 0; |
3932 | | |
3933 | 0 | memset(peers->applet_count, 0, sizeof(peers->applet_count)); |
3934 | 0 | peers->sync_task->process = process_peer_sync; |
3935 | 0 | peers->sync_task->context = (void *)peers; |
3936 | 0 | peers->sighandler = signal_register_task(0, peers->sync_task, 0); |
3937 | 0 | task_wakeup(peers->sync_task, TASK_WOKEN_INIT); |
3938 | 0 | return 1; |
3939 | 0 | } |
3940 | | |
3941 | | /* |
3942 | | * Allocate a cache a dictionary entries used upon transmission. |
3943 | | */ |
3944 | | static struct dcache_tx *new_dcache_tx(size_t max_entries) |
3945 | 0 | { |
3946 | 0 | struct dcache_tx *d; |
3947 | 0 | struct ebpt_node *entries; |
3948 | |
|
3949 | 0 | d = malloc(sizeof *d); |
3950 | 0 | entries = calloc(max_entries, sizeof *entries); |
3951 | 0 | if (!d || !entries) |
3952 | 0 | goto err; |
3953 | | |
3954 | 0 | d->lru_key = 0; |
3955 | 0 | d->prev_lookup = NULL; |
3956 | 0 | d->cached_entries = EB_ROOT_UNIQUE; |
3957 | 0 | d->entries = entries; |
3958 | |
|
3959 | 0 | return d; |
3960 | | |
3961 | 0 | err: |
3962 | 0 | free(d); |
3963 | 0 | free(entries); |
3964 | 0 | return NULL; |
3965 | 0 | } |
3966 | | |
3967 | | /* |
3968 | | * Allocate a cache of dictionary entries with <name> as name and <max_entries> |
3969 | | * as maximum of entries. |
3970 | | * Return the dictionary cache if succeeded, NULL if not. |
3971 | | * Must be deallocated calling free_dcache(). |
3972 | | */ |
3973 | | static struct dcache *new_dcache(size_t max_entries) |
3974 | 0 | { |
3975 | 0 | struct dcache_tx *dc_tx; |
3976 | 0 | struct dcache *dc; |
3977 | 0 | struct dcache_rx *dc_rx; |
3978 | |
|
3979 | 0 | dc = calloc(1, sizeof *dc); |
3980 | 0 | dc_tx = new_dcache_tx(max_entries); |
3981 | 0 | dc_rx = calloc(max_entries, sizeof *dc_rx); |
3982 | 0 | if (!dc || !dc_tx || !dc_rx) |
3983 | 0 | goto err; |
3984 | | |
3985 | 0 | dc->tx = dc_tx; |
3986 | 0 | dc->rx = dc_rx; |
3987 | 0 | dc->max_entries = max_entries; |
3988 | |
|
3989 | 0 | return dc; |
3990 | | |
3991 | 0 | err: |
3992 | 0 | free(dc); |
3993 | 0 | free(dc_tx); |
3994 | 0 | free(dc_rx); |
3995 | 0 | return NULL; |
3996 | 0 | } |
3997 | | |
/*
 * Look for the dictionary entry with the value of <i> in <d> cache of dictionary
 * entries used upon transmission.
 * The lookup key is the dictionary entry's address stored in <i>->entry.key,
 * as inserted by dcache_tx_insert().
 * Return the entry if found, NULL if not.
 */
static struct ebpt_node *dcache_tx_lookup_value(struct dcache_tx *d,
                                                struct dcache_tx_entry *i)
{
	return ebpt_lookup(&d->cached_entries, i->entry.key);
}
4008 | | |
4009 | | /* |
4010 | | * Flush <dc> cache. |
4011 | | * Always succeeds. |
4012 | | */ |
4013 | | static inline void flush_dcache(struct peer *peer) |
4014 | 0 | { |
4015 | 0 | int i; |
4016 | 0 | struct dcache *dc = peer->dcache; |
4017 | |
|
4018 | 0 | for (i = 0; i < dc->max_entries; i++) { |
4019 | 0 | ebpt_delete(&dc->tx->entries[i]); |
4020 | 0 | dc->tx->entries[i].key = NULL; |
4021 | 0 | dict_entry_unref(&server_key_dict, dc->rx[i].de); |
4022 | 0 | dc->rx[i].de = NULL; |
4023 | 0 | } |
4024 | 0 | dc->tx->prev_lookup = NULL; |
4025 | 0 | dc->tx->lru_key = 0; |
4026 | |
|
4027 | 0 | memset(dc->rx, 0, dc->max_entries * sizeof *dc->rx); |
4028 | 0 | } |
4029 | | |
/*
 * Insert a dictionary entry in <dc> cache part used upon transmission (->tx)
 * with information provided by <i> dictionary cache entry (especially the value
 * to be inserted if not already). Return <i> if already present in the cache
 * or something different of <i> if not.
 *
 * In both cases <i>->id receives the slot index to transmit on the wire.
 * Cache slots are recycled in round-robin order through ->lru_key; note the
 * wrap uses "& (max_entries - 1)", which assumes max_entries is a power of
 * two (PEER_STKT_CACHE_MAX_ENTRIES).
 */
static struct ebpt_node *dcache_tx_insert(struct dcache *dc, struct dcache_tx_entry *i)
{
	struct dcache_tx *dc_tx;
	struct ebpt_node *o;

	dc_tx = dc->tx;

	/* fast path: same key as the previous lookup, skip the tree walk */
	if (dc_tx->prev_lookup && dc_tx->prev_lookup->key == i->entry.key) {
		o = dc_tx->prev_lookup;
	} else {
		o = dcache_tx_lookup_value(dc_tx, i);
		if (o) {
			/* Save it */
			dc_tx->prev_lookup = o;
		}
	}

	if (o) {
		/* Copy the ID (slot index derived from the node's position
		 * in the entries array). */
		i->id = o - dc->tx->entries;
		return &i->entry;
	}

	/* The new entry to put in cache: recycle the slot pointed to by the
	 * LRU index, evicting whatever key it held. */
	dc_tx->prev_lookup = o = &dc_tx->entries[dc_tx->lru_key];

	ebpt_delete(o);
	o->key = i->entry.key;
	ebpt_insert(&dc_tx->cached_entries, o);
	i->id = dc_tx->lru_key;

	/* Update the index for the next entry to put in cache */
	dc_tx->lru_key = (dc_tx->lru_key + 1) & (dc->max_entries - 1);

	return o;
}
4072 | | |
4073 | | /* |
4074 | | * Allocate a dictionary cache for each peer of <peers> section. |
4075 | | * Return 1 if succeeded, 0 if not. |
4076 | | */ |
4077 | | int peers_alloc_dcache(struct peers *peers) |
4078 | 0 | { |
4079 | 0 | struct peer *p; |
4080 | |
|
4081 | 0 | for (p = peers->remote; p; p = p->next) { |
4082 | 0 | p->dcache = new_dcache(PEER_STKT_CACHE_MAX_ENTRIES); |
4083 | 0 | if (!p->dcache) |
4084 | 0 | return 0; |
4085 | 0 | } |
4086 | | |
4087 | 0 | return 1; |
4088 | 0 | } |
4089 | | |
/*
 * Function used to register a table for sync on a group of peers
 * Returns 0 in case of success, 1 when a shared_table allocation failed (in
 * which case the tables already linked on previous peers are kept).
 */
int peers_register_table(struct peers *peers, struct stktable *table)
{
	struct shared_table *st;
	struct peer * curpeer;
	int id = 0;
	int retval = 0;

	for (curpeer = peers->remote; curpeer; curpeer = curpeer->next) {
		st = calloc(1,sizeof(*st));
		if (!st) {
			retval = 1;
			break;
		}
		st->table = table;
		/* prepend to this peer's table list */
		st->next = curpeer->tables;
		/* NOTE(review): <id> carries over from the previous peer's
		 * iteration when curpeer->tables is NULL; since every peer
		 * registers the same tables in the same order this yields the
		 * same local_id sequence per peer — confirm if that invariant
		 * ever changes. */
		if (curpeer->tables)
			id = curpeer->tables->local_id;
		st->local_id = id + 1;

		/* If peer is local we inc table
		 * refcnt to protect against flush
		 * until this process pushed all
		 * table content to the new one
		 */
		if (curpeer->local)
			HA_ATOMIC_INC(&st->table->refcnt);
		curpeer->tables = st;
	}

	table->sync_task = peers->sync_task;

	return retval;
}
4127 | | |
/* context used by a "show peers" command; reserved in the appctx's svcctx */
struct show_peers_ctx {
	void *target;        /* if non-null, dump only this section and stop */
	struct peers *peers; /* "peers" section being currently dumped. */
	struct peer *peer;   /* "peer" being currently dumped. */
	int flags;           /* PEERS_SHOW_F_DICT set when the "dict" dump was requested */
	enum {
		STATE_HEAD = 0, /* dump the section's header */
		STATE_PEER,     /* dump the whole peer */
		STATE_DONE,     /* finished */
	} state;             /* parser's state */
};
4140 | | |
4141 | | /* |
4142 | | * Parse the "show peers" command arguments. |
4143 | | * Returns 0 if succeeded, 1 if not with the ->msg of the appctx set as |
4144 | | * error message. |
4145 | | */ |
4146 | | static int cli_parse_show_peers(char **args, char *payload, struct appctx *appctx, void *private) |
4147 | 0 | { |
4148 | 0 | struct show_peers_ctx *ctx = applet_reserve_svcctx(appctx, sizeof(*ctx)); |
4149 | |
|
4150 | 0 | if (strcmp(args[2], "dict") == 0) { |
4151 | | /* show the dictionaries (large dump) */ |
4152 | 0 | ctx->flags |= PEERS_SHOW_F_DICT; |
4153 | 0 | args++; |
4154 | 0 | } else if (strcmp(args[2], "-") == 0) |
4155 | 0 | args++; // allows to show a section called "dict" |
4156 | |
|
4157 | 0 | if (*args[2]) { |
4158 | 0 | struct peers *p; |
4159 | |
|
4160 | 0 | for (p = cfg_peers; p; p = p->next) { |
4161 | 0 | if (strcmp(p->id, args[2]) == 0) { |
4162 | 0 | ctx->target = p; |
4163 | 0 | break; |
4164 | 0 | } |
4165 | 0 | } |
4166 | |
|
4167 | 0 | if (!p) |
4168 | 0 | return cli_err(appctx, "No such peers\n"); |
4169 | 0 | } |
4170 | | |
4171 | | /* where to start from */ |
4172 | 0 | ctx->peers = ctx->target ? ctx->target : cfg_peers; |
4173 | 0 | return 0; |
4174 | 0 | } |
4175 | | |
4176 | | /* |
4177 | | * This function dumps the peer state information of <peers> "peers" section. |
4178 | | * Returns 0 if the output buffer is full and needs to be called again, non-zero if not. |
4179 | | * Dedicated to be called by cli_io_handler_show_peers() cli I/O handler. |
4180 | | */ |
4181 | | static int peers_dump_head(struct buffer *msg, struct appctx *appctx, struct peers *peers) |
4182 | 0 | { |
4183 | 0 | struct tm tm; |
4184 | |
|
4185 | 0 | get_localtime(peers->last_change, &tm); |
4186 | 0 | chunk_appendf(msg, "%p: [%02d/%s/%04d:%02d:%02d:%02d] id=%s disabled=%d flags=0x%x resync_timeout=%s task_calls=%u\n", |
4187 | 0 | peers, |
4188 | 0 | tm.tm_mday, monthname[tm.tm_mon], tm.tm_year+1900, |
4189 | 0 | tm.tm_hour, tm.tm_min, tm.tm_sec, |
4190 | 0 | peers->id, peers->disabled, HA_ATOMIC_LOAD(&peers->flags), |
4191 | 0 | peers->resync_timeout ? |
4192 | 0 | tick_is_expired(peers->resync_timeout, now_ms) ? "<PAST>" : |
4193 | 0 | human_time(TICKS_TO_MS(peers->resync_timeout - now_ms), |
4194 | 0 | TICKS_TO_MS(1000)) : "<NEVER>", |
4195 | 0 | peers->sync_task ? peers->sync_task->calls : 0); |
4196 | |
|
4197 | 0 | if (applet_putchk(appctx, msg) == -1) |
4198 | 0 | return 0; |
4199 | | |
4200 | 0 | return 1; |
4201 | 0 | } |
4202 | | |
/*
 * This function dumps <peer> state information.
 * Returns 0 if the output buffer is full and needs to be called again, non-zero
 * if not. Dedicated to be called by cli_io_handler_show_peers() cli I/O handler.
 *
 * NOTE(review): the function appends both to <msg> and to the global &trash,
 * but only flushes <msg> at the end; this is only coherent because the sole
 * caller passes &trash as <msg> — confirm before calling it with any other
 * buffer.
 */
static int peers_dump_peer(struct buffer *msg, struct appctx *appctx, struct peer *peer, int flags)
{
	struct connection *conn;
	char pn[INET6_ADDRSTRLEN];
	struct stconn *peer_cs;
	struct stream *peer_s;
	struct shared_table *st;

	/* identity, address and protocol state of the peer */
	addr_to_str(&peer->srv->addr, pn, sizeof pn);
	chunk_appendf(msg, "  %p: id=%s(%s,%s) addr=%s:%d app_state=%s learn_state=%s last_status=%s",
	              peer, peer->id,
	              peer->local ? "local" : "remote",
	              peer->appctx ? "active" : "inactive",
	              pn, peer->srv->svc_port,
	              peer_app_state_str(peer->appstate),
	              peer_learn_state_str(peer->learnstate),
	              statuscode_str(peer->statuscode));

	chunk_appendf(msg, " last_hdshk=%s\n",
	              peer->last_hdshk ? human_time(TICKS_TO_MS(now_ms - peer->last_hdshk),
	                                            TICKS_TO_MS(1000)) : "<NEVER>");

	/* timers: unset / already expired / remaining time */
	chunk_appendf(msg, " reconnect=%s",
	              peer->reconnect ?
	              tick_is_expired(peer->reconnect, now_ms) ? "<PAST>" :
	              human_time(TICKS_TO_MS(peer->reconnect - now_ms),
	                         TICKS_TO_MS(1000)) : "<NEVER>");

	chunk_appendf(msg, " heartbeat=%s",
	              peer->heartbeat ?
	              tick_is_expired(peer->heartbeat, now_ms) ? "<PAST>" :
	              human_time(TICKS_TO_MS(peer->heartbeat - now_ms),
	                         TICKS_TO_MS(1000)) : "<NEVER>");

	/* per-peer counters */
	chunk_appendf(msg, " confirm=%u tx_hbt=%u rx_hbt=%u no_hbt=%u new_conn=%u proto_err=%u coll=%u\n",
	              peer->confirm, peer->tx_hbt, peer->rx_hbt,
	              peer->no_hbt, peer->new_conn, peer->proto_err, peer->coll);

	chunk_appendf(&trash, " flags=0x%x", peer->flags);

	/* no applet: nothing session-related to dump */
	if (!peer->appctx)
		goto table_info;

	chunk_appendf(&trash, " appctx:%p st0=%d st1=%d task_calls=%u",
	              peer->appctx, peer->appctx->st0, peer->appctx->st1,
	              peer->appctx->t ? peer->appctx->t->calls : 0);

	peer_cs = appctx_sc(peer->appctx);
	if (!peer_cs) {
		/* the appctx might exist but not yet be initialized due to
		 * deferred initialization used to balance applets across
		 * threads.
		 */
		goto table_info;
	}

	peer_s = __sc_strm(peer_cs);

	chunk_appendf(&trash, " state=%s", sc_state_str(sc_opposite(peer_cs)->state));

	conn = objt_conn(strm_orig(peer_s));
	if (conn)
		chunk_appendf(&trash, "\n xprt=%s", conn_get_xprt_name(conn));

	/* source address, keyed on the family returned by addr_to_str() */
	switch (conn && conn_get_src(conn) ? addr_to_str(conn->src, pn, sizeof(pn)) : AF_UNSPEC) {
	case AF_INET:
	case AF_INET6:
		chunk_appendf(&trash, " src=%s:%d", pn, get_host_port(conn->src));
		break;
	case AF_UNIX:
	case AF_CUST_ABNS:
	case AF_CUST_ABNSZ:
		chunk_appendf(&trash, " src=unix:%d", strm_li(peer_s)->luid);
		break;
	}

	/* destination address, same scheme as above */
	switch (conn && conn_get_dst(conn) ? addr_to_str(conn->dst, pn, sizeof(pn)) : AF_UNSPEC) {
	case AF_INET:
	case AF_INET6:
		chunk_appendf(&trash, " addr=%s:%d", pn, get_host_port(conn->dst));
		break;
	case AF_UNIX:
	case AF_CUST_ABNS:
	case AF_CUST_ABNSZ:
		chunk_appendf(&trash, " addr=unix:%d", strm_li(peer_s)->luid);
		break;
	}

 table_info:
	if (peer->remote_table)
		chunk_appendf(&trash, "\n remote_table:%p id=%s local_id=%d remote_id=%d",
		              peer->remote_table,
		              peer->remote_table->table->id,
		              peer->remote_table->local_id,
		              peer->remote_table->remote_id);

	if (peer->last_local_table)
		chunk_appendf(&trash, "\n last_local_table:%p id=%s local_id=%d remote_id=%d",
		              peer->last_local_table,
		              peer->last_local_table->table->id,
		              peer->last_local_table->local_id,
		              peer->last_local_table->remote_id);

	if (peer->tables) {
		chunk_appendf(&trash, "\n shared tables:");
		for (st = peer->tables; st; st = st->next) {
			int i, count;
			struct stktable *t;
			struct dcache *dcache;

			t = st->table;
			dcache = peer->dcache;

			chunk_appendf(&trash, "\n %p local_id=%d remote_id=%d "
			              "flags=0x%x remote_data=0x%llx",
			              st, st->local_id, st->remote_id,
			              st->flags, (unsigned long long)st->remote_data);
			chunk_appendf(&trash, "\n last_acked=%u last_pushed=%u last_get=%u"
			              " teaching_origin=%u update=%u",
			              st->last_acked, st->last_pushed, st->last_get, st->teaching_origin, st->update);
			chunk_appendf(&trash, "\n table:%p id=%s update=%u localupdate=%u refcnt=%u",
			              t, t->id, t->update, t->localupdate, t->refcnt);
			if (flags & PEERS_SHOW_F_DICT) {
				/* TX side: stops at the first never-used slot
				 * (slots are filled in order by dcache_tx_insert) */
				chunk_appendf(&trash, "\n TX dictionary cache:");
				count = 0;
				for (i = 0; i < dcache->max_entries; i++) {
					struct ebpt_node *node;
					struct dict_entry *de;

					node = &dcache->tx->entries[i];
					if (!node->key)
						break;

					/* 4 entries per output line */
					if (!count++)
						chunk_appendf(&trash, "\n ");
					de = node->key;
					chunk_appendf(&trash, " %3u -> %s", i, (char *)de->value.key);
					count &= 0x3;
				}
				/* RX side: dump all slots, "-" for empty ones */
				chunk_appendf(&trash, "\n RX dictionary cache:");
				count = 0;
				for (i = 0; i < dcache->max_entries; i++) {
					if (!count++)
						chunk_appendf(&trash, "\n ");
					chunk_appendf(&trash, " %3u -> %s", i,
					              dcache->rx[i].de ?
					              (char *)dcache->rx[i].de->value.key : "-");
					count &= 0x3;
				}
			} else {
				chunk_appendf(&trash, "\n Dictionary cache not dumped (use \"show peers dict\")");
			}
		}
	}

 end: /* NOTE(review): label is not referenced by any goto in this function */
	chunk_appendf(&trash, "\n");
	if (applet_putchk(appctx, msg) == -1)
		return 0;

	return 1;
}
4370 | | |
4371 | | /* |
4372 | | * This function dumps all the peers of "peers" section. |
4373 | | * Returns 0 if the output buffer is full and needs to be called |
4374 | | * again, non-zero if not. It proceeds in an isolated thread, so |
4375 | | * there is no thread safety issue here. |
4376 | | */ |
4377 | | static int cli_io_handler_show_peers(struct appctx *appctx) |
4378 | 0 | { |
4379 | 0 | struct show_peers_ctx *ctx = appctx->svcctx; |
4380 | 0 | int ret = 0, first_peers = 1; |
4381 | |
|
4382 | 0 | thread_isolate(); |
4383 | |
|
4384 | 0 | chunk_reset(&trash); |
4385 | |
|
4386 | 0 | while (ctx->state != STATE_DONE) { |
4387 | 0 | switch (ctx->state) { |
4388 | 0 | case STATE_HEAD: |
4389 | 0 | if (!ctx->peers) { |
4390 | | /* No more peers list. */ |
4391 | 0 | ctx->state = STATE_DONE; |
4392 | 0 | } |
4393 | 0 | else { |
4394 | 0 | if (!first_peers) |
4395 | 0 | chunk_appendf(&trash, "\n"); |
4396 | 0 | else |
4397 | 0 | first_peers = 0; |
4398 | 0 | if (!peers_dump_head(&trash, appctx, ctx->peers)) |
4399 | 0 | goto out; |
4400 | | |
4401 | 0 | ctx->peer = ctx->peers->remote; |
4402 | 0 | ctx->peers = ctx->peers->next; |
4403 | 0 | ctx->state = STATE_PEER; |
4404 | 0 | } |
4405 | 0 | break; |
4406 | | |
4407 | 0 | case STATE_PEER: |
4408 | 0 | if (!ctx->peer) { |
4409 | | /* End of peer list */ |
4410 | 0 | if (!ctx->target) |
4411 | 0 | ctx->state = STATE_HEAD; // next one |
4412 | 0 | else |
4413 | 0 | ctx->state = STATE_DONE; |
4414 | 0 | } |
4415 | 0 | else { |
4416 | 0 | if (!peers_dump_peer(&trash, appctx, ctx->peer, ctx->flags)) |
4417 | 0 | goto out; |
4418 | | |
4419 | 0 | ctx->peer = ctx->peer->next; |
4420 | 0 | } |
4421 | 0 | break; |
4422 | | |
4423 | 0 | default: |
4424 | 0 | break; |
4425 | 0 | } |
4426 | 0 | } |
4427 | 0 | ret = 1; |
4428 | 0 | out: |
4429 | 0 | thread_release(); |
4430 | 0 | return ret; |
4431 | 0 | } |
4432 | | |
4433 | | |
/* Head of the list of "peers" section keywords registered by other parts of
 * the code through peers_register_keywords(). */
struct peers_kw_list peers_keywords = {
	.list = LIST_HEAD_INIT(peers_keywords.list)
};
4437 | | |
/* Append the keyword list <pkwl> to the global peers keyword registry. */
void peers_register_keywords(struct peers_kw_list *pkwl)
{
	LIST_APPEND(&peers_keywords.list, &pkwl->list);
}
4442 | | |
4443 | | /* config parser for global "tune.peers.max-updates-at-once" */ |
4444 | | static int cfg_parse_max_updt_at_once(char **args, int section_type, struct proxy *curpx, |
4445 | | const struct proxy *defpx, const char *file, int line, |
4446 | | char **err) |
4447 | 0 | { |
4448 | 0 | int arg = -1; |
4449 | |
|
4450 | 0 | if (too_many_args(1, args, err, NULL)) |
4451 | 0 | return -1; |
4452 | | |
4453 | 0 | if (*(args[1]) != 0) |
4454 | 0 | arg = atoi(args[1]); |
4455 | |
|
4456 | 0 | if (arg < 1) { |
4457 | 0 | memprintf(err, "'%s' expects an integer argument greater than 0.", args[0]); |
4458 | 0 | return -1; |
4459 | 0 | } |
4460 | | |
4461 | 0 | peers_max_updates_at_once = arg; |
4462 | 0 | return 0; |
4463 | 0 | } |
4464 | | |
/* config keyword parsers; registered at boot time via the INITCALL below */
static struct cfg_kw_list cfg_kws = {ILH, {
	{ CFG_GLOBAL, "tune.peers.max-updates-at-once", cfg_parse_max_updt_at_once },
	{ 0, NULL, NULL }
}};

INITCALL1(STG_REGISTER, cfg_register_keywords, &cfg_kws);
4472 | | |
/*
 * CLI keywords: "show peers" with its parse and I/O handlers defined above.
 */
static struct cli_kw_list cli_kws = {{ }, {
	{ { "show", "peers", NULL }, "show peers [dict|-] [section]           : dump some information about all the peers or this peers section", cli_parse_show_peers, cli_io_handler_show_peers, },
	{},
}};

/* Register cli keywords */
INITCALL1(STG_REGISTER, cli_register_kw, &cli_kws);