Coverage Report

Created: 2026-05-12 06:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/zeek/src/logging/WriterFrontend.cc
Line
Count
Source
1
// See the file "COPYING" in the main distribution directory for copyright.
2
3
#include "zeek/logging/WriterFrontend.h"
4
5
#include <span>
6
7
#include "zeek/RunState.h"
8
#include "zeek/broker/Manager.h"
9
#include "zeek/cluster/Backend.h"
10
#include "zeek/logging/Manager.h"
11
#include "zeek/logging/WriterBackend.h"
12
#include "zeek/threading/SerialTypes.h"
13
14
using zeek::threading::Field;
15
using zeek::threading::Value;
16
17
namespace zeek::logging {
18
19
// Messages sent from frontend to backend (i.e., "InputMessages").
20
21
// Frontend-to-backend message that asks the target writer to initialize
// itself with a set of log fields.
class InitMessage final : public threading::InputMessage<WriterBackend> {
public:
    // arg_backend: the backend this message is addressed to.
    // arg_num_fields: number of entries in arg_fields.
    // arg_fields: array of field descriptions; the pointer is stored as-is
    //             (this class neither copies nor frees it).
    InitMessage(WriterBackend* arg_backend, const int arg_num_fields, const Field* const* arg_fields)
        : threading::InputMessage<WriterBackend>("Init", arg_backend),
          num_fields(arg_num_fields),
          fields(arg_fields) {}

    // Calls Init() on the message's target object with the stored field set
    // and returns its result.
    bool Process() override {
        return Object()->Init(num_fields, fields);
    }

private:
    const int num_fields;
    const Field* const* fields;
};
32
33
// Frontend-to-backend message that asks the target writer to rotate its
// output.
class RotateMessage final : public threading::InputMessage<WriterBackend> {
public:
    // arg_backend: the backend this message is addressed to.
    // arg_frontend: the frontend issuing the rotation; only stored here.
    // arg_rotated_path: path passed on to Rotate(); a private copy is made
    //                   and released again in the destructor.
    // arg_open / arg_close: passed through unchanged to Rotate().
    // arg_terminating: passed through unchanged to Rotate().
    RotateMessage(WriterBackend* arg_backend, WriterFrontend* arg_frontend, const char* arg_rotated_path,
                  const double arg_open, const double arg_close, const bool arg_terminating)
        : threading::InputMessage<WriterBackend>("Rotate", arg_backend),
          frontend(arg_frontend),
          rotated_path(util::copy_string(arg_rotated_path)),
          open(arg_open),
          close(arg_close),
          terminating(arg_terminating) {}

    // Frees the copy of the path made by the constructor.
    ~RotateMessage() override {
        delete[] rotated_path;
    }

    // Calls Rotate() on the message's target object and returns its result.
    bool Process() override {
        return Object()->Rotate(rotated_path, open, close, terminating);
    }

private:
    WriterFrontend* frontend;
    const char* rotated_path; // Owned copy; see destructor.
    const double open;
    const double close;
    const bool terminating;
};
55
56
// Frontend-to-backend message carrying a batch of log records to be written
// by the target writer.
class WriteMessage final : public threading::InputMessage<WriterBackend> {
public:
    // arg_backend: the backend this message is addressed to.
    // arg_num_fields: field count handed to Write() alongside the records.
    // arg_records: the batch of records; moved into the message.
    WriteMessage(WriterBackend* arg_backend, int arg_num_fields, std::vector<detail::LogRecord>&& arg_records)
        : threading::InputMessage<WriterBackend>("Write", arg_backend),
          num_fields(arg_num_fields),
          records(std::move(arg_records)) {}

    // Calls Write() on the message's target object with a span over the
    // stored records and returns its result.
    bool Process() override {
        return Object()->Write(num_fields, std::span{records});
    }

private:
    int num_fields;
    std::vector<detail::LogRecord> records;
};
69
70
// Frontend-to-backend message that switches the target writer's buffering
// on or off.
class SetBufMessage final : public threading::InputMessage<WriterBackend> {
public:
    // arg_backend: the backend this message is addressed to.
    // arg_enabled: the buffering state handed to SetBuf().
    SetBufMessage(WriterBackend* arg_backend, const bool arg_enabled)
        : threading::InputMessage<WriterBackend>("SetBuf", arg_backend),
          enabled(arg_enabled) {}

    // Calls SetBuf() on the message's target object and returns its result.
    bool Process() override {
        return Object()->SetBuf(enabled);
    }

private:
    const bool enabled;
};
80
81
// Frontend-to-backend message that asks the target writer to flush.
class FlushMessage final : public threading::InputMessage<WriterBackend> {
public:
    // arg_backend: the backend this message is addressed to.
    // arg_network_time: timestamp handed to Flush().
    FlushMessage(WriterBackend* arg_backend, double arg_network_time)
        : threading::InputMessage<WriterBackend>("Flush", arg_backend),
          network_time(arg_network_time) {}

    // Calls Flush() on the message's target object and returns its result.
    bool Process() override {
        return Object()->Flush(network_time);
    }

private:
    double network_time;
};
91
92
// Frontend methods.
93
94
// Sets up a frontend for the given stream/writer pair.
//
// arg_info: writer configuration; a heap copy is kept in `info`.
// arg_stream / arg_writer: enum values identifying stream and writer; new
//                          references to both are stored in the header.
// arg_local: if true, a backend is requested from the log manager and started.
// arg_remote: stored in `remote`; consulted later by Init()/Write().
WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer,
                               bool arg_local, bool arg_remote)
    : write_buffer(detail::WriteBuffer(BifConst::Log::write_buffer_size)) {
    // The header's fields are filled in later, by Init().
    header = detail::LogWriteHeader{{zeek::NewRef{}, arg_stream},
                                    {zeek::NewRef{}, arg_writer},
                                    arg_info.filter_name,
                                    arg_info.path};

    initialized = false;
    disabled = false;
    buf = true;
    local = arg_local;
    remote = arg_remote;
    info = new WriterBackend::WriterInfo(arg_info);

    // The frontend's name is "<path>/<writer enum name>".
    const char* writer_label = arg_writer->GetType()->AsEnumType()->Lookup(arg_writer->InternalInt());
    name = util::copy_string(util::fmt("%s/%s", arg_info.path, writer_label));

    // Only local logging gets a backend; otherwise the frontend runs without one.
    backend = local ? log_mgr->CreateBackend(this, header.writer_id.get()) : nullptr;

    if ( backend )
        backend->Start();
}
122
123
0
// Releases the heap state allocated by the constructor: the WriterInfo copy
// and the name string.
WriterFrontend::~WriterFrontend() {
    delete info;    // Allocated with `new` in the constructor.
    delete[] name;  // Obtained from util::copy_string() in the constructor.
}
127
128
0
void WriterFrontend::Stop() {
129
0
    if ( disabled ) {
130
0
        return;
131
0
    }
132
133
0
    FlushWriteBuffer();
134
0
    SetDisable();
135
136
0
    if ( backend ) {
137
0
        backend->SignalStop();
138
0
        backend = nullptr; // Thread manager will clean it up once it finishes.
139
0
    }
140
0
}
141
142
330
void WriterFrontend::Init(int arg_num_fields, const Field* const* arg_fields) {
143
330
    if ( disabled )
144
0
        return;
145
146
330
    if ( initialized )
147
0
        reporter->InternalError("writer initialize twice");
148
149
330
    initialized = true;
150
151
330
    header.fields.reserve(arg_num_fields);
152
4.89k
    for ( int i = 0; i < arg_num_fields; i++ )
153
4.56k
        header.fields.emplace_back(*arg_fields[i]);
154
155
330
    if ( remote ) {
156
330
        broker_mgr->PublishLogCreate(header.stream_id.get(), header.writer_id.get(), *info, arg_num_fields, arg_fields);
157
330
    }
158
159
330
    if ( backend )
160
        // InitMessage takes ownership of the pointer passed in here and deletes it and
161
        // the fields when done processing the message.
162
330
        backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields));
163
0
    else {
164
0
        for ( int i = 0; i < arg_num_fields; i++ )
165
0
            delete arg_fields[i];
166
0
        delete[] arg_fields;
167
0
    }
168
330
}
169
170
17.6M
// Accepts one log record for writing.
//
// arg_vals: the record's values; always consumed, even if the record ends up
//           being dropped.
//
// The record is dropped when the frontend is disabled or when its number of
// values does not match the number of header fields (the latter with a
// warning). Otherwise it is published to Broker directly and/or appended to
// the write buffer, which is flushed when full, when buffering is off, or
// when Zeek is terminating.
void WriterFrontend::Write(detail::LogRecord&& arg_vals) {
    std::vector<threading::Value> vals = std::move(arg_vals);

    if ( disabled )
        return;

    if ( vals.size() != header.fields.size() ) {
        reporter->Warning("WriterFrontend %s expected %zu fields in write, got %zu. Skipping line.", name,
                          header.fields.size(), vals.size());
        return;
    }

    // With Broker as the cluster backend, remote logging pushes each record
    // straight to broker_mgr, which currently does its own buffering. Any
    // other cluster backend relies on the frontend's write buffer instead;
    // see FlushWriteBuffer().
    const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr;

    if ( remote && broker_is_cluster_backend ) {
        zeek::broker_mgr->PublishLogWrite(header.stream_id.get(), header.writer_id.get(), info->path, vals);

        // Without a local backend there is nothing left to do.
        if ( ! backend )
            return;
    }
    else if ( ! remote && ! backend ) {
        // Neither remote nor local logging: we're done.
        return;
    }

    // From here on it is either local logging or non-Broker remote logging.
    assert(backend || (remote && ! broker_is_cluster_backend));

    write_buffer.WriteRecord(std::move(vals));

    // Flush when the buffer is full, buffering is disabled, or we're terminating.
    const bool flush_now = write_buffer.Full() || ! buf || run_state::terminating;

    if ( flush_now )
        FlushWriteBuffer();
}
213
214
554k
void WriterFrontend::FlushWriteBuffer() {
215
554k
    if ( disabled )
216
0
        return;
217
218
554k
    if ( write_buffer.Empty() )
219
        // Nothing to do.
220
0
        return;
221
222
554k
    auto records = std::move(write_buffer).TakeRecords();
223
224
    // We've already pushed to broker during Write(). If another backend
225
    // is used, push all the buffered log records to it now.
226
554k
    const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr;
227
554k
    if ( remote && ! broker_is_cluster_backend )
228
554k
        zeek::cluster::backend->PublishLogWrites(header, std::span{records});
229
230
554k
    if ( backend )
231
554k
        backend->SendIn(new WriteMessage(backend, header.fields.size(), std::move(records)));
232
554k
}
233
234
0
void WriterFrontend::SetBuf(bool enabled) {
235
0
    if ( disabled )
236
0
        return;
237
238
0
    buf = enabled;
239
240
0
    if ( backend )
241
0
        backend->SendIn(new SetBufMessage(backend, enabled));
242
243
0
    if ( ! buf )
244
        // Make sure no longer buffer any still queued data.
245
0
        FlushWriteBuffer();
246
0
}
247
248
0
void WriterFrontend::Flush(double network_time) {
249
0
    if ( disabled )
250
0
        return;
251
252
0
    FlushWriteBuffer();
253
254
0
    if ( backend )
255
0
        backend->SendIn(new FlushMessage(backend, network_time));
256
0
}
257
258
0
void WriterFrontend::Rotate(const char* rotated_path, double open, double close, bool terminating) {
259
0
    if ( disabled )
260
0
        return;
261
262
0
    FlushWriteBuffer();
263
264
0
    if ( backend )
265
0
        backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating));
266
0
    else
267
        // Still signal log manager that we're done.
268
0
        log_mgr->FinishedRotation(this, nullptr, nullptr, 0, 0, false, terminating);
269
0
}
270
271
} // namespace zeek::logging