Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (C) 2017 OpenSIPS Project |
3 | | * |
4 | | * This file is part of opensips, a free SIP server. |
5 | | * |
6 | | * opensips is free software; you can redistribute it and/or modify |
7 | | * it under the terms of the GNU General Public License as published by |
8 | | * the Free Software Foundation; either version 2 of the License, or |
9 | | * (at your option) any later version |
10 | | * |
11 | | * opensips is distributed in the hope that it will be useful, |
12 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | | * GNU General Public License for more details. |
15 | | * |
16 | | * You should have received a copy of the GNU General Public License |
17 | | * along with this program; if not, write to the Free Software |
18 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA |
19 | | */ |
20 | | |
21 | | |
22 | | #include <string.h> |
23 | | #include <errno.h> |
24 | | #include <sys/types.h> |
25 | | #include <sys/socket.h> |
26 | | |
27 | | #include "ipc.h" |
28 | | #include "dprint.h" |
29 | | #include "mem/mem.h" |
30 | | |
31 | | #include <fcntl.h> |
32 | | |
/* maximum length of a handler name/description (excluding the NUL) */
#define IPC_HANDLER_NAME_MAX 32

/* descriptor of a registered IPC job type */
typedef struct _ipc_handler {
	/* handler function; NULL for self-handled types (e.g. the RPC type,
	 * where the function to run travels inside the job itself) */
	ipc_handler_f *func;
	/* short name/description of the handler, NUL terminated */
	char name[IPC_HANDLER_NAME_MAX+1];
} ipc_handler;
40 | | |
/* an IPC job, as it travels over a pipe between processes */
typedef struct _ipc_job {
	/* the ID (internal) of the process sending the job */
	unsigned short snd_proc;
	/* the job's handler type, as returned by ipc_register_handler() */
	ipc_handler_type handler_type;
	/* the payload of the job, just pointers; presumably they must point
	 * to shared memory to be meaningful in the receiving process --
	 * NOTE(review): assumed, confirm with callers */
	void *payload1;
	void *payload2;
} ipc_job;
50 | | |
/* array of all registered IPC handlers; grown by one on each registration */
static ipc_handler *ipc_handlers = NULL;
/* number of entries in ipc_handlers */
static unsigned int ipc_handlers_no = 0;

/* shared IPC support: dispatching a job to a random OpenSIPS worker */
static int ipc_shared_pipe[2];

/* IPC type used for RPC - a self registered type */
static ipc_handler_type ipc_rpc_type = 0;

/* FD (pipe) used for dispatching IPC jobs between all processes (1 to any) */
int ipc_shared_fd_read;
62 | | |
63 | | int init_ipc(void) |
64 | 0 | { |
65 | 0 | int optval; |
66 | | |
67 | | /* create the pipe for dispatching the timer jobs */ |
68 | 0 | if (pipe(ipc_shared_pipe) != 0) { |
69 | 0 | LM_ERR("failed to create ipc pipe (%s)!\n", strerror(errno)); |
70 | 0 | return -1; |
71 | 0 | } |
72 | | |
73 | | /* make reading fd non-blocking */ |
74 | 0 | optval = fcntl(ipc_shared_pipe[0], F_GETFL); |
75 | 0 | if (optval == -1) { |
76 | 0 | LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno)); |
77 | 0 | return -1; |
78 | 0 | } |
79 | | |
80 | 0 | if (fcntl(ipc_shared_pipe[0], F_SETFL, optval|O_NONBLOCK) == -1) { |
81 | 0 | LM_ERR("set non-blocking failed: (%d) %s\n", errno, strerror(errno)); |
82 | 0 | return -1; |
83 | 0 | } |
84 | | |
85 | 0 | ipc_shared_fd_read = ipc_shared_pipe[0]; |
86 | | |
87 | | /* self-register the IPC type for RPC */ |
88 | 0 | ipc_rpc_type = ipc_register_handler( NULL, "RPC"); |
89 | 0 | if (ipc_bad_handler_type(ipc_rpc_type)) { |
90 | 0 | LM_ERR("failed to self register RPC type\n"); |
91 | 0 | return -1; |
92 | 0 | } |
93 | | |
94 | | /* we are all set */ |
95 | 0 | return 0; |
96 | 0 | } |
97 | | |
98 | | |
99 | | int create_ipc_pipes( int proc_no ) |
100 | 0 | { |
101 | 0 | int optval, i; |
102 | |
|
103 | 0 | for( i=0 ; i<proc_no ; i++ ) { |
104 | 0 | if (pipe(pt[i].ipc_pipe_holder)<0) { |
105 | 0 | LM_ERR("failed to create IPC pipe for process %d, err %d/%s\n", |
106 | 0 | i, errno, strerror(errno)); |
107 | 0 | return -1; |
108 | 0 | } |
109 | | |
110 | | /* make writing fd non-blocking */ |
111 | 0 | optval = fcntl( pt[i].ipc_pipe_holder[1], F_GETFL); |
112 | 0 | if (optval == -1) { |
113 | 0 | LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno)); |
114 | 0 | return -1; |
115 | 0 | } |
116 | | |
117 | 0 | if (fcntl(pt[i].ipc_pipe_holder[1], F_SETFL, optval|O_NONBLOCK) == -1){ |
118 | 0 | LM_ERR("set non-blocking write failed: (%d) %s\n", |
119 | 0 | errno, strerror(errno)); |
120 | 0 | return -1; |
121 | 0 | } |
122 | | |
123 | | |
124 | 0 | if (pipe(pt[i].ipc_sync_pipe_holder)<0) { |
125 | 0 | LM_ERR("failed to create IPC sync pipe for process %d, " |
126 | 0 | "err %d/%s\n", i, errno, strerror(errno)); |
127 | 0 | return -1; |
128 | 0 | } |
129 | | |
130 | | /* make writing fd non-blocking */ |
131 | 0 | optval = fcntl( pt[i].ipc_sync_pipe_holder[1], F_GETFL); |
132 | 0 | if (optval == -1) { |
133 | 0 | LM_ERR("fcntl failed: (%d) %s\n", errno, strerror(errno)); |
134 | 0 | return -1; |
135 | 0 | } |
136 | | |
137 | 0 | if (fcntl(pt[i].ipc_sync_pipe_holder[1], F_SETFL, optval|O_NONBLOCK) == -1){ |
138 | 0 | LM_ERR("set non-blocking write failed: (%d) %s\n", |
139 | 0 | errno, strerror(errno)); |
140 | 0 | return -1; |
141 | 0 | } |
142 | |
|
143 | 0 | } |
144 | 0 | return 0; |
145 | 0 | } |
146 | | |
147 | | |
148 | | ipc_handler_type ipc_register_handler( ipc_handler_f *hdl, char *name) |
149 | 0 | { |
150 | 0 | ipc_handler *new; |
151 | | |
152 | | /* allocate an n+1 new buffer to accomodate the new handler */ |
153 | 0 | new = (ipc_handler*) |
154 | 0 | pkg_malloc( (ipc_handlers_no+1)*sizeof(ipc_handler) ); |
155 | 0 | if (new==NULL) { |
156 | 0 | LM_ERR("failed to alloctes IPC handler array for size %d\n", |
157 | 0 | ipc_handlers_no+1); |
158 | 0 | return -1; |
159 | 0 | } |
160 | | |
161 | | /* copy previous records, if any */ |
162 | 0 | if (ipc_handlers) { |
163 | 0 | memcpy( new, ipc_handlers, ipc_handlers_no*sizeof(ipc_handler) ); |
164 | 0 | pkg_free( ipc_handlers ); |
165 | 0 | } |
166 | | |
167 | | /* copy handler function */ |
168 | 0 | new[ipc_handlers_no].func = hdl; |
169 | | |
170 | | /* copy the name, trunkate it needed, but keep it null terminated */ |
171 | 0 | strncpy( new[ipc_handlers_no].name , name, IPC_HANDLER_NAME_MAX); |
172 | 0 | new[ipc_handlers_no].name[IPC_HANDLER_NAME_MAX] = 0; |
173 | |
|
174 | 0 | ipc_handlers = new; |
175 | |
|
176 | 0 | LM_DBG("IPC type %d [%s] registered with handler %p\n", |
177 | 0 | ipc_handlers_no, ipc_handlers[ipc_handlers_no].name, hdl ); |
178 | |
|
179 | 0 | return ipc_handlers_no++; |
180 | 0 | } |
181 | | |
182 | | |
183 | | static inline int __ipc_send_job(int fd, int dst_proc, ipc_handler_type type, |
184 | | void *payload1, void *payload2) |
185 | 0 | { |
186 | 0 | ipc_job job; |
187 | 0 | int n; |
188 | | |
189 | | // FIXME - we should check if the destination process really listens |
190 | | // for read, otherwise we may end up filling in the pipe and block |
191 | 0 | memset(&job, 0, sizeof job); |
192 | |
|
193 | 0 | job.snd_proc = (short)process_no; |
194 | 0 | job.handler_type = type; |
195 | 0 | job.payload1 = payload1; |
196 | 0 | job.payload2 = payload2; |
197 | |
|
198 | 0 | again: |
199 | | /* The per-proc IPC write fds are sent to non-blocking (to be sure we |
200 | | * do not escalate into a global blocking if a single process got stuck. |
201 | | * In such care the EAGAIN or EWOULDBLOCK will be thrown and we will |
202 | | * handle as generic error, nothing special to do. |
203 | | */ |
204 | 0 | n = write(fd, &job, sizeof(job) ); |
205 | 0 | if (n<0) { |
206 | 0 | if (errno==EAGAIN || errno==EWOULDBLOCK) |
207 | 0 | LM_CRIT("blocking detected while sending job type %d[%s] on %d " |
208 | 0 | " to proc id %d/%d [%s]\n", type, ipc_handlers[type].name, fd, |
209 | 0 | dst_proc, (dst_proc==-1)?-1:pt[dst_proc].pid , |
210 | 0 | (dst_proc==-1)?"n/a":pt[dst_proc].desc); |
211 | 0 | else if (errno==EINTR) |
212 | 0 | goto again; |
213 | 0 | else |
214 | 0 | LM_ERR("sending job type %d[%s] on %d failed: %s\n", |
215 | 0 | type, ipc_handlers[type].name, fd, strerror(errno)); |
216 | 0 | return -1; |
217 | 0 | } |
218 | 0 | return 0; |
219 | 0 | } |
220 | | |
221 | | int ipc_send_job(int dst_proc, ipc_handler_type type, void *payload) |
222 | 0 | { |
223 | 0 | return __ipc_send_job(IPC_FD_WRITE(dst_proc), dst_proc, |
224 | 0 | type, payload, NULL); |
225 | 0 | } |
226 | | |
227 | | int ipc_dispatch_job(ipc_handler_type type, void *payload) |
228 | 0 | { |
229 | 0 | return __ipc_send_job(ipc_shared_pipe[1], -1, type, payload, NULL); |
230 | 0 | } |
231 | | |
232 | | int ipc_send_rpc(int dst_proc, ipc_rpc_f *rpc, void *param) |
233 | 0 | { |
234 | | /* wait for the write IPC FD to be available, for a maximum 200ms */ |
235 | 0 | busy_wait_for(IPC_FD_WRITE(dst_proc) >= 0, 200000, 10); |
236 | 0 | return __ipc_send_job(IPC_FD_WRITE(dst_proc), dst_proc, |
237 | 0 | ipc_rpc_type, rpc, param); |
238 | 0 | } |
239 | | |
240 | | int ipc_send_rpc_all(ipc_rpc_f *rpc, void *param) |
241 | 0 | { |
242 | 0 | int p, count = 0; |
243 | |
|
244 | 0 | for (p = 1; p < counted_max_processes; p++) { |
245 | 0 | if (pt[p].flags & OSS_PROC_NO_IPC) |
246 | 0 | continue; |
247 | 0 | if (p == process_no) { |
248 | | /* run line the cmd for the proc itself */ |
249 | 0 | rpc(process_no, param); |
250 | 0 | count++; |
251 | 0 | } else { |
252 | 0 | if (ipc_send_rpc(p, rpc, param) >= 0) |
253 | 0 | count++; |
254 | 0 | } |
255 | 0 | } |
256 | 0 | return count; |
257 | 0 | } |
258 | | |
259 | | int ipc_dispatch_rpc( ipc_rpc_f *rpc, void *param) |
260 | 0 | { |
261 | 0 | return __ipc_send_job(ipc_shared_pipe[1], -1, ipc_rpc_type, rpc, param); |
262 | 0 | } |
263 | | |
/* Writes one pointer-sized sync reply on the sync pipe of process
 * 'dst_proc', retrying on EINTR. Returns 0 on success, -1 on error. */
int ipc_send_sync_reply(int dst_proc, void *param)
{
	ssize_t rc;

	do {
		rc = write(IPC_FD_SYNC_WRITE(dst_proc), &param, sizeof(param));
	} while (rc < 0 && errno == EINTR);

	if (rc < 0) {
		LM_ERR("sending sync rpc %d[%s]\n", errno, strerror(errno));
		return -1;
	}
	return 0;
}
278 | | |
279 | | int ipc_recv_sync_reply(void **param) |
280 | 0 | { |
281 | 0 | void *ret; |
282 | 0 | int n; |
283 | |
|
284 | 0 | again: |
285 | 0 | n = read(IPC_FD_SYNC_READ_SELF, &ret, sizeof(ret)); |
286 | 0 | if (n < sizeof(*ret)) { |
287 | 0 | if (errno == EINTR) |
288 | 0 | goto again; |
289 | | /* if we got here, it's definitely an error, because the socket is |
290 | | * blocking, so we can't read partial messages */ |
291 | 0 | LM_ERR("read failed:[%d] %s\n", errno, strerror(errno)); |
292 | 0 | return -1; |
293 | 0 | } |
294 | 0 | *param = ret; |
295 | 0 | return 0; |
296 | 0 | } |
297 | | |
/* Reads one IPC job from 'fd' and runs its handler: for the RPC type the
 * function to run is carried in payload1 (with payload2 as its argument);
 * for all other types the registered handler gets payload1. */
void ipc_handle_job(int fd)
{
	ipc_job job;
	int n;

	/* read one IPC job from the pipe; even if the read is blocking,
	 * we are here triggered from the reactor, on a READ event, so
	 * we shouldn't ever block */
	n = read(fd, &job, sizeof(job) );
	if (n==-1) {
		/* nothing usable to read - just bail out quietly on the
		 * "try again" class of errors */
		if (errno==EAGAIN || errno==EINTR || errno==EWOULDBLOCK )
			return;
		LM_ERR("read failed:[%d] %s\n", errno, strerror(errno));
		return;
	}

	/* suppress the E_CORE_LOG event for the below log while handling
	 * the event itself */
	suppress_proc_log_event();

	/* NOTE(review): job.handler_type is used as an index without being
	 * validated against ipc_handlers_no - assumed always valid since
	 * only registered types are ever sent; confirm */
	LM_DBG("received job type %d[%s] from process %d\n",
		job.handler_type, ipc_handlers[job.handler_type].name, job.snd_proc);

	reset_proc_log_event();

	/* custom handling for RPC type */
	if (job.handler_type==ipc_rpc_type) {
		((ipc_rpc_f*)job.payload1)( job.snd_proc, job.payload2);
	} else {
		/* generic registered type */
		ipc_handlers[job.handler_type].func( job.snd_proc, job.payload1);
	}

	return;
}
333 | | |
334 | | |
/* Drains 'fd': keeps running jobs for as long as at least one byte is
 * pending on it (checked via a non-blocking, non-consuming peek). */
void ipc_handle_all_pending_jobs(int fd)
{
	char probe;

	for (;;) {
		/* peek without consuming; ipc_handle_job() does the real read */
		if (recv(fd, &probe, 1, MSG_DONTWAIT|MSG_PEEK) != 1)
			break;
		ipc_handle_job(fd);
	}
}
342 | | |