Coverage Report

Created: 2025-07-11 06:28

/src/opensips/ipc.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (C) 2017 OpenSIPS Project
3
 *
4
 * This file is part of opensips, a free SIP server.
5
 *
6
 * opensips is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 2 of the License, or
9
 * (at your option) any later version
10
 *
11
 * opensips is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software
18
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19
 */
20
21
22
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>

#include "ipc.h"
#include "dprint.h"
#include "mem/mem.h"
32
33
0
#define IPC_HANDLER_NAME_MAX  32
34
typedef struct _ipc_handler {
35
  /* handler function */
36
  ipc_handler_f *func;
37
  /* same name/description, null terminated */
38
  char name[IPC_HANDLER_NAME_MAX+1];
39
} ipc_handler;
40
41
typedef struct _ipc_job {
42
  /* the ID (internal) of the process sending the job */
43
  unsigned short snd_proc;
44
  /* the job's handler type */
45
  ipc_handler_type handler_type;
46
  /* the payload of the job, just pointers */
47
  void *payload1;
48
  void *payload2;
49
} ipc_job;
50
51
static ipc_handler *ipc_handlers = NULL;
52
static unsigned int ipc_handlers_no = 0;
53
54
/* shared IPC support: dispatching a job to a random OpenSIPS worker */
55
static int ipc_shared_pipe[2];
56
57
/* IPC type used for RPC - a self registered type */
58
static ipc_handler_type ipc_rpc_type = 0;
59
60
/* FD (pipe) used for dispatching IPC jobs between all processes (1 to any) */
61
int ipc_shared_fd_read;
62
63
int init_ipc(void)
64
0
{
65
0
  int optval;
66
67
  /* create the pipe for dispatching the timer jobs */
68
0
  if (pipe(ipc_shared_pipe) != 0) {
69
0
    LM_ERR("failed to create ipc pipe (%s)!\n", strerror(errno));
70
0
    return -1;
71
0
  }
72
73
  /* make reading fd non-blocking */
74
0
  optval = fcntl(ipc_shared_pipe[0], F_GETFL);
75
0
  if (optval == -1) {
76
0
    LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
77
0
    return -1;
78
0
  }
79
80
0
  if (fcntl(ipc_shared_pipe[0], F_SETFL, optval|O_NONBLOCK) == -1) {
81
0
    LM_ERR("set non-blocking failed: (%d) %s\n", errno, strerror(errno));
82
0
    return -1;
83
0
  }
84
85
0
  ipc_shared_fd_read = ipc_shared_pipe[0];
86
87
  /* self-register the IPC type for RPC */
88
0
  ipc_rpc_type = ipc_register_handler( NULL, "RPC");
89
0
  if (ipc_bad_handler_type(ipc_rpc_type)) {
90
0
    LM_ERR("failed to self register RPC type\n");
91
0
    return -1;
92
0
  }
93
94
  /* we are all set */
95
0
  return 0;
96
0
}
97
98
99
int create_ipc_pipes( int proc_no )
100
0
{
101
0
  int optval, i;
102
103
0
  for( i=0 ; i<proc_no ; i++ ) {
104
0
    if (pipe(pt[i].ipc_pipe_holder)<0) {
105
0
      LM_ERR("failed to create IPC pipe for process %d, err %d/%s\n",
106
0
        i, errno, strerror(errno));
107
0
      return -1;
108
0
    }
109
110
    /* make writing fd non-blocking */
111
0
    optval = fcntl( pt[i].ipc_pipe_holder[1], F_GETFL);
112
0
    if (optval == -1) {
113
0
      LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
114
0
      return -1;
115
0
    }
116
117
0
    if (fcntl(pt[i].ipc_pipe_holder[1], F_SETFL, optval|O_NONBLOCK) == -1){
118
0
      LM_ERR("set non-blocking write failed: (%d) %s\n",
119
0
        errno, strerror(errno));
120
0
      return -1;
121
0
    }
122
123
124
0
    if (pipe(pt[i].ipc_sync_pipe_holder)<0) {
125
0
      LM_ERR("failed to create IPC sync pipe for process %d, "
126
0
        "err %d/%s\n", i, errno, strerror(errno));
127
0
      return -1;
128
0
    }
129
130
    /* make writing fd non-blocking */
131
0
    optval = fcntl( pt[i].ipc_sync_pipe_holder[1], F_GETFL);
132
0
    if (optval == -1) {
133
0
      LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno));
134
0
      return -1;
135
0
    }
136
137
0
    if (fcntl(pt[i].ipc_sync_pipe_holder[1], F_SETFL, optval|O_NONBLOCK) == -1){
138
0
      LM_ERR("set non-blocking write failed: (%d) %s\n",
139
0
        errno, strerror(errno));
140
0
      return -1;
141
0
    }
142
143
0
  }
144
0
  return 0;
145
0
}
146
147
148
/*
 * Registers a new IPC handler type.
 *
 *  hdl  - function to be run for jobs of this type; may be NULL
 *  name - name/description of the type; only the first
 *         IPC_HANDLER_NAME_MAX characters are kept; a NULL name is
 *         stored as an empty string
 *
 * The handlers table is re-allocated with one extra slot on each call.
 * Returns the newly assigned type (its index in the table) or -1 if the
 * memory allocation failed (the existing table is left untouched).
 */
ipc_handler_type ipc_register_handler( ipc_handler_f *hdl, char *name)
{
  ipc_handler *grown;

  /* allocate an n+1 new buffer to accommodate the new handler */
  grown = (ipc_handler*)
    pkg_malloc( (ipc_handlers_no+1)*sizeof(ipc_handler) );
  if (grown==NULL) {
    LM_ERR("failed to alloctes IPC handler array for size %u\n",
      ipc_handlers_no+1);
    return -1;
  }

  /* copy previous records, if any */
  if (ipc_handlers) {
    memcpy( grown, ipc_handlers, ipc_handlers_no*sizeof(ipc_handler) );
    pkg_free( ipc_handlers );
  }

  /* copy handler function */
  grown[ipc_handlers_no].func = hdl;

  /* copy the name, truncate it if needed, but keep it null terminated;
   * guard against a NULL name, which strncpy() cannot handle */
  strncpy( grown[ipc_handlers_no].name, name ? name : "",
    IPC_HANDLER_NAME_MAX);
  grown[ipc_handlers_no].name[IPC_HANDLER_NAME_MAX] = 0;

  ipc_handlers = grown;

  LM_DBG("IPC type %u [%s] registered with handler %p\n",
    ipc_handlers_no, ipc_handlers[ipc_handlers_no].name, hdl );

  return ipc_handlers_no++;
}
181
182
183
/*
 * Builds an IPC job and writes it on the given FD.
 *
 *  fd        - the FD to write the job to
 *  dst_proc  - index of the destination process, or -1 if there is no
 *              specific destination (used only for logging)
 *  type      - handler type of the job
 *  payload1, payload2 - opaque pointers carried by the job
 *
 * Returns 0 if the job was written, -1 otherwise.
 */
static inline int __ipc_send_job(int fd, int dst_proc, ipc_handler_type type,
                        void *payload1, void *payload2)
{
  ipc_job job;
  int n;

  // FIXME - we should check if the destination process really listens
  // for read, otherwise we may end up filling in the pipe and block
  memset(&job, 0, sizeof job);

  job.payload2 = payload2;
  job.payload1 = payload1;
  job.handler_type = type;
  job.snd_proc = (short)process_no;

  /* The per-proc IPC write FDs are set to non-blocking, to be sure we
   * do not escalate into a global blocking if a single process gets
   * stuck. In such a case EAGAIN or EWOULDBLOCK is reported and handled
   * as a generic error; only an interrupted write is retried.
   */
  do {
    n = write(fd, &job, sizeof(job) );
  } while (n<0 && errno==EINTR);

  if (n>=0)
    return 0;

  if (errno==EAGAIN || errno==EWOULDBLOCK) {
    LM_CRIT("blocking detected while sending job type %d[%s] on %d "
      " to proc id %d/%d [%s]\n", type, ipc_handlers[type].name, fd,
      dst_proc, (dst_proc==-1)?-1:pt[dst_proc].pid ,
      (dst_proc==-1)?"n/a":pt[dst_proc].desc);
  } else {
    LM_ERR("sending job type %d[%s] on %d failed: %s\n",
      type, ipc_handlers[type].name, fd, strerror(errno));
  }

  return -1;
}
220
221
/*
 * Sends a job of the given type, carrying a single payload pointer, on
 * the IPC write FD of process dst_proc.
 * Returns what __ipc_send_job() returns: 0 on success, -1 on failure.
 */
int ipc_send_job(int dst_proc, ipc_handler_type type, void *payload)
{
  int wfd = IPC_FD_WRITE(dst_proc);

  return __ipc_send_job(wfd, dst_proc, type, payload, NULL);
}
226
227
/*
 * Writes a job of the given type, carrying a single payload pointer, on
 * the shared IPC pipe (no specific destination process).
 * Returns what __ipc_send_job() returns: 0 on success, -1 on failure.
 */
int ipc_dispatch_job(ipc_handler_type type, void *payload)
{
  int shared_wfd = ipc_shared_pipe[1];

  return __ipc_send_job(shared_wfd, -1, type, payload, NULL);
}
231
232
/*
 * Sends an RPC job (function + parameter) on the IPC write FD of
 * process dst_proc.
 * Returns what __ipc_send_job() returns: 0 on success, -1 on failure.
 */
int ipc_send_rpc(int dst_proc, ipc_rpc_f *rpc, void *param)
{
  int wfd;

  /* give the write IPC FD a chance to become available, waiting
   * for a maximum of 200ms */
  busy_wait_for(IPC_FD_WRITE(dst_proc) >= 0, 200000, 10);

  /* read the FD only after the wait, as it may have just been set */
  wfd = IPC_FD_WRITE(dst_proc);

  return __ipc_send_job(wfd, dst_proc, ipc_rpc_type, rpc, param);
}
239
240
/*
 * Runs the RPC in every process table entry from index 1 up to
 * counted_max_processes-1 which is not flagged with OSS_PROC_NO_IPC:
 * directly for the calling process, via ipc_send_rpc() for the others.
 *
 * Returns the number of processes for which the RPC was run locally or
 * successfully sent.
 */
int ipc_send_rpc_all(ipc_rpc_f *rpc, void *param)
{
  int proc, delivered = 0;

  for (proc = 1; proc < counted_max_processes; proc++) {
    if (pt[proc].flags & OSS_PROC_NO_IPC)
      continue;

    if (proc != process_no) {
      /* a remote process - a failed send is simply not counted */
      if (ipc_send_rpc(proc, rpc, param) < 0)
        continue;
    } else {
      /* ourselves - run the cmd inline, no IPC needed */
      rpc(process_no, param);
    }

    delivered++;
  }

  return delivered;
}
258
259
/*
 * Writes an RPC job (function + parameter) on the shared IPC pipe
 * (no specific destination process).
 * Returns what __ipc_send_job() returns: 0 on success, -1 on failure.
 */
int ipc_dispatch_rpc( ipc_rpc_f *rpc, void *param)
{
  int shared_wfd = ipc_shared_pipe[1];

  return __ipc_send_job(shared_wfd, -1, ipc_rpc_type, rpc, param);
}
263
264
/*
 * Writes the value of the param pointer (not the data it points to) on
 * the sync write FD of process dst_proc, retrying if interrupted.
 * Returns 0 on success, -1 if the write failed.
 */
int ipc_send_sync_reply(int dst_proc, void *param)
{
  int n;

  /* keep trying for as long as the write gets interrupted */
  do {
    n = write(IPC_FD_SYNC_WRITE(dst_proc), &param, sizeof(param));
  } while (n<0 && errno==EINTR);

  if (n>=0)
    return 0;

  LM_ERR("sending sync rpc %d[%s]\n", errno, strerror(errno));
  return -1;
}
278
279
int ipc_recv_sync_reply(void **param)
280
0
{
281
0
  void *ret;
282
0
  int n;
283
284
0
again:
285
0
  n = read(IPC_FD_SYNC_READ_SELF, &ret, sizeof(ret));
286
0
  if (n < sizeof(*ret)) {
287
0
    if (errno == EINTR)
288
0
      goto again;
289
    /* if we got here, it's definitely an error, because the socket is
290
     * blocking, so we can't read partial messages */
291
0
    LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
292
0
    return -1;
293
0
  }
294
0
  *param = ret;
295
0
  return 0;
296
0
}
297
298
void ipc_handle_job(int fd)
299
0
{
300
0
  ipc_job job;
301
0
  int n;
302
303
  /* read one IPC job from the pipe; even if the read is blocking,
304
   * we are here triggered from the reactor, on a READ event, so 
305
   * we shouldn;t ever block */
306
0
  n = read(fd, &job, sizeof(job) );
307
0
  if (n==-1) {
308
0
    if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
309
0
      return;
310
0
    LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
311
0
    return;
312
0
  }
313
314
  /* suppress the E_CORE_LOG event for the below log while handling
315
   * the event itself */
316
0
  suppress_proc_log_event();
317
318
0
  LM_DBG("received job type %d[%s] from process %d\n",
319
0
    job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);
320
321
0
  reset_proc_log_event();
322
323
  /* custom handling for RPC type */
324
0
  if (job.handler_type==ipc_rpc_type) {
325
0
    ((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
326
0
  } else {
327
    /* generic registered type */
328
0
    ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
329
0
  }
330
331
0
  return;
332
0
}
333
334
335
/*
 * Runs, one by one, all the IPC jobs which are already available for
 * reading on the given FD, without ever waiting for new ones.
 *
 * The availability of data is tested with a zero-timeout poll(), which
 * works for any kind of descriptor. The IPC descriptors created in this
 * file are pipes, on which a recv() based test fails with ENOTSOCK and
 * so never finds any pending job.
 */
void ipc_handle_all_pending_jobs(int fd)
{
  struct pollfd pfd;
  int rc;

  pfd.fd = fd;
  pfd.events = POLLIN;

  for (;;) {
    pfd.revents = 0;

    rc = poll(&pfd, 1, 0);
    if (rc < 0 && errno == EINTR)
      continue;

    /* stop on error, on "nothing to read" or if only an error/hangup
     * condition (without any readable data) is reported */
    if (rc <= 0 || !(pfd.revents & POLLIN))
      break;

    ipc_handle_job(fd);
  }
}
342