Coverage Report

Created: 2023-11-27 06:54

/src/openvswitch/lib/jsonrpc.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2009-2017 Nicira, Inc.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at:
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
#include <config.h>
18
19
#include "jsonrpc.h"
20
21
#include <errno.h>
22
23
#include "byteq.h"
24
#include "openvswitch/dynamic-string.h"
25
#include "fatal-signal.h"
26
#include "openvswitch/json.h"
27
#include "openvswitch/list.h"
28
#include "openvswitch/ofpbuf.h"
29
#include "ovs-replay.h"
30
#include "ovs-thread.h"
31
#include "openvswitch/poll-loop.h"
32
#include "reconnect.h"
33
#include "stream.h"
34
#include "svec.h"
35
#include "timeval.h"
36
#include "openvswitch/vlog.h"
37
38
VLOG_DEFINE_THIS_MODULE(jsonrpc);
39

40
struct jsonrpc {
41
    struct stream *stream;
42
    char *name;
43
    int status;
44
45
    /* Input. */
46
    struct byteq input;
47
    uint8_t input_buffer[4096];
48
    struct json_parser *parser;
49
50
    /* Output. */
51
    struct ovs_list output;     /* Contains "struct ofpbuf"s. */
52
    size_t output_count;        /* Number of elements in "output". */
53
    size_t backlog;
54
55
    /* Limits. */
56
    size_t max_output;          /* 'output_count' disconnection threshold. */
57
    size_t max_backlog;         /* 'backlog' disconnection threshold. */
58
};
59
60
/* Rate limit for error messages. */
61
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
62
63
static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
64
static void jsonrpc_cleanup(struct jsonrpc *);
65
static void jsonrpc_error(struct jsonrpc *, int error);
66
67
/* This is just the same as stream_open() except that it uses the default
68
 * JSONRPC port if none is specified. */
69
int
70
jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
71
0
{
72
0
    return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp);
73
0
}
74
75
/* This is just the same as pstream_open() except that it uses the default
76
 * JSONRPC port if none is specified. */
77
int
78
jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
79
0
{
80
0
    return pstream_open_with_default_port(name, OVSDB_PORT, pstreamp, dscp);
81
0
}
82
83
/* Returns a new JSON-RPC stream that uses 'stream' for input and output.  The
84
 * new jsonrpc object takes ownership of 'stream'. */
85
struct jsonrpc *
86
jsonrpc_open(struct stream *stream)
87
0
{
88
0
    struct jsonrpc *rpc;
89
90
0
    ovs_assert(stream != NULL);
91
92
0
    rpc = xzalloc(sizeof *rpc);
93
0
    rpc->name = xstrdup(stream_get_name(stream));
94
0
    rpc->stream = stream;
95
0
    byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
96
0
    ovs_list_init(&rpc->output);
97
98
0
    return rpc;
99
0
}
100
101
/* Destroys 'rpc', closing the stream on which it is based, and frees its
102
 * memory. */
103
void
104
jsonrpc_close(struct jsonrpc *rpc)
105
0
{
106
0
    if (rpc) {
107
0
        jsonrpc_cleanup(rpc);
108
0
        free(rpc->name);
109
0
        free(rpc);
110
0
    }
111
0
}
112
113
/* Performs periodic maintenance on 'rpc', such as flushing output buffers. */
114
void
115
jsonrpc_run(struct jsonrpc *rpc)
116
0
{
117
0
    if (rpc->status) {
118
0
        return;
119
0
    }
120
121
0
    stream_run(rpc->stream);
122
0
    while (!ovs_list_is_empty(&rpc->output)) {
123
0
        struct ofpbuf *buf = ofpbuf_from_list(rpc->output.next);
124
0
        int retval;
125
126
0
        retval = stream_send(rpc->stream, buf->data, buf->size);
127
0
        if (retval >= 0) {
128
0
            rpc->backlog -= retval;
129
0
            ofpbuf_pull(buf, retval);
130
0
            if (!buf->size) {
131
0
                ovs_list_remove(&buf->list_node);
132
0
                rpc->output_count--;
133
0
                ofpbuf_delete(buf);
134
0
            }
135
0
        } else {
136
0
            if (retval != -EAGAIN) {
137
0
                VLOG_WARN_RL(&rl, "%s: send error: %s",
138
0
                             rpc->name, ovs_strerror(-retval));
139
0
                jsonrpc_error(rpc, -retval);
140
0
            }
141
0
            break;
142
0
        }
143
0
    }
144
0
}
145
146
/* Arranges for the poll loop to wake up when 'rpc' needs to perform
147
 * maintenance activities. */
148
void
149
jsonrpc_wait(struct jsonrpc *rpc)
150
0
{
151
0
    if (!rpc->status) {
152
0
        stream_run_wait(rpc->stream);
153
0
        if (!ovs_list_is_empty(&rpc->output)) {
154
0
            stream_send_wait(rpc->stream);
155
0
        }
156
0
    }
157
0
}
158
159
/*
160
 * Returns the current status of 'rpc'.  The possible return values are:
161
 * - 0: no error yet
162
 * - >0: errno value
163
 * - EOF: end of file (remote end closed connection; not necessarily an error).
164
 *
165
 * When this function returns nonzero, 'rpc' is effectively out of
166
 * commission.  'rpc' will not receive any more messages and any further
167
 * messages that one attempts to send with 'rpc' will be discarded.  The
168
 * caller can keep 'rpc' around as long as it wants, but it's not going
169
 * to provide any more useful services.
170
 */
171
int
172
jsonrpc_get_status(const struct jsonrpc *rpc)
173
0
{
174
0
    return rpc->status;
175
0
}
176
177
/* Returns the number of bytes buffered by 'rpc' to be written to the
178
 * underlying stream.  Always returns 0 if 'rpc' has encountered an error or if
179
 * the remote end closed the connection. */
180
size_t
181
jsonrpc_get_backlog(const struct jsonrpc *rpc)
182
0
{
183
0
    return rpc->status ? 0 : rpc->backlog;
184
0
}
185
186
/* Sets thresholds for send backlog.  If send backlog contains more than
187
 * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
188
 * connection will be dropped. */
189
void
190
jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
191
                              size_t max_n_msgs, size_t max_backlog_bytes)
192
0
{
193
0
    rpc->max_output = max_n_msgs;
194
0
    rpc->max_backlog = max_backlog_bytes;
195
0
}
196
197
/* Returns the number of bytes that have been received on 'rpc''s underlying
198
 * stream.  (The value wraps around if it exceeds UINT_MAX.) */
199
unsigned int
200
jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
201
0
{
202
0
    return rpc->input.head;
203
0
}
204
205
/* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
206
 * the stream underlying 'rpc' when 'rpc' was created. */
207
const char *
208
jsonrpc_get_name(const struct jsonrpc *rpc)
209
0
{
210
0
    return rpc->name;
211
0
}
212
213
static void
214
jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
215
                const struct jsonrpc_msg *msg)
216
0
{
217
0
    if (VLOG_IS_DBG_ENABLED()) {
218
0
        struct ds s = DS_EMPTY_INITIALIZER;
219
0
        if (msg->method) {
220
0
            ds_put_format(&s, ", method=\"%s\"", msg->method);
221
0
        }
222
0
        if (msg->params) {
223
0
            ds_put_cstr(&s, ", params=");
224
0
            json_to_ds(msg->params, 0, &s);
225
0
        }
226
0
        if (msg->result) {
227
0
            ds_put_cstr(&s, ", result=");
228
0
            json_to_ds(msg->result, 0, &s);
229
0
        }
230
0
        if (msg->error) {
231
0
            ds_put_cstr(&s, ", error=");
232
0
            json_to_ds(msg->error, 0, &s);
233
0
        }
234
0
        if (msg->id) {
235
0
            ds_put_cstr(&s, ", id=");
236
0
            json_to_ds(msg->id, 0, &s);
237
0
        }
238
0
        VLOG_DBG("%s: %s %s%s", rpc->name, title,
239
0
                 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
240
0
        ds_destroy(&s);
241
0
    }
242
0
}
243
244
/* Schedules 'msg' to be sent on 'rpc' and returns 'rpc''s status (as with
245
 * jsonrpc_get_status()).
246
 *
247
 * If 'msg' cannot be sent immediately, it is appended to a buffer.  The caller
248
 * is responsible for ensuring that the amount of buffered data is somehow
249
 * limited.  (jsonrpc_get_backlog() returns the amount of data currently
250
 * buffered in 'rpc'.)
251
 *
252
 * Always takes ownership of 'msg', regardless of success. */
253
int
254
jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
255
0
{
256
0
    struct ofpbuf *buf;
257
0
    struct json *json;
258
0
    struct ds ds = DS_EMPTY_INITIALIZER;
259
0
    size_t length;
260
261
0
    if (rpc->status) {
262
0
        jsonrpc_msg_destroy(msg);
263
0
        return rpc->status;
264
0
    }
265
266
0
    jsonrpc_log_msg(rpc, "send", msg);
267
268
0
    json = jsonrpc_msg_to_json(msg);
269
0
    json_to_ds(json, 0, &ds);
270
0
    length = ds.length;
271
0
    json_destroy(json);
272
273
0
    buf = xmalloc(sizeof *buf);
274
0
    ofpbuf_use_ds(buf, &ds);
275
0
    ovs_list_push_back(&rpc->output, &buf->list_node);
276
0
    rpc->output_count++;
277
0
    rpc->backlog += length;
278
279
0
    if (rpc->output_count >= 50) {
280
0
        static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
281
0
        bool disconnect = false;
282
283
0
        VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
284
0
                     " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
285
0
                     rpc->output_count, rpc->backlog);
286
0
        if (rpc->max_output && rpc->output_count > rpc->max_output) {
287
0
            disconnect = true;
288
0
            VLOG_WARN("sending backlog exceeded maximum number of messages (%"
289
0
                      PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
290
0
                      rpc->output_count, rpc->max_output, rpc->name);
291
0
        } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
292
0
            disconnect = true;
293
0
            VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
294
0
                      PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
295
0
                      rpc->backlog, rpc->max_backlog, rpc->name);
296
0
        }
297
0
        if (disconnect) {
298
0
            jsonrpc_error(rpc, E2BIG);
299
0
        }
300
0
    }
301
302
0
    if (rpc->backlog == length) {
303
0
        jsonrpc_run(rpc);
304
0
    }
305
0
    return rpc->status;
306
0
}
307
308
/* Attempts to receive a message from 'rpc'.
309
 *
310
 * If successful, stores the received message in '*msgp' and returns 0.  The
311
 * caller takes ownership of '*msgp' and must eventually destroy it with
312
 * jsonrpc_msg_destroy().
313
 *
314
 * Otherwise, stores NULL in '*msgp' and returns one of the following:
315
 *
316
 *   - EAGAIN: No message has been received.
317
 *
318
 *   - EOF: The remote end closed the connection gracefully.
319
 *
320
 *   - Otherwise an errno value that represents a JSON-RPC protocol violation
321
 *     or another error fatal to the connection.  'rpc' will not send or
322
 *     receive any more messages.
323
 */
324
int
325
jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
326
0
{
327
0
    int i;
328
329
0
    *msgp = NULL;
330
0
    if (rpc->status) {
331
0
        return rpc->status;
332
0
    }
333
334
0
    for (i = 0; i < 50; i++) {
335
0
        size_t n, used;
336
337
        /* Fill our input buffer if it's empty. */
338
0
        if (byteq_is_empty(&rpc->input)) {
339
0
            size_t chunk;
340
0
            int retval;
341
342
0
            chunk = byteq_headroom(&rpc->input);
343
0
            retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
344
0
            if (retval < 0) {
345
0
                if (retval == -EAGAIN) {
346
0
                    return EAGAIN;
347
0
                } else {
348
0
                    VLOG_WARN_RL(&rl, "%s: receive error: %s",
349
0
                                 rpc->name, ovs_strerror(-retval));
350
0
                    jsonrpc_error(rpc, -retval);
351
0
                    return rpc->status;
352
0
                }
353
0
            } else if (retval == 0) {
354
0
                jsonrpc_error(rpc, EOF);
355
0
                return EOF;
356
0
            }
357
0
            byteq_advance_head(&rpc->input, retval);
358
0
        }
359
360
        /* We have some input.  Feed it into the JSON parser. */
361
0
        if (!rpc->parser) {
362
0
            rpc->parser = json_parser_create(0);
363
0
        }
364
0
        n = byteq_tailroom(&rpc->input);
365
0
        used = json_parser_feed(rpc->parser,
366
0
                                (char *) byteq_tail(&rpc->input), n);
367
0
        byteq_advance_tail(&rpc->input, used);
368
369
        /* If we have complete JSON, attempt to parse it as JSON-RPC. */
370
0
        if (json_parser_is_done(rpc->parser)) {
371
0
            *msgp = jsonrpc_parse_received_message(rpc);
372
0
            if (*msgp) {
373
0
                return 0;
374
0
            }
375
376
0
            if (rpc->status) {
377
0
                const struct byteq *q = &rpc->input;
378
0
                if (q->head <= q->size) {
379
0
                    stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
380
0
                                          &this_module, rpc->name);
381
0
                }
382
0
                return rpc->status;
383
0
            }
384
0
        }
385
0
    }
386
387
0
    return EAGAIN;
388
0
}
389
390
/* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
391
 * than EAGAIN. */
392
void
393
jsonrpc_recv_wait(struct jsonrpc *rpc)
394
0
{
395
0
    if (rpc->status || !byteq_is_empty(&rpc->input)) {
396
0
        poll_immediate_wake_at(rpc->name);
397
0
    } else {
398
0
        stream_recv_wait(rpc->stream);
399
0
    }
400
0
}
401
402
/* Sends 'msg' on 'rpc' and waits for it to be successfully queued to the
403
 * underlying stream.  Returns 0 if 'msg' was sent successfully, otherwise a
404
 * status value (see jsonrpc_get_status()).
405
 *
406
 * Always takes ownership of 'msg', regardless of success. */
407
int
408
jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
409
0
{
410
0
    int error;
411
412
0
    fatal_signal_run();
413
414
0
    error = jsonrpc_send(rpc, msg);
415
0
    if (error) {
416
0
        return error;
417
0
    }
418
419
0
    for (;;) {
420
0
        jsonrpc_run(rpc);
421
0
        if (ovs_list_is_empty(&rpc->output) || rpc->status) {
422
0
            return rpc->status;
423
0
        }
424
0
        jsonrpc_wait(rpc);
425
0
        poll_block();
426
0
    }
427
0
}
428
429
/* Blocks until jsonrpc_recv() on 'rpc' yields something other than EAGAIN,
 * then returns that result.  '*msgp' is set exactly as by jsonrpc_recv().
 * Pending output keeps being flushed while waiting. */
int
jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
{
    int error;

    while ((error = jsonrpc_recv(rpc, msgp)) == EAGAIN) {
        jsonrpc_run(rpc);
        jsonrpc_wait(rpc);
        jsonrpc_recv_wait(rpc);
        poll_block();
    }

    fatal_signal_run();
    return error;
}
447
448
/* Sends 'request' to 'rpc' then waits for a reply.  The return value is 0 if
449
 * successful, in which case '*replyp' is set to the reply, which the caller
450
 * must eventually free with jsonrpc_msg_destroy().  Otherwise returns a status
451
 * value (see jsonrpc_get_status()).
452
 *
453
 * Discards any message received on 'rpc' that is not a reply to 'request'
454
 * (based on message id).
455
 *
456
 * Always takes ownership of 'request', regardless of success. */
457
int
458
jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
459
                       struct jsonrpc_msg **replyp)
460
0
{
461
0
    struct jsonrpc_msg *reply = NULL;
462
0
    struct json *id;
463
0
    int error;
464
465
0
    id = json_clone(request->id);
466
0
    error = jsonrpc_send_block(rpc, request);
467
0
    if (!error) {
468
0
        for (;;) {
469
0
            error = jsonrpc_recv_block(rpc, &reply);
470
0
            if (error) {
471
0
                break;
472
0
            }
473
0
            if ((reply->type == JSONRPC_REPLY || reply->type == JSONRPC_ERROR)
474
0
                && json_equal(id, reply->id)) {
475
0
                break;
476
0
            }
477
0
            jsonrpc_msg_destroy(reply);
478
0
        }
479
0
    }
480
0
    *replyp = error ? NULL : reply;
481
0
    json_destroy(id);
482
0
    return error;
483
0
}
484
485
/* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a
486
 * JSON-RPC message.  If successful, returns the JSON-RPC message.  On failure,
487
 * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */
488
static struct jsonrpc_msg *
489
jsonrpc_parse_received_message(struct jsonrpc *rpc)
490
0
{
491
0
    struct jsonrpc_msg *msg;
492
0
    struct json *json;
493
0
    char *error;
494
495
0
    json = json_parser_finish(rpc->parser);
496
0
    rpc->parser = NULL;
497
0
    if (json->type == JSON_STRING) {
498
0
        VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
499
0
                     rpc->name, json_string(json));
500
0
        jsonrpc_error(rpc, EPROTO);
501
0
        json_destroy(json);
502
0
        return NULL;
503
0
    }
504
505
0
    error = jsonrpc_msg_from_json(json, &msg);
506
0
    if (error) {
507
0
        VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
508
0
                     rpc->name, error);
509
0
        free(error);
510
0
        jsonrpc_error(rpc, EPROTO);
511
0
        return NULL;
512
0
    }
513
514
0
    jsonrpc_log_msg(rpc, "received", msg);
515
0
    return msg;
516
0
}
517
518
static void
519
jsonrpc_error(struct jsonrpc *rpc, int error)
520
0
{
521
0
    ovs_assert(error);
522
0
    if (!rpc->status) {
523
0
        rpc->status = error;
524
0
        jsonrpc_cleanup(rpc);
525
0
    }
526
0
}
527
528
static void
529
jsonrpc_cleanup(struct jsonrpc *rpc)
530
0
{
531
0
    stream_close(rpc->stream);
532
0
    rpc->stream = NULL;
533
534
0
    json_parser_abort(rpc->parser);
535
0
    rpc->parser = NULL;
536
537
0
    ofpbuf_list_delete(&rpc->output);
538
0
    rpc->backlog = 0;
539
0
    rpc->output_count = 0;
540
0
}
541

542
static struct jsonrpc_msg *
543
jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
544
                struct json *params, struct json *result, struct json *error,
545
                struct json *id)
546
0
{
547
0
    struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
548
0
    msg->type = type;
549
0
    msg->method = nullable_xstrdup(method);
550
0
    msg->params = params;
551
0
    msg->result = result;
552
0
    msg->error = error;
553
0
    msg->id = id;
554
0
    return msg;
555
0
}
556
557
static struct json *
558
jsonrpc_create_id(void)
559
0
{
560
0
    static atomic_count next_id = ATOMIC_COUNT_INIT(0);
561
0
    unsigned int id;
562
563
0
    id = atomic_count_inc(&next_id);
564
0
    return json_integer_create(id);
565
0
}
566
567
struct jsonrpc_msg *
568
jsonrpc_create_request(const char *method, struct json *params,
569
                       struct json **idp)
570
0
{
571
0
    struct json *id = jsonrpc_create_id();
572
0
    if (idp) {
573
0
        *idp = json_clone(id);
574
0
    }
575
0
    return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
576
0
}
577
578
struct jsonrpc_msg *
579
jsonrpc_create_notify(const char *method, struct json *params)
580
0
{
581
0
    return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
582
0
}
583
584
struct jsonrpc_msg *
585
jsonrpc_create_reply(struct json *result, const struct json *id)
586
0
{
587
0
    return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
588
0
                           json_clone(id));
589
0
}
590
591
struct jsonrpc_msg *
592
jsonrpc_create_error(struct json *error, const struct json *id)
593
0
{
594
0
    return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
595
0
                           json_clone(id));
596
0
}
597
598
struct jsonrpc_msg *
599
jsonrpc_msg_clone(const struct jsonrpc_msg *old)
600
0
{
601
0
    return jsonrpc_create(old->type, old->method,
602
0
                          json_nullable_clone(old->params),
603
0
                          json_nullable_clone(old->result),
604
0
                          json_nullable_clone(old->error),
605
0
                          json_nullable_clone(old->id));
606
0
}
607
608
const char *
609
jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
610
238
{
611
238
    switch (type) {
612
15
    case JSONRPC_REQUEST:
613
15
        return "request";
614
615
9
    case JSONRPC_NOTIFY:
616
9
        return "notification";
617
618
8
    case JSONRPC_REPLY:
619
8
        return "reply";
620
621
206
    case JSONRPC_ERROR:
622
206
        return "error";
623
238
    }
624
0
    return "(null)";
625
238
}
626
627
char *
628
jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
629
246
{
630
246
    const char *type_name;
631
246
    unsigned int pattern;
632
633
246
    if (m->params && m->params->type != JSON_ARRAY) {
634
8
        return xstrdup("\"params\" must be JSON array");
635
8
    }
636
637
238
    switch (m->type) {
638
15
    case JSONRPC_REQUEST:
639
15
        pattern = 0x11001;
640
15
        break;
641
642
9
    case JSONRPC_NOTIFY:
643
9
        pattern = 0x11000;
644
9
        break;
645
646
8
    case JSONRPC_REPLY:
647
8
        pattern = 0x00101;
648
8
        break;
649
650
206
    case JSONRPC_ERROR:
651
206
        pattern = 0x00011;
652
206
        break;
653
654
0
    default:
655
0
        return xasprintf("invalid JSON-RPC message type %d", m->type);
656
238
    }
657
658
238
    type_name = jsonrpc_msg_type_to_string(m->type);
659
238
    if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
660
13
        return xasprintf("%s must%s have \"method\"",
661
13
                         type_name, (pattern & 0x10000) ? "" : " not");
662
663
13
    }
664
225
    if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
665
4
        return xasprintf("%s must%s have \"params\"",
666
4
                         type_name, (pattern & 0x1000) ? "" : " not");
667
668
4
    }
669
221
    if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
670
0
        return xasprintf("%s must%s have \"result\"",
671
0
                         type_name, (pattern & 0x100) ? "" : " not");
672
673
0
    }
674
221
    if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
675
2
        return xasprintf("%s must%s have \"error\"",
676
2
                         type_name, (pattern & 0x10) ? "" : " not");
677
678
2
    }
679
219
    if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
680
6
        return xasprintf("%s must%s have \"id\"",
681
6
                         type_name, (pattern & 0x1) ? "" : " not");
682
683
6
    }
684
213
    return NULL;
685
219
}
686
687
void
688
jsonrpc_msg_destroy(struct jsonrpc_msg *m)
689
1.33k
{
690
1.33k
    if (m) {
691
398
        free(m->method);
692
398
        json_destroy(m->params);
693
398
        json_destroy(m->result);
694
398
        json_destroy(m->error);
695
398
        json_destroy(m->id);
696
398
        free(m);
697
398
    }
698
1.33k
}
699
700
static struct json *
701
null_from_json_null(struct json *json)
702
2.44k
{
703
2.44k
    if (json && json->type == JSON_NULL) {
704
4
        json_destroy(json);
705
4
        return NULL;
706
4
    }
707
2.44k
    return json;
708
2.44k
}
709
710
char *
711
jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
712
1.54k
{
713
1.54k
    struct json *method = NULL;
714
1.54k
    struct jsonrpc_msg *msg = NULL;
715
1.54k
    struct shash *object;
716
1.54k
    char *error;
717
718
1.54k
    if (json->type != JSON_OBJECT) {
719
926
        error = xstrdup("message is not a JSON object");
720
926
        goto exit;
721
926
    }
722
617
    object = json_object(json);
723
724
617
    method = shash_find_and_delete(object, "method");
725
617
    if (method && method->type != JSON_STRING) {
726
6
        error = xstrdup("method is not a JSON string");
727
6
        goto exit;
728
6
    }
729
730
611
    msg = xzalloc(sizeof *msg);
731
611
    msg->method = method ? xstrdup(method->string) : NULL;
732
611
    msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
733
611
    msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
734
611
    msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
735
611
    msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
736
611
    msg->type = (msg->result ? JSONRPC_REPLY
737
611
                 : msg->error ? JSONRPC_ERROR
738
600
                 : msg->id ? JSONRPC_REQUEST
739
389
                 : JSONRPC_NOTIFY);
740
611
    if (!shash_is_empty(object)) {
741
365
        error = xasprintf("message has unexpected member \"%s\"",
742
365
                          shash_first(object)->name);
743
365
        goto exit;
744
365
    }
745
246
    error = jsonrpc_msg_is_valid(msg);
746
246
    if (error) {
747
33
        goto exit;
748
33
    }
749
750
1.54k
exit:
751
1.54k
    json_destroy(method);
752
1.54k
    json_destroy(json);
753
1.54k
    if (error) {
754
1.33k
        jsonrpc_msg_destroy(msg);
755
1.33k
        msg = NULL;
756
1.33k
    }
757
1.54k
    *msgp = msg;
758
1.54k
    return error;
759
246
}
760
761
/* Returns 'm' converted to JSON suitable for sending as a JSON-RPC message.
762
 *
763
 * Consumes and destroys 'm'. */
764
struct json *
765
jsonrpc_msg_to_json(struct jsonrpc_msg *m)
766
213
{
767
213
    struct json *json = json_object_create();
768
769
213
    if (m->method) {
770
10
        json_object_put(json, "method", json_string_create_nocopy(m->method));
771
10
    }
772
773
213
    if (m->params) {
774
10
        json_object_put(json, "params", m->params);
775
10
    }
776
777
213
    if (m->result) {
778
2
        json_object_put(json, "result", m->result);
779
211
    } else if (m->type == JSONRPC_ERROR) {
780
201
        json_object_put(json, "result", json_null_create());
781
201
    }
782
783
213
    if (m->error) {
784
201
        json_object_put(json, "error", m->error);
785
201
    } else if (m->type == JSONRPC_REPLY) {
786
2
        json_object_put(json, "error", json_null_create());
787
2
    }
788
789
213
    if (m->id) {
790
211
        json_object_put(json, "id", m->id);
791
211
    } else if (m->type == JSONRPC_NOTIFY) {
792
2
        json_object_put(json, "id", json_null_create());
793
2
    }
794
795
213
    free(m);
796
797
213
    return json;
798
213
}
799
800
char *
801
jsonrpc_msg_to_string(const struct jsonrpc_msg *m)
802
0
{
803
0
    struct jsonrpc_msg *copy = jsonrpc_msg_clone(m);
804
0
    struct json *json = jsonrpc_msg_to_json(copy);
805
0
    char *s = json_to_string(json, JSSF_SORT);
806
0
    json_destroy(json);
807
0
    return s;
808
0
}
809

810
/* A JSON-RPC session with reconnection. */
811
812
struct jsonrpc_session {
813
    struct svec remotes;
814
    size_t next_remote;
815
816
    struct reconnect *reconnect;
817
    struct jsonrpc *rpc;
818
    struct stream *stream;
819
    struct pstream *pstream;
820
    int last_error;
821
    unsigned int seqno;
822
    uint8_t dscp;
823
824
    /* Limits for jsonrpc. */
825
    size_t max_n_msgs;
826
    size_t max_backlog_bytes;
827
};
828
829
static void
830
jsonrpc_session_pick_remote(struct jsonrpc_session *s)
831
0
{
832
0
    reconnect_set_name(s->reconnect,
833
0
                       s->remotes.names[s->next_remote++ % s->remotes.n]);
834
0
}
835
836
/* Creates and returns a jsonrpc_session to 'name', which should be a string
837
 * acceptable to stream_open() or pstream_open().
838
 *
839
 * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
840
 * jsonrpc_session connects to 'name'.  If 'retry' is true, then the new
841
 * session connects and reconnects to 'name', with backoff.  If 'retry' is
842
 * false, the new session will only try to connect once and after a connection
843
 * failure or a disconnection jsonrpc_session_is_alive() will return false for
844
 * the new session.
845
 *
846
 * If 'name' is a passive connection method, e.g. "ptcp:", the new
847
 * jsonrpc_session listens for connections to 'name'.  It maintains at most one
848
 * connection at any given time.  Any new connection causes the previous one
849
 * (if any) to be dropped. */
850
struct jsonrpc_session *
851
jsonrpc_session_open(const char *name, bool retry)
852
0
{
853
0
    const struct svec remotes = { .names = (char **) &name, .n = 1 };
854
0
    return jsonrpc_session_open_multiple(&remotes, retry);
855
0
}
856
857
struct jsonrpc_session *
858
jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
859
0
{
860
0
    struct jsonrpc_session *s;
861
862
0
    s = xmalloc(sizeof *s);
863
864
    /* Set 'n' remotes from 'names'. */
865
0
    svec_clone(&s->remotes, remotes);
866
0
    if (!s->remotes.n) {
867
0
        svec_add(&s->remotes, "invalid:");
868
0
    }
869
0
    s->next_remote = 0;
870
871
0
    s->reconnect = reconnect_create(time_msec());
872
0
    jsonrpc_session_pick_remote(s);
873
0
    reconnect_enable(s->reconnect, time_msec());
874
0
    reconnect_set_backoff_free_tries(s->reconnect, remotes->n);
875
0
    s->rpc = NULL;
876
0
    s->stream = NULL;
877
0
    s->pstream = NULL;
878
0
    s->seqno = 0;
879
0
    s->dscp = 0;
880
0
    s->last_error = 0;
881
882
0
    jsonrpc_session_set_backlog_threshold(s, 0, 0);
883
884
0
    const char *name = reconnect_get_name(s->reconnect);
885
0
    if (!pstream_verify_name(name)) {
886
0
        reconnect_set_passive(s->reconnect, true, time_msec());
887
0
    } else if (!retry) {
888
0
        reconnect_set_max_tries(s->reconnect, remotes->n);
889
0
        reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
890
0
    }
891
892
0
    if (!stream_or_pstream_needs_probes(name) || ovs_replay_is_active()) {
893
0
        reconnect_set_probe_interval(s->reconnect, 0);
894
0
    }
895
896
0
    return s;
897
0
}
898
899
/* Creates and returns a jsonrpc_session that is initially connected to
900
 * 'jsonrpc'.  If the connection is dropped, it will not be reconnected.
901
 *
902
 * On the assumption that such connections are likely to be short-lived
903
 * (e.g. from ovs-vsctl), informational logging for them is suppressed. */
904
struct jsonrpc_session *
905
jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
906
0
{
907
0
    struct jsonrpc_session *s;
908
909
0
    s = xmalloc(sizeof *s);
910
0
    svec_init(&s->remotes);
911
0
    svec_add(&s->remotes, jsonrpc_get_name(jsonrpc));
912
0
    s->next_remote = 0;
913
0
    s->reconnect = reconnect_create(time_msec());
914
0
    reconnect_set_quiet(s->reconnect, true);
915
0
    reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
916
0
    reconnect_set_max_tries(s->reconnect, 0);
917
0
    reconnect_connected(s->reconnect, time_msec());
918
919
0
    if (ovs_replay_is_active()) {
920
0
        reconnect_set_probe_interval(s->reconnect, 0);
921
0
    }
922
923
0
    s->dscp = dscp;
924
0
    s->rpc = jsonrpc;
925
0
    s->stream = NULL;
926
0
    s->pstream = NULL;
927
0
    s->seqno = 1;
928
929
0
    jsonrpc_session_set_backlog_threshold(s, 0, 0);
930
0
    return s;
931
0
}
932
933
void
934
jsonrpc_session_close(struct jsonrpc_session *s)
935
0
{
936
0
    if (s) {
937
0
        jsonrpc_close(s->rpc);
938
0
        reconnect_destroy(s->reconnect);
939
0
        stream_close(s->stream);
940
0
        pstream_close(s->pstream);
941
0
        svec_destroy(&s->remotes);
942
0
        free(s);
943
0
    }
944
0
}
945
946
struct jsonrpc *
947
jsonrpc_session_steal(struct jsonrpc_session *s)
948
0
{
949
0
    struct jsonrpc *rpc = s->rpc;
950
0
    s->rpc = NULL;
951
0
    jsonrpc_session_close(s);
952
0
    return rpc;
953
0
}
954
955
void
956
jsonrpc_session_replace(struct jsonrpc_session *s, struct jsonrpc *rpc)
957
0
{
958
0
    if (s->rpc) {
959
0
        jsonrpc_close(s->rpc);
960
0
    }
961
0
    s->rpc = rpc;
962
0
    if (s->rpc) {
963
0
        reconnect_set_name(s->reconnect, jsonrpc_get_name(s->rpc));
964
0
        reconnect_connected(s->reconnect, time_msec());
965
0
    }
966
0
}
967
968
static void
969
jsonrpc_session_disconnect(struct jsonrpc_session *s)
970
0
{
971
0
    if (s->rpc) {
972
0
        jsonrpc_error(s->rpc, EOF);
973
0
        jsonrpc_close(s->rpc);
974
0
        s->rpc = NULL;
975
0
    } else if (s->stream) {
976
0
        stream_close(s->stream);
977
0
        s->stream = NULL;
978
0
    } else {
979
0
        return;
980
0
    }
981
982
0
    s->seqno++;
983
0
    jsonrpc_session_pick_remote(s);
984
0
}
985
986
static void
987
jsonrpc_session_connect(struct jsonrpc_session *s)
988
0
{
989
0
    const char *name = reconnect_get_name(s->reconnect);
990
0
    int error;
991
992
0
    jsonrpc_session_disconnect(s);
993
0
    if (!reconnect_is_passive(s->reconnect)) {
994
0
        error = jsonrpc_stream_open(name, &s->stream, s->dscp);
995
0
        if (!error) {
996
0
            reconnect_connecting(s->reconnect, time_msec());
997
0
        } else {
998
0
            s->last_error = error;
999
0
        }
1000
0
    } else {
1001
0
        error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
1002
0
                                                      s->dscp);
1003
0
        if (!error) {
1004
0
            reconnect_listening(s->reconnect, time_msec());
1005
0
        }
1006
0
    }
1007
1008
0
    if (error) {
1009
0
        reconnect_connect_failed(s->reconnect, time_msec(), error);
1010
0
        jsonrpc_session_pick_remote(s);
1011
0
    }
1012
0
}
1013
1014
void
1015
jsonrpc_session_run(struct jsonrpc_session *s)
1016
0
{
1017
0
    if (s->pstream) {
1018
0
        struct stream *stream;
1019
0
        int error;
1020
1021
0
        error = pstream_accept(s->pstream, &stream);
1022
0
        if (!error) {
1023
0
            if (s->rpc || s->stream) {
1024
0
                VLOG_INFO_RL(&rl,
1025
0
                             "%s: new connection replacing active connection",
1026
0
                             reconnect_get_name(s->reconnect));
1027
0
                jsonrpc_session_disconnect(s);
1028
0
            }
1029
0
            reconnect_connected(s->reconnect, time_msec());
1030
0
            s->rpc = jsonrpc_open(stream);
1031
0
            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
1032
0
                                                  s->max_backlog_bytes);
1033
0
            s->seqno++;
1034
0
        } else if (error != EAGAIN) {
1035
0
            reconnect_listen_error(s->reconnect, time_msec(), error);
1036
0
            pstream_close(s->pstream);
1037
0
            s->pstream = NULL;
1038
0
        }
1039
0
    }
1040
1041
0
    if (s->rpc) {
1042
0
        size_t backlog;
1043
0
        int error;
1044
1045
0
        backlog = jsonrpc_get_backlog(s->rpc);
1046
0
        jsonrpc_run(s->rpc);
1047
0
        if (jsonrpc_get_backlog(s->rpc) < backlog) {
1048
            /* Data previously caught in a queue was successfully sent (or
1049
             * there's an error, which we'll catch below.)
1050
             *
1051
             * We don't count data that is successfully sent immediately as
1052
             * activity, because there's a lot of queuing downstream from us,
1053
             * which means that we can push a lot of data into a connection
1054
             * that has stalled and won't ever recover.
1055
             */
1056
0
            reconnect_activity(s->reconnect, time_msec());
1057
0
        }
1058
1059
0
        error = jsonrpc_get_status(s->rpc);
1060
0
        if (error) {
1061
0
            reconnect_disconnected(s->reconnect, time_msec(), error);
1062
0
            jsonrpc_session_disconnect(s);
1063
0
            s->last_error = error;
1064
0
        }
1065
0
    } else if (s->stream) {
1066
0
        int error;
1067
1068
0
        stream_run(s->stream);
1069
0
        error = stream_connect(s->stream);
1070
0
        if (!error) {
1071
0
            reconnect_connected(s->reconnect, time_msec());
1072
0
            s->rpc = jsonrpc_open(s->stream);
1073
0
            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
1074
0
                                                  s->max_backlog_bytes);
1075
0
            s->stream = NULL;
1076
0
            s->seqno++;
1077
0
        } else if (error != EAGAIN) {
1078
0
            reconnect_connect_failed(s->reconnect, time_msec(), error);
1079
0
            jsonrpc_session_pick_remote(s);
1080
0
            stream_close(s->stream);
1081
0
            s->stream = NULL;
1082
0
            s->last_error = error;
1083
0
        }
1084
0
    }
1085
1086
0
    switch (reconnect_run(s->reconnect, time_msec())) {
1087
0
    case RECONNECT_CONNECT:
1088
0
        jsonrpc_session_connect(s);
1089
0
        break;
1090
1091
0
    case RECONNECT_DISCONNECT:
1092
0
        reconnect_disconnected(s->reconnect, time_msec(), 0);
1093
0
        jsonrpc_session_disconnect(s);
1094
0
        break;
1095
1096
0
    case RECONNECT_PROBE:
1097
0
        if (s->rpc) {
1098
0
            struct json *params;
1099
0
            struct jsonrpc_msg *request;
1100
1101
0
            params = json_array_create_empty();
1102
0
            request = jsonrpc_create_request("echo", params, NULL);
1103
0
            json_destroy(request->id);
1104
0
            request->id = json_string_create("echo");
1105
0
            jsonrpc_send(s->rpc, request);
1106
0
        }
1107
0
        break;
1108
0
    }
1109
0
}
1110
1111
void
1112
jsonrpc_session_wait(struct jsonrpc_session *s)
1113
0
{
1114
0
    if (s->rpc) {
1115
0
        jsonrpc_wait(s->rpc);
1116
0
    } else if (s->stream) {
1117
0
        stream_run_wait(s->stream);
1118
0
        stream_connect_wait(s->stream);
1119
0
    }
1120
0
    if (s->pstream) {
1121
0
        pstream_wait(s->pstream);
1122
0
    }
1123
0
    reconnect_wait(s->reconnect, time_msec());
1124
0
}
1125
1126
size_t
1127
jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
1128
0
{
1129
0
    return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
1130
0
}
1131
1132
/* Always returns a pointer to a valid C string, assuming 's' was initialized
1133
 * correctly. */
1134
const char *
1135
jsonrpc_session_get_name(const struct jsonrpc_session *s)
1136
0
{
1137
0
    return reconnect_get_name(s->reconnect);
1138
0
}
1139
1140
const char *
1141
jsonrpc_session_get_id(const struct jsonrpc_session *s)
1142
0
{
1143
0
    if (s->rpc && s->rpc->stream) {
1144
0
        return stream_get_peer_id(s->rpc->stream);
1145
0
    } else {
1146
0
        return NULL;
1147
0
    }
1148
0
}
1149
1150
size_t
1151
jsonrpc_session_get_n_remotes(const struct jsonrpc_session *s)
1152
0
{
1153
0
    return s->remotes.n;
1154
0
}
1155
1156
/* Always takes ownership of 'msg', regardless of success. */
1157
int
1158
jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
1159
0
{
1160
0
    if (s->rpc) {
1161
0
        return jsonrpc_send(s->rpc, msg);
1162
0
    } else {
1163
0
        jsonrpc_msg_destroy(msg);
1164
0
        return ENOTCONN;
1165
0
    }
1166
0
}
1167
1168
struct jsonrpc_msg *
1169
jsonrpc_session_recv(struct jsonrpc_session *s)
1170
0
{
1171
0
    if (s->rpc) {
1172
0
        unsigned int received_bytes;
1173
0
        struct jsonrpc_msg *msg;
1174
1175
0
        received_bytes = jsonrpc_get_received_bytes(s->rpc);
1176
0
        jsonrpc_recv(s->rpc, &msg);
1177
1178
0
        long long int now = time_msec();
1179
0
        reconnect_receive_attempted(s->reconnect, now);
1180
0
        if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) {
1181
            /* Data was successfully received.
1182
             *
1183
             * Previously we only counted receiving a full message as activity,
1184
             * but with large messages or a slow connection that policy could
1185
             * time out the session mid-message. */
1186
0
            reconnect_activity(s->reconnect, now);
1187
0
        }
1188
1189
0
        if (msg) {
1190
0
            if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
1191
                /* Echo request.  Send reply. */
1192
0
                struct jsonrpc_msg *reply;
1193
1194
0
                reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
1195
0
                jsonrpc_session_send(s, reply);
1196
0
            } else if (msg->type == JSONRPC_REPLY
1197
0
                       && msg->id && msg->id->type == JSON_STRING
1198
0
                       && !strcmp(msg->id->string, "echo")) {
1199
                /* It's a reply to our echo request.  Suppress it. */
1200
0
            } else {
1201
0
                return msg;
1202
0
            }
1203
0
            jsonrpc_msg_destroy(msg);
1204
0
        }
1205
0
    }
1206
0
    return NULL;
1207
0
}
1208
1209
void
1210
jsonrpc_session_recv_wait(struct jsonrpc_session *s)
1211
0
{
1212
0
    if (s->rpc) {
1213
0
        jsonrpc_recv_wait(s->rpc);
1214
0
    }
1215
0
}
1216
1217
/* Returns true if 's' is currently connected or trying to connect. */
1218
bool
1219
jsonrpc_session_is_alive(const struct jsonrpc_session *s)
1220
0
{
1221
0
    return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
1222
0
}
1223
1224
/* Returns true if 's' is currently connected. */
1225
bool
1226
jsonrpc_session_is_connected(const struct jsonrpc_session *s)
1227
0
{
1228
0
    return s->rpc != NULL;
1229
0
}
1230
1231
/* Returns a sequence number for 's'.  The sequence number increments every
1232
 * time 's' connects or disconnects.  Thus, a caller can use the change (or
1233
 * lack of change) in the sequence number to figure out whether the underlying
1234
 * connection is the same as before. */
1235
unsigned int
1236
jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
1237
0
{
1238
0
    return s->seqno;
1239
0
}
1240
1241
/* Returns the current status of 's'.  If 's' is NULL or is disconnected, this
1242
 * is 0, otherwise it is the status of the connection, as reported by
1243
 * jsonrpc_get_status(). */
1244
int
1245
jsonrpc_session_get_status(const struct jsonrpc_session *s)
1246
0
{
1247
0
    return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
1248
0
}
1249
1250
/* Returns the last error reported on a connection by 's'.  The return value is
1251
 * 0 only if no connection made by 's' has ever encountered an error.  See
1252
 * jsonrpc_get_status() for return value interpretation. */
1253
int
1254
jsonrpc_session_get_last_error(const struct jsonrpc_session *s)
1255
0
{
1256
0
    return s->last_error;
1257
0
}
1258
1259
/* Populates 'stats' with statistics from 's'. */
1260
void
1261
jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
1262
                                    struct reconnect_stats *stats)
1263
0
{
1264
0
    reconnect_get_stats(s->reconnect, time_msec(), stats);
1265
0
}
1266
1267
/* Enables 's' to reconnect to the peer if the connection drops. */
1268
void
1269
jsonrpc_session_enable_reconnect(struct jsonrpc_session *s)
1270
0
{
1271
0
    reconnect_set_max_tries(s->reconnect, UINT_MAX);
1272
0
    reconnect_set_backoff(s->reconnect, RECONNECT_DEFAULT_MIN_BACKOFF,
1273
0
                          RECONNECT_DEFAULT_MAX_BACKOFF);
1274
0
}
1275
1276
/* Forces 's' to drop its connection (if any) and reconnect. */
1277
void
1278
jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
1279
0
{
1280
0
    reconnect_force_reconnect(s->reconnect, time_msec());
1281
0
}
1282
1283
/* Resets the reconnect backoff for 's' by allowing as many free tries as the
1284
 * number of configured remotes.  This is to be used by upper layers before
1285
 * calling jsonrpc_session_force_reconnect() if backoff is undesirable.
1286
 */
1287
void
1288
jsonrpc_session_reset_backoff(struct jsonrpc_session *s)
1289
0
{
1290
0
    unsigned int free_tries = s->remotes.n;
1291
1292
0
    if (jsonrpc_session_is_connected(s)) {
1293
        /* The extra free try will be consumed when the current remote
1294
         * is disconnected.
1295
         */
1296
0
        free_tries++;
1297
0
    }
1298
0
    reconnect_set_backoff_free_tries(s->reconnect, free_tries);
1299
0
}
1300
1301
/* Sets 'max_backoff' as the maximum time, in milliseconds, to wait after a
1302
 * connection attempt fails before attempting to connect again. */
1303
void
1304
jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
1305
0
{
1306
0
    reconnect_set_backoff(s->reconnect, 0, max_backoff);
1307
0
}
1308
1309
/* Sets the "probe interval" for 's' to 'probe_interval', in milliseconds.  If
1310
 * this is zero, it disables the connection keepalive feature.  Otherwise, if
1311
 * 's' is idle for 'probe_interval' milliseconds then 's' will send an echo
1312
 * request and, if no reply is received within an additional 'probe_interval'
1313
 * milliseconds, close the connection (then reconnect, if that feature is
1314
 * enabled). */
1315
void
1316
jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
1317
                                   int probe_interval)
1318
0
{
1319
0
    if (ovs_replay_is_active()) {
1320
0
        return;
1321
0
    }
1322
0
    reconnect_set_probe_interval(s->reconnect, probe_interval);
1323
0
}
1324
1325
/* Sets the DSCP value used for 's''s connection to 'dscp'.  If this is
1326
 * different from the DSCP value currently in use then the connection is closed
1327
 * and reconnected. */
1328
void
1329
jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
1330
0
{
1331
0
    if (s->dscp != dscp) {
1332
0
        pstream_close(s->pstream);
1333
0
        s->pstream = NULL;
1334
1335
0
        s->dscp = dscp;
1336
0
        jsonrpc_session_force_reconnect(s);
1337
0
    }
1338
0
}
1339
1340
/* Sets thresholds for send backlog.  If send backlog contains more than
1341
 * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
1342
 * connection will be closed (then reconnected, if that feature is enabled). */
1343
void
1344
jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
1345
                                      size_t max_n_msgs,
1346
                                      size_t max_backlog_bytes)
1347
0
{
1348
0
    s->max_n_msgs = max_n_msgs;
1349
0
    s->max_backlog_bytes = max_backlog_bytes;
1350
0
    if (s->rpc) {
1351
0
        jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
1352
0
    }
1353
0
}