/src/libwebsockets/lib/system/smd/smd.c
Line | Count | Source |
1 | | /* |
2 | | * lws System Message Distribution |
3 | | * |
4 | | * Copyright (C) 2019 - 2025 Andy Green <andy@warmcat.com> |
5 | | * |
6 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
7 | | * of this software and associated documentation files (the "Software"), to |
8 | | * deal in the Software without restriction, including without limitation the |
9 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
10 | | * sell copies of the Software, and to permit persons to whom the Software is |
11 | | * furnished to do so, subject to the following conditions: |
12 | | * |
13 | | * The above copyright notice and this permission notice shall be included in |
14 | | * all copies or substantial portions of the Software. |
15 | | * |
16 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
17 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
18 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
19 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
20 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
21 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
22 | | * IN THE SOFTWARE. |
23 | | */ |
24 | | |
25 | | #include "private-lib-core.h" |
26 | | #include <assert.h> |
27 | | |
28 | | /* uncomment to enable extra debug and sanity checks */
29 | | // #define LWS_SMD_DEBUG |
30 | | |
31 | | |
32 | | #if defined(LWS_SMD_DEBUG) |
33 | | #define lwsl_smd lwsl_notice |
34 | | #else |
35 | | #define lwsl_smd(_s, ...) |
36 | | #endif |
37 | | |
38 | | void * |
39 | | lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len) |
40 | 0 | { |
41 | 0 | lws_smd_msg_t *msg; |
42 | | |
43 | | /* only allow it if someone wants to consume this class of event */ |
44 | |
45 | 0 | if (!(ctx->smd._class_filter & _class)) { |
46 | 0 | lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants", |
47 | 0 | (unsigned int)_class); |
48 | 0 | return NULL; |
49 | 0 | } |
50 | | |
51 | 0 | assert(len <= LWS_SMD_MAX_PAYLOAD); |
52 | | |
53 | | |
54 | | /* |
55 | | * If SS is configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN between
56 | | * the msg_t and the payload, ie, the layout is msg_t, gap, payload
57 | | */ |
58 | 0 | msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len, |
59 | 0 | __func__); |
60 | 0 | if (!msg) |
61 | 0 | return NULL; |
62 | | |
63 | 0 | memset(msg, 0, sizeof(*msg)); |
64 | 0 | msg->timestamp = lws_now_usecs(); |
65 | 0 | msg->length = (uint16_t)len; |
66 | 0 | msg->_class = _class; |
67 | |
68 | 0 | return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF; |
69 | 0 | } |
70 | | |
71 | | void |
72 | | lws_smd_msg_free(void **ppay) |
73 | 0 | { |
74 | 0 | lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) - |
75 | 0 | LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); |
76 | | |
77 | | /* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */ |
78 | 0 | lws_free(msg); |
79 | 0 | *ppay = NULL; |
80 | 0 | } |
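
/*
 * Editorial sketch (not part of the original file): the allocate / send /
 * free lifecycle the two helpers above imply.  The class constant and
 * payload are illustrative assumptions.
 */
static int
example_smd_publish(struct lws_context *ctx)
{
	static const char pay[] = "example";
	void *p = lws_smd_msg_alloc(ctx, LWSSMDCL_SYSTEM_STATE, sizeof(pay));

	if (!p)
		/* no subscriber for this class, or allocation failed */
		return 1;

	memcpy(p, pay, sizeof(pay));

	if (lws_smd_msg_send(ctx, p)) {
		/* send failed: ownership is still ours, free it */
		lws_smd_msg_free(&p);
		return 1;
	}

	/* on success the smd message queue owns the allocation */
	return 0;
}
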
81 | | |
82 | | #if defined(LWS_SMD_DEBUG) |
83 | | |
84 | | /* |
85 | | * Caller must have peers and messages locks |
86 | | */ |
87 | | |
88 | | static void |
89 | | _lws_smd_dump(lws_smd_t *smd) |
90 | | { |
91 | | int n = 1; |
92 | | |
93 | | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
94 | | smd->owner_messages.head) { |
95 | | lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); |
96 | | |
97 | | lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n", |
98 | | n++, msg, msg->refcount, |
99 | | (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000), |
100 | | msg->_class, msg->length,
101 | | (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF); |
102 | | |
103 | | } lws_end_foreach_dll_safe(p, p1); |
104 | | |
105 | | n = 1; |
106 | | lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) { |
107 | | lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); |
108 | | |
109 | | lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n", |
110 | | n++, pr, pr->tail, pr->_class_filter); |
111 | | } lws_end_foreach_dll(p); |
112 | | } |
113 | | #endif |
114 | | |
115 | | static int |
116 | | _lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg) |
117 | 0 | { |
118 | 0 | return !!(msg->_class & pr->_class_filter); |
119 | 0 | } |
120 | | |
121 | | /* |
122 | | * Figure out what to set the initial refcount for the message to |
123 | | * |
124 | | * Caller must have peers and messages locks |
125 | | */ |
126 | | |
127 | | static int |
128 | | _lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg, |
129 | | struct lws_smd_peer *exc) |
130 | 0 | { |
131 | 0 | struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd); |
132 | 0 | int interested = 0; |
133 | |
134 | 0 | lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { |
135 | 0 | lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); |
136 | |
137 | 0 | if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg)) |
138 | | /* |
139 | | * This peer wants to consume it |
140 | | */ |
141 | 0 | interested++; |
142 | |
143 | 0 | } lws_end_foreach_dll(p); |
144 | |
145 | 0 | return interested; |
146 | 0 | } |
147 | | |
148 | | static int |
149 | | _lws_smd_class_mask_union(lws_smd_t *smd) |
150 | 0 | { |
151 | 0 | uint32_t mask = 0; |
152 | |
153 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
154 | 0 | smd->owner_peers.head) { |
155 | 0 | lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); |
156 | |
157 | 0 | mask |= pr->_class_filter; |
158 | |
159 | 0 | } lws_end_foreach_dll_safe(p, p1); |
160 | |
161 | 0 | smd->_class_filter = mask; |
162 | |
163 | 0 | return 0; |
164 | 0 | } |
165 | | |
166 | | /* Call with message lock held */ |
167 | | |
168 | | static void |
169 | | _lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg) |
170 | 0 | { |
171 | | /* |
172 | | * We think we gave the message to everyone and can destroy it. |
173 | | * Sanity check that no peer holds a pointer to this guy |
174 | | */ |
175 | |
176 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
177 | 0 | smd->owner_peers.head) { |
178 | 0 | lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list); |
179 | |
180 | 0 | if (xpr->tail == msg) { |
181 | 0 | lwsl_cx_err(cx, "peer %p has msg %p " |
182 | 0 | "we are about to destroy as tail", xpr, msg); |
183 | 0 | #if !defined(LWS_PLAT_FREERTOS) |
184 | 0 | assert(0); |
185 | 0 | #endif |
186 | 0 | } |
187 | |
188 | 0 | } lws_end_foreach_dll_safe(p, p1); |
189 | | |
190 | | /* |
191 | | * We have fully delivered the message now, it |
192 | | * can be unlinked and destroyed |
193 | | */ |
194 | 0 | lwsl_cx_info(cx, "destroy msg %p", msg); |
195 | 0 | lws_dll2_remove(&msg->list); |
196 | 0 | lws_free(msg); |
197 | 0 | } |
198 | | |
199 | | /* |
200 | | * This needs to be threadsafe, which limits the APIs we can call
201 | | */ |
202 | | |
203 | | int |
204 | | _lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc) |
205 | 0 | { |
206 | 0 | lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) - |
207 | 0 | LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg)); |
208 | |
209 | 0 | if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) { |
210 | | // lwsl_cx_debug(ctx, "rejecting message on queue depth %d", |
211 | | // (int)ctx->smd.owner_messages.count); |
212 | | /* reject the message due to max queue depth reached */ |
213 | 0 | return 1; |
214 | 0 | } |
215 | | |
216 | | /* |
217 | | * In the case we received a message and in the callback for that, send |
218 | | * one, we end up here already holding lock_peers and will deadlock if |
219 | | * we try to take it again. Throughout the callback, ctx->smd.delivering |
220 | | * is set in that case so we can avoid it. |
221 | | */ |
222 | | |
223 | 0 | if ((!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) && |
224 | 0 | lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */ |
225 | 0 | return 1; /* For Coverity */ |
226 | | |
227 | 0 | if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */ |
228 | 0 | goto bail; |
229 | | |
230 | 0 | msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested( |
231 | 0 | &ctx->smd, msg, exc); |
232 | 0 | if (!msg->refcount) { |
233 | | /* possible, considering exc and no other participants */
234 | 0 | lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ |
235 | |
236 | 0 | lws_free(msg); |
237 | 0 | if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) |
238 | 0 | lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ |
239 | |
240 | 0 | return 0; |
241 | 0 | } |
242 | | |
243 | 0 | msg->exc = exc; |
244 | | |
245 | | /* let's add him on the queue... */ |
246 | |
247 | 0 | lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages); |
248 | | |
249 | | /* |
250 | | * Any peer with no active tail needs to check our class to see if we |
251 | | * should become his tail |
252 | | */ |
253 | |
254 | 0 | lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { |
255 | 0 | lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); |
256 | |
257 | 0 | if (pr != exc && |
258 | 0 | !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) { |
259 | 0 | pr->tail = msg; |
260 | | /* tail message has to actually be of interest to the peer */ |
261 | 0 | assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); |
262 | 0 | } |
263 | |
264 | 0 | } lws_end_foreach_dll(p); |
265 | |
266 | | #if defined(LWS_SMD_DEBUG) |
267 | | lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__, |
268 | | msg, msg->refcount, ctx->smd.owner_messages.count); |
269 | | _lws_smd_dump(&ctx->smd); |
270 | | #endif |
271 | |
272 | 0 | lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ |
273 | |
274 | 0 | bail: |
275 | 0 | if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) |
276 | 0 | lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ |
277 | | |
278 | | /* we may be running from another thread context */
279 | 0 | lws_cancel_service(ctx); |
280 | |
281 | 0 | return 0; |
282 | 0 | } |
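
/*
 * Editorial sketch: the delivering / tid_holding guard above means a peer
 * may publish from inside its own notification callback on the delivery
 * thread without deadlocking on lock_peers.  The callback signature follows
 * the delivery call in _lws_smd_msg_deliver_peer(); names are illustrative.
 */
static int
example_echo_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
		void *buf, size_t len)
{
	struct lws_context *ctx = (struct lws_context *)opaque;

	if (_class == LWSSMDCL_NETWORK)
		/* safe: the send path sees smd.delivering set for this tid */
		return lws_smd_msg_printf(ctx, LWSSMDCL_SYSTEM_STATE,
					  "{\"saw\":%u}", (unsigned int)len);

	return 0;
}
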
283 | | |
284 | | /* |
285 | | * This needs to be threadsafe, which limits the APIs we can call
286 | | */ |
287 | | |
288 | | int |
289 | | lws_smd_msg_send(struct lws_context *ctx, void *pay) |
290 | 0 | { |
291 | 0 | return _lws_smd_msg_send(ctx, pay, NULL); |
292 | 0 | } |
293 | | |
294 | | /* |
295 | | * This needs to be threadsafe, which limits the APIs we can call
296 | | */ |
297 | | |
298 | | int |
299 | | lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class, |
300 | | const char *format, ...) |
301 | 0 | { |
302 | 0 | lws_smd_msg_t *msg; |
303 | 0 | va_list ap; |
304 | 0 | void *p; |
305 | 0 | int n; |
306 | |
307 | 0 | if (!(ctx->smd._class_filter & _class)) |
308 | | /* |
309 | | * There's nobody interested in messages of this class atm. |
310 | | * Don't bother generating it, and act like all is well. |
311 | | */ |
312 | 0 | return 0; |
313 | | |
314 | 0 | va_start(ap, format); |
315 | 0 | n = vsnprintf(NULL, 0, format, ap); |
316 | 0 | va_end(ap); |
317 | 0 | if (n > LWS_SMD_MAX_PAYLOAD) |
318 | | /* too large to send */ |
319 | 0 | return 1; |
320 | | |
321 | 0 | p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2); |
322 | 0 | if (!p) |
323 | 0 | return 1; |
324 | 0 | msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF - |
325 | 0 | sizeof(*msg)); |
326 | 0 | msg->length = (uint16_t)n; |
327 | 0 | va_start(ap, format); |
328 | 0 | vsnprintf((char *)p, (unsigned int)n + 2, format, ap); |
329 | 0 | va_end(ap); |
330 | | |
331 | | /* |
332 | | * locks taken and released in here |
333 | | */ |
334 | |
335 | 0 | if (lws_smd_msg_send(ctx, p)) { |
336 | 0 | lws_smd_msg_free(&p); |
337 | 0 | return 1; |
338 | 0 | } |
339 | | |
340 | 0 | return 0; |
341 | 0 | } |
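
/*
 * Editorial usage sketch for the printf-style helper above; the class and
 * JSON shape are illustrative assumptions.
 */
static void
example_report_state(struct lws_context *ctx, const char *state)
{
	if (lws_smd_msg_printf(ctx, LWSSMDCL_SYSTEM_STATE,
			       "{\"state\":\"%s\"}", state))
		lwsl_cx_warn(ctx, "smd publish failed");
}
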
342 | | |
343 | | #if defined(LWS_WITH_SECURE_STREAMS) |
344 | | int |
345 | | lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len, |
346 | | lws_smd_class_t _class, const char *format, ...) |
347 | 0 | { |
348 | 0 | char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN; |
349 | 0 | va_list ap; |
350 | 0 | int n; |
351 | |
352 | 0 | if (*len < LWS_SMD_SS_RX_HEADER_LEN) |
353 | 0 | return 1; |
354 | | |
355 | 0 | lws_ser_wu64be(buf, _class); |
356 | 0 | lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */ |
357 | |
358 | 0 | va_start(ap, format); |
359 | 0 | n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap); |
360 | 0 | va_end(ap); |
361 | |
362 | 0 | if (n > LWS_SMD_MAX_PAYLOAD || |
363 | 0 | (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN) |
364 | | /* too large to send */ |
365 | 0 | return 1; |
366 | | |
367 | 0 | *len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n; |
368 | |
369 | 0 | lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class, |
370 | 0 | (unsigned int)n); |
371 | |
372 | 0 | return 0; |
373 | 0 | } |
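
/*
 * Editorial sketch: preparing an SS tx buffer with the helper above, which
 * writes the 16-byte class + timestamp header (LWS_SMD_SS_RX_HEADER_LEN)
 * before the payload.  The buffer handling is an illustrative assumption.
 */
static int
example_ss_tx_prep(struct lws_ss_handle *h, uint8_t *buf, size_t buf_len)
{
	size_t len = buf_len;

	if (lws_smd_ss_msg_printf(lws_ss_tag(h), buf, &len,
				  LWSSMDCL_SYSTEM_STATE, "{\"s\":%d}", 1))
		return 1;

	/* len now holds header + payload length, ready to pass to tx */
	return 0;
}
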
374 | | |
375 | | /* |
376 | | * This is a helper that the user rx handler for the LWS_SMD_STREAMTYPENAME
377 | | * SS can call through to with the payload it received from the proxy. It
378 | | * then forwards the received SMD message to all local (same-context)
379 | | * participants that are interested in that class (except the peer pr it
380 | | * arrived on, so we don't loop).
381 | | */ |
382 | | |
383 | | static int |
384 | | _lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag, |
385 | | struct lws_smd_peer *pr, const uint8_t *buf, size_t len) |
386 | 0 | { |
387 | 0 | lws_smd_class_t _class; |
388 | 0 | lws_smd_msg_t *msg; |
389 | 0 | void *p; |
390 | |
391 | 0 | if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF) |
392 | 0 | return 1; |
393 | | |
394 | 0 | if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF) |
395 | 0 | return 1; |
396 | | |
397 | 0 | _class = (lws_smd_class_t)lws_ser_ru64be(buf); |
398 | | |
399 | | //if (_class == LWSSMDCL_METRICS) { |
400 | | |
401 | | //} |
402 | | |
403 | | /* only locally forward messages that we care about in this process */ |
404 | |
405 | 0 | if (!(ctx->smd._class_filter & _class)) |
406 | | /* |
407 | | * There's nobody interested in messages of this class atm. |
408 | | * Don't bother generating it, and act like all is well. |
409 | | */ |
410 | 0 | return 0; |
411 | | |
412 | 0 | p = lws_smd_msg_alloc(ctx, _class, len); |
413 | 0 | if (!p) |
414 | 0 | return 1; |
415 | | |
416 | 0 | msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF - |
417 | 0 | sizeof(*msg)); |
418 | 0 | msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF); |
419 | | /* adopt the original source timestamp, not time we forwarded it */ |
420 | 0 | msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8); |
421 | | |
422 | | /* copy the message payload in */ |
423 | 0 | memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length); |
424 | | |
425 | | /* |
426 | | * locks taken and released in here |
427 | | */ |
428 | |
429 | 0 | if (_lws_smd_msg_send(ctx, p, pr)) { |
430 | | /* we couldn't send it after all that... */ |
431 | 0 | lws_smd_msg_free(&p); |
432 | |
433 | 0 | return 1; |
434 | 0 | } |
435 | | |
436 | 0 | lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__, |
437 | 0 | tag, (unsigned int)_class, msg->length, |
438 | 0 | (unsigned long long)msg->timestamp); |
439 | |
440 | 0 | return 0; |
441 | 0 | } |
442 | | |
443 | | int |
444 | | lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len) |
445 | 0 | { |
446 | 0 | struct lws_ss_handle *h = (struct lws_ss_handle *) |
447 | 0 | (((char *)ss_user) - sizeof(*h)); |
448 | 0 | struct lws_context *ctx = lws_ss_get_context(h); |
449 | |
450 | 0 | return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len); |
451 | 0 | } |
452 | | |
453 | | #if defined(LWS_WITH_SECURE_STREAMS_PROXY_API) |
454 | | int |
455 | | lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len) |
456 | | { |
457 | | struct lws_sspc_handle *h = (struct lws_sspc_handle *) |
458 | | (((char *)ss_user) - sizeof(*h)); |
459 | | struct lws_context *ctx = lws_sspc_get_context(h); |
460 | | |
461 | | return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len); |
462 | | } |
463 | | #endif |
464 | | |
465 | | #endif |
466 | | |
467 | | /* |
468 | | * Peers that deregister need to adjust the refcount of messages they would |
469 | | * have been interested in, but didn't take delivery of yet |
470 | | */ |
471 | | |
472 | | static void |
473 | | _lws_smd_peer_destroy(lws_smd_peer_t *pr) |
474 | 0 | { |
475 | 0 | lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, |
476 | 0 | owner_peers); |
477 | |
478 | 0 | if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */ |
479 | 0 | return; /* For Coverity */ |
480 | | |
481 | 0 | lws_dll2_remove(&pr->list); |
482 | | |
483 | | /* |
484 | | * Adjust the refcount of every queued message this peer was
485 | | * interested in but had not yet taken delivery of
486 | | */ |
487 | |
488 | 0 | while (pr->tail) { |
489 | |
490 | 0 | lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next, |
491 | 0 | lws_smd_msg_t, list); |
492 | |
493 | 0 | if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) { |
494 | 0 | if (!--pr->tail->refcount) |
495 | 0 | _lws_smd_msg_destroy(pr->ctx, smd, pr->tail); |
496 | 0 | } |
497 | |
498 | 0 | pr->tail = m1; |
499 | 0 | } |
500 | |
501 | 0 | lws_free(pr); |
502 | |
503 | 0 | lws_mutex_unlock(smd->lock_messages); /* messages ------- */ |
504 | 0 | } |
505 | | |
506 | | static lws_smd_msg_t * |
507 | | _lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr) |
508 | 0 | { |
509 | 0 | lws_dll2_t *tail = &pr->tail->list; |
510 | 0 | lws_smd_msg_t *msg; |
511 | |
512 | 0 | do { |
513 | 0 | tail = tail->next; |
514 | 0 | if (!tail) |
515 | 0 | return NULL; |
516 | | |
517 | 0 | msg = lws_container_of(tail, lws_smd_msg_t, list); |
518 | 0 | if (msg->exc != pr && |
519 | 0 | _lws_smd_msg_peer_interested_in_msg(pr, msg)) |
520 | 0 | return msg; |
521 | 0 | } while (1); |
522 | | |
523 | 0 | return NULL; |
524 | 0 | } |
525 | | |
526 | | /* |
527 | | * Delivers only one message to the peer and advances the tail, or sets to NULL |
528 | | * if no more filtered queued messages. Returns nonzero if tail non-NULL. |
529 | | * |
530 | | * For Proxied SS, only asks for writeable and does not advance or change the |
531 | | * tail. |
532 | | * |
533 | | * This is done so if multiple messages queued, we don't get a situation where |
534 | | * one participant gets them all spammed, then the next etc. Instead they are |
535 | | * delivered round-robin. |
536 | | * |
537 | | * Requires peer lock, may take message lock |
538 | | */ |
539 | | |
540 | | static int |
541 | | _lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr) |
542 | 0 | { |
543 | 0 | lws_smd_msg_t *msg; |
544 | |
545 | 0 | if (!pr->tail) |
546 | 0 | return 0; |
547 | | |
548 | 0 | msg = lws_container_of(pr->tail, lws_smd_msg_t, list); |
549 | |
550 | 0 | lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, to peer %p", |
551 | 0 | (unsigned int)msg->_class, (int)msg->length, |
552 | 0 | pr); |
553 | | |
554 | | /* |
555 | | * We call the peer's callback to deliver the message. |
556 | | * We hold the peer lock for the duration. |
557 | | * That's tricky because if, in the callback, he uses smd |
558 | | * apis to send, we will deadlock if we try to grab the |
559 | | * peer lock as usual in there. |
560 | | * |
561 | | * Another way to express this is that for this thread |
562 | | * (only) we know we already hold the peer lock. |
563 | | */ |
564 | |
565 | 0 | ctx->smd.tid_holding = lws_thread_id(); |
566 | 0 | ctx->smd.delivering = 1; |
567 | 0 | pr->cb(pr->opaque, msg->_class, msg->timestamp, |
568 | 0 | ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF, |
569 | 0 | (size_t)msg->length); |
570 | 0 | ctx->smd.delivering = 0; |
571 | 0 | #if !defined(__COVERITY__) |
572 | 0 | assert(msg->refcount); |
573 | 0 | #endif |
574 | | /* |
575 | | * If there is one, move forward to the next queued |
576 | | * message that meets the filters of this peer |
577 | | */ |
578 | 0 | pr->tail = _lws_smd_msg_next_matching_filter(pr); |
579 | |
580 | | /* tail message has to actually be of interest to the peer */ |
581 | 0 | assert(!pr->tail || (pr->tail->_class & pr->_class_filter)); |
582 | | |
583 | 0 | if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */ |
584 | 0 | return 1; /* For Coverity */ |
585 | | |
586 | 0 | if (!--msg->refcount) |
587 | 0 | _lws_smd_msg_destroy(ctx, &ctx->smd, msg); |
588 | 0 | lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ |
589 | |
590 | 0 | return !!pr->tail; |
591 | 0 | } |
592 | | |
593 | | /* |
594 | | * Called when the event loop could deliver messages synchronously, eg, on |
595 | | * entry to idle |
596 | | */ |
597 | | |
598 | | int |
599 | | lws_smd_msg_distribute(struct lws_context *ctx) |
600 | 0 | { |
601 | 0 | char more; |
602 | | |
603 | | /* commonly, no messages and nothing to do... */ |
604 | |
605 | 0 | if (!ctx->smd.owner_messages.count) |
606 | 0 | return 0; |
607 | | |
608 | | |
609 | 0 | do { |
610 | 0 | more = 0; |
611 | 0 | if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */ |
612 | 0 | return 1; /* For Coverity */ |
613 | | |
614 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
615 | 0 | ctx->smd.owner_peers.head) { |
616 | 0 | lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); |
617 | |
618 | 0 | more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr)); |
619 | |
620 | 0 | } lws_end_foreach_dll_safe(p, p1); |
621 | |
622 | 0 | lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ |
623 | 0 | } while (more); |
624 | | |
625 | | |
626 | 0 | return 0; |
627 | 0 | } |
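
/*
 * Editorial sketch of the hookup the comment above describes: an event
 * loop, on entry to idle, drains any deliverable messages.  The loop
 * integration point is an illustrative assumption.
 */
static void
example_loop_idle(struct lws_context *ctx)
{
	/* the pending check also expires messages older than the smd TTL */
	if (lws_smd_message_pending(ctx))
		(void)lws_smd_msg_distribute(ctx);
}
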
628 | | |
629 | | struct lws_smd_peer * |
630 | | lws_smd_register(struct lws_context *ctx, void *opaque, int flags, |
631 | | lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb) |
632 | 0 | { |
633 | 0 | lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__); |
634 | |
635 | 0 | if (!pr) |
636 | 0 | return NULL; |
637 | | |
638 | 0 | pr->cb = cb; |
639 | 0 | pr->opaque = opaque; |
640 | 0 | pr->_class_filter = _class_filter; |
641 | 0 | pr->ctx = ctx; |
642 | |
643 | 0 | if ((!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) && |
644 | 0 | lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */ |
645 | 0 | lws_free(pr); |
646 | 0 | return NULL; /* For Coverity */ |
647 | 0 | } |
648 | | |
649 | | /* |
650 | | * Lock the message list before adding this peer, so the refcount catch-up below is atomic with the peer becoming visible
651 | | */ |
652 | | |
653 | 0 | if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */ |
654 | 0 | lws_free(pr); |
655 | 0 | pr = NULL; |
656 | 0 | goto bail1; /* For Coverity */ |
657 | 0 | } |
658 | | |
659 | 0 | lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers); |
660 | | |
661 | | /* update the global class mask union to account for new peer mask */ |
662 | 0 | _lws_smd_class_mask_union(&ctx->smd); |
663 | | |
664 | | /* |
665 | | * Now there's a new peer added, any messages we have stashed will try |
666 | | * to deliver to this guy too, if he's interested in that class. So we |
667 | | * have to update the message refcounts for queued messages-he's- |
668 | | * interested-in accordingly. |
669 | | */ |
670 | |
671 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
672 | 0 | ctx->smd.owner_messages.head) { |
673 | 0 | lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); |
674 | |
675 | 0 | if (_lws_smd_msg_peer_interested_in_msg(pr, msg)) |
676 | 0 | msg->refcount++; |
677 | |
678 | 0 | } lws_end_foreach_dll_safe(p, p1); |
679 | | |
680 | | /* ... ok we are done adding the peer */ |
681 | |
682 | 0 | lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */ |
683 | |
684 | 0 | lwsl_cx_info(ctx, "peer %p (count %u) registered", pr, |
685 | 0 | (unsigned int)ctx->smd.owner_peers.count); |
686 | |
687 | 0 | bail1: |
688 | 0 | if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) |
689 | 0 | lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */ |
690 | |
691 | 0 | return pr; |
692 | 0 | } |
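
/*
 * Editorial sketch: registering a peer for two classes.  The flags value 0
 * and the callback body are illustrative; the callback signature follows
 * the delivery call in _lws_smd_msg_deliver_peer().
 */
static int
example_notify_cb(void *opaque, lws_smd_class_t _class, lws_usec_t timestamp,
		  void *buf, size_t len)
{
	lwsl_notice("cl 0x%x, len %u: %.*s\n", (unsigned int)_class,
		    (unsigned int)len, (int)len, (const char *)buf);

	return 0;
}

static struct lws_smd_peer *
example_register(struct lws_context *ctx)
{
	/* the returned peer is later passed to lws_smd_unregister() */
	return lws_smd_register(ctx, ctx, 0,
				LWSSMDCL_SYSTEM_STATE | LWSSMDCL_NETWORK,
				example_notify_cb);
}
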
693 | | |
694 | | void |
695 | | lws_smd_unregister(struct lws_smd_peer *pr) |
696 | 0 | { |
697 | 0 | lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers); |
698 | |
699 | 0 | if ((!smd->delivering || !lws_thread_is(smd->tid_holding)) && |
700 | 0 | lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */ |
701 | 0 | return; /* For Coverity */ |
702 | 0 | lwsl_cx_notice(pr->ctx, "destroying peer %p", pr); |
703 | 0 | _lws_smd_peer_destroy(pr); |
704 | |
705 | 0 | if (!smd->delivering || !lws_thread_is(smd->tid_holding)) |
706 | 0 | lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */ |
707 | 0 | } |
708 | | |
709 | | int |
710 | | lws_smd_message_pending(struct lws_context *ctx) |
711 | 0 | { |
712 | 0 | int ret = 1; |
713 | | |
714 | | /* |
715 | | * First cheaply check the common case no messages pending, so there's |
716 | | * definitely nothing for this tsi or anything else |
717 | | */ |
718 | |
719 | 0 | if (!ctx->smd.owner_messages.count) |
720 | 0 | return 0; |
721 | | |
722 | | /* |
723 | | * If there are any messages, check their age and expire ones that |
724 | | * have been hanging around too long |
725 | | */ |
726 | | |
727 | 0 | if ((!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) && |
728 | 0 | lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */ |
729 | 0 | return 1; /* For Coverity */ |
730 | 0 | if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */ |
731 | 0 | goto bail; /* For Coverity */ |
732 | | |
733 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
734 | 0 | ctx->smd.owner_messages.head) { |
735 | 0 | lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); |
736 | |
737 | 0 | if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) { |
738 | 0 | lwsl_cx_warn(ctx, "timing out queued message %p", |
739 | 0 | msg); |
740 | | |
741 | | /* |
742 | | * We're forcibly yanking this guy, we can expect that |
743 | | * there might be peers that point to it as their tail. |
744 | | * |
745 | | * In that case, move their tails on to the next guy |
746 | | * they are interested in, if any. |
747 | | */ |
748 | |
749 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1, |
750 | 0 | ctx->smd.owner_peers.head) { |
751 | 0 | lws_smd_peer_t *pr = lws_container_of(pp, |
752 | 0 | lws_smd_peer_t, list); |
753 | |
754 | 0 | if (pr->tail == msg) |
755 | 0 | pr->tail = _lws_smd_msg_next_matching_filter(pr); |
756 | |
757 | 0 | } lws_end_foreach_dll_safe(pp, pp1); |
758 | | |
759 | | /* |
760 | | * No peer should fall foul of the peer tail checks |
761 | | * when destroying the message now. |
762 | | */ |
763 | |
764 | 0 | _lws_smd_msg_destroy(ctx, &ctx->smd, msg); |
765 | 0 | } |
766 | 0 | } lws_end_foreach_dll_safe(p, p1); |
767 | |
768 | 0 | lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */ |
769 | | |
770 | | /* |
771 | | * Walk the peer list |
772 | | */ |
773 | |
774 | 0 | lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) { |
775 | 0 | lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); |
776 | |
777 | 0 | if (pr->tail) |
778 | 0 | goto bail; |
779 | |
780 | 0 | } lws_end_foreach_dll(p); |
781 | | |
782 | | /* |
783 | | * There's no message pending that we need to handle |
784 | | */ |
785 | |
786 | 0 | ret = 0; |
787 | |
788 | 0 | bail: |
789 | 0 | if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) |
790 | 0 | lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */ |
791 | |
792 | 0 | return ret; |
793 | 0 | } |
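
/*
 * Editorial sketch: the smd_queue_depth and smd_ttl_us limits consulted
 * above are set at context creation; these lws_context_creation_info field
 * names are believed current but are an assumption to verify against the
 * lws headers.
 */
static struct lws_context *
example_create_context_with_smd_limits(void)
{
	struct lws_context_creation_info info;

	memset(&info, 0, sizeof(info));
	info.smd_queue_depth = 20;		/* max queued messages */
	info.smd_ttl_us = 5 * LWS_US_PER_SEC;	/* expire after 5s */

	return lws_create_context(&info);
}
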
794 | | |
795 | | int |
796 | | _lws_smd_destroy(struct lws_context *ctx) |
797 | 0 | { |
798 | | /* stop any message creation */ |
799 | |
800 | 0 | ctx->smd._class_filter = 0; |
801 | | |
802 | | /* |
803 | | * Walk the message list, destroying them |
804 | | */ |
805 | |
806 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
807 | 0 | ctx->smd.owner_messages.head) { |
808 | 0 | lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list); |
809 | |
810 | 0 | lws_dll2_remove(&msg->list); |
811 | 0 | lws_free(msg); |
812 | |
813 | 0 | } lws_end_foreach_dll_safe(p, p1); |
814 | | |
815 | | /* |
816 | | * Walk the peer list, destroying them |
817 | | */ |
818 | |
819 | 0 | lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1, |
820 | 0 | ctx->smd.owner_peers.head) { |
821 | 0 | lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list); |
822 | |
823 | 0 | pr->tail = NULL; /* we just nuked all the messages, ignore */ |
824 | 0 | _lws_smd_peer_destroy(pr); |
825 | |
826 | 0 | } lws_end_foreach_dll_safe(p, p1); |
827 | |
828 | 0 | lws_mutex_destroy(ctx->smd.lock_messages); |
829 | 0 | lws_mutex_destroy(ctx->smd.lock_peers); |
830 | |
831 | 0 | return 0; |
832 | 0 | } |