/src/openvswitch/lib/stream-replay.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2021, Red Hat, Inc. |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at: |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | |
17 | | #include <config.h> |
18 | | #include <ctype.h> |
19 | | #include <errno.h> |
20 | | #include <poll.h> |
21 | | #include <stdlib.h> |
22 | | #include <string.h> |
23 | | #include <sys/socket.h> |
24 | | #include <sys/types.h> |
25 | | #include <unistd.h> |
26 | | #include "ovs-atomic.h" |
27 | | #include "ovs-replay.h" |
28 | | #include "util.h" |
29 | | #include "stream-provider.h" |
30 | | #include "stream.h" |
31 | | #include "openvswitch/poll-loop.h" |
32 | | #include "openvswitch/vlog.h" |
33 | | |
34 | | VLOG_DEFINE_THIS_MODULE(stream_replay); |
35 | | |
36 | | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25); |
37 | | |
38 | | /* Active replay stream. */ |
39 | | |
40 | | struct stream_replay { |
41 | | struct stream stream; |
42 | | replay_file_t f; |
43 | | int seqno; |
44 | | }; |
45 | | |
46 | | const struct stream_class replay_stream_class; |
47 | | |
48 | | /* Creates a new stream named 'name' that will emulate sending and receiving |
49 | | * data using replay file and stores a pointer to the stream in '*streamp'. |
50 | | * |
51 | | * Returns 0 if successful, otherwise a positive errno value. */ |
52 | | static int |
53 | | new_replay_stream(const char *name, struct stream **streamp) |
54 | 0 | { |
55 | 0 | struct stream_replay *s; |
56 | 0 | int seqno = 0, error = 0, open_result; |
57 | 0 | replay_file_t f; |
58 | |
|
59 | 0 | ovs_replay_lock(); |
60 | 0 | error = ovs_replay_file_open(name, &f, &seqno); |
61 | 0 | if (error) { |
62 | 0 | VLOG_ERR_RL(&rl, "%s: failed to open stream.", name); |
63 | 0 | goto unlock; |
64 | 0 | } |
65 | | |
66 | 0 | error = ovs_replay_read(f, NULL, 0, &open_result, &seqno, true); |
67 | 0 | if (error) { |
68 | 0 | VLOG_ERR_RL(&rl, "%s: failed to read 'open' record.", name); |
69 | 0 | ovs_replay_file_close(f); |
70 | 0 | goto unlock; |
71 | 0 | } |
72 | | |
73 | 0 | if (open_result) { |
74 | 0 | error = -open_result; |
75 | 0 | ovs_replay_file_close(f); |
76 | 0 | goto unlock; |
77 | 0 | } |
78 | | |
79 | 0 | s = xmalloc(sizeof *s); |
80 | 0 | stream_init(&s->stream, &replay_stream_class, 0, xstrdup(name)); |
81 | 0 | s->f = f; |
82 | 0 | s->seqno = seqno; |
83 | 0 | *streamp = &s->stream; |
84 | 0 | unlock: |
85 | 0 | ovs_replay_unlock(); |
86 | 0 | return error; |
87 | 0 | } |
88 | | |
89 | | static struct stream_replay * |
90 | | stream_replay_cast(struct stream *stream) |
91 | 0 | { |
92 | 0 | stream_assert_class(stream, &replay_stream_class); |
93 | 0 | return CONTAINER_OF(stream, struct stream_replay, stream); |
94 | 0 | } |
95 | | |
96 | | void |
97 | | stream_replay_open_wfd(struct stream *s, int open_result, const char *name) |
98 | 0 | { |
99 | 0 | int state = ovs_replay_get_state(); |
100 | 0 | int error = 0; |
101 | 0 | replay_file_t f; |
102 | |
|
103 | 0 | if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) { |
104 | 0 | return; |
105 | 0 | } |
106 | | |
107 | 0 | ovs_replay_lock(); |
108 | 0 | error = ovs_replay_file_open(name, &f, NULL); |
109 | 0 | if (error) { |
110 | 0 | VLOG_ERR_RL(&rl, "%s: failed to open replay file for stream.", name); |
111 | 0 | ovs_replay_unlock(); |
112 | 0 | return; |
113 | 0 | } |
114 | 0 | ovs_replay_unlock(); |
115 | |
|
116 | 0 | if (ovs_replay_write(f, NULL, -open_result, true)) { |
117 | 0 | VLOG_ERR_RL(&rl, "%s: failed to write 'open' result: %d", |
118 | 0 | name, open_result); |
119 | 0 | } |
120 | 0 | if (open_result) { |
121 | | /* We recorded failure to open the stream. */ |
122 | 0 | ovs_replay_file_close(f); |
123 | 0 | } else { |
124 | 0 | s->replay_wfd = f; |
125 | 0 | } |
126 | 0 | } |
127 | | |
128 | | void |
129 | | stream_replay_write(struct stream *s, const void *buffer, int n, bool is_read) |
130 | 0 | { |
131 | 0 | int state = ovs_replay_get_state(); |
132 | |
|
133 | 0 | if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) { |
134 | 0 | return; |
135 | 0 | } |
136 | | |
137 | 0 | if (ovs_replay_write(s->replay_wfd, buffer, n, is_read)) { |
138 | 0 | VLOG_ERR_RL(&rl, "%s: failed to write buffer.", s->name); |
139 | 0 | } |
140 | 0 | } |
141 | | |
142 | | void |
143 | | stream_replay_close_wfd(struct stream *s) |
144 | 0 | { |
145 | 0 | if (s->replay_wfd) { |
146 | 0 | ovs_replay_file_close(s->replay_wfd); |
147 | 0 | } |
148 | 0 | } |
149 | | |
150 | | static int |
151 | | stream_replay_open(const char *name, char *suffix OVS_UNUSED, |
152 | | struct stream **streamp, uint8_t dscp OVS_UNUSED) |
153 | 0 | { |
154 | 0 | return new_replay_stream(name, streamp); |
155 | 0 | } |
156 | | |
157 | | static void |
158 | | stream_replay_close(struct stream *stream) |
159 | 0 | { |
160 | 0 | struct stream_replay *s = stream_replay_cast(stream); |
161 | 0 | ovs_replay_file_close(s->f); |
162 | 0 | free(s); |
163 | 0 | } |
164 | | |
165 | | static ssize_t |
166 | | stream_replay_recv(struct stream *stream, void *buffer, size_t n) |
167 | 0 | { |
168 | 0 | struct stream_replay *s = stream_replay_cast(stream); |
169 | 0 | int norm_seqno = ovs_replay_normalized_seqno(s->seqno); |
170 | 0 | int error, len; |
171 | |
|
172 | 0 | ovs_replay_lock(); |
173 | 0 | ovs_assert(norm_seqno >= ovs_replay_seqno()); |
174 | |
|
175 | 0 | if (norm_seqno != ovs_replay_seqno() |
176 | 0 | || !ovs_replay_seqno_is_read(s->seqno)) { |
177 | 0 | error = EAGAIN; |
178 | 0 | goto unlock; |
179 | 0 | } |
180 | | |
181 | 0 | error = ovs_replay_read(s->f, buffer, n, &len, &s->seqno, true); |
182 | 0 | if (error) { |
183 | 0 | VLOG_ERR_RL(&rl, "%s: failed to read from replay file.", stream->name); |
184 | 0 | goto unlock; |
185 | 0 | } |
186 | | |
187 | 0 | unlock: |
188 | 0 | ovs_replay_unlock(); |
189 | 0 | return error ? -error : len; |
190 | 0 | } |
191 | | |
192 | | static ssize_t |
193 | | stream_replay_send(struct stream *stream OVS_UNUSED, |
194 | | const void *buffer OVS_UNUSED, size_t n) |
195 | 0 | { |
196 | 0 | struct stream_replay *s = stream_replay_cast(stream); |
197 | 0 | int norm_seqno = ovs_replay_normalized_seqno(s->seqno); |
198 | 0 | int error, len; |
199 | |
|
200 | 0 | ovs_replay_lock(); |
201 | 0 | ovs_assert(norm_seqno >= ovs_replay_seqno()); |
202 | |
|
203 | 0 | if (norm_seqno != ovs_replay_seqno() |
204 | 0 | || ovs_replay_seqno_is_read(s->seqno)) { |
205 | 0 | error = EAGAIN; |
206 | 0 | goto unlock; |
207 | 0 | } |
208 | | |
209 | 0 | error = ovs_replay_read(s->f, NULL, 0, &len, &s->seqno, false); |
210 | 0 | if (error) { |
211 | 0 | VLOG_ERR_RL(&rl, "%s: failed to read from replay file.", stream->name); |
212 | 0 | goto unlock; |
213 | 0 | } |
214 | 0 | ovs_assert(len < 0 || len <= n); |
215 | |
|
216 | 0 | unlock: |
217 | 0 | ovs_replay_unlock(); |
218 | 0 | return error ? -error : len; |
219 | 0 | } |
220 | | |
221 | | static void |
222 | | stream_replay_wait(struct stream *stream, enum stream_wait_type wait) |
223 | 0 | { |
224 | 0 | struct stream_replay *s = stream_replay_cast(stream); |
225 | 0 | switch (wait) { |
226 | 0 | case STREAM_CONNECT: |
227 | | /* Connect does nothing and always available. */ |
228 | 0 | poll_immediate_wake(); |
229 | 0 | break; |
230 | | |
231 | 0 | case STREAM_SEND: |
232 | 0 | if (s->seqno != INT_MAX && !ovs_replay_seqno_is_read(s->seqno)) { |
233 | | /* Stream waits for write. */ |
234 | 0 | poll_immediate_wake(); |
235 | 0 | } |
236 | 0 | break; |
237 | | |
238 | 0 | case STREAM_RECV: |
239 | 0 | if (s->seqno != INT_MAX && ovs_replay_seqno_is_read(s->seqno)) { |
240 | | /* We still have something to read. */ |
241 | 0 | poll_immediate_wake(); |
242 | 0 | } |
243 | 0 | break; |
244 | | |
245 | 0 | default: |
246 | 0 | OVS_NOT_REACHED(); |
247 | 0 | } |
248 | 0 | } |
249 | | |
250 | | const struct stream_class replay_stream_class = { |
251 | | "replay", /* name */ |
252 | | false, /* needs_probes */ |
253 | | stream_replay_open, /* open */ |
254 | | stream_replay_close, /* close */ |
255 | | NULL, /* connect */ |
256 | | stream_replay_recv, /* recv */ |
257 | | stream_replay_send, /* send */ |
258 | | NULL, /* run */ |
259 | | NULL, /* run_wait */ |
260 | | stream_replay_wait, /* wait */ |
261 | | }; |
262 | | |
263 | | /* Passive replay stream. */ |
264 | | |
265 | | struct replay_pstream |
266 | | { |
267 | | struct pstream pstream; |
268 | | replay_file_t f; |
269 | | int seqno; |
270 | | }; |
271 | | |
272 | | const struct pstream_class preplay_pstream_class; |
273 | | |
274 | | static struct replay_pstream * |
275 | | replay_pstream_cast(struct pstream *pstream) |
276 | 0 | { |
277 | 0 | pstream_assert_class(pstream, &preplay_pstream_class); |
278 | 0 | return CONTAINER_OF(pstream, struct replay_pstream, pstream); |
279 | 0 | } |
280 | | |
281 | | /* Creates a new pstream named 'name' that will accept new replay connections |
282 | | * reading them from the replay file and stores a pointer to the stream in |
283 | | * '*pstreamp'. |
284 | | * |
285 | | * Returns 0 if successful, otherwise a positive errno value. */ |
286 | | static int |
287 | | pstream_replay_listen(const char *name, char *suffix OVS_UNUSED, |
288 | | struct pstream **pstreamp, uint8_t dscp OVS_UNUSED) |
289 | 0 | { |
290 | 0 | int seqno = 0, error = 0, listen_result; |
291 | 0 | replay_file_t f; |
292 | |
|
293 | 0 | ovs_replay_lock(); |
294 | 0 | error = ovs_replay_file_open(name, &f, &seqno); |
295 | 0 | if (error) { |
296 | 0 | VLOG_ERR_RL(&rl, "%s: failed to open pstream.", name); |
297 | 0 | goto unlock; |
298 | 0 | } |
299 | | |
300 | 0 | error = ovs_replay_read(f, NULL, 0, &listen_result, &seqno, true); |
301 | 0 | if (error) { |
302 | 0 | VLOG_ERR_RL(&rl, "%s: failed to read 'listen' record.", name); |
303 | 0 | ovs_replay_file_close(f); |
304 | 0 | goto unlock; |
305 | 0 | } |
306 | | |
307 | 0 | if (listen_result) { |
308 | 0 | error = -listen_result; |
309 | 0 | ovs_replay_file_close(f); |
310 | 0 | goto unlock; |
311 | 0 | } |
312 | | |
313 | 0 | struct replay_pstream *ps = xmalloc(sizeof *ps); |
314 | 0 | pstream_init(&ps->pstream, &preplay_pstream_class, xstrdup(name)); |
315 | 0 | ps->f = f; |
316 | 0 | ps->seqno = seqno; |
317 | 0 | *pstreamp = &ps->pstream; |
318 | 0 | unlock: |
319 | 0 | ovs_replay_unlock(); |
320 | 0 | return error; |
321 | 0 | } |
322 | | |
323 | | void |
324 | | pstream_replay_open_wfd(struct pstream *ps, int listen_result, |
325 | | const char *name) |
326 | 0 | { |
327 | 0 | int state = ovs_replay_get_state(); |
328 | 0 | int error = 0; |
329 | 0 | replay_file_t f; |
330 | |
|
331 | 0 | if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) { |
332 | 0 | return; |
333 | 0 | } |
334 | | |
335 | 0 | ovs_replay_lock(); |
336 | 0 | error = ovs_replay_file_open(name, &f, NULL); |
337 | 0 | if (error) { |
338 | 0 | VLOG_ERR_RL(&rl, "%s: failed to open replay file for pstream.", name); |
339 | 0 | ovs_replay_unlock(); |
340 | 0 | return; |
341 | 0 | } |
342 | 0 | ovs_replay_unlock(); |
343 | |
|
344 | 0 | if (ovs_replay_write(f, NULL, -listen_result, true)) { |
345 | 0 | VLOG_ERR_RL(&rl, "%s: failed to write 'listen' result: %d", |
346 | 0 | name, listen_result); |
347 | 0 | } |
348 | |
|
349 | 0 | if (listen_result) { |
350 | | /* We recorded failure to open the stream. */ |
351 | 0 | ovs_replay_file_close(f); |
352 | 0 | } else { |
353 | 0 | ps->replay_wfd = f; |
354 | 0 | } |
355 | 0 | } |
356 | | |
357 | | void |
358 | | pstream_replay_write_accept(struct pstream *ps, const struct stream *s, |
359 | | int accept_result) |
360 | 0 | { |
361 | 0 | int state = ovs_replay_get_state(); |
362 | 0 | int len; |
363 | |
|
364 | 0 | if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) { |
365 | 0 | return; |
366 | 0 | } |
367 | | |
368 | 0 | if (!accept_result) { |
369 | 0 | len = strlen(s->name); |
370 | 0 | if (ovs_replay_write(ps->replay_wfd, s->name, len, true)) { |
371 | 0 | VLOG_ERR_RL(&rl, "%s: failed to write accept name: %s", |
372 | 0 | ps->name, s->name); |
373 | 0 | } |
374 | 0 | } else if (ovs_replay_write(ps->replay_wfd, NULL, -accept_result, true)) { |
375 | 0 | VLOG_ERR_RL(&rl, "%s: failed to write 'accept' failure: %d", |
376 | 0 | ps->name, accept_result); |
377 | 0 | } |
378 | 0 | } |
379 | | |
380 | | void |
381 | | pstream_replay_close_wfd(struct pstream *ps) |
382 | 0 | { |
383 | 0 | if (ps->replay_wfd) { |
384 | 0 | ovs_replay_file_close(ps->replay_wfd); |
385 | 0 | } |
386 | 0 | } |
387 | | |
388 | | static void |
389 | | pstream_replay_close(struct pstream *pstream) |
390 | 0 | { |
391 | 0 | struct replay_pstream *ps = replay_pstream_cast(pstream); |
392 | |
|
393 | 0 | ovs_replay_file_close(ps->f); |
394 | 0 | free(ps); |
395 | 0 | } |
396 | | |
397 | 0 | #define MAX_NAME_LEN 65536 |
398 | | |
399 | | static int |
400 | | pstream_replay_accept(struct pstream *pstream, struct stream **new_streamp) |
401 | 0 | { |
402 | 0 | struct replay_pstream *ps = replay_pstream_cast(pstream); |
403 | 0 | int norm_seqno = ovs_replay_normalized_seqno(ps->seqno); |
404 | 0 | int retval, len; |
405 | 0 | char name[MAX_NAME_LEN]; |
406 | |
|
407 | 0 | ovs_replay_lock(); |
408 | 0 | ovs_assert(norm_seqno >= ovs_replay_seqno()); |
409 | |
|
410 | 0 | if (norm_seqno != ovs_replay_seqno() |
411 | 0 | || !ovs_replay_seqno_is_read(ps->seqno)) { |
412 | 0 | retval = EAGAIN; |
413 | 0 | ovs_replay_unlock(); |
414 | 0 | goto exit; |
415 | 0 | } |
416 | | |
417 | 0 | retval = ovs_replay_read(ps->f, name, MAX_NAME_LEN - 1, |
418 | 0 | &len, &ps->seqno, true); |
419 | 0 | if (retval) { |
420 | 0 | VLOG_ERR_RL(&rl, "%s: failed to read from replay file.", |
421 | 0 | pstream->name); |
422 | 0 | ovs_replay_unlock(); |
423 | 0 | goto exit; |
424 | 0 | } |
425 | | |
426 | 0 | ovs_replay_unlock(); |
427 | |
|
428 | 0 | if (len > 0) { |
429 | 0 | name[len] = 0; |
430 | 0 | retval = new_replay_stream(name, new_streamp); |
431 | 0 | } else { |
432 | 0 | retval = -len; |
433 | 0 | } |
434 | 0 | exit: |
435 | 0 | return retval; |
436 | 0 | } |
437 | | |
438 | | static void |
439 | | pstream_replay_wait(struct pstream *pstream) |
440 | 0 | { |
441 | 0 | struct replay_pstream *ps = replay_pstream_cast(pstream); |
442 | |
|
443 | 0 | if (ps->seqno != INT_MAX) { |
444 | | /* Replay always has something to say. */ |
445 | 0 | poll_immediate_wake(); |
446 | 0 | } |
447 | 0 | } |
448 | | |
449 | | const struct pstream_class preplay_pstream_class = { |
450 | | "preplay", |
451 | | false, |
452 | | pstream_replay_listen, |
453 | | pstream_replay_close, |
454 | | pstream_replay_accept, |
455 | | pstream_replay_wait, |
456 | | }; |