Coverage Report

Created: 2025-07-01 06:51

/src/openvswitch/lib/ovsdb-cs.c
Line
Count
Source (jump to first uncovered line)
1
/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc.
2
 * Copyright (C) 2016 Hewlett Packard Enterprise Development LP
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at:
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITION OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16
17
#include <config.h>
18
19
#include "ovsdb-cs.h"
20
21
#include <errno.h>
22
23
#include "hash.h"
24
#include "jsonrpc.h"
25
#include "openvswitch/dynamic-string.h"
26
#include "openvswitch/hmap.h"
27
#include "openvswitch/json.h"
28
#include "openvswitch/poll-loop.h"
29
#include "openvswitch/shash.h"
30
#include "openvswitch/vlog.h"
31
#include "ovsdb-data.h"
32
#include "ovsdb-error.h"
33
#include "ovsdb-parser.h"
34
#include "ovsdb-session.h"
35
#include "ovsdb-types.h"
36
#include "sset.h"
37
#include "svec.h"
38
#include "util.h"
39
#include "uuid.h"
40
41
VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
42
43
/* Connection state machine.
44
 *
45
 * When a JSON-RPC session connects, the CS layer sends a "monitor_cond"
46
 * request for the Database table in the _Server database and transitions to
47
 * the CS_S_SERVER_MONITOR_REQUESTED state.  If the session drops and
48
 * reconnects, or if the CS receives a "monitor_canceled" notification for a
49
 * table it is monitoring, the CS starts over again in the same way. */
50
#define OVSDB_CS_STATES                                                 \
51
    /* Waits for "get_schema" reply, then sends "monitor_cond"          \
52
     * request for the Database table in the _Server database, whose    \
53
     * details are informed by the schema, and transitions to           \
54
     * CS_S_SERVER_MONITOR_REQUESTED. */                                \
55
0
    OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED)                             \
56
0
                                                                        \
57
0
    /* Waits for "monitor_cond" reply for the Database table:           \
58
0
     *                                                                  \
59
0
     * - If the reply indicates success, and the Database table has a   \
60
0
     *   row for the CS database:                                       \
61
0
     *                                                                  \
62
0
     *   * If the row indicates that this is a clustered database       \
63
0
     *     that is not connected to the cluster, closes the             \
64
0
     *     connection.  The next connection attempt has a chance at     \
65
0
     *     picking a connected server.                                  \
66
0
     *                                                                  \
67
0
     *   * Otherwise, sends a monitoring request for the CS             \
68
0
     *     database whose details are informed by the schema            \
69
0
     *     (obtained from the row), and transitions to                  \
70
0
     *     CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED.                  \
71
0
     *                                                                  \
72
0
     * - If the reply indicates success, but the Database table does    \
73
0
     *   not have a row for the CS database, transitions to             \
74
0
     *   CS_S_ERROR.                                                    \
75
0
     *                                                                  \
76
0
     * - If the reply indicates failure, sends a "get_schema" request   \
77
0
     *   for the CS database and transitions to                         \
78
0
     *   CS_S_DATA_SCHEMA_REQUESTED. */                                 \
79
0
    OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED)                            \
80
0
                                                                        \
81
0
    /* Waits for "get_schema" reply, then sends "monitor_cond"          \
82
0
     * request whose details are informed by the schema, and            \
83
0
     * transitions to CS_S_DATA_MONITOR_COND_REQUESTED. */              \
84
0
    OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED)                               \
85
0
                                                                        \
86
0
    /* Waits for "monitor_cond_since" reply.  If successful, replaces   \
87
0
     * the CS contents by the data carried in the reply and             \
88
0
     * transitions to CS_S_MONITORING.  On failure, sends a             \
89
0
     * "monitor_cond" request and transitions to                        \
90
0
     * CS_S_DATA_MONITOR_COND_REQUESTED. */                             \
91
0
    OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED)                   \
92
0
                                                                        \
93
0
    /* Waits for "monitor_cond" reply.  If successful, replaces the     \
94
0
     * CS contents by the data carried in the reply and transitions     \
95
0
     * to CS_S_MONITORING.  On failure, sends a "monitor" request       \
96
0
     * and transitions to CS_S_DATA_MONITOR_REQUESTED. */               \
97
0
    OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED)                         \
98
0
                                                                        \
99
0
    /* Waits for "monitor" reply.  If successful, replaces the CS       \
100
0
     * contents by the data carried in the reply and transitions to     \
101
0
     * CS_S_MONITORING.  On failure, transitions to CS_S_ERROR. */      \
102
0
    OVSDB_CS_STATE(DATA_MONITOR_REQUESTED)                              \
103
0
                                                                        \
104
0
    /* State that processes "update", "update2" or "update3"            \
105
0
     * notifications for the main database (and the Database table      \
106
0
     * in _Server if available).                                        \
107
0
     *                                                                  \
108
0
     * If we're monitoring the Database table and we get notified       \
109
0
     * that the CS database has been deleted, we close the              \
110
0
     * connection (which will restart the state machine). */            \
111
0
    OVSDB_CS_STATE(MONITORING)                                          \
112
0
                                                                        \
113
0
    /* Terminal error state that indicates that nothing useful can be   \
114
0
     * done, for example because the database server doesn't actually   \
115
0
     * have the desired database.  We maintain the session with the     \
116
0
     * database server anyway.  If it starts serving the database       \
117
0
     * that we want, or if someone fixes and restarts the database,     \
118
0
     * then it will kill the session and we will automatically          \
119
0
     * reconnect and try again. */                                      \
120
0
    OVSDB_CS_STATE(ERROR)                                               \
121
0
                                                                        \
122
0
    /* Terminal state that indicates we connected to a useless server   \
123
0
     * in a cluster, e.g. one that is partitioned from the rest of      \
124
0
     * the cluster. We're waiting to retry. */                          \
125
0
    OVSDB_CS_STATE(RETRY)
126
127
enum ovsdb_cs_state {
128
#define OVSDB_CS_STATE(NAME) CS_S_##NAME,
129
    OVSDB_CS_STATES
130
#undef OVSDB_CS_STATE
131
};
132
133
static const char *
134
ovsdb_cs_state_to_string(enum ovsdb_cs_state state)
135
0
{
136
0
    switch (state) {
137
0
#define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME;
138
0
        OVSDB_CS_STATES
139
0
#undef OVSDB_CS_STATE
140
0
    default: return "<unknown>";
141
0
    }
142
0
}
143
144
/* A database being monitored.
145
 *
146
 * There are two instances of this data structure for each CS instance, one for
147
 * the _Server database used for working with clusters, and the other one for
148
 * the actual database that the client is interested in.  */
149
struct ovsdb_cs_db {
150
    struct ovsdb_cs *cs;
151
152
    /* Data. */
153
    const char *db_name;        /* Database's name. */
154
    struct hmap tables;         /* Contains "struct ovsdb_cs_db_table *"s.*/
155
    struct json *monitor_id;
156
    struct json *schema;
157
158
    /* Monitor version. */
159
    int max_version;            /* Maximum version of monitor request to use. */
160
    int monitor_version;        /* 0 if not monitoring, 1=monitor,
161
                                 * 2=monitor_cond, 3=monitor_cond_since. */
162
163
    /* Condition changes. */
164
    bool cond_changed;          /* Change not yet sent to server? */
165
    unsigned int cond_seqno;    /* Increments when condition changes. */
166
167
    /* Database locking. */
168
    char *lock_name;            /* Name of lock we need, NULL if none. */
169
    bool has_lock;              /* Has db server told us we have the lock? */
170
    bool is_lock_contended;     /* Has db server told us we can't get lock? */
171
    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
172
173
    /* Last db txn id, used for fast resync through monitor_cond_since */
174
    struct uuid last_id;
175
176
    /* Client interface. */
177
    struct ovs_list events;
178
    const struct ovsdb_cs_ops *ops;
179
    void *ops_aux;
180
};
181
182
static const struct ovsdb_cs_ops ovsdb_cs_server_ops;
183
184
static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *);
185
static unsigned int ovsdb_cs_db_set_condition(
186
    struct ovsdb_cs_db *, const char *db_name, const struct json *condition);
187
188
static void ovsdb_cs_send_schema_request(struct ovsdb_cs *,
189
                                          struct ovsdb_cs_db *);
190
static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *);
191
static bool ovsdb_cs_check_server_db(struct ovsdb_cs *);
192
static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *);
193
static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *,
194
                                          struct ovsdb_cs_db *, int version);
195
static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db);
196
static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db);
197
198
struct ovsdb_cs {
199
    struct ovsdb_cs_db server;
200
    struct ovsdb_cs_db data;
201
202
    /* Session state.
203
     *
204
     * 'state_seqno' is a snapshot of the session's sequence number as returned
205
     * jsonrpc_session_get_seqno(session), so if it differs from the value that
206
     * function currently returns then the session has reconnected and the
207
     * state machine must restart.  */
208
    struct jsonrpc_session *session; /* Connection to the server. */
209
    char *remote;                    /* 'session' remote name. */
210
    enum ovsdb_cs_state state;       /* Current session state. */
211
    unsigned int state_seqno;        /* See above. */
212
    struct json *request_id;         /* JSON ID for request awaiting reply. */
213
214
    /* IDs of outstanding transactions. */
215
    struct json **txns;
216
    size_t n_txns, allocated_txns;
217
218
    /* Info for the _Server database. */
219
    struct uuid cid;
220
    struct hmap server_rows;
221
222
    /* Whether to send 'set_db_change_aware'. */
223
    bool set_db_change_aware;
224
225
    /* Clustered servers. */
226
    uint64_t min_index;      /* Minimum allowed index, to avoid regression. */
227
    bool leader_only;        /* If true, do not connect to Raft followers. */
228
    bool shuffle_remotes;    /* If true, connect to servers in random order. */
229
};
230
231
static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state,
232
                                    const char *where);
233
#define ovsdb_cs_transition(CS, STATE) \
234
0
    ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR)
235
236
static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where);
237
0
#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR)
238
239
static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
240
241
static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *,
242
                                            const struct json *result,
243
                                            int version);
244
static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *,
245
                                         const struct jsonrpc_msg *);
246
static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *,
247
                                              struct ovsdb_cs_db *,
248
                                              const struct jsonrpc_msg *);
249
250
static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *,
251
                                              const struct jsonrpc_msg *);
252
static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request(
253
    struct ovsdb_cs_db *);
254
static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request(
255
    struct ovsdb_cs_db *);
256
static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *,
257
                                          const struct json *);
258
static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *,
259
                                           const struct json *params,
260
                                           bool new_has_lock);
261
static void ovsdb_cs_send_cond_change(struct ovsdb_cs *);
262
263
static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *,
264
                                          const struct jsonrpc_msg *reply);
265

266
/* Events. */
267
268
void
269
ovsdb_cs_event_destroy(struct ovsdb_cs_event *event)
270
0
{
271
0
    if (event) {
272
0
        switch (event->type) {
273
0
        case OVSDB_CS_EVENT_TYPE_RECONNECT:
274
0
        case OVSDB_CS_EVENT_TYPE_LOCKED:
275
0
            break;
276
277
0
        case OVSDB_CS_EVENT_TYPE_UPDATE:
278
0
            json_destroy(event->update.table_updates);
279
0
            break;
280
281
0
        case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
282
0
            jsonrpc_msg_destroy(event->txn_reply);
283
0
            break;
284
0
        }
285
0
        free(event);
286
0
    }
287
0
}
288

289
/* Lifecycle. */
290
291
static void
292
ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name,
293
                 struct ovsdb_cs *parent, int max_version,
294
                 const struct ovsdb_cs_ops *ops, void *ops_aux)
295
0
{
296
0
    *db = (struct ovsdb_cs_db) {
297
0
        .cs = parent,
298
0
        .db_name = db_name,
299
0
        .tables = HMAP_INITIALIZER(&db->tables),
300
0
        .max_version = max_version,
301
0
        .monitor_id = json_array_create_2(json_string_create("monid"),
302
0
                                          json_string_create(db_name)),
303
0
        .events = OVS_LIST_INITIALIZER(&db->events),
304
0
        .ops = ops,
305
0
        .ops_aux = ops_aux,
306
0
    };
307
0
}
308
309
/* Creates and returns a new client synchronization object.  The connection
310
 * will monitor remote database 'db_name'.  If 'retry' is true, then also
311
 * reconnect if the connection fails.
312
 *
313
 * XXX 'max_version' should ordinarily be 3, to allow use of the most efficient
314
 * "monitor_cond_since" method with the database.  Currently there's some kind
315
 * of bug in the DDlog Rust code that interfaces to that, so instead
316
 * ovn-northd-ddlog passes 1 to use plain 'monitor' instead.  Once the DDlog
317
 * Rust code gets fixed, we might as well just delete 'max_version'
318
 * entirely.
319
 *
320
 * 'ops' is a struct for northd_cs_run() to use, and 'ops_aux' is a pointer
321
 * that gets passed into each call.
322
 *
323
 * Use ovsdb_cs_set_remote() to configure the database to which to connect.
324
 * Until a remote is configured, no data can be retrieved.
325
 */
326
struct ovsdb_cs *
327
ovsdb_cs_create(const char *db_name, int max_version,
328
                const struct ovsdb_cs_ops *ops, void *ops_aux)
329
0
{
330
0
    struct ovsdb_cs *cs = xzalloc(sizeof *cs);
331
0
    ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, cs);
332
0
    ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux);
333
0
    cs->state_seqno = UINT_MAX;
334
0
    cs->request_id = NULL;
335
0
    cs->leader_only = true;
336
0
    cs->shuffle_remotes = true;
337
0
    cs->set_db_change_aware = true;
338
0
    hmap_init(&cs->server_rows);
339
340
0
    return cs;
341
0
}
342
343
static void
344
ovsdb_cs_db_destroy(struct ovsdb_cs_db *db)
345
0
{
346
0
    ovsdb_cs_db_destroy_tables(db);
347
348
0
    json_destroy(db->monitor_id);
349
0
    json_destroy(db->schema);
350
351
0
    free(db->lock_name);
352
353
0
    json_destroy(db->lock_request_id);
354
355
    /* This list always gets flushed out at the end of ovsdb_cs_run(). */
356
0
    ovs_assert(ovs_list_is_empty(&db->events));
357
0
}
358
359
/* Destroys 'cs' and all of the data structures that it manages. */
360
void
361
ovsdb_cs_destroy(struct ovsdb_cs *cs)
362
0
{
363
0
    if (cs) {
364
0
        ovsdb_cs_db_destroy(&cs->server);
365
0
        ovsdb_cs_db_destroy(&cs->data);
366
0
        jsonrpc_session_close(cs->session);
367
0
        free(cs->remote);
368
0
        json_destroy(cs->request_id);
369
370
0
        for (size_t i = 0; i < cs->n_txns; i++) {
371
0
            json_destroy(cs->txns[i]);
372
0
        }
373
0
        free(cs->txns);
374
375
0
        ovsdb_cs_clear_server_rows(cs);
376
0
        hmap_destroy(&cs->server_rows);
377
378
0
        free(cs);
379
0
    }
380
0
}
381
382
static void
383
ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state,
384
                        const char *where)
385
0
{
386
0
    VLOG_DBG("%s: %s -> %s at %s",
387
0
             cs->session ? jsonrpc_session_get_name(cs->session) : "void",
388
0
             ovsdb_cs_state_to_string(cs->state),
389
0
             ovsdb_cs_state_to_string(new_state),
390
0
             where);
391
0
    cs->state = new_state;
392
0
}
393
394
static void
395
ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request)
396
0
{
397
0
    json_destroy(cs->request_id);
398
0
    cs->request_id = json_clone(request->id);
399
0
    if (cs->session) {
400
0
        jsonrpc_session_send(cs->session, request);
401
0
    } else {
402
0
        jsonrpc_msg_destroy(request);
403
0
    }
404
0
}
405
406
static void
407
ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where)
408
0
{
409
0
    ovsdb_cs_force_reconnect(cs);
410
0
    ovsdb_cs_transition_at(cs, CS_S_RETRY, where);
411
0
}
412
413
static void
414
ovsdb_cs_restart_fsm(struct ovsdb_cs *cs)
415
0
{
416
    /* Resync data DB table conditions to avoid missing updates due to
417
     * conditions that were in flight or changed locally while the connection
418
     * was down.
419
     */
420
0
    ovsdb_cs_db_sync_condition(&cs->data);
421
422
0
    ovsdb_cs_send_schema_request(cs, &cs->server);
423
0
    ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED);
424
0
    cs->data.monitor_version = 0;
425
0
    cs->server.monitor_version = 0;
426
0
}
427
428
static void
429
ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
430
0
{
431
0
    bool ok = msg->type == JSONRPC_REPLY;
432
0
    if (!ok
433
0
        && cs->state != CS_S_SERVER_SCHEMA_REQUESTED
434
0
        && cs->state != CS_S_SERVER_MONITOR_REQUESTED
435
0
        && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED
436
0
        && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) {
437
0
        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
438
0
        char *s = jsonrpc_msg_to_string(msg);
439
0
        VLOG_INFO_RL(&rl, "%s: received unexpected %s response in "
440
0
                     "%s state: %s", jsonrpc_session_get_name(cs->session),
441
0
                     jsonrpc_msg_type_to_string(msg->type),
442
0
                     ovsdb_cs_state_to_string(cs->state),
443
0
                     s);
444
0
        free(s);
445
0
        ovsdb_cs_retry(cs);
446
0
        return;
447
0
    }
448
449
0
    switch (cs->state) {
450
0
    case CS_S_SERVER_SCHEMA_REQUESTED:
451
0
        if (ok) {
452
0
            json_destroy(cs->server.schema);
453
0
            cs->server.schema = json_clone(msg->result);
454
0
            ovsdb_cs_send_monitor_request(cs, &cs->server,
455
0
                                          cs->server.max_version);
456
0
            ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED);
457
0
        } else {
458
0
            ovsdb_cs_send_schema_request(cs, &cs->data);
459
0
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
460
0
        }
461
0
        break;
462
463
0
    case CS_S_SERVER_MONITOR_REQUESTED:
464
0
        if (ok) {
465
0
            cs->server.monitor_version = cs->server.max_version;
466
0
            ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result,
467
0
                                            cs->server.monitor_version);
468
0
            if (ovsdb_cs_check_server_db(cs) && cs->set_db_change_aware) {
469
0
                ovsdb_cs_send_db_change_aware(cs);
470
0
            }
471
0
        } else {
472
0
            ovsdb_cs_send_schema_request(cs, &cs->data);
473
0
            ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED);
474
0
        }
475
0
        break;
476
477
0
    case CS_S_DATA_SCHEMA_REQUESTED:
478
0
        json_destroy(cs->data.schema);
479
0
        cs->data.schema = json_clone(msg->result);
480
0
        if (cs->data.max_version >= 2) {
481
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
482
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
483
0
        } else {
484
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
485
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
486
0
        }
487
0
        break;
488
489
0
    case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED:
490
0
        if (!ok) {
491
            /* "monitor_cond_since" not supported.  Try "monitor_cond". */
492
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
493
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
494
0
        } else {
495
0
            cs->data.monitor_version = 3;
496
0
            ovsdb_cs_transition(cs, CS_S_MONITORING);
497
0
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3);
498
0
        }
499
0
        break;
500
501
0
    case CS_S_DATA_MONITOR_COND_REQUESTED:
502
0
        if (!ok) {
503
            /* "monitor_cond" not supported.  Try "monitor". */
504
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
505
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
506
0
        } else {
507
0
            cs->data.monitor_version = 2;
508
0
            ovsdb_cs_transition(cs, CS_S_MONITORING);
509
0
            ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2);
510
0
        }
511
0
        break;
512
513
0
    case CS_S_DATA_MONITOR_REQUESTED:
514
0
        cs->data.monitor_version = 1;
515
0
        ovsdb_cs_transition(cs, CS_S_MONITORING);
516
0
        ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1);
517
0
        break;
518
519
0
    case CS_S_MONITORING:
520
        /* We don't normally have a request outstanding in this state.  If we
521
         * do, it's a "monitor_cond_change", which means that the conditional
522
         * monitor clauses were updated.
523
         *
524
         * Mark the last requested conditions as acked and if further
525
         * condition changes were pending, send them now. */
526
0
        ovsdb_cs_db_ack_condition(&cs->data);
527
0
        ovsdb_cs_send_cond_change(cs);
528
0
        cs->data.cond_seqno++;
529
0
        break;
530
531
0
    case CS_S_ERROR:
532
0
    case CS_S_RETRY:
533
        /* Nothing to do in this state. */
534
0
        break;
535
536
0
    default:
537
0
        OVS_NOT_REACHED();
538
0
    }
539
0
}
540
541
static void
542
ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg)
543
0
{
544
0
    bool is_response = (msg->type == JSONRPC_REPLY ||
545
0
                        msg->type == JSONRPC_ERROR);
546
547
    /* Process a reply to an outstanding request. */
548
0
    if (is_response
549
0
        && cs->request_id && json_equal(cs->request_id, msg->id)) {
550
0
        json_destroy(cs->request_id);
551
0
        cs->request_id = NULL;
552
0
        ovsdb_cs_process_response(cs, msg);
553
0
        return;
554
0
    }
555
556
    /* Process database contents updates. */
557
0
    if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) {
558
0
        return;
559
0
    }
560
0
    if (cs->server.monitor_version
561
0
        && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) {
562
0
        ovsdb_cs_check_server_db(cs);
563
0
        return;
564
0
    }
565
566
0
    if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg)
567
0
        || (cs->server.monitor_version
568
0
            && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) {
569
0
        return;
570
0
    }
571
572
    /* Process "lock" replies and related notifications. */
573
0
    if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) {
574
0
        return;
575
0
    }
576
577
    /* Process response to a database transaction we submitted. */
578
0
    if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) {
579
0
        return;
580
0
    }
581
582
    /* Unknown message.  Log at a low level because this can happen if
583
     * ovsdb_cs_txn_destroy() is called to destroy a transaction
584
     * before we receive the reply.
585
     *
586
     * (We could sort those out from other kinds of unknown messages by
587
     * using distinctive IDs for transactions, if it seems valuable to
588
     * do so, and then it would be possible to use different log
589
     * levels. XXX?) */
590
0
    char *s = jsonrpc_msg_to_string(msg);
591
0
    VLOG_DBG("%s: received unexpected %s message: %s",
592
0
             jsonrpc_session_get_name(cs->session),
593
0
             jsonrpc_msg_type_to_string(msg->type), s);
594
0
    free(s);
595
0
}
596
597
static struct ovsdb_cs_event *
598
ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type)
599
0
{
600
0
    struct ovsdb_cs_event *event = xmalloc(sizeof *event);
601
0
    event->type = type;
602
0
    ovs_list_push_back(&db->events, &event->list_node);
603
0
    return event;
604
0
}
605
606
/* Processes a batch of messages from the database server on 'cs'.  This may
607
 * cause the CS's contents to change.
608
 *
609
 * Initializes 'events' with a list of events that occurred on 'cs'.  The
610
 * caller must process and destroy all of the events. */
611
void
612
ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events)
613
0
{
614
0
    ovs_list_init(events);
615
0
    if (!cs->session) {
616
0
        return;
617
0
    }
618
619
0
    ovsdb_cs_send_cond_change(cs);
620
621
0
    jsonrpc_session_run(cs->session);
622
623
0
    unsigned int seqno = jsonrpc_session_get_seqno(cs->session);
624
0
    if (cs->state_seqno != seqno) {
625
0
        cs->state_seqno = seqno;
626
0
        ovsdb_cs_restart_fsm(cs);
627
628
0
        for (size_t i = 0; i < cs->n_txns; i++) {
629
0
            json_destroy(cs->txns[i]);
630
0
        }
631
0
        cs->n_txns = 0;
632
633
0
        ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT);
634
635
0
        if (cs->data.lock_name) {
636
0
            jsonrpc_session_send(
637
0
                cs->session,
638
0
                ovsdb_cs_db_compose_lock_request(&cs->data));
639
0
        }
640
0
    }
641
642
0
    for (int i = 0; i < 50; i++) {
643
0
        struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session);
644
0
        if (!msg) {
645
0
            break;
646
0
        }
647
0
        ovsdb_cs_process_msg(cs, msg);
648
0
        jsonrpc_msg_destroy(msg);
649
0
    }
650
0
    ovs_list_push_back_all(events, &cs->data.events);
651
0
}
652
653
/* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to
654
 * do or when activity occurs on a transaction on 'cs'. */
655
void
656
ovsdb_cs_wait(struct ovsdb_cs *cs)
657
0
{
658
0
    if (!cs->session) {
659
0
        return;
660
0
    }
661
0
    jsonrpc_session_wait(cs->session);
662
0
    jsonrpc_session_recv_wait(cs->session);
663
0
}
664

665
/* Network connection. */
666
667
/* Changes the remote and creates a new session.  Keeps existing connection
668
 * if current remote is still valid.
669
 *
670
 * If 'retry' is true, the connection to the remote will automatically retry
671
 * when it fails.  If 'retry' is false, the connection is one-time. */
672
void
673
ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry)
674
0
{
675
0
    if (cs
676
0
        && ((remote != NULL) != (cs->remote != NULL)
677
0
            || (remote && cs->remote && strcmp(remote, cs->remote)))) {
678
0
        struct jsonrpc *rpc = NULL;
679
680
        /* Close the old session, if any. */
681
0
        if (cs->session) {
682
            /* Save the current open connection and close the session. */
683
0
            rpc = jsonrpc_session_steal(cs->session);
684
0
            cs->session = NULL;
685
686
0
            free(cs->remote);
687
0
            cs->remote = NULL;
688
0
        }
689
690
        /* Open new session, if any. */
691
0
        if (remote) {
692
0
            struct svec remotes = SVEC_EMPTY_INITIALIZER;
693
0
            struct uuid old_cid = cs->cid;
694
695
0
            ovsdb_session_parse_remote(remote, &remotes, &cs->cid);
696
0
            if (cs->shuffle_remotes) {
697
0
                svec_shuffle(&remotes);
698
0
            }
699
0
            cs->session = jsonrpc_session_open_multiple(&remotes, retry);
700
701
            /* Use old connection, if cluster id didn't change and the remote
702
             * is still on the list, to avoid unnecessary re-connection. */
703
0
            if (rpc && uuid_equals(&old_cid, &cs->cid)
704
0
                && svec_contains_unsorted(&remotes, jsonrpc_get_name(rpc))) {
705
0
                jsonrpc_session_replace(cs->session, rpc);
706
0
                cs->state_seqno = jsonrpc_session_get_seqno(cs->session);
707
0
                rpc = NULL;
708
0
            } else {
709
0
                cs->state_seqno = UINT_MAX;
710
0
            }
711
712
0
            svec_destroy(&remotes);
713
0
            cs->remote = xstrdup(remote);
714
0
        }
715
716
0
        jsonrpc_close(rpc);
717
0
    }
718
0
}
719
720
/* Reconfigures 'cs' so that it would reconnect to the database, if
721
 * connection was dropped. */
722
void
723
ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs)
724
0
{
725
0
    if (cs->session) {
726
0
        jsonrpc_session_enable_reconnect(cs->session);
727
0
    }
728
0
}
729
730
/* Forces 'cs' to drop its connection to the database and reconnect.  In the
731
 * meantime, the contents of 'cs' will not change. */
732
void
733
ovsdb_cs_force_reconnect(struct ovsdb_cs *cs)
734
0
{
735
0
    if (cs->session) {
736
0
        if (cs->state == CS_S_MONITORING) {
737
            /* The ovsdb-cs was in MONITORING state, so we either had data
738
             * inconsistency on this server, or it stopped being the cluster
739
             * leader, or the user requested to re-connect.  Avoiding backoff
740
             * in these cases, as we need to re-connect as soon as possible.
741
             * Connections that are not in MONITORING state should have their
742
             * backoff to avoid constant flood of re-connection attempts in
743
             * case there is no suitable database server. */
744
0
            jsonrpc_session_reset_backoff(cs->session);
745
0
        }
746
0
        jsonrpc_session_force_reconnect(cs->session);
747
0
    }
748
0
}
749
750
/* Drops 'cs''s current connection and the cached session.  This is useful if
751
 * the client notices some kind of inconsistency. */
752
void
753
ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs)
754
0
{
755
0
    cs->data.last_id = UUID_ZERO;
756
0
    ovsdb_cs_retry(cs);
757
0
}
758
759
/* Returns true if 'cs' is currently connected or will eventually try to
760
 * reconnect. */
761
bool
762
ovsdb_cs_is_alive(const struct ovsdb_cs *cs)
763
0
{
764
0
    return (cs->session
765
0
            && jsonrpc_session_is_alive(cs->session)
766
0
            && cs->state != CS_S_ERROR);
767
0
}
768
769
/* Returns true if 'cs' is currently connected to a server. */
770
bool
771
ovsdb_cs_is_connected(const struct ovsdb_cs *cs)
772
0
{
773
0
    return cs->session && jsonrpc_session_is_connected(cs->session);
774
0
}
775
776
/* Returns the last error reported on a connection by 'cs'.  The return value
777
 * is 0 only if no connection made by 'cs' has ever encountered an error and
778
 * a negative response to a schema request has never been received. See
779
 * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value
780
 * interpretation. */
781
int
782
ovsdb_cs_get_last_error(const struct ovsdb_cs *cs)
783
0
{
784
0
    int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0;
785
0
    if (err) {
786
0
        return err;
787
0
    } else if (cs->state == CS_S_ERROR) {
788
0
        return ENOENT;
789
0
    } else {
790
0
        return 0;
791
0
    }
792
0
}
793
794
/* Sets all the JSON-RPC session 'options' for 'cs''s current session. */
795
void
796
ovsdb_cs_set_jsonrpc_options(const struct ovsdb_cs *cs,
797
                             const struct jsonrpc_session_options *options)
798
0
{
799
0
    if (cs->session) {
800
0
        jsonrpc_session_set_options(cs->session, options);
801
0
    }
802
0
}
803
804
/* Sets the "probe interval" for 'cs''s current session to 'probe_interval', in
805
 * milliseconds. */
806
void
807
ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval)
808
0
{
809
0
    if (cs->session) {
810
0
        jsonrpc_session_set_probe_interval(cs->session, probe_interval);
811
0
    }
812
0
}
813

814
/* Conditional monitoring. */
815
816
/* A table being monitored.
817
 *
818
 * At the CS layer, the only thing we care about, table-wise, is the conditions
819
 * we're using for monitoring them, so there's little here.  We only create
820
 * these table structures at all for tables that have conditions. */
821
struct ovsdb_cs_db_table {
822
    struct hmap_node hmap_node; /* Indexed by 'name'. */
823
    char *name;                 /* Table name. */
824
825
    /* Each of these is a null pointer if it is empty, or JSON [<condition>*]
826
     * or [true] or [false] otherwise.  [true] could be represented as a null
827
     * pointer, but we want to distinguish "empty slot" from "a condition that
828
     * is always true" in a slot. */
829
    struct json *ack_cond; /* Last condition acked by the server. */
830
    struct json *req_cond; /* Last condition requested to the server. */
831
    struct json *new_cond; /* Latest condition set by the IDL client. */
832
};
833
834
/* A kind of condition, so that we can treat equivalent JSON as equivalent. */
835
enum condition_type {
836
    COND_FALSE,                 /* [] or [false] */
837
    COND_TRUE,                  /* Null pointer or [true] */
838
    COND_OTHER                  /* Anything else. */
839
};
840
841
/* Returns the condition_type for 'condition'. */
842
static enum condition_type
843
condition_classify(const struct json *condition)
844
0
{
845
0
    if (condition) {
846
0
        switch (json_array_size(condition)) {
847
0
        case 0:
848
0
            return COND_FALSE;
849
850
0
        case 1: {
851
0
            const struct json *cond = json_array_at(condition, 0);
852
853
0
            return (cond->type == JSON_FALSE ? COND_FALSE
854
0
                    : cond->type == JSON_TRUE ? COND_TRUE
855
0
                    : COND_OTHER);
856
0
        }
857
858
0
        default:
859
0
            return COND_OTHER;
860
0
        }
861
0
    } else {
862
0
        return COND_TRUE;
863
0
    }
864
0
}
865
866
/* Returns true if 'a' and 'b' are the same condition (in an obvious way; we're
867
 * not going to compare for boolean equivalence or anything). */
868
static bool
869
condition_equal(const struct json *a, const struct json *b)
870
0
{
871
0
    enum condition_type type = condition_classify(a);
872
0
    return (type == condition_classify(b)
873
0
            && (type != COND_OTHER || json_equal(a, b)));
874
0
}
875
876
/* Returns a clone of 'condition', translating always-true and always-false to
877
 * [true] and [false], respectively. */
878
static struct json *
879
condition_clone(const struct json *condition)
880
0
{
881
0
    switch (condition_classify(condition)) {
882
0
    case COND_TRUE:
883
0
        return json_array_create_1(json_boolean_create(true));
884
885
0
    case COND_FALSE:
886
0
        return json_array_create_1(json_boolean_create(false));
887
888
0
    case COND_OTHER:
889
0
        return json_clone(condition);
890
0
    }
891
892
0
    OVS_NOT_REACHED();
893
0
}
894
895
/* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an
896
 * empty one if necessary. */
897
static struct ovsdb_cs_db_table *
898
ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table)
899
0
{
900
0
    uint32_t hash = hash_string(table, 0);
901
0
    struct ovsdb_cs_db_table *t;
902
903
0
    HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) {
904
0
        if (!strcmp(t->name, table)) {
905
0
            return t;
906
0
        }
907
0
    }
908
909
0
    t = xzalloc(sizeof *t);
910
0
    t->name = xstrdup(table);
911
0
    t->ack_cond = json_array_create_1(json_boolean_create(true));
912
0
    hmap_insert(&db->tables, &t->hmap_node, hash);
913
0
    return t;
914
0
}
915
916
static void
917
ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db)
918
0
{
919
0
    struct ovsdb_cs_db_table *table;
920
0
    HMAP_FOR_EACH_SAFE (table, hmap_node, &db->tables) {
921
0
        json_destroy(table->ack_cond);
922
0
        json_destroy(table->req_cond);
923
0
        json_destroy(table->new_cond);
924
0
        hmap_remove(&db->tables, &table->hmap_node);
925
0
        free(table->name);
926
0
        free(table);
927
0
    }
928
0
    hmap_destroy(&db->tables);
929
0
}
930
931
static unsigned int
932
ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table,
933
                          const struct json *condition)
934
0
{
935
    /* Compare the new condition to the last known condition which can be
936
     * either "new" (not sent yet), "requested" or "acked", in this order. */
937
0
    struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table);
938
0
    const struct json *table_cond = (t->new_cond ? t->new_cond
939
0
                                     : t->req_cond ? t->req_cond
940
0
                                     : t->ack_cond);
941
0
    if (!condition_equal(condition, table_cond)) {
942
0
        json_destroy(t->new_cond);
943
0
        t->new_cond = condition_clone(condition);
944
0
        db->cond_changed = true;
945
0
        poll_immediate_wake();
946
0
    }
947
948
    /* Conditions will be up to date when we receive replies for already
949
     * requested and new conditions, if any.  This includes condition change
950
     * requests for other tables too.
951
     */
952
0
    if (t->new_cond) {
953
        /* New condition will be sent out after all already requested ones
954
         * are acked.
955
         */
956
0
        bool any_req_cond = false;
957
0
        HMAP_FOR_EACH (t, hmap_node, &db->tables) {
958
0
            if (t->req_cond) {
959
0
                any_req_cond = true;
960
0
                break;
961
0
            }
962
0
        }
963
0
        return db->cond_seqno + any_req_cond + 1;
964
0
    } else {
965
        /* Already requested conditions should be up to date at
966
         * db->cond_seqno + 1 while acked conditions are already up to date.
967
         */
968
0
        return db->cond_seqno + !!t->req_cond;
969
0
    }
970
0
}
971
972
/* Sets the replication condition for 'tc' in 'cs' to 'condition' and arranges
973
 * to send the new condition to the database server.
974
 *
975
 * Return the next conditional update sequence number.  When this value and
976
 * ovsdb_cs_get_condition_seqno() matches, 'cs' contains rows that match the
977
 * 'condition'. */
978
unsigned int
979
ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table,
980
                       const struct json *condition)
981
0
{
982
0
    return ovsdb_cs_db_set_condition(&cs->data, table, condition);
983
0
}
984
985
/* Returns a "sequence number" that represents the number of conditional
986
 * monitoring updates successfully received by the OVSDB server of a CS
987
 * connection.
988
 *
989
 * ovsdb_cs_set_condition() sets a new condition that is different from the
990
 * current condtion, the next expected "sequence number" is returned.
991
 *
992
 * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the
993
 * return value of ovsdb_cs_set_condition(), the client is assured that:
994
 *
995
 *   - The ovsdb_cs_set_condition() changes has been acknowledged by the OVSDB
996
 *     server.
997
 *
998
 *   -  'cs' now contains the content matches the new conditions.   */
999
unsigned int
1000
ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs)
1001
0
{
1002
0
    return cs->data.cond_seqno;
1003
0
}
1004
1005
static struct json *
1006
ovsdb_cs_create_cond_change_req(const struct json *cond)
1007
0
{
1008
0
    struct json *monitor_cond_change_request = json_object_create();
1009
0
    json_object_put(monitor_cond_change_request, "where", json_clone(cond));
1010
0
    return monitor_cond_change_request;
1011
0
}
1012
1013
static struct jsonrpc_msg *
1014
ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db)
1015
0
{
1016
0
    if (!db->cond_changed) {
1017
0
        return NULL;
1018
0
    }
1019
1020
0
    struct json *monitor_cond_change_requests = NULL;
1021
0
    struct ovsdb_cs_db_table *table;
1022
0
    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1023
        /* Always use the most recent conditions set by the CS client when
1024
         * requesting monitor_cond_change, i.e., table->new_cond.
1025
         */
1026
0
        if (table->new_cond) {
1027
0
            struct json *req =
1028
0
                ovsdb_cs_create_cond_change_req(table->new_cond);
1029
0
            if (req) {
1030
0
                if (!monitor_cond_change_requests) {
1031
0
                    monitor_cond_change_requests = json_object_create();
1032
0
                }
1033
0
                json_object_put(monitor_cond_change_requests,
1034
0
                                table->name,
1035
0
                                json_array_create_1(req));
1036
0
            }
1037
            /* Mark the new condition as requested by moving it to req_cond.
1038
             * If there's already requested condition that's a bug.
1039
             */
1040
0
            ovs_assert(table->req_cond == NULL);
1041
0
            table->req_cond = table->new_cond;
1042
0
            table->new_cond = NULL;
1043
0
        }
1044
0
    }
1045
1046
0
    if (!monitor_cond_change_requests) {
1047
0
        return NULL;
1048
0
    }
1049
1050
0
    db->cond_changed = false;
1051
0
    struct json *params = json_array_create_3(json_clone(db->monitor_id),
1052
0
                                              json_clone(db->monitor_id),
1053
0
                                              monitor_cond_change_requests);
1054
0
    return jsonrpc_create_request("monitor_cond_change", params, NULL);
1055
0
}
1056
1057
/* Marks all requested table conditions in 'db' as acked by the server.
1058
 * It should be called when the server replies to monitor_cond_change
1059
 * requests.
1060
 */
1061
static void
1062
ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db)
1063
0
{
1064
0
    struct ovsdb_cs_db_table *table;
1065
0
    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1066
0
        if (table->req_cond) {
1067
0
            json_destroy(table->ack_cond);
1068
0
            table->ack_cond = table->req_cond;
1069
0
            table->req_cond = NULL;
1070
0
        }
1071
0
    }
1072
0
}
1073
1074
/* Should be called when the CS fsm is restarted and resyncs table conditions
1075
 * based on the state the DB is in:
1076
 * - if a non-zero last_id is available for the DB then upon reconnect
1077
 *   the CS should first request acked conditions to avoid missing updates
1078
 *   about records that were added before the transaction with
1079
 *   txn-id == last_id. If there were requested condition changes in flight
1080
 *   (i.e., req_cond not NULL) and the CS client didn't set new conditions
1081
 *   (i.e., new_cond is NULL) then move req_cond to new_cond to trigger a
1082
 *   follow up monitor_cond_change request.
1083
 * - if there's no last_id available for the DB then it's safe to use the
1084
 *   latest conditions set by the CS client even if they weren't acked yet.
1085
 */
1086
static void
1087
ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db)
1088
0
{
1089
0
    bool ack_all = uuid_is_zero(&db->last_id);
1090
0
    if (ack_all) {
1091
0
        db->cond_changed = false;
1092
0
    }
1093
1094
0
    struct ovsdb_cs_db_table *table;
1095
0
    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1096
        /* When monitor_cond_since requests will be issued, the
1097
         * table->ack_cond condition will be added to the "where" clause".
1098
         * Follow up monitor_cond_change requests will use table->new_cond.
1099
         */
1100
0
        if (ack_all) {
1101
0
            if (table->new_cond) {
1102
0
                json_destroy(table->req_cond);
1103
0
                table->req_cond = table->new_cond;
1104
0
                table->new_cond = NULL;
1105
0
            }
1106
1107
0
            if (table->req_cond) {
1108
0
                json_destroy(table->ack_cond);
1109
0
                table->ack_cond = table->req_cond;
1110
0
                table->req_cond = NULL;
1111
0
            }
1112
0
        } else {
1113
0
            if (table->req_cond) {
1114
                /* There was an in-flight monitor_cond_change request.  It's no
1115
                 * longer relevant in the restarted FSM, so clear it. */
1116
0
                if (table->new_cond) {
1117
                    /* We will send a new monitor_cond_change with the new
1118
                     * condition.  The previously in-flight condition is
1119
                     * irrelevant and we can just forget about it. */
1120
0
                    json_destroy(table->req_cond);
1121
0
                } else {
1122
                    /* The restarted FSM needs to again send a request for the
1123
                     * previously in-flight condition. */
1124
0
                    table->new_cond = table->req_cond;
1125
0
                }
1126
0
                table->req_cond = NULL;
1127
0
                db->cond_changed = true;
1128
1129
                /* There are two cases:
1130
                 * a. either the server already processed the requested monitor
1131
                 *    condition change but the FSM was restarted before the
1132
                 *    client was notified.  In this case the client should
1133
                 *    clear its local cache because it's out of sync with the
1134
                 *    monitor view on the server side.
1135
                 *
1136
                 * b. OR the server hasn't processed the requested monitor
1137
                 *    condition change yet.
1138
                 *
1139
                 * As there's no easy way to differentiate between the two,
1140
                 * and given that this condition should be rare, reset the
1141
                 * 'last_id', essentially flushing the local cached DB
1142
                 * contents.
1143
                 */
1144
0
                db->last_id = UUID_ZERO;
1145
0
            }
1146
0
        }
1147
0
    }
1148
0
}
1149
1150
static void
1151
ovsdb_cs_send_cond_change(struct ovsdb_cs *cs)
1152
0
{
1153
    /* When 'cs->request_id' is not NULL, there is an outstanding
1154
     * conditional monitoring update request that we have not heard
1155
     * from the server yet. Don't generate another request in this case. */
1156
0
    if (!jsonrpc_session_is_connected(cs->session)
1157
0
        || cs->data.monitor_version == 1
1158
0
        || cs->request_id) {
1159
0
        return;
1160
0
    }
1161
1162
0
    struct jsonrpc_msg *msg = ovsdb_cs_db_compose_cond_change(&cs->data);
1163
0
    if (msg) {
1164
0
        cs->request_id = json_clone(msg->id);
1165
0
        jsonrpc_session_send(cs->session, msg);
1166
0
    }
1167
0
}
1168

1169
/* Database change awareness. */
1170
1171
/* By default, or if 'set_db_change_aware' is true, 'cs' will send
1172
 * 'set_db_change_aware' request to the server after receiving the _SERVER data
1173
 * (when the server supports it), which is useful for clients that intends to
1174
 * keep long connections to the server.  Otherwise, 'cs' will not send the
1175
 * 'set_db_change_aware' request, which is more reasonable for short-lived
1176
 * connections to avoid unnecessary processing at the server side and possible
1177
 * error handling due to connections being closed by the clients before the
1178
 * responses are sent by the server. */
1179
void
1180
ovsdb_cs_set_db_change_aware(struct ovsdb_cs *cs, bool set_db_change_aware)
1181
0
{
1182
0
    cs->set_db_change_aware = set_db_change_aware;
1183
0
}
1184

1185
/* Clustered servers. */
1186
1187
/* By default, or if 'leader_only' is true, when 'cs' connects to a clustered
1188
 * database, the CS layer will avoid servers other than the cluster
1189
 * leader. This ensures that any data that it reads and reports is up-to-date.
1190
 * If 'leader_only' is false, the CS layer will accept any server in the
1191
 * cluster, which means that for read-only transactions it can report and act
1192
 * on stale data (transactions that modify the database are always serialized
1193
 * even with false 'leader_only').  Refer to Understanding Cluster Consistency
1194
 * in ovsdb(7) for more information. */
1195
void
1196
ovsdb_cs_set_leader_only(struct ovsdb_cs *cs, bool leader_only)
1197
0
{
1198
0
    cs->leader_only = leader_only;
1199
0
    if (leader_only && cs->server.monitor_version) {
1200
0
        ovsdb_cs_check_server_db(cs);
1201
0
    }
1202
0
}
1203
1204
/* Set whether the order of remotes should be shuffled, when there is more than
1205
 * one remote.  The setting doesn't take effect until the next time when
1206
 * ovsdb_cs_set_remote() is called. */
1207
void
1208
ovsdb_cs_set_shuffle_remotes(struct ovsdb_cs *cs, bool shuffle)
1209
0
{
1210
0
    cs->shuffle_remotes = shuffle;
1211
0
}
1212
1213
/* Reset min_index to 0. This prevents a situation where the client
1214
 * thinks all databases have stale data, when they actually have all
1215
 * been destroyed and rebuilt from scratch.
1216
 */
1217
void
1218
ovsdb_cs_reset_min_index(struct ovsdb_cs *cs)
1219
0
{
1220
0
    cs->min_index = 0;
1221
0
}
1222

1223
/* Database locks. */
1224
1225
static struct jsonrpc_msg *
1226
ovsdb_cs_db_set_lock(struct ovsdb_cs_db *db, const char *lock_name)
1227
0
{
1228
0
    if (db->lock_name
1229
0
        && (!lock_name || strcmp(lock_name, db->lock_name))) {
1230
        /* Release previous lock. */
1231
0
        struct jsonrpc_msg *msg = ovsdb_cs_db_compose_unlock_request(db);
1232
0
        free(db->lock_name);
1233
0
        db->lock_name = NULL;
1234
0
        db->is_lock_contended = false;
1235
0
        return msg;
1236
0
    }
1237
1238
0
    if (lock_name && !db->lock_name) {
1239
        /* Acquire new lock. */
1240
0
        db->lock_name = xstrdup(lock_name);
1241
0
        return ovsdb_cs_db_compose_lock_request(db);
1242
0
    }
1243
1244
0
    return NULL;
1245
0
}
1246
1247
/* If 'lock_name' is nonnull, configures 'cs' to obtain the named lock from the
1248
 * database server and to prevent modifying the database when the lock cannot
1249
 * be acquired (that is, when another client has the same lock).
1250
 *
1251
 * If 'lock_name' is NULL, drops the locking requirement and releases the
1252
 * lock. */
1253
void
1254
ovsdb_cs_set_lock(struct ovsdb_cs *cs, const char *lock_name)
1255
0
{
1256
0
    for (;;) {
1257
0
        struct jsonrpc_msg *msg = ovsdb_cs_db_set_lock(&cs->data, lock_name);
1258
0
        if (!msg) {
1259
0
            break;
1260
0
        }
1261
0
        if (cs->session) {
1262
0
            jsonrpc_session_send(cs->session, msg);
1263
0
        } else {
1264
0
            jsonrpc_msg_destroy(msg);
1265
0
        }
1266
0
    }
1267
0
}
1268
1269
/* Returns the name of the lock that 'cs' is trying to obtain, or NULL if none
1270
 * is configured. */
1271
const char *
1272
ovsdb_cs_get_lock(const struct ovsdb_cs *cs)
1273
0
{
1274
0
    return cs->data.lock_name;
1275
0
}
1276
1277
/* Returns true if 'cs' is configured to obtain a lock and owns that lock,
1278
 * false if it doesn't own the lock or isn't configured to obtain one.
1279
 *
1280
 * Locking and unlocking happens asynchronously from the database client's
1281
 * point of view, so the information is only useful for optimization (e.g. if
1282
 * the client doesn't have the lock then there's no point in trying to write to
1283
 * the database). */
1284
bool
1285
ovsdb_cs_has_lock(const struct ovsdb_cs *cs)
1286
0
{
1287
0
    return cs->data.has_lock;
1288
0
}
1289
1290
/* Returns true if 'cs' is configured to obtain a lock but the database server
1291
 * has indicated that some other client already owns the requested lock. */
1292
bool
1293
ovsdb_cs_is_lock_contended(const struct ovsdb_cs *cs)
1294
0
{
1295
0
    return cs->data.is_lock_contended;
1296
0
}
1297
1298
static void
1299
ovsdb_cs_db_update_has_lock(struct ovsdb_cs_db *db, bool new_has_lock)
1300
0
{
1301
0
    if (new_has_lock && !db->has_lock) {
1302
0
        ovsdb_cs_db_add_event(db, OVSDB_CS_EVENT_TYPE_LOCKED);
1303
0
        db->is_lock_contended = false;
1304
0
    }
1305
0
    db->has_lock = new_has_lock;
1306
0
}
1307
1308
static bool
1309
ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *db,
1310
                                  const struct jsonrpc_msg *msg)
1311
0
{
1312
0
    if (msg->type == JSONRPC_REPLY
1313
0
        && db->lock_request_id
1314
0
        && json_equal(db->lock_request_id, msg->id)) {
1315
        /* Reply to our "lock" request. */
1316
0
        ovsdb_cs_db_parse_lock_reply(db, msg->result);
1317
0
        return true;
1318
0
    }
1319
1320
0
    if (msg->type == JSONRPC_NOTIFY) {
1321
0
        if (!strcmp(msg->method, "locked")) {
1322
            /* We got our lock. */
1323
0
            return ovsdb_cs_db_parse_lock_notify(db, msg->params, true);
1324
0
        } else if (!strcmp(msg->method, "stolen")) {
1325
            /* Someone else stole our lock. */
1326
0
            return ovsdb_cs_db_parse_lock_notify(db, msg->params, false);
1327
0
        }
1328
0
    }
1329
1330
0
    return false;
1331
0
}
1332
1333
static struct jsonrpc_msg *
1334
ovsdb_cs_db_compose_lock_request__(struct ovsdb_cs_db *db,
1335
                                    const char *method)
1336
0
{
1337
0
    ovsdb_cs_db_update_has_lock(db, false);
1338
1339
0
    json_destroy(db->lock_request_id);
1340
0
    db->lock_request_id = NULL;
1341
1342
0
    struct json *params = json_array_create_1(json_string_create(
1343
0
                                                  db->lock_name));
1344
0
    return jsonrpc_create_request(method, params, NULL);
1345
0
}
1346
1347
static struct jsonrpc_msg *
1348
ovsdb_cs_db_compose_lock_request(struct ovsdb_cs_db *db)
1349
0
{
1350
0
    struct jsonrpc_msg *msg = ovsdb_cs_db_compose_lock_request__(db, "lock");
1351
0
    db->lock_request_id = json_clone(msg->id);
1352
0
    return msg;
1353
0
}
1354
1355
static struct jsonrpc_msg *
1356
ovsdb_cs_db_compose_unlock_request(struct ovsdb_cs_db *db)
1357
0
{
1358
0
    return ovsdb_cs_db_compose_lock_request__(db, "unlock");
1359
0
}
1360
1361
static void
1362
ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *db,
1363
                              const struct json *result)
1364
0
{
1365
0
    bool got_lock;
1366
1367
0
    json_destroy(db->lock_request_id);
1368
0
    db->lock_request_id = NULL;
1369
1370
0
    if (result->type == JSON_OBJECT) {
1371
0
        const struct json *locked;
1372
1373
0
        locked = shash_find_data(json_object(result), "locked");
1374
0
        got_lock = locked && locked->type == JSON_TRUE;
1375
0
    } else {
1376
0
        got_lock = false;
1377
0
    }
1378
1379
0
    ovsdb_cs_db_update_has_lock(db, got_lock);
1380
0
    if (!got_lock) {
1381
0
        db->is_lock_contended = true;
1382
0
    }
1383
0
}
1384
1385
static bool
1386
ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *db,
1387
                               const struct json *params,
1388
                               bool new_has_lock)
1389
0
{
1390
0
    if (db->lock_name
1391
0
        && params->type == JSON_ARRAY
1392
0
        && json_array_size(params) > 0
1393
0
        && json_array_at(params, 0)->type == JSON_STRING) {
1394
0
        const char *lock_name = json_string(json_array_at(params, 0));
1395
1396
0
        if (!strcmp(db->lock_name, lock_name)) {
1397
0
            ovsdb_cs_db_update_has_lock(db, new_has_lock);
1398
0
            if (!new_has_lock) {
1399
0
                db->is_lock_contended = true;
1400
0
            }
1401
0
            return true;
1402
0
        }
1403
0
    }
1404
0
    return false;
1405
0
}
1406

1407
/* Transactions. */
1408
1409
static bool
1410
ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *cs,
1411
                              const struct jsonrpc_msg *reply)
1412
0
{
1413
0
    bool found = ovsdb_cs_forget_transaction(cs, reply->id);
1414
0
    if (found) {
1415
0
        struct ovsdb_cs_event *event
1416
0
            = ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_TXN_REPLY);
1417
0
        event->txn_reply = jsonrpc_msg_clone(reply);
1418
0
    }
1419
0
    return found;
1420
0
}
1421
1422
/* Returns true if 'cs' can be sent a transaction now, false otherwise.  This
1423
 * is useful for optimization: there is no point in composing and sending a
1424
 * transaction if it returns false. */
1425
bool
1426
ovsdb_cs_may_send_transaction(const struct ovsdb_cs *cs)
1427
0
{
1428
0
    return (cs->session != NULL
1429
0
            && cs->state == CS_S_MONITORING
1430
0
            && (!cs->data.lock_name || ovsdb_cs_has_lock(cs)));
1431
0
}
1432
1433
/* Attempts to send a transaction with the specified 'operations' to 'cs''s
1434
 * server.  On success, returns the request ID; the caller must eventually free
1435
 * it.  On failure, returns NULL. */
1436
struct json * OVS_WARN_UNUSED_RESULT
1437
ovsdb_cs_send_transaction(struct ovsdb_cs *cs, struct json *operations)
1438
0
{
1439
0
    if (!ovsdb_cs_may_send_transaction(cs)) {
1440
0
        json_destroy(operations);
1441
0
        return NULL;
1442
0
    }
1443
1444
0
    if (cs->data.lock_name) {
1445
0
        struct json *assertion = json_object_create();
1446
0
        json_object_put_string(assertion, "op", "assert");
1447
0
        json_object_put_string(assertion, "lock", cs->data.lock_name);
1448
0
        json_array_add(operations, assertion);
1449
0
    }
1450
1451
0
    struct json *request_id;
1452
0
    struct jsonrpc_msg *request = jsonrpc_create_request(
1453
0
        "transact", operations, &request_id);
1454
0
    int error = jsonrpc_session_send(cs->session, request);
1455
0
    if (error) {
1456
0
        json_destroy(request_id);
1457
0
        return NULL;
1458
0
    }
1459
1460
0
    if (cs->n_txns >= cs->allocated_txns) {
1461
0
        cs->txns = x2nrealloc(cs->txns, &cs->allocated_txns,
1462
0
                              sizeof *cs->txns);
1463
0
    }
1464
0
    cs->txns[cs->n_txns++] = request_id;
1465
0
    return json_clone(request_id);
1466
0
}
1467
1468
/* Makes 'cs' drop its record of transaction 'request_id'.  If a reply arrives
1469
 * for it later (which it will, unless the connection drops in the meantime),
1470
 * it won't be reported through an event.
1471
 *
1472
 * Returns true if 'request_id' was known, false otherwise. */
1473
bool
1474
ovsdb_cs_forget_transaction(struct ovsdb_cs *cs, const struct json *request_id)
1475
0
{
1476
0
    for (size_t i = 0; i < cs->n_txns; i++) {
1477
0
        if (json_equal(request_id, cs->txns[i])) {
1478
0
            json_destroy(cs->txns[i]);
1479
0
            cs->txns[i] = cs->txns[--cs->n_txns];
1480
0
            return true;
1481
0
        }
1482
0
    }
1483
0
    return false;
1484
0
}
1485

1486
static void
1487
ovsdb_cs_send_schema_request(struct ovsdb_cs *cs,
1488
                              struct ovsdb_cs_db *db)
1489
0
{
1490
0
    ovsdb_cs_send_request(cs, jsonrpc_create_request(
1491
0
                               "get_schema",
1492
0
                               json_array_create_1(json_string_create(
1493
0
                                                       db->db_name)),
1494
0
                               NULL));
1495
0
}
1496
1497
static void
1498
ovsdb_cs_send_db_change_aware(struct ovsdb_cs *cs)
1499
0
{
1500
0
    struct jsonrpc_msg *msg = jsonrpc_create_request(
1501
0
        "set_db_change_aware", json_array_create_1(json_boolean_create(true)),
1502
0
        NULL);
1503
0
    jsonrpc_session_send(cs->session, msg);
1504
0
}
1505
1506
static void
1507
ovsdb_cs_send_monitor_request(struct ovsdb_cs *cs, struct ovsdb_cs_db *db,
1508
                              int version)
1509
0
{
1510
0
    struct json *mrs = db->ops->compose_monitor_requests(
1511
0
        db->schema, db->ops_aux);
1512
    /* XXX handle failure */
1513
0
    ovs_assert(mrs->type == JSON_OBJECT);
1514
1515
0
    if (version > 1) {
1516
0
        struct ovsdb_cs_db_table *table;
1517
0
        HMAP_FOR_EACH (table, hmap_node, &db->tables) {
1518
0
            if (table->ack_cond) {
1519
0
                struct json *mr = shash_find_data(json_object(mrs),
1520
0
                                                  table->name);
1521
0
                if (!mr) {
1522
0
                    mr = json_array_create_empty();
1523
0
                    json_object_put(mrs, table->name, mr);
1524
0
                }
1525
0
                ovs_assert(mr->type == JSON_ARRAY);
1526
1527
0
                struct json *mr0;
1528
0
                if (json_array_size(mr) == 0) {
1529
0
                    mr0 = json_object_create();
1530
0
                    json_object_put(mr0, "columns", json_array_create_empty());
1531
0
                    json_array_add(mr, mr0);
1532
0
                } else {
1533
0
                    mr0 = CONST_CAST(struct json *, json_array_at(mr, 0));
1534
0
                }
1535
0
                ovs_assert(mr0->type == JSON_OBJECT);
1536
1537
0
                json_object_put(mr0, "where",
1538
0
                                json_clone(table->ack_cond));
1539
0
            }
1540
0
        }
1541
0
    }
1542
1543
0
    const char *method = (version == 1 ? "monitor"
1544
0
                          : version == 2 ? "monitor_cond"
1545
0
                          : "monitor_cond_since");
1546
0
    struct json *params = json_array_create_3(
1547
0
                              json_string_create(db->db_name),
1548
0
                              json_clone(db->monitor_id),
1549
0
                              mrs);
1550
0
    if (version == 3) {
1551
0
        json_array_add(params, json_string_create_uuid(&db->last_id));
1552
0
    }
1553
0
    ovsdb_cs_send_request(cs, jsonrpc_create_request(method, params, NULL));
1554
0
}
1555
1556
static void
1557
log_parse_update_error(struct ovsdb_error *error)
1558
0
{
1559
0
    if (!VLOG_DROP_WARN(&syntax_rl)) {
1560
0
        char *s = ovsdb_error_to_string(error);
1561
0
        VLOG_WARN_RL(&syntax_rl, "%s", s);
1562
0
        free(s);
1563
0
    }
1564
0
    ovsdb_error_destroy(error);
1565
0
}
1566
1567
static void
1568
ovsdb_cs_db_add_update(struct ovsdb_cs_db *db,
1569
                       const struct json *table_updates, int version,
1570
                       bool clear, bool monitor_reply)
1571
0
{
1572
0
    struct ovsdb_cs_event *event = ovsdb_cs_db_add_event(
1573
0
        db, OVSDB_CS_EVENT_TYPE_UPDATE);
1574
0
    event->update = (struct ovsdb_cs_update_event) {
1575
0
        .table_updates = json_clone(table_updates),
1576
0
        .clear = clear,
1577
0
        .monitor_reply = monitor_reply,
1578
0
        .version = version,
1579
0
        .last_id = db->last_id,
1580
0
    };
1581
0
}
1582
1583
static void
1584
ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *db,
1585
                                const struct json *result, int version)
1586
0
{
1587
0
    const struct json *table_updates;
1588
0
    bool clear;
1589
0
    if (version == 3) {
1590
0
        if (result->type != JSON_ARRAY || json_array_size(result) != 3
1591
0
            || (json_array_at(result, 0)->type != JSON_TRUE &&
1592
0
                json_array_at(result, 0)->type != JSON_FALSE)
1593
0
            || json_array_at(result, 1)->type != JSON_STRING
1594
0
            || !uuid_from_string(&db->last_id,
1595
0
                                 json_string(json_array_at(result, 1)))) {
1596
0
            struct ovsdb_error *error = ovsdb_syntax_error(
1597
0
                result, NULL, "bad monitor_cond_since reply format");
1598
0
            log_parse_update_error(error);
1599
0
            return;
1600
0
        }
1601
1602
0
        bool found = json_boolean(json_array_at(result, 0));
1603
0
        clear = !found;
1604
0
        table_updates = json_array_at(result, 2);
1605
0
    } else {
1606
0
        clear = true;
1607
0
        table_updates = result;
1608
0
    }
1609
1610
0
    ovsdb_cs_db_add_update(db, table_updates, version, clear, true);
1611
0
}
1612
1613
static bool
1614
ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *db,
1615
                             const struct jsonrpc_msg *msg)
1616
0
{
1617
0
    if (msg->type != JSONRPC_NOTIFY) {
1618
0
        return false;
1619
0
    }
1620
1621
0
    int version = (!strcmp(msg->method, "update") ? 1
1622
0
                   : !strcmp(msg->method, "update2") ? 2
1623
0
                   : !strcmp(msg->method, "update3") ? 3
1624
0
                   : 0);
1625
0
    if (!version) {
1626
0
        return false;
1627
0
    }
1628
1629
0
    struct json *params = msg->params;
1630
0
    int n = version == 3 ? 3 : 2;
1631
0
    if (params->type != JSON_ARRAY || json_array_size(params) != n) {
1632
0
        struct ovsdb_error *error = ovsdb_syntax_error(
1633
0
            params, NULL, "%s must be an array with %u elements.",
1634
0
            msg->method, n);
1635
0
        log_parse_update_error(error);
1636
0
        return false;
1637
0
    }
1638
1639
0
    if (!json_equal(json_array_at(params, 0), db->monitor_id)) {
1640
0
        return false;
1641
0
    }
1642
1643
0
    if (version == 3) {
1644
0
        const char *last_id = json_string(json_array_at(params, 1));
1645
0
        if (!uuid_from_string(&db->last_id, last_id)) {
1646
0
            struct ovsdb_error *error = ovsdb_syntax_error(
1647
0
                params, NULL, "Last-id %s is not in UUID format.", last_id);
1648
0
            log_parse_update_error(error);
1649
0
            return false;
1650
0
        }
1651
0
    }
1652
1653
0
    const struct json *table_updates = json_array_at(params,
1654
0
                                                     version == 3 ? 2 : 1);
1655
0
    ovsdb_cs_db_add_update(db, table_updates, version, false, false);
1656
0
    return true;
1657
0
}
1658
1659
static bool
1660
ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *cs,
1661
                                 struct ovsdb_cs_db *db,
1662
                                 const struct jsonrpc_msg *msg)
1663
0
{
1664
0
    if (msg->type != JSONRPC_NOTIFY
1665
0
        || strcmp(msg->method, "monitor_canceled")
1666
0
        || msg->params->type != JSON_ARRAY
1667
0
        || json_array_size(msg->params) != 1
1668
0
        || !json_equal(json_array_at(msg->params, 0), db->monitor_id)) {
1669
0
        return false;
1670
0
    }
1671
1672
0
    db->monitor_version = 0;
1673
1674
    /* Cancel the other monitor and restart the FSM from the top.
1675
     *
1676
     * Maybe a more sophisticated response would be better in some cases, but
1677
     * it doesn't seem worth optimizing yet.  (Although this is already more
1678
     * sophisticated than just dropping the connection and reconnecting.) */
1679
0
    struct ovsdb_cs_db *other_db
1680
0
        = db == &cs->data ? &cs->server : &cs->data;
1681
0
    if (other_db->monitor_version) {
1682
0
        jsonrpc_session_send(
1683
0
            cs->session,
1684
0
            jsonrpc_create_request(
1685
0
                "monitor_cancel",
1686
0
                json_array_create_1(json_clone(other_db->monitor_id)), NULL));
1687
0
        other_db->monitor_version = 0;
1688
0
    }
1689
0
    ovsdb_cs_restart_fsm(cs);
1690
1691
0
    return true;
1692
0
}
1693

1694
/* The _Server database.
1695
 *
1696
 * We replicate the Database table in the _Server database because this is the
1697
 * only way to find out properties we need to know for clustering, such as
1698
 * whether a database is clustered at all and whether this server is the
1699
 * leader.
1700
 *
1701
 * This code implements a kind of simple IDL-like layer. */
1702
1703
struct server_column {
1704
    const char *name;
1705
    struct ovsdb_type type;
1706
};
1707
enum server_column_index {
1708
    COL_NAME,
1709
    COL_MODEL,
1710
    COL_CONNECTED,
1711
    COL_LEADER,
1712
    COL_SCHEMA,
1713
    COL_CID,
1714
    COL_INDEX,
1715
};
1716
#define OPTIONAL_COLUMN(TYPE) \
1717
    {                                           \
1718
        .key = OVSDB_BASE_##TYPE##_INIT,        \
1719
        .value = OVSDB_BASE_VOID_INIT,          \
1720
        .n_min = 0,                             \
1721
        .n_max = 1                              \
1722
    }
1723
static const struct server_column server_columns[] = {
1724
    [COL_NAME] = {"name",  OPTIONAL_COLUMN(STRING) },
1725
    [COL_MODEL] = {"model", OPTIONAL_COLUMN(STRING) },
1726
    [COL_CONNECTED] = {"connected", OPTIONAL_COLUMN(BOOLEAN) },
1727
    [COL_LEADER] = {"leader", OPTIONAL_COLUMN(BOOLEAN) },
1728
    [COL_SCHEMA] = {"schema", OPTIONAL_COLUMN(STRING) },
1729
    [COL_CID] = {"cid", OPTIONAL_COLUMN(UUID) },
1730
    [COL_INDEX] = {"index", OPTIONAL_COLUMN(INTEGER) },
1731
};
1732
0
#define N_SERVER_COLUMNS ARRAY_SIZE(server_columns)
1733
struct server_row {
1734
    struct hmap_node hmap_node;
1735
    struct uuid uuid;
1736
    struct ovsdb_datum data[N_SERVER_COLUMNS];
1737
};
1738
1739
static void
1740
server_row_destroy(struct server_row *row)
1741
0
{
1742
0
    if (row) {
1743
0
        for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1744
0
            ovsdb_datum_destroy(&row->data[i], &server_columns[i].type);
1745
0
        }
1746
0
        free(row);
1747
0
    }
1748
0
}
1749
1750
static struct server_row *
1751
ovsdb_cs_find_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
1752
0
{
1753
0
    struct server_row *row;
1754
0
    HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1755
0
        if (uuid_equals(uuid, &row->uuid)) {
1756
0
            return row;
1757
0
        }
1758
0
    }
1759
0
    return NULL;
1760
0
}
1761
1762
static void
1763
ovsdb_cs_delete_server_row(struct ovsdb_cs *cs, struct server_row *row)
1764
0
{
1765
0
    hmap_remove(&cs->server_rows, &row->hmap_node);
1766
0
    server_row_destroy(row);
1767
0
}
1768
1769
static struct server_row *
1770
ovsdb_cs_insert_server_row(struct ovsdb_cs *cs, const struct uuid *uuid)
1771
0
{
1772
0
    struct server_row *row = xmalloc(sizeof *row);
1773
0
    hmap_insert(&cs->server_rows, &row->hmap_node, uuid_hash(uuid));
1774
0
    row->uuid = *uuid;
1775
0
    for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1776
0
        ovsdb_datum_init_default(&row->data[i], &server_columns[i].type);
1777
0
    }
1778
0
    return row;
1779
0
}
1780
1781
static void
1782
ovsdb_cs_update_server_row(struct server_row *row,
1783
                           const struct shash *update, bool xor)
1784
0
{
1785
0
    for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1786
0
        const struct server_column *column = &server_columns[i];
1787
0
        struct shash_node *node = shash_find(update, column->name);
1788
0
        if (!node) {
1789
0
            continue;
1790
0
        }
1791
0
        const struct json *json = node->data;
1792
1793
0
        struct ovsdb_datum *old = &row->data[i];
1794
0
        struct ovsdb_datum new;
1795
0
        if (!xor) {
1796
0
            struct ovsdb_error *error = ovsdb_datum_from_json(
1797
0
                &new, &column->type, json, NULL);
1798
0
            if (error) {
1799
0
                ovsdb_error_destroy(error);
1800
0
                continue;
1801
0
            }
1802
0
        } else {
1803
0
            struct ovsdb_datum diff;
1804
0
            struct ovsdb_error *error = ovsdb_transient_datum_from_json(
1805
0
                &diff, &column->type, json);
1806
0
            if (error) {
1807
0
                ovsdb_error_destroy(error);
1808
0
                continue;
1809
0
            }
1810
1811
0
            error = ovsdb_datum_apply_diff(&new, old, &diff, &column->type);
1812
0
            if (error) {
1813
0
                ovsdb_error_destroy(error);
1814
0
                ovsdb_datum_destroy(&new, &column->type);
1815
0
                continue;
1816
0
            }
1817
0
            ovsdb_datum_destroy(&diff, &column->type);
1818
0
        }
1819
1820
0
        ovsdb_datum_destroy(&row->data[i], &column->type);
1821
0
        row->data[i] = new;
1822
0
    }
1823
0
}
1824
1825
static void
1826
ovsdb_cs_clear_server_rows(struct ovsdb_cs *cs)
1827
0
{
1828
0
    struct server_row *row;
1829
0
    HMAP_FOR_EACH_SAFE (row, hmap_node, &cs->server_rows) {
1830
0
        ovsdb_cs_delete_server_row(cs, row);
1831
0
    }
1832
0
}
1833
1834
static void log_parse_update_error(struct ovsdb_error *);
1835
1836
static void
1837
ovsdb_cs_process_server_event(struct ovsdb_cs *cs,
1838
                              const struct ovsdb_cs_event *event)
1839
0
{
1840
0
    ovs_assert(event->type == OVSDB_CS_EVENT_TYPE_UPDATE);
1841
1842
0
    const struct ovsdb_cs_update_event *update = &event->update;
1843
0
    struct ovsdb_cs_db_update *du;
1844
0
    struct ovsdb_error *error = ovsdb_cs_parse_db_update(
1845
0
        update->table_updates, update->version, &du);
1846
0
    if (error) {
1847
0
        log_parse_update_error(error);
1848
0
        return;
1849
0
    }
1850
1851
0
    if (update->clear) {
1852
0
        ovsdb_cs_clear_server_rows(cs);
1853
0
    }
1854
1855
0
    const struct ovsdb_cs_table_update *tu = ovsdb_cs_db_update_find_table(
1856
0
        du, "Database");
1857
0
    if (tu) {
1858
0
        for (size_t i = 0; i < tu->n; i++) {
1859
0
            const struct ovsdb_cs_row_update *ru = &tu->row_updates[i];
1860
0
            struct server_row *row
1861
0
                = ovsdb_cs_find_server_row(cs, &ru->row_uuid);
1862
0
            if (ru->type == OVSDB_CS_ROW_DELETE) {
1863
0
                ovsdb_cs_delete_server_row(cs, row);
1864
0
            } else {
1865
0
                if (!row) {
1866
0
                    row = ovsdb_cs_insert_server_row(cs, &ru->row_uuid);
1867
0
                }
1868
0
                ovsdb_cs_update_server_row(row, ru->columns,
1869
0
                                           ru->type == OVSDB_CS_ROW_XOR);
1870
0
            }
1871
0
        }
1872
0
    }
1873
1874
0
    ovsdb_cs_db_update_destroy(du);
1875
0
}
1876
1877
static const char *
1878
server_column_get_string(const struct server_row *row,
1879
                         enum server_column_index index,
1880
                         const char *default_value)
1881
0
{
1882
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_STRING);
1883
0
    const struct ovsdb_datum *d = &row->data[index];
1884
0
    return d->n == 1 ? json_string(d->keys[0].s) : default_value;
1885
0
}
1886
1887
static bool
1888
server_column_get_bool(const struct server_row *row,
1889
                       enum server_column_index index,
1890
                       bool default_value)
1891
0
{
1892
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_BOOLEAN);
1893
0
    const struct ovsdb_datum *d = &row->data[index];
1894
0
    return d->n == 1 ? d->keys[0].boolean : default_value;
1895
0
}
1896
1897
static uint64_t
1898
server_column_get_int(const struct server_row *row,
1899
                      enum server_column_index index,
1900
                      uint64_t default_value)
1901
0
{
1902
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_INTEGER);
1903
0
    const struct ovsdb_datum *d = &row->data[index];
1904
0
    return d->n == 1 ? d->keys[0].integer : default_value;
1905
0
}
1906
1907
static const struct uuid *
1908
server_column_get_uuid(const struct server_row *row,
1909
                       enum server_column_index index,
1910
                       const struct uuid *default_value)
1911
0
{
1912
0
    ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_UUID);
1913
0
    const struct ovsdb_datum *d = &row->data[index];
1914
0
    return d->n == 1 ? &d->keys[0].uuid : default_value;
1915
0
}
1916
1917
static const struct server_row *
1918
ovsdb_find_server_row(struct ovsdb_cs *cs)
1919
0
{
1920
0
    const struct server_row *row;
1921
0
    HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1922
0
        const struct uuid *cid = server_column_get_uuid(row, COL_CID, NULL);
1923
0
        const char *name = server_column_get_string(row, COL_NAME, NULL);
1924
0
        if (uuid_is_zero(&cs->cid)
1925
0
            ? (name && !strcmp(cs->data.db_name, name))
1926
0
            : (cid && uuid_equals(cid, &cs->cid))) {
1927
0
            return row;
1928
0
        }
1929
0
    }
1930
0
    return NULL;
1931
0
}
1932
1933
static void OVS_UNUSED
1934
ovsdb_log_server_rows(const struct ovsdb_cs *cs)
1935
0
{
1936
0
    int row_num = 0;
1937
0
    const struct server_row *row;
1938
0
    HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) {
1939
0
        struct ds s = DS_EMPTY_INITIALIZER;
1940
0
        for (size_t i = 0; i < N_SERVER_COLUMNS; i++) {
1941
0
            ds_put_format(&s, " %s=", server_columns[i].name);
1942
0
            if (i == COL_SCHEMA) {
1943
0
                ds_put_format(&s, "...");
1944
0
            } else {
1945
0
                ovsdb_datum_to_string(&row->data[i], &server_columns[i].type,
1946
0
                                      &s);
1947
0
            }
1948
0
        }
1949
0
        VLOG_INFO("row %d:%s", row_num++, ds_cstr(&s));
1950
0
        ds_destroy(&s);
1951
0
    }
1952
0
}
1953
1954
static bool
1955
ovsdb_cs_check_server_db__(struct ovsdb_cs *cs)
1956
0
{
1957
0
    struct ovsdb_cs_event *event;
1958
0
    LIST_FOR_EACH_POP (event, list_node, &cs->server.events) {
1959
0
        ovsdb_cs_process_server_event(cs, event);
1960
0
        ovsdb_cs_event_destroy(event);
1961
0
    }
1962
1963
0
    const struct server_row *db_row = ovsdb_find_server_row(cs);
1964
0
    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5);
1965
0
    const char *server_name = jsonrpc_session_get_name(cs->session);
1966
0
    if (!db_row) {
1967
0
        VLOG_INFO_RL(&rl, "%s: server does not have %s database",
1968
0
                     server_name, cs->data.db_name);
1969
0
        return false;
1970
0
    }
1971
1972
0
    bool ok = false;
1973
0
    const char *model = server_column_get_string(db_row, COL_MODEL, "");
1974
0
    const char *schema = server_column_get_string(db_row, COL_SCHEMA, NULL);
1975
0
    bool connected = server_column_get_bool(db_row, COL_CONNECTED, false);
1976
0
    if (!strcmp(model, "clustered")) {
1977
0
        bool leader = server_column_get_bool(db_row, COL_LEADER, false);
1978
0
        uint64_t index = server_column_get_int(db_row, COL_INDEX, 0);
1979
1980
0
        if (!schema) {
1981
0
            VLOG_INFO("%s: clustered database server has not yet joined "
1982
0
                      "cluster; trying another server", server_name);
1983
0
        } else if (!connected) {
1984
0
            VLOG_INFO("%s: clustered database server is disconnected "
1985
0
                      "from cluster; trying another server", server_name);
1986
0
        } else if (cs->leader_only && !leader) {
1987
0
            VLOG_INFO("%s: clustered database server is not cluster "
1988
0
                      "leader; trying another server", server_name);
1989
0
        } else if (index < cs->min_index) {
1990
0
            VLOG_WARN("%s: clustered database server has stale data; "
1991
0
                      "trying another server", server_name);
1992
0
        } else {
1993
0
            cs->min_index = index;
1994
0
            ok = true;
1995
0
        }
1996
0
    } else if (!strcmp(model, "relay")) {
1997
0
        if (!schema) {
1998
0
            VLOG_INFO("%s: relay database server has not yet connected to the "
1999
0
                      "relay source; trying another server", server_name);
2000
0
        } else if (!connected) {
2001
0
            VLOG_INFO("%s: relay database server is disconnected from the "
2002
0
                      "relay source; trying another server", server_name);
2003
0
        } else if (cs->leader_only) {
2004
0
            VLOG_INFO("%s: relay database server cannot be a leader; "
2005
0
                      "trying another server", server_name);
2006
0
        } else {
2007
0
            ok = true;
2008
0
        }
2009
0
    } else {
2010
0
        if (!schema) {
2011
0
            VLOG_INFO("%s: missing database schema", server_name);
2012
0
        } else {
2013
0
            ok = true;
2014
0
        }
2015
0
    }
2016
0
    if (!ok) {
2017
0
        return false;
2018
0
    }
2019
2020
0
    if (cs->state == CS_S_SERVER_MONITOR_REQUESTED) {
2021
0
        json_destroy(cs->data.schema);
2022
0
        cs->data.schema = json_from_string(schema);
2023
0
        if (cs->data.max_version >= 3) {
2024
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 3);
2025
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_SINCE_REQUESTED);
2026
0
        } else if (cs->data.max_version >= 2) {
2027
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 2);
2028
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED);
2029
0
        } else {
2030
0
            ovsdb_cs_send_monitor_request(cs, &cs->data, 1);
2031
0
            ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED);
2032
0
        }
2033
0
    }
2034
0
    return true;
2035
0
}
2036
2037
static bool
2038
ovsdb_cs_check_server_db(struct ovsdb_cs *cs)
2039
0
{
2040
0
    bool ok = ovsdb_cs_check_server_db__(cs);
2041
0
    if (!ok) {
2042
0
        ovsdb_cs_retry(cs);
2043
0
    }
2044
0
    return ok;
2045
0
}
2046
2047
static struct json *
2048
ovsdb_cs_compose_server_monitor_request(const struct json *schema_json,
2049
                                        void *cs_)
2050
0
{
2051
0
    struct ovsdb_cs *cs = cs_;
2052
0
    struct shash *schema = ovsdb_cs_parse_schema(schema_json);
2053
0
    struct json *monitor_requests = json_object_create();
2054
2055
0
    const char *table_name = "Database";
2056
0
    const struct sset *table_schema
2057
0
        = schema ? shash_find_data(schema, table_name) : NULL;
2058
0
    if (!table_schema) {
2059
0
        VLOG_WARN("%s database lacks %s table "
2060
0
                  "(database needs upgrade?)",
2061
0
                  cs->server.db_name, table_name);
2062
        /* XXX return failure? */
2063
0
    } else {
2064
0
        struct json *columns = json_array_create_empty();
2065
0
        for (size_t j = 0; j < N_SERVER_COLUMNS; j++) {
2066
0
            const struct server_column *column = &server_columns[j];
2067
0
            bool db_has_column = (table_schema &&
2068
0
                                  sset_contains(table_schema, column->name));
2069
0
            if (table_schema && !db_has_column) {
2070
0
                VLOG_WARN("%s table in %s database lacks %s column "
2071
0
                          "(database needs upgrade?)",
2072
0
                          table_name, cs->server.db_name, column->name);
2073
0
                continue;
2074
0
            }
2075
0
            json_array_add(columns, json_string_create(column->name));
2076
0
        }
2077
2078
0
        struct json *monitor_request = json_object_create();
2079
0
        json_object_put(monitor_request, "columns", columns);
2080
0
        json_object_put(monitor_requests, table_name,
2081
0
                        json_array_create_1(monitor_request));
2082
0
    }
2083
0
    ovsdb_cs_free_schema(schema);
2084
2085
0
    return monitor_requests;
2086
0
}
2087
2088
static const struct ovsdb_cs_ops ovsdb_cs_server_ops = {
2089
    ovsdb_cs_compose_server_monitor_request
2090
};
2091

2092
static void
2093
log_error(struct ovsdb_error *error)
2094
0
{
2095
0
    char *s = ovsdb_error_to_string_free(error);
2096
0
    VLOG_WARN("error parsing database schema: %s", s);
2097
0
    free(s);
2098
0
}
2099
2100
/* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC
2101
 * 7047, to obtain the names of its rows and columns.  If successful, returns
2102
 * an shash whose keys are table names and whose values are ssets, where each
2103
 * sset contains the names of its table's columns.  On failure (due to a parse
2104
 * error), returns NULL.
2105
 *
2106
 * It would also be possible to use the general-purpose OVSDB schema parser in
2107
 * ovsdb-server, but that's overkill, possibly too strict for the current use
2108
 * case, and would require restructuring ovsdb-server to separate the schema
2109
 * code from the rest. */
2110
struct shash *
2111
ovsdb_cs_parse_schema(const struct json *schema_json)
2112
0
{
2113
0
    struct ovsdb_parser parser;
2114
0
    const struct json *tables_json;
2115
0
    struct ovsdb_error *error;
2116
0
    struct shash_node *node;
2117
0
    struct shash *schema;
2118
2119
0
    ovsdb_parser_init(&parser, schema_json, "database schema");
2120
0
    tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT);
2121
0
    error = ovsdb_parser_destroy(&parser);
2122
0
    if (error) {
2123
0
        log_error(error);
2124
0
        return NULL;
2125
0
    }
2126
2127
0
    schema = xmalloc(sizeof *schema);
2128
0
    shash_init(schema);
2129
0
    SHASH_FOR_EACH (node, json_object(tables_json)) {
2130
0
        const char *table_name = node->name;
2131
0
        const struct json *json = node->data;
2132
0
        const struct json *columns_json;
2133
2134
0
        ovsdb_parser_init(&parser, json, "table schema for table %s",
2135
0
                          table_name);
2136
0
        columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT);
2137
0
        error = ovsdb_parser_destroy(&parser);
2138
0
        if (error) {
2139
0
            log_error(error);
2140
0
            ovsdb_cs_free_schema(schema);
2141
0
            return NULL;
2142
0
        }
2143
2144
0
        struct sset *columns = xmalloc(sizeof *columns);
2145
0
        sset_init(columns);
2146
2147
0
        struct shash_node *node2;
2148
0
        SHASH_FOR_EACH (node2, json_object(columns_json)) {
2149
0
            const char *column_name = node2->name;
2150
0
            sset_add(columns, column_name);
2151
0
        }
2152
0
        shash_add(schema, table_name, columns);
2153
0
    }
2154
0
    return schema;
2155
0
}
2156
2157
/* Frees 'schema', which is in the format returned by
2158
 * ovsdb_cs_parse_schema(). */
2159
void
2160
ovsdb_cs_free_schema(struct shash *schema)
2161
0
{
2162
0
    if (schema) {
2163
0
        struct shash_node *node;
2164
2165
0
        SHASH_FOR_EACH_SAFE (node, schema) {
2166
0
            struct sset *sset = node->data;
2167
0
            sset_destroy(sset);
2168
0
            free(sset);
2169
0
            shash_delete(schema, node);
2170
0
        }
2171
0
        shash_destroy(schema);
2172
0
        free(schema);
2173
0
    }
2174
0
}
2175

2176
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2177
ovsdb_cs_parse_row_update1(const struct json *in,
2178
                           struct ovsdb_cs_row_update *out)
2179
0
{
2180
0
    const struct json *old_json, *new_json;
2181
2182
0
    old_json = shash_find_data(json_object(in), "old");
2183
0
    new_json = shash_find_data(json_object(in), "new");
2184
0
    if (old_json && old_json->type != JSON_OBJECT) {
2185
0
        return ovsdb_syntax_error(old_json, NULL,
2186
0
                                  "\"old\" <row> is not object");
2187
0
    } else if (new_json && new_json->type != JSON_OBJECT) {
2188
0
        return ovsdb_syntax_error(new_json, NULL,
2189
0
                                  "\"new\" <row> is not object");
2190
0
    } else if ((old_json != NULL) + (new_json != NULL)
2191
0
               != shash_count(json_object(in))) {
2192
0
        return ovsdb_syntax_error(in, NULL,
2193
0
                                  "<row-update> contains "
2194
0
                                  "unexpected member");
2195
0
    } else if (!old_json && !new_json) {
2196
0
        return ovsdb_syntax_error(in, NULL,
2197
0
                                  "<row-update> missing \"old\" "
2198
0
                                  "and \"new\" members");
2199
0
    }
2200
2201
0
    if (!new_json) {
2202
0
        out->type = OVSDB_CS_ROW_DELETE;
2203
0
        out->columns = json_object(old_json);
2204
0
    } else if (!old_json) {
2205
0
        out->type = OVSDB_CS_ROW_INSERT;
2206
0
        out->columns = json_object(new_json);
2207
0
    } else {
2208
0
        out->type = OVSDB_CS_ROW_UPDATE;
2209
0
        out->columns = json_object(new_json);
2210
0
    }
2211
0
    return NULL;
2212
0
}
2213
2214
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2215
ovsdb_cs_parse_row_update2(const struct json *in,
2216
                           struct ovsdb_cs_row_update *out)
2217
0
{
2218
0
    const struct shash *object = json_object(in);
2219
0
    if (shash_count(object) != 1) {
2220
0
        return ovsdb_syntax_error(
2221
0
            in, NULL, "<row-update2> has %"PRIuSIZE" members "
2222
0
            "instead of expected 1", shash_count(object));
2223
0
    }
2224
2225
0
    struct shash_node *node = shash_first(object);
2226
0
    const struct json *columns = node->data;
2227
0
    if (!strcmp(node->name, "insert") || !strcmp(node->name, "initial")) {
2228
0
        out->type = OVSDB_CS_ROW_INSERT;
2229
0
    } else if (!strcmp(node->name, "modify")) {
2230
0
        out->type = OVSDB_CS_ROW_XOR;
2231
0
    } else if (!strcmp(node->name, "delete")) {
2232
0
        out->type = OVSDB_CS_ROW_DELETE;
2233
0
        if (columns->type != JSON_NULL) {
2234
0
            return ovsdb_syntax_error(
2235
0
                in, NULL,
2236
0
                "<row-update2> delete operation has unexpected value");
2237
0
        }
2238
0
        return NULL;
2239
0
    } else {
2240
0
        return ovsdb_syntax_error(in, NULL,
2241
0
                                  "<row-update2> has unknown member \"%s\"",
2242
0
                                  node->name);
2243
0
    }
2244
2245
0
    if (columns->type != JSON_OBJECT) {
2246
0
        return ovsdb_syntax_error(
2247
0
            in, NULL,
2248
0
            "<row-update2> \"%s\" operation has unexpected value",
2249
0
            node->name);
2250
0
    }
2251
0
    out->columns = json_object(columns);
2252
2253
0
    return NULL;
2254
0
}
2255
2256
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2257
ovsdb_cs_parse_row_update(const char *table_name,
2258
                          const struct json *in, int version,
2259
                          struct ovsdb_cs_row_update *out)
2260
0
{
2261
0
    if (in->type != JSON_OBJECT) {
2262
0
        const char *suffix = version > 1 ? "2" : "";
2263
0
        return ovsdb_syntax_error(
2264
0
            in, NULL,
2265
0
            "<table-update%s> for table \"%s\" contains <row-update%s> "
2266
0
            "that is not an object",
2267
0
            suffix, table_name, suffix);
2268
0
    }
2269
2270
0
    return (version == 1
2271
0
            ? ovsdb_cs_parse_row_update1(in, out)
2272
0
            : ovsdb_cs_parse_row_update2(in, out));
2273
0
}
2274
2275
static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2276
ovsdb_cs_parse_table_update(const char *table_name,
2277
                            const struct json *in, int version,
2278
                            struct ovsdb_cs_table_update *out)
2279
0
{
2280
0
    const char *suffix = version > 1 ? "2" : "";
2281
2282
0
    if (in->type != JSON_OBJECT) {
2283
0
        return ovsdb_syntax_error(
2284
0
            in, NULL, "<table-update%s> for table \"%s\" is not an object",
2285
0
            suffix, table_name);
2286
0
    }
2287
0
    struct shash *in_rows = json_object(in);
2288
2289
0
    out->row_updates = xmalloc(shash_count(in_rows) * sizeof *out->row_updates);
2290
2291
0
    const struct shash_node *node;
2292
0
    SHASH_FOR_EACH (node, in_rows) {
2293
0
        const char *row_uuid_string = node->name;
2294
0
        struct uuid row_uuid;
2295
0
        if (!uuid_from_string(&row_uuid, row_uuid_string)) {
2296
0
            return ovsdb_syntax_error(
2297
0
                in, NULL,
2298
0
                "<table-update%s> for table \"%s\" contains "
2299
0
                "bad UUID \"%s\" as member name",
2300
0
                suffix, table_name, row_uuid_string);
2301
0
        }
2302
2303
0
        const struct json *in_ru = node->data;
2304
0
        struct ovsdb_cs_row_update *out_ru = &out->row_updates[out->n++];
2305
0
        *out_ru = (struct ovsdb_cs_row_update) { .row_uuid = row_uuid };
2306
2307
0
        struct ovsdb_error *error = ovsdb_cs_parse_row_update(
2308
0
            table_name, in_ru, version, out_ru);
2309
0
        if (error) {
2310
0
            return error;
2311
0
        }
2312
0
    }
2313
2314
0
    return NULL;
2315
0
}
2316
2317
/* Parses OVSDB <table-updates> or <table-updates2> object 'in' into '*outp'.
2318
 * If successful, sets '*outp' to the new object and returns NULL.  On failure,
2319
 * returns the error and sets '*outp' to NULL.
2320
 *
2321
 * On success, the caller must eventually free '*outp', with
2322
 * ovsdb_cs_db_update_destroy().
2323
 *
2324
 * 'version' should be 1 if 'in' is a <table-updates>, 2 or 3 if it is a
2325
 * <table-updates2>. */
2326
struct ovsdb_error * OVS_WARN_UNUSED_RESULT
2327
ovsdb_cs_parse_db_update(const struct json *in, int version,
2328
                         struct ovsdb_cs_db_update **outp)
2329
0
{
2330
0
    const char *suffix = version > 1 ? "2" : "";
2331
2332
0
    *outp = NULL;
2333
0
    if (in->type != JSON_OBJECT) {
2334
0
        return ovsdb_syntax_error(in, NULL,
2335
0
                                  "<table-updates%s> is not an object", suffix);
2336
0
    }
2337
2338
0
    struct ovsdb_cs_db_update *out = xzalloc(sizeof *out);
2339
0
    out->table_updates = xmalloc(shash_count(json_object(in))
2340
0
                                 * sizeof *out->table_updates);
2341
0
    const struct shash_node *node;
2342
0
    SHASH_FOR_EACH (node, json_object(in)) {
2343
0
        const char *table_name = node->name;
2344
0
        const struct json *in_tu = node->data;
2345
2346
0
        struct ovsdb_cs_table_update *out_tu = &out->table_updates[out->n++];
2347
0
        *out_tu = (struct ovsdb_cs_table_update) { .table_name = table_name };
2348
2349
0
        struct ovsdb_error *error = ovsdb_cs_parse_table_update(
2350
0
            table_name, in_tu, version, out_tu);
2351
0
        if (error) {
2352
0
            ovsdb_cs_db_update_destroy(out);
2353
0
            return error;
2354
0
        }
2355
0
    }
2356
2357
0
    *outp = out;
2358
0
    return NULL;
2359
0
}
2360
2361
/* Frees 'du', which was presumably allocated by ovsdb_cs_parse_db_update(). */
2362
void
2363
ovsdb_cs_db_update_destroy(struct ovsdb_cs_db_update *du)
2364
0
{
2365
0
    if (!du) {
2366
0
        return;
2367
0
    }
2368
2369
0
    for (size_t i = 0; i < du->n; i++) {
2370
0
        struct ovsdb_cs_table_update *tu = &du->table_updates[i];
2371
0
        free(tu->row_updates);
2372
0
    }
2373
0
    free(du->table_updates);
2374
0
    free(du);
2375
0
}
2376
2377
const struct ovsdb_cs_table_update *
2378
ovsdb_cs_db_update_find_table(const struct ovsdb_cs_db_update *du,
2379
                              const char *table_name)
2380
0
{
2381
0
    for (size_t i = 0; i < du->n; i++) {
2382
0
        const struct ovsdb_cs_table_update *tu = &du->table_updates[i];
2383
0
        if (!strcmp(tu->table_name, table_name)) {
2384
0
            return tu;
2385
0
        }
2386
0
    }
2387
0
    return NULL;
2388
0
}
2389