/src/libwebsockets/lib/system/smd/smd.c

/*
 * lws System Message Distribution
 *
 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "private-lib-core.h"
#include <assert.h>

/* uncomment me to enable extra debug and sanity checks */
// #define LWS_SMD_DEBUG

#if defined(LWS_SMD_DEBUG)
#define lwsl_smd lwsl_notice
#else
#define lwsl_smd(_s, ...)
#endif

void *
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
{
	lws_smd_msg_t *msg;

	/* only allow it if someone wants to consume this class of event */

	if (!(ctx->smd._class_filter & _class)) {
		lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants",
			     (unsigned int)_class);
		return NULL;
	}

	assert(len <= LWS_SMD_MAX_PAYLOAD);

	/*
	 * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
	 * payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
	 */
	msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
			 __func__);
	if (!msg)
		return NULL;

	memset(msg, 0, sizeof(*msg));
	msg->timestamp = lws_now_usecs();
	msg->length = (uint16_t)len;
	msg->_class = _class;

	return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
}

void
lws_smd_msg_free(void **ppay)
{
	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

	/* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
	lws_free(msg);
	*ppay = NULL;
}
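
/*
 * Illustrative usage sketch (not part of lws): the lifecycle of a message
 * allocated with the two helpers above.  lws_smd_msg_send() is defined
 * later in this file; my_emit_example is a hypothetical name.
 */
#if 0
static int
my_emit_example(struct lws_context *cx)
{
	/* allocate payload space; fails if nobody subscribed to the class */
	void *p = lws_smd_msg_alloc(cx, LWSSMDCL_SYSTEM_STATE, 10);

	if (!p)
		return 1;

	memcpy(p, "{\"s\":\"x\"}", 10);

	/* on success, ownership of the allocation passes to the queue */
	if (lws_smd_msg_send(cx, p)) {
		lws_smd_msg_free(&p); /* rejected: we still own it */
		return 1;
	}

	return 0;
}
#endif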

#if defined(LWS_SMD_DEBUG)
static void
lws_smd_dump(lws_smd_t *smd)
{
	int n = 1;

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
			  n++, msg, msg->refcount,
			  (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
			  msg->_class, msg->length,
			  (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);

	} lws_end_foreach_dll_safe(p, p1);

	n = 1;
	lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
			  n++, pr, pr->tail, pr->_class_filter);
	} lws_end_foreach_dll(p);
}
#endif

static int
_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
{
	return !!(msg->_class & pr->_class_filter);
}

/*
 * Figure out what to set the initial refcount for the message to
 */

static int
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
				     struct lws_smd_peer *exc)
{
	struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
	int interested = 0;

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
			/*
			 * This peer wants to consume it
			 */
			interested++;

	} lws_end_foreach_dll(p);

	return interested;
}

static int
_lws_smd_class_mask_union(lws_smd_t *smd)
{
	uint32_t mask = 0;

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		mask |= pr->_class_filter;

	} lws_end_foreach_dll_safe(p, p1);

	smd->_class_filter = mask;

	return 0;
}
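
/*
 * Worked example (illustrative): with two registered peers whose filters
 * are 0x3 and 0x6, the union stored in smd->_class_filter is
 * 0x3 | 0x6 = 0x7, so lws_smd_msg_alloc() above can cheaply reject any
 * class outside 0x7 before allocating anything.
 */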

/* Call with message lock held */

static void
_lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg)
{
	/*
	 * We think we gave the message to everyone and can destroy it.
	 * Sanity check that no peer holds a pointer to this guy
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   smd->owner_peers.head) {
		lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);

		if (xpr->tail == msg) {
			lwsl_cx_err(cx, "peer %p has msg %p "
					"we are about to destroy as tail", xpr, msg);
#if !defined(LWS_PLAT_FREERTOS)
			assert(0);
#endif
		}

	} lws_end_foreach_dll_safe(p, p1);

	/*
	 * We have fully delivered the message now, it
	 * can be unlinked and destroyed
	 */
	lwsl_cx_info(cx, "destroy msg %p", msg);
	lws_dll2_remove(&msg->list);
	lws_free(msg);
}

/*
 * This needs to be threadsafe, which limits the APIs we can call
 */

int
_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
{
	lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
				LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

	if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
		lwsl_cx_warn(ctx, "rejecting message on queue depth %d",
			     (int)ctx->smd.owner_messages.count);
		/* reject the message due to max queue depth reached */
		return 1;
	}

	if (!ctx->smd.delivering &&
	    lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
		return 1; /* For Coverity */

	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
		goto bail;

	msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
							&ctx->smd, msg, exc);
	if (!msg->refcount) {
		/* possible, considering exc and no other participants */
		lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

		lws_free(msg);
		if (!ctx->smd.delivering)
			lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

		return 0;
	}

	msg->exc = exc;

	/* let's add him on the queue... */

	lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);

	/*
	 * Any peer with no active tail needs to check our class to see if we
	 * should become his tail
	 */

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr != exc &&
		    !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
			pr->tail = msg;
			/* tail message has to actually be of interest to the peer */
			assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
		}

	} lws_end_foreach_dll(p);

#if defined(LWS_SMD_DEBUG)
	lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
		 msg, msg->refcount, ctx->smd.owner_messages.count);
	lws_smd_dump(&ctx->smd);
#endif

	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

bail:
	if (!ctx->smd.delivering)
		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

	/* we may be happening from another thread context */
	lws_cancel_service(ctx);

	return 0;
}

/*
 * This needs to be threadsafe, which limits the APIs we can call
 */

int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
	return _lws_smd_msg_send(ctx, pay, NULL);
}

/*
 * This needs to be threadsafe, which limits the APIs we can call
 */

int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
		   const char *format, ...)
{
	lws_smd_msg_t *msg;
	va_list ap;
	void *p;
	int n;

	if (!(ctx->smd._class_filter & _class))
		/*
		 * There's nobody interested in messages of this class atm.
		 * Don't bother generating it, and act like all is well.
		 */
		return 0;

	va_start(ap, format);
	n = vsnprintf(NULL, 0, format, ap);
	va_end(ap);
	if (n > LWS_SMD_MAX_PAYLOAD)
		/* too large to send */
		return 1;

	p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
	if (!p)
		return 1;
	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
				sizeof(*msg));
	msg->length = (uint16_t)n;
	va_start(ap, format);
	vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
	va_end(ap);

	/*
	 * locks taken and released in here
	 */

	if (lws_smd_msg_send(ctx, p)) {
		lws_smd_msg_free(&p);
		return 1;
	}

	return 0;
}
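
/*
 * Illustrative usage sketch (not part of lws): emitting a formatted JSON
 * message.  LWSSMDCL_NETWORK is a real class bit from lws-smd.h;
 * my_report_network_up is a hypothetical name.
 */
#if 0
static void
my_report_network_up(struct lws_context *cx)
{
	/* 0: queued, or dropped cheaply because nobody subscribed */
	if (lws_smd_msg_printf(cx, LWSSMDCL_NETWORK, "{\"up\":%d}", 1))
		lwsl_warn("%s: smd send failed\n", __func__);
}
#endif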

#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
		      lws_smd_class_t _class, const char *format, ...)
{
	char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
	va_list ap;
	int n;

	if (*len < LWS_SMD_SS_RX_HEADER_LEN)
		return 1;

	lws_ser_wu64be(buf, _class);
	lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */

	va_start(ap, format);
	n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
	va_end(ap);

	if (n > LWS_SMD_MAX_PAYLOAD ||
	    (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
		/* too large to send */
		return 1;

	*len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;

	lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag,
		  (unsigned int)_class, (unsigned int)n);

	return 0;
}
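
/*
 * Wire layout produced above, as implied by the offsets in the code
 * (LWS_SMD_SS_RX_HEADER_LEN covers the two 8-byte fields):
 *
 *   [0..7]    u64be   message class bitmap
 *   [8..15]   u64be   source timestamp in us (0 here; the rx side adopts it)
 *   [16....]  payload of up to *len - LWS_SMD_SS_RX_HEADER_LEN bytes
 */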

/*
 * This is a helper that the user rx handler for the LWS_SMD_STREAMTYPENAME
 * SS can call through to with the payload it received from the proxy.  It
 * will then forward the received SMD message to all local (same-context)
 * participants that are interested in that class (except the excluded
 * peer pr it arrived on, so we don't loop).
 */

static int
_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
		       struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
{
	lws_smd_class_t _class;
	lws_smd_msg_t *msg;
	void *p;

	if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
		return 1;

	if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
		return 1;

	_class = (lws_smd_class_t)lws_ser_ru64be(buf);

	/* only locally forward messages that we care about in this process */

	if (!(ctx->smd._class_filter & _class))
		/*
		 * There's nobody interested in messages of this class atm.
		 * Don't bother generating it, and act like all is well.
		 */
		return 0;

	p = lws_smd_msg_alloc(ctx, _class, len);
	if (!p)
		return 1;

	msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
				sizeof(*msg));
	msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
	/* adopt the original source timestamp, not time we forwarded it */
	msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);

	/* copy the message payload in */
	memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);

	/*
	 * locks taken and released in here
	 */

	if (_lws_smd_msg_send(ctx, p, pr)) {
		/* we couldn't send it after all that... */
		lws_smd_msg_free(&p);

		return 1;
	}

	lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
		  tag, (unsigned int)_class, msg->length,
		  (unsigned long long)msg->timestamp);

	return 0;
}

int
lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
	struct lws_ss_handle *h = (struct lws_ss_handle *)
					(((char *)ss_user) - sizeof(*h));
	struct lws_context *ctx = lws_ss_get_context(h);

	return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h),
				      h->u.smd.smd_peer, buf, len);
}

#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
int
lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
{
	struct lws_sspc_handle *h = (struct lws_sspc_handle *)
					(((char *)ss_user) - sizeof(*h));
	struct lws_context *ctx = lws_sspc_get_context(h);

	return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
}
#endif

#endif

/*
 * Peers that deregister need to adjust the refcount of messages they would
 * have been interested in, but didn't take delivery of yet
 */

static void
_lws_smd_peer_destroy(lws_smd_peer_t *pr)
{
	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
					  owner_peers);

	if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */
		return; /* For Coverity */

	lws_dll2_remove(&pr->list);

	/*
	 * We take the approach to adjust the refcount of every would-have-been
	 * delivered message we were interested in
	 */

	while (pr->tail) {

		lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
						     lws_smd_msg_t, list);

		if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
			if (!--pr->tail->refcount)
				_lws_smd_msg_destroy(pr->ctx, smd, pr->tail);
		}

		pr->tail = m1;
	}

	lws_free(pr);

	lws_mutex_unlock(smd->lock_messages); /* messages ------- */
}

static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
{
	lws_dll2_t *tail = &pr->tail->list;
	lws_smd_msg_t *msg;

	do {
		tail = tail->next;
		if (!tail)
			return NULL;

		msg = lws_container_of(tail, lws_smd_msg_t, list);
		if (msg->exc != pr &&
		    _lws_smd_msg_peer_interested_in_msg(pr, msg))
			return msg;
	} while (1);

	return NULL;
}

/*
 * Delivers only one message to the peer and advances the tail, or sets it to
 * NULL if no more filtered queued messages.  Returns nonzero if tail non-NULL.
 *
 * For Proxied SS, only asks for writeable and does not advance or change the
 * tail.
 *
 * This is done so if multiple messages queued, we don't get a situation where
 * one participant gets them all spammed, then the next etc.  Instead they are
 * delivered round-robin.
 *
 * Requires peer lock, may take message lock
 */

static int
_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
{
	lws_smd_msg_t *msg;

	if (!pr->tail)
		return 0;

	msg = lws_container_of(pr->tail, lws_smd_msg_t, list);

	lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, to peer %p",
		     (unsigned int)msg->_class, (int)msg->length, pr);

	pr->cb(pr->opaque, msg->_class, msg->timestamp,
	       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
	       (size_t)msg->length);
#if !defined(__COVERITY__)
	assert(msg->refcount);
#endif
	/*
	 * If there is one, move forward to the next queued
	 * message that meets the filters of this peer
	 */
	pr->tail = _lws_smd_msg_next_matching_filter(pr);

	/* tail message has to actually be of interest to the peer */
	assert(!pr->tail || (pr->tail->_class & pr->_class_filter));

	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */
		return 1; /* For Coverity */

	if (!--msg->refcount)
		_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */

	return !!pr->tail;
}

/*
 * Called when the event loop could deliver messages synchronously, eg, on
 * entry to idle
 */

int
lws_smd_msg_distribute(struct lws_context *ctx)
{
	char more;

	/* commonly, no messages and nothing to do... */

	if (!ctx->smd.owner_messages.count)
		return 0;

	ctx->smd.delivering = 1;

	do {
		more = 0;
		if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
			return 1; /* For Coverity */

		lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
					   ctx->smd.owner_peers.head) {
			lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

			more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));

		} lws_end_foreach_dll_safe(p, p1);

		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
	} while (more);

	ctx->smd.delivering = 0;

	return 0;
}

struct lws_smd_peer *
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
		 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
{
	lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);

	if (!pr)
		return NULL;

	pr->cb = cb;
	pr->opaque = opaque;
	pr->_class_filter = _class_filter;
	pr->ctx = ctx;

	if (!ctx->smd.delivering &&
	    lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */
		lws_free(pr);
		return NULL; /* For Coverity */
	}

	/*
	 * Let's lock the message list before adding this peer... because...
	 */

	if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */
		lws_free(pr);
		pr = NULL;
		goto bail1; /* For Coverity */
	}

	lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);

	/* update the global class mask union to account for new peer mask */
	_lws_smd_class_mask_union(&ctx->smd);

	/*
	 * Now there's a new peer added, any messages we have stashed will try
	 * to deliver to this guy too, if he's interested in that class.  So we
	 * have to update the message refcounts for queued messages-he's-
	 * interested-in accordingly.
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
			msg->refcount++;

	} lws_end_foreach_dll_safe(p, p1);

	/* ... ok we are done adding the peer */

	lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */

	lwsl_cx_info(ctx, "peer %p (count %u) registered", pr,
		     (unsigned int)ctx->smd.owner_peers.count);

bail1:
	if (!ctx->smd.delivering)
		lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

	return pr;
}
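
/*
 * Illustrative usage sketch (not part of lws): registering a participant
 * for two classes.  Assumes the lws_smd_notification_cb_t signature from
 * lws-smd.h; my_smd_cb and my_init are hypothetical names.  The callback
 * runs from lws_smd_msg_distribute(), ie, from the event loop context.
 */
#if 0
static int
my_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
	  void *buf, size_t len)
{
	lwsl_notice("cl 0x%x: %.*s\n", (unsigned int)_class, (int)len,
		    (const char *)buf);

	return 0;
}

static int
my_init(struct lws_context *cx, void *opaque)
{
	struct lws_smd_peer *pr = lws_smd_register(cx, opaque, 0,
			LWSSMDCL_SYSTEM_STATE | LWSSMDCL_NETWORK, my_smd_cb);

	/* pair with lws_smd_unregister(pr) at teardown */

	return !pr;
}
#endif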

void
lws_smd_unregister(struct lws_smd_peer *pr)
{
	lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);

	if (!smd->delivering &&
	    lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */
		return; /* For Coverity */
	lwsl_cx_notice(pr->ctx, "destroying peer %p", pr);
	_lws_smd_peer_destroy(pr);
	if (!smd->delivering)
		lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
}

int
lws_smd_message_pending(struct lws_context *ctx)
{
	int ret = 1;

	/*
	 * First cheaply check the common case no messages pending, so there's
	 * definitely nothing for this tsi or anything else
	 */

	if (!ctx->smd.owner_messages.count)
		return 0;

	/*
	 * If there are any messages, check their age and expire ones that
	 * have been hanging around too long
	 */

	if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */
		return 1; /* For Coverity */
	if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
		goto bail; /* For Coverity */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
			lwsl_cx_warn(ctx, "timing out queued message %p", msg);

			/*
			 * We're forcibly yanking this guy, we can expect that
			 * there might be peers that point to it as their tail.
			 *
			 * In that case, move their tails on to the next guy
			 * they are interested in, if any.
			 */

			lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
						   ctx->smd.owner_peers.head) {
				lws_smd_peer_t *pr = lws_container_of(pp,
							lws_smd_peer_t, list);

				if (pr->tail == msg)
					pr->tail = _lws_smd_msg_next_matching_filter(pr);

			} lws_end_foreach_dll_safe(pp, pp1);

			/*
			 * No peer should fall foul of the peer tail checks
			 * when destroying the message now.
			 */

			_lws_smd_msg_destroy(ctx, &ctx->smd, msg);
		}
	} lws_end_foreach_dll_safe(p, p1);

	lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

	/*
	 * Walk the peer list
	 */

	lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		if (pr->tail)
			goto bail;

	} lws_end_foreach_dll(p);

	/*
	 * There's no message pending that we need to handle
	 */

	ret = 0;

bail:
	lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */

	return ret;
}
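
/*
 * Illustrative sketch (assumption, not from this file): the queue depth and
 * ttl consulted above are normally configured at context creation; the
 * smd_queue_depth and smd_ttl_us members of lws_context_creation_info are
 * believed to map to them, defaulting when left zero.
 */
#if 0
	struct lws_context_creation_info info;

	memset(&info, 0, sizeof(info));
	info.smd_queue_depth = 20;		/* max queued messages */
	info.smd_ttl_us = 5 * LWS_US_PER_SEC;	/* expire undelivered after 5s */
#endif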

int
_lws_smd_destroy(struct lws_context *ctx)
{
	/* stop any message creation */

	ctx->smd._class_filter = 0;

	/*
	 * Walk the message list, destroying them
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_messages.head) {
		lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

		lws_dll2_remove(&msg->list);
		lws_free(msg);

	} lws_end_foreach_dll_safe(p, p1);

	/*
	 * Walk the peer list, destroying them
	 */

	lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
				   ctx->smd.owner_peers.head) {
		lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

		pr->tail = NULL; /* we just nuked all the messages, ignore */
		_lws_smd_peer_destroy(pr);

	} lws_end_foreach_dll_safe(p, p1);

	lws_mutex_destroy(ctx->smd.lock_messages);
	lws_mutex_destroy(ctx->smd.lock_peers);

	return 0;
}