Coverage Report

Created: 2025-08-26 06:34

/src/libwebsockets/lib/system/smd/smd.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * lws System Message Distribution
3
 *
4
 * Copyright (C) 2019 - 2025 Andy Green <andy@warmcat.com>
5
 *
6
 * Permission is hereby granted, free of charge, to any person obtaining a copy
7
 * of this software and associated documentation files (the "Software"), to
8
 * deal in the Software without restriction, including without limitation the
9
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10
 * sell copies of the Software, and to permit persons to whom the Software is
11
 * furnished to do so, subject to the following conditions:
12
 *
13
 * The above copyright notice and this permission notice shall be included in
14
 * all copies or substantial portions of the Software.
15
 *
16
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22
 * IN THE SOFTWARE.
23
 */
24
25
#include "private-lib-core.h"
26
#include <assert.h>
27
28
/* comment me to remove extra debug and sanity checks */
29
// #define LWS_SMD_DEBUG
30
31
32
#if defined(LWS_SMD_DEBUG)
33
#define lwsl_smd lwsl_notice
34
#else
35
#define lwsl_smd(_s, ...)
36
#endif
37
38
void *
39
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
40
0
{
41
0
  lws_smd_msg_t *msg;
42
43
  /* only allow it if someone wants to consume this class of event */
44
45
0
  if (!(ctx->smd._class_filter & _class)) {
46
0
    lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants",
47
0
        (unsigned int)_class);
48
0
    return NULL;
49
0
  }
50
51
0
  assert(len <= LWS_SMD_MAX_PAYLOAD);
52
53
54
  /*
55
   * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
56
   * payload, ie,  msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
57
   */
58
0
  msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
59
0
       __func__);
60
0
  if (!msg)
61
0
    return NULL;
62
63
0
  memset(msg, 0, sizeof(*msg));
64
0
  msg->timestamp = lws_now_usecs();
65
0
  msg->length = (uint16_t)len;
66
0
  msg->_class = _class;
67
68
0
  return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
69
0
}
70
71
void
72
lws_smd_msg_free(void **ppay)
73
0
{
74
0
  lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
75
0
        LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
76
77
  /* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
78
0
  lws_free(msg);
79
0
  *ppay = NULL;
80
0
}
81
82
#if defined(LWS_SMD_DEBUG)
83
84
/*
85
 * Caller must have peers and messages locks
86
 */
87
  
88
static void
89
_lws_smd_dump(lws_smd_t *smd)
90
{
91
  int n = 1;
92
93
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
94
           smd->owner_messages.head) {
95
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
96
97
    lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
98
          n++, msg, msg->refcount,
99
          (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
100
          msg->length, msg->_class,
101
          (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);
102
103
  } lws_end_foreach_dll_safe(p, p1);
104
105
  n = 1;
106
  lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
107
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
108
109
    lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
110
          n++, pr, pr->tail, pr->_class_filter);
111
  } lws_end_foreach_dll(p);
112
}
113
#endif
114
115
static int
116
_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
117
0
{
118
0
    return !!(msg->_class & pr->_class_filter);
119
0
}
120
121
/*
122
 * Figure out what to set the initial refcount for the message to
123
 *
124
 * Caller must have peers and messages locks
125
 */
126
127
static int
128
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
129
             struct lws_smd_peer *exc)
130
0
{
131
0
  struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
132
0
  int interested = 0;
133
134
0
  lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
135
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
136
137
0
    if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
138
      /*
139
       * This peer wants to consume it
140
       */
141
0
      interested++;
142
143
0
  } lws_end_foreach_dll(p);
144
145
0
  return interested;
146
0
}
147
148
static int
149
_lws_smd_class_mask_union(lws_smd_t *smd)
150
0
{
151
0
  uint32_t mask = 0;
152
153
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
154
0
           smd->owner_peers.head) {
155
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
156
157
0
    mask |= pr->_class_filter;
158
159
0
  } lws_end_foreach_dll_safe(p, p1);
160
161
0
  smd->_class_filter = mask;
162
163
0
  return 0;
164
0
}
165
166
/* Call with message lock held */
167
168
static void
169
_lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg)
170
0
{
171
  /*
172
   * We think we gave the message to everyone and can destroy it.
173
   * Sanity check that no peer holds a pointer to this guy
174
   */
175
176
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
177
0
           smd->owner_peers.head) {
178
0
    lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);
179
180
0
    if (xpr->tail == msg) {
181
0
      lwsl_cx_err(cx, "peer %p has msg %p "
182
0
         "we are about to destroy as tail", xpr, msg);
183
0
#if !defined(LWS_PLAT_FREERTOS)
184
0
      assert(0);
185
0
#endif
186
0
    }
187
188
0
  } lws_end_foreach_dll_safe(p, p1);
189
190
  /*
191
   * We have fully delivered the message now, it
192
   * can be unlinked and destroyed
193
   */
194
0
  lwsl_cx_info(cx, "destroy msg %p", msg);
195
0
  lws_dll2_remove(&msg->list);
196
0
  lws_free(msg);
197
0
}
198
199
/*
200
 * This is wanting to be threadsafe, limiting the apis we can call
201
 */
202
203
int
204
_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
205
0
{
206
0
  lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
207
0
        LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
208
209
0
  if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
210
    // lwsl_cx_debug(ctx, "rejecting message on queue depth %d",
211
    //      (int)ctx->smd.owner_messages.count);
212
    /* reject the message due to max queue depth reached */
213
0
    return 1;
214
0
  }
215
216
  /*
217
   * In the case we received a message and in the callback for that, send
218
   * one, we end up here already holding lock_peers and will deadlock if
219
   * we try to take it again.  Throughout the callback, ctx->smd.delivering
220
   * is set in that case so we can avoid it.
221
   */
222
223
0
  if ((!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) &&
224
0
      lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
225
0
    return 1; /* For Coverity */
226
227
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
228
0
    goto bail;
229
230
0
  msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
231
0
              &ctx->smd, msg, exc);
232
0
  if (!msg->refcount) {
233
    /* possible, condsidering exc and no other participants */
234
0
    lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
235
236
0
    lws_free(msg);
237
0
    if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding))
238
0
      lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
239
240
0
    return 0;
241
0
  }
242
243
0
  msg->exc = exc;
244
245
  /* let's add him on the queue... */
246
247
0
  lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
248
249
  /*
250
   * Any peer with no active tail needs to check our class to see if we
251
   * should become his tail
252
   */
253
254
0
  lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
255
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
256
257
0
    if (pr != exc &&
258
0
                   !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
259
0
      pr->tail = msg;
260
      /* tail message has to actually be of interest to the peer */
261
0
      assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
262
0
    }
263
264
0
  } lws_end_foreach_dll(p);
265
266
#if defined(LWS_SMD_DEBUG)
267
  lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
268
     msg, msg->refcount, ctx->smd.owner_messages.count);
269
  _lws_smd_dump(&ctx->smd);
270
#endif
271
272
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
273
274
0
bail:
275
0
  if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding))
276
0
    lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
277
278
  /* we may be happening from another thread context */
279
0
  lws_cancel_service(ctx);
280
281
0
  return 0;
282
0
}
283
284
/*
 * Public send entry point: queue the message for every interested peer, with
 * no peer excluded.  Returns whatever _lws_smd_msg_send() returns.
 *
 * This is wanting to be threadsafe, limiting the apis we can call
 */

int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
  struct lws_smd_peer *nobody_excluded = NULL;

  return _lws_smd_msg_send(ctx, pay, nobody_excluded);
}
293
294
/*
295
 * This is wanting to be threadsafe, limiting the apis we can call
296
 */
297
298
int
299
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
300
       const char *format, ...)
301
0
{
302
0
  lws_smd_msg_t *msg;
303
0
  va_list ap;
304
0
  void *p;
305
0
  int n;
306
307
0
  if (!(ctx->smd._class_filter & _class))
308
    /*
309
     * There's nobody interested in messages of this class atm.
310
     * Don't bother generating it, and act like all is well.
311
     */
312
0
    return 0;
313
314
0
  va_start(ap, format);
315
0
  n = vsnprintf(NULL, 0, format, ap);
316
0
  va_end(ap);
317
0
  if (n > LWS_SMD_MAX_PAYLOAD)
318
    /* too large to send */
319
0
    return 1;
320
321
0
  p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
322
0
  if (!p)
323
0
    return 1;
324
0
  msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
325
0
                sizeof(*msg));
326
0
  msg->length = (uint16_t)n;
327
0
  va_start(ap, format);
328
0
  vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
329
0
  va_end(ap);
330
331
  /*
332
   * locks taken and released in here
333
   */
334
335
0
  if (lws_smd_msg_send(ctx, p)) {
336
0
    lws_smd_msg_free(&p);
337
0
    return 1;
338
0
  }
339
340
0
  return 0;
341
0
}
342
343
#if defined(LWS_WITH_SECURE_STREAMS)
344
int
345
lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
346
          lws_smd_class_t _class, const char *format, ...)
347
0
{
348
0
  char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
349
0
  va_list ap;
350
0
  int n;
351
352
0
  if (*len < LWS_SMD_SS_RX_HEADER_LEN)
353
0
    return 1;
354
355
0
  lws_ser_wu64be(buf, _class);
356
0
  lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */
357
358
0
  va_start(ap, format);
359
0
  n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
360
0
  va_end(ap);
361
362
0
  if (n > LWS_SMD_MAX_PAYLOAD ||
363
0
      (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
364
    /* too large to send */
365
0
    return 1;
366
367
0
  *len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;
368
369
0
  lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class,
370
0
      (unsigned int)n);
371
372
0
  return 0;
373
0
}
374
375
/*
376
 * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can
377
 * call through to with the payload it received from the proxy.  It will then
378
 * forward the recieved SMD message to all local (same-context) participants
379
 * that are interested in that class (except ones with callback skip_cb, so
380
 * we don't loop).
381
 */
382
383
static int
384
_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
385
           struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
386
0
{
387
0
  lws_smd_class_t _class;
388
0
  lws_smd_msg_t *msg;
389
0
  void *p;
390
391
0
  if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
392
0
    return 1;
393
394
0
  if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
395
0
    return 1;
396
397
0
  _class = (lws_smd_class_t)lws_ser_ru64be(buf);
398
399
  //if (_class == LWSSMDCL_METRICS) {
400
401
  //}
402
403
  /* only locally forward messages that we care about in this process */
404
405
0
  if (!(ctx->smd._class_filter & _class))
406
    /*
407
     * There's nobody interested in messages of this class atm.
408
     * Don't bother generating it, and act like all is well.
409
     */
410
0
    return 0;
411
412
0
  p = lws_smd_msg_alloc(ctx, _class, len);
413
0
  if (!p)
414
0
    return 1;
415
416
0
  msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
417
0
                sizeof(*msg));
418
0
  msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
419
  /* adopt the original source timestamp, not time we forwarded it */
420
0
  msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);
421
422
  /* copy the message payload in */
423
0
  memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);
424
425
  /*
426
   * locks taken and released in here
427
   */
428
429
0
  if (_lws_smd_msg_send(ctx, p, pr)) {
430
    /* we couldn't send it after all that... */
431
0
    lws_smd_msg_free(&p);
432
433
0
    return 1;
434
0
  }
435
436
0
  lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
437
0
        tag, (unsigned int)_class, msg->length,
438
0
        (unsigned long long)msg->timestamp);
439
440
0
  return 0;
441
0
}
442
443
int
444
lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
445
0
{
446
0
  struct lws_ss_handle *h = (struct lws_ss_handle *)
447
0
          (((char *)ss_user) - sizeof(*h));
448
0
  struct lws_context *ctx = lws_ss_get_context(h);
449
450
0
  return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len);
451
0
}
452
453
#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
454
int
455
lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
456
{
457
  struct lws_sspc_handle *h = (struct lws_sspc_handle *)
458
          (((char *)ss_user) - sizeof(*h));
459
  struct lws_context *ctx = lws_sspc_get_context(h);
460
461
  return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
462
}
463
#endif
464
465
#endif
466
467
/*
468
 * Peers that deregister need to adjust the refcount of messages they would
469
 * have been interested in, but didn't take delivery of yet
470
 */
471
472
static void
473
_lws_smd_peer_destroy(lws_smd_peer_t *pr)
474
0
{
475
0
  lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
476
0
            owner_peers);
477
478
0
  if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */
479
0
    return; /* For Coverity */
480
481
0
  lws_dll2_remove(&pr->list);
482
483
  /*
484
   * We take the approach to adjust the refcount of every would-have-been
485
   * delivered message we were interested in
486
   */
487
488
0
  while (pr->tail) {
489
490
0
    lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
491
0
              lws_smd_msg_t, list);
492
493
0
    if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
494
0
      if (!--pr->tail->refcount)
495
0
        _lws_smd_msg_destroy(pr->ctx, smd, pr->tail);
496
0
    }
497
498
0
    pr->tail = m1;
499
0
  }
500
501
0
  lws_free(pr);
502
503
0
  lws_mutex_unlock(smd->lock_messages); /* messages ------- */
504
0
}
505
506
static lws_smd_msg_t *
507
_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
508
0
{
509
0
  lws_dll2_t *tail = &pr->tail->list;
510
0
  lws_smd_msg_t *msg;
511
512
0
  do {
513
0
    tail = tail->next;
514
0
    if (!tail)
515
0
      return NULL;
516
517
0
    msg = lws_container_of(tail, lws_smd_msg_t, list);
518
0
    if (msg->exc != pr &&
519
0
        _lws_smd_msg_peer_interested_in_msg(pr, msg))
520
0
      return msg;
521
0
  } while (1);
522
523
0
  return NULL;
524
0
}
525
526
/*
527
 * Delivers only one message to the peer and advances the tail, or sets to NULL
528
 * if no more filtered queued messages.  Returns nonzero if tail non-NULL.
529
 *
530
 * For Proxied SS, only asks for writeable and does not advance or change the
531
 * tail.
532
 *
533
 * This is done so if multiple messages queued, we don't get a situation where
534
 * one participant gets them all spammed, then the next etc.  Instead they are
535
 * delivered round-robin.
536
 *
537
 * Requires peer lock, may take message lock
538
 */
539
540
static int
541
_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
542
0
{
543
0
  lws_smd_msg_t *msg;
544
545
0
  if (!pr->tail)
546
0
    return 0;
547
548
0
  msg = lws_container_of(pr->tail, lws_smd_msg_t, list);
549
550
0
  lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, to peer %p",
551
0
        (unsigned int)msg->_class, (int)msg->length,
552
0
        pr);
553
554
  /*
555
   * We call the peer's callback to deliver the message.
556
   * We hold the peer lock for the duration.
557
   * That's tricky because if, in the callback, he uses smd
558
   * apis to send, we will deadlock if we try to grab the
559
   * peer lock as usual in there.
560
   *
561
   * Another way to express this is that for this thread
562
   * (only) we know we already hold the peer lock.
563
   */
564
565
0
  ctx->smd.tid_holding = lws_thread_id();
566
0
  ctx->smd.delivering = 1;
567
0
  pr->cb(pr->opaque, msg->_class, msg->timestamp,
568
0
         ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
569
0
         (size_t)msg->length);
570
0
  ctx->smd.delivering = 0;
571
0
#if !defined(__COVERITY__)
572
0
  assert(msg->refcount);
573
0
#endif
574
  /*
575
   * If there is one, move forward to the next queued
576
   * message that meets the filters of this peer
577
   */
578
0
  pr->tail = _lws_smd_msg_next_matching_filter(pr);
579
580
  /* tail message has to actually be of interest to the peer */
581
0
  assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
582
583
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */
584
0
    return 1; /* For Coverity */
585
586
0
  if (!--msg->refcount)
587
0
    _lws_smd_msg_destroy(ctx, &ctx->smd, msg);
588
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
589
590
0
  return !!pr->tail;
591
0
}
592
593
/*
594
 * Called when the event loop could deliver messages synchronously, eg, on
595
 * entry to idle
596
 */
597
598
int
599
lws_smd_msg_distribute(struct lws_context *ctx)
600
0
{
601
0
  char more;
602
603
  /* commonly, no messages and nothing to do... */
604
605
0
  if (!ctx->smd.owner_messages.count)
606
0
    return 0;
607
608
609
0
  do {
610
0
    more = 0;
611
0
    if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
612
0
      return 1; /* For Coverity */
613
614
0
    lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
615
0
             ctx->smd.owner_peers.head) {
616
0
      lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
617
618
0
      more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));
619
620
0
    } lws_end_foreach_dll_safe(p, p1);
621
622
0
    lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
623
0
  } while (more);
624
625
626
0
  return 0;
627
0
}
628
629
struct lws_smd_peer *
630
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
631
     lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
632
0
{
633
0
  lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);
634
635
0
  if (!pr)
636
0
    return NULL;
637
638
0
  pr->cb = cb;
639
0
  pr->opaque = opaque;
640
0
  pr->_class_filter = _class_filter;
641
0
  pr->ctx = ctx;
642
643
0
  if ((!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) &&
644
0
      lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */
645
0
    lws_free(pr);
646
0
    return NULL; /* For Coverity */
647
0
  }
648
649
  /*
650
   * Let's lock the message list before adding this peer... because...
651
   */
652
653
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */
654
0
    lws_free(pr);
655
0
    pr = NULL;
656
0
    goto bail1; /* For Coverity */
657
0
  }
658
659
0
  lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);
660
661
  /* update the global class mask union to account for new peer mask */
662
0
  _lws_smd_class_mask_union(&ctx->smd);
663
664
  /*
665
   * Now there's a new peer added, any messages we have stashed will try
666
   * to deliver to this guy too, if he's interested in that class.  So we
667
   * have to update the message refcounts for queued messages-he's-
668
   * interested-in accordingly.
669
   */
670
671
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
672
0
           ctx->smd.owner_messages.head) {
673
0
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
674
675
0
    if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
676
0
      msg->refcount++;
677
678
0
  } lws_end_foreach_dll_safe(p, p1);
679
680
  /* ... ok we are done adding the peer */
681
682
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
683
684
0
  lwsl_cx_info(ctx, "peer %p (count %u) registered", pr,
685
0
      (unsigned int)ctx->smd.owner_peers.count);
686
687
0
bail1:
688
0
  if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding))
689
0
    lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
690
691
0
  return pr;
692
0
}
693
694
void
695
lws_smd_unregister(struct lws_smd_peer *pr)
696
0
{
697
0
  lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);
698
699
0
  if ((!smd->delivering || !lws_thread_is(smd->tid_holding)) &&
700
0
      lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */
701
0
    return; /* For Coverity */
702
0
  lwsl_cx_notice(pr->ctx, "destroying peer %p", pr);
703
0
  _lws_smd_peer_destroy(pr);
704
705
0
  if (!smd->delivering || !lws_thread_is(smd->tid_holding))
706
0
    lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
707
0
}
708
709
int
710
lws_smd_message_pending(struct lws_context *ctx)
711
0
{
712
0
  int ret = 1;
713
714
  /*
715
   * First cheaply check the common case no messages pending, so there's
716
   * definitely nothing for this tsi or anything else
717
   */
718
719
0
  if (!ctx->smd.owner_messages.count)
720
0
    return 0;
721
722
  /*
723
   * If there are any messages, check their age and expire ones that
724
   * have been hanging around too long
725
   */
726
727
0
  if ((!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding)) &&
728
0
      lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */
729
0
    return 1; /* For Coverity */
730
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
731
0
    goto bail; /* For Coverity */
732
733
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
734
0
           ctx->smd.owner_messages.head) {
735
0
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
736
737
0
    if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
738
0
      lwsl_cx_warn(ctx, "timing out queued message %p",
739
0
          msg);
740
741
      /*
742
       * We're forcibly yanking this guy, we can expect that
743
       * there might be peers that point to it as their tail.
744
       *
745
       * In that case, move their tails on to the next guy
746
       * they are interested in, if any.
747
       */
748
749
0
      lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
750
0
               ctx->smd.owner_peers.head) {
751
0
        lws_smd_peer_t *pr = lws_container_of(pp,
752
0
              lws_smd_peer_t, list);
753
754
0
        if (pr->tail == msg)
755
0
          pr->tail = _lws_smd_msg_next_matching_filter(pr);
756
757
0
      } lws_end_foreach_dll_safe(pp, pp1);
758
759
      /*
760
       * No peer should fall foul of the peer tail checks
761
       * when destroying the message now.
762
       */
763
764
0
      _lws_smd_msg_destroy(ctx, &ctx->smd, msg);
765
0
    }
766
0
  } lws_end_foreach_dll_safe(p, p1);
767
768
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
769
770
  /*
771
   * Walk the peer list
772
   */
773
774
0
  lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
775
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
776
777
0
    if (pr->tail)
778
0
      goto bail;
779
780
0
  } lws_end_foreach_dll(p);
781
782
  /*
783
   * There's no message pending that we need to handle
784
   */
785
786
0
  ret = 0;
787
788
0
bail:
789
0
  if (!ctx->smd.delivering || !lws_thread_is(ctx->smd.tid_holding))
790
0
    lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */
791
792
0
  return ret;
793
0
}
794
795
int
796
_lws_smd_destroy(struct lws_context *ctx)
797
0
{
798
  /* stop any message creation */
799
800
0
  ctx->smd._class_filter = 0;
801
802
  /*
803
   * Walk the message list, destroying them
804
   */
805
806
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
807
0
           ctx->smd.owner_messages.head) {
808
0
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
809
810
0
    lws_dll2_remove(&msg->list);
811
0
    lws_free(msg);
812
813
0
  } lws_end_foreach_dll_safe(p, p1);
814
815
  /*
816
   * Walk the peer list, destroying them
817
   */
818
819
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
820
0
           ctx->smd.owner_peers.head) {
821
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
822
823
0
    pr->tail = NULL; /* we just nuked all the messages, ignore */
824
0
    _lws_smd_peer_destroy(pr);
825
826
0
  } lws_end_foreach_dll_safe(p, p1);
827
828
0
  lws_mutex_destroy(ctx->smd.lock_messages);
829
0
  lws_mutex_destroy(ctx->smd.lock_peers);
830
831
0
  return 0;
832
0
}