Coverage Report

Created: 2019-06-19 13:33

/src/systemd/src/journal-remote/journal-remote.c
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: LGPL-2.1+ */
2
3
#include <errno.h>
4
#include <fcntl.h>
5
#include <stdio.h>
6
#include <stdlib.h>
7
#include <string.h>
8
#include <sys/prctl.h>
9
#include <sys/socket.h>
10
#include <stdint.h>
11
12
#include "sd-daemon.h"
13
14
#include "alloc-util.h"
15
#include "def.h"
16
#include "errno-util.h"
17
#include "escape.h"
18
#include "fd-util.h"
19
#include "journal-file.h"
20
#include "journal-remote-write.h"
21
#include "journal-remote.h"
22
#include "journald-native.h"
23
#include "macro.h"
24
#include "parse-util.h"
25
#include "process-util.h"
26
#include "socket-util.h"
27
#include "stdio-util.h"
28
#include "string-util.h"
29
#include "strv.h"
30
31
0
#define REMOTE_JOURNAL_PATH "/var/log/journal/remote"
32
33
0
#define filename_escape(s) xescape((s), "/ ")
34
35
4.30k
static int open_output(RemoteServer *s, Writer *w, const char* host) {
36
4.30k
        _cleanup_free_ char *_filename = NULL;
37
4.30k
        const char *filename;
38
4.30k
        int r;
39
4.30k
40
4.30k
        switch (s->split_mode) {
41
4.30k
        case JOURNAL_WRITE_SPLIT_NONE:
42
4.30k
                filename = s->output;
43
4.30k
                break;
44
4.30k
45
4.30k
        case JOURNAL_WRITE_SPLIT_HOST: {
46
0
                _cleanup_free_ char *name;
47
0
48
0
                assert(host);
49
0
50
0
                name = filename_escape(host);
51
0
                if (!name)
52
0
                        return log_oom();
53
0
54
0
                r = asprintf(&_filename, "%s/remote-%s.journal", s->output, name);
55
0
                if (r < 0)
56
0
                        return log_oom();
57
0
58
0
                filename = _filename;
59
0
                break;
60
0
        }
61
0
62
0
        default:
63
0
                assert_not_reached("what?");
64
4.30k
        }
65
4.30k
66
4.30k
        r = journal_file_open_reliably(filename,
67
4.30k
                                       O_RDWR|O_CREAT, 0640,
68
4.30k
                                       s->compress, (uint64_t) -1, s->seal,
69
4.30k
                                       &w->metrics,
70
4.30k
                                       w->mmap, NULL,
71
4.30k
                                       NULL, &w->journal);
72
4.30k
        if (r < 0)
73
0
                return log_error_errno(r, "Failed to open output journal %s: %m", filename);
74
4.30k
75
4.30k
        log_debug("Opened output file %s", w->journal->path);
76
4.30k
        return 0;
77
4.30k
}
78
79
/**********************************************************************
80
 **********************************************************************
81
 **********************************************************************/
82
83
4.30k
static int init_writer_hashmap(RemoteServer *s) {
84
4.30k
        static const struct hash_ops* const hash_ops[] = {
85
4.30k
                [JOURNAL_WRITE_SPLIT_NONE] = NULL,
86
4.30k
                [JOURNAL_WRITE_SPLIT_HOST] = &string_hash_ops,
87
4.30k
        };
88
4.30k
89
4.30k
        assert(s);
90
4.30k
        assert(s->split_mode >= 0 && s->split_mode < (int) ELEMENTSOF(hash_ops));
91
4.30k
92
4.30k
        s->writers = hashmap_new(hash_ops[s->split_mode]);
93
4.30k
        if (!s->writers)
94
0
                return log_oom();
95
4.30k
96
4.30k
        return 0;
97
4.30k
}
98
99
4.30k
int journal_remote_get_writer(RemoteServer *s, const char *host, Writer **writer) {
100
4.30k
        _cleanup_(writer_unrefp) Writer *w = NULL;
101
4.30k
        const void *key;
102
4.30k
        int r;
103
4.30k
104
4.30k
        switch(s->split_mode) {
105
4.30k
        case JOURNAL_WRITE_SPLIT_NONE:
106
4.30k
                key = "one and only";
107
4.30k
                break;
108
4.30k
109
4.30k
        case JOURNAL_WRITE_SPLIT_HOST:
110
0
                assert(host);
111
0
                key = host;
112
0
                break;
113
0
114
0
        default:
115
0
                assert_not_reached("what split mode?");
116
4.30k
        }
117
4.30k
118
4.30k
        w = hashmap_get(s->writers, key);
119
4.30k
        if (w)
120
0
                writer_ref(w);
121
4.30k
        else {
122
4.30k
                w = writer_new(s);
123
4.30k
                if (!w)
124
0
                        return log_oom();
125
4.30k
126
4.30k
                if (s->split_mode == JOURNAL_WRITE_SPLIT_HOST) {
127
0
                        w->hashmap_key = strdup(key);
128
0
                        if (!w->hashmap_key)
129
0
                                return log_oom();
130
4.30k
                }
131
4.30k
132
4.30k
                r = open_output(s, w, host);
133
4.30k
                if (r < 0)
134
0
                        return r;
135
4.30k
136
4.30k
                r = hashmap_put(s->writers, w->hashmap_key ?: key, w);
137
4.30k
                if (r < 0)
138
0
                        return r;
139
4.30k
        }
140
4.30k
141
4.30k
        *writer = TAKE_PTR(w);
142
4.30k
143
4.30k
        return 0;
144
4.30k
}
145
146
/**********************************************************************
147
 **********************************************************************
148
 **********************************************************************/
149
150
/* This should go away as soon as µhttpd allows state to be passed around. */
151
RemoteServer *journal_remote_server_global;
152
153
static int dispatch_raw_source_event(sd_event_source *event,
154
                                     int fd,
155
                                     uint32_t revents,
156
                                     void *userdata);
157
static int dispatch_raw_source_until_block(sd_event_source *event,
158
                                           void *userdata);
159
static int dispatch_blocking_source_event(sd_event_source *event,
160
                                          void *userdata);
161
static int dispatch_raw_connection_event(sd_event_source *event,
162
                                         int fd,
163
                                         uint32_t revents,
164
                                         void *userdata);
165
166
static int get_source_for_fd(RemoteServer *s,
167
4.30k
                             int fd, char *name, RemoteSource **source) {
168
4.30k
        Writer *writer;
169
4.30k
        int r;
170
4.30k
171
4.30k
        /* This takes ownership of name, but only on success. */
172
4.30k
173
4.30k
        assert(fd >= 0);
174
4.30k
        assert(source);
175
4.30k
176
4.30k
        if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1))
177
4.30k
                return log_oom();
178
4.30k
179
4.30k
        r = journal_remote_get_writer(s, name, &writer);
180
4.30k
        if (r < 0)
181
0
                return log_warning_errno(r, "Failed to get writer for source %s: %m",
182
4.30k
                                         name);
183
4.30k
184
4.30k
        if (!s->sources[fd]) {
185
4.30k
                s->sources[fd] = source_new(fd, false, name, writer);
186
4.30k
                if (!s->sources[fd]) {
187
0
                        writer_unref(writer);
188
0
                        return log_oom();
189
0
                }
190
4.30k
191
4.30k
                s->active++;
192
4.30k
        }
193
4.30k
194
4.30k
        *source = s->sources[fd];
195
4.30k
        return 0;
196
4.30k
}
197
198
68.9k
static int remove_source(RemoteServer *s, int fd) {
199
68.9k
        RemoteSource *source;
200
68.9k
201
68.9k
        assert(s);
202
68.9k
        assert(fd >= 0 && fd < (ssize_t) s->sources_size);
203
68.9k
204
68.9k
        source = s->sources[fd];
205
68.9k
        if (source) {
206
4.30k
                /* this closes fd too */
207
4.30k
                source_free(source);
208
4.30k
                s->sources[fd] = NULL;
209
4.30k
                s->active--;
210
4.30k
        }
211
68.9k
212
68.9k
        return 0;
213
68.9k
}
214
215
4.30k
int journal_remote_add_source(RemoteServer *s, int fd, char* name, bool own_name) {
216
4.30k
        RemoteSource *source = NULL;
217
4.30k
        int r;
218
4.30k
219
4.30k
        /* This takes ownership of name, even on failure, if own_name is true. */
220
4.30k
221
4.30k
        assert(s);
222
4.30k
        assert(fd >= 0);
223
4.30k
        assert(name);
224
4.30k
225
4.30k
        if (!own_name) {
226
4.30k
                name = strdup(name);
227
4.30k
                if (!name)
228
0
                        return log_oom();
229
4.30k
        }
230
4.30k
231
4.30k
        r = get_source_for_fd(s, fd, name, &source);
232
4.30k
        if (r < 0) {
233
0
                log_error_errno(r, "Failed to create source for fd:%d (%s): %m",
234
0
                                fd, name);
235
0
                free(name);
236
0
                return r;
237
0
        }
238
4.30k
239
4.30k
        r = sd_event_add_io(s->events, &source->event,
240
4.30k
                            fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI,
241
4.30k
                            dispatch_raw_source_event, source);
242
4.30k
        if (r == 0) {
243
0
                /* Add additional source for buffer processing. It will be
244
0
                 * enabled later. */
245
0
                r = sd_event_add_defer(s->events, &source->buffer_event,
246
0
                                       dispatch_raw_source_until_block, source);
247
0
                if (r == 0)
248
0
                        sd_event_source_set_enabled(source->buffer_event, SD_EVENT_OFF);
249
4.30k
        } else if (r == -EPERM) {
250
4.30k
                log_debug("Falling back to sd_event_add_defer for fd:%d (%s)", fd, name);
251
4.30k
                r = sd_event_add_defer(s->events, &source->event,
252
4.30k
                                       dispatch_blocking_source_event, source);
253
4.30k
                if (r == 0)
254
4.30k
                        sd_event_source_set_enabled(source->event, SD_EVENT_ON);
255
4.30k
        }
256
4.30k
        if (r < 0) {
257
0
                log_error_errno(r, "Failed to register event source for fd:%d: %m",
258
0
                                fd);
259
0
                goto error;
260
0
        }
261
4.30k
262
4.30k
        r = sd_event_source_set_description(source->event, name);
263
4.30k
        if (r < 0) {
264
0
                log_error_errno(r, "Failed to set source name for fd:%d: %m", fd);
265
0
                goto error;
266
0
        }
267
4.30k
268
4.30k
        return 1; /* work to do */
269
0
270
0
 error:
271
0
        remove_source(s, fd);
272
0
        return r;
273
4.30k
}
274
275
0
int journal_remote_add_raw_socket(RemoteServer *s, int fd) {
276
0
        int r;
277
0
        _cleanup_close_ int fd_ = fd;
278
0
        char name[STRLEN("raw-socket-") + DECIMAL_STR_MAX(int) + 1];
279
0
280
0
        assert(fd >= 0);
281
0
282
0
        r = sd_event_add_io(s->events, &s->listen_event,
283
0
                            fd, EPOLLIN,
284
0
                            dispatch_raw_connection_event, s);
285
0
        if (r < 0)
286
0
                return r;
287
0
288
0
        xsprintf(name, "raw-socket-%d", fd);
289
0
290
0
        r = sd_event_source_set_description(s->listen_event, name);
291
0
        if (r < 0)
292
0
                return r;
293
0
294
0
        fd_ = -1;
295
0
        s->active++;
296
0
        return 0;
297
0
}
298
299
/**********************************************************************
300
 **********************************************************************
301
 **********************************************************************/
302
303
int journal_remote_server_init(
304
                RemoteServer *s,
305
                const char *output,
306
                JournalWriteSplitMode split_mode,
307
                bool compress,
308
4.30k
                bool seal) {
309
4.30k
310
4.30k
        int r;
311
4.30k
312
4.30k
        assert(s);
313
4.30k
314
4.30k
        assert(journal_remote_server_global == NULL);
315
4.30k
        journal_remote_server_global = s;
316
4.30k
317
4.30k
        s->split_mode = split_mode;
318
4.30k
        s->compress = compress;
319
4.30k
        s->seal = seal;
320
4.30k
321
4.30k
        if (output)
322
4.30k
                s->output = output;
323
0
        else if (split_mode == JOURNAL_WRITE_SPLIT_NONE)
324
0
                s->output = REMOTE_JOURNAL_PATH "/remote.journal";
325
0
        else if (split_mode == JOURNAL_WRITE_SPLIT_HOST)
326
0
                s->output = REMOTE_JOURNAL_PATH;
327
0
        else
328
0
                assert_not_reached("bad split mode");
329
4.30k
330
4.30k
        r = sd_event_default(&s->events);
331
4.30k
        if (r < 0)
332
0
                return log_error_errno(r, "Failed to allocate event loop: %m");
333
4.30k
334
4.30k
        r = init_writer_hashmap(s);
335
4.30k
        if (r < 0)
336
0
                return r;
337
4.30k
338
4.30k
        return 0;
339
4.30k
}
340
341
#if HAVE_MICROHTTPD
342
static void MHDDaemonWrapper_free(MHDDaemonWrapper *d) {
343
        MHD_stop_daemon(d->daemon);
344
        sd_event_source_unref(d->io_event);
345
        sd_event_source_unref(d->timer_event);
346
        free(d);
347
}
348
#endif
349
350
4.30k
void journal_remote_server_destroy(RemoteServer *s) {
351
4.30k
        size_t i;
352
4.30k
353
#if HAVE_MICROHTTPD
354
        hashmap_free_with_destructor(s->daemons, MHDDaemonWrapper_free);
355
#endif
356
357
4.30k
        assert(s->sources_size == 0 || s->sources);
358
68.9k
        for (i = 0; i < s->sources_size; i++)
359
64.6k
                remove_source(s, i);
360
4.30k
        free(s->sources);
361
4.30k
362
4.30k
        writer_unref(s->_single_writer);
363
4.30k
        hashmap_free(s->writers);
364
4.30k
365
4.30k
        sd_event_source_unref(s->sigterm_event);
366
4.30k
        sd_event_source_unref(s->sigint_event);
367
4.30k
        sd_event_source_unref(s->listen_event);
368
4.30k
        sd_event_unref(s->events);
369
4.30k
370
4.30k
        if (s == journal_remote_server_global)
371
4.30k
                journal_remote_server_global = NULL;
372
4.30k
373
4.30k
        /* fds that we're listening on remain open... */
374
4.30k
}
375
376
/**********************************************************************
377
 **********************************************************************
378
 **********************************************************************/
379
380
int journal_remote_handle_raw_source(
381
                sd_event_source *event,
382
                int fd,
383
                uint32_t revents,
384
1.04M
                RemoteServer *s) {
385
1.04M
386
1.04M
        RemoteSource *source;
387
1.04M
        int r;
388
1.04M
389
1.04M
        /* Returns 1 if there might be more data pending,
390
1.04M
         * 0 if data is currently exhausted, negative on error.
391
1.04M
         */
392
1.04M
393
1.04M
        assert(fd >= 0 && fd < (ssize_t) s->sources_size);
394
1.04M
        source = s->sources[fd];
395
1.04M
        assert(source->importer.fd == fd);
396
1.04M
397
1.04M
        r = process_source(source, s->compress, s->seal);
398
1.04M
        if (journal_importer_eof(&source->importer)) {
399
3.66k
                size_t remaining;
400
3.66k
401
3.66k
                log_debug("EOF reached with source %s (fd=%d)",
402
3.66k
                          source->importer.name, source->importer.fd);
403
3.66k
404
3.66k
                remaining = journal_importer_bytes_remaining(&source->importer);
405
3.66k
                if (remaining > 0)
406
3.66k
                        log_notice("Premature EOF. %zu bytes lost.", remaining);
407
3.66k
                remove_source(s, source->importer.fd);
408
3.66k
                log_debug("%zu active sources remaining", s->active);
409
3.66k
                return 0;
410
1.04M
        } else if (r == -E2BIG) {
411
77.7k
                log_notice("Entry with too many fields, skipped");
412
77.7k
                return 1;
413
964k
        } else if (r == -ENOBUFS) {
414
0
                log_notice("Entry too big, skipped");
415
0
                return 1;
416
964k
        } else if (r == -EAGAIN) {
417
0
                return 0;
418
964k
        } else if (r < 0) {
419
639
                log_debug_errno(r, "Closing connection: %m");
420
639
                remove_source(s, fd);
421
639
                return 0;
422
639
        } else
423
964k
                return 1;
424
1.04M
}
425
426
static int dispatch_raw_source_until_block(sd_event_source *event,
427
0
                                           void *userdata) {
428
0
        RemoteSource *source = userdata;
429
0
        int r;
430
0
431
0
        /* Make sure event stays around even if source is destroyed */
432
0
        sd_event_source_ref(event);
433
0
434
0
        r = journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
435
0
        if (r != 1)
436
0
                /* No more data for now */
437
0
                sd_event_source_set_enabled(event, SD_EVENT_OFF);
438
0
439
0
        sd_event_source_unref(event);
440
0
441
0
        return r;
442
0
}
443
444
static int dispatch_raw_source_event(sd_event_source *event,
445
                                     int fd,
446
                                     uint32_t revents,
447
0
                                     void *userdata) {
448
0
        RemoteSource *source = userdata;
449
0
        int r;
450
0
451
0
        assert(source->event);
452
0
        assert(source->buffer_event);
453
0
454
0
        r = journal_remote_handle_raw_source(event, fd, EPOLLIN, journal_remote_server_global);
455
0
        if (r == 1)
456
0
                /* Might have more data. We need to rerun the handler
457
0
                 * until we are sure the buffer is exhausted. */
458
0
                sd_event_source_set_enabled(source->buffer_event, SD_EVENT_ON);
459
0
460
0
        return r;
461
0
}
462
463
static int dispatch_blocking_source_event(sd_event_source *event,
464
0
                                          void *userdata) {
465
0
        RemoteSource *source = userdata;
466
0
467
0
        return journal_remote_handle_raw_source(event, source->importer.fd, EPOLLIN, journal_remote_server_global);
468
0
}
469
470
static int accept_connection(
471
                const char* type,
472
                int fd,
473
                SocketAddress *addr,
474
0
                char **hostname) {
475
0
476
0
        _cleanup_close_ int fd2 = -1;
477
0
        int r;
478
0
479
0
        log_debug("Accepting new %s connection on fd:%d", type, fd);
480
0
        fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);
481
0
        if (fd2 < 0) {
482
0
                if (ERRNO_IS_ACCEPT_AGAIN(errno))
483
0
                        return -EAGAIN;
484
0
485
0
                return log_error_errno(errno, "accept() on fd:%d failed: %m", fd);
486
0
        }
487
0
488
0
        switch(socket_address_family(addr)) {
489
0
        case AF_INET:
490
0
        case AF_INET6: {
491
0
                _cleanup_free_ char *a = NULL;
492
0
                char *b;
493
0
494
0
                r = socket_address_print(addr, &a);
495
0
                if (r < 0)
496
0
                        return log_error_errno(r, "socket_address_print(): %m");
497
0
498
0
                r = socknameinfo_pretty(&addr->sockaddr, addr->size, &b);
499
0
                if (r < 0)
500
0
                        return log_error_errno(r, "Resolving hostname failed: %m");
501
0
502
0
                log_debug("Accepted %s %s connection from %s",
503
0
                          type,
504
0
                          socket_address_family(addr) == AF_INET ? "IP" : "IPv6",
505
0
                          a);
506
0
507
0
                *hostname = b;
508
0
                return TAKE_FD(fd2);
509
0
        }
510
0
511
0
        default:
512
0
                return log_error_errno(SYNTHETIC_ERRNO(EINVAL),
513
0
                                       "Rejected %s connection with unsupported family %d",
514
0
                                       type, socket_address_family(addr));
515
0
        }
516
0
}
517
518
static int dispatch_raw_connection_event(
519
                sd_event_source *event,
520
                int fd,
521
                uint32_t revents,
522
0
                void *userdata) {
523
0
524
0
        RemoteServer *s = userdata;
525
0
        int fd2;
526
0
        SocketAddress addr = {
527
0
                .size = sizeof(union sockaddr_union),
528
0
                .type = SOCK_STREAM,
529
0
        };
530
0
        char *hostname = NULL;
531
0
532
0
        fd2 = accept_connection("raw", fd, &addr, &hostname);
533
0
        if (fd2 == -EAGAIN)
534
0
                return 0;
535
0
        if (fd2 < 0)
536
0
                return fd2;
537
0
538
0
        return journal_remote_add_source(s, fd2, hostname, true);
539
0
}