Coverage Report

Created: 2024-09-08 06:10

/src/libwebsockets/lib/system/smd/smd.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * lws System Message Distribution
3
 *
4
 * Copyright (C) 2019 - 2021 Andy Green <andy@warmcat.com>
5
 *
6
 * Permission is hereby granted, free of charge, to any person obtaining a copy
7
 * of this software and associated documentation files (the "Software"), to
8
 * deal in the Software without restriction, including without limitation the
9
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10
 * sell copies of the Software, and to permit persons to whom the Software is
11
 * furnished to do so, subject to the following conditions:
12
 *
13
 * The above copyright notice and this permission notice shall be included in
14
 * all copies or substantial portions of the Software.
15
 *
16
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22
 * IN THE SOFTWARE.
23
 */
24
25
#include "private-lib-core.h"
26
#include <assert.h>
27
28
/* comment me to remove extra debug and sanity checks */
29
// #define LWS_SMD_DEBUG
30
31
32
#if defined(LWS_SMD_DEBUG)
33
#define lwsl_smd lwsl_notice
34
#else
35
#define lwsl_smd(_s, ...)
36
#endif
37
38
void *
39
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
40
0
{
41
0
  lws_smd_msg_t *msg;
42
43
  /* only allow it if someone wants to consume this class of event */
44
45
0
  if (!(ctx->smd._class_filter & _class)) {
46
0
    lwsl_cx_info(ctx, "rejecting class 0x%x as no participant wants",
47
0
        (unsigned int)_class);
48
0
    return NULL;
49
0
  }
50
51
0
  assert(len <= LWS_SMD_MAX_PAYLOAD);
52
53
54
  /*
55
   * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
56
   * payload, ie,  msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
57
   */
58
0
  msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
59
0
       __func__);
60
0
  if (!msg)
61
0
    return NULL;
62
63
0
  memset(msg, 0, sizeof(*msg));
64
0
  msg->timestamp = lws_now_usecs();
65
0
  msg->length = (uint16_t)len;
66
0
  msg->_class = _class;
67
68
0
  return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
69
0
}
70
71
void
72
lws_smd_msg_free(void **ppay)
73
0
{
74
0
  lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
75
0
        LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
76
77
  /* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */
78
0
  lws_free(msg);
79
0
  *ppay = NULL;
80
0
}
81
82
#if defined(LWS_SMD_DEBUG)
83
static void
84
lws_smd_dump(lws_smd_t *smd)
85
{
86
  int n = 1;
87
88
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
89
           smd->owner_messages.head) {
90
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
91
92
    lwsl_info(" msg %d: %p: ref %d, lat %dms, cls: 0x%x, len %u: '%s'\n",
93
          n++, msg, msg->refcount,
94
          (unsigned int)((lws_now_usecs() - msg->timestamp) / 1000),
95
          msg->length, msg->_class,
96
          (const char *)&msg[1] + LWS_SMD_SS_RX_HEADER_LEN_EFF);
97
98
  } lws_end_foreach_dll_safe(p, p1);
99
100
  n = 1;
101
  lws_start_foreach_dll(struct lws_dll2 *, p, smd->owner_peers.head) {
102
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
103
104
    lwsl_info(" peer %d: %p: tail: %p, filt 0x%x\n",
105
          n++, pr, pr->tail, pr->_class_filter);
106
  } lws_end_foreach_dll(p);
107
}
108
#endif
109
110
static int
111
_lws_smd_msg_peer_interested_in_msg(lws_smd_peer_t *pr, lws_smd_msg_t *msg)
112
0
{
113
0
    return !!(msg->_class & pr->_class_filter);
114
0
}
115
116
/*
117
 * Figure out what to set the initial refcount for the message to
118
 */
119
120
static int
121
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg,
122
             struct lws_smd_peer *exc)
123
0
{
124
0
  struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
125
0
  int interested = 0;
126
127
0
  lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
128
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
129
130
0
    if (pr != exc && _lws_smd_msg_peer_interested_in_msg(pr, msg))
131
      /*
132
       * This peer wants to consume it
133
       */
134
0
      interested++;
135
136
0
  } lws_end_foreach_dll(p);
137
138
0
  return interested;
139
0
}
140
141
static int
142
_lws_smd_class_mask_union(lws_smd_t *smd)
143
0
{
144
0
  uint32_t mask = 0;
145
146
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
147
0
           smd->owner_peers.head) {
148
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
149
150
0
    mask |= pr->_class_filter;
151
152
0
  } lws_end_foreach_dll_safe(p, p1);
153
154
0
  smd->_class_filter = mask;
155
156
0
  return 0;
157
0
}
158
159
/* Call with message lock held */
160
161
static void
162
_lws_smd_msg_destroy(struct lws_context *cx, lws_smd_t *smd, lws_smd_msg_t *msg)
163
0
{
164
  /*
165
   * We think we gave the message to everyone and can destroy it.
166
   * Sanity check that no peer holds a pointer to this guy
167
   */
168
169
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
170
0
           smd->owner_peers.head) {
171
0
    lws_smd_peer_t *xpr = lws_container_of(p, lws_smd_peer_t, list);
172
173
0
    if (xpr->tail == msg) {
174
0
      lwsl_cx_err(cx, "peer %p has msg %p "
175
0
         "we are about to destroy as tail", xpr, msg);
176
0
#if !defined(LWS_PLAT_FREERTOS)
177
0
      assert(0);
178
0
#endif
179
0
    }
180
181
0
  } lws_end_foreach_dll_safe(p, p1);
182
183
  /*
184
   * We have fully delivered the message now, it
185
   * can be unlinked and destroyed
186
   */
187
0
  lwsl_cx_info(cx, "destroy msg %p", msg);
188
0
  lws_dll2_remove(&msg->list);
189
0
  lws_free(msg);
190
0
}
191
192
/*
193
 * This is wanting to be threadsafe, limiting the apis we can call
194
 */
195
196
int
197
_lws_smd_msg_send(struct lws_context *ctx, void *pay, struct lws_smd_peer *exc)
198
0
{
199
0
  lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
200
0
        LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));
201
202
0
  if (ctx->smd.owner_messages.count >= ctx->smd_queue_depth) {
203
0
    lwsl_cx_warn(ctx, "rejecting message on queue depth %d",
204
0
          (int)ctx->smd.owner_messages.count);
205
    /* reject the message due to max queue depth reached */
206
0
    return 1;
207
0
  }
208
209
0
  if (!ctx->smd.delivering &&
210
0
      lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
211
0
    return 1; /* For Coverity */
212
213
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
214
0
    goto bail;
215
216
0
  msg->refcount = (uint16_t)_lws_smd_msg_assess_peers_interested(
217
0
              &ctx->smd, msg, exc);
218
0
  if (!msg->refcount) {
219
    /* possible, condsidering exc and no other participants */
220
0
    lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
221
222
0
    lws_free(msg);
223
0
    if (!ctx->smd.delivering)
224
0
      lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
225
226
0
    return 0;
227
0
  }
228
229
0
  msg->exc = exc;
230
231
  /* let's add him on the queue... */
232
233
0
  lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
234
235
  /*
236
   * Any peer with no active tail needs to check our class to see if we
237
   * should become his tail
238
   */
239
240
0
  lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
241
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
242
243
0
    if (pr != exc &&
244
0
                   !pr->tail && _lws_smd_msg_peer_interested_in_msg(pr, msg)) {
245
0
      pr->tail = msg;
246
      /* tail message has to actually be of interest to the peer */
247
0
      assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
248
0
    }
249
250
0
  } lws_end_foreach_dll(p);
251
252
#if defined(LWS_SMD_DEBUG)
253
  lwsl_smd("%s: added %p (refc %u) depth now %d\n", __func__,
254
     msg, msg->refcount, ctx->smd.owner_messages.count);
255
  lws_smd_dump(&ctx->smd);
256
#endif
257
258
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
259
260
0
bail:
261
0
  if (!ctx->smd.delivering)
262
0
    lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
263
264
  /* we may be happening from another thread context */
265
0
  lws_cancel_service(ctx);
266
267
0
  return 0;
268
0
}
269
270
/*
271
 * This is wanting to be threadsafe, limiting the apis we can call
272
 */
273
274
int
275
lws_smd_msg_send(struct lws_context *ctx, void *pay)
276
0
{
277
0
  return _lws_smd_msg_send(ctx, pay, NULL);
278
0
}
279
280
/*
281
 * This is wanting to be threadsafe, limiting the apis we can call
282
 */
283
284
int
285
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
286
       const char *format, ...)
287
0
{
288
0
  lws_smd_msg_t *msg;
289
0
  va_list ap;
290
0
  void *p;
291
0
  int n;
292
293
0
  if (!(ctx->smd._class_filter & _class))
294
    /*
295
     * There's nobody interested in messages of this class atm.
296
     * Don't bother generating it, and act like all is well.
297
     */
298
0
    return 0;
299
300
0
  va_start(ap, format);
301
0
  n = vsnprintf(NULL, 0, format, ap);
302
0
  va_end(ap);
303
0
  if (n > LWS_SMD_MAX_PAYLOAD)
304
    /* too large to send */
305
0
    return 1;
306
307
0
  p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
308
0
  if (!p)
309
0
    return 1;
310
0
  msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
311
0
                sizeof(*msg));
312
0
  msg->length = (uint16_t)n;
313
0
  va_start(ap, format);
314
0
  vsnprintf((char *)p, (unsigned int)n + 2, format, ap);
315
0
  va_end(ap);
316
317
  /*
318
   * locks taken and released in here
319
   */
320
321
0
  if (lws_smd_msg_send(ctx, p)) {
322
0
    lws_smd_msg_free(&p);
323
0
    return 1;
324
0
  }
325
326
0
  return 0;
327
0
}
328
329
#if defined(LWS_WITH_SECURE_STREAMS)
330
int
331
lws_smd_ss_msg_printf(const char *tag, uint8_t *buf, size_t *len,
332
          lws_smd_class_t _class, const char *format, ...)
333
0
{
334
0
  char *content = (char *)buf + LWS_SMD_SS_RX_HEADER_LEN;
335
0
  va_list ap;
336
0
  int n;
337
338
0
  if (*len < LWS_SMD_SS_RX_HEADER_LEN)
339
0
    return 1;
340
341
0
  lws_ser_wu64be(buf, _class);
342
0
  lws_ser_wu64be(buf + 8, 0); /* valgrind notices uninitialized if left */
343
344
0
  va_start(ap, format);
345
0
  n = vsnprintf(content, (*len) - LWS_SMD_SS_RX_HEADER_LEN, format, ap);
346
0
  va_end(ap);
347
348
0
  if (n > LWS_SMD_MAX_PAYLOAD ||
349
0
      (unsigned int)n > (*len) - LWS_SMD_SS_RX_HEADER_LEN)
350
    /* too large to send */
351
0
    return 1;
352
353
0
  *len = LWS_SMD_SS_RX_HEADER_LEN + (unsigned int)n;
354
355
0
  lwsl_info("%s: %s send cl 0x%x, len %u\n", __func__, tag, (unsigned int)_class,
356
0
      (unsigned int)n);
357
358
0
  return 0;
359
0
}
360
361
/*
362
 * This is a helper that user rx handler for LWS_SMD_STREAMTYPENAME SS can
363
 * call through to with the payload it received from the proxy.  It will then
364
 * forward the recieved SMD message to all local (same-context) participants
365
 * that are interested in that class (except ones with callback skip_cb, so
366
 * we don't loop).
367
 */
368
369
static int
370
_lws_smd_ss_rx_forward(struct lws_context *ctx, const char *tag,
371
           struct lws_smd_peer *pr, const uint8_t *buf, size_t len)
372
0
{
373
0
  lws_smd_class_t _class;
374
0
  lws_smd_msg_t *msg;
375
0
  void *p;
376
377
0
  if (len < LWS_SMD_SS_RX_HEADER_LEN_EFF)
378
0
    return 1;
379
380
0
  if (len >= LWS_SMD_MAX_PAYLOAD + LWS_SMD_SS_RX_HEADER_LEN_EFF)
381
0
    return 1;
382
383
0
  _class = (lws_smd_class_t)lws_ser_ru64be(buf);
384
385
  //if (_class == LWSSMDCL_METRICS) {
386
387
  //}
388
389
  /* only locally forward messages that we care about in this process */
390
391
0
  if (!(ctx->smd._class_filter & _class))
392
    /*
393
     * There's nobody interested in messages of this class atm.
394
     * Don't bother generating it, and act like all is well.
395
     */
396
0
    return 0;
397
398
0
  p = lws_smd_msg_alloc(ctx, _class, len);
399
0
  if (!p)
400
0
    return 1;
401
402
0
  msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
403
0
                sizeof(*msg));
404
0
  msg->length = (uint16_t)(len - LWS_SMD_SS_RX_HEADER_LEN_EFF);
405
  /* adopt the original source timestamp, not time we forwarded it */
406
0
  msg->timestamp = (lws_usec_t)lws_ser_ru64be(buf + 8);
407
408
  /* copy the message payload in */
409
0
  memcpy(p, buf + LWS_SMD_SS_RX_HEADER_LEN_EFF, msg->length);
410
411
  /*
412
   * locks taken and released in here
413
   */
414
415
0
  if (_lws_smd_msg_send(ctx, p, pr)) {
416
    /* we couldn't send it after all that... */
417
0
    lws_smd_msg_free(&p);
418
419
0
    return 1;
420
0
  }
421
422
0
  lwsl_info("%s: %s send cl 0x%x, len %u, ts %llu\n", __func__,
423
0
        tag, (unsigned int)_class, msg->length,
424
0
        (unsigned long long)msg->timestamp);
425
426
0
  return 0;
427
0
}
428
429
int
430
lws_smd_ss_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
431
0
{
432
0
  struct lws_ss_handle *h = (struct lws_ss_handle *)
433
0
          (((char *)ss_user) - sizeof(*h));
434
0
  struct lws_context *ctx = lws_ss_get_context(h);
435
436
0
  return _lws_smd_ss_rx_forward(ctx, lws_ss_tag(h), h->u.smd.smd_peer, buf, len);
437
0
}
438
439
#if defined(LWS_WITH_SECURE_STREAMS_PROXY_API)
440
int
441
lws_smd_sspc_rx_forward(void *ss_user, const uint8_t *buf, size_t len)
442
{
443
  struct lws_sspc_handle *h = (struct lws_sspc_handle *)
444
          (((char *)ss_user) - sizeof(*h));
445
  struct lws_context *ctx = lws_sspc_get_context(h);
446
447
  return _lws_smd_ss_rx_forward(ctx, lws_sspc_tag(h), NULL, buf, len);
448
}
449
#endif
450
451
#endif
452
453
/*
454
 * Peers that deregister need to adjust the refcount of messages they would
455
 * have been interested in, but didn't take delivery of yet
456
 */
457
458
static void
459
_lws_smd_peer_destroy(lws_smd_peer_t *pr)
460
0
{
461
0
  lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
462
0
            owner_peers);
463
464
0
  if (lws_mutex_lock(smd->lock_messages)) /* +++++++++ messages */
465
0
    return; /* For Coverity */
466
467
0
  lws_dll2_remove(&pr->list);
468
469
  /*
470
   * We take the approach to adjust the refcount of every would-have-been
471
   * delivered message we were interested in
472
   */
473
474
0
  while (pr->tail) {
475
476
0
    lws_smd_msg_t *m1 = lws_container_of(pr->tail->list.next,
477
0
              lws_smd_msg_t, list);
478
479
0
    if (_lws_smd_msg_peer_interested_in_msg(pr, pr->tail)) {
480
0
      if (!--pr->tail->refcount)
481
0
        _lws_smd_msg_destroy(pr->ctx, smd, pr->tail);
482
0
    }
483
484
0
    pr->tail = m1;
485
0
  }
486
487
0
  lws_free(pr);
488
489
0
  lws_mutex_unlock(smd->lock_messages); /* messages ------- */
490
0
}
491
492
static lws_smd_msg_t *
493
_lws_smd_msg_next_matching_filter(lws_smd_peer_t *pr)
494
0
{
495
0
  lws_dll2_t *tail = &pr->tail->list;
496
0
  lws_smd_msg_t *msg;
497
498
0
  do {
499
0
    tail = tail->next;
500
0
    if (!tail)
501
0
      return NULL;
502
503
0
    msg = lws_container_of(tail, lws_smd_msg_t, list);
504
0
    if (msg->exc != pr &&
505
0
        _lws_smd_msg_peer_interested_in_msg(pr, msg))
506
0
      return msg;
507
0
  } while (1);
508
509
0
  return NULL;
510
0
}
511
512
/*
513
 * Delivers only one message to the peer and advances the tail, or sets to NULL
514
 * if no more filtered queued messages.  Returns nonzero if tail non-NULL.
515
 *
516
 * For Proxied SS, only asks for writeable and does not advance or change the
517
 * tail.
518
 *
519
 * This is done so if multiple messages queued, we don't get a situation where
520
 * one participant gets them all spammed, then the next etc.  Instead they are
521
 * delivered round-robin.
522
 *
523
 * Requires peer lock, may take message lock
524
 */
525
526
static int
527
_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
528
0
{
529
0
  lws_smd_msg_t *msg;
530
531
0
  if (!pr->tail)
532
0
    return 0;
533
534
0
  msg = lws_container_of(pr->tail, lws_smd_msg_t, list);
535
536
0
  lwsl_cx_info(ctx, "deliver cl 0x%x, len %d, to peer %p",
537
0
        (unsigned int)msg->_class, (int)msg->length,
538
0
        pr);
539
540
0
  pr->cb(pr->opaque, msg->_class, msg->timestamp,
541
0
         ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
542
0
         (size_t)msg->length);
543
0
#if !defined(__COVERITY__)
544
0
  assert(msg->refcount);
545
0
#endif
546
  /*
547
   * If there is one, move forward to the next queued
548
   * message that meets the filters of this peer
549
   */
550
0
  pr->tail = _lws_smd_msg_next_matching_filter(pr);
551
552
  /* tail message has to actually be of interest to the peer */
553
0
  assert(!pr->tail || (pr->tail->_class & pr->_class_filter));
554
555
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++ messages */
556
0
    return 1; /* For Coverity */
557
558
0
  if (!--msg->refcount)
559
0
    _lws_smd_msg_destroy(ctx, &ctx->smd, msg);
560
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
561
562
0
  return !!pr->tail;
563
0
}
564
565
/*
566
 * Called when the event loop could deliver messages synchronously, eg, on
567
 * entry to idle
568
 */
569
570
int
571
lws_smd_msg_distribute(struct lws_context *ctx)
572
0
{
573
0
  char more;
574
575
  /* commonly, no messages and nothing to do... */
576
577
0
  if (!ctx->smd.owner_messages.count)
578
0
    return 0;
579
580
0
  ctx->smd.delivering = 1;
581
582
0
  do {
583
0
    more = 0;
584
0
    if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++ peers */
585
0
      return 1; /* For Coverity */
586
587
0
    lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
588
0
             ctx->smd.owner_peers.head) {
589
0
      lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
590
591
0
      more = (char)(more | !!_lws_smd_msg_deliver_peer(ctx, pr));
592
593
0
    } lws_end_foreach_dll_safe(p, p1);
594
595
0
    lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
596
0
  } while (more);
597
598
0
  ctx->smd.delivering = 0;
599
600
0
  return 0;
601
0
}
602
603
struct lws_smd_peer *
604
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
605
     lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
606
0
{
607
0
  lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);
608
609
0
  if (!pr)
610
0
    return NULL;
611
612
0
  pr->cb = cb;
613
0
  pr->opaque = opaque;
614
0
  pr->_class_filter = _class_filter;
615
0
  pr->ctx = ctx;
616
617
0
  if (!ctx->smd.delivering &&
618
0
      lws_mutex_lock(ctx->smd.lock_peers)) { /* +++++++++++++++ peers */
619
0
      lws_free(pr);
620
0
      return NULL; /* For Coverity */
621
0
    }
622
623
  /*
624
   * Let's lock the message list before adding this peer... because...
625
   */
626
627
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) { /* +++++++++ messages */
628
0
    lws_free(pr);
629
0
    pr = NULL;
630
0
    goto bail1; /* For Coverity */
631
0
  }
632
633
0
  lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);
634
635
  /* update the global class mask union to account for new peer mask */
636
0
  _lws_smd_class_mask_union(&ctx->smd);
637
638
  /*
639
   * Now there's a new peer added, any messages we have stashed will try
640
   * to deliver to this guy too, if he's interested in that class.  So we
641
   * have to update the message refcounts for queued messages-he's-
642
   * interested-in accordingly.
643
   */
644
645
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
646
0
           ctx->smd.owner_messages.head) {
647
0
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
648
649
0
    if (_lws_smd_msg_peer_interested_in_msg(pr, msg))
650
0
      msg->refcount++;
651
652
0
  } lws_end_foreach_dll_safe(p, p1);
653
654
  /* ... ok we are done adding the peer */
655
656
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* messages ------- */
657
658
0
  lwsl_cx_info(ctx, "peer %p (count %u) registered", pr,
659
0
      (unsigned int)ctx->smd.owner_peers.count);
660
661
0
bail1:
662
0
  if (!ctx->smd.delivering)
663
0
    lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */
664
665
0
  return pr;
666
0
}
667
668
void
669
lws_smd_unregister(struct lws_smd_peer *pr)
670
0
{
671
0
  lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t, owner_peers);
672
673
0
  if (!smd->delivering &&
674
0
      lws_mutex_lock(smd->lock_peers)) /* +++++++++++++++++++ peers */
675
0
    return; /* For Coverity */
676
0
  lwsl_cx_notice(pr->ctx, "destroying peer %p", pr);
677
0
  _lws_smd_peer_destroy(pr);
678
0
  if (!smd->delivering)
679
0
    lws_mutex_unlock(smd->lock_peers); /* ----------------- peers */
680
0
}
681
682
int
683
lws_smd_message_pending(struct lws_context *ctx)
684
0
{
685
0
  int ret = 1;
686
687
  /*
688
   * First cheaply check the common case no messages pending, so there's
689
   * definitely nothing for this tsi or anything else
690
   */
691
692
0
  if (!ctx->smd.owner_messages.count)
693
0
    return 0;
694
695
  /*
696
   * If there are any messages, check their age and expire ones that
697
   * have been hanging around too long
698
   */
699
700
0
  if (lws_mutex_lock(ctx->smd.lock_peers)) /* +++++++++++++++++++++++ peers */
701
0
    return 1; /* For Coverity */
702
0
  if (lws_mutex_lock(ctx->smd.lock_messages)) /* +++++++++++++++++ messages */
703
0
    goto bail; /* For Coverity */
704
705
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
706
0
           ctx->smd.owner_messages.head) {
707
0
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
708
709
0
    if ((lws_now_usecs() - msg->timestamp) > ctx->smd_ttl_us) {
710
0
      lwsl_cx_warn(ctx, "timing out queued message %p",
711
0
          msg);
712
713
      /*
714
       * We're forcibly yanking this guy, we can expect that
715
       * there might be peers that point to it as their tail.
716
       *
717
       * In that case, move their tails on to the next guy
718
       * they are interested in, if any.
719
       */
720
721
0
      lws_start_foreach_dll_safe(struct lws_dll2 *, pp, pp1,
722
0
               ctx->smd.owner_peers.head) {
723
0
        lws_smd_peer_t *pr = lws_container_of(pp,
724
0
              lws_smd_peer_t, list);
725
726
0
        if (pr->tail == msg)
727
0
          pr->tail = _lws_smd_msg_next_matching_filter(pr);
728
729
0
      } lws_end_foreach_dll_safe(pp, pp1);
730
731
      /*
732
       * No peer should fall foul of the peer tail checks
733
       * when destroying the message now.
734
       */
735
736
0
      _lws_smd_msg_destroy(ctx, &ctx->smd, msg);
737
0
    }
738
0
  } lws_end_foreach_dll_safe(p, p1);
739
740
0
  lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */
741
742
  /*
743
   * Walk the peer list
744
   */
745
746
0
  lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
747
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
748
749
0
    if (pr->tail)
750
0
      goto bail;
751
752
0
  } lws_end_foreach_dll(p);
753
754
  /*
755
   * There's no message pending that we need to handle
756
   */
757
758
0
  ret = 0;
759
760
0
bail:
761
0
  lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */
762
763
0
  return ret;
764
0
}
765
766
int
767
_lws_smd_destroy(struct lws_context *ctx)
768
0
{
769
  /* stop any message creation */
770
771
0
  ctx->smd._class_filter = 0;
772
773
  /*
774
   * Walk the message list, destroying them
775
   */
776
777
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
778
0
           ctx->smd.owner_messages.head) {
779
0
    lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);
780
781
0
    lws_dll2_remove(&msg->list);
782
0
    lws_free(msg);
783
784
0
  } lws_end_foreach_dll_safe(p, p1);
785
786
  /*
787
   * Walk the peer list, destroying them
788
   */
789
790
0
  lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
791
0
           ctx->smd.owner_peers.head) {
792
0
    lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);
793
794
0
    pr->tail = NULL; /* we just nuked all the messages, ignore */
795
0
    _lws_smd_peer_destroy(pr);
796
797
0
  } lws_end_foreach_dll_safe(p, p1);
798
799
0
  lws_mutex_destroy(ctx->smd.lock_messages);
800
0
  lws_mutex_destroy(ctx->smd.lock_peers);
801
802
0
  return 0;
803
0
}