/src/zeek/src/logging/WriterFrontend.cc
Line | Count | Source |
1 | | // See the file "COPYING" in the main distribution directory for copyright. |
2 | | |
3 | | #include "zeek/logging/WriterFrontend.h" |
4 | | |
5 | | #include <span> |
6 | | |
7 | | #include "zeek/RunState.h" |
8 | | #include "zeek/broker/Manager.h" |
9 | | #include "zeek/cluster/Backend.h" |
10 | | #include "zeek/logging/Manager.h" |
11 | | #include "zeek/logging/WriterBackend.h" |
12 | | #include "zeek/threading/SerialTypes.h" |
13 | | |
14 | | using zeek::threading::Field; |
15 | | using zeek::threading::Value; |
16 | | |
17 | | namespace zeek::logging { |
18 | | |
19 | | // Messages sent from frontend to backend (i.e., "InputMessages"). |
20 | | |
21 | | class InitMessage final : public threading::InputMessage<WriterBackend> { |
22 | | public: |
23 | | InitMessage(WriterBackend* backend, const int num_fields, const Field* const* fields) |
24 | 330 | : threading::InputMessage<WriterBackend>("Init", backend), num_fields(num_fields), fields(fields) {} |
25 | | |
26 | 330 | bool Process() override { return Object()->Init(num_fields, fields); } |
27 | | |
28 | | private: |
29 | | const int num_fields; |
30 | | const Field* const* fields; |
31 | | }; |
32 | | |
33 | | class RotateMessage final : public threading::InputMessage<WriterBackend> { |
34 | | public: |
35 | | RotateMessage(WriterBackend* backend, WriterFrontend* frontend, const char* rotated_path, const double open, |
36 | | const double close, const bool terminating) |
37 | 0 | : threading::InputMessage<WriterBackend>("Rotate", backend), |
38 | 0 | frontend(frontend), |
39 | 0 | rotated_path(util::copy_string(rotated_path)), |
40 | 0 | open(open), |
41 | 0 | close(close), |
42 | 0 | terminating(terminating) {} |
43 | | |
44 | 0 | ~RotateMessage() override { delete[] rotated_path; } |
45 | | |
46 | 0 | bool Process() override { return Object()->Rotate(rotated_path, open, close, terminating); } |
47 | | |
48 | | private: |
49 | | WriterFrontend* frontend; |
50 | | const char* rotated_path; |
51 | | const double open; |
52 | | const double close; |
53 | | const bool terminating; |
54 | | }; |
55 | | |
56 | | class WriteMessage final : public threading::InputMessage<WriterBackend> { |
57 | | public: |
58 | | WriteMessage(WriterBackend* backend, int num_fields, std::vector<detail::LogRecord>&& records) |
59 | 554k | : threading::InputMessage<WriterBackend>("Write", backend), |
60 | 554k | num_fields(num_fields), |
61 | 554k | records(std::move(records)) {} |
62 | | |
63 | 552k | bool Process() override { return Object()->Write(num_fields, std::span{records}); } |
64 | | |
65 | | private: |
66 | | int num_fields; |
67 | | std::vector<detail::LogRecord> records; |
68 | | }; |
69 | | |
70 | | class SetBufMessage final : public threading::InputMessage<WriterBackend> { |
71 | | public: |
72 | | SetBufMessage(WriterBackend* backend, const bool enabled) |
73 | 0 | : threading::InputMessage<WriterBackend>("SetBuf", backend), enabled(enabled) {} |
74 | | |
75 | 0 | bool Process() override { return Object()->SetBuf(enabled); } |
76 | | |
77 | | private: |
78 | | const bool enabled; |
79 | | }; |
80 | | |
81 | | class FlushMessage final : public threading::InputMessage<WriterBackend> { |
82 | | public: |
83 | | FlushMessage(WriterBackend* backend, double network_time) |
84 | 0 | : threading::InputMessage<WriterBackend>("Flush", backend), network_time(network_time) {} |
85 | | |
86 | 0 | bool Process() override { return Object()->Flush(network_time); } |
87 | | |
88 | | private: |
89 | | double network_time; |
90 | | }; |
91 | | |
92 | | // Frontend methods. |
93 | | |
94 | | WriterFrontend::WriterFrontend(const WriterBackend::WriterInfo& arg_info, EnumVal* arg_stream, EnumVal* arg_writer, |
95 | | bool arg_local, bool arg_remote) |
96 | 330 | : write_buffer(detail::WriteBuffer(BifConst::Log::write_buffer_size)) { |
97 | | // The header's fields are initialized in Init() |
98 | 330 | header = detail::LogWriteHeader{{zeek::NewRef{}, arg_stream}, |
99 | 330 | {zeek::NewRef{}, arg_writer}, |
100 | 330 | arg_info.filter_name, |
101 | 330 | arg_info.path}; |
102 | | |
103 | 330 | disabled = initialized = false; |
104 | 330 | buf = true; |
105 | 330 | local = arg_local; |
106 | 330 | remote = arg_remote; |
107 | 330 | info = new WriterBackend::WriterInfo(arg_info); |
108 | | |
109 | 330 | const char* w = arg_writer->GetType()->AsEnumType()->Lookup(arg_writer->InternalInt()); |
110 | 330 | name = util::copy_string(util::fmt("%s/%s", arg_info.path, w)); |
111 | | |
112 | 330 | if ( local ) { |
113 | 330 | backend = log_mgr->CreateBackend(this, header.writer_id.get()); |
114 | | |
115 | 330 | if ( backend ) |
116 | 330 | backend->Start(); |
117 | 330 | } |
118 | | |
119 | 0 | else |
120 | 0 | backend = nullptr; |
121 | 330 | } |
122 | | |
123 | 0 | WriterFrontend::~WriterFrontend() { |
124 | 0 | delete info; |
125 | 0 | delete[] name; |
126 | 0 | } |
127 | | |
128 | 0 | void WriterFrontend::Stop() { |
129 | 0 | if ( disabled ) { |
130 | 0 | return; |
131 | 0 | } |
132 | | |
133 | 0 | FlushWriteBuffer(); |
134 | 0 | SetDisable(); |
135 | |
|
136 | 0 | if ( backend ) { |
137 | 0 | backend->SignalStop(); |
138 | 0 | backend = nullptr; // Thread manager will clean it up once it finishes. |
139 | 0 | } |
140 | 0 | } |
141 | | |
142 | 330 | void WriterFrontend::Init(int arg_num_fields, const Field* const* arg_fields) { |
143 | 330 | if ( disabled ) |
144 | 0 | return; |
145 | | |
146 | 330 | if ( initialized ) |
147 | 0 | reporter->InternalError("writer initialize twice"); |
148 | | |
149 | 330 | initialized = true; |
150 | | |
151 | 330 | header.fields.reserve(arg_num_fields); |
152 | 4.89k | for ( int i = 0; i < arg_num_fields; i++ ) |
153 | 4.56k | header.fields.emplace_back(*arg_fields[i]); |
154 | | |
155 | 330 | if ( remote ) { |
156 | 330 | broker_mgr->PublishLogCreate(header.stream_id.get(), header.writer_id.get(), *info, arg_num_fields, arg_fields); |
157 | 330 | } |
158 | | |
159 | 330 | if ( backend ) |
160 | | // InitMessage takes ownership of the pointer passed in here and deletes it and |
161 | | // the fields when done processing the message. |
162 | 330 | backend->SendIn(new InitMessage(backend, arg_num_fields, arg_fields)); |
163 | 0 | else { |
164 | 0 | for ( int i = 0; i < arg_num_fields; i++ ) |
165 | 0 | delete arg_fields[i]; |
166 | 0 | delete[] arg_fields; |
167 | 0 | } |
168 | 330 | } |
169 | | |
170 | 17.6M | void WriterFrontend::Write(detail::LogRecord&& arg_vals) { |
171 | 17.6M | std::vector<threading::Value> vals = std::move(arg_vals); |
172 | | |
173 | 17.6M | if ( disabled ) |
174 | 0 | return; |
175 | | |
176 | 17.6M | if ( vals.size() != header.fields.size() ) { |
177 | 0 | reporter->Warning("WriterFrontend %s expected %zu fields in write, got %zu. Skipping line.", name, |
178 | 0 | header.fields.size(), vals.size()); |
179 | 0 | return; |
180 | 0 | } |
181 | | |
182 | | // If remote logging is enabled *and* broker is used as cluster backend, |
183 | | // push the single log record directly to broker_mgr, it uses its own |
184 | | // buffering logic currently. |
185 | | // |
186 | | // Other cluster backends leverage the write buffering logic in the |
187 | | // WriterFrontend. See FlushWriteBuffer(). |
188 | 17.6M | const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr; |
189 | | |
190 | 17.6M | if ( remote ) { |
191 | 17.6M | if ( broker_is_cluster_backend ) { |
192 | 0 | zeek::broker_mgr->PublishLogWrite(header.stream_id.get(), header.writer_id.get(), info->path, vals); |
193 | |
|
194 | 0 | if ( ! backend ) // nothing left do do if we do not log locally |
195 | 0 | return; |
196 | 0 | } |
197 | 17.6M | } |
198 | 0 | else if ( ! backend ) { |
199 | 0 | assert(! remote); |
200 | | // Not remote and no backend, we're done. |
201 | 0 | return; |
202 | 0 | } |
203 | | |
204 | | // Either non-broker remote or local logging. |
205 | 17.6M | assert(backend || (remote && ! broker_is_cluster_backend)); |
206 | | |
207 | 17.6M | write_buffer.WriteRecord(std::move(vals)); |
208 | | |
209 | 17.6M | if ( write_buffer.Full() || ! buf || run_state::terminating ) |
210 | | // Buffer full (or no buffering desired or terminating). |
211 | 554k | FlushWriteBuffer(); |
212 | 17.6M | } |
213 | | |
214 | 554k | void WriterFrontend::FlushWriteBuffer() { |
215 | 554k | if ( disabled ) |
216 | 0 | return; |
217 | | |
218 | 554k | if ( write_buffer.Empty() ) |
219 | | // Nothing to do. |
220 | 0 | return; |
221 | | |
222 | 554k | auto records = std::move(write_buffer).TakeRecords(); |
223 | | |
224 | | // We've already pushed to broker during Write(). If another backend |
225 | | // is used, push all the buffered log records to it now. |
226 | 554k | const bool broker_is_cluster_backend = zeek::cluster::backend == zeek::broker_mgr; |
227 | 554k | if ( remote && ! broker_is_cluster_backend ) |
228 | 554k | zeek::cluster::backend->PublishLogWrites(header, std::span{records}); |
229 | | |
230 | 554k | if ( backend ) |
231 | 554k | backend->SendIn(new WriteMessage(backend, header.fields.size(), std::move(records))); |
232 | 554k | } |
233 | | |
234 | 0 | void WriterFrontend::SetBuf(bool enabled) { |
235 | 0 | if ( disabled ) |
236 | 0 | return; |
237 | | |
238 | 0 | buf = enabled; |
239 | |
|
240 | 0 | if ( backend ) |
241 | 0 | backend->SendIn(new SetBufMessage(backend, enabled)); |
242 | |
|
243 | 0 | if ( ! buf ) |
244 | | // Make sure no longer buffer any still queued data. |
245 | 0 | FlushWriteBuffer(); |
246 | 0 | } |
247 | | |
248 | 0 | void WriterFrontend::Flush(double network_time) { |
249 | 0 | if ( disabled ) |
250 | 0 | return; |
251 | | |
252 | 0 | FlushWriteBuffer(); |
253 | |
|
254 | 0 | if ( backend ) |
255 | 0 | backend->SendIn(new FlushMessage(backend, network_time)); |
256 | 0 | } |
257 | | |
258 | 0 | void WriterFrontend::Rotate(const char* rotated_path, double open, double close, bool terminating) { |
259 | 0 | if ( disabled ) |
260 | 0 | return; |
261 | | |
262 | 0 | FlushWriteBuffer(); |
263 | |
|
264 | 0 | if ( backend ) |
265 | 0 | backend->SendIn(new RotateMessage(backend, this, rotated_path, open, close, terminating)); |
266 | 0 | else |
267 | | // Still signal log manager that we're done. |
268 | 0 | log_mgr->FinishedRotation(this, nullptr, nullptr, 0, 0, false, terminating); |
269 | 0 | } |
270 | | |
271 | | } // namespace zeek::logging |