Line  | Count  | Source  | 
1  |  | // SPDX-License-Identifier: GPL-2.0-or-later  | 
2  |  | /*  | 
3  |  |  * March 6 2023, Christian Hopps <chopps@labn.net>  | 
4  |  |  *  | 
5  |  |  * Copyright (C) 2021  Vmware, Inc.  | 
6  |  |  *           Pushpasis Sarkar <spushpasis@vmware.com>  | 
7  |  |  * Copyright (c) 2023, LabN Consulting, L.L.C.  | 
8  |  |  */  | 
9  |  | #include <zebra.h>  | 
10  |  | #include "debug.h"  | 
11  |  | #include "network.h"  | 
12  |  | #include "sockopt.h"  | 
13  |  | #include "stream.h"  | 
14  |  | #include "frrevent.h"  | 
15  |  | #include "mgmt_msg.h"  | 
16  |  |  | 
17  |  |  | 
18  |  | #define MGMT_MSG_DBG(dbgtag, fmt, ...)                                         \  | 
19  | 0  |   do {                                                                   \ | 
20  | 0  |     if (dbgtag)                                                    \  | 
21  | 0  |       zlog_debug("%s: %s: " fmt, dbgtag, __func__,           \ | 
22  | 0  |            ##__VA_ARGS__);                             \  | 
23  | 0  |   } while (0)  | 
24  |  |  | 
25  |  | #define MGMT_MSG_ERR(ms, fmt, ...)                                             \  | 
26  | 0  |   zlog_err("%s: %s: " fmt, (ms)->idtag, __func__, ##__VA_ARGS__) | 
27  |  |  | 
28  | 8  | DEFINE_MTYPE(LIB, MSG_CONN, "msg connection state");  | 
29  | 8  |  | 
30  | 8  | /**  | 
31  | 8  |  * Read data from a socket into streams containing 1 or more full msgs headed by  | 
32  | 8  |  * mgmt_msg_hdr which contain API messages (currently protobuf).  | 
33  | 8  |  *  | 
34  | 8  |  * Args:  | 
35  | 8  |  *  ms: mgmt_msg_state for this process.  | 
36  | 8  |  *  fd: socket/file to read data from.  | 
37  | 8  |  *  debug: true to enable debug logging.  | 
38  | 8  |  *  | 
39  | 8  |  * Returns:  | 
40  | 8  |  *  MPP_DISCONNECT - socket should be closed and connect retried.  | 
41  | 8  |  *  MSV_SCHED_STREAM - this call should be rescheduled to run.  | 
42  | 8  |  *  MPP_SCHED_BOTH - this call and the procmsg buf should be scheduled to  | 
43  | 8  |  *run.  | 
44  | 8  |  */  | 
45  | 8  | enum mgmt_msg_rsched mgmt_msg_read(struct mgmt_msg_state *ms, int fd,  | 
46  | 8  |            bool debug)  | 
47  | 8  | { | 
48  | 0  |   const char *dbgtag = debug ? ms->idtag : NULL;  | 
49  | 0  |   size_t avail = STREAM_WRITEABLE(ms->ins);  | 
50  | 0  |   struct mgmt_msg_hdr *mhdr = NULL;  | 
51  | 0  |   size_t total = 0;  | 
52  | 0  |   size_t mcount = 0;  | 
53  | 0  |   ssize_t n, left;  | 
54  |  | 
  | 
55  | 0  |   assert(ms && fd != -1);  | 
56  |  |  | 
57  |  |   /*  | 
58  |  |    * Read as much as we can into the stream.  | 
59  |  |    */  | 
60  | 0  |   while (avail > sizeof(struct mgmt_msg_hdr)) { | 
61  | 0  |     n = stream_read_try(ms->ins, fd, avail);  | 
62  |  |  | 
63  |  |     /* -2 is normal nothing read, and to retry */  | 
64  | 0  |     if (n == -2) { | 
65  | 0  |       MGMT_MSG_DBG(dbgtag, "nothing more to read");  | 
66  | 0  |       break;  | 
67  | 0  |     }  | 
68  | 0  |     if (n <= 0) { | 
69  | 0  |       if (n == 0)  | 
70  | 0  |         MGMT_MSG_ERR(ms, "got EOF/disconnect");  | 
71  | 0  |       else  | 
72  | 0  |         MGMT_MSG_ERR(ms,  | 
73  | 0  |                "got error while reading: '%s'",  | 
74  | 0  |                safe_strerror(errno));  | 
75  | 0  |       return MSR_DISCONNECT;  | 
76  | 0  |     }  | 
77  | 0  |     MGMT_MSG_DBG(dbgtag, "read %zd bytes", n);  | 
78  | 0  |     ms->nrxb += n;  | 
79  | 0  |     avail -= n;  | 
80  | 0  |   }  | 
81  |  |  | 
82  |  |   /*  | 
83  |  |    * Check if we have read a complete messages or not.  | 
84  |  |    */  | 
85  | 0  |   assert(stream_get_getp(ms->ins) == 0);  | 
86  | 0  |   left = stream_get_endp(ms->ins);  | 
87  | 0  |   while (left > (long)sizeof(struct mgmt_msg_hdr)) { | 
88  | 0  |     mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);  | 
89  | 0  |     if (!MGMT_MSG_IS_MARKER(mhdr->marker)) { | 
90  | 0  |       MGMT_MSG_DBG(dbgtag, "recv corrupt buffer, disconnect");  | 
91  | 0  |       return MSR_DISCONNECT;  | 
92  | 0  |     }  | 
93  | 0  |     if ((ssize_t)mhdr->len > left)  | 
94  | 0  |       break;  | 
95  |  |  | 
96  | 0  |     MGMT_MSG_DBG(dbgtag, "read full message len %u", mhdr->len);  | 
97  | 0  |     total += mhdr->len;  | 
98  | 0  |     left -= mhdr->len;  | 
99  | 0  |     mcount++;  | 
100  | 0  |   }  | 
101  |  |  | 
102  | 0  |   if (!mcount)  | 
103  | 0  |     return MSR_SCHED_STREAM;  | 
104  |  |  | 
105  |  |   /*  | 
106  |  |    * We have read at least one message into the stream, queue it up.  | 
107  |  |    */  | 
108  | 0  |   mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(ms->ins) + total);  | 
109  | 0  |   stream_set_endp(ms->ins, total);  | 
110  | 0  |   stream_fifo_push(&ms->inq, ms->ins);  | 
111  | 0  |   ms->ins = stream_new(ms->max_msg_sz);  | 
112  | 0  |   if (left) { | 
113  | 0  |     stream_put(ms->ins, mhdr, left);  | 
114  | 0  |     stream_set_endp(ms->ins, left);  | 
115  | 0  |   }  | 
116  |  | 
  | 
117  | 0  |   return MSR_SCHED_BOTH;  | 
118  | 0  | }  | 
119  |  |  | 
120  |  | /**  | 
121  |  |  * Process streams containing whole messages that have been pushed onto the  | 
122  |  |  * FIFO. This should be called from an event/timer handler and should be  | 
123  |  |  * reschedulable.  | 
124  |  |  *  | 
125  |  |  * Args:  | 
126  |  |  *  ms: mgmt_msg_state for this process.  | 
127  |  |  *  handle_mgs: function to call for each received message.  | 
128  |  |  *  user: opaque value passed through to handle_msg.  | 
129  |  |  *  debug: true to enable debug logging.  | 
130  |  |  *  | 
131  |  |  * Returns:  | 
132  |  |  *  true if more to process (so reschedule) else false  | 
133  |  |  */  | 
134  |  | bool mgmt_msg_procbufs(struct mgmt_msg_state *ms,  | 
135  |  |            void (*handle_msg)(uint8_t version, uint8_t *msg,  | 
136  |  |             size_t msglen, void *user),  | 
137  |  |            void *user, bool debug)  | 
138  | 0  | { | 
139  | 0  |   const char *dbgtag = debug ? ms->idtag : NULL;  | 
140  | 0  |   struct mgmt_msg_hdr *mhdr;  | 
141  | 0  |   struct stream *work;  | 
142  | 0  |   uint8_t *data;  | 
143  | 0  |   size_t left, nproc;  | 
144  |  | 
  | 
145  | 0  |   MGMT_MSG_DBG(dbgtag, "Have %zu streams to process", ms->inq.count);  | 
146  |  | 
  | 
147  | 0  |   nproc = 0;  | 
148  | 0  |   while (nproc < ms->max_read_buf) { | 
149  | 0  |     work = stream_fifo_pop(&ms->inq);  | 
150  | 0  |     if (!work)  | 
151  | 0  |       break;  | 
152  |  |  | 
153  | 0  |     data = STREAM_DATA(work);  | 
154  | 0  |     left = stream_get_endp(work);  | 
155  | 0  |     MGMT_MSG_DBG(dbgtag, "Processing stream of len %zu", left);  | 
156  |  | 
  | 
157  | 0  |     for (; left > sizeof(struct mgmt_msg_hdr);  | 
158  | 0  |          left -= mhdr->len, data += mhdr->len) { | 
159  | 0  |       mhdr = (struct mgmt_msg_hdr *)data;  | 
160  |  | 
  | 
161  | 0  |       assert(MGMT_MSG_IS_MARKER(mhdr->marker));  | 
162  | 0  |       assert(left >= mhdr->len);  | 
163  |  | 
  | 
164  | 0  |       handle_msg(MGMT_MSG_MARKER_VERSION(mhdr->marker),  | 
165  | 0  |            (uint8_t *)(mhdr + 1),  | 
166  | 0  |            mhdr->len - sizeof(struct mgmt_msg_hdr),  | 
167  | 0  |            user);  | 
168  | 0  |       ms->nrxm++;  | 
169  | 0  |       nproc++;  | 
170  | 0  |     }  | 
171  |  | 
  | 
172  | 0  |     if (work != ms->ins)  | 
173  | 0  |       stream_free(work); /* Free it up */  | 
174  | 0  |     else  | 
175  | 0  |       stream_reset(work); /* Reset stream for next read */  | 
176  | 0  |   }  | 
177  |  |  | 
178  |  |   /* return true if should reschedule b/c more to process. */  | 
179  | 0  |   return stream_fifo_head(&ms->inq) != NULL;  | 
180  | 0  | }  | 
181  |  |  | 
182  |  | /**  | 
183  |  |  * Write data from a onto the socket, using streams that have been queued for  | 
184  |  |  * sending by mgmt_msg_send_msg. This function should be reschedulable.  | 
185  |  |  *  | 
186  |  |  * Args:  | 
187  |  |  *  ms: mgmt_msg_state for this process.  | 
188  |  |  *  fd: socket/file to read data from.  | 
189  |  |  *  debug: true to enable debug logging.  | 
190  |  |  *  | 
191  |  |  * Returns:  | 
192  |  |  *  MSW_SCHED_NONE - do not reschedule anything.  | 
193  |  |  *  MSW_SCHED_STREAM - this call should be rescheduled to run again.  | 
194  |  |  *  MSW_SCHED_WRITES_OFF - writes should be disabled with a timer to  | 
195  |  |  *      re-enable them a short time later  | 
196  |  |  *  MSW_DISCONNECT - socket should be closed and reconnect retried.  | 
197  |  |  *run.  | 
198  |  |  */  | 
199  |  | enum mgmt_msg_wsched mgmt_msg_write(struct mgmt_msg_state *ms, int fd,  | 
200  |  |             bool debug)  | 
201  | 0  | { | 
202  | 0  |   const char *dbgtag = debug ? ms->idtag : NULL;  | 
203  | 0  |   struct stream *s;  | 
204  | 0  |   size_t nproc = 0;  | 
205  | 0  |   ssize_t left;  | 
206  | 0  |   ssize_t n;  | 
207  |  | 
  | 
208  | 0  |   if (ms->outs) { | 
209  | 0  |     MGMT_MSG_DBG(dbgtag,  | 
210  | 0  |            "found unqueued stream with %zu bytes, queueing",  | 
211  | 0  |            stream_get_endp(ms->outs));  | 
212  | 0  |     stream_fifo_push(&ms->outq, ms->outs);  | 
213  | 0  |     ms->outs = NULL;  | 
214  | 0  |   }  | 
215  |  | 
  | 
216  | 0  |   for (s = stream_fifo_head(&ms->outq); s && nproc < ms->max_write_buf;  | 
217  | 0  |        s = stream_fifo_head(&ms->outq)) { | 
218  | 0  |     left = STREAM_READABLE(s);  | 
219  | 0  |     assert(left);  | 
220  |  | 
  | 
221  | 0  |     n = stream_flush(s, fd);  | 
222  | 0  |     if (n <= 0) { | 
223  | 0  |       if (n == 0)  | 
224  | 0  |         MGMT_MSG_ERR(ms,  | 
225  | 0  |                "connection closed while writing");  | 
226  | 0  |       else if (ERRNO_IO_RETRY(errno)) { | 
227  | 0  |         MGMT_MSG_DBG(  | 
228  | 0  |           dbgtag,  | 
229  | 0  |           "retry error while writing %zd bytes: %s (%d)",  | 
230  | 0  |           left, safe_strerror(errno), errno);  | 
231  | 0  |         return MSW_SCHED_STREAM;  | 
232  | 0  |       } else  | 
233  | 0  |         MGMT_MSG_ERR(  | 
234  | 0  |           ms,  | 
235  | 0  |           "error while writing %zd bytes: %s (%d)",  | 
236  | 0  |           left, safe_strerror(errno), errno);  | 
237  |  |  | 
238  | 0  |       n = mgmt_msg_reset_writes(ms);  | 
239  | 0  |       MGMT_MSG_DBG(dbgtag, "drop and freed %zd streams", n);  | 
240  |  | 
  | 
241  | 0  |       return MSW_DISCONNECT;  | 
242  | 0  |     }  | 
243  |  |  | 
244  | 0  |     ms->ntxb += n;  | 
245  | 0  |     if (n != left) { | 
246  | 0  |       MGMT_MSG_DBG(dbgtag, "short stream write %zd of %zd", n,  | 
247  | 0  |              left);  | 
248  | 0  |       stream_forward_getp(s, n);  | 
249  | 0  |       return MSW_SCHED_STREAM;  | 
250  | 0  |     }  | 
251  |  |  | 
252  | 0  |     stream_free(stream_fifo_pop(&ms->outq));  | 
253  | 0  |     MGMT_MSG_DBG(dbgtag, "wrote stream of %zd bytes", n);  | 
254  | 0  |     nproc++;  | 
255  | 0  |   }  | 
256  | 0  |   if (s) { | 
257  | 0  |     MGMT_MSG_DBG(  | 
258  | 0  |       dbgtag,  | 
259  | 0  |       "reached %zu buffer writes, pausing with %zu streams left",  | 
260  | 0  |       ms->max_write_buf, ms->outq.count);  | 
261  | 0  |     return MSW_SCHED_STREAM;  | 
262  | 0  |   }  | 
263  | 0  |   MGMT_MSG_DBG(dbgtag, "flushed all streams from output q");  | 
264  | 0  |   return MSW_SCHED_NONE;  | 
265  | 0  | }  | 
266  |  |  | 
267  |  |  | 
268  |  | /**  | 
269  |  |  * Send a message by enqueueing it to be written over the socket by  | 
270  |  |  * mgmt_msg_write.  | 
271  |  |  *  | 
272  |  |  * Args:  | 
273  |  |  *  ms: mgmt_msg_state for this process.  | 
274  |  |  *  version: version of this message, will be given to receiving side.  | 
275  |  |  *  msg: the message to be sent.  | 
276  |  |  *  len: the length of the message.  | 
277  |  |  *  packf: a function to pack the message.  | 
278  |  |  *  debug: true to enable debug logging.  | 
279  |  |  *  | 
280  |  |  * Returns:  | 
281  |  |  *      0 on success, otherwise -1 on failure. The only failure mode is if a  | 
282  |  |  *      the message exceeds the maximum message size configured on init.  | 
283  |  |  */  | 
284  |  | int mgmt_msg_send_msg(struct mgmt_msg_state *ms, uint8_t version, void *msg,  | 
285  |  |           size_t len, size_t (*packf)(void *msg, void *buf),  | 
286  |  |           bool debug)  | 
287  | 0  | { | 
288  | 0  |   const char *dbgtag = debug ? ms->idtag : NULL;  | 
289  | 0  |   struct mgmt_msg_hdr *mhdr;  | 
290  | 0  |   struct stream *s;  | 
291  | 0  |   uint8_t *dstbuf;  | 
292  | 0  |   size_t endp, n;  | 
293  | 0  |   size_t mlen = len + sizeof(*mhdr);  | 
294  |  | 
  | 
295  | 0  |   if (mlen > ms->max_msg_sz) { | 
296  | 0  |     MGMT_MSG_ERR(ms, "Message %zu > max size %zu, dropping", mlen,  | 
297  | 0  |            ms->max_msg_sz);  | 
298  | 0  |     return -1;  | 
299  | 0  |   }  | 
300  |  |  | 
301  | 0  |   if (!ms->outs) { | 
302  | 0  |     MGMT_MSG_DBG(dbgtag, "creating new stream for msg len %zu",  | 
303  | 0  |            len);  | 
304  | 0  |     ms->outs = stream_new(ms->max_msg_sz);  | 
305  | 0  |   } else if (STREAM_WRITEABLE(ms->outs) < mlen) { | 
306  | 0  |     MGMT_MSG_DBG(  | 
307  | 0  |       dbgtag,  | 
308  | 0  |       "enq existing stream len %zu and creating new stream for msg len %zu",  | 
309  | 0  |       STREAM_WRITEABLE(ms->outs), mlen);  | 
310  | 0  |     stream_fifo_push(&ms->outq, ms->outs);  | 
311  | 0  |     ms->outs = stream_new(ms->max_msg_sz);  | 
312  | 0  |   } else { | 
313  | 0  |     MGMT_MSG_DBG(  | 
314  | 0  |       dbgtag,  | 
315  | 0  |       "using existing stream with avail %zu for msg len %zu",  | 
316  | 0  |       STREAM_WRITEABLE(ms->outs), mlen);  | 
317  | 0  |   }  | 
318  | 0  |   s = ms->outs;  | 
319  |  |  | 
320  |  |   /* We have a stream with space, pack the message into it. */  | 
321  | 0  |   mhdr = (struct mgmt_msg_hdr *)(STREAM_DATA(s) + s->endp);  | 
322  | 0  |   mhdr->marker = MGMT_MSG_MARKER(version);  | 
323  | 0  |   mhdr->len = mlen;  | 
324  | 0  |   stream_forward_endp(s, sizeof(*mhdr));  | 
325  | 0  |   endp = stream_get_endp(s);  | 
326  | 0  |   dstbuf = STREAM_DATA(s) + endp;  | 
327  | 0  |   if (packf)  | 
328  | 0  |     n = packf(msg, dstbuf);  | 
329  | 0  |   else { | 
330  | 0  |     memcpy(dstbuf, msg, len);  | 
331  | 0  |     n = len;  | 
332  | 0  |   }  | 
333  | 0  |   stream_set_endp(s, endp + n);  | 
334  | 0  |   ms->ntxm++;  | 
335  |  | 
  | 
336  | 0  |   return 0;  | 
337  | 0  | }  | 
338  |  |  | 
339  |  | /**  | 
340  |  |  * Create and open a unix domain stream socket on the given path  | 
341  |  |  * setting non-blocking and send and receive buffer sizes.  | 
342  |  |  *  | 
343  |  |  * Args:  | 
344  |  |  *  path: path of unix domain socket to connect to.  | 
345  |  |  *  sendbuf: size of socket send buffer.  | 
346  |  |  *  recvbuf: size of socket receive buffer.  | 
347  |  |  *  dbgtag: if non-NULL enable log debug, and use this tag.  | 
348  |  |  *  | 
349  |  |  * Returns:  | 
350  |  |  *  socket fd or -1 on error.  | 
351  |  |  */  | 
352  |  | int mgmt_msg_connect(const char *path, size_t sendbuf, size_t recvbuf,  | 
353  |  |          const char *dbgtag)  | 
354  | 0  | { | 
355  | 0  |   int ret, sock, len;  | 
356  | 0  |   struct sockaddr_un addr;  | 
357  |  | 
  | 
358  | 0  |   MGMT_MSG_DBG(dbgtag, "connecting to server on %s", path);  | 
359  | 0  |   sock = socket(AF_UNIX, SOCK_STREAM, 0);  | 
360  | 0  |   if (sock < 0) { | 
361  | 0  |     MGMT_MSG_DBG(dbgtag, "socket failed: %s", safe_strerror(errno));  | 
362  | 0  |     return -1;  | 
363  | 0  |   }  | 
364  |  |  | 
365  | 0  |   memset(&addr, 0, sizeof(struct sockaddr_un));  | 
366  | 0  |   addr.sun_family = AF_UNIX;  | 
367  | 0  |   strlcpy(addr.sun_path, path, sizeof(addr.sun_path));  | 
368  |  | #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN  | 
369  |  |   len = addr.sun_len = SUN_LEN(&addr);  | 
370  |  | #else  | 
371  | 0  |   len = sizeof(addr.sun_family) + strlen(addr.sun_path);  | 
372  | 0  | #endif /* HAVE_STRUCT_SOCKADDR_UN_SUN_LEN */  | 
373  | 0  |   ret = connect(sock, (struct sockaddr *)&addr, len);  | 
374  | 0  |   if (ret < 0) { | 
375  | 0  |     MGMT_MSG_DBG(dbgtag, "failed to connect on %s: %s", path,  | 
376  | 0  |            safe_strerror(errno));  | 
377  | 0  |     close(sock);  | 
378  | 0  |     return -1;  | 
379  | 0  |   }  | 
380  |  |  | 
381  | 0  |   MGMT_MSG_DBG(dbgtag, "connected to server on %s", path);  | 
382  | 0  |   set_nonblocking(sock);  | 
383  | 0  |   setsockopt_so_sendbuf(sock, sendbuf);  | 
384  | 0  |   setsockopt_so_recvbuf(sock, recvbuf);  | 
385  | 0  |   return sock;  | 
386  | 0  | }  | 
387  |  |  | 
388  |  | /**  | 
389  |  |  * Reset the sending queue, by dequeueing all streams and freeing them. Return  | 
390  |  |  * the number of streams freed.  | 
391  |  |  *  | 
392  |  |  * Args:  | 
393  |  |  *  ms: mgmt_msg_state for this process.  | 
394  |  |  *  | 
395  |  |  * Returns:  | 
396  |  |  *      Number of streams that were freed.  | 
397  |  |  *  | 
398  |  |  */  | 
399  |  | size_t mgmt_msg_reset_writes(struct mgmt_msg_state *ms)  | 
400  | 0  | { | 
401  | 0  |   struct stream *s;  | 
402  | 0  |   size_t nproc = 0;  | 
403  |  | 
  | 
404  | 0  |   for (s = stream_fifo_pop(&ms->outq); s;  | 
405  | 0  |        s = stream_fifo_pop(&ms->outq), nproc++)  | 
406  | 0  |     stream_free(s);  | 
407  |  | 
  | 
408  | 0  |   return nproc;  | 
409  | 0  | }  | 
410  |  |  | 
411  |  |  | 
412  |  | void mgmt_msg_init(struct mgmt_msg_state *ms, size_t max_read_buf,  | 
413  |  |        size_t max_write_buf, size_t max_msg_sz, const char *idtag)  | 
414  | 0  | { | 
415  | 0  |   memset(ms, 0, sizeof(*ms));  | 
416  | 0  |   ms->ins = stream_new(max_msg_sz);  | 
417  | 0  |   stream_fifo_init(&ms->inq);  | 
418  | 0  |   stream_fifo_init(&ms->outq);  | 
419  | 0  |   ms->max_read_buf = max_write_buf;  | 
420  | 0  |   ms->max_write_buf = max_read_buf;  | 
421  | 0  |   ms->max_msg_sz = max_msg_sz;  | 
422  | 0  |   ms->idtag = strdup(idtag);  | 
423  | 0  | }  | 
424  |  |  | 
425  |  | void mgmt_msg_destroy(struct mgmt_msg_state *ms)  | 
426  | 0  | { | 
427  | 0  |   mgmt_msg_reset_writes(ms);  | 
428  | 0  |   if (ms->ins)  | 
429  | 0  |     stream_free(ms->ins);  | 
430  | 0  |   free(ms->idtag);  | 
431  | 0  | }  | 
432  |  |  | 
433  |  | /*  | 
434  |  |  * Connections  | 
435  |  |  */  | 
436  |  |  | 
437  |  | #define MSG_CONN_DEFAULT_CONN_RETRY_MSEC 250  | 
438  | 0  | #define MSG_CONN_SEND_BUF_SIZE (1u << 16)  | 
439  | 0  | #define MSG_CONN_RECV_BUF_SIZE (1u << 16)  | 
440  |  |  | 
441  |  | static void msg_client_sched_connect(struct msg_client *client,  | 
442  |  |              unsigned long msec);  | 
443  |  |  | 
444  |  | static void msg_conn_sched_proc_msgs(struct msg_conn *conn);  | 
445  |  | static void msg_conn_sched_read(struct msg_conn *conn);  | 
446  |  | static void msg_conn_sched_write(struct msg_conn *conn);  | 
447  |  |  | 
448  |  | static void msg_conn_write(struct event *thread)  | 
449  | 0  | { | 
450  | 0  |   struct msg_conn *conn = EVENT_ARG(thread);  | 
451  | 0  |   enum mgmt_msg_wsched rv;  | 
452  | 0  | 
  | 
453  | 0  |   rv = mgmt_msg_write(&conn->mstate, conn->fd, conn->debug);  | 
454  | 0  |   if (rv == MSW_SCHED_STREAM)  | 
455  | 0  |     msg_conn_sched_write(conn);  | 
456  | 0  |   else if (rv == MSW_DISCONNECT)  | 
457  | 0  |     msg_conn_disconnect(conn, conn->is_client);  | 
458  | 0  |   else  | 
459  | 0  |     assert(rv == MSW_SCHED_NONE);  | 
460  | 0  | }  | 
461  |  |  | 
462  |  | static void msg_conn_read(struct event *thread)  | 
463  | 0  | { | 
464  | 0  |   struct msg_conn *conn = EVENT_ARG(thread);  | 
465  | 0  |   enum mgmt_msg_rsched rv;  | 
466  | 0  | 
  | 
467  | 0  |   rv = mgmt_msg_read(&conn->mstate, conn->fd, conn->debug);  | 
468  | 0  |   if (rv == MSR_DISCONNECT) { | 
469  | 0  |     msg_conn_disconnect(conn, conn->is_client);  | 
470  | 0  |     return;  | 
471  | 0  |   }  | 
472  | 0  |   if (rv == MSR_SCHED_BOTH)  | 
473  | 0  |     msg_conn_sched_proc_msgs(conn);  | 
474  | 0  |   msg_conn_sched_read(conn);  | 
475  | 0  | }  | 
476  |  |  | 
477  |  | /* collapse this into mgmt_msg_procbufs */  | 
478  |  | static void msg_conn_proc_msgs(struct event *thread)  | 
479  | 0  | { | 
480  | 0  |   struct msg_conn *conn = EVENT_ARG(thread);  | 
481  | 0  | 
  | 
482  | 0  |   if (mgmt_msg_procbufs(&conn->mstate,  | 
483  | 0  |             (void (*)(uint8_t, uint8_t *, size_t,  | 
484  | 0  |           void *))conn->handle_msg,  | 
485  | 0  |             conn, conn->debug))  | 
486  | 0  |     /* there's more, schedule handling more */  | 
487  | 0  |     msg_conn_sched_proc_msgs(conn);  | 
488  | 0  | }  | 
489  |  |  | 
490  |  | static void msg_conn_sched_read(struct msg_conn *conn)  | 
491  | 0  | { | 
492  | 0  |   event_add_read(conn->loop, msg_conn_read, conn, conn->fd,  | 
493  | 0  |            &conn->read_ev);  | 
494  | 0  | }  | 
495  |  |  | 
496  |  | static void msg_conn_sched_write(struct msg_conn *conn)  | 
497  | 0  | { | 
498  | 0  |   event_add_write(conn->loop, msg_conn_write, conn, conn->fd,  | 
499  | 0  |       &conn->write_ev);  | 
500  | 0  | }  | 
501  |  |  | 
502  |  | static void msg_conn_sched_proc_msgs(struct msg_conn *conn)  | 
503  | 0  | { | 
504  | 0  |   event_add_event(conn->loop, msg_conn_proc_msgs, conn, 0,  | 
505  | 0  |       &conn->proc_msg_ev);  | 
506  | 0  | }  | 
507  |  |  | 
508  |  |  | 
509  |  | void msg_conn_disconnect(struct msg_conn *conn, bool reconnect)  | 
510  |  | { | 
511  |  |  | 
512  |  |   /* disconnect short-circuit if present */  | 
513  |  |   if (conn->remote_conn) { | 
514  |  |     conn->remote_conn->remote_conn = NULL;  | 
515  |  |     conn->remote_conn = NULL;  | 
516  |  |   }  | 
517  |  |  | 
518  |  |   if (conn->fd != -1) { | 
519  |  |     close(conn->fd);  | 
520  |  |     conn->fd = -1;  | 
521  |  |  | 
522  |  |     /* Notify client through registered callback (if any) */  | 
523  |  |     if (conn->notify_disconnect)  | 
524  |  |       (void)(*conn->notify_disconnect)(conn);  | 
525  |  |   }  | 
526  |  |  | 
527  |  |   if (reconnect) { | 
528  |  |     assert(conn->is_client);  | 
529  |  |     msg_client_sched_connect(  | 
530  |  |       container_of(conn, struct msg_client, conn),  | 
531  |  |       MSG_CONN_DEFAULT_CONN_RETRY_MSEC);  | 
532  |  |   }  | 
533  |  | }  | 
534  |  |  | 
535  |  | int msg_conn_send_msg(struct msg_conn *conn, uint8_t version, void *msg,  | 
536  |  |           size_t mlen, size_t (*packf)(void *, void *),  | 
537  |  |           bool short_circuit_ok)  | 
538  | 0  | { | 
539  | 0  |   const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;  | 
540  |  | 
  | 
541  | 0  |   if (conn->fd == -1) { | 
542  | 0  |     MGMT_MSG_ERR(&conn->mstate,  | 
543  | 0  |            "can't send message on closed connection");  | 
544  | 0  |     return -1;  | 
545  | 0  |   }  | 
546  |  |  | 
547  |  |   /* immediately handle the message if short-circuit is present */  | 
548  | 0  |   if (conn->remote_conn && short_circuit_ok) { | 
549  | 0  |     uint8_t *buf = msg;  | 
550  | 0  |     size_t n = mlen;  | 
551  | 0  |     bool old;  | 
552  |  | 
  | 
553  | 0  |     if (packf) { | 
554  | 0  |       buf = XMALLOC(MTYPE_TMP, mlen);  | 
555  | 0  |       n = packf(msg, buf);  | 
556  | 0  |     }  | 
557  |  | 
  | 
558  | 0  |     ++conn->short_circuit_depth;  | 
559  | 0  |     MGMT_MSG_DBG(dbgtag, "SC send: depth %u msg: %p",  | 
560  | 0  |            conn->short_circuit_depth, msg);  | 
561  |  | 
  | 
562  | 0  |     old = conn->remote_conn->is_short_circuit;  | 
563  | 0  |     conn->remote_conn->is_short_circuit = true;  | 
564  | 0  |     conn->remote_conn->handle_msg(version, buf, n,  | 
565  | 0  |                 conn->remote_conn);  | 
566  | 0  |     conn->remote_conn->is_short_circuit = old;  | 
567  |  | 
  | 
568  | 0  |     --conn->short_circuit_depth;  | 
569  | 0  |     MGMT_MSG_DBG(dbgtag, "SC return from depth: %u msg: %p",  | 
570  | 0  |            conn->short_circuit_depth, msg);  | 
571  |  | 
  | 
572  | 0  |     if (packf)  | 
573  | 0  |       XFREE(MTYPE_TMP, buf);  | 
574  | 0  |     return 0;  | 
575  | 0  |   }  | 
576  |  |  | 
577  | 0  |   int rv = mgmt_msg_send_msg(&conn->mstate, version, msg, mlen, packf,  | 
578  | 0  |            conn->debug);  | 
579  |  | 
  | 
580  | 0  |   msg_conn_sched_write(conn);  | 
581  |  | 
  | 
582  | 0  |   return rv;  | 
583  | 0  | }  | 
584  |  |  | 
585  |  | void msg_conn_cleanup(struct msg_conn *conn)  | 
586  | 0  | { | 
587  | 0  |   struct mgmt_msg_state *ms = &conn->mstate;  | 
588  |  |  | 
589  |  |   /* disconnect short-circuit if present */  | 
590  | 0  |   if (conn->remote_conn) { | 
591  | 0  |     conn->remote_conn->remote_conn = NULL;  | 
592  | 0  |     conn->remote_conn = NULL;  | 
593  | 0  |   }  | 
594  |  | 
  | 
595  | 0  |   if (conn->fd != -1) { | 
596  | 0  |     close(conn->fd);  | 
597  | 0  |     conn->fd = -1;  | 
598  | 0  |   }  | 
599  |  | 
  | 
600  | 0  |   EVENT_OFF(conn->read_ev);  | 
601  | 0  |   EVENT_OFF(conn->write_ev);  | 
602  | 0  |   EVENT_OFF(conn->proc_msg_ev);  | 
603  |  | 
  | 
604  | 0  |   mgmt_msg_destroy(ms);  | 
605  | 0  | }  | 
606  |  |  | 
607  |  | /*  | 
608  |  |  * Client Connections  | 
609  |  |  */  | 
610  |  |  | 
611  | 0  | DECLARE_LIST(msg_server_list, struct msg_server, link);  | 
612  |  |  | 
613  |  | static struct msg_server_list_head msg_servers;  | 
614  |  |  | 
615  |  | static void msg_client_connect(struct msg_client *conn);  | 
616  |  |  | 
617  |  | static void msg_client_connect_timer(struct event *thread)  | 
618  | 0  | { | 
619  | 0  |   msg_client_connect(EVENT_ARG(thread));  | 
620  | 0  | }  | 
621  |  |  | 
622  |  | static void msg_client_sched_connect(struct msg_client *client,  | 
623  |  |              unsigned long msec)  | 
624  | 0  | { | 
625  | 0  |   struct msg_conn *conn = &client->conn;  | 
626  | 0  |   const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;  | 
627  |  | 
  | 
628  | 0  |   MGMT_MSG_DBG(dbgtag, "connection retry in %lu msec", msec);  | 
629  | 0  |   if (msec)  | 
630  | 0  |     event_add_timer_msec(conn->loop, msg_client_connect_timer,  | 
631  | 0  |              client, msec, &client->conn_retry_tmr);  | 
632  | 0  |   else  | 
633  | 0  |     event_add_event(conn->loop, msg_client_connect_timer, client, 0,  | 
634  | 0  |         &client->conn_retry_tmr);  | 
635  | 0  | }  | 
636  |  |  | 
637  |  | static bool msg_client_connect_short_circuit(struct msg_client *client)  | 
638  | 0  | { | 
639  | 0  |   struct msg_conn *server_conn;  | 
640  | 0  |   struct msg_server *server;  | 
641  | 0  |   const char *dbgtag =  | 
642  | 0  |     client->conn.debug ? client->conn.mstate.idtag : NULL;  | 
643  | 0  |   union sockunion su = {0}; | 
644  | 0  |   int sockets[2];  | 
645  | 0  | 
  | 
646  | 0  |   frr_each (msg_server_list, &msg_servers, server)  | 
647  | 0  |     if (!strcmp(server->sopath, client->sopath))  | 
648  | 0  |       break;  | 
649  | 0  |   if (!server) { | 
650  | 0  |     MGMT_MSG_DBG(dbgtag,  | 
651  | 0  |            "no short-circuit connection available for %s",  | 
652  | 0  |            client->sopath);  | 
653  | 0  | 
  | 
654  | 0  |     return false;  | 
655  | 0  |   }  | 
656  | 0  | 
  | 
657  | 0  |   if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets)) { | 
658  | 0  |     MGMT_MSG_ERR(  | 
659  | 0  |       &client->conn.mstate,  | 
660  | 0  |       "socketpair failed trying to short-circuit connection on %s: %s",  | 
661  | 0  |       client->sopath, safe_strerror(errno));  | 
662  | 0  |     return false;  | 
663  | 0  |   }  | 
664  | 0  | 
  | 
665  | 0  |   /* client side */  | 
666  | 0  |   client->conn.fd = sockets[0];  | 
667  | 0  |   set_nonblocking(sockets[0]);  | 
668  | 0  |   setsockopt_so_sendbuf(sockets[0], client->conn.mstate.max_write_buf);  | 
669  | 0  |   setsockopt_so_recvbuf(sockets[0], client->conn.mstate.max_read_buf);  | 
670  | 0  | 
  | 
671  | 0  |   /* server side */  | 
672  | 0  |   memset(&su, 0, sizeof(union sockunion));  | 
673  | 0  |   server_conn = server->create(sockets[1], &su);  | 
674  | 0  | 
  | 
675  | 0  |   client->conn.remote_conn = server_conn;  | 
676  | 0  |   server_conn->remote_conn = &client->conn;  | 
677  | 0  | 
  | 
678  | 0  |   MGMT_MSG_DBG(  | 
679  | 0  |     dbgtag,  | 
680  | 0  |     "short-circuit connection on %s server %s:%d to client %s:%d",  | 
681  | 0  |     client->sopath, server_conn->mstate.idtag, server_conn->fd,  | 
682  | 0  |     client->conn.mstate.idtag, client->conn.fd);  | 
683  | 0  | 
  | 
684  | 0  |   MGMT_MSG_DBG(  | 
685  | 0  |     server_conn->debug ? server_conn->mstate.idtag : NULL,  | 
686  | 0  |     "short-circuit connection on %s client %s:%d to server %s:%d",  | 
687  | 0  |     client->sopath, client->conn.mstate.idtag, client->conn.fd,  | 
688  | 0  |     server_conn->mstate.idtag, server_conn->fd);  | 
689  | 0  | 
  | 
690  | 0  |   return true;  | 
691  | 0  | }  | 
692  |  |  | 
693  |  |  | 
694  |  | /* Connect and start reading from the socket */  | 
695  |  | static void msg_client_connect(struct msg_client *client)  | 
696  | 0  | { | 
697  | 0  |   struct msg_conn *conn = &client->conn;  | 
698  | 0  |   const char *dbgtag = conn->debug ? conn->mstate.idtag : NULL;  | 
699  | 0  | 
  | 
700  | 0  |   if (!client->short_circuit_ok ||  | 
701  | 0  |       !msg_client_connect_short_circuit(client))  | 
702  | 0  |     conn->fd =  | 
703  | 0  |       mgmt_msg_connect(client->sopath, MSG_CONN_SEND_BUF_SIZE,  | 
704  | 0  |            MSG_CONN_RECV_BUF_SIZE, dbgtag);  | 
705  | 0  | 
  | 
706  | 0  |   if (conn->fd == -1)  | 
707  | 0  |     /* retry the connection */  | 
708  | 0  |     msg_client_sched_connect(client,  | 
709  | 0  |            MSG_CONN_DEFAULT_CONN_RETRY_MSEC);  | 
710  | 0  |   else if (client->notify_connect && client->notify_connect(client))  | 
711  | 0  |     /* client connect notify failed */  | 
712  | 0  |     msg_conn_disconnect(conn, true);  | 
713  | 0  |   else  | 
714  | 0  |     /* start reading */  | 
715  | 0  |     msg_conn_sched_read(conn);  | 
716  | 0  | }  | 
717  |  |  | 
718  |  | void msg_client_init(struct msg_client *client, struct event_loop *tm,  | 
719  |  |          const char *sopath,  | 
720  |  |          int (*notify_connect)(struct msg_client *client),  | 
721  |  |          int (*notify_disconnect)(struct msg_conn *client),  | 
722  |  |          void (*handle_msg)(uint8_t version, uint8_t *data,  | 
723  |  |           size_t len, struct msg_conn *client),  | 
724  |  |          size_t max_read_buf, size_t max_write_buf,  | 
725  |  |          size_t max_msg_sz, bool short_circuit_ok,  | 
726  |  |          const char *idtag, bool debug)  | 
727  | 0  | { | 
728  | 0  |   struct msg_conn *conn = &client->conn;  | 
729  | 0  |   memset(client, 0, sizeof(*client));  | 
730  |  | 
  | 
731  | 0  |   conn->loop = tm;  | 
732  | 0  |   conn->fd = -1;  | 
733  | 0  |   conn->handle_msg = handle_msg;  | 
734  | 0  |   conn->notify_disconnect = notify_disconnect;  | 
735  | 0  |   conn->is_client = true;  | 
736  | 0  |   conn->debug = debug;  | 
737  | 0  |   client->short_circuit_ok = short_circuit_ok;  | 
738  | 0  |   client->sopath = strdup(sopath);  | 
739  | 0  |   client->notify_connect = notify_connect;  | 
740  |  | 
  | 
741  | 0  |   mgmt_msg_init(&conn->mstate, max_read_buf, max_write_buf, max_msg_sz,  | 
742  | 0  |           idtag);  | 
743  |  |  | 
744  |  |   /* XXX maybe just have client kick this off */  | 
745  |  |   /* Start trying to connect to server */  | 
746  | 0  |   msg_client_sched_connect(client, 0);  | 
747  | 0  | }  | 
748  |  |  | 
749  |  | void msg_client_cleanup(struct msg_client *client)  | 
750  | 0  | { | 
751  | 0  |   assert(client->conn.is_client);  | 
752  |  | 
  | 
753  | 0  |   EVENT_OFF(client->conn_retry_tmr);  | 
754  | 0  |   free(client->sopath);  | 
755  |  | 
  | 
756  | 0  |   msg_conn_cleanup(&client->conn);  | 
757  | 0  | }  | 
758  |  |  | 
759  |  |  | 
760  |  | /*  | 
761  |  |  * Server-side connections  | 
762  |  |  */  | 
763  |  |  | 
764  |  | static void msg_server_accept(struct event *event)  | 
765  | 0  | { | 
766  | 0  |   struct msg_server *server = EVENT_ARG(event);  | 
767  | 0  |   int fd;  | 
768  | 0  |   union sockunion su;  | 
769  | 0  | 
  | 
770  | 0  |   if (server->fd < 0)  | 
771  | 0  |     return;  | 
772  | 0  | 
  | 
773  | 0  |   /* We continue hearing server listen socket. */  | 
774  | 0  |   event_add_read(server->loop, msg_server_accept, server, server->fd,  | 
775  | 0  |            &server->listen_ev);  | 
776  | 0  | 
  | 
777  | 0  |   memset(&su, 0, sizeof(union sockunion));  | 
778  | 0  | 
  | 
779  | 0  |   /* We can handle IPv4 or IPv6 socket. */  | 
780  | 0  |   fd = sockunion_accept(server->fd, &su);  | 
781  | 0  |   if (fd < 0) { | 
782  | 0  |     zlog_err("Failed to accept %s client connection: %s", | 
783  | 0  |        server->idtag, safe_strerror(errno));  | 
784  | 0  |     return;  | 
785  | 0  |   }  | 
786  | 0  |   set_nonblocking(fd);  | 
787  | 0  |   set_cloexec(fd);  | 
788  | 0  | 
  | 
789  | 0  |   DEBUGD(server->debug, "Accepted new %s connection", server->idtag);  | 
790  | 0  | 
  | 
791  | 0  |   server->create(fd, &su);  | 
792  | 0  | }  | 
793  |  |  | 
794  |  | int msg_server_init(struct msg_server *server, const char *sopath,  | 
795  |  |         struct event_loop *loop,  | 
796  |  |         struct msg_conn *(*create)(int fd, union sockunion *su),  | 
797  |  |         const char *idtag, struct debug *debug)  | 
798  | 0  | { | 
799  | 0  |   int ret;  | 
800  | 0  |   int sock;  | 
801  | 0  |   struct sockaddr_un addr;  | 
802  | 0  |   mode_t old_mask;  | 
803  |  | 
  | 
804  | 0  |   memset(server, 0, sizeof(*server));  | 
805  | 0  |   server->fd = -1;  | 
806  |  | 
  | 
807  | 0  |   sock = socket(AF_UNIX, SOCK_STREAM, PF_UNSPEC);  | 
808  | 0  |   if (sock < 0) { | 
809  | 0  |     zlog_err("Failed to create %s server socket: %s", server->idtag, | 
810  | 0  |        safe_strerror(errno));  | 
811  | 0  |     goto fail;  | 
812  | 0  |   }  | 
813  |  |  | 
814  | 0  |   addr.sun_family = AF_UNIX,  | 
815  | 0  |   strlcpy(addr.sun_path, sopath, sizeof(addr.sun_path));  | 
816  | 0  |   unlink(addr.sun_path);  | 
817  | 0  |   old_mask = umask(0077);  | 
818  | 0  |   ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr));  | 
819  | 0  |   if (ret < 0) { | 
820  | 0  |     zlog_err("Failed to bind %s server socket to '%s': %s", | 
821  | 0  |        server->idtag, addr.sun_path, safe_strerror(errno));  | 
822  | 0  |     umask(old_mask);  | 
823  | 0  |     goto fail;  | 
824  | 0  |   }  | 
825  | 0  |   umask(old_mask);  | 
826  |  | 
  | 
827  | 0  |   ret = listen(sock, MGMTD_MAX_CONN);  | 
828  | 0  |   if (ret < 0) { | 
829  | 0  |     zlog_err("Failed to listen on %s server socket: %s", | 
830  | 0  |        server->idtag, safe_strerror(errno));  | 
831  | 0  |     goto fail;  | 
832  | 0  |   }  | 
833  |  |  | 
834  | 0  |   server->fd = sock;  | 
835  | 0  |   server->loop = loop;  | 
836  | 0  |   server->sopath = strdup(sopath);  | 
837  | 0  |   server->idtag = strdup(idtag);  | 
838  | 0  |   server->create = create;  | 
839  | 0  |   server->debug = debug;  | 
840  |  | 
  | 
841  | 0  |   msg_server_list_add_head(&msg_servers, server);  | 
842  |  | 
  | 
843  | 0  |   event_add_read(server->loop, msg_server_accept, server, server->fd,  | 
844  | 0  |            &server->listen_ev);  | 
845  |  |  | 
846  |  | 
  | 
847  | 0  |   DEBUGD(debug, "Started %s server, listening on %s", idtag, sopath);  | 
848  | 0  |   return 0;  | 
849  |  |  | 
850  | 0  | fail:  | 
851  | 0  |   if (sock >= 0)  | 
852  | 0  |     close(sock);  | 
853  | 0  |   server->fd = -1;  | 
854  | 0  |   return -1;  | 
855  | 0  | }  | 
856  |  |  | 
857  |  | void msg_server_cleanup(struct msg_server *server)  | 
858  | 0  | { | 
859  | 0  |   DEBUGD(server->debug, "Closing %s server", server->idtag);  | 
860  |  | 
  | 
861  | 0  |   if (server->listen_ev)  | 
862  | 0  |     EVENT_OFF(server->listen_ev);  | 
863  |  | 
  | 
864  | 0  |   msg_server_list_del(&msg_servers, server);  | 
865  |  | 
  | 
866  | 0  |   if (server->fd >= 0)  | 
867  | 0  |     close(server->fd);  | 
868  | 0  |   free((char *)server->sopath);  | 
869  | 0  |   free((char *)server->idtag);  | 
870  |  | 
  | 
871  | 0  |   memset(server, 0, sizeof(*server));  | 
872  | 0  |   server->fd = -1;  | 
873  | 0  | }  | 
874  |  |  | 
875  |  | /*  | 
876  |  |  * Initialize and start reading from the accepted socket  | 
877  |  |  *  | 
878  |  |  *     notify_connect - only called for disconnect i.e., connected == false  | 
879  |  |  */  | 
880  |  | void msg_conn_accept_init(struct msg_conn *conn, struct event_loop *tm, int fd,  | 
881  |  |         int (*notify_disconnect)(struct msg_conn *conn),  | 
882  |  |         void (*handle_msg)(uint8_t version, uint8_t *data,  | 
883  |  |                size_t len, struct msg_conn *conn),  | 
884  |  |         size_t max_read, size_t max_write, size_t max_size,  | 
885  |  |         const char *idtag)  | 
886  | 0  | { | 
887  | 0  |   conn->loop = tm;  | 
888  | 0  |   conn->fd = fd;  | 
889  | 0  |   conn->notify_disconnect = notify_disconnect;  | 
890  | 0  |   conn->handle_msg = handle_msg;  | 
891  | 0  |   conn->is_client = false;  | 
892  |  | 
  | 
893  | 0  |   mgmt_msg_init(&conn->mstate, max_read, max_write, max_size, idtag);  | 
894  |  |  | 
895  |  |   /* start reading */  | 
896  | 0  |   msg_conn_sched_read(conn);  | 
897  |  |  | 
898  |  |   /* Make socket non-blocking.  */  | 
899  | 0  |   set_nonblocking(conn->fd);  | 
900  | 0  |   setsockopt_so_sendbuf(conn->fd, MSG_CONN_SEND_BUF_SIZE);  | 
901  | 0  |   setsockopt_so_recvbuf(conn->fd, MSG_CONN_RECV_BUF_SIZE);  | 
902  | 0  | }  | 
903  |  |  | 
904  |  | struct msg_conn *  | 
905  |  | msg_server_conn_create(struct event_loop *tm, int fd,  | 
906  |  |            int (*notify_disconnect)(struct msg_conn *conn),  | 
907  |  |            void (*handle_msg)(uint8_t version, uint8_t *data,  | 
908  |  |             size_t len, struct msg_conn *conn),  | 
909  |  |            size_t max_read, size_t max_write, size_t max_size,  | 
910  |  |            void *user, const char *idtag)  | 
911  | 0  | { | 
912  | 0  |   struct msg_conn *conn = XMALLOC(MTYPE_MSG_CONN, sizeof(*conn));  | 
913  | 0  |   memset(conn, 0, sizeof(*conn));  | 
914  | 0  |   msg_conn_accept_init(conn, tm, fd, notify_disconnect, handle_msg,  | 
915  | 0  |            max_read, max_write, max_size, idtag);  | 
916  | 0  |   conn->user = user;  | 
917  | 0  |   return conn;  | 
918  | 0  | }  | 
919  |  |  | 
920  |  | void msg_server_conn_delete(struct msg_conn *conn)  | 
921  | 0  | { | 
922  | 0  |   if (!conn)  | 
923  | 0  |     return;  | 
924  | 0  |   msg_conn_cleanup(conn);  | 
925  |  |   XFREE(MTYPE_MSG_CONN, conn);  | 
926  | 0  | }  |