Coverage Report

Created: 2025-10-23 06:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/frr/lib/mgmt_msg.c
Line
Count
Source
1
// SPDX-License-Identifier: GPL-2.0-or-later
2
/*
3
 * March 6 2023, Christian Hopps <chopps@labn.net>
4
 *
5
 * Copyright (C) 2021  Vmware, Inc.
6
 *           Pushpasis Sarkar <spushpasis@vmware.com>
7
 * Copyright (c) 2023, LabN Consulting, L.L.C.
8
 */
9
#include <zebra.h>
10
#include "debug.h"
11
#include "network.h"
12
#include "sockopt.h"
13
#include "stream.h"
14
#include "frrevent.h"
15
#include "mgmt_msg.h"
16
17
18
#define MGMT_MSG_DBG(dbgtag, fmt, ...)                                         \
19
0
  do {                                                                   \
20
0
    if (dbgtag)                                                    \
21
0
      zlog_debug("%s: %s: " fmt, dbgtag, __func__,           \
22
0
           ##__VA_ARGS__);                             \
23
0
  } while (0)
24
25
#define MGMT_MSG_ERR(ms, fmt, ...)                                             \
26
0
  zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__)
27
28
8
DEFINE_MTYPE(LIB, MSG_CONN, "msg connection state");
29
8
30
8
/**
31
8
 * Read data from a socket into streams containing 1 or more full msgs headed by
32
8
 * mgmt_msg_hdr which contain API messages (currently protobuf).
33
8
 *
34
8
 * Args:
35
8
 *  ms: mgmt_msg_state for this process.
36
8
 *  fd: socket/file to read data from.
37
8
 *  debug: true to enable debug logging.
38
8
 *
39
8
 * Returns:
40
8
 *  MPP_DISCONNECT - socket should be closed and connect retried.
41
8
 *  MSV_SCHED_STREAM - this call should be rescheduled to run.
42
8
 *  MPP_SCHED_BOTH - this call and the procmsg buf should be scheduled to
43
8
 *run.
44
8
 */
45
8
enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,
46
8
           bool debug)
47
8
{
48
0
  const char *dbgtag = debug ? ms->idtag : NULL;
49
0
  size_t avail = STREAM_WRITEABLE(ms->ins);
50
0
  struct mgmt_msg_hdr *mhdr = NULL;
51
0
  size_t total = 0;
52
0
  size_t mcount = 0;
53
0
  ssize_t n, left;
54
55
0
  assert(ms && fd != -1);
56
57
  /*
58
   * Read as much as we can into the stream.
59
   */
60
0
  while (avail > sizeof(struct mgmt_msg_hdr)) {
61
0
    n = stream_read_try(ms->ins, fd, avail);
62
63
    /* -2 is normal nothing read, and to retry */
64
0
    if (n == -2) {
65
0
      MGMT_MSG_DBG(dbgtag, "nothing more to read");
66
0
      break;
67
0
    }
68
0
    if (n <= 0) {
69
0
      if (n == 0)
70
0
        MGMT_MSG_ERR(ms, "got EOF/disconnect");
71
0
      else
72
0
        MGMT_MSG_ERR(ms,
73
0
               "got error while reading: '%s'",
74
0
               safe_strerror(errno));
75
0
      return MSR_DISCONNECT;
76
0
    }
77
0
    MGMT_MSG_DBG(dbgtag, "read %zd bytes", n);
78
0
    ms->nrxb += n;
79
0
    avail -= n;
80
0
  }
81
82
  /*
83
   * Check if we have read a complete messages or not.
84
   */
85
0
  assert(stream_get_getp(ms->ins) == 0);
86
0
  left = stream_get_endp(ms->ins);
87
0
  while (left > (long)sizeof(struct mgmt_msg_hdr)) {
88
0
    mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
89
0
    if (!MGMT_MSG_IS_MARKER(mhdr->marker)) {
90
0
      MGMT_MSG_DBG(dbgtag, "recv corrupt buffer, disconnect");
91
0
      return MSR_DISCONNECT;
92
0
    }
93
0
    if ((ssize_t)mhdr->len > left)
94
0
      break;
95
96
0
    MGMT_MSG_DBG(dbgtag, "read full message len %u", mhdr->len);
97
0
    total += mhdr->len;
98
0
    left -= mhdr->len;
99
0
    mcount++;
100
0
  }
101
102
0
  if (!mcount)
103
0
    return MSR_SCHED_STREAM;
104
105
  /*
106
   * We have read at least one message into the stream, queue it up.
107
   */
108
0
  mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);
109
0
  stream_set_endp(ms->ins, total);
110
0
  stream_fifo_push(&ms->inq, ms->ins);
111
0
  ms->ins = stream_new(ms->max_msg_sz);
112
0
  if (left) {
113
0
    stream_put(ms->ins, mhdr, left);
114
0
    stream_set_endp(ms->ins, left);
115
0
  }
116
117
0
  return MSR_SCHED_BOTH;
118
0
}
119
120
/**
121
 * Process streams containing whole messages that have been pushed onto the
122
 * FIFO. This should be called from an event/timer handler and should be
123
 * reschedulable.
124
 *
125
 * Args:
126
 *  ms: mgmt_msg_state for this process.
127
 *  handle_mgs: function to call for each received message.
128
 *  user: opaque value passed through to handle_msg.
129
 *  debug: true to enable debug logging.
130
 *
131
 * Returns:
132
 *  true if more to process (so reschedule) else false
133
 */
134
bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,
135
           void (*handle_msg)(uint8_t version, uint8_t *msg,
136
            size_t msglen, void *user),
137
           void *user, bool debug)
138
0
{
139
0
  const char *dbgtag = debug ? ms->idtag : NULL;
140
0
  struct mgmt_msg_hdr *mhdr;
141
0
  struct stream *work;
142
0
  uint8_t *data;
143
0
  size_t left, nproc;
144
145
0
  MGMT_MSG_DBG(dbgtag, "Have %zu streams to process", ms->inq.count);
146
147
0
  nproc = 0;
148
0
  while (nproc < ms->max_read_buf) {
149
0
    work = stream_fifo_pop(&ms->inq);
150
0
    if (!work)
151
0
      break;
152
153
0
    data = STREAM_DATA(work);
154
0
    left = stream_get_endp(work);
155
0
    MGMT_MSG_DBG(dbgtag, "Processing stream of len %zu", left);
156
157
0
    for (; left > sizeof(struct mgmt_msg_hdr);
158
0
         left -= mhdr->len, data += mhdr->len) {
159
0
      mhdr = (struct mgmt_msg_hdr *)data;
160
161
0
      assert(MGMT_MSG_IS_MARKER(mhdr->marker));
162
0
      assert(left >= mhdr->len);
163
164
0
      handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker),
165
0
           (uint8_t *)(mhdr + 1),
166
0
           mhdr->len - sizeof(struct mgmt_msg_hdr),
167
0
           user);
168
0
      ms->nrxm++;
169
0
      nproc++;
170
0
    }
171
172
0
    if (work != ms->ins)
173
0
      stream_free(work); /* Free it up */
174
0
    else
175
0
      stream_reset(work); /* Reset stream for next read */
176
0
  }
177
178
  /* return true if should reschedule b/c more to process. */
179
0
  return stream_fifo_head(&ms->inq) != NULL;
180
0
}
181
182
/**
183
 * Write data from a onto the socket, using streams that have been queued for
184
 * sending by mgmt_msg_send_msg. This function should be reschedulable.
185
 *
186
 * Args:
187
 *  ms: mgmt_msg_state for this process.
188
 *  fd: socket/file to read data from.
189
 *  debug: true to enable debug logging.
190
 *
191
 * Returns:
192
 *  MSW_SCHED_NONE - do not reschedule anything.
193
 *  MSW_SCHED_STREAM - this call should be rescheduled to run again.
194
 *  MSW_SCHED_WRITES_OFF - writes should be disabled with a timer to
195
 *      re-enable them a short time later
196
 *  MSW_DISCONNECT - socket should be closed and reconnect retried.
197
 *run.
198
 */
199
enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,
200
            bool debug)
201
0
{
202
0
  const char *dbgtag = debug ? ms->idtag : NULL;
203
0
  struct stream *s;
204
0
  size_t nproc = 0;
205
0
  ssize_t left;
206
0
  ssize_t n;
207
208
0
  if (ms->outs) {
209
0
    MGMT_MSG_DBG(dbgtag,
210
0
           "found unqueued stream with %zu bytes, queueing",
211
0
           stream_get_endp(ms->outs));
212
0
    stream_fifo_push(&ms->outq, ms->outs);
213
0
    ms->outs = NULL;
214
0
  }
215
216
0
  for (s = stream_fifo_head(&ms->outq); s && nproc < ms->max_write_buf;
217
0
       s = stream_fifo_head(&ms->outq)) {
218
0
    left = STREAM_READABLE(s);
219
0
    assert(left);
220
221
0
    n = stream_flush(s, fd);
222
0
    if (n <= 0) {
223
0
      if (n == 0)
224
0
        MGMT_MSG_ERR(ms,
225
0
               "connection closed while writing");
226
0
      else if (ERRNO_IO_RETRY(errno)) {
227
0
        MGMT_MSG_DBG(
228
0
          dbgtag,
229
0
          "retry error while writing %zd bytes: %s (%d)",
230
0
          left, safe_strerror(errno), errno);
231
0
        return MSW_SCHED_STREAM;
232
0
      } else
233
0
        MGMT_MSG_ERR(
234
0
          ms,
235
0
          "error while writing %zd bytes: %s (%d)",
236
0
          left, safe_strerror(errno), errno);
237
238
0
      n = mgmt_msg_reset_writes(ms);
239
0
      MGMT_MSG_DBG(dbgtag, "drop and freed %zd streams", n);
240
241
0
      return MSW_DISCONNECT;
242
0
    }
243
244
0
    ms->ntxb += n;
245
0
    if (n != left) {
246
0
      MGMT_MSG_DBG(dbgtag, "short stream write %zd of %zd", n,
247
0
             left);
248
0
      stream_forward_getp(s, n);
249
0
      return MSW_SCHED_STREAM;
250
0
    }
251
252
0
    stream_free(stream_fifo_pop(&ms->outq));
253
0
    MGMT_MSG_DBG(dbgtag, "wrote stream of %zd bytes", n);
254
0
    nproc++;
255
0
  }
256
0
  if (s) {
257
0
    MGMT_MSG_DBG(
258
0
      dbgtag,
259
0
      "reached %zu buffer writes, pausing with %zu streams left",
260
0
      ms->max_write_buf, ms->outq.count);
261
0
    return MSW_SCHED_STREAM;
262
0
  }
263
0
  MGMT_MSG_DBG(dbgtag, "flushed all streams from output q");
264
0
  return MSW_SCHED_NONE;
265
0
}
266
267
268
/**
269
 * Send a message by enqueueing it to be written over the socket by
270
 * mgmt_msg_write.
271
 *
272
 * Args:
273
 *  ms: mgmt_msg_state for this process.
274
 *  version: version of this message, will be given to receiving side.
275
 *  msg: the message to be sent.
276
 *  len: the length of the message.
277
 *  packf: a function to pack the message.
278
 *  debug: true to enable debug logging.
279
 *
280
 * Returns:
281
 *      0 on success, otherwise -1 on failure. The only failure mode is if a
282
 *      the message exceeds the maximum message size configured on init.
283
 */
284
int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version, void *msg,
285
          size_t len, size_t (*packf)(void *msg, void *buf),
286
          bool debug)
287
0
{
288
0
  const char *dbgtag = debug ? ms->idtag : NULL;
289
0
  struct mgmt_msg_hdr *mhdr;
290
0
  struct stream *s;
291
0
  uint8_t *dstbuf;
292
0
  size_t endp, n;
293
0
  size_t mlen = len + sizeof(*mhdr);
294
295
0
  if (mlen > ms->max_msg_sz) {
296
0
    MGMT_MSG_ERR(ms, "Message %zu > max size %zu, dropping", mlen,
297
0
           ms->max_msg_sz);
298
0
    return -1;
299
0
  }
300
301
0
  if (!ms->outs) {
302
0
    MGMT_MSG_DBG(dbgtag, "creating new stream for msg len %zu",
303
0
           len);
304
0
    ms->outs = stream_new(ms->max_msg_sz);
305
0
  } else if (STREAM_WRITEABLE(ms->outs) < mlen) {
306
0
    MGMT_MSG_DBG(
307
0
      dbgtag,
308
0
      "enq existing stream len %zu and creating new stream for msg len %zu",
309
0
      STREAM_WRITEABLE(ms->outs), mlen);
310
0
    stream_fifo_push(&ms->outq, ms->outs);
311
0
    ms->outs = stream_new(ms->max_msg_sz);
312
0
  } else {
313
0
    MGMT_MSG_DBG(
314
0
      dbgtag,
315
0
      "using existing stream with avail %zu for msg len %zu",
316
0
      STREAM_WRITEABLE(ms->outs), mlen);
317
0
  }
318
0
  s = ms->outs;
319
320
  /* We have a stream with space, pack the message into it. */
321
0
  mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(s) + s->endp);
322
0
  mhdr->marker = MGMT_MSG_MARKER(version);
323
0
  mhdr->len = mlen;
324
0
  stream_forward_endp(s, sizeof(*mhdr));
325
0
  endp = stream_get_endp(s);
326
0
  dstbuf = STREAM_DATA(s) + endp;
327
0
  if (packf)
328
0
    n = packf(msg, dstbuf);
329
0
  else {
330
0
    memcpy(dstbuf, msg, len);
331
0
    n = len;
332
0
  }
333
0
  stream_set_endp(s, endp + n);
334
0
  ms->ntxm++;
335
336
0
  return 0;
337
0
}
338
339
/**
340
 * Create and open a unix domain stream socket on the given path
341
 * setting non-blocking and send and receive buffer sizes.
342
 *
343
 * Args:
344
 *  path: path of unix domain socket to connect to.
345
 *  sendbuf: size of socket send buffer.
346
 *  recvbuf: size of socket receive buffer.
347
 *  dbgtag: if non-NULL enable log debug, and use this tag.
348
 *
349
 * Returns:
350
 *  socket fd or -1 on error.
351
 */
352
int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,
353
         const char *dbgtag)
354
0
{
355
0
  int ret, sock, len;
356
0
  struct sockaddr_un addr;
357
358
0
  MGMT_MSG_DBG(dbgtag, "connecting to server on %s", path);
359
0
  sock = socket(AF_UNIX, SOCK_STREAM, 0);
360
0
  if (sock < 0) {
361
0
    MGMT_MSG_DBG(dbgtag, "socket failed: %s", safe_strerror(errno));
362
0
    return -1;
363
0
  }
364
365
0
  memset(&addr, 0, sizeof(struct sockaddr_un));
366
0
  addr.sun_family = AF_UNIX;
367
0
  strlcpy(addr.sun_path, path, sizeof(addr.sun_path));
368
#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
369
  len = addr.sun_len = SUN_LEN(&addr);
370
#else
371
0
  len = sizeof(addr.sun_family) + strlen(addr.sun_path);
372
0
#endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */
373
0
  ret = connect(sock, (struct sockaddr *)&addr, len);
374
0
  if (ret < 0) {
375
0
    MGMT_MSG_DBG(dbgtag, "failed to connect on %s: %s", path,
376
0
           safe_strerror(errno));
377
0
    close(sock);
378
0
    return -1;
379
0
  }
380
381
0
  MGMT_MSG_DBG(dbgtag, "connected to server on %s", path);
382
0
  set_nonblocking(sock);
383
0
  setsockopt_so_sendbuf(sock, sendbuf);
384
0
  setsockopt_so_recvbuf(sock, recvbuf);
385
0
  return sock;
386
0
}
387
388
/**
389
 * Reset the sending queue, by dequeueing all streams and freeing them. Return
390
 * the number of streams freed.
391
 *
392
 * Args:
393
 *  ms: mgmt_msg_state for this process.
394
 *
395
 * Returns:
396
 *      Number of streams that were freed.
397
 *
398
 */
399
size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms)
400
0
{
401
0
  struct stream *s;
402
0
  size_t nproc = 0;
403
404
0
  for (s = stream_fifo_pop(&ms->outq); s;
405
0
       s = stream_fifo_pop(&ms->outq), nproc++)
406
0
    stream_free(s);
407
408
0
  return nproc;
409
0
}
410
411
412
void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,
413
       size_t max_write_buf, size_t max_msg_sz, const char *idtag)
414
0
{
415
0
  memset(ms, 0, sizeof(*ms));
416
0
  ms->ins = stream_new(max_msg_sz);
417
0
  stream_fifo_init(&ms->inq);
418
0
  stream_fifo_init(&ms->outq);
419
0
  ms->max_read_buf = max_write_buf;
420
0
  ms->max_write_buf = max_read_buf;
421
0
  ms->max_msg_sz = max_msg_sz;
422
0
  ms->idtag = strdup(idtag);
423
0
}
424
425
void mgmt_msg_destroy(struct mgmt_msg_state *ms)
426
0
{
427
0
  mgmt_msg_reset_writes(ms);
428
0
  if (ms->ins)
429
0
    stream_free(ms->ins);
430
0
  free(ms->idtag);
431
0
}
432
433
/*
434
 * Connections
435
 */
436
437
#define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250
438
0
#define MSG_CONN_SEND_BUF_SIZE (1u << 16)
439
0
#define MSG_CONN_RECV_BUF_SIZE (1u << 16)
440
441
static void msg_client_sched_connect(struct msg_client *client,
442
             unsigned long msec);
443
444
static void msg_conn_sched_proc_msgs(struct msg_conn *conn);
445
static void msg_conn_sched_read(struct msg_conn *conn);
446
static void msg_conn_sched_write(struct msg_conn *conn);
447
448
static void msg_conn_write(struct event *thread)
449
0
{
450
0
  struct msg_conn *conn = EVENT_ARG(thread);
451
0
  enum mgmt_msg_wsched rv;
452
0
453
0
  rv = mgmt_msg_write(&conn->mstate, conn->fd, conn->debug);
454
0
  if (rv == MSW_SCHED_STREAM)
455
0
    msg_conn_sched_write(conn);
456
0
  else if (rv == MSW_DISCONNECT)
457
0
    msg_conn_disconnect(conn, conn->is_client);
458
0
  else
459
0
    assert(rv == MSW_SCHED_NONE);
460
0
}
461
462
static void msg_conn_read(struct event *thread)
463
0
{
464
0
  struct msg_conn *conn = EVENT_ARG(thread);
465
0
  enum mgmt_msg_rsched rv;
466
0
467
0
  rv = mgmt_msg_read(&conn->mstate, conn->fd, conn->debug);
468
0
  if (rv == MSR_DISCONNECT) {
469
0
    msg_conn_disconnect(conn, conn->is_client);
470
0
    return;
471
0
  }
472
0
  if (rv == MSR_SCHED_BOTH)
473
0
    msg_conn_sched_proc_msgs(conn);
474
0
  msg_conn_sched_read(conn);
475
0
}
476
477
/* collapse this into mgmt_msg_procbufs */
478
static void msg_conn_proc_msgs(struct event *thread)
479
0
{
480
0
  struct msg_conn *conn = EVENT_ARG(thread);
481
0
482
0
  if (mgmt_msg_procbufs(&conn->mstate,
483
0
            (void (*)(uint8_t, uint8_t *, size_t,
484
0
          void *))conn->handle_msg,
485
0
            conn, conn->debug))
486
0
    /* there's more, schedule handling more */
487
0
    msg_conn_sched_proc_msgs(conn);
488
0
}
489
490
static void msg_conn_sched_read(struct msg_conn *conn)
491
0
{
492
0
  event_add_read(conn->loop, msg_conn_read, conn, conn->fd,
493
0
           &conn->read_ev);
494
0
}
495
496
static void msg_conn_sched_write(struct msg_conn *conn)
497
0
{
498
0
  event_add_write(conn->loop, msg_conn_write, conn, conn->fd,
499
0
      &conn->write_ev);
500
0
}
501
502
static void msg_conn_sched_proc_msgs(struct msg_conn *conn)
503
0
{
504
0
  event_add_event(conn->loop, msg_conn_proc_msgs, conn, 0,
505
0
      &conn->proc_msg_ev);
506
0
}
507
508
509
void msg_conn_disconnect(struct msg_conn *conn, bool reconnect)
510
{
511
512
  /* disconnect short-circuit if present */
513
  if (conn->remote_conn) {
514
    conn->remote_conn->remote_conn = NULL;
515
    conn->remote_conn = NULL;
516
  }
517
518
  if (conn->fd != -1) {
519
    close(conn->fd);
520
    conn->fd = -1;
521
522
    /* Notify client through registered callback (if any) */
523
    if (conn->notify_disconnect)
524
      (void)(*conn->notify_disconnect)(conn);
525
  }
526
527
  if (reconnect) {
528
    assert(conn->is_client);
529
    msg_client_sched_connect(
530
      container_of(conn, struct msg_client, conn),
531
      MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
532
  }
533
}
534
535
int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg,
536
          size_t mlen, size_t (*packf)(void *, void *),
537
          bool short_circuit_ok)
538
0
{
539
0
  const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
540
541
0
  if (conn->fd == -1) {
542
0
    MGMT_MSG_ERR(&conn->mstate,
543
0
           "can't send message on closed connection");
544
0
    return -1;
545
0
  }
546
547
  /* immediately handle the message if short-circuit is present */
548
0
  if (conn->remote_conn && short_circuit_ok) {
549
0
    uint8_t *buf = msg;
550
0
    size_t n = mlen;
551
0
    bool old;
552
553
0
    if (packf) {
554
0
      buf = XMALLOC(MTYPE_TMP, mlen);
555
0
      n = packf(msg, buf);
556
0
    }
557
558
0
    ++conn->short_circuit_depth;
559
0
    MGMT_MSG_DBG(dbgtag, "SC send: depth %u msg: %p",
560
0
           conn->short_circuit_depth, msg);
561
562
0
    old = conn->remote_conn->is_short_circuit;
563
0
    conn->remote_conn->is_short_circuit = true;
564
0
    conn->remote_conn->handle_msg(version, buf, n,
565
0
                conn->remote_conn);
566
0
    conn->remote_conn->is_short_circuit = old;
567
568
0
    --conn->short_circuit_depth;
569
0
    MGMT_MSG_DBG(dbgtag, "SC return from depth: %u msg: %p",
570
0
           conn->short_circuit_depth, msg);
571
572
0
    if (packf)
573
0
      XFREE(MTYPE_TMP, buf);
574
0
    return 0;
575
0
  }
576
577
0
  int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf,
578
0
           conn->debug);
579
580
0
  msg_conn_sched_write(conn);
581
582
0
  return rv;
583
0
}
584
585
void msg_conn_cleanup(struct msg_conn *conn)
586
0
{
587
0
  struct mgmt_msg_state *ms = &conn->mstate;
588
589
  /* disconnect short-circuit if present */
590
0
  if (conn->remote_conn) {
591
0
    conn->remote_conn->remote_conn = NULL;
592
0
    conn->remote_conn = NULL;
593
0
  }
594
595
0
  if (conn->fd != -1) {
596
0
    close(conn->fd);
597
0
    conn->fd = -1;
598
0
  }
599
600
0
  EVENT_OFF(conn->read_ev);
601
0
  EVENT_OFF(conn->write_ev);
602
0
  EVENT_OFF(conn->proc_msg_ev);
603
604
0
  mgmt_msg_destroy(ms);
605
0
}
606
607
/*
608
 * Client Connections
609
 */
610
611
0
DECLARE_LIST(msg_server_list, struct msg_server, link);
612
613
static struct msg_server_list_head msg_servers;
614
615
static void msg_client_connect(struct msg_client *conn);
616
617
static void msg_client_connect_timer(struct event *thread)
618
0
{
619
0
  msg_client_connect(EVENT_ARG(thread));
620
0
}
621
622
static void msg_client_sched_connect(struct msg_client *client,
623
             unsigned long msec)
624
0
{
625
0
  struct msg_conn *conn = &client->conn;
626
0
  const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
627
628
0
  MGMT_MSG_DBG(dbgtag, "connection retry in %lu msec", msec);
629
0
  if (msec)
630
0
    event_add_timer_msec(conn->loop, msg_client_connect_timer,
631
0
             client, msec, &client->conn_retry_tmr);
632
0
  else
633
0
    event_add_event(conn->loop, msg_client_connect_timer, client, 0,
634
0
        &client->conn_retry_tmr);
635
0
}
636
637
static bool msg_client_connect_short_circuit(struct msg_client *client)
638
0
{
639
0
  struct msg_conn *server_conn;
640
0
  struct msg_server *server;
641
0
  const char *dbgtag =
642
0
    client->conn.debug ? client->conn.mstate.idtag : NULL;
643
0
  union sockunion su = {0};
644
0
  int sockets[2];
645
0
646
0
  frr_each (msg_server_list, &msg_servers, server)
647
0
    if (!strcmp(server->sopath, client->sopath))
648
0
      break;
649
0
  if (!server) {
650
0
    MGMT_MSG_DBG(dbgtag,
651
0
           "no short-circuit connection available for %s",
652
0
           client->sopath);
653
0
654
0
    return false;
655
0
  }
656
0
657
0
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets)) {
658
0
    MGMT_MSG_ERR(
659
0
      &client->conn.mstate,
660
0
      "socketpair failed trying to short-circuit connection on %s: %s",
661
0
      client->sopath, safe_strerror(errno));
662
0
    return false;
663
0
  }
664
0
665
0
  /* client side */
666
0
  client->conn.fd = sockets[0];
667
0
  set_nonblocking(sockets[0]);
668
0
  setsockopt_so_sendbuf(sockets[0], client->conn.mstate.max_write_buf);
669
0
  setsockopt_so_recvbuf(sockets[0], client->conn.mstate.max_read_buf);
670
0
671
0
  /* server side */
672
0
  memset(&su, 0, sizeof(union sockunion));
673
0
  server_conn = server->create(sockets[1], &su);
674
0
675
0
  client->conn.remote_conn = server_conn;
676
0
  server_conn->remote_conn = &client->conn;
677
0
678
0
  MGMT_MSG_DBG(
679
0
    dbgtag,
680
0
    "short-circuit connection on %s server %s:%d to client %s:%d",
681
0
    client->sopath, server_conn->mstate.idtag, server_conn->fd,
682
0
    client->conn.mstate.idtag, client->conn.fd);
683
0
684
0
  MGMT_MSG_DBG(
685
0
    server_conn->debug ? server_conn->mstate.idtag : NULL,
686
0
    "short-circuit connection on %s client %s:%d to server %s:%d",
687
0
    client->sopath, client->conn.mstate.idtag, client->conn.fd,
688
0
    server_conn->mstate.idtag, server_conn->fd);
689
0
690
0
  return true;
691
0
}
692
693
694
/* Connect and start reading from the socket */
695
static void msg_client_connect(struct msg_client *client)
696
0
{
697
0
  struct msg_conn *conn = &client->conn;
698
0
  const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;
699
0
700
0
  if (!client->short_circuit_ok ||
701
0
      !msg_client_connect_short_circuit(client))
702
0
    conn->fd =
703
0
      mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,
704
0
           MSG_CONN_RECV_BUF_SIZE, dbgtag);
705
0
706
0
  if (conn->fd == -1)
707
0
    /* retry the connection */
708
0
    msg_client_sched_connect(client,
709
0
           MSG_CONN_DEFAULT_CONN_RETRY_MSEC);
710
0
  else if (client->notify_connect && client->notify_connect(client))
711
0
    /* client connect notify failed */
712
0
    msg_conn_disconnect(conn, true);
713
0
  else
714
0
    /* start reading */
715
0
    msg_conn_sched_read(conn);
716
0
}
717
718
void msg_client_init(struct msg_client *client, struct event_loop *tm,
719
         const char *sopath,
720
         int (*notify_connect)(struct msg_client *client),
721
         int (*notify_disconnect)(struct msg_conn *client),
722
         void (*handle_msg)(uint8_t version, uint8_t *data,
723
          size_t len, struct msg_conn *client),
724
         size_t max_read_buf, size_t max_write_buf,
725
         size_t max_msg_sz, bool short_circuit_ok,
726
         const char *idtag, bool debug)
727
0
{
728
0
  struct msg_conn *conn = &client->conn;
729
0
  memset(client, 0, sizeof(*client));
730
731
0
  conn->loop = tm;
732
0
  conn->fd = -1;
733
0
  conn->handle_msg = handle_msg;
734
0
  conn->notify_disconnect = notify_disconnect;
735
0
  conn->is_client = true;
736
0
  conn->debug = debug;
737
0
  client->short_circuit_ok = short_circuit_ok;
738
0
  client->sopath = strdup(sopath);
739
0
  client->notify_connect = notify_connect;
740
741
0
  mgmt_msg_init(&conn->mstate, max_read_buf, max_write_buf, max_msg_sz,
742
0
          idtag);
743
744
  /* XXX maybe just have client kick this off */
745
  /* Start trying to connect to server */
746
0
  msg_client_sched_connect(client, 0);
747
0
}
748
749
void msg_client_cleanup(struct msg_client *client)
750
0
{
751
0
  assert(client->conn.is_client);
752
753
0
  EVENT_OFF(client->conn_retry_tmr);
754
0
  free(client->sopath);
755
756
0
  msg_conn_cleanup(&client->conn);
757
0
}
758
759
760
/*
761
 * Server-side connections
762
 */
763
764
static void msg_server_accept(struct event *event)
765
0
{
766
0
  struct msg_server *server = EVENT_ARG(event);
767
0
  int fd;
768
0
  union sockunion su;
769
0
770
0
  if (server->fd < 0)
771
0
    return;
772
0
773
0
  /* We continue hearing server listen socket. */
774
0
  event_add_read(server->loop, msg_server_accept, server, server->fd,
775
0
           &server->listen_ev);
776
0
777
0
  memset(&su, 0, sizeof(union sockunion));
778
0
779
0
  /* We can handle IPv4 or IPv6 socket. */
780
0
  fd = sockunion_accept(server->fd, &su);
781
0
  if (fd < 0) {
782
0
    zlog_err("Failed to accept %s client connection: %s",
783
0
       server->idtag, safe_strerror(errno));
784
0
    return;
785
0
  }
786
0
  set_nonblocking(fd);
787
0
  set_cloexec(fd);
788
0
789
0
  DEBUGD(server->debug, "Accepted new %s connection", server->idtag);
790
0
791
0
  server->create(fd, &su);
792
0
}
793
794
int msg_server_init(struct msg_server *server, const char *sopath,
795
        struct event_loop *loop,
796
        struct msg_conn *(*create)(int fd, union sockunion *su),
797
        const char *idtag, struct debug *debug)
798
0
{
799
0
  int ret;
800
0
  int sock;
801
0
  struct sockaddr_un addr;
802
0
  mode_t old_mask;
803
804
0
  memset(server, 0, sizeof(*server));
805
0
  server->fd = -1;
806
807
0
  sock = socket(AF_UNIX, SOCK_STREAM, PF_UNSPEC);
808
0
  if (sock < 0) {
809
0
    zlog_err("Failed to create %s server socket: %s", server->idtag,
810
0
       safe_strerror(errno));
811
0
    goto fail;
812
0
  }
813
814
0
  addr.sun_family = AF_UNIX,
815
0
  strlcpy(addr.sun_path, sopath, sizeof(addr.sun_path));
816
0
  unlink(addr.sun_path);
817
0
  old_mask = umask(0077);
818
0
  ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr));
819
0
  if (ret < 0) {
820
0
    zlog_err("Failed to bind %s server socket to '%s': %s",
821
0
       server->idtag, addr.sun_path, safe_strerror(errno));
822
0
    umask(old_mask);
823
0
    goto fail;
824
0
  }
825
0
  umask(old_mask);
826
827
0
  ret = listen(sock, MGMTD_MAX_CONN);
828
0
  if (ret < 0) {
829
0
    zlog_err("Failed to listen on %s server socket: %s",
830
0
       server->idtag, safe_strerror(errno));
831
0
    goto fail;
832
0
  }
833
834
0
  server->fd = sock;
835
0
  server->loop = loop;
836
0
  server->sopath = strdup(sopath);
837
0
  server->idtag = strdup(idtag);
838
0
  server->create = create;
839
0
  server->debug = debug;
840
841
0
  msg_server_list_add_head(&msg_servers, server);
842
843
0
  event_add_read(server->loop, msg_server_accept, server, server->fd,
844
0
           &server->listen_ev);
845
846
847
0
  DEBUGD(debug, "Started %s server, listening on %s", idtag, sopath);
848
0
  return 0;
849
850
0
fail:
851
0
  if (sock >= 0)
852
0
    close(sock);
853
0
  server->fd = -1;
854
0
  return -1;
855
0
}
856
857
void msg_server_cleanup(struct msg_server *server)
858
0
{
859
0
  DEBUGD(server->debug, "Closing %s server", server->idtag);
860
861
0
  if (server->listen_ev)
862
0
    EVENT_OFF(server->listen_ev);
863
864
0
  msg_server_list_del(&msg_servers, server);
865
866
0
  if (server->fd >= 0)
867
0
    close(server->fd);
868
0
  free((char *)server->sopath);
869
0
  free((char *)server->idtag);
870
871
0
  memset(server, 0, sizeof(*server));
872
0
  server->fd = -1;
873
0
}
874
875
/*
876
 * Initialize and start reading from the accepted socket
877
 *
878
 *     notify_connect - only called for disconnect i.e., connected == false
879
 */
880
void msg_conn_accept_init(struct msg_conn *conn, struct event_loop *tm, int fd,
881
        int (*notify_disconnect)(struct msg_conn *conn),
882
        void (*handle_msg)(uint8_t version, uint8_t *data,
883
               size_t len, struct msg_conn *conn),
884
        size_t max_read, size_t max_write, size_t max_size,
885
        const char *idtag)
886
0
{
887
0
  conn->loop = tm;
888
0
  conn->fd = fd;
889
0
  conn->notify_disconnect = notify_disconnect;
890
0
  conn->handle_msg = handle_msg;
891
0
  conn->is_client = false;
892
893
0
  mgmt_msg_init(&conn->mstate, max_read, max_write, max_size, idtag);
894
895
  /* start reading */
896
0
  msg_conn_sched_read(conn);
897
898
  /* Make socket non-blocking.  */
899
0
  set_nonblocking(conn->fd);
900
0
  setsockopt_so_sendbuf(conn->fd, MSG_CONN_SEND_BUF_SIZE);
901
0
  setsockopt_so_recvbuf(conn->fd, MSG_CONN_RECV_BUF_SIZE);
902
0
}
903
904
struct msg_conn *
905
msg_server_conn_create(struct event_loop *tm, int fd,
906
           int (*notify_disconnect)(struct msg_conn *conn),
907
           void (*handle_msg)(uint8_t version, uint8_t *data,
908
            size_t len, struct msg_conn *conn),
909
           size_t max_read, size_t max_write, size_t max_size,
910
           void *user, const char *idtag)
911
0
{
912
0
  struct msg_conn *conn = XMALLOC(MTYPE_MSG_CONN, sizeof(*conn));
913
0
  memset(conn, 0, sizeof(*conn));
914
0
  msg_conn_accept_init(conn, tm, fd, notify_disconnect, handle_msg,
915
0
           max_read, max_write, max_size, idtag);
916
0
  conn->user = user;
917
0
  return conn;
918
0
}
919
920
void msg_server_conn_delete(struct msg_conn *conn)
921
0
{
922
0
  if (!conn)
923
0
    return;
924
0
  msg_conn_cleanup(conn);
925
  XFREE(MTYPE_MSG_CONN, conn);
926
0
}