Coverage Report

Created: 2025-08-28 06:29

/src/frr/zebra/zebra_opaque.c
Line
Count
Source (jump to first uncovered line)
1
// SPDX-License-Identifier: GPL-2.0-or-later
2
/*
3
 * Zebra opaque message handler module
4
 * Copyright (c) 2020 Volta Networks, Inc.
5
 */
6
7
8
#include <zebra.h>
9
#include "lib/debug.h"
10
#include "lib/frr_pthread.h"
11
#include "lib/stream.h"
12
#include "zebra/debug.h"
13
#include "zebra/zserv.h"
14
#include "zebra/zebra_opaque.h"
15
#include "zebra/rib.h"
16
17
/* Mem type */
18
DEFINE_MTYPE_STATIC(ZEBRA, OPQ, "ZAPI Opaque Information");
19
20
/* Hash to hold message registration info from zapi clients */
21
PREDECL_HASH(opq_regh);
22
23
/*
 * One registered zapi client, identified by its proto / instance /
 * session-id triple. Entries are chained into a doubly-linked list
 * through 'next' and 'prev'.
 */
struct opq_client_reg {
  /* Client identity */
  int proto;
  int instance;
  uint32_t session_id;

  /* List linkage */
  struct opq_client_reg *next;
  struct opq_client_reg *prev;
};
32
33
/* Opaque message registration info */
34
struct opq_msg_reg {
35
  struct opq_regh_item item;
36
37
  /* Message type */
38
  uint32_t type;
39
40
  struct opq_client_reg *clients;
41
};
42
43
/* Registration helper prototypes */
44
static uint32_t registration_hash(const struct opq_msg_reg *reg);
45
static int registration_compare(const struct opq_msg_reg *reg1,
46
        const struct opq_msg_reg *reg2);
47
48
DECLARE_HASH(opq_regh, struct opq_msg_reg, item, registration_compare,
49
       registration_hash);
50
51
static struct opq_regh_head opq_reg_hash;
52
53
/*
54
 * Globals
55
 */
56
static struct zebra_opaque_globals {
57
58
  /* Sentinel for run or start of shutdown */
59
  _Atomic uint32_t run;
60
61
  /* Limit number of pending, unprocessed updates */
62
  _Atomic uint32_t max_queued_updates;
63
64
  /* Limit number of new messages dequeued at once, to pace an
65
   * incoming burst.
66
   */
67
  uint32_t msgs_per_cycle;
68
69
  /* Stats: counters of incoming messages, errors, and yields (when
70
   * the limit has been reached.)
71
   */
72
  _Atomic uint32_t msgs_in;
73
  _Atomic uint32_t msg_errors;
74
  _Atomic uint32_t yields;
75
76
  /* pthread */
77
  struct frr_pthread *pthread;
78
79
  /* Event-delivery context 'master' for the module */
80
  struct event_loop *master;
81
82
  /* Event/'thread' pointer for queued zapi messages */
83
  struct event *t_msgs;
84
85
  /* Input fifo queue to the module, and lock to protect it. */
86
  pthread_mutex_t mutex;
87
  struct stream_fifo in_fifo;
88
89
} zo_info;
90
91
/* Module name used in debug/log output */
static const char LOG_NAME[] = "Zebra Opaque";

/*
 * Forward declarations
 */

/* Event handler that processes the incoming message queue */
static void process_messages(struct event *event);

/* Message handling */
static int dispatch_opq_messages(struct stream_fifo *msg_fifo);
static int handle_opq_registration(const struct zmsghdr *hdr,
                                   struct stream *msg);
static int handle_opq_unregistration(const struct zmsghdr *hdr,
                                     struct stream *msg);

/* Registration records */
static struct opq_msg_reg *opq_reg_lookup(uint32_t type);
static struct opq_msg_reg *opq_reg_alloc(uint32_t type);
static void opq_reg_free(struct opq_msg_reg **reg);

/* Registered clients */
static struct opq_client_reg *opq_client_alloc(
  const struct zapi_opaque_reg_info *info);
static void opq_client_free(struct opq_client_reg **client);
static bool opq_client_match(const struct opq_client_reg *client,
                             const struct zapi_opaque_reg_info *info);
static const char *opq_client2str(char *buf, size_t buflen,
                                  const struct opq_client_reg *client);
113
114
/*
115
 * Initialize the module at startup
116
 */
117
void zebra_opaque_init(void)
118
0
{
119
0
  memset(&zo_info, 0, sizeof(zo_info));
120
121
0
  pthread_mutex_init(&zo_info.mutex, NULL);
122
0
  stream_fifo_init(&zo_info.in_fifo);
123
124
0
  zo_info.msgs_per_cycle = ZEBRA_OPAQUE_MSG_LIMIT;
125
0
}
126
127
/*
128
 * Start the module pthread. This step is run later than the
129
 * 'init' step, in case zebra has fork-ed.
130
 */
131
void zebra_opaque_start(void)
132
0
{
133
0
  struct frr_pthread_attr pattr = {
134
0
    .start = frr_pthread_attr_default.start,
135
0
    .stop = frr_pthread_attr_default.stop
136
0
  };
137
138
0
  if (IS_ZEBRA_DEBUG_EVENT)
139
0
    zlog_debug("%s module starting", LOG_NAME);
140
141
  /* Start pthread */
142
0
  zo_info.pthread = frr_pthread_new(&pattr, "Zebra Opaque thread",
143
0
            "zebra_opaque");
144
145
  /* Associate event 'master' */
146
0
  zo_info.master = zo_info.pthread->master;
147
148
0
  atomic_store_explicit(&zo_info.run, 1, memory_order_relaxed);
149
150
  /* Enqueue an initial event for the pthread */
151
0
  event_add_event(zo_info.master, process_messages, NULL, 0,
152
0
      &zo_info.t_msgs);
153
154
  /* And start the pthread */
155
0
  frr_pthread_run(zo_info.pthread, NULL);
156
0
}
157
158
/*
159
 * Module stop, halting the dedicated pthread; called from the main pthread.
160
 */
161
void zebra_opaque_stop(void)
162
0
{
163
0
  if (IS_ZEBRA_DEBUG_EVENT)
164
0
    zlog_debug("%s module stop", LOG_NAME);
165
166
0
  atomic_store_explicit(&zo_info.run, 0, memory_order_relaxed);
167
168
0
  frr_pthread_stop(zo_info.pthread, NULL);
169
170
0
  frr_pthread_destroy(zo_info.pthread);
171
172
0
  if (IS_ZEBRA_DEBUG_EVENT)
173
0
    zlog_debug("%s module stop complete", LOG_NAME);
174
0
}
175
176
/*
177
 * Module final cleanup, called from the zebra main pthread.
178
 */
179
void zebra_opaque_finish(void)
180
0
{
181
0
  struct opq_msg_reg *reg;
182
0
  struct opq_client_reg *client;
183
184
0
  if (IS_ZEBRA_DEBUG_EVENT)
185
0
    zlog_debug("%s module shutdown", LOG_NAME);
186
187
  /* Clear out registration info */
188
0
  while ((reg = opq_regh_pop(&opq_reg_hash)) != NULL) {
189
0
    client = reg->clients;
190
0
    while (client) {
191
0
      reg->clients = client->next;
192
0
      opq_client_free(&client);
193
0
      client = reg->clients;
194
0
    }
195
196
0
    opq_reg_free(&reg);
197
0
  }
198
199
0
  opq_regh_fini(&opq_reg_hash);
200
201
0
  pthread_mutex_destroy(&zo_info.mutex);
202
0
  stream_fifo_deinit(&zo_info.in_fifo);
203
0
}
204
205
/*
206
 * Does this module handle (intercept) the specified zapi message type?
207
 */
208
bool zebra_opaque_handles_msgid(uint16_t id)
209
0
{
210
0
  bool ret = false;
211
212
0
  switch (id) {
213
0
  case ZEBRA_OPAQUE_MESSAGE:
214
0
  case ZEBRA_OPAQUE_REGISTER:
215
0
  case ZEBRA_OPAQUE_UNREGISTER:
216
0
    ret = true;
217
0
    break;
218
0
  default:
219
0
    break;
220
0
  }
221
222
0
  return ret;
223
0
}
224
225
/*
226
 * Enqueue a batch of messages for processing - this is the public api
227
 * used from the zapi processing threads.
228
 */
229
uint32_t zebra_opaque_enqueue_batch(struct stream_fifo *batch)
230
0
{
231
0
  uint32_t counter = 0;
232
0
  struct stream *msg;
233
234
  /* Dequeue messages from the incoming batch, and save them
235
   * on the module fifo.
236
   */
237
0
  frr_with_mutex (&zo_info.mutex) {
238
0
    msg = stream_fifo_pop(batch);
239
0
    while (msg) {
240
0
      stream_fifo_push(&zo_info.in_fifo, msg);
241
0
      counter++;
242
0
      msg = stream_fifo_pop(batch);
243
0
    }
244
0
  }
245
246
  /* Schedule module pthread to process the batch */
247
0
  if (counter > 0) {
248
0
    if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
249
0
      zlog_debug("%s: received %u messages",
250
0
           __func__, counter);
251
0
    event_add_event(zo_info.master, process_messages, NULL, 0,
252
0
        &zo_info.t_msgs);
253
0
  }
254
255
0
  return counter;
256
0
}
257
258
/*
259
 * Pthread event loop, process the incoming message queue.
260
 */
261
static void process_messages(struct event *event)
262
0
{
263
0
  struct stream_fifo fifo;
264
0
  struct stream *msg;
265
0
  uint32_t i;
266
0
  bool need_resched = false;
267
0
268
0
  stream_fifo_init(&fifo);
269
0
270
0
  /* Check for zebra shutdown */
271
0
  if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0)
272
0
    goto done;
273
0
274
0
  /*
275
0
   * Dequeue some messages from the incoming queue, temporarily
276
0
   * save them on the local fifo
277
0
   */
278
0
  frr_with_mutex (&zo_info.mutex) {
279
0
280
0
    for (i = 0; i < zo_info.msgs_per_cycle; i++) {
281
0
      msg = stream_fifo_pop(&zo_info.in_fifo);
282
0
      if (msg == NULL)
283
0
        break;
284
0
285
0
      stream_fifo_push(&fifo, msg);
286
0
    }
287
0
288
0
    /*
289
0
     * We may need to reschedule, if there are still
290
0
     * queued messages
291
0
     */
292
0
    if (stream_fifo_head(&zo_info.in_fifo) != NULL)
293
0
      need_resched = true;
294
0
  }
295
0
296
0
  /* Update stats */
297
0
  atomic_fetch_add_explicit(&zo_info.msgs_in, i, memory_order_relaxed);
298
0
299
0
  /* Check for zebra shutdown */
300
0
  if (atomic_load_explicit(&zo_info.run, memory_order_relaxed) == 0) {
301
0
    need_resched = false;
302
0
    goto done;
303
0
  }
304
0
305
0
  if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
306
0
    zlog_debug("%s: processing %u messages", __func__, i);
307
0
308
0
  /*
309
0
   * Process the messages from the temporary fifo. We send the whole
310
0
   * fifo so that we can take advantage of batching internally. Note
311
0
   * that registration/deregistration messages are handled here also.
312
0
   */
313
0
  dispatch_opq_messages(&fifo);
314
0
315
0
done:
316
0
317
0
  if (need_resched) {
318
0
    atomic_fetch_add_explicit(&zo_info.yields, 1,
319
0
            memory_order_relaxed);
320
0
    event_add_event(zo_info.master, process_messages, NULL, 0,
321
0
        &zo_info.t_msgs);
322
0
  }
323
0
324
0
  /* This will also free any leftover messages, in the shutdown case */
325
0
  stream_fifo_deinit(&fifo);
326
0
}
327
328
/*
329
 * Process (dispatch) or drop opaque messages.
330
 */
331
static int dispatch_opq_messages(struct stream_fifo *msg_fifo)
332
0
{
333
0
  struct stream *msg, *dup;
334
0
  struct zmsghdr hdr;
335
0
  struct zapi_opaque_msg info;
336
0
  struct opq_msg_reg *reg;
337
0
  int ret;
338
0
  struct opq_client_reg *client;
339
0
  struct zserv *zclient;
340
0
  char buf[50];
341
0
342
0
  while ((msg = stream_fifo_pop(msg_fifo)) != NULL) {
343
0
    zapi_parse_header(msg, &hdr);
344
0
    hdr.length -= ZEBRA_HEADER_SIZE;
345
0
346
0
    /* Handle client registration messages */
347
0
    if (hdr.command == ZEBRA_OPAQUE_REGISTER) {
348
0
      handle_opq_registration(&hdr, msg);
349
0
      continue;
350
0
    } else if (hdr.command == ZEBRA_OPAQUE_UNREGISTER) {
351
0
      handle_opq_unregistration(&hdr, msg);
352
0
      continue;
353
0
    }
354
0
355
0
    /* We only process OPAQUE messages - drop anything else */
356
0
    if (hdr.command != ZEBRA_OPAQUE_MESSAGE)
357
0
      goto drop_it;
358
0
359
0
    /* Dispatch to any registered ZAPI client(s) */
360
0
361
0
    /* Extract subtype and flags */
362
0
    ret = zclient_opaque_decode(msg, &info);
363
0
    if (ret != 0)
364
0
      goto drop_it;
365
0
366
0
    /* Look up registered ZAPI client(s) */
367
0
    reg = opq_reg_lookup(info.type);
368
0
    if (reg == NULL) {
369
0
      if (IS_ZEBRA_DEBUG_RECV && IS_ZEBRA_DEBUG_DETAIL)
370
0
        zlog_debug("%s: no registrations for opaque type %u, flags %#x",
371
0
             __func__, info.type, info.flags);
372
0
      goto drop_it;
373
0
    }
374
0
375
0
    /* Reset read pointer, since we'll be re-sending message */
376
0
    stream_set_getp(msg, 0);
377
0
378
0
    /* Send a copy of the message to all registered clients */
379
0
    for (client = reg->clients; client; client = client->next) {
380
0
      dup = NULL;
381
0
382
0
      if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST)) {
383
0
384
0
        if (client->proto != info.proto ||
385
0
            client->instance != info.instance ||
386
0
            client->session_id != info.session_id)
387
0
          continue;
388
0
389
0
        if (IS_ZEBRA_DEBUG_RECV &&
390
0
            IS_ZEBRA_DEBUG_DETAIL)
391
0
          zlog_debug("%s: found matching unicast client %s",
392
0
               __func__,
393
0
               opq_client2str(buf,
394
0
                  sizeof(buf),
395
0
                  client));
396
0
397
0
      } else {
398
0
        /* Copy message if more clients */
399
0
        if (client->next)
400
0
          dup = stream_dup(msg);
401
0
      }
402
0
403
0
      /*
404
0
       * TODO -- this isn't ideal: we're going through an
405
0
       * acquire/release cycle for each client for each
406
0
       * message. Replace this with a batching version.
407
0
       */
408
0
      zclient = zserv_acquire_client(client->proto,
409
0
                   client->instance,
410
0
                   client->session_id);
411
0
      if (zclient) {
412
0
        if (IS_ZEBRA_DEBUG_SEND &&
413
0
            IS_ZEBRA_DEBUG_DETAIL)
414
0
          zlog_debug("%s: sending %s to client %s",
415
0
               __func__,
416
0
               (dup ? "dup" : "msg"),
417
0
               opq_client2str(buf,
418
0
                  sizeof(buf),
419
0
                  client));
420
0
421
0
        /*
422
0
         * Sending a message actually means enqueuing
423
0
         * it for a zapi io pthread to send - so we
424
0
         * don't touch the message after this call.
425
0
         */
426
0
        zserv_send_message(zclient, dup ? dup : msg);
427
0
        if (dup)
428
0
          dup = NULL;
429
0
        else
430
0
          msg = NULL;
431
0
432
0
        zserv_release_client(zclient);
433
0
      } else {
434
0
        if (IS_ZEBRA_DEBUG_RECV &&
435
0
            IS_ZEBRA_DEBUG_DETAIL)
436
0
          zlog_debug("%s: type %u: no zclient for %s",
437
0
               __func__, info.type,
438
0
               opq_client2str(buf,
439
0
                  sizeof(buf),
440
0
                  client));
441
0
        /* Registered but gone? */
442
0
        if (dup)
443
0
          stream_free(dup);
444
0
      }
445
0
446
0
      /* If unicast, we're done */
447
0
      if (CHECK_FLAG(info.flags, ZAPI_OPAQUE_FLAG_UNICAST))
448
0
        break;
449
0
    }
450
0
451
0
drop_it:
452
0
453
0
    if (msg)
454
0
      stream_free(msg);
455
0
  }
456
0
457
0
  return 0;
458
0
}
459
460
/*
461
 * Process a register/unregister message
462
 */
463
static int handle_opq_registration(const struct zmsghdr *hdr,
464
           struct stream *msg)
465
0
{
466
0
  int ret = 0;
467
0
  struct zapi_opaque_reg_info info;
468
0
  struct opq_client_reg *client;
469
0
  struct opq_msg_reg key, *reg;
470
0
  char buf[50];
471
0
472
0
  memset(&info, 0, sizeof(info));
473
0
474
0
  if (zapi_opaque_reg_decode(msg, &info) < 0) {
475
0
    ret = -1;
476
0
    goto done;
477
0
  }
478
0
479
0
  memset(&key, 0, sizeof(key));
480
0
481
0
  key.type = info.type;
482
0
483
0
  reg = opq_regh_find(&opq_reg_hash, &key);
484
0
  if (reg) {
485
0
    /* Look for dup client */
486
0
    for (client = reg->clients; client != NULL;
487
0
         client = client->next) {
488
0
      if (opq_client_match(client, &info))
489
0
        break;
490
0
    }
491
0
492
0
    if (client) {
493
0
      /* Oops - duplicate registration? */
494
0
      if (IS_ZEBRA_DEBUG_RECV)
495
0
        zlog_debug("%s: duplicate opq reg for client %s",
496
0
             __func__,
497
0
             opq_client2str(buf, sizeof(buf),
498
0
                client));
499
0
      goto done;
500
0
    }
501
0
502
0
    client = opq_client_alloc(&info);
503
0
504
0
    if (IS_ZEBRA_DEBUG_RECV)
505
0
      zlog_debug("%s: client %s registers for %u",
506
0
           __func__,
507
0
           opq_client2str(buf, sizeof(buf), client),
508
0
           info.type);
509
0
510
0
    /* Link client into registration */
511
0
    client->next = reg->clients;
512
0
    if (reg->clients)
513
0
      reg->clients->prev = client;
514
0
    reg->clients = client;
515
0
  } else {
516
0
    /*
517
0
     * No existing registrations - create one, add the
518
0
     * client, and add registration to hash.
519
0
     */
520
0
    reg = opq_reg_alloc(info.type);
521
0
    client = opq_client_alloc(&info);
522
0
523
0
    if (IS_ZEBRA_DEBUG_RECV)
524
0
      zlog_debug("%s: client %s registers for new reg %u",
525
0
           __func__,
526
0
           opq_client2str(buf, sizeof(buf), client),
527
0
           info.type);
528
0
529
0
    reg->clients = client;
530
0
531
0
    opq_regh_add(&opq_reg_hash, reg);
532
0
  }
533
0
534
0
done:
535
0
536
0
  stream_free(msg);
537
0
  return ret;
538
0
}
539
540
/*
541
 * Process a register/unregister message
542
 */
543
static int handle_opq_unregistration(const struct zmsghdr *hdr,
544
             struct stream *msg)
545
0
{
546
0
  int ret = 0;
547
0
  struct zapi_opaque_reg_info info;
548
0
  struct opq_client_reg *client;
549
0
  struct opq_msg_reg key, *reg;
550
0
  char buf[50];
551
0
552
0
  memset(&info, 0, sizeof(info));
553
0
554
0
  if (zapi_opaque_reg_decode(msg, &info) < 0) {
555
0
    ret = -1;
556
0
    goto done;
557
0
  }
558
0
559
0
  memset(&key, 0, sizeof(key));
560
0
561
0
  key.type = info.type;
562
0
563
0
  reg = opq_regh_find(&opq_reg_hash, &key);
564
0
  if (reg == NULL) {
565
0
    /* Weird: unregister for unknown message? */
566
0
    if (IS_ZEBRA_DEBUG_RECV)
567
0
      zlog_debug("%s: unknown client %s/%u/%u unregisters for unknown type %u",
568
0
           __func__,
569
0
           zebra_route_string(info.proto),
570
0
           info.instance, info.session_id, info.type);
571
0
    goto done;
572
0
  }
573
0
574
0
  /* Look for client */
575
0
  for (client = reg->clients; client != NULL;
576
0
       client = client->next) {
577
0
    if (opq_client_match(client, &info))
578
0
      break;
579
0
  }
580
0
581
0
  if (client == NULL) {
582
0
    /* Oops - unregister for unknown client? */
583
0
    if (IS_ZEBRA_DEBUG_RECV)
584
0
      zlog_debug("%s: unknown client %s/%u/%u unregisters for %u",
585
0
           __func__, zebra_route_string(info.proto),
586
0
           info.instance, info.session_id, info.type);
587
0
    goto done;
588
0
  }
589
0
590
0
  if (IS_ZEBRA_DEBUG_RECV)
591
0
    zlog_debug("%s: client %s unregisters for %u",
592
0
         __func__, opq_client2str(buf, sizeof(buf), client),
593
0
         info.type);
594
0
595
0
  if (client->prev)
596
0
    client->prev->next = client->next;
597
0
  if (client->next)
598
0
    client->next->prev = client->prev;
599
0
  if (reg->clients == client)
600
0
    reg->clients = client->next;
601
0
602
0
  opq_client_free(&client);
603
0
604
0
  /* Is registration empty now? */
605
0
  if (reg->clients == NULL) {
606
0
    opq_regh_del(&opq_reg_hash, reg);
607
0
    opq_reg_free(&reg);
608
0
  }
609
0
610
0
done:
611
0
612
0
  stream_free(msg);
613
0
  return ret;
614
0
}
615
616
/* Compare utility for registered clients */
617
static bool opq_client_match(const struct opq_client_reg *client,
618
           const struct zapi_opaque_reg_info *info)
619
0
{
620
0
  if (client->proto == info->proto &&
621
0
      client->instance == info->instance &&
622
0
      client->session_id == info->session_id)
623
0
    return true;
624
0
  else
625
0
    return false;
626
0
}
627
628
static struct opq_msg_reg *opq_reg_lookup(uint32_t type)
629
0
{
630
0
  struct opq_msg_reg key, *reg;
631
0
632
0
  memset(&key, 0, sizeof(key));
633
0
634
0
  key.type = type;
635
0
636
0
  reg = opq_regh_find(&opq_reg_hash, &key);
637
0
638
0
  return reg;
639
0
}
640
641
static struct opq_msg_reg *opq_reg_alloc(uint32_t type)
642
0
{
643
0
  struct opq_msg_reg *reg;
644
0
645
0
  reg = XCALLOC(MTYPE_OPQ, sizeof(struct opq_msg_reg));
646
0
647
0
  reg->type = type;
648
0
  INIT_HASH(&reg->item);
649
0
650
0
  return reg;
651
0
}
652
653
static void opq_reg_free(struct opq_msg_reg **reg)
654
0
{
655
0
  XFREE(MTYPE_OPQ, (*reg));
656
0
}
657
658
static struct opq_client_reg *opq_client_alloc(
659
  const struct zapi_opaque_reg_info *info)
660
0
{
661
0
  struct opq_client_reg *client;
662
0
663
0
  client = XCALLOC(MTYPE_OPQ, sizeof(struct opq_client_reg));
664
0
665
0
  client->proto = info->proto;
666
0
  client->instance = info->instance;
667
0
  client->session_id = info->session_id;
668
0
669
0
  return client;
670
0
}
671
672
static void opq_client_free(struct opq_client_reg **client)
673
0
{
674
0
  XFREE(MTYPE_OPQ, (*client));
675
0
}
676
677
static const char *opq_client2str(char *buf, size_t buflen,
678
          const struct opq_client_reg *client)
679
0
{
680
0
  char sbuf[20];
681
0
682
0
  snprintf(buf, buflen, "%s/%u", zebra_route_string(client->proto),
683
0
     client->instance);
684
0
  if (client->session_id > 0) {
685
0
    snprintf(sbuf, sizeof(sbuf), "/%u", client->session_id);
686
0
    strlcat(buf, sbuf, buflen);
687
0
  }
688
0
689
0
  return buf;
690
0
}
691
692
/* Hash function for clients registered for messages */
693
static uint32_t registration_hash(const struct opq_msg_reg *reg)
694
0
{
695
0
  return reg->type;
696
0
}
697
698
/* Comparison function for client registrations */
699
static int registration_compare(const struct opq_msg_reg *reg1,
700
        const struct opq_msg_reg *reg2)
701
0
{
702
0
  if (reg1->type == reg2->type)
703
0
    return 0;
704
0
  else
705
0
    return -1;
706
0
}