Coverage Report

Created: 2025-07-01 06:50

/src/openvswitch/lib/stream-replay.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2021, Red Hat, Inc.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at:
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
#include <config.h>
18
#include <ctype.h>
19
#include <errno.h>
20
#include <poll.h>
21
#include <stdlib.h>
22
#include <string.h>
23
#include <sys/socket.h>
24
#include <sys/types.h>
25
#include <unistd.h>
26
#include "ovs-atomic.h"
27
#include "ovs-replay.h"
28
#include "util.h"
29
#include "stream-provider.h"
30
#include "stream.h"
31
#include "openvswitch/poll-loop.h"
32
#include "openvswitch/vlog.h"
33
34
/* Defines the vlog module "stream_replay" for this file; presumably the
 * module that the VLOG_ERR_RL() calls below log under -- confirm in vlog.h. */
VLOG_DEFINE_THIS_MODULE(stream_replay);
35
36
/* Rate limiter shared by every VLOG_ERR_RL() call in this file. */
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 25);
37
38
/* Active replay stream. */
39
40
struct stream_replay {
41
    struct stream stream;
42
    replay_file_t f;
43
    int seqno;
44
};
45
46
/* Forward declaration: the functions below refer to the class before its
 * callback table is defined further down in this file. */
const struct stream_class replay_stream_class;
47
48
/* Creates a new stream named 'name' that will emulate sending and receiving
49
 * data using replay file and stores a pointer to the stream in '*streamp'.
50
 *
51
 * Returns 0 if successful, otherwise a positive errno value. */
52
static int
53
new_replay_stream(const char *name, struct stream **streamp)
54
0
{
55
0
    struct stream_replay *s;
56
0
    int seqno = 0, error = 0, open_result;
57
0
    replay_file_t f;
58
59
0
    ovs_replay_lock();
60
0
    error = ovs_replay_file_open(name, &f, &seqno);
61
0
    if (error) {
62
0
        VLOG_ERR_RL(&rl, "%s: failed to open stream.", name);
63
0
        goto unlock;
64
0
    }
65
66
0
    error = ovs_replay_read(f, NULL, 0, &open_result, &seqno, true);
67
0
    if (error) {
68
0
        VLOG_ERR_RL(&rl, "%s: failed to read 'open' record.", name);
69
0
        ovs_replay_file_close(f);
70
0
        goto unlock;
71
0
    }
72
73
0
    if (open_result) {
74
0
        error = -open_result;
75
0
        ovs_replay_file_close(f);
76
0
        goto unlock;
77
0
    }
78
79
0
    s = xmalloc(sizeof *s);
80
0
    stream_init(&s->stream, &replay_stream_class, 0, xstrdup(name));
81
0
    s->f = f;
82
0
    s->seqno = seqno;
83
0
    *streamp = &s->stream;
84
0
unlock:
85
0
    ovs_replay_unlock();
86
0
    return error;
87
0
}
88
89
static struct stream_replay *
90
stream_replay_cast(struct stream *stream)
91
0
{
92
0
    stream_assert_class(stream, &replay_stream_class);
93
0
    return CONTAINER_OF(stream, struct stream_replay, stream);
94
0
}
95
96
void
97
stream_replay_open_wfd(struct stream *s, int open_result, const char *name)
98
0
{
99
0
    int state = ovs_replay_get_state();
100
0
    int error = 0;
101
0
    replay_file_t f;
102
103
0
    if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) {
104
0
        return;
105
0
    }
106
107
0
    ovs_replay_lock();
108
0
    error = ovs_replay_file_open(name, &f, NULL);
109
0
    if (error) {
110
0
        VLOG_ERR_RL(&rl, "%s: failed to open replay file for stream.", name);
111
0
        ovs_replay_unlock();
112
0
        return;
113
0
    }
114
0
    ovs_replay_unlock();
115
116
0
    if (ovs_replay_write(f, NULL, -open_result, true)) {
117
0
        VLOG_ERR_RL(&rl, "%s: failed to write 'open' result: %d",
118
0
                    name, open_result);
119
0
    }
120
0
    if (open_result) {
121
        /* We recorded failure to open the stream. */
122
0
        ovs_replay_file_close(f);
123
0
    } else {
124
0
        s->replay_wfd = f;
125
0
    }
126
0
}
127
128
void
129
stream_replay_write(struct stream *s, const void *buffer, int n, bool is_read)
130
0
{
131
0
    int state = ovs_replay_get_state();
132
133
0
    if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) {
134
0
        return;
135
0
    }
136
137
0
    if (ovs_replay_write(s->replay_wfd, buffer, n, is_read)) {
138
0
        VLOG_ERR_RL(&rl, "%s: failed to write buffer.", s->name);
139
0
    }
140
0
}
141
142
void
143
stream_replay_close_wfd(struct stream *s)
144
0
{
145
0
    if (s->replay_wfd) {
146
0
        ovs_replay_file_close(s->replay_wfd);
147
0
    }
148
0
}
149
150
static int
151
stream_replay_open(const char *name, char *suffix OVS_UNUSED,
152
                   struct stream **streamp, uint8_t dscp OVS_UNUSED)
153
0
{
154
0
    return new_replay_stream(name, streamp);
155
0
}
156
157
static void
158
stream_replay_close(struct stream *stream)
159
0
{
160
0
    struct stream_replay *s = stream_replay_cast(stream);
161
0
    ovs_replay_file_close(s->f);
162
0
    free(s);
163
0
}
164
165
static ssize_t
166
stream_replay_recv(struct stream *stream, void *buffer, size_t n)
167
0
{
168
0
    struct stream_replay *s = stream_replay_cast(stream);
169
0
    int norm_seqno = ovs_replay_normalized_seqno(s->seqno);
170
0
    int error, len;
171
172
0
    ovs_replay_lock();
173
0
    ovs_assert(norm_seqno >= ovs_replay_seqno());
174
175
0
    if (norm_seqno != ovs_replay_seqno()
176
0
        || !ovs_replay_seqno_is_read(s->seqno)) {
177
0
        error = EAGAIN;
178
0
        goto unlock;
179
0
    }
180
181
0
    error = ovs_replay_read(s->f, buffer, n, &len, &s->seqno, true);
182
0
    if (error) {
183
0
        VLOG_ERR_RL(&rl, "%s: failed to read from replay file.", stream->name);
184
0
        goto unlock;
185
0
    }
186
187
0
unlock:
188
0
    ovs_replay_unlock();
189
0
    return error ? -error : len;
190
0
}
191
192
static ssize_t
193
stream_replay_send(struct stream *stream OVS_UNUSED,
194
                   const void *buffer OVS_UNUSED, size_t n)
195
0
{
196
0
    struct stream_replay *s = stream_replay_cast(stream);
197
0
    int norm_seqno = ovs_replay_normalized_seqno(s->seqno);
198
0
    int error, len;
199
200
0
    ovs_replay_lock();
201
0
    ovs_assert(norm_seqno >= ovs_replay_seqno());
202
203
0
    if (norm_seqno != ovs_replay_seqno()
204
0
        || ovs_replay_seqno_is_read(s->seqno)) {
205
0
        error = EAGAIN;
206
0
        goto unlock;
207
0
    }
208
209
0
    error = ovs_replay_read(s->f, NULL, 0, &len, &s->seqno, false);
210
0
    if (error) {
211
0
        VLOG_ERR_RL(&rl, "%s: failed to read from replay file.", stream->name);
212
0
        goto unlock;
213
0
    }
214
0
    ovs_assert(len < 0 || len <= n);
215
216
0
unlock:
217
0
    ovs_replay_unlock();
218
0
    return error ? -error : len;
219
0
}
220
221
static void
222
stream_replay_wait(struct stream *stream, enum stream_wait_type wait)
223
0
{
224
0
    struct stream_replay *s = stream_replay_cast(stream);
225
0
    switch (wait) {
226
0
    case STREAM_CONNECT:
227
        /* Connect does nothing and always available. */
228
0
        poll_immediate_wake();
229
0
        break;
230
231
0
    case STREAM_SEND:
232
0
        if (s->seqno != INT_MAX && !ovs_replay_seqno_is_read(s->seqno)) {
233
            /* Stream waits for write. */
234
0
            poll_immediate_wake();
235
0
        }
236
0
        break;
237
238
0
    case STREAM_RECV:
239
0
        if (s->seqno != INT_MAX && ovs_replay_seqno_is_read(s->seqno)) {
240
            /* We still have something to read. */
241
0
            poll_immediate_wake();
242
0
        }
243
0
        break;
244
245
0
    default:
246
0
        OVS_NOT_REACHED();
247
0
    }
248
0
}
249
250
/* Callback table of the "replay" stream class.  The initializer is
 * positional; the trailing comments name the field each entry fills.  The
 * 'connect', 'run' and 'run_wait' entries are left NULL. */
const struct stream_class replay_stream_class = {
251
    "replay",                   /* name */
252
    false,                      /* needs_probes */
253
    stream_replay_open,         /* open */
254
    stream_replay_close,        /* close */
255
    NULL,                       /* connect */
256
    stream_replay_recv,         /* recv */
257
    stream_replay_send,         /* send */
258
    NULL,                       /* run */
259
    NULL,                       /* run_wait */
260
    stream_replay_wait,         /* wait */
261
};
262

263
/* Passive replay stream. */
264
265
struct replay_pstream
266
{
267
    struct pstream pstream;
268
    replay_file_t f;
269
    int seqno;
270
};
271
272
/* Forward declaration: the functions below refer to the class before its
 * callback table is defined at the end of this file. */
const struct pstream_class preplay_pstream_class;
273
274
static struct replay_pstream *
275
replay_pstream_cast(struct pstream *pstream)
276
0
{
277
0
    pstream_assert_class(pstream, &preplay_pstream_class);
278
0
    return CONTAINER_OF(pstream, struct replay_pstream, pstream);
279
0
}
280
281
/* Creates a new pstream named 'name' that will accept new replay connections
282
 * reading them from the replay file and stores a pointer to the stream in
283
 * '*pstreamp'.
284
 *
285
 * Returns 0 if successful, otherwise a positive errno value. */
286
static int
287
pstream_replay_listen(const char *name, char *suffix OVS_UNUSED,
288
                      struct pstream **pstreamp, uint8_t dscp OVS_UNUSED)
289
0
{
290
0
    int seqno = 0, error = 0, listen_result;
291
0
    replay_file_t f;
292
293
0
    ovs_replay_lock();
294
0
    error = ovs_replay_file_open(name, &f, &seqno);
295
0
    if (error) {
296
0
        VLOG_ERR_RL(&rl, "%s: failed to open pstream.", name);
297
0
        goto unlock;
298
0
    }
299
300
0
    error = ovs_replay_read(f, NULL, 0, &listen_result, &seqno, true);
301
0
    if (error) {
302
0
        VLOG_ERR_RL(&rl, "%s: failed to read 'listen' record.", name);
303
0
        ovs_replay_file_close(f);
304
0
        goto unlock;
305
0
    }
306
307
0
    if (listen_result) {
308
0
        error = -listen_result;
309
0
        ovs_replay_file_close(f);
310
0
        goto unlock;
311
0
    }
312
313
0
    struct replay_pstream *ps = xmalloc(sizeof *ps);
314
0
    pstream_init(&ps->pstream, &preplay_pstream_class, xstrdup(name));
315
0
    ps->f = f;
316
0
    ps->seqno = seqno;
317
0
    *pstreamp = &ps->pstream;
318
0
unlock:
319
0
    ovs_replay_unlock();
320
0
    return error;
321
0
}
322
323
void
324
pstream_replay_open_wfd(struct pstream *ps, int listen_result,
325
                        const char *name)
326
0
{
327
0
    int state = ovs_replay_get_state();
328
0
    int error = 0;
329
0
    replay_file_t f;
330
331
0
    if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) {
332
0
        return;
333
0
    }
334
335
0
    ovs_replay_lock();
336
0
    error = ovs_replay_file_open(name, &f, NULL);
337
0
    if (error) {
338
0
        VLOG_ERR_RL(&rl, "%s: failed to open replay file for pstream.", name);
339
0
        ovs_replay_unlock();
340
0
        return;
341
0
    }
342
0
    ovs_replay_unlock();
343
344
0
    if (ovs_replay_write(f, NULL, -listen_result, true)) {
345
0
        VLOG_ERR_RL(&rl, "%s: failed to write 'listen' result: %d",
346
0
                    name, listen_result);
347
0
    }
348
349
0
    if (listen_result) {
350
        /* We recorded failure to open the stream. */
351
0
        ovs_replay_file_close(f);
352
0
    } else {
353
0
        ps->replay_wfd = f;
354
0
    }
355
0
}
356
357
void
358
pstream_replay_write_accept(struct pstream *ps, const struct stream *s,
359
                            int accept_result)
360
0
{
361
0
    int state = ovs_replay_get_state();
362
0
    int len;
363
364
0
    if (OVS_LIKELY(state != OVS_REPLAY_WRITE)) {
365
0
        return;
366
0
    }
367
368
0
    if (!accept_result) {
369
0
        len = strlen(s->name);
370
0
        if (ovs_replay_write(ps->replay_wfd, s->name, len, true)) {
371
0
            VLOG_ERR_RL(&rl, "%s: failed to write accept name: %s",
372
0
                        ps->name, s->name);
373
0
        }
374
0
    } else if (ovs_replay_write(ps->replay_wfd, NULL, -accept_result, true)) {
375
0
        VLOG_ERR_RL(&rl, "%s: failed to write 'accept' failure: %d",
376
0
                    ps->name, accept_result);
377
0
    }
378
0
}
379
380
void
381
pstream_replay_close_wfd(struct pstream *ps)
382
0
{
383
0
    if (ps->replay_wfd) {
384
0
        ovs_replay_file_close(ps->replay_wfd);
385
0
    }
386
0
}
387
388
static void
389
pstream_replay_close(struct pstream *pstream)
390
0
{
391
0
    struct replay_pstream *ps = replay_pstream_cast(pstream);
392
393
0
    ovs_replay_file_close(ps->f);
394
0
    free(ps);
395
0
}
396
397
0
/* Size in bytes of the on-stack buffer in which pstream_replay_accept() reads
 * the name of an accepted stream.  One byte is reserved for the terminating
 * null byte, so at most MAX_NAME_LEN - 1 bytes are requested from the replay
 * file. */
#define MAX_NAME_LEN 65536
398
399
static int
400
pstream_replay_accept(struct pstream *pstream, struct stream **new_streamp)
401
0
{
402
0
    struct replay_pstream *ps = replay_pstream_cast(pstream);
403
0
    int norm_seqno = ovs_replay_normalized_seqno(ps->seqno);
404
0
    int retval, len;
405
0
    char name[MAX_NAME_LEN];
406
407
0
    ovs_replay_lock();
408
0
    ovs_assert(norm_seqno >= ovs_replay_seqno());
409
410
0
    if (norm_seqno != ovs_replay_seqno()
411
0
        || !ovs_replay_seqno_is_read(ps->seqno)) {
412
0
        retval = EAGAIN;
413
0
        ovs_replay_unlock();
414
0
        goto exit;
415
0
    }
416
417
0
    retval = ovs_replay_read(ps->f, name, MAX_NAME_LEN - 1,
418
0
                             &len, &ps->seqno, true);
419
0
    if (retval) {
420
0
        VLOG_ERR_RL(&rl, "%s: failed to read from replay file.",
421
0
                    pstream->name);
422
0
        ovs_replay_unlock();
423
0
        goto exit;
424
0
    }
425
426
0
    ovs_replay_unlock();
427
428
0
    if (len > 0) {
429
0
        name[len] = 0;
430
0
        retval = new_replay_stream(name, new_streamp);
431
0
    } else {
432
0
        retval = -len;
433
0
    }
434
0
exit:
435
0
    return retval;
436
0
}
437
438
static void
439
pstream_replay_wait(struct pstream *pstream)
440
0
{
441
0
    struct replay_pstream *ps = replay_pstream_cast(pstream);
442
443
0
    if (ps->seqno != INT_MAX) {
444
        /* Replay always has something to say. */
445
0
        poll_immediate_wake();
446
0
    }
447
0
}
448
449
/* Callback table of the "preplay" passive stream class.  The initializer is
 * positional: the class name, a boolean flag (presumably 'needs_probes', by
 * analogy with replay_stream_class -- confirm in stream-provider.h), then the
 * listen, close, accept and wait callbacks defined above. */
const struct pstream_class preplay_pstream_class = {
450
    "preplay",
451
    false,
452
    pstream_replay_listen,
453
    pstream_replay_close,
454
    pstream_replay_accept,
455
    pstream_replay_wait,
456
};