Coverage Report

Created: 2025-07-01 06:50

/src/openvswitch/lib/jsonrpc.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2009-2017 Nicira, Inc.
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at:
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
#include <config.h>
18
19
#include "jsonrpc.h"
20
21
#include <errno.h>
22
23
#include "byteq.h"
24
#include "coverage.h"
25
#include "openvswitch/dynamic-string.h"
26
#include "fatal-signal.h"
27
#include "openvswitch/json.h"
28
#include "openvswitch/list.h"
29
#include "openvswitch/ofpbuf.h"
30
#include "ovs-replay.h"
31
#include "ovs-thread.h"
32
#include "openvswitch/poll-loop.h"
33
#include "reconnect.h"
34
#include "stream.h"
35
#include "svec.h"
36
#include "timeval.h"
37
#include "openvswitch/vlog.h"
38
39
VLOG_DEFINE_THIS_MODULE(jsonrpc);
40
41
COVERAGE_DEFINE(jsonrpc_recv_incomplete);
42

43
struct jsonrpc {
44
    struct stream *stream;
45
    char *name;
46
    int status;
47
48
    /* Input. */
49
    struct byteq input;
50
    uint8_t input_buffer[4096];
51
    struct json_parser *parser;
52
53
    /* Output. */
54
    struct ovs_list output;     /* Contains "struct ofpbuf"s. */
55
    size_t output_count;        /* Number of elements in "output". */
56
    size_t backlog;
57
58
    /* Limits. */
59
    size_t max_output;          /* 'output_count' disconnection threshold. */
60
    size_t max_backlog;         /* 'backlog' disconnection threshold. */
61
};
62
63
/* Rate limit for error messages. */
64
static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
65
66
static struct jsonrpc_msg *jsonrpc_parse_received_message(struct jsonrpc *);
67
static void jsonrpc_cleanup(struct jsonrpc *);
68
static void jsonrpc_error(struct jsonrpc *, int error);
69
70
/* This is just the same as stream_open() except that it uses the default
71
 * JSONRPC port if none is specified. */
72
int
73
jsonrpc_stream_open(const char *name, struct stream **streamp, uint8_t dscp)
74
0
{
75
0
    return stream_open_with_default_port(name, OVSDB_PORT, streamp, dscp);
76
0
}
77
78
/* This is just the same as pstream_open() except that it uses the default
79
 * JSONRPC port if none is specified. */
80
int
81
jsonrpc_pstream_open(const char *name, struct pstream **pstreamp, uint8_t dscp)
82
0
{
83
0
    return pstream_open_with_default_port(name, OVSDB_PORT, pstreamp, dscp);
84
0
}
85
86
/* Returns a new JSON-RPC stream that uses 'stream' for input and output.  The
87
 * new jsonrpc object takes ownership of 'stream'. */
88
struct jsonrpc *
89
jsonrpc_open(struct stream *stream)
90
0
{
91
0
    struct jsonrpc *rpc;
92
93
0
    ovs_assert(stream != NULL);
94
95
0
    rpc = xzalloc(sizeof *rpc);
96
0
    rpc->name = xstrdup(stream_get_name(stream));
97
0
    rpc->stream = stream;
98
0
    byteq_init(&rpc->input, rpc->input_buffer, sizeof rpc->input_buffer);
99
0
    ovs_list_init(&rpc->output);
100
101
0
    return rpc;
102
0
}
103
104
/* Destroys 'rpc', closing the stream on which it is based, and frees its
105
 * memory. */
106
void
107
jsonrpc_close(struct jsonrpc *rpc)
108
0
{
109
0
    if (rpc) {
110
0
        jsonrpc_cleanup(rpc);
111
0
        free(rpc->name);
112
0
        free(rpc);
113
0
    }
114
0
}
115
116
/* Performs periodic maintenance on 'rpc', such as flushing output buffers. */
117
void
118
jsonrpc_run(struct jsonrpc *rpc)
119
0
{
120
0
    if (rpc->status) {
121
0
        return;
122
0
    }
123
124
0
    stream_run(rpc->stream);
125
0
    while (!ovs_list_is_empty(&rpc->output)) {
126
0
        struct ovs_list *head = ovs_list_front(&rpc->output);
127
0
        struct ofpbuf *buf = ofpbuf_from_list(head);
128
0
        int retval;
129
130
0
        retval = stream_send(rpc->stream, buf->data, buf->size);
131
0
        if (retval >= 0) {
132
0
            rpc->backlog -= retval;
133
0
            ofpbuf_pull(buf, retval);
134
0
            if (!buf->size) {
135
0
                ovs_list_remove(head);
136
0
                rpc->output_count--;
137
0
                ofpbuf_delete(buf);
138
0
            }
139
0
        } else {
140
0
            if (retval != -EAGAIN) {
141
0
                VLOG_WARN_RL(&rl, "%s: send error: %s",
142
0
                             rpc->name, ovs_strerror(-retval));
143
0
                jsonrpc_error(rpc, -retval);
144
0
            }
145
0
            break;
146
0
        }
147
0
    }
148
0
}
149
150
/* Arranges for the poll loop to wake up when 'rpc' needs to perform
151
 * maintenance activities. */
152
void
153
jsonrpc_wait(struct jsonrpc *rpc)
154
0
{
155
0
    if (!rpc->status) {
156
0
        stream_run_wait(rpc->stream);
157
0
        if (!ovs_list_is_empty(&rpc->output)) {
158
0
            stream_send_wait(rpc->stream);
159
0
        }
160
0
    }
161
0
}
162
163
/*
164
 * Returns the current status of 'rpc'.  The possible return values are:
165
 * - 0: no error yet
166
 * - >0: errno value
167
 * - EOF: end of file (remote end closed connection; not necessarily an error).
168
 *
169
 * When this function returns nonzero, 'rpc' is effectively out of
170
 * commission.  'rpc' will not receive any more messages and any further
171
 * messages that one attempts to send with 'rpc' will be discarded.  The
172
 * caller can keep 'rpc' around as long as it wants, but it's not going
173
 * to provide any more useful services.
174
 */
175
int
176
jsonrpc_get_status(const struct jsonrpc *rpc)
177
0
{
178
0
    return rpc->status;
179
0
}
180
181
/* Returns the number of bytes buffered by 'rpc' to be written to the
182
 * underlying stream.  Always returns 0 if 'rpc' has encountered an error or if
183
 * the remote end closed the connection. */
184
size_t
185
jsonrpc_get_backlog(const struct jsonrpc *rpc)
186
0
{
187
0
    return rpc->status ? 0 : rpc->backlog;
188
0
}
189
190
/* Sets thresholds for send backlog.  If send backlog contains more than
191
 * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
192
 * connection will be dropped. */
193
void
194
jsonrpc_set_backlog_threshold(struct jsonrpc *rpc,
195
                              size_t max_n_msgs, size_t max_backlog_bytes)
196
0
{
197
0
    rpc->max_output = max_n_msgs;
198
0
    rpc->max_backlog = max_backlog_bytes;
199
0
}
200
201
/* Returns the number of bytes that have been received on 'rpc''s underlying
202
 * stream.  (The value wraps around if it exceeds UINT_MAX.) */
203
unsigned int
204
jsonrpc_get_received_bytes(const struct jsonrpc *rpc)
205
0
{
206
0
    return rpc->input.head;
207
0
}
208
209
/* Returns 'rpc''s name, that is, the name returned by stream_get_name() for
210
 * the stream underlying 'rpc' when 'rpc' was created. */
211
const char *
212
jsonrpc_get_name(const struct jsonrpc *rpc)
213
0
{
214
0
    return rpc->name;
215
0
}
216
217
static void
218
jsonrpc_log_msg(const struct jsonrpc *rpc, const char *title,
219
                const struct jsonrpc_msg *msg)
220
0
{
221
0
    if (VLOG_IS_DBG_ENABLED()) {
222
0
        struct ds s = DS_EMPTY_INITIALIZER;
223
0
        if (msg->method) {
224
0
            ds_put_format(&s, ", method=\"%s\"", msg->method);
225
0
        }
226
0
        if (msg->params) {
227
0
            ds_put_cstr(&s, ", params=");
228
0
            json_to_ds(msg->params, JSSF_SORT, &s);
229
0
        }
230
0
        if (msg->result) {
231
0
            ds_put_cstr(&s, ", result=");
232
0
            json_to_ds(msg->result, JSSF_SORT, &s);
233
0
        }
234
0
        if (msg->error) {
235
0
            ds_put_cstr(&s, ", error=");
236
0
            json_to_ds(msg->error, JSSF_SORT, &s);
237
0
        }
238
0
        if (msg->id) {
239
0
            ds_put_cstr(&s, ", id=");
240
0
            json_to_ds(msg->id, JSSF_SORT, &s);
241
0
        }
242
0
        VLOG_DBG("%s: %s %s%s", rpc->name, title,
243
0
                 jsonrpc_msg_type_to_string(msg->type), ds_cstr(&s));
244
0
        ds_destroy(&s);
245
0
    }
246
0
}
247
248
/* Schedules 'msg' to be sent on 'rpc' and returns 'rpc''s status (as with
249
 * jsonrpc_get_status()).
250
 *
251
 * If 'msg' cannot be sent immediately, it is appended to a buffer.  The caller
252
 * is responsible for ensuring that the amount of buffered data is somehow
253
 * limited.  (jsonrpc_get_backlog() returns the amount of data currently
254
 * buffered in 'rpc'.)
255
 *
256
 * Always takes ownership of 'msg', regardless of success. */
257
int
258
jsonrpc_send(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
259
0
{
260
0
    struct ofpbuf *buf;
261
0
    struct json *json;
262
0
    struct ds ds = DS_EMPTY_INITIALIZER;
263
0
    size_t length;
264
265
0
    if (rpc->status) {
266
0
        jsonrpc_msg_destroy(msg);
267
0
        return rpc->status;
268
0
    }
269
270
0
    jsonrpc_log_msg(rpc, "send", msg);
271
272
0
    json = jsonrpc_msg_to_json(msg);
273
0
    json_to_ds(json, 0, &ds);
274
0
    length = ds.length;
275
0
    json_destroy(json);
276
277
0
    buf = xmalloc(sizeof *buf);
278
0
    ofpbuf_use_ds(buf, &ds);
279
0
    ovs_list_push_back(&rpc->output, &buf->list_node);
280
0
    rpc->output_count++;
281
0
    rpc->backlog += length;
282
283
0
    if (rpc->output_count >= 50) {
284
0
        static struct vlog_rate_limit bl_rl = VLOG_RATE_LIMIT_INIT(5, 5);
285
0
        bool disconnect = false;
286
287
0
        VLOG_INFO_RL(&bl_rl, "excessive sending backlog, jsonrpc: %s, num of"
288
0
                     " msgs: %"PRIuSIZE", backlog: %"PRIuSIZE".", rpc->name,
289
0
                     rpc->output_count, rpc->backlog);
290
0
        if (rpc->max_output && rpc->output_count > rpc->max_output) {
291
0
            disconnect = true;
292
0
            VLOG_WARN("sending backlog exceeded maximum number of messages (%"
293
0
                      PRIuSIZE" > %"PRIuSIZE"), disconnecting, jsonrpc: %s.",
294
0
                      rpc->output_count, rpc->max_output, rpc->name);
295
0
        } else if (rpc->max_backlog && rpc->backlog > rpc->max_backlog) {
296
0
            disconnect = true;
297
0
            VLOG_WARN("sending backlog exceeded maximum size (%"PRIuSIZE" > %"
298
0
                      PRIuSIZE" bytes), disconnecting, jsonrpc: %s.",
299
0
                      rpc->backlog, rpc->max_backlog, rpc->name);
300
0
        }
301
0
        if (disconnect) {
302
0
            jsonrpc_error(rpc, E2BIG);
303
0
        }
304
0
    }
305
306
0
    if (rpc->backlog == length) {
307
0
        jsonrpc_run(rpc);
308
0
    }
309
0
    return rpc->status;
310
0
}
311
312
/* Attempts to receive a message from 'rpc'.
313
 *
314
 * If successful, stores the received message in '*msgp' and returns 0.  The
315
 * caller takes ownership of '*msgp' and must eventually destroy it with
316
 * jsonrpc_msg_destroy().
317
 *
318
 * Otherwise, stores NULL in '*msgp' and returns one of the following:
319
 *
320
 *   - EAGAIN: No message has been received.
321
 *
322
 *   - EOF: The remote end closed the connection gracefully.
323
 *
324
 *   - Otherwise an errno value that represents a JSON-RPC protocol violation
325
 *     or another error fatal to the connection.  'rpc' will not send or
326
 *     receive any more messages.
327
 */
328
int
329
jsonrpc_recv(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
330
0
{
331
0
    int i;
332
333
0
    *msgp = NULL;
334
0
    if (rpc->status) {
335
0
        return rpc->status;
336
0
    }
337
338
0
    for (i = 0; i < 50; i++) {
339
0
        size_t n, used;
340
341
        /* Fill our input buffer if it's empty. */
342
0
        if (byteq_is_empty(&rpc->input)) {
343
0
            size_t chunk;
344
0
            int retval;
345
346
0
            byteq_fast_forward(&rpc->input);
347
0
            chunk = byteq_headroom(&rpc->input);
348
0
            retval = stream_recv(rpc->stream, byteq_head(&rpc->input), chunk);
349
0
            if (retval < 0) {
350
0
                if (retval == -EAGAIN) {
351
0
                    return EAGAIN;
352
0
                } else {
353
0
                    VLOG_WARN_RL(&rl, "%s: receive error: %s",
354
0
                                 rpc->name, ovs_strerror(-retval));
355
0
                    jsonrpc_error(rpc, -retval);
356
0
                    return rpc->status;
357
0
                }
358
0
            } else if (retval == 0) {
359
0
                jsonrpc_error(rpc, EOF);
360
0
                return EOF;
361
0
            }
362
0
            byteq_advance_head(&rpc->input, retval);
363
0
        }
364
365
        /* We have some input.  Feed it into the JSON parser. */
366
0
        if (!rpc->parser) {
367
0
            rpc->parser = json_parser_create(0);
368
0
        }
369
0
        n = byteq_tailroom(&rpc->input);
370
0
        used = json_parser_feed(rpc->parser,
371
0
                                (char *) byteq_tail(&rpc->input), n);
372
0
        byteq_advance_tail(&rpc->input, used);
373
374
        /* If we have complete JSON, attempt to parse it as JSON-RPC. */
375
0
        if (json_parser_is_done(rpc->parser)) {
376
0
            *msgp = jsonrpc_parse_received_message(rpc);
377
0
            if (*msgp) {
378
0
                return 0;
379
0
            }
380
381
0
            if (rpc->status) {
382
0
                const struct byteq *q = &rpc->input;
383
0
                if (q->head <= q->size) {
384
0
                    stream_report_content(q->buffer, q->head, STREAM_JSONRPC,
385
0
                                          &this_module, rpc->name);
386
0
                }
387
0
                return rpc->status;
388
0
            }
389
0
        }
390
0
    }
391
392
    /* We tried hard but didn't get a complete JSON message within the above
393
     * iterations.  We want to know how often we abort for this reason. */
394
0
    COVERAGE_INC(jsonrpc_recv_incomplete);
395
396
0
    return EAGAIN;
397
0
}
398
399
/* Causes the poll loop to wake up when jsonrpc_recv() may return a value other
400
 * than EAGAIN. */
401
void
402
jsonrpc_recv_wait(struct jsonrpc *rpc)
403
0
{
404
0
    if (rpc->status || !byteq_is_empty(&rpc->input)) {
405
0
        poll_immediate_wake_at(rpc->name);
406
0
    } else {
407
0
        stream_recv_wait(rpc->stream);
408
0
    }
409
0
}
410
411
/* Sends 'msg' on 'rpc' and waits for it to be successfully queued to the
412
 * underlying stream.  Returns 0 if 'msg' was sent successfully, otherwise a
413
 * status value (see jsonrpc_get_status()).
414
 *
415
 * Always takes ownership of 'msg', regardless of success. */
416
int
417
jsonrpc_send_block(struct jsonrpc *rpc, struct jsonrpc_msg *msg)
418
0
{
419
0
    int error;
420
421
0
    fatal_signal_run();
422
423
0
    error = jsonrpc_send(rpc, msg);
424
0
    if (error) {
425
0
        return error;
426
0
    }
427
428
0
    for (;;) {
429
0
        jsonrpc_run(rpc);
430
0
        if (ovs_list_is_empty(&rpc->output) || rpc->status) {
431
0
            return rpc->status;
432
0
        }
433
0
        jsonrpc_wait(rpc);
434
0
        poll_block();
435
0
    }
436
0
}
437
438
/* Blocks until jsonrpc_recv() on 'rpc' yields a message or a definitive
 * failure.  The contract is that of jsonrpc_recv(), except that EAGAIN is
 * never returned. */
int
jsonrpc_recv_block(struct jsonrpc *rpc, struct jsonrpc_msg **msgp)
{
    int error;

    while ((error = jsonrpc_recv(rpc, msgp)) == EAGAIN) {
        /* Nothing yet: keep output moving and sleep until something can
         * happen. */
        jsonrpc_run(rpc);
        jsonrpc_wait(rpc);
        jsonrpc_recv_wait(rpc);
        poll_block();
    }

    fatal_signal_run();
    return error;
}
456
457
/* Sends 'request' to 'rpc' then waits for a reply.  The return value is 0 if
458
 * successful, in which case '*replyp' is set to the reply, which the caller
459
 * must eventually free with jsonrpc_msg_destroy().  Otherwise returns a status
460
 * value (see jsonrpc_get_status()).
461
 *
462
 * Discards any message received on 'rpc' that is not a reply to 'request'
463
 * (based on message id).
464
 *
465
 * Always takes ownership of 'request', regardless of success. */
466
int
467
jsonrpc_transact_block(struct jsonrpc *rpc, struct jsonrpc_msg *request,
468
                       struct jsonrpc_msg **replyp)
469
0
{
470
0
    struct jsonrpc_msg *reply = NULL;
471
0
    struct json *id;
472
0
    int error;
473
474
0
    id = json_clone(request->id);
475
0
    error = jsonrpc_send_block(rpc, request);
476
0
    if (!error) {
477
0
        for (;;) {
478
0
            error = jsonrpc_recv_block(rpc, &reply);
479
0
            if (error) {
480
0
                break;
481
0
            }
482
0
            if ((reply->type == JSONRPC_REPLY || reply->type == JSONRPC_ERROR)
483
0
                && json_equal(id, reply->id)) {
484
0
                break;
485
0
            }
486
0
            jsonrpc_msg_destroy(reply);
487
0
        }
488
0
    }
489
0
    *replyp = error ? NULL : reply;
490
0
    json_destroy(id);
491
0
    return error;
492
0
}
493
494
/* Attempts to parse the content of 'rpc->parser' (which is complete JSON) as a
495
 * JSON-RPC message.  If successful, returns the JSON-RPC message.  On failure,
496
 * signals an error on 'rpc' with jsonrpc_error() and returns NULL. */
497
static struct jsonrpc_msg *
498
jsonrpc_parse_received_message(struct jsonrpc *rpc)
499
0
{
500
0
    struct jsonrpc_msg *msg;
501
0
    struct json *json;
502
0
    char *error;
503
504
0
    json = json_parser_finish(rpc->parser);
505
0
    rpc->parser = NULL;
506
0
    if (json->type == JSON_STRING) {
507
0
        VLOG_WARN_RL(&rl, "%s: error parsing stream: %s",
508
0
                     rpc->name, json_string(json));
509
0
        jsonrpc_error(rpc, EPROTO);
510
0
        json_destroy(json);
511
0
        return NULL;
512
0
    }
513
514
0
    error = jsonrpc_msg_from_json(json, &msg);
515
0
    if (error) {
516
0
        VLOG_WARN_RL(&rl, "%s: received bad JSON-RPC message: %s",
517
0
                     rpc->name, error);
518
0
        free(error);
519
0
        jsonrpc_error(rpc, EPROTO);
520
0
        return NULL;
521
0
    }
522
523
0
    jsonrpc_log_msg(rpc, "received", msg);
524
0
    return msg;
525
0
}
526
527
static void
528
jsonrpc_error(struct jsonrpc *rpc, int error)
529
0
{
530
0
    ovs_assert(error);
531
0
    if (!rpc->status) {
532
0
        rpc->status = error;
533
0
        jsonrpc_cleanup(rpc);
534
0
    }
535
0
}
536
537
static void
538
jsonrpc_cleanup(struct jsonrpc *rpc)
539
0
{
540
0
    stream_close(rpc->stream);
541
0
    rpc->stream = NULL;
542
543
0
    json_parser_abort(rpc->parser);
544
0
    rpc->parser = NULL;
545
546
0
    ofpbuf_list_delete(&rpc->output);
547
0
    rpc->backlog = 0;
548
0
    rpc->output_count = 0;
549
0
}
550

551
static struct jsonrpc_msg *
552
jsonrpc_create(enum jsonrpc_msg_type type, const char *method,
553
                struct json *params, struct json *result, struct json *error,
554
                struct json *id)
555
0
{
556
0
    struct jsonrpc_msg *msg = xmalloc(sizeof *msg);
557
0
    msg->type = type;
558
0
    msg->method = nullable_xstrdup(method);
559
0
    msg->params = params;
560
0
    msg->result = result;
561
0
    msg->error = error;
562
0
    msg->id = id;
563
0
    return msg;
564
0
}
565
566
static struct json *
567
jsonrpc_create_id(void)
568
0
{
569
0
    static atomic_count next_id = ATOMIC_COUNT_INIT(0);
570
0
    unsigned int id;
571
572
0
    id = atomic_count_inc(&next_id);
573
0
    return json_integer_create(id);
574
0
}
575
576
struct jsonrpc_msg *
577
jsonrpc_create_request(const char *method, struct json *params,
578
                       struct json **idp)
579
0
{
580
0
    struct json *id = jsonrpc_create_id();
581
0
    if (idp) {
582
0
        *idp = json_clone(id);
583
0
    }
584
0
    return jsonrpc_create(JSONRPC_REQUEST, method, params, NULL, NULL, id);
585
0
}
586
587
struct jsonrpc_msg *
588
jsonrpc_create_notify(const char *method, struct json *params)
589
0
{
590
0
    return jsonrpc_create(JSONRPC_NOTIFY, method, params, NULL, NULL, NULL);
591
0
}
592
593
struct jsonrpc_msg *
594
jsonrpc_create_reply(struct json *result, const struct json *id)
595
0
{
596
0
    return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, result, NULL,
597
0
                           json_clone(id));
598
0
}
599
600
struct jsonrpc_msg *
601
jsonrpc_create_error(struct json *error, const struct json *id)
602
0
{
603
0
    return jsonrpc_create(JSONRPC_REPLY, NULL, NULL, NULL, error,
604
0
                           json_clone(id));
605
0
}
606
607
struct jsonrpc_msg *
608
jsonrpc_msg_clone(const struct jsonrpc_msg *old)
609
0
{
610
0
    return jsonrpc_create(old->type, old->method,
611
0
                          json_nullable_clone(old->params),
612
0
                          json_nullable_clone(old->result),
613
0
                          json_nullable_clone(old->error),
614
0
                          json_nullable_clone(old->id));
615
0
}
616
617
const char *
618
jsonrpc_msg_type_to_string(enum jsonrpc_msg_type type)
619
0
{
620
0
    switch (type) {
621
0
    case JSONRPC_REQUEST:
622
0
        return "request";
623
624
0
    case JSONRPC_NOTIFY:
625
0
        return "notification";
626
627
0
    case JSONRPC_REPLY:
628
0
        return "reply";
629
630
0
    case JSONRPC_ERROR:
631
0
        return "error";
632
0
    }
633
0
    return "(null)";
634
0
}
635
636
char *
637
jsonrpc_msg_is_valid(const struct jsonrpc_msg *m)
638
0
{
639
0
    const char *type_name;
640
0
    unsigned int pattern;
641
642
0
    if (m->params && m->params->type != JSON_ARRAY) {
643
0
        return xstrdup("\"params\" must be JSON array");
644
0
    }
645
646
0
    switch (m->type) {
647
0
    case JSONRPC_REQUEST:
648
0
        pattern = 0x11001;
649
0
        break;
650
651
0
    case JSONRPC_NOTIFY:
652
0
        pattern = 0x11000;
653
0
        break;
654
655
0
    case JSONRPC_REPLY:
656
0
        pattern = 0x00101;
657
0
        break;
658
659
0
    case JSONRPC_ERROR:
660
0
        pattern = 0x00011;
661
0
        break;
662
663
0
    default:
664
0
        return xasprintf("invalid JSON-RPC message type %d", m->type);
665
0
    }
666
667
0
    type_name = jsonrpc_msg_type_to_string(m->type);
668
0
    if ((m->method != NULL) != ((pattern & 0x10000) != 0)) {
669
0
        return xasprintf("%s must%s have \"method\"",
670
0
                         type_name, (pattern & 0x10000) ? "" : " not");
671
672
0
    }
673
0
    if ((m->params != NULL) != ((pattern & 0x1000) != 0)) {
674
0
        return xasprintf("%s must%s have \"params\"",
675
0
                         type_name, (pattern & 0x1000) ? "" : " not");
676
677
0
    }
678
0
    if ((m->result != NULL) != ((pattern & 0x100) != 0)) {
679
0
        return xasprintf("%s must%s have \"result\"",
680
0
                         type_name, (pattern & 0x100) ? "" : " not");
681
682
0
    }
683
0
    if ((m->error != NULL) != ((pattern & 0x10) != 0)) {
684
0
        return xasprintf("%s must%s have \"error\"",
685
0
                         type_name, (pattern & 0x10) ? "" : " not");
686
687
0
    }
688
0
    if ((m->id != NULL) != ((pattern & 0x1) != 0)) {
689
0
        return xasprintf("%s must%s have \"id\"",
690
0
                         type_name, (pattern & 0x1) ? "" : " not");
691
692
0
    }
693
0
    return NULL;
694
0
}
695
696
void
697
jsonrpc_msg_destroy(struct jsonrpc_msg *m)
698
0
{
699
0
    if (m) {
700
0
        free(m->method);
701
0
        json_destroy(m->params);
702
0
        json_destroy(m->result);
703
0
        json_destroy(m->error);
704
0
        json_destroy(m->id);
705
0
        free(m);
706
0
    }
707
0
}
708
709
static struct json *
710
null_from_json_null(struct json *json)
711
0
{
712
0
    if (json && json->type == JSON_NULL) {
713
0
        json_destroy(json);
714
0
        return NULL;
715
0
    }
716
0
    return json;
717
0
}
718
719
char *
720
jsonrpc_msg_from_json(struct json *json, struct jsonrpc_msg **msgp)
721
0
{
722
0
    struct json *method = NULL;
723
0
    struct jsonrpc_msg *msg = NULL;
724
0
    struct shash *object;
725
0
    char *error;
726
727
0
    if (json->type != JSON_OBJECT) {
728
0
        error = xstrdup("message is not a JSON object");
729
0
        goto exit;
730
0
    }
731
0
    object = json_object(json);
732
733
0
    method = shash_find_and_delete(object, "method");
734
0
    if (method && method->type != JSON_STRING) {
735
0
        error = xstrdup("method is not a JSON string");
736
0
        goto exit;
737
0
    }
738
739
0
    msg = xzalloc(sizeof *msg);
740
0
    msg->method = method ? xstrdup(json_string(method)) : NULL;
741
0
    msg->params = null_from_json_null(shash_find_and_delete(object, "params"));
742
0
    msg->result = null_from_json_null(shash_find_and_delete(object, "result"));
743
0
    msg->error = null_from_json_null(shash_find_and_delete(object, "error"));
744
0
    msg->id = null_from_json_null(shash_find_and_delete(object, "id"));
745
0
    msg->type = (msg->result ? JSONRPC_REPLY
746
0
                 : msg->error ? JSONRPC_ERROR
747
0
                 : msg->id ? JSONRPC_REQUEST
748
0
                 : JSONRPC_NOTIFY);
749
0
    if (!shash_is_empty(object)) {
750
0
        error = xasprintf("message has unexpected member \"%s\"",
751
0
                          shash_first(object)->name);
752
0
        goto exit;
753
0
    }
754
0
    error = jsonrpc_msg_is_valid(msg);
755
0
    if (error) {
756
0
        goto exit;
757
0
    }
758
759
0
exit:
760
0
    json_destroy(method);
761
0
    json_destroy(json);
762
0
    if (error) {
763
0
        jsonrpc_msg_destroy(msg);
764
0
        msg = NULL;
765
0
    }
766
0
    *msgp = msg;
767
0
    return error;
768
0
}
769
770
/* Returns 'm' converted to JSON suitable for sending as a JSON-RPC message.
771
 *
772
 * Consumes and destroys 'm'. */
773
struct json *
774
jsonrpc_msg_to_json(struct jsonrpc_msg *m)
775
0
{
776
0
    struct json *json = json_object_create();
777
778
0
    if (m->method) {
779
0
        json_object_put(json, "method", json_string_create_nocopy(m->method));
780
0
    }
781
782
0
    if (m->params) {
783
0
        json_object_put(json, "params", m->params);
784
0
    }
785
786
0
    if (m->result) {
787
0
        json_object_put(json, "result", m->result);
788
0
    } else if (m->type == JSONRPC_ERROR) {
789
0
        json_object_put(json, "result", json_null_create());
790
0
    }
791
792
0
    if (m->error) {
793
0
        json_object_put(json, "error", m->error);
794
0
    } else if (m->type == JSONRPC_REPLY) {
795
0
        json_object_put(json, "error", json_null_create());
796
0
    }
797
798
0
    if (m->id) {
799
0
        json_object_put(json, "id", m->id);
800
0
    } else if (m->type == JSONRPC_NOTIFY) {
801
0
        json_object_put(json, "id", json_null_create());
802
0
    }
803
804
0
    free(m);
805
806
0
    return json;
807
0
}
808
809
char *
810
jsonrpc_msg_to_string(const struct jsonrpc_msg *m)
811
0
{
812
0
    struct jsonrpc_msg *copy = jsonrpc_msg_clone(m);
813
0
    struct json *json = jsonrpc_msg_to_json(copy);
814
0
    char *s = json_to_string(json, JSSF_SORT);
815
0
    json_destroy(json);
816
0
    return s;
817
0
}
818

819
/* A JSON-RPC session with reconnection. */
820
821
struct jsonrpc_session {
822
    struct svec remotes;
823
    size_t next_remote;
824
825
    struct reconnect *reconnect;
826
    struct jsonrpc *rpc;
827
    struct stream *stream;
828
    struct pstream *pstream;
829
    int last_error;
830
    unsigned int seqno;
831
    uint8_t dscp;
832
833
    /* Limits for jsonrpc. */
834
    size_t max_n_msgs;
835
    size_t max_backlog_bytes;
836
};
837
838
static void
839
jsonrpc_session_pick_remote(struct jsonrpc_session *s)
840
0
{
841
0
    reconnect_set_name(s->reconnect,
842
0
                       s->remotes.names[s->next_remote++ % s->remotes.n]);
843
0
}
844
845
/* Creates and returns a jsonrpc_session to 'name', which should be a string
846
 * acceptable to stream_open() or pstream_open().
847
 *
848
 * If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new
849
 * jsonrpc_session connects to 'name'.  If 'retry' is true, then the new
850
 * session connects and reconnects to 'name', with backoff.  If 'retry' is
851
 * false, the new session will only try to connect once and after a connection
852
 * failure or a disconnection jsonrpc_session_is_alive() will return false for
853
 * the new session.
854
 *
855
 * If 'name' is a passive connection method, e.g. "ptcp:", the new
856
 * jsonrpc_session listens for connections to 'name'.  It maintains at most one
857
 * connection at any given time.  Any new connection causes the previous one
858
 * (if any) to be dropped. */
859
struct jsonrpc_session *
860
jsonrpc_session_open(const char *name, bool retry)
861
0
{
862
0
    const struct svec remotes = { .names = (char **) &name, .n = 1 };
863
0
    return jsonrpc_session_open_multiple(&remotes, retry);
864
0
}
865
866
struct jsonrpc_session *
867
jsonrpc_session_open_multiple(const struct svec *remotes, bool retry)
868
0
{
869
0
    struct jsonrpc_session *s;
870
871
0
    s = xmalloc(sizeof *s);
872
873
    /* Set 'n' remotes from 'names'. */
874
0
    svec_clone(&s->remotes, remotes);
875
0
    if (!s->remotes.n) {
876
0
        svec_add(&s->remotes, "invalid:");
877
0
    }
878
0
    s->next_remote = 0;
879
880
0
    s->reconnect = reconnect_create(time_msec());
881
0
    jsonrpc_session_pick_remote(s);
882
0
    reconnect_enable(s->reconnect, time_msec());
883
0
    reconnect_set_backoff_free_tries(s->reconnect, remotes->n);
884
0
    s->rpc = NULL;
885
0
    s->stream = NULL;
886
0
    s->pstream = NULL;
887
0
    s->seqno = 0;
888
0
    s->dscp = 0;
889
0
    s->last_error = 0;
890
891
0
    jsonrpc_session_set_backlog_threshold(s, 0, 0);
892
893
0
    const char *name = reconnect_get_name(s->reconnect);
894
0
    if (!pstream_verify_name(name)) {
895
0
        reconnect_set_passive(s->reconnect, true, time_msec());
896
0
    } else if (!retry) {
897
0
        reconnect_set_max_tries(s->reconnect, remotes->n);
898
0
        reconnect_set_backoff(s->reconnect, INT_MAX, INT_MAX);
899
0
    }
900
901
0
    if (!stream_or_pstream_needs_probes(name) || ovs_replay_is_active()) {
902
0
        reconnect_set_probe_interval(s->reconnect, 0);
903
0
    }
904
905
0
    return s;
906
0
}
907
908
/* Creates and returns a jsonrpc_session that is initially connected to
909
 * 'jsonrpc'.  If the connection is dropped, it will not be reconnected.
910
 *
911
 * On the assumption that such connections are likely to be short-lived
912
 * (e.g. from ovs-vsctl), informational logging for them is suppressed. */
913
struct jsonrpc_session *
914
jsonrpc_session_open_unreliably(struct jsonrpc *jsonrpc, uint8_t dscp)
915
0
{
916
0
    struct jsonrpc_session *s;
917
918
0
    s = xmalloc(sizeof *s);
919
0
    svec_init(&s->remotes);
920
0
    svec_add(&s->remotes, jsonrpc_get_name(jsonrpc));
921
0
    s->next_remote = 0;
922
0
    s->reconnect = reconnect_create(time_msec());
923
0
    reconnect_set_quiet(s->reconnect, true);
924
0
    reconnect_set_name(s->reconnect, jsonrpc_get_name(jsonrpc));
925
0
    reconnect_set_max_tries(s->reconnect, 0);
926
0
    reconnect_connected(s->reconnect, time_msec());
927
928
0
    if (ovs_replay_is_active()) {
929
0
        reconnect_set_probe_interval(s->reconnect, 0);
930
0
    }
931
932
0
    s->dscp = dscp;
933
0
    s->rpc = jsonrpc;
934
0
    s->stream = NULL;
935
0
    s->pstream = NULL;
936
0
    s->seqno = 1;
937
938
0
    jsonrpc_session_set_backlog_threshold(s, 0, 0);
939
0
    return s;
940
0
}
941
942
void
943
jsonrpc_session_close(struct jsonrpc_session *s)
944
0
{
945
0
    if (s) {
946
0
        jsonrpc_close(s->rpc);
947
0
        reconnect_destroy(s->reconnect);
948
0
        stream_close(s->stream);
949
0
        pstream_close(s->pstream);
950
0
        svec_destroy(&s->remotes);
951
0
        free(s);
952
0
    }
953
0
}
954
955
struct jsonrpc *
956
jsonrpc_session_steal(struct jsonrpc_session *s)
957
0
{
958
0
    struct jsonrpc *rpc = s->rpc;
959
0
    s->rpc = NULL;
960
0
    jsonrpc_session_close(s);
961
0
    return rpc;
962
0
}
963
964
void
965
jsonrpc_session_replace(struct jsonrpc_session *s, struct jsonrpc *rpc)
966
0
{
967
0
    if (s->rpc) {
968
0
        jsonrpc_close(s->rpc);
969
0
    }
970
0
    s->rpc = rpc;
971
0
    if (s->rpc) {
972
0
        reconnect_set_name(s->reconnect, jsonrpc_get_name(s->rpc));
973
0
        reconnect_connected(s->reconnect, time_msec());
974
0
    }
975
0
}
976
977
static void
978
jsonrpc_session_disconnect(struct jsonrpc_session *s)
979
0
{
980
0
    if (s->rpc) {
981
0
        jsonrpc_error(s->rpc, EOF);
982
0
        jsonrpc_close(s->rpc);
983
0
        s->rpc = NULL;
984
0
    } else if (s->stream) {
985
0
        stream_close(s->stream);
986
0
        s->stream = NULL;
987
0
    } else {
988
0
        return;
989
0
    }
990
991
0
    s->seqno++;
992
0
    jsonrpc_session_pick_remote(s);
993
0
}
994
995
static void
996
jsonrpc_session_connect(struct jsonrpc_session *s)
997
0
{
998
0
    const char *name = reconnect_get_name(s->reconnect);
999
0
    int error;
1000
1001
0
    jsonrpc_session_disconnect(s);
1002
0
    if (!reconnect_is_passive(s->reconnect)) {
1003
0
        error = jsonrpc_stream_open(name, &s->stream, s->dscp);
1004
0
        if (!error) {
1005
0
            reconnect_connecting(s->reconnect, time_msec());
1006
0
        } else {
1007
0
            s->last_error = error;
1008
0
        }
1009
0
    } else {
1010
0
        error = s->pstream ? 0 : jsonrpc_pstream_open(name, &s->pstream,
1011
0
                                                      s->dscp);
1012
0
        if (!error) {
1013
0
            reconnect_listening(s->reconnect, time_msec());
1014
0
        }
1015
0
    }
1016
1017
0
    if (error) {
1018
0
        reconnect_connect_failed(s->reconnect, time_msec(), error);
1019
0
        jsonrpc_session_pick_remote(s);
1020
0
    }
1021
0
}
1022
1023
void
1024
jsonrpc_session_run(struct jsonrpc_session *s)
1025
0
{
1026
0
    if (s->pstream) {
1027
0
        struct stream *stream;
1028
0
        int error;
1029
1030
0
        error = pstream_accept(s->pstream, &stream);
1031
0
        if (!error) {
1032
0
            if (s->rpc || s->stream) {
1033
0
                VLOG_INFO_RL(&rl,
1034
0
                             "%s: new connection replacing active connection",
1035
0
                             reconnect_get_name(s->reconnect));
1036
0
                jsonrpc_session_disconnect(s);
1037
0
            }
1038
0
            reconnect_connected(s->reconnect, time_msec());
1039
0
            s->rpc = jsonrpc_open(stream);
1040
0
            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
1041
0
                                                  s->max_backlog_bytes);
1042
0
            s->seqno++;
1043
0
        } else if (error != EAGAIN) {
1044
0
            reconnect_listen_error(s->reconnect, time_msec(), error);
1045
0
            pstream_close(s->pstream);
1046
0
            s->pstream = NULL;
1047
0
        }
1048
0
    }
1049
1050
0
    if (s->rpc) {
1051
0
        size_t backlog;
1052
0
        int error;
1053
1054
0
        backlog = jsonrpc_get_backlog(s->rpc);
1055
0
        jsonrpc_run(s->rpc);
1056
0
        if (jsonrpc_get_backlog(s->rpc) < backlog) {
1057
            /* Data previously caught in a queue was successfully sent (or
1058
             * there's an error, which we'll catch below.)
1059
             *
1060
             * We don't count data that is successfully sent immediately as
1061
             * activity, because there's a lot of queuing downstream from us,
1062
             * which means that we can push a lot of data into a connection
1063
             * that has stalled and won't ever recover.
1064
             */
1065
0
            reconnect_activity(s->reconnect, time_msec());
1066
0
        }
1067
1068
0
        error = jsonrpc_get_status(s->rpc);
1069
0
        if (error) {
1070
0
            reconnect_disconnected(s->reconnect, time_msec(), error);
1071
0
            jsonrpc_session_disconnect(s);
1072
0
            s->last_error = error;
1073
0
        }
1074
0
    } else if (s->stream) {
1075
0
        int error;
1076
1077
0
        stream_run(s->stream);
1078
0
        error = stream_connect(s->stream);
1079
0
        if (!error) {
1080
0
            reconnect_connected(s->reconnect, time_msec());
1081
0
            s->rpc = jsonrpc_open(s->stream);
1082
0
            jsonrpc_set_backlog_threshold(s->rpc, s->max_n_msgs,
1083
0
                                                  s->max_backlog_bytes);
1084
0
            s->stream = NULL;
1085
0
            s->seqno++;
1086
0
        } else if (error != EAGAIN) {
1087
0
            reconnect_connect_failed(s->reconnect, time_msec(), error);
1088
0
            jsonrpc_session_pick_remote(s);
1089
0
            stream_close(s->stream);
1090
0
            s->stream = NULL;
1091
0
            s->last_error = error;
1092
0
        }
1093
0
    }
1094
1095
0
    switch (reconnect_run(s->reconnect, time_msec())) {
1096
0
    case RECONNECT_CONNECT:
1097
0
        jsonrpc_session_connect(s);
1098
0
        break;
1099
1100
0
    case RECONNECT_DISCONNECT:
1101
0
        reconnect_disconnected(s->reconnect, time_msec(), 0);
1102
0
        jsonrpc_session_disconnect(s);
1103
0
        break;
1104
1105
0
    case RECONNECT_PROBE:
1106
0
        if (s->rpc) {
1107
0
            struct json *params;
1108
0
            struct jsonrpc_msg *request;
1109
1110
0
            params = json_array_create_empty();
1111
0
            request = jsonrpc_create_request("echo", params, NULL);
1112
0
            json_destroy(request->id);
1113
0
            request->id = json_string_create("echo");
1114
0
            jsonrpc_send(s->rpc, request);
1115
0
        }
1116
0
        break;
1117
0
    }
1118
0
}
1119
1120
void
1121
jsonrpc_session_wait(struct jsonrpc_session *s)
1122
0
{
1123
0
    if (s->rpc) {
1124
0
        jsonrpc_wait(s->rpc);
1125
0
    } else if (s->stream) {
1126
0
        stream_run_wait(s->stream);
1127
0
        stream_connect_wait(s->stream);
1128
0
    }
1129
0
    if (s->pstream) {
1130
0
        pstream_wait(s->pstream);
1131
0
    }
1132
0
    reconnect_wait(s->reconnect, time_msec());
1133
0
}
1134
1135
size_t
1136
jsonrpc_session_get_backlog(const struct jsonrpc_session *s)
1137
0
{
1138
0
    return s->rpc ? jsonrpc_get_backlog(s->rpc) : 0;
1139
0
}
1140
1141
/* Always returns a pointer to a valid C string, assuming 's' was initialized
1142
 * correctly. */
1143
const char *
1144
jsonrpc_session_get_name(const struct jsonrpc_session *s)
1145
0
{
1146
0
    return reconnect_get_name(s->reconnect);
1147
0
}
1148
1149
const char *
1150
jsonrpc_session_get_id(const struct jsonrpc_session *s)
1151
0
{
1152
0
    if (s->rpc && s->rpc->stream) {
1153
0
        return stream_get_peer_id(s->rpc->stream);
1154
0
    } else {
1155
0
        return NULL;
1156
0
    }
1157
0
}
1158
1159
size_t
1160
jsonrpc_session_get_n_remotes(const struct jsonrpc_session *s)
1161
0
{
1162
0
    return s->remotes.n;
1163
0
}
1164
1165
/* Always takes ownership of 'msg', regardless of success. */
1166
int
1167
jsonrpc_session_send(struct jsonrpc_session *s, struct jsonrpc_msg *msg)
1168
0
{
1169
0
    if (s->rpc) {
1170
0
        return jsonrpc_send(s->rpc, msg);
1171
0
    } else {
1172
0
        jsonrpc_msg_destroy(msg);
1173
0
        return ENOTCONN;
1174
0
    }
1175
0
}
1176
1177
struct jsonrpc_msg *
1178
jsonrpc_session_recv(struct jsonrpc_session *s)
1179
0
{
1180
0
    if (s->rpc) {
1181
0
        unsigned int received_bytes;
1182
0
        struct jsonrpc_msg *msg;
1183
1184
0
        received_bytes = jsonrpc_get_received_bytes(s->rpc);
1185
0
        jsonrpc_recv(s->rpc, &msg);
1186
1187
0
        long long int now = time_msec();
1188
0
        reconnect_receive_attempted(s->reconnect, now);
1189
0
        if (received_bytes != jsonrpc_get_received_bytes(s->rpc)) {
1190
            /* Data was successfully received.
1191
             *
1192
             * Previously we only counted receiving a full message as activity,
1193
             * but with large messages or a slow connection that policy could
1194
             * time out the session mid-message. */
1195
0
            reconnect_activity(s->reconnect, now);
1196
0
        }
1197
1198
0
        if (msg) {
1199
0
            if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
1200
                /* Echo request.  Send reply. */
1201
0
                struct jsonrpc_msg *reply;
1202
1203
0
                reply = jsonrpc_create_reply(json_clone(msg->params), msg->id);
1204
0
                jsonrpc_session_send(s, reply);
1205
0
            } else if (msg->type == JSONRPC_REPLY
1206
0
                       && msg->id && msg->id->type == JSON_STRING
1207
0
                       && !strcmp(json_string(msg->id), "echo")) {
1208
                /* It's a reply to our echo request.  Suppress it. */
1209
0
            } else {
1210
0
                return msg;
1211
0
            }
1212
0
            jsonrpc_msg_destroy(msg);
1213
0
        }
1214
0
    }
1215
0
    return NULL;
1216
0
}
1217
1218
void
1219
jsonrpc_session_recv_wait(struct jsonrpc_session *s)
1220
0
{
1221
0
    if (s->rpc) {
1222
0
        jsonrpc_recv_wait(s->rpc);
1223
0
    }
1224
0
}
1225
1226
/* Returns true if 's' is currently connected or trying to connect. */
1227
bool
1228
jsonrpc_session_is_alive(const struct jsonrpc_session *s)
1229
0
{
1230
0
    return s->rpc || s->stream || reconnect_get_max_tries(s->reconnect);
1231
0
}
1232
1233
/* Returns true if 's' is currently connected. */
1234
bool
1235
jsonrpc_session_is_connected(const struct jsonrpc_session *s)
1236
0
{
1237
0
    return s->rpc != NULL;
1238
0
}
1239
1240
/* Returns a sequence number for 's'.  The sequence number increments every
1241
 * time 's' connects or disconnects.  Thus, a caller can use the change (or
1242
 * lack of change) in the sequence number to figure out whether the underlying
1243
 * connection is the same as before. */
1244
unsigned int
1245
jsonrpc_session_get_seqno(const struct jsonrpc_session *s)
1246
0
{
1247
0
    return s->seqno;
1248
0
}
1249
1250
/* Returns the current status of 's'.  If 's' is NULL or is disconnected, this
1251
 * is 0, otherwise it is the status of the connection, as reported by
1252
 * jsonrpc_get_status(). */
1253
int
1254
jsonrpc_session_get_status(const struct jsonrpc_session *s)
1255
0
{
1256
0
    return s && s->rpc ? jsonrpc_get_status(s->rpc) : 0;
1257
0
}
1258
1259
/* Returns the last error reported on a connection by 's'.  The return value is
1260
 * 0 only if no connection made by 's' has ever encountered an error.  See
1261
 * jsonrpc_get_status() for return value interpretation. */
1262
int
1263
jsonrpc_session_get_last_error(const struct jsonrpc_session *s)
1264
0
{
1265
0
    return s->last_error;
1266
0
}
1267
1268
/* Populates 'stats' with statistics from 's'. */
1269
void
1270
jsonrpc_session_get_reconnect_stats(const struct jsonrpc_session *s,
1271
                                    struct reconnect_stats *stats)
1272
0
{
1273
0
    reconnect_get_stats(s->reconnect, time_msec(), stats);
1274
0
}
1275
1276
/* Enables 's' to reconnect to the peer if the connection drops. */
1277
void
1278
jsonrpc_session_enable_reconnect(struct jsonrpc_session *s)
1279
0
{
1280
0
    reconnect_set_max_tries(s->reconnect, UINT_MAX);
1281
0
    reconnect_set_backoff(s->reconnect, RECONNECT_DEFAULT_MIN_BACKOFF,
1282
0
                          RECONNECT_DEFAULT_MAX_BACKOFF);
1283
0
}
1284
1285
/* Forces 's' to drop its connection (if any) and reconnect. */
1286
void
1287
jsonrpc_session_force_reconnect(struct jsonrpc_session *s)
1288
0
{
1289
0
    reconnect_force_reconnect(s->reconnect, time_msec());
1290
0
}
1291
1292
/* Resets the reconnect backoff for 's' by allowing as many free tries as the
1293
 * number of configured remotes.  This is to be used by upper layers before
1294
 * calling jsonrpc_session_force_reconnect() if backoff is undesirable.
1295
 */
1296
void
1297
jsonrpc_session_reset_backoff(struct jsonrpc_session *s)
1298
0
{
1299
0
    unsigned int free_tries = s->remotes.n;
1300
1301
0
    if (jsonrpc_session_is_connected(s)) {
1302
        /* The extra free try will be consumed when the current remote
1303
         * is disconnected.
1304
         */
1305
0
        free_tries++;
1306
0
    }
1307
0
    reconnect_set_backoff_free_tries(s->reconnect, free_tries);
1308
0
}
1309
1310
/* Sets 'max_backoff' as the maximum time, in milliseconds, to wait after a
1311
 * connection attempt fails before attempting to connect again. */
1312
void
1313
jsonrpc_session_set_max_backoff(struct jsonrpc_session *s, int max_backoff)
1314
0
{
1315
0
    reconnect_set_backoff(s->reconnect, 0, max_backoff);
1316
0
}
1317
1318
/* Sets the "probe interval" for 's' to 'probe_interval', in milliseconds.  If
1319
 * this is zero, it disables the connection keepalive feature.  Otherwise, if
1320
 * 's' is idle for 'probe_interval' milliseconds then 's' will send an echo
1321
 * request and, if no reply is received within an additional 'probe_interval'
1322
 * milliseconds, close the connection (then reconnect, if that feature is
1323
 * enabled). */
1324
void
1325
jsonrpc_session_set_probe_interval(struct jsonrpc_session *s,
1326
                                   int probe_interval)
1327
0
{
1328
0
    if (ovs_replay_is_active()) {
1329
0
        return;
1330
0
    }
1331
0
    reconnect_set_probe_interval(s->reconnect, probe_interval);
1332
0
}
1333
1334
/* Sets the DSCP value used for 's''s connection to 'dscp'.  If this is
1335
 * different from the DSCP value currently in use then the connection is closed
1336
 * and reconnected. */
1337
void
1338
jsonrpc_session_set_dscp(struct jsonrpc_session *s, uint8_t dscp)
1339
0
{
1340
0
    if (s->dscp != dscp) {
1341
0
        pstream_close(s->pstream);
1342
0
        s->pstream = NULL;
1343
1344
0
        s->dscp = dscp;
1345
0
        jsonrpc_session_force_reconnect(s);
1346
0
    }
1347
0
}
1348
1349
void
1350
jsonrpc_session_set_options(struct jsonrpc_session *s,
1351
                            const struct jsonrpc_session_options *options)
1352
0
{
1353
0
    jsonrpc_session_set_max_backoff(s, options->max_backoff);
1354
0
    jsonrpc_session_set_probe_interval(s, options->probe_interval);
1355
0
    jsonrpc_session_set_dscp(s, options->dscp);
1356
0
}
1357
1358
/* Sets thresholds for send backlog.  If send backlog contains more than
1359
 * 'max_n_msgs' messages or is larger than 'max_backlog_bytes' bytes,
1360
 * connection will be closed (then reconnected, if that feature is enabled). */
1361
void
1362
jsonrpc_session_set_backlog_threshold(struct jsonrpc_session *s,
1363
                                      size_t max_n_msgs,
1364
                                      size_t max_backlog_bytes)
1365
0
{
1366
0
    s->max_n_msgs = max_n_msgs;
1367
0
    s->max_backlog_bytes = max_backlog_bytes;
1368
0
    if (s->rpc) {
1369
0
        jsonrpc_set_backlog_threshold(s->rpc, max_n_msgs, max_backlog_bytes);
1370
0
    }
1371
0
}