Coverage Report

Created: 2025-07-18 06:07

/src/openvswitch/lib/ovsdb-cs.c
Line
Count
Source (jump to first uncovered line)
1
/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc.
2
 * Copyright (C) 2016 Hewlett Packard Enterprise Development LP
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at:
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITION OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
#include <config.h>
18
19
#include "ovsdb-cs.h"
20
21
#include <errno.h>
22
23
#include "hash.h"
24
#include "jsonrpc.h"
25
#include "openvswitch/dynamic-string.h"
26
#include "openvswitch/hmap.h"
27
#include "openvswitch/json.h"
28
#include "openvswitch/poll-loop.h"
29
#include "openvswitch/shash.h"
30
#include "openvswitch/vlog.h"
31
#include "ovsdb-data.h"
32
#include "ovsdb-error.h"
33
#include "ovsdb-parser.h"
34
#include "ovsdb-session.h"
35
#include "ovsdb-types.h"
36
#include "svec.h"
37
#include "util.h"
38
#include "uuid.h"
39
40
VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
41
42
/* Connection state machine.
43
 *
44
 * When a JSON-RPC session connects, the CS layer sends a "monitor_cond"
45
 * request for the Database table in the _Server database and transitions to
46
 * the CS_S_SERVER_MONITOR_REQUESTED state.  If the session drops and
47
 * reconnects, or if the CS receives a "monitor_canceled" notification for a
48
 * table it is monitoring, the CS starts over again in the same way. */
49
#define OVSDB_CS_STATES                                                 \
50
    /* Waits for "get_schema" reply, then sends "monitor_cond"          \
51
     * request for the Database table in the _Server database, whose    \
52
     * details are informed by the schema, and transitions to           \
53
     * CS_S_SERVER_MONITOR_REQUESTED. */                                \
54
0
    OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED)                             \
55
0
                                                                        \
56
0
    /* Waits for "monitor_cond" reply for the Database table:           \
57
0
     *                                                                  \
58
0
     * - If the reply indicates success, and the Database table has a   \
59
0
     *   row for the CS database:                                       \
60
0
     *                                                                  \
61
0
     *   * If the row indicates that this is a clustered database       \
62
0
     *     that is not connected to the cluster, closes the             \
63
0
     *     connection.  The next connection attempt has a chance at     \
64
0
     *     picking a connected server.                                  \
65
0
     *                                                                  \
66
0
     *   * Otherwise, sends a monitoring request for the CS             \
67
0
     *     database whose details are informed by the schema            \
68
0
     *     (obtained from the row), and transitions to                  \
69
0
     *     CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED.                  \
70
0
     *                                                                  \
71
0
     * - If the reply indicates success, but the Database table does    \
72
0
     *   not have a row for the CS database, transitions to             \
73
0
     *   CS_S_ERROR.                                                    \
74
0
     *                                                                  \
75
0
     * - If the reply indicates failure, sends a "get_schema" request   \
76
0
     *   for the CS database and transitions to                         \
77
0
     *   CS_S_DATA_SCHEMA_REQUESTED. */                                 \
78
0
    OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED)                            \
79
0
                                                                        \
80
0
    /* Waits for "get_schema" reply, then sends "monitor_cond"          \
81
0
     * request whose details are informed by the schema, and            \
82
0
     * transitions to CS_S_DATA_MONITOR_COND_REQUESTED. */              \
83
0
    OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED)                               \
84
0
                                                                        \
85
0
    /* Waits for "monitor_cond_since" reply.  If successful, replaces   \
86
0
     * the CS contents by the data carried in the reply and             \
87
0
     * transitions to CS_S_MONITORING.  On failure, sends a             \
88
0
     * "monitor_cond" request and transitions to                        \
89
0
     * CS_S_DATA_MONITOR_COND_REQUESTED. */                             \
90
0
    OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED)                   \
91
0
                                                                        \
92
0
    /* Waits for "monitor_cond" reply.  If successful, replaces the     \
93
0
     * CS contents by the data carried in the reply and transitions     \
94
0
     * to CS_S_MONITORING.  On failure, sends a "monitor" request       \
95
0
     * and transitions to CS_S_DATA_MONITOR_REQUESTED. */               \
96
0
    OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED)                         \
97
0
                                                                        \
98
0
    /* Waits for "monitor" reply.  If successful, replaces the CS       \
99
0
     * contents by the data carried in the reply and transitions to     \
100
0
     * CS_S_MONITORING.  On failure, transitions to CS_S_ERROR. */      \
101
0
    OVSDB_CS_STATE(DATA_MONITOR_REQUESTED)                              \
102
0
                                                                        \
103
0
    /* State that processes "update", "update2" or "update3"            \
104
0
     * notifications for the main database (and the Database table      \
105
0
     * in _Server if available).                                        \
106
0
     *                                                                  \
107
0
     * If we're monitoring the Database table and we get notified       \
108
0
     * that the CS database has been deleted, we close the              \
109
0
     * connection (which will restart the state machine). */            \
110
0
    OVSDB_CS_STATE(MONITORING)                                          \
111
0
                                                                        \
112
0
    /* Terminal error state that indicates that nothing useful can be   \
113
0
     * done, for example because the database server doesn't actually   \
114
0
     * have the desired database.  We maintain the session with the     \
115
0
     * database server anyway.  If it starts serving the database       \
116
0
     * that we want, or if someone fixes and restarts the database,     \
117
0
     * then it will kill the session and we will automatically          \
118
0
     * reconnect and try again. */                                      \
119
0
    OVSDB_CS_STATE(ERROR)                                               \
120
0
                                                                        \
121
0
    /* Terminal state that indicates we connected to a useless server   \
122
0
     * in a cluster, e.g. one that is partitioned from the rest of      \
123
0
     * the cluster. We're waiting to retry. */                          \
124
0
    OVSDB_CS_STATE(RETRY)
125
126
enum ovsdb_cs_state {
127
#define OVSDB_CS_STATE(NAME) CS_S_##NAME,
128
    OVSDB_CS_STATES
129
#undef OVSDB_CS_STATE
130
};
131
132
static const char *
133
ovsdb_cs_state_to_string(enum ovsdb_cs_state state)
134
0
{
135
0
    switch (state) {
136
0
#define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME;
137
0
        OVSDB_CS_STATES
138
0
#undef OVSDB_CS_STATE
139
0
    default: return "<unknown>";
140
0
    }
141
0
}
142
143
/* A database being monitored.
144
 *
145
 * There are two instances of this data structure for each CS instance, one for
146
 * the _Server database used for working with clusters, and the other one for
147
 * the actual database that the client is interested in.  */
148
struct ovsdb_cs_db {
149
    struct ovsdb_cs *cs;
150
151
    /* Data. */
152
    const char *db_name;        /* Database's name. */
153
    struct hmap tables;         /* Contains "struct ovsdb_cs_db_table *"s.*/
154
    struct json *monitor_id;
155
    struct json *schema;
156
157
    /* Monitor version. */
158
    int max_version;            /* Maximum version of monitor request to use. */
159
    int monitor_version;        /* 0 if not monitoring, 1=monitor,
160
                                 * 2=monitor_cond, 3=monitor_cond_since. */
161
162
    /* Condition changes. */
163
    bool cond_changed;          /* Change not yet sent to server? */
164
    unsigned int cond_seqno;    /* Increments when condition changes. */
165
166
    /* Database locking. */
167
    char *lock_name;            /* Name of lock we need, NULL if none. */
168
    bool has_lock;              /* Has db server told us we have the lock? */
169
    bool is_lock_contended;     /* Has db server told us we can't get lock? */
170
    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
171
172
    /* Last db txn id, used for fast resync through monitor_cond_since */
173
    struct uuid last_id;
174
175
    /* Client interface. */
176
    struct ovs_list events;
177
    const struct ovsdb_cs_ops *ops;
178
    void *ops_aux;
179
};
180
181
static const struct ovsdb_cs_ops ovsdb_cs_server_ops;
182
183
static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *);
184
static unsigned int ovsdb_cs_db_set_condition(
185
    struct ovsdb_cs_db *, const char *db_name, const struct json *condition);
186
187
static void ovsdb_cs_send_schema_request(struct ovsdb_cs *,
188
                                          struct ovsdb_cs_db *);
189
static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *);
190
static bool ovsdb_cs_check_server_db(struct ovsdb_cs *);
191
static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *);
192
static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *,
193
                                          struct ovsdb_cs_db *, int version);
194
static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db);
195
static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db);
196
197
struct ovsdb_cs {
198
    struct ovsdb_cs_db server;
199
    struct ovsdb_cs_db data;
200
201
    /* Session state.
202
     *
203
     * 'state_seqno' is a snapshot of the session's sequence number as returned
204
     * jsonrpc_session_get_seqno(session), so if it differs from the value that
205
     * function currently returns then the session has reconnected and the
206
     * state machine must restart.  */
207
    struct jsonrpc_session *session; /* Connection to the server. */
208
    char *remote;                    /* 'session' remote name. */
209
    enum ovsdb_cs_state state;       /* Current session state. */
210
    unsigned int state_seqno;        /* See above. */
211
    struct json *request_id;         /* JSON ID for request awaiting reply. */
212
213
    /* IDs of outstanding transactions. */
214
    struct json **txns;
215
    size_t n_txns, allocated_txns;
216
217
    /* Info for the _Server database. */
218
    struct uuid cid;
219
    struct hmap server_rows;
220
221
    /* Whether to send 'set_db_change_aware'. */
222
    bool set_db_change_aware;
223
224
    /* Clustered servers. */
225
    uint64_t min_index;      /* Minimum allowed index, to avoid regression. */
226
    bool leader_only;        /* If true, do not connect to Raft followers. */
227
    bool shuffle_remotes;    /* If true, connect to servers in random order. */
228
};
229
230
static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state,
231
                                    const char *where);
232
#define ovsdb_cs_transition(CS, STATE) \
233
0
    ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR)
234
235
static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where);
236
0
#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR)
237
238
static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
239
240
static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *,
241
                                            const struct json *result,
242
                                            int version);
243
static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *,
244
                                         const struct jsonrpc_msg *);
245
static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *,
246
                                              struct ovsdb_cs_db *,
247
                                              const struct jsonrpc_msg *);
248
249
static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *,
250
                                              const struct jsonrpc_msg *);
251
static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request(
252
    struct ovsdb_cs_db *);
253
static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request(
254
    struct ovsdb_cs_db *);
255
static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *,
256
                                          const struct json *);
257
static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *,
258
                                           const struct json *params,
259
                                           bool new_has_lock);
260
static void ovsdb_cs_send_cond_change(struct ovsdb_cs *);
261
262
static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *,
263
                                          const struct jsonrpc_msg *reply);
264

265
/* Events. */
266
267
void
268
ovsdb_cs_event_destroy(struct ovsdb_cs_event *event)
269
0
{
270
0
    if (event) {
271
0
        switch (event->type) {
272
0
        case OVSDB_CS_EVENT_TYPE_RECONNECT:
273
0
        case OVSDB_CS_EVENT_TYPE_LOCKED:
274
0
            break;
275
276
0
        case OVSDB_CS_EVENT_TYPE_UPDATE:
277
0
            json_destroy(event->update.table_updates);
278
0
            break;
279
280
0
        case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
281
0
            jsonrpc_msg_destroy(event->txn_reply);
282
0
            break;
283
0
        }
284
0
        free(event);
285
0
    }
286
0
}
287

288
/* Lifecycle. */
289
290
static void
291
ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name,
292
                 struct ovsdb_cs *parent, int max_version,
293
                 const struct ovsdb_cs_ops *ops, void *ops_aux)
294
0
{
295
0
    *db = (struct ovsdb_cs_db) {
296
0
        .cs = parent,
297
0
        .db_name = db_name,
298
0
        .tables = HMAP_INITIALIZER(&db->tables),
299
0
        .max_version = max_version,
300
0
        .monitor_id = json_array_create_2(json_string_create("monid"),
301
0
                                          json_string_create(db_name)),
302
0
        .events = OVS_LIST_INITIALIZER(&db->events),
303
0
        .ops = ops,
304
0
        .ops_aux = ops_aux,
305
0
    };
306
0
}
307
308
/* Creates and returns a new client synchronization object.  The connection
309
 * will monitor remote database 'db_name'.  If 'retry' is true, then also
310
 * reconnect if the connection fails.
311
 *
312
 * XXX 'max_version' should ordinarily be 3, to allow use of the most efficient
313
 * "monitor_cond_since" method with the database.  Currently there's some kind
314
 * of bug in the DDlog Rust code that interfaces to that, so instead
315
 * ovn-northd-ddlog passes 1 to use plain 'monitor' instead.  Once the DDlog
316
 * Rust code gets fixed, we might as well just delete 'max_version'
317
 * entirely.
318
 *
319
 * 'ops' is a struct for northd_cs_run() to use, and 'ops_aux' is a pointer
320
 * that gets passed into each call.
321
 *
322
 * Use ovsdb_cs_set_remote() to configure the database to which to connect.
323
 * Until a remote is configured, no data can be retrieved.
324
 */
325
struct ovsdb_cs *
326
ovsdb_cs_create(const char *db_name, int max_version,
327
                const struct ovsdb_cs_ops *ops, void *ops_aux)
328
0
{
329
0
    struct ovsdb_cs *cs = xzalloc(sizeof *cs);
330
0
    ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, cs);
331
0
    ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux);
332
0
    cs->state_seqno = UINT_MAX;
333
0
    cs->request_id = NULL;
334
0
    cs->leader_only = true;
335
0
    cs->shuffle_remotes = true;
336
0
    cs->set_db_change_aware = true;
337
0
    hmap_init(&cs->server_rows);
338
339
0
    return cs;
340
0
}
341
342
static void
343
ovsdb_cs_db_destroy(struct ovsdb_cs_db *db)
344
0
{
345
0
    ovsdb_cs_db_destroy_tables(db);
346
347
0
    json_destroy(db->monitor_id);
348
0
    json_destroy(db->schema);
349
350
0
    free(db->lock_name);
351
352
0
    json_destroy(db->lock_request_id);
353
354
    /* This list always gets flushed out at the end of ovsdb_cs_run(). */
355
0
    ovs_assert(ovs_list_is_empty(&db->events));
356
0
}
357
358
/* Destroys 'cs' and all of the data structures that it manages. */
359
void
360
ovsdb_cs_destroy(struct ovsdb_cs *cs)
361
0
{
362
0
    if (cs) {
363
0
        ovsdb_cs_db_destroy(&cs->server);
364
0
        ovsdb_cs_db_destroy(&cs->data);
365
0
        jsonrpc_session_close(cs->session);
366
0
        free(cs->remote);
367
0
        json_destroy(cs->request_id);
368
369
0
        for (size_t i = 0; i < cs->n_txns; i++) {
370
0
            json_destroy(cs->txns[i]);
371
0
        }
372
0
        free(cs->txns);
373
374
0
        ovsdb_cs_clear_server_rows(cs);
375
0
        hmap_destroy(&cs->server_rows);
376
377
0
        free(cs);
378
0
    }
379
0
}
380
381
static void
382
ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state,
383
                        const char *where)
384
0
{
385
0
    VLOG_DBG("%s: %s -> %s at %s",
386
0
             cs->session ? jsonrpc_session_get_name(cs->session) : "void",
387
0
             ovsdb_cs_state_to_string(cs->state),
388
0
             ovsdb_cs_state_to_string(new_state),
389
0
             where);
390
0
    cs->state = new_state;
391
0
}
392
393
static void
394
ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request)
395
0
{
396
0
    json_destroy(cs->request_id);
397
0
    cs->request_id = json_clone(request->id);
398
0
    if (cs->session) {
399
0
        jsonrpc_session_send(cs->session, request);
400
0
    } else {
401
0
        jsonrpc_msg_destroy(request);
402
0
    }
403
0
}
404
405
static void
406
ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where)
407
0
{
408
0
    ovsdb_cs_force_reconnect(cs);
409
0
    ovsdb_cs_transition_at(cs, CS_S_RETRY, where);
410
0
}
411
412
static void
413
ovsdb_cs_restart_fsm(struct ovsdb_cs *cs)
414
0
{
415
    /* Resync data DB table conditions to avoid missing updates due to
416
     * conditions that were in flight or changed locally while the connection
417
     * was down.
418
     */
419
0
    ovsdb_cs_db_sync_condition(&cs->data);
420
421
0
    ovsdb_cs_send_schema_request(cs, &cs->server);
422
0
    ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED);
423
0
    cs->data.monitor_version = 0;
424
0
    cs->server.monitor_version = 0;
425
0
}
426
427
static void
428
ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
429
0
{
430
0
    bool ok = msg->type == JSONRPC_REPLY;
431
0
    if (!ok
432
0
        && cs->state != CS_S_SERVER_SCHEMA_REQUESTED
433
0
        && cs->state != CS_S_SERVER_MONITOR_REQUESTED
434
0
        && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED
435
0
        && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) {
436
0
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
437
0
        char *s = jsonrpc_msg_to_string(msg);
438
0
        VLOG_INFO_RL(&rl, "%s: received unexpected %s response in "
439
0
                     "%s state: %s", jsonrpc_session_get_name(cs->session),
440
0
                     jsonrpc_msg_type_to_string(msg->type),
441
0
                     ovsdb_cs_state_to_string(cs->state),
442
0
                     s);
443
0
        free(s);
444
0
        ovsdb_cs_retry(cs);
445
0
        return;
446
0
    }
447
448
0
    switch (cs->state) {
449
0
    case CS_S_SERVER_SCHEMA_REQUESTED:
450
0
        if (ok) {
451
0
            json_destroy(cs->server.schema);
452
0
            cs->server.schema = json_clone(msg->result);
453
0
            ovsdb_cs_send_monitor_request(cs, &cs->server,
454
0
                                          cs->server.max_version);
455
0
            ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED);
456
0
        } else {
457
0
            ovsdb_cs_send_schema_request(cs, &cs->data);
458
0
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
459
0
        }
460
0
        break;
461
462
0
    case CS_S_SERVER_MONITOR_REQUESTED:
463
0
        if (ok) {
464
0
            cs->server.monitor_version = cs->server.max_version;
465
0
            ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result,
466
0
                                            cs->server.monitor_version);
467
0
            if (ovsdb_cs_check_server_db(cs) && cs->set_db_change_aware) {
468
0
                ovsdb_cs_send_db_change_aware(cs);
469
0
            }
470
0
        } else {
471
0
            ovsdb_cs_send_schema_request(cs, &cs->data);
472
0
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
473
0
        }
474
0
        break;
475
476
0
    case CS_S_DATA_SCHEMA_REQUESTED:
477
0
        json_destroy(cs->data.schema);
478
0
        cs->data.schema = json_clone(msg->result);
479
0
        if (cs->data.max_version >= 2) {
480
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
481
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
482
0
        } else {
483
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
484
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
485
0
        }
486
0
        break;
487
488
0
    case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED:
489
0
        if (!ok) {
490
            /* "monitor_cond_since" not supported.  Try "monitor_cond". */
491
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
492
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
493
0
        } else {
494
0
            cs->data.monitor_version = 3;
495
0
            ovsdb_cs_transition(cs, CS_S_MONITORING);
496
0
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3);
497
0
        }
498
0
        break;
499
500
0
    case CS_S_DATA_MONITOR_COND_REQUESTED:
501
0
        if (!ok) {
502
            /* "monitor_cond" not supported.  Try "monitor". */
503
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
504
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
505
0
        } else {
506
0
            cs->data.monitor_version = 2;
507
0
            ovsdb_cs_transition(cs, CS_S_MONITORING);
508
0
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2);
509
0
        }
510
0
        break;
511
512
0
    case CS_S_DATA_MONITOR_REQUESTED:
513
0
        cs->data.monitor_version = 1;
514
0
        ovsdb_cs_transition(cs, CS_S_MONITORING);
515
0
        ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1);
516
0
        break;
517
518
0
    case CS_S_MONITORING:
519
        /* We don't normally have a request outstanding in this state.  If we
520
         * do, it's a "monitor_cond_change", which means that the conditional
521
         * monitor clauses were updated.
522
         *
523
         * Mark the last requested conditions as acked and if further
524
         * condition changes were pending, send them now. */
525
0
        ovsdb_cs_db_ack_condition(&cs->data);
526
0
        ovsdb_cs_send_cond_change(cs);
527
0
        cs->data.cond_seqno++;
528
0
        break;
529
530
0
    case CS_S_ERROR:
531
0
    case CS_S_RETRY:
532
        /* Nothing to do in this state. */
533
0
        break;
534
535
0
    default:
536
0
        OVS_NOT_REACHED();
537
0
    }
538
0
}
539
540
static void
541
ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
542
0
{
543
0
    bool is_response = (msg->type == JSONRPC_REPLY ||
544
0
                        msg->type == JSONRPC_ERROR);
545
546
    /* Process a reply to an outstanding request. */
547
0
    if (is_response
548
0
        && cs->request_id && json_equal(cs->request_id, msg->id)) {
549
0
        json_destroy(cs->request_id);
550
0
        cs->request_id = NULL;
551
0
        ovsdb_cs_process_response(cs, msg);
552
0
        return;
553
0
    }
554
555
    /* Process database contents updates. */
556
0
    if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) {
557
0
        return;
558
0
    }
559
0
    if (cs->server.monitor_version
560
0
        && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) {
561
0
        ovsdb_cs_check_server_db(cs);
562
0
        return;
563
0
    }
564
565
0
    if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg)
566
0
        || (cs->server.monitor_version
567
0
            && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) {
568
0
        return;
569
0
    }
570
571
    /* Process "lock" replies and related notifications. */
572
0
    if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) {
573
0
        return;
574
0
    }
575
576
    /* Process response to a database transaction we submitted. */
577
0
    if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) {
578
0
        return;
579
0
    }
580
581
    /* Unknown message.  Log at a low level because this can happen if
582
     * ovsdb_cs_txn_destroy() is called to destroy a transaction
583
     * before we receive the reply.
584
     *
585
     * (We could sort those out from other kinds of unknown messages by
586
     * using distinctive IDs for transactions, if it seems valuable to
587
     * do so, and then it would be possible to use different log
588
     * levels. XXX?) */
589
0
    char *s = jsonrpc_msg_to_string(msg);
590
0
    VLOG_DBG("%s: received unexpected %s message: %s",
591
0
             jsonrpc_session_get_name(cs->session),
592
0
             jsonrpc_msg_type_to_string(msg->type), s);
593
0
    free(s);
594
0
}
595
596
static struct ovsdb_cs_event *
597
ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type)
598
0
{
599
0
    struct ovsdb_cs_event *event = xmalloc(sizeof *event);
600
0
    event->type = type;
601
0
    ovs_list_push_back(&db->events, &event->list_node);
602
0
    return event;
603
0
}
604
605
/* Processes a batch of messages from the database server on 'cs'.  This may
606
 * cause the CS's contents to change.
607
 *
608
 * Initializes 'events' with a list of events that occurred on 'cs'.  The
609
 * caller must process and destroy all of the events. */
610
void
611
ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
612
0
{
613
0
    ovs_list_init(events);
614
0
    if (!cs->session) {
615
0
        return;
616
0
    }
617
618
0
    ovsdb_cs_send_cond_change(cs);
619
620
0
    jsonrpc_session_run(cs->session);
621
622
0
    unsigned int seqno = jsonrpc_session_get_seqno(cs->session);
623
0
    if (cs->state_seqno != seqno) {
624
0
        cs->state_seqno = seqno;
625
0
        ovsdb_cs_restart_fsm(cs);
626
627
0
        for (size_t i = 0; i < cs->n_txns; i++) {
628
0
            json_destroy(cs->txns[i]);
629
0
        }
630
0
        cs->n_txns = 0;
631
632
0
        ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT);
633
634
0
        if (cs->data.lock_name) {
635
0
            jsonrpc_session_send(
636
0
                cs->session,
637
0
                ovsdb_cs_db_compose_lock_request(&cs->data));
638
0
        }
639
0
    }
640
641
0
    for (int i = 0; i < 50; i++) {
642
0
        struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
643
0
        if (!msg) {
644
0
            break;
645
0
        }
646
0
        ovsdb_cs_process_msg(cs, msg);
647
0
        jsonrpc_msg_destroy(msg);
648
0
    }
649
0
    ovs_list_push_back_all(events, &cs->data.events);
650
0
}
651
652
/* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to
653
 * do or when activity occurs on a transaction on 'cs'. */
654
void
655
ovsdb_cs_wait(struct ovsdb_cs *cs)
656
0
{
657
0
    if (!cs->session) {
658
0
        return;
659
0
    }
660
0
    jsonrpc_session_wait(cs->session);
661
0
    jsonrpc_session_recv_wait(cs->session);
662
0
}
663

664
/* Network connection. */
665
666
/* Changes the remote and creates a new session.  Keeps existing connection
667
 * if current remote is still valid.
668
 *
669
 * If 'retry' is true, the connection to the remote will automatically retry
670
 * when it fails.  If 'retry' is false, the connection is one-time. */
671
void
672
ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry)
673
0
{
674
0
    if (cs
675
0
        && ((remote != NULL) != (cs->remote != NULL)
676
0
            || (remote && cs->remote && strcmp(remote, cs->remote)))) {
677
0
        struct jsonrpc *rpc = NULL;
678
679
        /* Close the old session, if any. */
680
0
        if (cs->session) {
681
            /* Save the current open connection and close the session. */
682
0
            rpc = jsonrpc_session_steal(cs->session);
683
0
            cs->session = NULL;
684
685
0
            free(cs->remote);
686
0
            cs->remote = NULL;
687
0
        }
688
689
        /* Open new session, if any. */
690
0
        if (remote) {
691
0
            struct svec remotes = SVEC_EMPTY_INITIALIZER;
692
0
            struct uuid old_cid = cs->cid;
693
694
0
            ovsdb_session_parse_remote(remote, &remotes, &cs->cid);
695
0
            if (cs->shuffle_remotes) {
696
0
                svec_shuffle(&remotes);
697
0
            }
698
0
            cs->session = jsonrpc_session_open_multiple(&remotes, retry);
699
700
            /* Use old connection, if cluster id didn't change and the remote
701
             * is still on the list, to avoid unnecessary re-connection. */
702
0
            if (rpc && uuid_equals(&old_cid, &cs->cid)
703
0
                && svec_contains_unsorted(&remotes, jsonrpc_get_name(rpc))) {
704
0
                jsonrpc_session_replace(cs->session, rpc);
705
0
                cs->state_seqno = jsonrpc_session_get_seqno(cs->session);
706
0
                rpc = NULL;
707
0
            } else {
708
0
                cs->state_seqno = UINT_MAX;
709
0
            }
710
711
0
            svec_destroy(&remotes);
712
0
            cs->remote = xstrdup(remote);
713
0
        }
714
715
0
        jsonrpc_close(rpc);
716
0
    }
717
0
}
718
719
/* Reconfigures 'cs' so that it would reconnect to the database, if
720
 * connection was dropped. */
721
void
722
ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs)
723
0
{
724
0
    if (cs->session) {
725
0
        jsonrpc_session_enable_reconnect(cs->session);
726
0
    }
727
0
}
728
729
/* Forces 'cs' to drop its connection to the database and reconnect.  In the
730
 * meantime, the contents of 'cs' will not change. */
731
void
732
ovsdb_cs_force_reconnect(struct ovsdb_cs *cs)
733
0
{
734
0
    if (cs->session) {
735
0
        if (cs->state == CS_S_MONITORING) {
736
            /* The ovsdb-cs was in MONITORING state, so we either had data
737
             * inconsistency on this server, or it stopped being the cluster
738
             * leader, or the user requested to re-connect.  Avoiding backoff
739
             * in these cases, as we need to re-connect as soon as possible.
740
             * Connections that are not in MONITORING state should have their
741
             * backoff to avoid constant flood of re-connection attempts in
742
             * case there is no suitable database server. */
743
0
            jsonrpc_session_reset_backoff(cs->session);
744
0
        }
745
0
        jsonrpc_session_force_reconnect(cs->session);
746
0
    }
747
0
}
748
749
/* Drops 'cs''s current connection and the cached session.  This is useful if
750
 * the client notices some kind of inconsistency. */
751
void
752
ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs)
753
0
{
754
0
    cs->data.last_id = UUID_ZERO;
755
0
    ovsdb_cs_retry(cs);
756
0
}
757
758
/* Returns true if 'cs' is currently connected or will eventually try to
759
 * reconnect. */
760
bool
761
ovsdb_cs_is_alive(const struct ovsdb_cs *cs)
762
0
{
763
0
    return (cs->session
764
0
            && jsonrpc_session_is_alive(cs->session)
765
0
            && cs->state != CS_S_ERROR);
766
0
}
767
768
/* Returns true if 'cs' is currently connected to a server. */
769
bool
770
ovsdb_cs_is_connected(const struct ovsdb_cs *cs)
771
0
{
772
0
    return cs->session && jsonrpc_session_is_connected(cs->session);
773
0
}
774
775
/* Returns the last error reported on a connection by 'cs'.  The return value
776
 * is 0 only if no connection made by 'cs' has ever encountered an error and
777
 * a negative response to a schema request has never been received. See
778
 * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
779
 * interpretation. */
780
int
781
ovsdb_cs_get_last_error(const struct ovsdb_cs *cs)
782
0
{
783
0
    int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0;
784
0
    if (err) {
785
0
        return err;
786
0
    } else if (cs->state == CS_S_ERROR) {
787
0
        return ENOENT;
788
0
    } else {
789
0
        return 0;
790
0
    }
791
0
}
792
793
/* Sets all the JSON-RPC session 'options' for 'cs''s current session. */
794
void
795
ovsdb_cs_set_jsonrpc_options(const struct ovsdb_cs *cs,
796
                             const struct jsonrpc_session_options *options)
797
0
{
798
0
    if (cs->session) {
799
0
        jsonrpc_session_set_options(cs->session, options);
800
0
    }
801
0
}
802
803
/* Sets the "probe interval" for 'cs''s current session to 'probe_interval', in
804
 * milliseconds. */
805
void
806
ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval)
807
0
{
808
0
    if (cs->session) {
809
0
        jsonrpc_session_set_probe_interval(cs->session, probe_interval);
810
0
    }
811
0
}
812

813
/* Conditional monitoring. */
814
815
/* A table being monitored.
816
 *
817
 * At the CS layer, the only thing we care about, table-wise, is the conditions
818
 * we're using for monitoring them, so there's little here.  We only create
819
 * these table structures at all for tables that have conditions. */
820
struct ovsdb_cs_db_table {
821
    struct hmap_node hmap_node; /* Indexed by 'name'. */
822
    char *name;                 /* Table name. */
823
824
    /* Each of these is a null pointer if it is empty, or JSON [<condition>*]
825
     * or [true] or [false] otherwise.  [true] could be represented as a null
826
     * pointer, but we want to distinguish "empty slot" from "a condition that
827
     * is always true" in a slot. */
828
    struct json *ack_cond; /* Last condition acked by the server. */
829
    struct json *req_cond; /* Last condition requested to the server. */
830
    struct json *new_cond; /* Latest condition set by the IDL client. */
831
};
832
833
/* A kind of condition, so that we can treat equivalent JSON as equivalent. */
834
enum condition_type {
835
    COND_FALSE,                 /* [] or [false] */
836
    COND_TRUE,                  /* Null pointer or [true] */
837
    COND_OTHER                  /* Anything else. */
838
};
839
840
/* Returns the condition_type for 'condition'. */
841
static enum condition_type
842
condition_classify(const struct json *condition)
843
0
{
844
0
    if (condition) {
845
0
        switch (json_array_size(condition)) {
846
0
        case 0:
847
0
            return COND_FALSE;
848
849
0
        case 1: {
850
0
            const struct json *cond = json_array_at(condition, 0);
851
852
0
            return (cond->type == JSON_FALSE ? COND_FALSE
853
0
                    : cond->type == JSON_TRUE ? COND_TRUE
854
0
                    : COND_OTHER);
855
0
        }
856
857
0
        default:
858
0
            return COND_OTHER;
859
0
        }
860
0
    } else {
861
0
        return COND_TRUE;
862
0
    }
863
0
}
864
865
/* Returns true if 'a' and 'b' are the same condition (in an obvious way; we're
866
 * not going to compare for boolean equivalence or anything). */
867
static bool
868
condition_equal(const struct json *a, const struct json *b)
869
0
{
870
0
    enum condition_type type = condition_classify(a);
871
0
    return (type == condition_classify(b)
872
0
            && (type != COND_OTHER || json_equal(a, b)));
873
0
}
874
875
/* Returns a clone of 'condition', translating always-true and always-false to
876
 * [true] and [false], respectively. */
877
static struct json *
878
condition_clone(const struct json *condition)
879
0
{
880
0
    switch (condition_classify(condition)) {
881
0
    case COND_TRUE:
882
0
        return json_array_create_1(json_boolean_create(true));
883
884
0
    case COND_FALSE:
885
0
        return json_array_create_1(json_boolean_create(false));
886
887
0
    case COND_OTHER:
888
0
        return json_clone(condition);
889
0
    }
890
891
0
    OVS_NOT_REACHED();
892
0
}
893
894
/* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an
895
 * empty one if necessary. */
896
static struct ovsdb_cs_db_table *
897
ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table)
898
0
{
899
0
    uint32_t hash = hash_string(table, 0);
900
0
    struct ovsdb_cs_db_table *t;
901
902
0
    HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) {
903
0
        if (!strcmp(t->name, table)) {
904
0
            return t;
905
0
        }
906
0
    }
907
908
0
    t = xzalloc(sizeof *t);
909
0
    t->name = xstrdup(table);
910
0
    t->ack_cond = json_array_create_1(json_boolean_create(true));
911
0
    hmap_insert(&db->tables, &t->hmap_node, hash);
912
0
    return t;
913
0
}
914
915
static void
916
ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db)
917
0
{
918
0
    struct ovsdb_cs_db_table *table;
919
0
    HMAP_FOR_EACH_SAFE (table, hmap_node, &db->tables) {
920
0
        json_destroy(table->ack_cond);
921
0
        json_destroy(table->req_cond);
922
0
        json_destroy(table->new_cond);
923
0
        hmap_remove(&db->tables, &table->hmap_node);
924
0
        free(table->name);
925
0
        free(table);
926
0
    }
927
0
    hmap_destroy(&db->tables);
928
0
}
929
930
static unsigned int
931
ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table,
932
                          const struct json *condition)
933
0
{
934
    /* Compare the new condition to the last known condition which can be
935
     * either "new" (not sent yet), "requested" or "acked", in this order. */
936
0
    struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table);
937
0
    const struct json *table_cond = (t->new_cond ? t->new_cond
938
0
                                     : t->req_cond ? t->req_cond
939
0
                                     : t->ack_cond);
940
0
    if (!condition_equal(condition, table_cond)) {
941
0
        json_destroy(t->new_cond);
942
0
        t->new_cond = condition_clone(condition);
943
0
        db->cond_changed = true;
944
0
        poll_immediate_wake();
945
0
    }
946
947
    /* Conditions will be up to date when we receive replies for already
948
     * requested and new conditions, if any.  This includes condition change
949
     * requests for other tables too.
950
     */
951
0
    if (t->new_cond) {
952
        /* New condition will be sent out after all already requested ones
953
         * are acked.
954
         */
955
0
        bool any_req_cond = false;
956
0
        HMAP_FOR_EACH (t, hmap_node, &db->tables) {
957
0
            if (t->req_cond) {
958
0
                any_req_cond = true;
959
0
                break;
960
0
            }
961
0
        }
962
0
        return db->cond_seqno + any_req_cond + 1;
963
0
    } else {
964
        /* Already requested conditions should be up to date at
965
         * db->cond_seqno + 1 while acked conditions are already up to date.
966
         */
967
0
        return db->cond_seqno + !!t->req_cond;
968
0
    }
969
0
}
970
971
/* Sets the replication condition for 'tc' in 'cs' to 'condition' and arranges
972
 * to send the new condition to the database server.
973
 *
974
 * Return the next conditional update sequence number.  When this value and
975
 * ovsdb_cs_get_condition_seqno() matches, 'cs' contains rows that match the
976
 * 'condition'. */
977
unsigned int
978
ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table,
979
                       const struct json *condition)
980
0
{
981
0
    return ovsdb_cs_db_set_condition(&cs->data, table, condition);
982
0
}
983
984
/* Returns a "sequence number" that represents the number of conditional
985
 * monitoring updates successfully received by the OVSDB server of a CS
986
 * connection.
987
 *
988
 * ovsdb_cs_set_condition() sets a new condition that is different from the
989
 * current condtion, the next expected "sequence number" is returned.
990
 *
991
 * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the
992
 * return value of ovsdb_cs_set_condition(), the client is assured that:
993
 *
994
 *   - The ovsdb_cs_set_condition() changes has been acknowledged by the OVSDB
995
 *     server.
996
 *
997
 *   -  'cs' now contains the content matches the new conditions.   */
998
unsigned int
999
ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs)
1000
0
{
1001
0
    return cs->data.cond_seqno;
1002
0
}
1003
1004
static struct json *
1005
ovsdb_cs_create_cond_change_req(const struct json *cond)
1006
0
{
1007
0
    struct json *monitor_cond_change_request = json_object_create();
1008
0
    json_object_put(monitor_cond_change_request, "where", json_clone(cond));
1009
0
    return monitor_cond_change_request;
1010
0
}
1011
1012
static struct jsonrpc_msg *
1013
ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db)
1014
0
{
1015
0
    if (!db->cond_changed) {
1016
0
        return NULL;
1017
0
    }
1018
1019
0
    struct json *monitor_cond_change_requests = NULL;
1020
0
    struct ovsdb_cs_db_table *table;
1021
0
    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1022
        /* Always use the most recent conditions set by the CS client when
1023
         * requesting monitor_cond_change, i.e., table->new_cond.
1024
         */
1025
0
        if (table->new_cond) {
1026
0
            struct json *req =
1027
0
                ovsdb_cs_create_cond_change_req(table->new_cond);
1028
0
            if (req) {
1029
0
                if (!monitor_cond_change_requests) {
1030
0
                    monitor_cond_change_requests = json_object_create();
1031
0
                }
1032
0
                json_object_put(monitor_cond_change_requests,
1033
0
                                table->name,
1034
0
                                json_array_create_1(req));
1035
0
            }
1036
            /* Mark the new condition as requested by moving it to req_cond.
1037
             * If there's already requested condition that's a bug.
1038
             */
1039
0
            ovs_assert(table->req_cond == NULL);
1040
0
            table->req_cond = table->new_cond;
1041
0
            table->new_cond = NULL;
1042
0
        }
1043
0
    }
1044
1045
0
    if (!monitor_cond_change_requests) {
1046
0
        return NULL;
1047
0
    }
1048
1049
0
    db->cond_changed = false;
1050
0
    struct json *params = json_array_create_3(json_clone(db->monitor_id),
1051
0
                                              json_clone(db->monitor_id),
1052
0
                                              monitor_cond_change_requests);
1053
0
    return jsonrpc_create_request("monitor_cond_change", params, NULL);
1054
0
}
1055
1056
/* Marks all requested table conditions in 'db' as acked by the server.
1057
 * It should be called when the server replies to monitor_cond_change
1058
 * requests.
1059
 */
1060
static void
1061
ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db)
1062
0
{
1063
0
    struct ovsdb_cs_db_table *table;
1064
0
    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1065
0
        if (table->req_cond) {
1066
0
            json_destroy(table->ack_cond);
1067
0
            table->ack_cond = table->req_cond;
1068
0
            table->req_cond = NULL;
1069
0
        }
1070
0
    }
1071
0
}
1072
1073
/* Should be called when the CS fsm is restarted and resyncs table conditions
1074
 * based on the state the DB is in:
1075
 * - if a non-zero last_id is available for the DB then upon reconnect
1076
 *   the CS should first request acked conditions to avoid missing updates
1077
 *   about records that were added before the transaction with
1078
 *   txn-id == last_id. If there were requested condition changes in flight
1079
 *   (i.e., req_cond not NULL) and the CS client didn't set new conditions
1080
 *   (i.e., new_cond is NULL) then move req_cond to new_cond to trigger a
1081
 *   follow up monitor_cond_change request.
1082
 * - if there's no last_id available for the DB then it's safe to use the
1083
 *   latest conditions set by the CS client even if they weren't acked yet.
1084
 */
1085
static void
1086
ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db)
1087
0
{
1088
0
    bool ack_all = uuid_is_zero(&db->last_id);
1089
0
    if (ack_all) {
1090
0
        db->cond_changed = false;
1091
0
    }
1092
1093
0
    struct ovsdb_cs_db_table *table;
1094
0
    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1095
        /* When monitor_cond_since requests will be issued, the
1096
         * table->ack_cond condition will be added to the "where" clause".
1097
         * Follow up monitor_cond_change requests will use table->new_cond.
1098
         */
1099
0
        if (ack_all) {
1100
0
            if (table->new_cond) {
1101
0
                json_destroy(table->req_cond);
1102
0
                table->req_cond = table->new_cond;
1103
0
                table->new_cond = NULL;
1104
0
            }
1105
1106
0
            if (table->req_cond) {
1107
0
                json_destroy(table->ack_cond);
1108
0
                table->ack_cond = table->req_cond;
1109
0
                table->req_cond = NULL;
1110
0
            }
1111
0
        } else {
1112
0
            if (table->req_cond) {
1113
                /* There was an in-flight monitor_cond_change request.  It's no
1114
                 * longer relevant in the restarted FSM, so clear it. */
1115
0
                if (table->new_cond) {
1116
                    /* We will send a new monitor_cond_change with the new
1117
                     * condition.  The previously in-flight condition is
1118
                     * irrelevant and we can just forget about it. */
1119
0
                    json_destroy(table->req_cond);
1120
0
                } else {
1121
                    /* The restarted FSM needs to again send a request for the
1122
                     * previously in-flight condition. */
1123
0
                    table->new_cond = table->req_cond;
1124
0
                }
1125
0
                table->req_cond = NULL;
1126
0
                db->cond_changed = true;
1127
1128
                /* There are two cases:
1129
                 * a. either the server already processed the requested monitor
1130
                 *    condition change but the FSM was restarted before the
1131
                 *    client was notified.  In this case the client should
1132
                 *    clear its local cache because it's out of sync with the
1133
                 *    monitor view on the server side.
1134
                 *
1135
                 * b. OR the server hasn't processed the requested monitor
1136
                 *    condition change yet.
1137
                 *
1138
                 * As there's no easy way to differentiate between the two,
1139
                 * and given that this condition should be rare, reset the
1140
                 * 'last_id', essentially flushing the local cached DB
1141
                 * contents.
1142
                 */
1143
0
                db->last_id = UUID_ZERO;
1144
0
            }
1145
0
        }
1146
0
    }
1147
0
}
1148
1149
static void
1150
ovsdb_cs_send_cond_change(struct ovsdb_cs *cs)
1151
0
{
1152
    /* When 'cs->request_id' is not NULL, there is an outstanding
1153
     * conditional monitoring update request that we have not heard
1154
     * from the server yet. Don't generate another request in this case. */
1155
0
    if (!jsonrpc_session_is_connected(cs->session)
1156
0
        || cs->data.monitor_version == 1
1157
0
        || cs->request_id) {
1158
0
        return;
1159
0
    }
1160
1161
0
    struct jsonrpc_msg *msg = ovsdb_cs_db_compose_cond_change(&cs->data);
1162
0
    if (msg) {
1163
0
        cs->request_id = json_clone(msg->id);
1164
0
        jsonrpc_session_send(cs->session, msg);
1165
0
    }
1166
0
}
1167

1168
/* Database change awareness. */
1169
1170
/* By default, or if 'set_db_change_aware' is true, 'cs' will send
1171
 * 'set_db_change_aware' request to the server after receiving the _SERVER data
1172
 * (when the server supports it), which is useful for clients that intends to
1173
 * keep long connections to the server.  Otherwise, 'cs' will not send the
1174
 * 'set_db_change_aware' request, which is more reasonable for short-lived
1175
 * connections to avoid unnecessary processing at the server side and possible
1176
 * error handling due to connections being closed by the clients before the
1177
 * responses are sent by the server. */
1178
void
1179
ovsdb_cs_set_db_change_aware(struct ovsdb_cs *cs, bool set_db_change_aware)
1180
0
{
1181
0
    cs->set_db_change_aware = set_db_change_aware;
1182
0
}
1183

1184
/* Clustered servers. */
1185
1186
/* By default, or if 'leader_only' is true, when 'cs' connects to a clustered
1187
 * database, the CS layer will avoid servers other than the cluster
1188
 * leader. This ensures that any data that it reads and reports is up-to-date.
1189
 * If 'leader_only' is false, the CS layer will accept any server in the
1190
 * cluster, which means that for read-only transactions it can report and act
1191
 * on stale data (transactions that modify the database are always serialized
1192
 * even with false 'leader_only').  Refer to Understanding Cluster Consistency
1193
 * in ovsdb(7) for more information. */
1194
void
1195
ovsdb_cs_set_leader_only(struct ovsdb_cs *cs, bool leader_only)
1196
0
{
1197
0
    cs->leader_only = leader_only;
1198
0
    if (leader_only && cs->server.monitor_version) {
1199
0
        ovsdb_cs_check_server_db(cs);
1200
0
    }
1201
0
}
1202
1203
/* Set whether the order of remotes should be shuffled, when there is more than
1204
 * one remote.  The setting doesn't take effect until the next time when
1205
 * ovsdb_cs_set_remote() is called. */
1206
void
1207
ovsdb_cs_set_shuffle_remotes(struct ovsdb_cs *cs, bool shuffle)
1208
0
{
1209
0
    cs->shuffle_remotes = shuffle;
1210
0
}
1211
1212
/* Reset min_index to 0. This prevents a situation where the client
1213
 * thinks all databases have stale data, when they actually have all
1214
 * been destroyed and rebuilt from scratch.
1215
 */
1216
void
1217
ovsdb_cs_reset_min_index(struct ovsdb_cs *cs)
1218
0
{
1219
0
    cs->min_index = 0;
1220
0
}
1221

1222
/* Database locks. */
1223
1224
static struct jsonrpc_msg *
1225
ovsdb_cs_db_set_lock(struct ovsdb_cs_db *db, const char *lock_name)
1226
0
{
1227
0
    if (db->lock_name
1228
0
        && (!lock_name || strcmp(lock_name, db->lock_name))) {
1229
        /* Release previous lock. */
1230
0
        struct jsonrpc_msg *msg = ovsdb_cs_db_compose_unlock_request(db);
1231
0
        free(db->lock_name);
1232
0
        db->lock_name = NULL;
1233
0
        db->is_lock_contended = false;
1234
0
        return msg;
1235
0
    }
1236
1237
0
    if (lock_name && !db->lock_name) {
1238
        /* Acquire new lock. */
1239
0
        db->lock_name = xstrdup(lock_name);
1240
0
        return ovsdb_cs_db_compose_lock_request(db);
1241
0
    }
1242
1243
0
    return NULL;
1244
0
}
1245
1246
/* If 'lock_name' is nonnull, configures 'cs' to obtain the named lock from the
1247
 * database server and to prevent modifying the database when the lock cannot
1248
 * be acquired (that is, when another client has the same lock).
1249
 *
1250
 * If 'lock_name' is NULL, drops the locking requirement and releases the
1251
 * lock. */
1252
void
1253
ovsdb_cs_set_lock(struct ovsdb_cs *cs, const char *lock_name)
1254
0
{
1255
0
    for (;;) {
1256
0
        struct jsonrpc_msg *msg = ovsdb_cs_db_set_lock(&cs->data, lock_name);
1257
0
        if (!msg) {
1258
0
            break;
1259
0
        }
1260
0
        if (cs->session) {
1261
0
            jsonrpc_session_send(cs->session, msg);
1262
0
        } else {
1263
0
            jsonrpc_msg_destroy(msg);
1264
0
        }
1265
0
    }
1266
0
}
1267
1268
/* Returns the name of the lock that 'cs' is trying to obtain, or NULL if none
1269
 * is configured. */
1270
const char *
1271
ovsdb_cs_get_lock(const struct ovsdb_cs *cs)
1272
0
{
1273
0
    return cs->data.lock_name;
1274
0
}
1275
1276
/* Returns true if 'cs' is configured to obtain a lock and owns that lock,
1277
 * false if it doesn't own the lock or isn't configured to obtain one.
1278
 *
1279
 * Locking and unlocking happens asynchronously from the database client's
1280
 * point of view, so the information is only useful for optimization (e.g. if
1281
 * the client doesn't have the lock then there's no point in trying to write to
1282
 * the database). */
1283
bool
1284
ovsdb_cs_has_lock(const struct ovsdb_cs *cs)
1285
0
{
1286
0
    return cs->data.has_lock;
1287
0
}
1288
1289
/* Returns true if 'cs' is configured to obtain a lock but the database server
1290
 * has indicated that some other client already owns the requested lock. */
1291
bool
1292
ovsdb_cs_is_lock_contended(const struct ovsdb_cs *cs)
1293
0
{
1294
0
    return cs->data.is_lock_contended;
1295
0
}
1296
1297
static void
1298
ovsdb_cs_db_update_has_lock(struct ovsdb_cs_db *db, bool new_has_lock)
1299
0
{
1300
0
    if (new_has_lock && !db->has_lock) {
1301
0
        ovsdb_cs_db_add_event(db, OVSDB_CS_EVENT_TYPE_LOCKED);
1302
0
        db->is_lock_contended = false;
1303
0
    }
1304
0
    db->has_lock = new_has_lock;
1305
0
}
1306
1307
static bool
1308
ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *db,
1309
                                  const struct jsonrpc_msg *msg)
1310
0
{
1311
0
    if (msg->type == JSONRPC_REPLY
1312
0
        && db->lock_request_id
1313
0
        && json_equal(db->lock_request_id, msg->id)) {
1314
        /* Reply to our "lock" request. */
1315
0
        ovsdb_cs_db_parse_lock_reply(db, msg->result);
1316
0
        return true;
1317
0
    }
1318
1319
0
    if (msg->type == JSONRPC_NOTIFY) {
1320
0
        if (!strcmp(msg->method, "locked")) {
1321
            /* We got our lock. */
1322
0
            return ovsdb_cs_db_parse_lock_notify(db, msg->params, true);
1323
0
        } else if (!strcmp(msg->method, "stolen")) {
1324
            /* Someone else stole our lock. */
1325
0
            return ovsdb_cs_db_parse_lock_notify(db, msg->params, false);
1326
0
        }
1327
0
    }
1328
1329
0
    return false;
1330
0
}
1331
1332
static struct jsonrpc_msg *
1333
ovsdb_cs_db_compose_lock_request__(struct ovsdb_cs_db *db,
1334
                                    const char *method)
1335
0
{
1336
0
    ovsdb_cs_db_update_has_lock(db, false);
1337
1338
0
    json_destroy(db->lock_request_id);
1339
0
    db->lock_request_id = NULL;
1340
1341
0
    struct json *params = json_array_create_1(json_string_create(
1342
0
                                                  db->lock_name));
1343
0
    return jsonrpc_create_request(method, params, NULL);
1344
0
}
1345
1346
static struct jsonrpc_msg *
1347
ovsdb_cs_db_compose_lock_request(struct ovsdb_cs_db *db)
1348
0
{
1349
0
    struct jsonrpc_msg *msg = ovsdb_cs_db_compose_lock_request__(db, "lock");
1350
0
    db->lock_request_id = json_clone(msg->id);
1351
0
    return msg;
1352
0
}
1353
1354
static struct jsonrpc_msg *
1355
ovsdb_cs_db_compose_unlock_request(struct ovsdb_cs_db *db)
1356
0
{
1357
0
    return ovsdb_cs_db_compose_lock_request__(db, "unlock");
1358
0
}
1359
1360
static void
1361
ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *db,
1362
                              const struct json *result)
1363
0
{
1364
0
    bool got_lock;
1365
1366
0
    json_destroy(db->lock_request_id);
1367
0
    db->lock_request_id = NULL;
1368
1369
0
    if (result->type == JSON_OBJECT) {
1370
0
        const struct json *locked;
1371
1372
0
        locked = shash_find_data(json_object(result), "locked");
1373
0
        got_lock = locked && locked->type == JSON_TRUE;
1374
0
    } else {
1375
0
        got_lock = false;
1376
0
    }
1377
1378
0
    ovsdb_cs_db_update_has_lock(db, got_lock);
1379
0
    if (!got_lock) {
1380
0
        db->is_lock_contended = true;
1381
0
    }
1382
0
}
1383
1384
static bool
1385
ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *db,
1386
                               const struct json *params,
1387
                               bool new_has_lock)
1388
0
{
1389
0
    if (db->lock_name
1390
0
        && params->type == JSON_ARRAY
1391
0
        && json_array_size(params) > 0
1392
0
        && json_array_at(params, 0)->type == JSON_STRING) {
1393
0
        const char *lock_name = json_string(json_array_at(params, 0));
1394
1395
0
        if (!strcmp(db->lock_name, lock_name)) {
1396
0
            ovsdb_cs_db_update_has_lock(db, new_has_lock);
1397
0
            if (!new_has_lock) {
1398
0
                db->is_lock_contended = true;
1399
0
            }
1400
0
            return true;
1401
0
        }
1402
0
    }
1403
0
    return false;
1404
0
}
1405

1406
/* Transactions. */
1407
1408
static bool
1409
ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *cs,
1410
                              const struct jsonrpc_msg *reply)
1411
0
{
1412
0
    bool found = ovsdb_cs_forget_transaction(cs, reply->id);
1413
0
    if (found) {
1414
0
        struct ovsdb_cs_event *event
1415
0
            = ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_TXN_REPLY);
1416
0
        event->txn_reply = jsonrpc_msg_clone(reply);
1417
0
    }
1418
0
    return found;
1419
0
}
1420
1421
/* Returns true if 'cs' can be sent a transaction now, false otherwise.  This
1422
 * is useful for optimization: there is no point in composing and sending a
1423
 * transaction if it returns false. */
1424
bool
1425
ovsdb_cs_may_send_transaction(const struct ovsdb_cs *cs)
1426
0
{
1427
0
    return (cs->session != NULL
1428
0
            && cs->state == CS_S_MONITORING
1429
0
            && (!cs->data.lock_name || ovsdb_cs_has_lock(cs)));
1430
0
}
1431
1432
/* Attempts to send a transaction with the specified 'operations' to 'cs''s
1433
 * server.  On success, returns the request ID; the caller must eventually free
1434
 * it.  On failure, returns NULL. */
1435
struct json * OVS_WARN_UNUSED_RESULT
1436
ovsdb_cs_send_transaction(struct ovsdb_cs *cs, struct json *operations)
1437
0
{
1438
0
    if (!ovsdb_cs_may_send_transaction(cs)) {
1439
0
        json_destroy(operations);
1440
0
        return NULL;
1441
0
    }
1442
1443
0
    if (cs->data.lock_name) {
1444
0
        struct json *assertion = json_object_create();
1445
0
        json_object_put_string(assertion, "op", "assert");
1446
0
        json_object_put_string(assertion, "lock", cs->data.lock_name);
1447
0
        json_array_add(operations, assertion);
1448
0
    }
1449
1450
0
    struct json *request_id;
1451
0
    struct jsonrpc_msg *request = jsonrpc_create_request(
1452
0
        "transact", operations, &request_id);
1453
0
    int error = jsonrpc_session_send(cs->session, request);
1454
0
    if (error) {
1455
0
        json_destroy(request_id);
1456
0
        return NULL;
1457
0
    }
1458
1459
0
    if (cs->n_txns >= cs->allocated_txns) {
1460
0
        cs->txns = x2nrealloc(cs->txns, &cs->allocated_txns,
1461
0
                              sizeof *cs->txns);
1462
0
    }
1463
0
    cs->txns[cs->n_txns++] = request_id;
1464
0
    return json_clone(request_id);
1465
0
}
1466
1467
/* Makes 'cs' drop its record of transaction 'request_id'.  If a reply arrives
1468
 * for it later (which it will, unless the connection drops in the meantime),
1469
 * it won't be reported through an event.
1470
 *
1471
 * Returns true if 'request_id' was known, false otherwise. */
1472
bool
1473
ovsdb_cs_forget_transaction(struct ovsdb_cs *cs, const struct json *request_id)
1474
0
{
1475
0
    for (size_t i = 0; i < cs->n_txns; i++) {
1476
0
        if (json_equal(request_id, cs->txns[i])) {
1477
0
            json_destroy(cs->txns[i]);
1478
0
            cs->txns[i] = cs->txns[--cs->n_txns];
1479
0
            return true;
1480
0
        }
1481
0
    }
1482
0
    return false;
1483
0
}
1484

1485
static void
1486
ovsdb_cs_send_schema_request(struct ovsdb_cs *cs,
1487
                              struct ovsdb_cs_db *db)
1488
0
{
1489
0
    ovsdb_cs_send_request(cs, jsonrpc_create_request(
1490
0
                               "get_schema",
1491
0
                               json_array_create_1(json_string_create(
1492
0
                                                       db->db_name)),
1493
0
                               NULL));
1494
0
}
1495
1496
static void
1497
ovsdb_cs_send_db_change_aware(struct ovsdb_cs *cs)
1498
0
{
1499
0
    struct jsonrpc_msg *msg = jsonrpc_create_request(
1500
0
        "set_db_change_aware", json_array_create_1(json_boolean_create(true)),
1501
0
        NULL);
1502
0
    jsonrpc_session_send(cs->session, msg);
1503
0
}
1504
1505
static void
1506
ovsdb_cs_send_monitor_request(struct ovsdb_cs *cs, struct ovsdb_cs_db *db,
1507
                              int version)
1508
0
{
1509
0
    struct json *mrs = db->ops->compose_monitor_requests(
1510
0
        db->schema, db->ops_aux);
1511
    /* XXX handle failure */
1512
0
    ovs_assert(mrs->type == JSON_OBJECT);
1513
1514
0
    if (version > 1) {
1515
0
        struct ovsdb_cs_db_table *table;
1516
0
        HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1517
0
            if (table->ack_cond) {
1518
0
                struct json *mr = shash_find_data(json_object(mrs),
1519
0
                                                  table->name);
1520
0
                if (!mr) {
1521
0
                    mr = json_array_create_empty();
1522
0
                    json_object_put(mrs, table->name, mr);
1523
0
                }
1524
0
                ovs_assert(mr->type == JSON_ARRAY);
1525
1526
0
                struct json *mr0;
1527
0
                if (json_array_size(mr) == 0) {
1528
0
                    mr0 = json_object_create();
1529
0
                    json_object_put(mr0, "columns", json_array_create_empty());
1530
0
                    json_array_add(mr, mr0);
1531
0
                } else {
1532
0
                    mr0 = CONST_CAST(struct json *, json_array_at(mr, 0));
1533
0
                }
1534
0
                ovs_assert(mr0->type == JSON_OBJECT);
1535
1536
0
                json_object_put(mr0, "where",
1537
0
                                json_clone(table->ack_cond));
1538
0
            }
1539
0
        }
1540
0
    }
1541
1542
0
    const char *method = (version == 1 ? "monitor"
1543
0
                          : version == 2 ? "monitor_cond"
1544
0
                          : "monitor_cond_since");
1545
0
    struct json *params = json_array_create_3(
1546
0
                              json_string_create(db->db_name),
1547
0
                              json_clone(db->monitor_id),
1548
0
                              mrs);
1549
0
    if (version == 3) {
1550
0
        json_array_add(params, json_string_create_uuid(&db->last_id));
1551
0
    }
1552
0
    ovsdb_cs_send_request(cs, jsonrpc_create_request(method, params, NULL));
1553
0
}
1554
1555
static void
1556
log_parse_update_error(struct ovsdb_error *error)
1557
0
{
1558
0
    if (!VLOG_DROP_WARN(&syntax_rl)) {
1559
0
        char *s = ovsdb_error_to_string(error);
1560
0
        VLOG_WARN_RL(&syntax_rl, "%s", s);
1561
0
        free(s);
1562
0
    }
1563
0
    ovsdb_error_destroy(error);
1564
0
}
1565
1566
static void
1567
ovsdb_cs_db_add_update(struct ovsdb_cs_db *db,
1568
                       const struct json *table_updates, int version,
1569
                       bool clear, bool monitor_reply)
1570
0
{
1571
0
    struct ovsdb_cs_event *event = ovsdb_cs_db_add_event(
1572
0
        db, OVSDB_CS_EVENT_TYPE_UPDATE);
1573
0
    event->update = (struct ovsdb_cs_update_event) {
1574
0
        .table_updates = json_clone(table_updates),
1575
0
        .clear = clear,
1576
0
        .monitor_reply = monitor_reply,
1577
0
        .version = version,
1578
0
        .last_id = db->last_id,
1579
0
    };
1580
0
}
1581
1582
static void
1583
ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *db,
1584
                                const struct json *result, int version)
1585
0
{
1586
0
    const struct json *table_updates;
1587
0
    bool clear;
1588
0
    if (version == 3) {
1589
0
        if (result->type != JSON_ARRAY || json_array_size(result) != 3
1590
0
            || (json_array_at(result, 0)->type != JSON_TRUE &&
1591
0
                json_array_at(result, 0)->type != JSON_FALSE)
1592
0
            || json_array_at(result, 1)->type != JSON_STRING
1593
0
            || !uuid_from_string(&db->last_id,
1594
0
                                 json_string(json_array_at(result, 1)))) {
1595
0
            struct ovsdb_error *error = ovsdb_syntax_error(
1596
0
                result, NULL, "bad monitor_cond_since reply format");
1597
0
            log_parse_update_error(error);
1598
0
            return;
1599
0
        }
1600
1601
0
        bool found = json_boolean(json_array_at(result, 0));
1602
0
        clear = !found;
1603
0
        table_updates = json_array_at(result, 2);
1604
0
    } else {
1605
0
        clear = true;
1606
0
        table_updates = result;
1607
0
    }
1608
1609
0
    ovsdb_cs_db_add_update(db, table_updates, version, clear, true);
1610
0
}
1611
1612
static bool
1613
ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *db,
1614
                             const struct jsonrpc_msg *msg)
1615
0
{
1616
0
    if (msg->type != JSONRPC_NOTIFY) {
1617
0
        return false;
1618
0
    }
1619
1620
0
    int version = (!strcmp(msg->method, "update") ? 1
1621
0
                   : !strcmp(msg->method, "update2") ? 2
1622
0
                   : !strcmp(msg->method, "update3") ? 3
1623
0
                   : 0);
1624
0
    if (!version) {
1625
0
        return false;
1626
0
    }
1627
1628
0
    struct json *params = msg->params;
1629
0
    int n = version == 3 ? 3 : 2;
1630
0
    if (params->type != JSON_ARRAY || json_array_size(params) != n) {
1631
0
        struct ovsdb_error *error = ovsdb_syntax_error(
1632
0
            params, NULL, "%s must be an array with %u elements.",
1633
0
            msg->method, n);
1634
0
        log_parse_update_error(error);
1635
0
        return false;
1636
0
    }
1637
1638
0
    if (!json_equal(json_array_at(params, 0), db->monitor_id)) {
1639
0
        return false;
1640
0
    }
1641
1642
0
    if (version == 3) {
1643
0
        const char *last_id = json_string(json_array_at(params, 1));
1644
0
        if (!uuid_from_string(&db->last_id, last_id)) {
1645
0
            struct ovsdb_error *error = ovsdb_syntax_error(
1646
0
                params, NULL, "Last-id %s is not in UUID format.", last_id);
1647
0
            log_parse_update_error(error);
1648
0
            return false;
1649
0
        }
1650
0
    }
1651
1652
0
    const struct json *table_updates = json_array_at(params,
1653
0
                                                     version == 3 ? 2 : 1);
1654
0
    ovsdb_cs_db_add_update(db, table_updates, version, false, false);
1655
0
    return true;
1656
0
}
1657
1658
static bool
1659
ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *cs,
1660
                                 struct ovsdb_cs_db *db,
1661
                                 const struct jsonrpc_msg *msg)
1662
0
{
1663
0
    if (msg->type != JSONRPC_NOTIFY
1664
0
        || strcmp(msg->method, "monitor_canceled")
1665
0
        || msg->params->type != JSON_ARRAY
1666
0
        || json_array_size(msg->params) != 1
1667
0
        || !json_equal(json_array_at(msg->params, 0), db->monitor_id)) {
1668
0
        return false;
1669
0
    }
1670
1671
0
    db->monitor_version = 0;
1672
1673
    /* Cancel the other monitor and restart the FSM from the top.
1674
     *
1675
     * Maybe a more sophisticated response would be better in some cases, but
1676
     * it doesn't seem worth optimizing yet.  (Although this is already more
1677
     * sophisticated than just dropping the connection and reconnecting.) */
1678
0
    struct ovsdb_cs_db *other_db
1679
0
        = db == &cs->data ? &cs->server : &cs->data;
1680
0
    if (other_db->monitor_version) {
1681
0
        jsonrpc_session_send(
1682
0
            cs->session,
1683
0
            jsonrpc_create_request(
1684
0
                "monitor_cancel",
1685
0
                json_array_create_1(json_clone(other_db->monitor_id)), NULL));
1686
0
        other_db->monitor_version = 0;
1687
0
    }
1688
0
    ovsdb_cs_restart_fsm(cs);
1689
1690
0
    return true;
1691
0
}
1692

1693
/* The _Server database.
1694
 *
1695
 * We replicate the Database table in the _Server database because this is the
1696
 * only way to find out properties we need to know for clustering, such as
1697
 * whether a database is clustered at all and whether this server is the
1698
 * leader.
1699
 *
1700
 * This code implements a kind of simple IDL-like layer. */
1701
1702
struct server_column {
1703
    const char *name;
1704
    struct ovsdb_type type;
1705
};
1706
enum server_column_index {
1707
    COL_NAME,
1708
    COL_MODEL,
1709
    COL_CONNECTED,
1710
    COL_LEADER,
1711
    COL_SCHEMA,
1712
    COL_CID,
1713
    COL_INDEX,
1714
};
1715
#define OPTIONAL_COLUMN(TYPE) \
1716
    {                                           \
1717
        .key = OVSDB_BASE_##TYPE##_INIT,        \
1718
        .value = OVSDB_BASE_VOID_INIT,          \
1719
        .n_min = 0,                             \
1720
        .n_max = 1                              \
1721
    }
1722
static const struct server_column server_columns[] = {
1723
    [COL_NAME] = {"name",  OPTIONAL_COLUMN(STRING) },
1724
    [COL_MODEL] = {"model", OPTIONAL_COLUMN(STRING) },
1725
    [COL_CONNECTED] = {"connected", OPTIONAL_COLUMN(BOOLEAN) },
1726
    [COL_LEADER] = {"leader", OPTIONAL_COLUMN(BOOLEAN) },
1727
    [COL_SCHEMA] = {"schema", OPTIONAL_COLUMN(STRING) },
1728
    [COL_CID] = {"cid", OPTIONAL_COLUMN(UUID) },
1729
    [COL_INDEX] = {"index", OPTIONAL_COLUMN(INTEGER) },
1730
};
1731
0
#define N_SERVER_COLUMNS ARRAY_SIZE(server_columns)
1732
struct server_row {
1733
    struct hmap_node hmap_node;
1734
    struct uuid uuid;
1735
    struct ovsdb_datum data[N_SERVER_COLUMNS];
1736
};
1737
1738
static void
1739
server_row_destroy(struct server_row *row)
1740
0
{
1741
0
    if (row) {
1742
0
        for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1743
0
            ovsdb_datum_destroy(&row->data[i], &server_columns[i].type);
1744
0
        }
1745
0
        free(row);
1746
0
    }
1747
0
}
1748
1749
static struct server_row *
1750
ovsdb_cs_find_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
1751
0
{
1752
0
    struct server_row *row;
1753
0
    HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1754
0
        if (uuid_equals(uuid, &row->uuid)) {
1755
0
            return row;
1756
0
        }
1757
0
    }
1758
0
    return NULL;
1759
0
}
1760
1761
static void
1762
ovsdb_cs_delete_server_row(struct ovsdb_cs *cs, struct server_row *row)
1763
0
{
1764
0
    hmap_remove(&cs->server_rows, &row->hmap_node);
1765
0
    server_row_destroy(row);
1766
0
}
1767
1768
static struct server_row *
1769
ovsdb_cs_insert_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
1770
0
{
1771
0
    struct server_row *row = xmalloc(sizeof *row);
1772
0
    hmap_insert(&cs->server_rows, &row->hmap_node, uuid_hash(uuid));
1773
0
    row->uuid = *uuid;
1774
0
    for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1775
0
        ovsdb_datum_init_default(&row->data[i], &server_columns[i].type);
1776
0
    }
1777
0
    return row;
1778
0
}
1779
1780
static void
1781
ovsdb_cs_update_server_row(struct server_row *row,
1782
                           const struct shash *update, bool xor)
1783
0
{
1784
0
    for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1785
0
        const struct server_column *column = &server_columns[i];
1786
0
        struct shash_node *node = shash_find(update, column->name);
1787
0
        if (!node) {
1788
0
            continue;
1789
0
        }
1790
0
        const struct json *json = node->data;
1791
1792
0
        struct ovsdb_datum *old = &row->data[i];
1793
0
        struct ovsdb_datum new;
1794
0
        if (!xor) {
1795
0
            struct ovsdb_error *error = ovsdb_datum_from_json(
1796
0
                &new, &column->type, json, NULL);
1797
0
            if (error) {
1798
0
                ovsdb_error_destroy(error);
1799
0
                continue;
1800
0
            }
1801
0
        } else {
1802
0
            struct ovsdb_datum diff;
1803
0
            struct ovsdb_error *error = ovsdb_transient_datum_from_json(
1804
0
                &diff, &column->type, json);
1805
0
            if (error) {
1806
0
                ovsdb_error_destroy(error);
1807
0
                continue;
1808
0
            }
1809
1810
0
            error = ovsdb_datum_apply_diff(&new, old, &diff, &column->type);
1811
0
            if (error) {
1812
0
                ovsdb_error_destroy(error);
1813
0
                ovsdb_datum_destroy(&new, &column->type);
1814
0
                continue;
1815
0
            }
1816
0
            ovsdb_datum_destroy(&diff, &column->type);
1817
0
        }
1818
1819
0
        ovsdb_datum_destroy(&row->data[i], &column->type);
1820
0
        row->data[i] = new;
1821
0
    }
1822
0
}
1823
1824
static void
1825
ovsdb_cs_clear_server_rows(struct ovsdb_cs *cs)
1826
0
{
1827
0
    struct server_row *row;
1828
0
    HMAP_FOR_EACH_SAFE (row, hmap_node, &cs->server_rows) {
1829
0
        ovsdb_cs_delete_server_row(cs, row);
1830
0
    }
1831
0
}
1832
1833
static void log_parse_update_error(struct ovsdb_error *);
1834
1835
static void
1836
ovsdb_cs_process_server_event(struct ovsdb_cs *cs,
1837
                              const struct ovsdb_cs_event *event)
1838
0
{
1839
0
    ovs_assert(event->type == OVSDB_CS_EVENT_TYPE_UPDATE);
1840
1841
0
    const struct ovsdb_cs_update_event *update = &event->update;
1842
0
    struct ovsdb_cs_db_update *du;
1843
0
    struct ovsdb_error *error = ovsdb_cs_parse_db_update(
1844
0
        update->table_updates, update->version, &du);
1845
0
    if (error) {
1846
0
        log_parse_update_error(error);
1847
0
        return;
1848
0
    }
1849
1850
0
    if (update->clear) {
1851
0
        ovsdb_cs_clear_server_rows(cs);
1852
0
    }
1853
1854
0
    const struct ovsdb_cs_table_update *tu = ovsdb_cs_db_update_find_table(
1855
0
        du, "Database");
1856
0
    if (tu) {
1857
0
        for (size_t i = 0; i < tu->n; i++) {
1858
0
            const struct ovsdb_cs_row_update *ru = &tu->row_updates[i];
1859
0
            struct server_row *row
1860
0
                = ovsdb_cs_find_server_row(cs, &ru->row_uuid);
1861
0
            if (ru->type == OVSDB_CS_ROW_DELETE) {
1862
0
                ovsdb_cs_delete_server_row(cs, row);
1863
0
            } else {
1864
0
                if (!row) {
1865
0
                    row = ovsdb_cs_insert_server_row(cs, &ru->row_uuid);
1866
0
                }
1867
0
                ovsdb_cs_update_server_row(row, ru->columns,
1868
0
                                           ru->type == OVSDB_CS_ROW_XOR);
1869
0
            }
1870
0
        }
1871
0
    }
1872
1873
0
    ovsdb_cs_db_update_destroy(du);
1874
0
}
1875
1876
static const char *
1877
server_column_get_string(const struct server_row *row,
1878
                         enum server_column_index index,
1879
                         const char *default_value)
1880
0
{
1881
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_STRING);
1882
0
    const struct ovsdb_datum *d = &row->data[index];
1883
0
    return d->n == 1 ? json_string(d->keys[0].s) : default_value;
1884
0
}
1885
1886
static bool
1887
server_column_get_bool(const struct server_row *row,
1888
                       enum server_column_index index,
1889
                       bool default_value)
1890
0
{
1891
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_BOOLEAN);
1892
0
    const struct ovsdb_datum *d = &row->data[index];
1893
0
    return d->n == 1 ? d->keys[0].boolean : default_value;
1894
0
}
1895
1896
static uint64_t
1897
server_column_get_int(const struct server_row *row,
1898
                      enum server_column_index index,
1899
                      uint64_t default_value)
1900
0
{
1901
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_INTEGER);
1902
0
    const struct ovsdb_datum *d = &row->data[index];
1903
0
    return d->n == 1 ? d->keys[0].integer : default_value;
1904
0
}
1905
1906
static const struct uuid *
1907
server_column_get_uuid(const struct server_row *row,
1908
                       enum server_column_index index,
1909
                       const struct uuid *default_value)
1910
0
{
1911
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_UUID);
1912
0
    const struct ovsdb_datum *d = &row->data[index];
1913
0
    return d->n == 1 ? &d->keys[0].uuid : default_value;
1914
0
}
1915
1916
static const struct server_row *
1917
ovsdb_find_server_row(struct ovsdb_cs *cs)
1918
0
{
1919
0
    const struct server_row *row;
1920
0
    HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1921
0
        const struct uuid *cid = server_column_get_uuid(row, COL_CID, NULL);
1922
0
        const char *name = server_column_get_string(row, COL_NAME, NULL);
1923
0
        if (uuid_is_zero(&cs->cid)
1924
0
            ? (name && !strcmp(cs->data.db_name, name))
1925
0
            : (cid && uuid_equals(cid, &cs->cid))) {
1926
0
            return row;
1927
0
        }
1928
0
    }
1929
0
    return NULL;
1930
0
}
1931
1932
static void OVS_UNUSED
1933
ovsdb_log_server_rows(const struct ovsdb_cs *cs)
1934
0
{
1935
0
    int row_num = 0;
1936
0
    const struct server_row *row;
1937
0
    HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1938
0
        struct ds s = DS_EMPTY_INITIALIZER;
1939
0
        for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1940
0
            ds_put_format(&s, " %s=", server_columns[i].name);
1941
0
            if (i == COL_SCHEMA) {
1942
0
                ds_put_format(&s, "...");
1943
0
            } else {
1944
0
                ovsdb_datum_to_string(&row->data[i], &server_columns[i].type,
1945
0
                                      &s);
1946
0
            }
1947
0
        }
1948
0
        VLOG_INFO("row %d:%s", row_num++, ds_cstr(&s));
1949
0
        ds_destroy(&s);
1950
0
    }
1951
0
}
1952
1953
static bool
1954
ovsdb_cs_check_server_db__(struct ovsdb_cs *cs)
1955
0
{
1956
0
    struct ovsdb_cs_event *event;
1957
0
    LIST_FOR_EACH_POP (event, list_node, &cs->server.events) {
1958
0
        ovsdb_cs_process_server_event(cs, event);
1959
0
        ovsdb_cs_event_destroy(event);
1960
0
    }
1961
1962
0
    const struct server_row *db_row = ovsdb_find_server_row(cs);
1963
0
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1964
0
    const char *server_name = jsonrpc_session_get_name(cs->session);
1965
0
    if (!db_row) {
1966
0
        VLOG_INFO_RL(&rl, "%s: server does not have %s database",
1967
0
                     server_name, cs->data.db_name);
1968
0
        return false;
1969
0
    }
1970
1971
0
    bool ok = false;
1972
0
    const char *model = server_column_get_string(db_row, COL_MODEL, "");
1973
0
    const char *schema = server_column_get_string(db_row, COL_SCHEMA, NULL);
1974
0
    bool connected = server_column_get_bool(db_row, COL_CONNECTED, false);
1975
0
    if (!strcmp(model, "clustered")) {
1976
0
        bool leader = server_column_get_bool(db_row, COL_LEADER, false);
1977
0
        uint64_t index = server_column_get_int(db_row, COL_INDEX, 0);
1978
1979
0
        if (!schema) {
1980
0
            VLOG_INFO("%s: clustered database server has not yet joined "
1981
0
                      "cluster; trying another server", server_name);
1982
0
        } else if (!connected) {
1983
0
            VLOG_INFO("%s: clustered database server is disconnected "
1984
0
                      "from cluster; trying another server", server_name);
1985
0
        } else if (cs->leader_only && !leader) {
1986
0
            VLOG_INFO("%s: clustered database server is not cluster "
1987
0
                      "leader; trying another server", server_name);
1988
0
        } else if (index < cs->min_index) {
1989
0
            VLOG_WARN("%s: clustered database server has stale data; "
1990
0
                      "trying another server", server_name);
1991
0
        } else {
1992
0
            cs->min_index = index;
1993
0
            ok = true;
1994
0
        }
1995
0
    } else if (!strcmp(model, "relay")) {
1996
0
        if (!schema) {
1997
0
            VLOG_INFO("%s: relay database server has not yet connected to the "
1998
0
                      "relay source; trying another server", server_name);
1999
0
        } else if (!connected) {
2000
0
            VLOG_INFO("%s: relay database server is disconnected from the "
2001
0
                      "relay source; trying another server", server_name);
2002
0
        } else if (cs->leader_only) {
2003
0
            VLOG_INFO("%s: relay database server cannot be a leader; "
2004
0
                      "trying another server", server_name);
2005
0
        } else {
2006
0
            ok = true;
2007
0
        }
2008
0
    } else {
2009
0
        if (!schema) {
2010
0
            VLOG_INFO("%s: missing database schema", server_name);
2011
0
        } else {
2012
0
            ok = true;
2013
0
        }
2014
0
    }
2015
0
    if (!ok) {
2016
0
        return false;
2017
0
    }
2018
2019
0
    if (cs->state == CS_S_SERVER_MONITOR_REQUESTED) {
2020
0
        json_destroy(cs->data.schema);
2021
0
        cs->data.schema = json_from_string(schema);
2022
0
        if (cs->data.max_version >= 3) {
2023
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 3);
2024
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_SINCE_REQUESTED);
2025
0
        } else if (cs->data.max_version >= 2) {
2026
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
2027
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
2028
0
        } else {
2029
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
2030
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
2031
0
        }
2032
0
    }
2033
0
    return true;
2034
0
}
2035
2036
static bool
2037
ovsdb_cs_check_server_db(struct ovsdb_cs *cs)
2038
0
{
2039
0
    bool ok = ovsdb_cs_check_server_db__(cs);
2040
0
    if (!ok) {
2041
0
        ovsdb_cs_retry(cs);
2042
0
    }
2043
0
    return ok;
2044
0
}
2045
2046
static struct json *
2047
ovsdb_cs_compose_server_monitor_request(const struct json *schema_json,
2048
                                        void *cs_)
2049
0
{
2050
0
    struct ovsdb_cs *cs = cs_;
2051
0
    struct shash *schema = ovsdb_cs_parse_schema(schema_json);
2052
0
    struct json *monitor_requests = json_object_create();
2053
2054
0
    const char *table_name = "Database";
2055
0
    const struct shash *table_schema
2056
0
        = schema ? shash_find_data(schema, table_name) : NULL;
2057
0
    if (!table_schema) {
2058
0
        VLOG_WARN("%s database lacks %s table "
2059
0
                  "(database needs upgrade?)",
2060
0
                  cs->server.db_name, table_name);
2061
        /* XXX return failure? */
2062
0
    } else {
2063
0
        struct json *columns = json_array_create_empty();
2064
0
        for (size_t j = 0; j < N_SERVER_COLUMNS; j++) {
2065
0
            const struct server_column *column = &server_columns[j];
2066
0
            bool db_has_column = (table_schema &&
2067
0
                                  shash_find(table_schema, column->name));
2068
0
            if (table_schema && !db_has_column) {
2069
0
                VLOG_WARN("%s table in %s database lacks %s column "
2070
0
                          "(database needs upgrade?)",
2071
0
                          table_name, cs->server.db_name, column->name);
2072
0
                continue;
2073
0
            }
2074
0
            json_array_add(columns, json_string_create(column->name));
2075
0
        }
2076
2077
0
        struct json *monitor_request = json_object_create();
2078
0
        json_object_put(monitor_request, "columns", columns);
2079
0
        json_object_put(monitor_requests, table_name,
2080
0
                        json_array_create_1(monitor_request));
2081
0
    }
2082
0
    ovsdb_cs_free_schema(schema);
2083
2084
0
    return monitor_requests;
2085
0
}
2086
2087
static const struct ovsdb_cs_ops ovsdb_cs_server_ops = {
2088
    ovsdb_cs_compose_server_monitor_request
2089
};
2090

2091
static void
2092
log_error(struct ovsdb_error *error)
2093
0
{
2094
0
    char *s = ovsdb_error_to_string_free(error);
2095
0
    VLOG_WARN("error parsing database schema: %s", s);
2096
0
    free(s);
2097
0
}
2098
2099
/* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
2100
 * 7047, to obtain the names of its rows and columns.  If successful, returns
2101
 * an shash whose keys are table names and whose values are shashes, where each
2102
 * shash contains the names of its table's columns as keys and an ovsdb_type as
2103
 * data.  On failure (due to a parse error), returns NULL.
2104
 *
2105
 * It would also be possible to use the general-purpose OVSDB schema parser in
2106
 * ovsdb-server, but that's overkill, possibly too strict for the current use
2107
 * case, and would require restructuring ovsdb-server to separate the schema
2108
 * code from the rest. */
2109
struct shash *
2110
ovsdb_cs_parse_schema(const struct json *schema_json)
2111
0
{
2112
0
    struct ovsdb_parser parser;
2113
0
    const struct json *tables_json;
2114
0
    struct shash *schema = NULL;
2115
0
    struct ovsdb_error *error;
2116
0
    struct shash_node *node;
2117
2118
0
    ovsdb_parser_init(&parser, schema_json, "database schema");
2119
0
    tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
2120
0
    error = ovsdb_parser_destroy(&parser);
2121
0
    if (error) {
2122
0
        log_error(error);
2123
0
        return NULL;
2124
0
    }
2125
2126
0
    schema = xmalloc(sizeof *schema);
2127
0
    shash_init(schema);
2128
0
    SHASH_FOR_EACH (node, json_object(tables_json)) {
2129
0
        const char *table_name = node->name;
2130
0
        const struct json *json = node->data;
2131
0
        const struct json *columns_json;
2132
2133
0
        ovsdb_parser_init(&parser, json, "table schema for table %s",
2134
0
                          table_name);
2135
0
        columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
2136
0
        error = ovsdb_parser_destroy(&parser);
2137
0
        if (error) {
2138
0
            goto error;
2139
0
        }
2140
2141
0
        struct shash *columns = xmalloc(sizeof *columns);
2142
0
        shash_init(columns);
2143
2144
0
        struct shash_node *node2;
2145
0
        SHASH_FOR_EACH (node2, json_object(columns_json)) {
2146
0
            const struct json *column_json = node2->data;
2147
0
            const char *column_name = node2->name;
2148
0
            const struct json *type_json;
2149
0
            struct ovsdb_type *type;
2150
2151
0
            ovsdb_parser_init(&parser, column_json,
2152
0
                              "schema for column %s", column_name);
2153
0
            type_json = ovsdb_parser_member(&parser, "type",
2154
0
                                            OP_STRING | OP_OBJECT);
2155
0
            error = ovsdb_parser_destroy(&parser);
2156
0
            if (error) {
2157
0
                goto error;
2158
0
            }
2159
2160
0
            type = xzalloc(sizeof *type);
2161
0
            error = ovsdb_type_from_json(type, type_json);
2162
0
            if (error) {
2163
0
                free(type);
2164
0
                goto error;
2165
0
            }
2166
2167
0
            if (!shash_add_once(columns, column_name, type)) {
2168
                /* Should never happen, but just in case. */
2169
0
                ovsdb_type_destroy(type);
2170
0
                free(type);
2171
0
            }
2172
0
        }
2173
0
        shash_add(schema, table_name, columns);
2174
0
    }
2175
0
    return schema;
2176
2177
0
error:
2178
0
    log_error(error);
2179
0
    ovsdb_cs_free_schema(schema);
2180
0
    return NULL;
2181
0
}
2182
2183
/* Frees 'schema', which is in the format returned by
2184
 * ovsdb_cs_parse_schema(). */
2185
void
2186
ovsdb_cs_free_schema(struct shash *schema)
2187
0
{
2188
0
    if (schema) {
2189
0
        struct shash_node *node;
2190
2191
0
        SHASH_FOR_EACH_SAFE (node, schema) {
2192
0
            struct shash *columns = node->data;
2193
0
            struct shash_node *node2;
2194
2195
0
            SHASH_FOR_EACH (node2, columns) {
2196
0
                ovsdb_type_destroy(node2->data);
2197
0
            }
2198
0
            shash_destroy_free_data(columns);
2199
0
            free(columns);
2200
2201
0
            shash_delete(schema, node);
2202
0
        }
2203
0
        shash_destroy(schema);
2204
0
        free(schema);
2205
0
    }
2206
0
}
2207

2208
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2209
ovsdb_cs_parse_row_update1(const struct json *in,
2210
                           struct ovsdb_cs_row_update *out)
2211
0
{
2212
0
    const struct json *old_json, *new_json;
2213
2214
0
    old_json = shash_find_data(json_object(in), "old");
2215
0
    new_json = shash_find_data(json_object(in), "new");
2216
0
    if (old_json && old_json->type != JSON_OBJECT) {
2217
0
        return ovsdb_syntax_error(old_json, NULL,
2218
0
                                  "\"old\" <row> is not object");
2219
0
    } else if (new_json && new_json->type != JSON_OBJECT) {
2220
0
        return ovsdb_syntax_error(new_json, NULL,
2221
0
                                  "\"new\" <row> is not object");
2222
0
    } else if ((old_json != NULL) + (new_json != NULL)
2223
0
               != shash_count(json_object(in))) {
2224
0
        return ovsdb_syntax_error(in, NULL,
2225
0
                                  "<row-update> contains "
2226
0
                                  "unexpected member");
2227
0
    } else if (!old_json && !new_json) {
2228
0
        return ovsdb_syntax_error(in, NULL,
2229
0
                                  "<row-update> missing \"old\" "
2230
0
                                  "and \"new\" members");
2231
0
    }
2232
2233
0
    if (!new_json) {
2234
0
        out->type = OVSDB_CS_ROW_DELETE;
2235
0
        out->columns = json_object(old_json);
2236
0
    } else if (!old_json) {
2237
0
        out->type = OVSDB_CS_ROW_INSERT;
2238
0
        out->columns = json_object(new_json);
2239
0
    } else {
2240
0
        out->type = OVSDB_CS_ROW_UPDATE;
2241
0
        out->columns = json_object(new_json);
2242
0
    }
2243
0
    return NULL;
2244
0
}
2245
2246
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2247
ovsdb_cs_parse_row_update2(const struct json *in,
2248
                           struct ovsdb_cs_row_update *out)
2249
0
{
2250
0
    const struct shash *object = json_object(in);
2251
0
    if (shash_count(object) != 1) {
2252
0
        return ovsdb_syntax_error(
2253
0
            in, NULL, "<row-update2> has %"PRIuSIZE" members "
2254
0
            "instead of expected 1", shash_count(object));
2255
0
    }
2256
2257
0
    struct shash_node *node = shash_first(object);
2258
0
    const struct json *columns = node->data;
2259
0
    if (!strcmp(node->name, "insert") || !strcmp(node->name, "initial")) {
2260
0
        out->type = OVSDB_CS_ROW_INSERT;
2261
0
    } else if (!strcmp(node->name, "modify")) {
2262
0
        out->type = OVSDB_CS_ROW_XOR;
2263
0
    } else if (!strcmp(node->name, "delete")) {
2264
0
        out->type = OVSDB_CS_ROW_DELETE;
2265
0
        if (columns->type != JSON_NULL) {
2266
0
            return ovsdb_syntax_error(
2267
0
                in, NULL,
2268
0
                "<row-update2> delete operation has unexpected value");
2269
0
        }
2270
0
        return NULL;
2271
0
    } else {
2272
0
        return ovsdb_syntax_error(in, NULL,
2273
0
                                  "<row-update2> has unknown member \"%s\"",
2274
0
                                  node->name);
2275
0
    }
2276
2277
0
    if (columns->type != JSON_OBJECT) {
2278
0
        return ovsdb_syntax_error(
2279
0
            in, NULL,
2280
0
            "<row-update2> \"%s\" operation has unexpected value",
2281
0
            node->name);
2282
0
    }
2283
0
    out->columns = json_object(columns);
2284
2285
0
    return NULL;
2286
0
}
2287
2288
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2289
ovsdb_cs_parse_row_update(const char *table_name,
2290
                          const struct json *in, int version,
2291
                          struct ovsdb_cs_row_update *out)
2292
0
{
2293
0
    if (in->type != JSON_OBJECT) {
2294
0
        const char *suffix = version > 1 ? "2" : "";
2295
0
        return ovsdb_syntax_error(
2296
0
            in, NULL,
2297
0
            "<table-update%s> for table \"%s\" contains <row-update%s> "
2298
0
            "that is not an object",
2299
0
            suffix, table_name, suffix);
2300
0
    }
2301
2302
0
    return (version == 1
2303
0
            ? ovsdb_cs_parse_row_update1(in, out)
2304
0
            : ovsdb_cs_parse_row_update2(in, out));
2305
0
}
2306
2307
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2308
ovsdb_cs_parse_table_update(const char *table_name,
2309
                            const struct json *in, int version,
2310
                            struct ovsdb_cs_table_update *out)
2311
0
{
2312
0
    const char *suffix = version > 1 ? "2" : "";
2313
2314
0
    if (in->type != JSON_OBJECT) {
2315
0
        return ovsdb_syntax_error(
2316
0
            in, NULL, "<table-update%s> for table \"%s\" is not an object",
2317
0
            suffix, table_name);
2318
0
    }
2319
0
    struct shash *in_rows = json_object(in);
2320
2321
0
    out->row_updates = xmalloc(shash_count(in_rows) * sizeof *out->row_updates);
2322
2323
0
    const struct shash_node *node;
2324
0
    SHASH_FOR_EACH (node, in_rows) {
2325
0
        const char *row_uuid_string = node->name;
2326
0
        struct uuid row_uuid;
2327
0
        if (!uuid_from_string(&row_uuid, row_uuid_string)) {
2328
0
            return ovsdb_syntax_error(
2329
0
                in, NULL,
2330
0
                "<table-update%s> for table \"%s\" contains "
2331
0
                "bad UUID \"%s\" as member name",
2332
0
                suffix, table_name, row_uuid_string);
2333
0
        }
2334
2335
0
        const struct json *in_ru = node->data;
2336
0
        struct ovsdb_cs_row_update *out_ru = &out->row_updates[out->n++];
2337
0
        *out_ru = (struct ovsdb_cs_row_update) { .row_uuid = row_uuid };
2338
2339
0
        struct ovsdb_error *error = ovsdb_cs_parse_row_update(
2340
0
            table_name, in_ru, version, out_ru);
2341
0
        if (error) {
2342
0
            return error;
2343
0
        }
2344
0
    }
2345
2346
0
    return NULL;
2347
0
}
2348
2349
/* Parses OVSDB <table-updates> or <table-updates2> object 'in' into '*outp'.
2350
 * If successful, sets '*outp' to the new object and returns NULL.  On failure,
2351
 * returns the error and sets '*outp' to NULL.
2352
 *
2353
 * On success, the caller must eventually free '*outp', with
2354
 * ovsdb_cs_db_update_destroy().
2355
 *
2356
 * 'version' should be 1 if 'in' is a <table-updates>, 2 or 3 if it is a
2357
 * <table-updates2>. */
2358
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2359
ovsdb_cs_parse_db_update(const struct json *in, int version,
2360
                         struct ovsdb_cs_db_update **outp)
2361
0
{
2362
0
    const char *suffix = version > 1 ? "2" : "";
2363
2364
0
    *outp = NULL;
2365
0
    if (in->type != JSON_OBJECT) {
2366
0
        return ovsdb_syntax_error(in, NULL,
2367
0
                                  "<table-updates%s> is not an object", suffix);
2368
0
    }
2369
2370
0
    struct ovsdb_cs_db_update *out = xzalloc(sizeof *out);
2371
0
    out->table_updates = xmalloc(shash_count(json_object(in))
2372
0
                                 * sizeof *out->table_updates);
2373
0
    const struct shash_node *node;
2374
0
    SHASH_FOR_EACH (node, json_object(in)) {
2375
0
        const char *table_name = node->name;
2376
0
        const struct json *in_tu = node->data;
2377
2378
0
        struct ovsdb_cs_table_update *out_tu = &out->table_updates[out->n++];
2379
0
        *out_tu = (struct ovsdb_cs_table_update) { .table_name = table_name };
2380
2381
0
        struct ovsdb_error *error = ovsdb_cs_parse_table_update(
2382
0
            table_name, in_tu, version, out_tu);
2383
0
        if (error) {
2384
0
            ovsdb_cs_db_update_destroy(out);
2385
0
            return error;
2386
0
        }
2387
0
    }
2388
2389
0
    *outp = out;
2390
0
    return NULL;
2391
0
}
2392
2393
/* Frees 'du', which was presumably allocated by ovsdb_cs_parse_db_update(). */
2394
void
2395
ovsdb_cs_db_update_destroy(struct ovsdb_cs_db_update *du)
2396
0
{
2397
0
    if (!du) {
2398
0
        return;
2399
0
    }
2400
2401
0
    for (size_t i = 0; i < du->n; i++) {
2402
0
        struct ovsdb_cs_table_update *tu = &du->table_updates[i];
2403
0
        free(tu->row_updates);
2404
0
    }
2405
0
    free(du->table_updates);
2406
0
    free(du);
2407
0
}
2408
2409
const struct ovsdb_cs_table_update *
2410
ovsdb_cs_db_update_find_table(const struct ovsdb_cs_db_update *du,
2411
                              const char *table_name)
2412
0
{
2413
0
    for (size_t i = 0; i < du->n; i++) {
2414
0
        const struct ovsdb_cs_table_update *tu = &du->table_updates[i];
2415
0
        if (!strcmp(tu->table_name, table_name)) {
2416
0
            return tu;
2417
0
        }
2418
0
    }
2419
0
    return NULL;
2420
0
}
2421