/src/systemd/src/journal-remote/journal-remote.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* SPDX-License-Identifier: LGPL-2.1+ */ |
2 | | |
3 | | #include <errno.h> |
4 | | #include <fcntl.h> |
5 | | #include <stdio.h> |
6 | | #include <stdlib.h> |
7 | | #include <string.h> |
8 | | #include <sys/prctl.h> |
9 | | #include <sys/socket.h> |
10 | | #include <stdint.h> |
11 | | |
12 | | #include "sd-daemon.h" |
13 | | |
14 | | #include "alloc-util.h" |
15 | | #include "def.h" |
16 | | #include "errno-util.h" |
17 | | #include "escape.h" |
18 | | #include "fd-util.h" |
19 | | #include "journal-file.h" |
20 | | #include "journal-remote-write.h" |
21 | | #include "journal-remote.h" |
22 | | #include "journald-native.h" |
23 | | #include "macro.h" |
24 | | #include "parse-util.h" |
25 | | #include "process-util.h" |
26 | | #include "socket-util.h" |
27 | | #include "stdio-util.h" |
28 | | #include "string-util.h" |
29 | | #include "strv.h" |
30 | | |
31 | 0 | #define REMOTE_JOURNAL_PATH "/var/log/journal/remote" |
32 | | |
33 | 0 | #define filename_escape(s) xescape((s), "/ ") |
34 | | |
35 | 4.30k | static int open_output(RemoteServer *s, Writer *w, const char* host) { |
36 | 4.30k | _cleanup_free_ char *_filename = NULL; |
37 | 4.30k | const char *filename; |
38 | 4.30k | int r; |
39 | 4.30k | |
40 | 4.30k | switch (s->split_mode) { |
41 | 4.30k | case JOURNAL_WRITE_SPLIT_NONE: |
42 | 4.30k | filename = s->output; |
43 | 4.30k | break; |
44 | 4.30k | |
45 | 4.30k | case JOURNAL_WRITE_SPLIT_HOST: { |
46 | 0 | _cleanup_free_ char *name; |
47 | 0 |
|
48 | 0 | assert(host); |
49 | 0 |
|
50 | 0 | name = filename_escape(host); |
51 | 0 | if (!name) |
52 | 0 | return log_oom(); |
53 | 0 | |
54 | 0 | r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name); |
55 | 0 | if (r < 0) |
56 | 0 | return log_oom(); |
57 | 0 | |
58 | 0 | filename = _filename; |
59 | 0 | break; |
60 | 0 | } |
61 | 0 |
|
62 | 0 | default: |
63 | 0 | assert_not_reached("what?"); |
64 | 4.30k | } |
65 | 4.30k | |
66 | 4.30k | r = journal_file_open_reliably(filename, |
67 | 4.30k | O_RDWR|O_CREAT, 0640, |
68 | 4.30k | s->compress, (uint64_t) -1, s->seal, |
69 | 4.30k | &w->metrics, |
70 | 4.30k | w->mmap, NULL, |
71 | 4.30k | NULL, &w->journal); |
72 | 4.30k | if (r < 0) |
73 | 0 | return log_error_errno(r, "Failed to open output journal %s: %m", filename); |
74 | 4.30k | |
75 | 4.30k | log_debug("Opened output file %s", w->journal->path); |
76 | 4.30k | return 0; |
77 | 4.30k | } |
78 | | |
79 | | /********************************************************************** |
80 | | ********************************************************************** |
81 | | **********************************************************************/ |
82 | | |
83 | 4.30k | static int init_writer_hashmap(RemoteServer *s) { |
84 | 4.30k | static const struct hash_ops* const hash_ops[] = { |
85 | 4.30k | [JOURNAL_WRITE_SPLIT_NONE] = NULL, |
86 | 4.30k | [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops, |
87 | 4.30k | }; |
88 | 4.30k | |
89 | 4.30k | assert(s); |
90 | 4.30k | assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops)); |
91 | 4.30k | |
92 | 4.30k | s->writers = hashmap_new(hash_ops[s->split_mode]); |
93 | 4.30k | if (!s->writers) |
94 | 0 | return log_oom(); |
95 | 4.30k | |
96 | 4.30k | return 0; |
97 | 4.30k | } |
98 | | |
99 | 4.30k | int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) { |
100 | 4.30k | _cleanup_(writer_unrefp) Writer *w = NULL; |
101 | 4.30k | const void *key; |
102 | 4.30k | int r; |
103 | 4.30k | |
104 | 4.30k | switch(s->split_mode) { |
105 | 4.30k | case JOURNAL_WRITE_SPLIT_NONE: |
106 | 4.30k | key = "one and only"; |
107 | 4.30k | break; |
108 | 4.30k | |
109 | 4.30k | case JOURNAL_WRITE_SPLIT_HOST: |
110 | 0 | assert(host); |
111 | 0 | key = host; |
112 | 0 | break; |
113 | 0 |
|
114 | 0 | default: |
115 | 0 | assert_not_reached("what split mode?"); |
116 | 4.30k | } |
117 | 4.30k | |
118 | 4.30k | w = hashmap_get(s->writers, key); |
119 | 4.30k | if (w) |
120 | 0 | writer_ref(w); |
121 | 4.30k | else { |
122 | 4.30k | w = writer_new(s); |
123 | 4.30k | if (!w) |
124 | 0 | return log_oom(); |
125 | 4.30k | |
126 | 4.30k | if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) { |
127 | 0 | w->hashmap_key = strdup(key); |
128 | 0 | if (!w->hashmap_key) |
129 | 0 | return log_oom(); |
130 | 4.30k | } |
131 | 4.30k | |
132 | 4.30k | r = open_output(s, w, host); |
133 | 4.30k | if (r < 0) |
134 | 0 | return r; |
135 | 4.30k | |
136 | 4.30k | r = hashmap_put(s->writers, w->hashmap_key ?: key, w); |
137 | 4.30k | if (r < 0) |
138 | 0 | return r; |
139 | 4.30k | } |
140 | 4.30k | |
141 | 4.30k | *writer = TAKE_PTR(w); |
142 | 4.30k | |
143 | 4.30k | return 0; |
144 | 4.30k | } |
145 | | |
146 | | /********************************************************************** |
147 | | ********************************************************************** |
148 | | **********************************************************************/ |
149 | | |
150 | | /* This should go away as soon as µhttpd allows state to be passed around. */ |
151 | | RemoteServer *journal_remote_server_global; |
152 | | |
153 | | static int dispatch_raw_source_event(sd_event_source *event, |
154 | | int fd, |
155 | | uint32_t revents, |
156 | | void *userdata); |
157 | | static int dispatch_raw_source_until_block(sd_event_source *event, |
158 | | void *userdata); |
159 | | static int dispatch_blocking_source_event(sd_event_source *event, |
160 | | void *userdata); |
161 | | static int dispatch_raw_connection_event(sd_event_source *event, |
162 | | int fd, |
163 | | uint32_t revents, |
164 | | void *userdata); |
165 | | |
166 | | static int get_source_for_fd(RemoteServer *s, |
167 | 4.30k | int fd, char *name, RemoteSource **source) { |
168 | 4.30k | Writer *writer; |
169 | 4.30k | int r; |
170 | 4.30k | |
171 | 4.30k | /* This takes ownership of name, but only on success. */ |
172 | 4.30k | |
173 | 4.30k | assert(fd >= 0); |
174 | 4.30k | assert(source); |
175 | 4.30k | |
176 | 4.30k | if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1)) |
177 | 4.30k | return log_oom(); |
178 | 4.30k | |
179 | 4.30k | r = journal_remote_get_writer(s, name, &writer); |
180 | 4.30k | if (r < 0) |
181 | 0 | return log_warning_errno(r, "Failed to get writer for source %s: %m", |
182 | 4.30k | name); |
183 | 4.30k | |
184 | 4.30k | if (!s->sources[fd]) { |
185 | 4.30k | s->sources[fd] = source_new(fd, false, name, writer); |
186 | 4.30k | if (!s->sources[fd]) { |
187 | 0 | writer_unref(writer); |
188 | 0 | return log_oom(); |
189 | 0 | } |
190 | 4.30k | |
191 | 4.30k | s->active++; |
192 | 4.30k | } |
193 | 4.30k | |
194 | 4.30k | *source = s->sources[fd]; |
195 | 4.30k | return 0; |
196 | 4.30k | } |
197 | | |
198 | 68.9k | static int remove_source(RemoteServer *s, int fd) { |
199 | 68.9k | RemoteSource *source; |
200 | 68.9k | |
201 | 68.9k | assert(s); |
202 | 68.9k | assert(fd >= 0 && fd < (ssize_t) s->sources_size); |
203 | 68.9k | |
204 | 68.9k | source = s->sources[fd]; |
205 | 68.9k | if (source) { |
206 | 4.30k | /* this closes fd too */ |
207 | 4.30k | source_free(source); |
208 | 4.30k | s->sources[fd] = NULL; |
209 | 4.30k | s->active--; |
210 | 4.30k | } |
211 | 68.9k | |
212 | 68.9k | return 0; |
213 | 68.9k | } |
214 | | |
215 | 4.30k | int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) { |
216 | 4.30k | RemoteSource *source = NULL; |
217 | 4.30k | int r; |
218 | 4.30k | |
219 | 4.30k | /* This takes ownership of name, even on failure, if own_name is true. */ |
220 | 4.30k | |
221 | 4.30k | assert(s); |
222 | 4.30k | assert(fd >= 0); |
223 | 4.30k | assert(name); |
224 | 4.30k | |
225 | 4.30k | if (!own_name) { |
226 | 4.30k | name = strdup(name); |
227 | 4.30k | if (!name) |
228 | 0 | return log_oom(); |
229 | 4.30k | } |
230 | 4.30k | |
231 | 4.30k | r = get_source_for_fd(s, fd, name, &source); |
232 | 4.30k | if (r < 0) { |
233 | 0 | log_error_errno(r, "Failed to create source for fd:%d (%s): %m", |
234 | 0 | fd, name); |
235 | 0 | free(name); |
236 | 0 | return r; |
237 | 0 | } |
238 | 4.30k | |
239 | 4.30k | r = sd_event_add_io(s->events, &source->event, |
240 | 4.30k | fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI, |
241 | 4.30k | dispatch_raw_source_event, source); |
242 | 4.30k | if (r == 0) { |
243 | 0 | /* Add additional source for buffer processing. It will be |
244 | 0 | * enabled later. */ |
245 | 0 | r = sd_event_add_defer(s->events, &source->buffer_event, |
246 | 0 | dispatch_raw_source_until_block, source); |
247 | 0 | if (r == 0) |
248 | 0 | sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF); |
249 | 4.30k | } else if (r == -EPERM) { |
250 | 4.30k | log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name); |
251 | 4.30k | r = sd_event_add_defer(s->events, &source->event, |
252 | 4.30k | dispatch_blocking_source_event, source); |
253 | 4.30k | if (r == 0) |
254 | 4.30k | sd_event_source_set_enabled(source->event, SD_EVENT_ON); |
255 | 4.30k | } |
256 | 4.30k | if (r < 0) { |
257 | 0 | log_error_errno(r, "Failed to register event source for fd:%d: %m", |
258 | 0 | fd); |
259 | 0 | goto error; |
260 | 0 | } |
261 | 4.30k | |
262 | 4.30k | r = sd_event_source_set_description(source->event, name); |
263 | 4.30k | if (r < 0) { |
264 | 0 | log_error_errno(r, "Failed to set source name for fd:%d: %m", fd); |
265 | 0 | goto error; |
266 | 0 | } |
267 | 4.30k | |
268 | 4.30k | return 1; /* work to do */ |
269 | 0 | |
270 | 0 | error: |
271 | 0 | remove_source(s, fd); |
272 | 0 | return r; |
273 | 4.30k | } |
274 | | |
275 | 0 | int journal_remote_add_raw_socket(RemoteServer *s, int fd) { |
276 | 0 | int r; |
277 | 0 | _cleanup_close_ int fd_ = fd; |
278 | 0 | char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1]; |
279 | 0 |
|
280 | 0 | assert(fd >= 0); |
281 | 0 |
|
282 | 0 | r = sd_event_add_io(s->events, &s->listen_event, |
283 | 0 | fd, EPOLLIN, |
284 | 0 | dispatch_raw_connection_event, s); |
285 | 0 | if (r < 0) |
286 | 0 | return r; |
287 | 0 | |
288 | 0 | xsprintf(name, "raw-socket-%d", fd); |
289 | 0 |
|
290 | 0 | r = sd_event_source_set_description(s->listen_event, name); |
291 | 0 | if (r < 0) |
292 | 0 | return r; |
293 | 0 | |
294 | 0 | fd_ = -1; |
295 | 0 | s->active++; |
296 | 0 | return 0; |
297 | 0 | } |
298 | | |
299 | | /********************************************************************** |
300 | | ********************************************************************** |
301 | | **********************************************************************/ |
302 | | |
303 | | int journal_remote_server_init( |
304 | | RemoteServer *s, |
305 | | const char *output, |
306 | | JournalWriteSplitMode split_mode, |
307 | | bool compress, |
308 | 4.30k | bool seal) { |
309 | 4.30k | |
310 | 4.30k | int r; |
311 | 4.30k | |
312 | 4.30k | assert(s); |
313 | 4.30k | |
314 | 4.30k | assert(journal_remote_server_global == NULL); |
315 | 4.30k | journal_remote_server_global = s; |
316 | 4.30k | |
317 | 4.30k | s->split_mode = split_mode; |
318 | 4.30k | s->compress = compress; |
319 | 4.30k | s->seal = seal; |
320 | 4.30k | |
321 | 4.30k | if (output) |
322 | 4.30k | s->output = output; |
323 | 0 | else if (split_mode == JOURNAL_WRITE_SPLIT_NONE) |
324 | 0 | s->output = REMOTE_JOURNAL_PATH "/remote.journal"; |
325 | 0 | else if (split_mode == JOURNAL_WRITE_SPLIT_HOST) |
326 | 0 | s->output = REMOTE_JOURNAL_PATH; |
327 | 0 | else |
328 | 0 | assert_not_reached("bad split mode"); |
329 | 4.30k | |
330 | 4.30k | r = sd_event_default(&s->events); |
331 | 4.30k | if (r < 0) |
332 | 0 | return log_error_errno(r, "Failed to allocate event loop: %m"); |
333 | 4.30k | |
334 | 4.30k | r = init_writer_hashmap(s); |
335 | 4.30k | if (r < 0) |
336 | 0 | return r; |
337 | 4.30k | |
338 | 4.30k | return 0; |
339 | 4.30k | } |
340 | | |
341 | | #if HAVE_MICROHTTPD |
342 | | static void MHDDaemonWrapper_free(MHDDaemonWrapper *d) { |
343 | | MHD_stop_daemon(d->daemon); |
344 | | sd_event_source_unref(d->io_event); |
345 | | sd_event_source_unref(d->timer_event); |
346 | | free(d); |
347 | | } |
348 | | #endif |
349 | | |
350 | 4.30k | void journal_remote_server_destroy(RemoteServer *s) { |
351 | 4.30k | size_t i; |
352 | 4.30k | |
353 | | #if HAVE_MICROHTTPD |
354 | | hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free); |
355 | | #endif |
356 | | |
357 | 4.30k | assert(s->sources_size == 0 || s->sources); |
358 | 68.9k | for (i = 0; i < s->sources_size; i++) |
359 | 64.6k | remove_source(s, i); |
360 | 4.30k | free(s->sources); |
361 | 4.30k | |
362 | 4.30k | writer_unref(s->_single_writer); |
363 | 4.30k | hashmap_free(s->writers); |
364 | 4.30k | |
365 | 4.30k | sd_event_source_unref(s->sigterm_event); |
366 | 4.30k | sd_event_source_unref(s->sigint_event); |
367 | 4.30k | sd_event_source_unref(s->listen_event); |
368 | 4.30k | sd_event_unref(s->events); |
369 | 4.30k | |
370 | 4.30k | if (s == journal_remote_server_global) |
371 | 4.30k | journal_remote_server_global = NULL; |
372 | 4.30k | |
373 | 4.30k | /* fds that we're listening on remain open... */ |
374 | 4.30k | } |
375 | | |
376 | | /********************************************************************** |
377 | | ********************************************************************** |
378 | | **********************************************************************/ |
379 | | |
380 | | int journal_remote_handle_raw_source( |
381 | | sd_event_source *event, |
382 | | int fd, |
383 | | uint32_t revents, |
384 | 1.04M | RemoteServer *s) { |
385 | 1.04M | |
386 | 1.04M | RemoteSource *source; |
387 | 1.04M | int r; |
388 | 1.04M | |
389 | 1.04M | /* Returns 1 if there might be more data pending, |
390 | 1.04M | * 0 if data is currently exhausted, negative on error. |
391 | 1.04M | */ |
392 | 1.04M | |
393 | 1.04M | assert(fd >= 0 && fd < (ssize_t) s->sources_size); |
394 | 1.04M | source = s->sources[fd]; |
395 | 1.04M | assert(source->importer.fd == fd); |
396 | 1.04M | |
397 | 1.04M | r = process_source(source, s->compress, s->seal); |
398 | 1.04M | if (journal_importer_eof(&source->importer)) { |
399 | 3.66k | size_t remaining; |
400 | 3.66k | |
401 | 3.66k | log_debug("EOF reached with source %s (fd=%d)", |
402 | 3.66k | source->importer.name, source->importer.fd); |
403 | 3.66k | |
404 | 3.66k | remaining = journal_importer_bytes_remaining(&source->importer); |
405 | 3.66k | if (remaining > 0) |
406 | 3.66k | log_notice("Premature EOF. %zu bytes lost.", remaining); |
407 | 3.66k | remove_source(s, source->importer.fd); |
408 | 3.66k | log_debug("%zu active sources remaining", s->active); |
409 | 3.66k | return 0; |
410 | 1.04M | } else if (r == -E2BIG) { |
411 | 77.7k | log_notice("Entry with too many fields, skipped"); |
412 | 77.7k | return 1; |
413 | 964k | } else if (r == -ENOBUFS) { |
414 | 0 | log_notice("Entry too big, skipped"); |
415 | 0 | return 1; |
416 | 964k | } else if (r == -EAGAIN) { |
417 | 0 | return 0; |
418 | 964k | } else if (r < 0) { |
419 | 639 | log_debug_errno(r, "Closing connection: %m"); |
420 | 639 | remove_source(s, fd); |
421 | 639 | return 0; |
422 | 639 | } else |
423 | 964k | return 1; |
424 | 1.04M | } |
425 | | |
426 | | static int dispatch_raw_source_until_block(sd_event_source *event, |
427 | 0 | void *userdata) { |
428 | 0 | RemoteSource *source = userdata; |
429 | 0 | int r; |
430 | 0 |
|
431 | 0 | /* Make sure event stays around even if source is destroyed */ |
432 | 0 | sd_event_source_ref(event); |
433 | 0 |
|
434 | 0 | r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global); |
435 | 0 | if (r != 1) |
436 | 0 | /* No more data for now */ |
437 | 0 | sd_event_source_set_enabled(event, SD_EVENT_OFF); |
438 | 0 |
|
439 | 0 | sd_event_source_unref(event); |
440 | 0 |
|
441 | 0 | return r; |
442 | 0 | } |
443 | | |
444 | | static int dispatch_raw_source_event(sd_event_source *event, |
445 | | int fd, |
446 | | uint32_t revents, |
447 | 0 | void *userdata) { |
448 | 0 | RemoteSource *source = userdata; |
449 | 0 | int r; |
450 | 0 |
|
451 | 0 | assert(source->event); |
452 | 0 | assert(source->buffer_event); |
453 | 0 |
|
454 | 0 | r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global); |
455 | 0 | if (r == 1) |
456 | 0 | /* Might have more data. We need to rerun the handler |
457 | 0 | * until we are sure the buffer is exhausted. */ |
458 | 0 | sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON); |
459 | 0 |
|
460 | 0 | return r; |
461 | 0 | } |
462 | | |
463 | | static int dispatch_blocking_source_event(sd_event_source *event, |
464 | 0 | void *userdata) { |
465 | 0 | RemoteSource *source = userdata; |
466 | 0 |
|
467 | 0 | return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global); |
468 | 0 | } |
469 | | |
470 | | static int accept_connection( |
471 | | const char* type, |
472 | | int fd, |
473 | | SocketAddress *addr, |
474 | 0 | char **hostname) { |
475 | 0 |
|
476 | 0 | _cleanup_close_ int fd2 = -1; |
477 | 0 | int r; |
478 | 0 |
|
479 | 0 | log_debug("Accepting new %s connection on fd:%d", type, fd); |
480 | 0 | fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC); |
481 | 0 | if (fd2 < 0) { |
482 | 0 | if (ERRNO_IS_ACCEPT_AGAIN(errno)) |
483 | 0 | return -EAGAIN; |
484 | 0 | |
485 | 0 | return log_error_errno(errno, "accept() on fd:%d failed: %m", fd); |
486 | 0 | } |
487 | 0 |
|
488 | 0 | switch(socket_address_family(addr)) { |
489 | 0 | case AF_INET: |
490 | 0 | case AF_INET6: { |
491 | 0 | _cleanup_free_ char *a = NULL; |
492 | 0 | char *b; |
493 | 0 |
|
494 | 0 | r = socket_address_print(addr, &a); |
495 | 0 | if (r < 0) |
496 | 0 | return log_error_errno(r, "socket_address_print(): %m"); |
497 | 0 | |
498 | 0 | r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b); |
499 | 0 | if (r < 0) |
500 | 0 | return log_error_errno(r, "Resolving hostname failed: %m"); |
501 | 0 | |
502 | 0 | log_debug("Accepted %s %s connection from %s", |
503 | 0 | type, |
504 | 0 | socket_address_family(addr) == AF_INET ? "IP" : "IPv6", |
505 | 0 | a); |
506 | 0 |
|
507 | 0 | *hostname = b; |
508 | 0 | return TAKE_FD(fd2); |
509 | 0 | } |
510 | 0 |
|
511 | 0 | default: |
512 | 0 | return log_error_errno(SYNTHETIC_ERRNO(EINVAL), |
513 | 0 | "Rejected %s connection with unsupported family %d", |
514 | 0 | type, socket_address_family(addr)); |
515 | 0 | } |
516 | 0 | } |
517 | | |
518 | | static int dispatch_raw_connection_event( |
519 | | sd_event_source *event, |
520 | | int fd, |
521 | | uint32_t revents, |
522 | 0 | void *userdata) { |
523 | 0 |
|
524 | 0 | RemoteServer *s = userdata; |
525 | 0 | int fd2; |
526 | 0 | SocketAddress addr = { |
527 | 0 | .size = sizeof(union sockaddr_union), |
528 | 0 | .type = SOCK_STREAM, |
529 | 0 | }; |
530 | 0 | char *hostname = NULL; |
531 | 0 |
|
532 | 0 | fd2 = accept_connection("raw", fd, &addr, &hostname); |
533 | 0 | if (fd2 == -EAGAIN) |
534 | 0 | return 0; |
535 | 0 | if (fd2 < 0) |
536 | 0 | return fd2; |
537 | 0 | |
538 | 0 | return journal_remote_add_source(s, fd2, hostname, true); |
539 | 0 | } |