/src/openvswitch/lib/ovsdb-cs.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017 Nicira, Inc. |
2 | | * Copyright (C) 2016 Hewlett Packard Enterprise Development LP |
3 | | * |
4 | | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | * you may not use this file except in compliance with the License. |
6 | | * You may obtain a copy of the License at: |
7 | | * |
8 | | * http://www.apache.org/licenses/LICENSE-2.0 |
9 | | * |
10 | | * Unless required by applicable law or agreed to in writing, software |
11 | | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | | * WITHOUT WARRANTIES OR CONDITION OF ANY KIND, either express or implied. |
13 | | * See the License for the specific language governing permissions and |
14 | | * limitations under the License. |
15 | | */ |
16 | | |
17 | | #include <config.h> |
18 | | |
19 | | #include "ovsdb-cs.h" |
20 | | |
21 | | #include <errno.h> |
22 | | |
23 | | #include "hash.h" |
24 | | #include "jsonrpc.h" |
25 | | #include "openvswitch/dynamic-string.h" |
26 | | #include "openvswitch/hmap.h" |
27 | | #include "openvswitch/json.h" |
28 | | #include "openvswitch/poll-loop.h" |
29 | | #include "openvswitch/shash.h" |
30 | | #include "openvswitch/vlog.h" |
31 | | #include "ovsdb-data.h" |
32 | | #include "ovsdb-error.h" |
33 | | #include "ovsdb-parser.h" |
34 | | #include "ovsdb-session.h" |
35 | | #include "ovsdb-types.h" |
36 | | #include "sset.h" |
37 | | #include "svec.h" |
38 | | #include "util.h" |
39 | | #include "uuid.h" |
40 | | |
41 | | VLOG_DEFINE_THIS_MODULE(ovsdb_cs); |
42 | | |
43 | | /* Connection state machine. |
44 | | * |
45 | | * When a JSON-RPC session connects, the CS layer sends a "monitor_cond" |
46 | | * request for the Database table in the _Server database and transitions to |
47 | | * the CS_S_SERVER_MONITOR_REQUESTED state. If the session drops and |
48 | | * reconnects, or if the CS receives a "monitor_canceled" notification for a |
49 | | * table it is monitoring, the CS starts over again in the same way. */ |
50 | | #define OVSDB_CS_STATES \ |
51 | | /* Waits for "get_schema" reply, then sends "monitor_cond" \ |
52 | | * request for the Database table in the _Server database, whose \ |
53 | | * details are informed by the schema, and transitions to \ |
54 | | * CS_S_SERVER_MONITOR_REQUESTED. */ \ |
55 | 0 | OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED) \ |
56 | 0 | \ |
57 | 0 | /* Waits for "monitor_cond" reply for the Database table: \ |
58 | 0 | * \ |
59 | 0 | * - If the reply indicates success, and the Database table has a \ |
60 | 0 | * row for the CS database: \ |
61 | 0 | * \ |
62 | 0 | * * If the row indicates that this is a clustered database \ |
63 | 0 | * that is not connected to the cluster, closes the \ |
64 | 0 | * connection. The next connection attempt has a chance at \ |
65 | 0 | * picking a connected server. \ |
66 | 0 | * \ |
67 | 0 | * * Otherwise, sends a monitoring request for the CS \ |
68 | 0 | * database whose details are informed by the schema \ |
69 | 0 | * (obtained from the row), and transitions to \ |
70 | 0 | * CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED. \ |
71 | 0 | * \ |
72 | 0 | * - If the reply indicates success, but the Database table does \ |
73 | 0 | * not have a row for the CS database, transitions to \ |
74 | 0 | * CS_S_ERROR. \ |
75 | 0 | * \ |
76 | 0 | * - If the reply indicates failure, sends a "get_schema" request \ |
77 | 0 | * for the CS database and transitions to \ |
78 | 0 | * CS_S_DATA_SCHEMA_REQUESTED. */ \ |
79 | 0 | OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED) \ |
80 | 0 | \ |
81 | 0 | /* Waits for "get_schema" reply, then sends "monitor_cond" \ |
82 | 0 | * request whose details are informed by the schema, and \ |
83 | 0 | * transitions to CS_S_DATA_MONITOR_COND_REQUESTED. */ \ |
84 | 0 | OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED) \ |
85 | 0 | \ |
86 | 0 | /* Waits for "monitor_cond_since" reply. If successful, replaces \ |
87 | 0 | * the CS contents by the data carried in the reply and \ |
88 | 0 | * transitions to CS_S_MONITORING. On failure, sends a \ |
89 | 0 | * "monitor_cond" request and transitions to \ |
90 | 0 | * CS_S_DATA_MONITOR_COND_REQUESTED. */ \ |
91 | 0 | OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED) \ |
92 | 0 | \ |
93 | 0 | /* Waits for "monitor_cond" reply. If successful, replaces the \ |
94 | 0 | * CS contents by the data carried in the reply and transitions \ |
95 | 0 | * to CS_S_MONITORING. On failure, sends a "monitor" request \ |
96 | 0 | * and transitions to CS_S_DATA_MONITOR_REQUESTED. */ \ |
97 | 0 | OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED) \ |
98 | 0 | \ |
99 | 0 | /* Waits for "monitor" reply. If successful, replaces the CS \ |
100 | 0 | * contents by the data carried in the reply and transitions to \ |
101 | 0 | * CS_S_MONITORING. On failure, transitions to CS_S_ERROR. */ \ |
102 | 0 | OVSDB_CS_STATE(DATA_MONITOR_REQUESTED) \ |
103 | 0 | \ |
104 | 0 | /* State that processes "update", "update2" or "update3" \ |
105 | 0 | * notifications for the main database (and the Database table \ |
106 | 0 | * in _Server if available). \ |
107 | 0 | * \ |
108 | 0 | * If we're monitoring the Database table and we get notified \ |
109 | 0 | * that the CS database has been deleted, we close the \ |
110 | 0 | * connection (which will restart the state machine). */ \ |
111 | 0 | OVSDB_CS_STATE(MONITORING) \ |
112 | 0 | \ |
113 | 0 | /* Terminal error state that indicates that nothing useful can be \ |
114 | 0 | * done, for example because the database server doesn't actually \ |
115 | 0 | * have the desired database. We maintain the session with the \ |
116 | 0 | * database server anyway. If it starts serving the database \ |
117 | 0 | * that we want, or if someone fixes and restarts the database, \ |
118 | 0 | * then it will kill the session and we will automatically \ |
119 | 0 | * reconnect and try again. */ \ |
120 | 0 | OVSDB_CS_STATE(ERROR) \ |
121 | 0 | \ |
122 | 0 | /* Terminal state that indicates we connected to a useless server \ |
123 | 0 | * in a cluster, e.g. one that is partitioned from the rest of \ |
124 | 0 | * the cluster. We're waiting to retry. */ \ |
125 | 0 | OVSDB_CS_STATE(RETRY) |
126 | | |
127 | | enum ovsdb_cs_state { |
128 | | #define OVSDB_CS_STATE(NAME) CS_S_##NAME, |
129 | | OVSDB_CS_STATES |
130 | | #undef OVSDB_CS_STATE |
131 | | }; |
132 | | |
133 | | static const char * |
134 | | ovsdb_cs_state_to_string(enum ovsdb_cs_state state) |
135 | 0 | { |
136 | 0 | switch (state) { |
137 | 0 | #define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME; |
138 | 0 | OVSDB_CS_STATES |
139 | 0 | #undef OVSDB_CS_STATE |
140 | 0 | default: return "<unknown>"; |
141 | 0 | } |
142 | 0 | } |
143 | | |
144 | | /* A database being monitored. |
145 | | * |
146 | | * There are two instances of this data structure for each CS instance, one for |
147 | | * the _Server database used for working with clusters, and the other one for |
148 | | * the actual database that the client is interested in. */ |
149 | | struct ovsdb_cs_db { |
150 | | struct ovsdb_cs *cs; |
151 | | |
152 | | /* Data. */ |
153 | | const char *db_name; /* Database's name. */ |
154 | | struct hmap tables; /* Contains "struct ovsdb_cs_db_table *"s.*/ |
155 | | struct json *monitor_id; |
156 | | struct json *schema; |
157 | | |
158 | | /* Monitor version. */ |
159 | | int max_version; /* Maximum version of monitor request to use. */ |
160 | | int monitor_version; /* 0 if not monitoring, 1=monitor, |
161 | | * 2=monitor_cond, 3=monitor_cond_since. */ |
162 | | |
163 | | /* Condition changes. */ |
164 | | bool cond_changed; /* Change not yet sent to server? */ |
165 | | unsigned int cond_seqno; /* Increments when condition changes. */ |
166 | | |
167 | | /* Database locking. */ |
168 | | char *lock_name; /* Name of lock we need, NULL if none. */ |
169 | | bool has_lock; /* Has db server told us we have the lock? */ |
170 | | bool is_lock_contended; /* Has db server told us we can't get lock? */ |
171 | | struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */ |
172 | | |
173 | | /* Last db txn id, used for fast resync through monitor_cond_since */ |
174 | | struct uuid last_id; |
175 | | |
176 | | /* Client interface. */ |
177 | | struct ovs_list events; |
178 | | const struct ovsdb_cs_ops *ops; |
179 | | void *ops_aux; |
180 | | }; |
181 | | |
182 | | static const struct ovsdb_cs_ops ovsdb_cs_server_ops; |
183 | | |
184 | | static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *); |
185 | | static unsigned int ovsdb_cs_db_set_condition( |
186 | | struct ovsdb_cs_db *, const char *db_name, const struct json *condition); |
187 | | |
188 | | static void ovsdb_cs_send_schema_request(struct ovsdb_cs *, |
189 | | struct ovsdb_cs_db *); |
190 | | static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *); |
191 | | static bool ovsdb_cs_check_server_db(struct ovsdb_cs *); |
192 | | static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *); |
193 | | static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *, |
194 | | struct ovsdb_cs_db *, int version); |
195 | | static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db); |
196 | | static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db); |
197 | | |
198 | | struct ovsdb_cs { |
199 | | struct ovsdb_cs_db server; |
200 | | struct ovsdb_cs_db data; |
201 | | |
202 | | /* Session state. |
203 | | * |
204 | | * 'state_seqno' is a snapshot of the session's sequence number as returned |
205 | | * jsonrpc_session_get_seqno(session), so if it differs from the value that |
206 | | * function currently returns then the session has reconnected and the |
207 | | * state machine must restart. */ |
208 | | struct jsonrpc_session *session; /* Connection to the server. */ |
209 | | char *remote; /* 'session' remote name. */ |
210 | | enum ovsdb_cs_state state; /* Current session state. */ |
211 | | unsigned int state_seqno; /* See above. */ |
212 | | struct json *request_id; /* JSON ID for request awaiting reply. */ |
213 | | |
214 | | /* IDs of outstanding transactions. */ |
215 | | struct json **txns; |
216 | | size_t n_txns, allocated_txns; |
217 | | |
218 | | /* Info for the _Server database. */ |
219 | | struct uuid cid; |
220 | | struct hmap server_rows; |
221 | | |
222 | | /* Whether to send 'set_db_change_aware'. */ |
223 | | bool set_db_change_aware; |
224 | | |
225 | | /* Clustered servers. */ |
226 | | uint64_t min_index; /* Minimum allowed index, to avoid regression. */ |
227 | | bool leader_only; /* If true, do not connect to Raft followers. */ |
228 | | bool shuffle_remotes; /* If true, connect to servers in random order. */ |
229 | | }; |
230 | | |
231 | | static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state, |
232 | | const char *where); |
233 | | #define ovsdb_cs_transition(CS, STATE) \ |
234 | 0 | ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR) |
235 | | |
236 | | static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where); |
237 | 0 | #define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR) |
238 | | |
239 | | static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5); |
240 | | |
241 | | static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *, |
242 | | const struct json *result, |
243 | | int version); |
244 | | static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *, |
245 | | const struct jsonrpc_msg *); |
246 | | static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *, |
247 | | struct ovsdb_cs_db *, |
248 | | const struct jsonrpc_msg *); |
249 | | |
250 | | static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *, |
251 | | const struct jsonrpc_msg *); |
252 | | static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request( |
253 | | struct ovsdb_cs_db *); |
254 | | static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request( |
255 | | struct ovsdb_cs_db *); |
256 | | static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *, |
257 | | const struct json *); |
258 | | static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *, |
259 | | const struct json *params, |
260 | | bool new_has_lock); |
261 | | static void ovsdb_cs_send_cond_change(struct ovsdb_cs *); |
262 | | |
263 | | static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *, |
264 | | const struct jsonrpc_msg *reply); |
265 | | |
266 | | /* Events. */ |
267 | | |
268 | | void |
269 | | ovsdb_cs_event_destroy(struct ovsdb_cs_event *event) |
270 | 0 | { |
271 | 0 | if (event) { |
272 | 0 | switch (event->type) { |
273 | 0 | case OVSDB_CS_EVENT_TYPE_RECONNECT: |
274 | 0 | case OVSDB_CS_EVENT_TYPE_LOCKED: |
275 | 0 | break; |
276 | | |
277 | 0 | case OVSDB_CS_EVENT_TYPE_UPDATE: |
278 | 0 | json_destroy(event->update.table_updates); |
279 | 0 | break; |
280 | | |
281 | 0 | case OVSDB_CS_EVENT_TYPE_TXN_REPLY: |
282 | 0 | jsonrpc_msg_destroy(event->txn_reply); |
283 | 0 | break; |
284 | 0 | } |
285 | 0 | free(event); |
286 | 0 | } |
287 | 0 | } |
288 | | |
289 | | /* Lifecycle. */ |
290 | | |
291 | | static void |
292 | | ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name, |
293 | | struct ovsdb_cs *parent, int max_version, |
294 | | const struct ovsdb_cs_ops *ops, void *ops_aux) |
295 | 0 | { |
296 | 0 | *db = (struct ovsdb_cs_db) { |
297 | 0 | .cs = parent, |
298 | 0 | .db_name = db_name, |
299 | 0 | .tables = HMAP_INITIALIZER(&db->tables), |
300 | 0 | .max_version = max_version, |
301 | 0 | .monitor_id = json_array_create_2(json_string_create("monid"), |
302 | 0 | json_string_create(db_name)), |
303 | 0 | .events = OVS_LIST_INITIALIZER(&db->events), |
304 | 0 | .ops = ops, |
305 | 0 | .ops_aux = ops_aux, |
306 | 0 | }; |
307 | 0 | } |
308 | | |
309 | | /* Creates and returns a new client synchronization object. The connection |
310 | | * will monitor remote database 'db_name'. If 'retry' is true, then also |
311 | | * reconnect if the connection fails. |
312 | | * |
313 | | * XXX 'max_version' should ordinarily be 3, to allow use of the most efficient |
314 | | * "monitor_cond_since" method with the database. Currently there's some kind |
315 | | * of bug in the DDlog Rust code that interfaces to that, so instead |
316 | | * ovn-northd-ddlog passes 1 to use plain 'monitor' instead. Once the DDlog |
317 | | * Rust code gets fixed, we might as well just delete 'max_version' |
318 | | * entirely. |
319 | | * |
320 | | * 'ops' is a struct for northd_cs_run() to use, and 'ops_aux' is a pointer |
321 | | * that gets passed into each call. |
322 | | * |
323 | | * Use ovsdb_cs_set_remote() to configure the database to which to connect. |
324 | | * Until a remote is configured, no data can be retrieved. |
325 | | */ |
326 | | struct ovsdb_cs * |
327 | | ovsdb_cs_create(const char *db_name, int max_version, |
328 | | const struct ovsdb_cs_ops *ops, void *ops_aux) |
329 | 0 | { |
330 | 0 | struct ovsdb_cs *cs = xzalloc(sizeof *cs); |
331 | 0 | ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, cs); |
332 | 0 | ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux); |
333 | 0 | cs->state_seqno = UINT_MAX; |
334 | 0 | cs->request_id = NULL; |
335 | 0 | cs->leader_only = true; |
336 | 0 | cs->shuffle_remotes = true; |
337 | 0 | cs->set_db_change_aware = true; |
338 | 0 | hmap_init(&cs->server_rows); |
339 | |
|
340 | 0 | return cs; |
341 | 0 | } |
342 | | |
343 | | static void |
344 | | ovsdb_cs_db_destroy(struct ovsdb_cs_db *db) |
345 | 0 | { |
346 | 0 | ovsdb_cs_db_destroy_tables(db); |
347 | |
|
348 | 0 | json_destroy(db->monitor_id); |
349 | 0 | json_destroy(db->schema); |
350 | |
|
351 | 0 | free(db->lock_name); |
352 | |
|
353 | 0 | json_destroy(db->lock_request_id); |
354 | | |
355 | | /* This list always gets flushed out at the end of ovsdb_cs_run(). */ |
356 | 0 | ovs_assert(ovs_list_is_empty(&db->events)); |
357 | 0 | } |
358 | | |
359 | | /* Destroys 'cs' and all of the data structures that it manages. */ |
360 | | void |
361 | | ovsdb_cs_destroy(struct ovsdb_cs *cs) |
362 | 0 | { |
363 | 0 | if (cs) { |
364 | 0 | ovsdb_cs_db_destroy(&cs->server); |
365 | 0 | ovsdb_cs_db_destroy(&cs->data); |
366 | 0 | jsonrpc_session_close(cs->session); |
367 | 0 | free(cs->remote); |
368 | 0 | json_destroy(cs->request_id); |
369 | |
|
370 | 0 | for (size_t i = 0; i < cs->n_txns; i++) { |
371 | 0 | json_destroy(cs->txns[i]); |
372 | 0 | } |
373 | 0 | free(cs->txns); |
374 | |
|
375 | 0 | ovsdb_cs_clear_server_rows(cs); |
376 | 0 | hmap_destroy(&cs->server_rows); |
377 | |
|
378 | 0 | free(cs); |
379 | 0 | } |
380 | 0 | } |
381 | | |
382 | | static void |
383 | | ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state, |
384 | | const char *where) |
385 | 0 | { |
386 | 0 | VLOG_DBG("%s: %s -> %s at %s", |
387 | 0 | cs->session ? jsonrpc_session_get_name(cs->session) : "void", |
388 | 0 | ovsdb_cs_state_to_string(cs->state), |
389 | 0 | ovsdb_cs_state_to_string(new_state), |
390 | 0 | where); |
391 | 0 | cs->state = new_state; |
392 | 0 | } |
393 | | |
394 | | static void |
395 | | ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request) |
396 | 0 | { |
397 | 0 | json_destroy(cs->request_id); |
398 | 0 | cs->request_id = json_clone(request->id); |
399 | 0 | if (cs->session) { |
400 | 0 | jsonrpc_session_send(cs->session, request); |
401 | 0 | } else { |
402 | 0 | jsonrpc_msg_destroy(request); |
403 | 0 | } |
404 | 0 | } |
405 | | |
406 | | static void |
407 | | ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where) |
408 | 0 | { |
409 | 0 | ovsdb_cs_force_reconnect(cs); |
410 | 0 | ovsdb_cs_transition_at(cs, CS_S_RETRY, where); |
411 | 0 | } |
412 | | |
413 | | static void |
414 | | ovsdb_cs_restart_fsm(struct ovsdb_cs *cs) |
415 | 0 | { |
416 | | /* Resync data DB table conditions to avoid missing updates due to |
417 | | * conditions that were in flight or changed locally while the connection |
418 | | * was down. |
419 | | */ |
420 | 0 | ovsdb_cs_db_sync_condition(&cs->data); |
421 | |
|
422 | 0 | ovsdb_cs_send_schema_request(cs, &cs->server); |
423 | 0 | ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED); |
424 | 0 | cs->data.monitor_version = 0; |
425 | 0 | cs->server.monitor_version = 0; |
426 | 0 | } |
427 | | |
428 | | static void |
429 | | ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg) |
430 | 0 | { |
431 | 0 | bool ok = msg->type == JSONRPC_REPLY; |
432 | 0 | if (!ok |
433 | 0 | && cs->state != CS_S_SERVER_SCHEMA_REQUESTED |
434 | 0 | && cs->state != CS_S_SERVER_MONITOR_REQUESTED |
435 | 0 | && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED |
436 | 0 | && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) { |
437 | 0 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); |
438 | 0 | char *s = jsonrpc_msg_to_string(msg); |
439 | 0 | VLOG_INFO_RL(&rl, "%s: received unexpected %s response in " |
440 | 0 | "%s state: %s", jsonrpc_session_get_name(cs->session), |
441 | 0 | jsonrpc_msg_type_to_string(msg->type), |
442 | 0 | ovsdb_cs_state_to_string(cs->state), |
443 | 0 | s); |
444 | 0 | free(s); |
445 | 0 | ovsdb_cs_retry(cs); |
446 | 0 | return; |
447 | 0 | } |
448 | | |
449 | 0 | switch (cs->state) { |
450 | 0 | case CS_S_SERVER_SCHEMA_REQUESTED: |
451 | 0 | if (ok) { |
452 | 0 | json_destroy(cs->server.schema); |
453 | 0 | cs->server.schema = json_clone(msg->result); |
454 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->server, |
455 | 0 | cs->server.max_version); |
456 | 0 | ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED); |
457 | 0 | } else { |
458 | 0 | ovsdb_cs_send_schema_request(cs, &cs->data); |
459 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED); |
460 | 0 | } |
461 | 0 | break; |
462 | | |
463 | 0 | case CS_S_SERVER_MONITOR_REQUESTED: |
464 | 0 | if (ok) { |
465 | 0 | cs->server.monitor_version = cs->server.max_version; |
466 | 0 | ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result, |
467 | 0 | cs->server.monitor_version); |
468 | 0 | if (ovsdb_cs_check_server_db(cs) && cs->set_db_change_aware) { |
469 | 0 | ovsdb_cs_send_db_change_aware(cs); |
470 | 0 | } |
471 | 0 | } else { |
472 | 0 | ovsdb_cs_send_schema_request(cs, &cs->data); |
473 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED); |
474 | 0 | } |
475 | 0 | break; |
476 | | |
477 | 0 | case CS_S_DATA_SCHEMA_REQUESTED: |
478 | 0 | json_destroy(cs->data.schema); |
479 | 0 | cs->data.schema = json_clone(msg->result); |
480 | 0 | if (cs->data.max_version >= 2) { |
481 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->data, 2); |
482 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED); |
483 | 0 | } else { |
484 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->data, 1); |
485 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED); |
486 | 0 | } |
487 | 0 | break; |
488 | | |
489 | 0 | case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED: |
490 | 0 | if (!ok) { |
491 | | /* "monitor_cond_since" not supported. Try "monitor_cond". */ |
492 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->data, 2); |
493 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED); |
494 | 0 | } else { |
495 | 0 | cs->data.monitor_version = 3; |
496 | 0 | ovsdb_cs_transition(cs, CS_S_MONITORING); |
497 | 0 | ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3); |
498 | 0 | } |
499 | 0 | break; |
500 | | |
501 | 0 | case CS_S_DATA_MONITOR_COND_REQUESTED: |
502 | 0 | if (!ok) { |
503 | | /* "monitor_cond" not supported. Try "monitor". */ |
504 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->data, 1); |
505 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED); |
506 | 0 | } else { |
507 | 0 | cs->data.monitor_version = 2; |
508 | 0 | ovsdb_cs_transition(cs, CS_S_MONITORING); |
509 | 0 | ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2); |
510 | 0 | } |
511 | 0 | break; |
512 | | |
513 | 0 | case CS_S_DATA_MONITOR_REQUESTED: |
514 | 0 | cs->data.monitor_version = 1; |
515 | 0 | ovsdb_cs_transition(cs, CS_S_MONITORING); |
516 | 0 | ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1); |
517 | 0 | break; |
518 | | |
519 | 0 | case CS_S_MONITORING: |
520 | | /* We don't normally have a request outstanding in this state. If we |
521 | | * do, it's a "monitor_cond_change", which means that the conditional |
522 | | * monitor clauses were updated. |
523 | | * |
524 | | * Mark the last requested conditions as acked and if further |
525 | | * condition changes were pending, send them now. */ |
526 | 0 | ovsdb_cs_db_ack_condition(&cs->data); |
527 | 0 | ovsdb_cs_send_cond_change(cs); |
528 | 0 | cs->data.cond_seqno++; |
529 | 0 | break; |
530 | | |
531 | 0 | case CS_S_ERROR: |
532 | 0 | case CS_S_RETRY: |
533 | | /* Nothing to do in this state. */ |
534 | 0 | break; |
535 | | |
536 | 0 | default: |
537 | 0 | OVS_NOT_REACHED(); |
538 | 0 | } |
539 | 0 | } |
540 | | |
541 | | static void |
542 | | ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg) |
543 | 0 | { |
544 | 0 | bool is_response = (msg->type == JSONRPC_REPLY || |
545 | 0 | msg->type == JSONRPC_ERROR); |
546 | | |
547 | | /* Process a reply to an outstanding request. */ |
548 | 0 | if (is_response |
549 | 0 | && cs->request_id && json_equal(cs->request_id, msg->id)) { |
550 | 0 | json_destroy(cs->request_id); |
551 | 0 | cs->request_id = NULL; |
552 | 0 | ovsdb_cs_process_response(cs, msg); |
553 | 0 | return; |
554 | 0 | } |
555 | | |
556 | | /* Process database contents updates. */ |
557 | 0 | if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) { |
558 | 0 | return; |
559 | 0 | } |
560 | 0 | if (cs->server.monitor_version |
561 | 0 | && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) { |
562 | 0 | ovsdb_cs_check_server_db(cs); |
563 | 0 | return; |
564 | 0 | } |
565 | | |
566 | 0 | if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg) |
567 | 0 | || (cs->server.monitor_version |
568 | 0 | && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) { |
569 | 0 | return; |
570 | 0 | } |
571 | | |
572 | | /* Process "lock" replies and related notifications. */ |
573 | 0 | if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) { |
574 | 0 | return; |
575 | 0 | } |
576 | | |
577 | | /* Process response to a database transaction we submitted. */ |
578 | 0 | if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) { |
579 | 0 | return; |
580 | 0 | } |
581 | | |
582 | | /* Unknown message. Log at a low level because this can happen if |
583 | | * ovsdb_cs_txn_destroy() is called to destroy a transaction |
584 | | * before we receive the reply. |
585 | | * |
586 | | * (We could sort those out from other kinds of unknown messages by |
587 | | * using distinctive IDs for transactions, if it seems valuable to |
588 | | * do so, and then it would be possible to use different log |
589 | | * levels. XXX?) */ |
590 | 0 | char *s = jsonrpc_msg_to_string(msg); |
591 | 0 | VLOG_DBG("%s: received unexpected %s message: %s", |
592 | 0 | jsonrpc_session_get_name(cs->session), |
593 | 0 | jsonrpc_msg_type_to_string(msg->type), s); |
594 | 0 | free(s); |
595 | 0 | } |
596 | | |
597 | | static struct ovsdb_cs_event * |
598 | | ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type) |
599 | 0 | { |
600 | 0 | struct ovsdb_cs_event *event = xmalloc(sizeof *event); |
601 | 0 | event->type = type; |
602 | 0 | ovs_list_push_back(&db->events, &event->list_node); |
603 | 0 | return event; |
604 | 0 | } |
605 | | |
606 | | /* Processes a batch of messages from the database server on 'cs'. This may |
607 | | * cause the CS's contents to change. |
608 | | * |
609 | | * Initializes 'events' with a list of events that occurred on 'cs'. The |
610 | | * caller must process and destroy all of the events. */ |
611 | | void |
612 | | ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events) |
613 | 0 | { |
614 | 0 | ovs_list_init(events); |
615 | 0 | if (!cs->session) { |
616 | 0 | return; |
617 | 0 | } |
618 | | |
619 | 0 | ovsdb_cs_send_cond_change(cs); |
620 | |
|
621 | 0 | jsonrpc_session_run(cs->session); |
622 | |
|
623 | 0 | unsigned int seqno = jsonrpc_session_get_seqno(cs->session); |
624 | 0 | if (cs->state_seqno != seqno) { |
625 | 0 | cs->state_seqno = seqno; |
626 | 0 | ovsdb_cs_restart_fsm(cs); |
627 | |
|
628 | 0 | for (size_t i = 0; i < cs->n_txns; i++) { |
629 | 0 | json_destroy(cs->txns[i]); |
630 | 0 | } |
631 | 0 | cs->n_txns = 0; |
632 | |
|
633 | 0 | ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT); |
634 | |
|
635 | 0 | if (cs->data.lock_name) { |
636 | 0 | jsonrpc_session_send( |
637 | 0 | cs->session, |
638 | 0 | ovsdb_cs_db_compose_lock_request(&cs->data)); |
639 | 0 | } |
640 | 0 | } |
641 | |
|
642 | 0 | for (int i = 0; i < 50; i++) { |
643 | 0 | struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session); |
644 | 0 | if (!msg) { |
645 | 0 | break; |
646 | 0 | } |
647 | 0 | ovsdb_cs_process_msg(cs, msg); |
648 | 0 | jsonrpc_msg_destroy(msg); |
649 | 0 | } |
650 | 0 | ovs_list_push_back_all(events, &cs->data.events); |
651 | 0 | } |
652 | | |
653 | | /* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to |
654 | | * do or when activity occurs on a transaction on 'cs'. */ |
655 | | void |
656 | | ovsdb_cs_wait(struct ovsdb_cs *cs) |
657 | 0 | { |
658 | 0 | if (!cs->session) { |
659 | 0 | return; |
660 | 0 | } |
661 | 0 | jsonrpc_session_wait(cs->session); |
662 | 0 | jsonrpc_session_recv_wait(cs->session); |
663 | 0 | } |
664 | | |
665 | | /* Network connection. */ |
666 | | |
667 | | /* Changes the remote and creates a new session. Keeps existing connection |
668 | | * if current remote is still valid. |
669 | | * |
670 | | * If 'retry' is true, the connection to the remote will automatically retry |
671 | | * when it fails. If 'retry' is false, the connection is one-time. */ |
672 | | void |
673 | | ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry) |
674 | 0 | { |
675 | 0 | if (cs |
676 | 0 | && ((remote != NULL) != (cs->remote != NULL) |
677 | 0 | || (remote && cs->remote && strcmp(remote, cs->remote)))) { |
678 | 0 | struct jsonrpc *rpc = NULL; |
679 | | |
680 | | /* Close the old session, if any. */ |
681 | 0 | if (cs->session) { |
682 | | /* Save the current open connection and close the session. */ |
683 | 0 | rpc = jsonrpc_session_steal(cs->session); |
684 | 0 | cs->session = NULL; |
685 | |
|
686 | 0 | free(cs->remote); |
687 | 0 | cs->remote = NULL; |
688 | 0 | } |
689 | | |
690 | | /* Open new session, if any. */ |
691 | 0 | if (remote) { |
692 | 0 | struct svec remotes = SVEC_EMPTY_INITIALIZER; |
693 | 0 | struct uuid old_cid = cs->cid; |
694 | |
|
695 | 0 | ovsdb_session_parse_remote(remote, &remotes, &cs->cid); |
696 | 0 | if (cs->shuffle_remotes) { |
697 | 0 | svec_shuffle(&remotes); |
698 | 0 | } |
699 | 0 | cs->session = jsonrpc_session_open_multiple(&remotes, retry); |
700 | | |
701 | | /* Use old connection, if cluster id didn't change and the remote |
702 | | * is still on the list, to avoid unnecessary re-connection. */ |
703 | 0 | if (rpc && uuid_equals(&old_cid, &cs->cid) |
704 | 0 | && svec_contains_unsorted(&remotes, jsonrpc_get_name(rpc))) { |
705 | 0 | jsonrpc_session_replace(cs->session, rpc); |
706 | 0 | cs->state_seqno = jsonrpc_session_get_seqno(cs->session); |
707 | 0 | rpc = NULL; |
708 | 0 | } else { |
709 | 0 | cs->state_seqno = UINT_MAX; |
710 | 0 | } |
711 | |
|
712 | 0 | svec_destroy(&remotes); |
713 | 0 | cs->remote = xstrdup(remote); |
714 | 0 | } |
715 | |
|
716 | 0 | jsonrpc_close(rpc); |
717 | 0 | } |
718 | 0 | } |
719 | | |
720 | | /* Reconfigures 'cs' so that it would reconnect to the database, if |
721 | | * connection was dropped. */ |
722 | | void |
723 | | ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs) |
724 | 0 | { |
725 | 0 | if (cs->session) { |
726 | 0 | jsonrpc_session_enable_reconnect(cs->session); |
727 | 0 | } |
728 | 0 | } |
729 | | |
730 | | /* Forces 'cs' to drop its connection to the database and reconnect. In the |
731 | | * meantime, the contents of 'cs' will not change. */ |
732 | | void |
733 | | ovsdb_cs_force_reconnect(struct ovsdb_cs *cs) |
734 | 0 | { |
735 | 0 | if (cs->session) { |
736 | 0 | if (cs->state == CS_S_MONITORING) { |
737 | | /* The ovsdb-cs was in MONITORING state, so we either had data |
738 | | * inconsistency on this server, or it stopped being the cluster |
739 | | * leader, or the user requested to re-connect. Avoiding backoff |
740 | | * in these cases, as we need to re-connect as soon as possible. |
741 | | * Connections that are not in MONITORING state should have their |
742 | | * backoff to avoid constant flood of re-connection attempts in |
743 | | * case there is no suitable database server. */ |
744 | 0 | jsonrpc_session_reset_backoff(cs->session); |
745 | 0 | } |
746 | 0 | jsonrpc_session_force_reconnect(cs->session); |
747 | 0 | } |
748 | 0 | } |
749 | | |
750 | | /* Drops 'cs''s current connection and the cached session. This is useful if |
751 | | * the client notices some kind of inconsistency. */ |
752 | | void |
753 | | ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs) |
754 | 0 | { |
755 | 0 | cs->data.last_id = UUID_ZERO; |
756 | 0 | ovsdb_cs_retry(cs); |
757 | 0 | } |
758 | | |
759 | | /* Returns true if 'cs' is currently connected or will eventually try to |
760 | | * reconnect. */ |
761 | | bool |
762 | | ovsdb_cs_is_alive(const struct ovsdb_cs *cs) |
763 | 0 | { |
764 | 0 | return (cs->session |
765 | 0 | && jsonrpc_session_is_alive(cs->session) |
766 | 0 | && cs->state != CS_S_ERROR); |
767 | 0 | } |
768 | | |
769 | | /* Returns true if 'cs' is currently connected to a server. */ |
770 | | bool |
771 | | ovsdb_cs_is_connected(const struct ovsdb_cs *cs) |
772 | 0 | { |
773 | 0 | return cs->session && jsonrpc_session_is_connected(cs->session); |
774 | 0 | } |
775 | | |
776 | | /* Returns the last error reported on a connection by 'cs'. The return value |
777 | | * is 0 only if no connection made by 'cs' has ever encountered an error and |
778 | | * a negative response to a schema request has never been received. See |
779 | | * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value |
780 | | * interpretation. */ |
781 | | int |
782 | | ovsdb_cs_get_last_error(const struct ovsdb_cs *cs) |
783 | 0 | { |
784 | 0 | int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0; |
785 | 0 | if (err) { |
786 | 0 | return err; |
787 | 0 | } else if (cs->state == CS_S_ERROR) { |
788 | 0 | return ENOENT; |
789 | 0 | } else { |
790 | 0 | return 0; |
791 | 0 | } |
792 | 0 | } |
793 | | |
794 | | /* Sets all the JSON-RPC session 'options' for 'cs''s current session. */ |
795 | | void |
796 | | ovsdb_cs_set_jsonrpc_options(const struct ovsdb_cs *cs, |
797 | | const struct jsonrpc_session_options *options) |
798 | 0 | { |
799 | 0 | if (cs->session) { |
800 | 0 | jsonrpc_session_set_options(cs->session, options); |
801 | 0 | } |
802 | 0 | } |
803 | | |
804 | | /* Sets the "probe interval" for 'cs''s current session to 'probe_interval', in |
805 | | * milliseconds. */ |
806 | | void |
807 | | ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval) |
808 | 0 | { |
809 | 0 | if (cs->session) { |
810 | 0 | jsonrpc_session_set_probe_interval(cs->session, probe_interval); |
811 | 0 | } |
812 | 0 | } |
813 | | |
814 | | /* Conditional monitoring. */ |
815 | | |
816 | | /* A table being monitored. |
817 | | * |
818 | | * At the CS layer, the only thing we care about, table-wise, is the conditions |
819 | | * we're using for monitoring them, so there's little here. We only create |
820 | | * these table structures at all for tables that have conditions. */ |
821 | | struct ovsdb_cs_db_table { |
822 | | struct hmap_node hmap_node; /* Indexed by 'name'. */ |
823 | | char *name; /* Table name. */ |
824 | | |
825 | | /* Each of these is a null pointer if it is empty, or JSON [<condition>*] |
826 | | * or [true] or [false] otherwise. [true] could be represented as a null |
827 | | * pointer, but we want to distinguish "empty slot" from "a condition that |
828 | | * is always true" in a slot. */ |
829 | | struct json *ack_cond; /* Last condition acked by the server. */ |
830 | | struct json *req_cond; /* Last condition requested to the server. */ |
831 | | struct json *new_cond; /* Latest condition set by the IDL client. */ |
832 | | }; |
833 | | |
834 | | /* A kind of condition, so that we can treat equivalent JSON as equivalent. */ |
835 | | enum condition_type { |
836 | | COND_FALSE, /* [] or [false] */ |
837 | | COND_TRUE, /* Null pointer or [true] */ |
838 | | COND_OTHER /* Anything else. */ |
839 | | }; |
840 | | |
841 | | /* Returns the condition_type for 'condition'. */ |
842 | | static enum condition_type |
843 | | condition_classify(const struct json *condition) |
844 | 0 | { |
845 | 0 | if (condition) { |
846 | 0 | switch (json_array_size(condition)) { |
847 | 0 | case 0: |
848 | 0 | return COND_FALSE; |
849 | | |
850 | 0 | case 1: { |
851 | 0 | const struct json *cond = json_array_at(condition, 0); |
852 | |
|
853 | 0 | return (cond->type == JSON_FALSE ? COND_FALSE |
854 | 0 | : cond->type == JSON_TRUE ? COND_TRUE |
855 | 0 | : COND_OTHER); |
856 | 0 | } |
857 | | |
858 | 0 | default: |
859 | 0 | return COND_OTHER; |
860 | 0 | } |
861 | 0 | } else { |
862 | 0 | return COND_TRUE; |
863 | 0 | } |
864 | 0 | } |
865 | | |
866 | | /* Returns true if 'a' and 'b' are the same condition (in an obvious way; we're |
867 | | * not going to compare for boolean equivalence or anything). */ |
868 | | static bool |
869 | | condition_equal(const struct json *a, const struct json *b) |
870 | 0 | { |
871 | 0 | enum condition_type type = condition_classify(a); |
872 | 0 | return (type == condition_classify(b) |
873 | 0 | && (type != COND_OTHER || json_equal(a, b))); |
874 | 0 | } |
875 | | |
876 | | /* Returns a clone of 'condition', translating always-true and always-false to |
877 | | * [true] and [false], respectively. */ |
878 | | static struct json * |
879 | | condition_clone(const struct json *condition) |
880 | 0 | { |
881 | 0 | switch (condition_classify(condition)) { |
882 | 0 | case COND_TRUE: |
883 | 0 | return json_array_create_1(json_boolean_create(true)); |
884 | | |
885 | 0 | case COND_FALSE: |
886 | 0 | return json_array_create_1(json_boolean_create(false)); |
887 | | |
888 | 0 | case COND_OTHER: |
889 | 0 | return json_clone(condition); |
890 | 0 | } |
891 | | |
892 | 0 | OVS_NOT_REACHED(); |
893 | 0 | } |
894 | | |
895 | | /* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an |
896 | | * empty one if necessary. */ |
897 | | static struct ovsdb_cs_db_table * |
898 | | ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table) |
899 | 0 | { |
900 | 0 | uint32_t hash = hash_string(table, 0); |
901 | 0 | struct ovsdb_cs_db_table *t; |
902 | |
|
903 | 0 | HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) { |
904 | 0 | if (!strcmp(t->name, table)) { |
905 | 0 | return t; |
906 | 0 | } |
907 | 0 | } |
908 | | |
909 | 0 | t = xzalloc(sizeof *t); |
910 | 0 | t->name = xstrdup(table); |
911 | 0 | t->ack_cond = json_array_create_1(json_boolean_create(true)); |
912 | 0 | hmap_insert(&db->tables, &t->hmap_node, hash); |
913 | 0 | return t; |
914 | 0 | } |
915 | | |
916 | | static void |
917 | | ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db) |
918 | 0 | { |
919 | 0 | struct ovsdb_cs_db_table *table; |
920 | 0 | HMAP_FOR_EACH_SAFE (table, hmap_node, &db->tables) { |
921 | 0 | json_destroy(table->ack_cond); |
922 | 0 | json_destroy(table->req_cond); |
923 | 0 | json_destroy(table->new_cond); |
924 | 0 | hmap_remove(&db->tables, &table->hmap_node); |
925 | 0 | free(table->name); |
926 | 0 | free(table); |
927 | 0 | } |
928 | 0 | hmap_destroy(&db->tables); |
929 | 0 | } |
930 | | |
931 | | static unsigned int |
932 | | ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table, |
933 | | const struct json *condition) |
934 | 0 | { |
935 | | /* Compare the new condition to the last known condition which can be |
936 | | * either "new" (not sent yet), "requested" or "acked", in this order. */ |
937 | 0 | struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table); |
938 | 0 | const struct json *table_cond = (t->new_cond ? t->new_cond |
939 | 0 | : t->req_cond ? t->req_cond |
940 | 0 | : t->ack_cond); |
941 | 0 | if (!condition_equal(condition, table_cond)) { |
942 | 0 | json_destroy(t->new_cond); |
943 | 0 | t->new_cond = condition_clone(condition); |
944 | 0 | db->cond_changed = true; |
945 | 0 | poll_immediate_wake(); |
946 | 0 | } |
947 | | |
948 | | /* Conditions will be up to date when we receive replies for already |
949 | | * requested and new conditions, if any. This includes condition change |
950 | | * requests for other tables too. |
951 | | */ |
952 | 0 | if (t->new_cond) { |
953 | | /* New condition will be sent out after all already requested ones |
954 | | * are acked. |
955 | | */ |
956 | 0 | bool any_req_cond = false; |
957 | 0 | HMAP_FOR_EACH (t, hmap_node, &db->tables) { |
958 | 0 | if (t->req_cond) { |
959 | 0 | any_req_cond = true; |
960 | 0 | break; |
961 | 0 | } |
962 | 0 | } |
963 | 0 | return db->cond_seqno + any_req_cond + 1; |
964 | 0 | } else { |
965 | | /* Already requested conditions should be up to date at |
966 | | * db->cond_seqno + 1 while acked conditions are already up to date. |
967 | | */ |
968 | 0 | return db->cond_seqno + !!t->req_cond; |
969 | 0 | } |
970 | 0 | } |
971 | | |
972 | | /* Sets the replication condition for 'tc' in 'cs' to 'condition' and arranges |
973 | | * to send the new condition to the database server. |
974 | | * |
975 | | * Return the next conditional update sequence number. When this value and |
976 | | * ovsdb_cs_get_condition_seqno() matches, 'cs' contains rows that match the |
977 | | * 'condition'. */ |
978 | | unsigned int |
979 | | ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table, |
980 | | const struct json *condition) |
981 | 0 | { |
982 | 0 | return ovsdb_cs_db_set_condition(&cs->data, table, condition); |
983 | 0 | } |
984 | | |
985 | | /* Returns a "sequence number" that represents the number of conditional |
986 | | * monitoring updates successfully received by the OVSDB server of a CS |
987 | | * connection. |
988 | | * |
989 | | * ovsdb_cs_set_condition() sets a new condition that is different from the |
990 | | * current condtion, the next expected "sequence number" is returned. |
991 | | * |
992 | | * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the |
993 | | * return value of ovsdb_cs_set_condition(), the client is assured that: |
994 | | * |
995 | | * - The ovsdb_cs_set_condition() changes has been acknowledged by the OVSDB |
996 | | * server. |
997 | | * |
998 | | * - 'cs' now contains the content matches the new conditions. */ |
999 | | unsigned int |
1000 | | ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs) |
1001 | 0 | { |
1002 | 0 | return cs->data.cond_seqno; |
1003 | 0 | } |
1004 | | |
1005 | | static struct json * |
1006 | | ovsdb_cs_create_cond_change_req(const struct json *cond) |
1007 | 0 | { |
1008 | 0 | struct json *monitor_cond_change_request = json_object_create(); |
1009 | 0 | json_object_put(monitor_cond_change_request, "where", json_clone(cond)); |
1010 | 0 | return monitor_cond_change_request; |
1011 | 0 | } |
1012 | | |
1013 | | static struct jsonrpc_msg * |
1014 | | ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db) |
1015 | 0 | { |
1016 | 0 | if (!db->cond_changed) { |
1017 | 0 | return NULL; |
1018 | 0 | } |
1019 | | |
1020 | 0 | struct json *monitor_cond_change_requests = NULL; |
1021 | 0 | struct ovsdb_cs_db_table *table; |
1022 | 0 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { |
1023 | | /* Always use the most recent conditions set by the CS client when |
1024 | | * requesting monitor_cond_change, i.e., table->new_cond. |
1025 | | */ |
1026 | 0 | if (table->new_cond) { |
1027 | 0 | struct json *req = |
1028 | 0 | ovsdb_cs_create_cond_change_req(table->new_cond); |
1029 | 0 | if (req) { |
1030 | 0 | if (!monitor_cond_change_requests) { |
1031 | 0 | monitor_cond_change_requests = json_object_create(); |
1032 | 0 | } |
1033 | 0 | json_object_put(monitor_cond_change_requests, |
1034 | 0 | table->name, |
1035 | 0 | json_array_create_1(req)); |
1036 | 0 | } |
1037 | | /* Mark the new condition as requested by moving it to req_cond. |
1038 | | * If there's already requested condition that's a bug. |
1039 | | */ |
1040 | 0 | ovs_assert(table->req_cond == NULL); |
1041 | 0 | table->req_cond = table->new_cond; |
1042 | 0 | table->new_cond = NULL; |
1043 | 0 | } |
1044 | 0 | } |
1045 | |
|
1046 | 0 | if (!monitor_cond_change_requests) { |
1047 | 0 | return NULL; |
1048 | 0 | } |
1049 | | |
1050 | 0 | db->cond_changed = false; |
1051 | 0 | struct json *params = json_array_create_3(json_clone(db->monitor_id), |
1052 | 0 | json_clone(db->monitor_id), |
1053 | 0 | monitor_cond_change_requests); |
1054 | 0 | return jsonrpc_create_request("monitor_cond_change", params, NULL); |
1055 | 0 | } |
1056 | | |
1057 | | /* Marks all requested table conditions in 'db' as acked by the server. |
1058 | | * It should be called when the server replies to monitor_cond_change |
1059 | | * requests. |
1060 | | */ |
1061 | | static void |
1062 | | ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db) |
1063 | 0 | { |
1064 | 0 | struct ovsdb_cs_db_table *table; |
1065 | 0 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { |
1066 | 0 | if (table->req_cond) { |
1067 | 0 | json_destroy(table->ack_cond); |
1068 | 0 | table->ack_cond = table->req_cond; |
1069 | 0 | table->req_cond = NULL; |
1070 | 0 | } |
1071 | 0 | } |
1072 | 0 | } |
1073 | | |
1074 | | /* Should be called when the CS fsm is restarted and resyncs table conditions |
1075 | | * based on the state the DB is in: |
1076 | | * - if a non-zero last_id is available for the DB then upon reconnect |
1077 | | * the CS should first request acked conditions to avoid missing updates |
1078 | | * about records that were added before the transaction with |
1079 | | * txn-id == last_id. If there were requested condition changes in flight |
1080 | | * (i.e., req_cond not NULL) and the CS client didn't set new conditions |
1081 | | * (i.e., new_cond is NULL) then move req_cond to new_cond to trigger a |
1082 | | * follow up monitor_cond_change request. |
1083 | | * - if there's no last_id available for the DB then it's safe to use the |
1084 | | * latest conditions set by the CS client even if they weren't acked yet. |
1085 | | */ |
1086 | | static void |
1087 | | ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db) |
1088 | 0 | { |
1089 | 0 | bool ack_all = uuid_is_zero(&db->last_id); |
1090 | 0 | if (ack_all) { |
1091 | 0 | db->cond_changed = false; |
1092 | 0 | } |
1093 | |
|
1094 | 0 | struct ovsdb_cs_db_table *table; |
1095 | 0 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { |
1096 | | /* When monitor_cond_since requests will be issued, the |
1097 | | * table->ack_cond condition will be added to the "where" clause". |
1098 | | * Follow up monitor_cond_change requests will use table->new_cond. |
1099 | | */ |
1100 | 0 | if (ack_all) { |
1101 | 0 | if (table->new_cond) { |
1102 | 0 | json_destroy(table->req_cond); |
1103 | 0 | table->req_cond = table->new_cond; |
1104 | 0 | table->new_cond = NULL; |
1105 | 0 | } |
1106 | |
|
1107 | 0 | if (table->req_cond) { |
1108 | 0 | json_destroy(table->ack_cond); |
1109 | 0 | table->ack_cond = table->req_cond; |
1110 | 0 | table->req_cond = NULL; |
1111 | 0 | } |
1112 | 0 | } else { |
1113 | 0 | if (table->req_cond) { |
1114 | | /* There was an in-flight monitor_cond_change request. It's no |
1115 | | * longer relevant in the restarted FSM, so clear it. */ |
1116 | 0 | if (table->new_cond) { |
1117 | | /* We will send a new monitor_cond_change with the new |
1118 | | * condition. The previously in-flight condition is |
1119 | | * irrelevant and we can just forget about it. */ |
1120 | 0 | json_destroy(table->req_cond); |
1121 | 0 | } else { |
1122 | | /* The restarted FSM needs to again send a request for the |
1123 | | * previously in-flight condition. */ |
1124 | 0 | table->new_cond = table->req_cond; |
1125 | 0 | } |
1126 | 0 | table->req_cond = NULL; |
1127 | 0 | db->cond_changed = true; |
1128 | | |
1129 | | /* There are two cases: |
1130 | | * a. either the server already processed the requested monitor |
1131 | | * condition change but the FSM was restarted before the |
1132 | | * client was notified. In this case the client should |
1133 | | * clear its local cache because it's out of sync with the |
1134 | | * monitor view on the server side. |
1135 | | * |
1136 | | * b. OR the server hasn't processed the requested monitor |
1137 | | * condition change yet. |
1138 | | * |
1139 | | * As there's no easy way to differentiate between the two, |
1140 | | * and given that this condition should be rare, reset the |
1141 | | * 'last_id', essentially flushing the local cached DB |
1142 | | * contents. |
1143 | | */ |
1144 | 0 | db->last_id = UUID_ZERO; |
1145 | 0 | } |
1146 | 0 | } |
1147 | 0 | } |
1148 | 0 | } |
1149 | | |
1150 | | static void |
1151 | | ovsdb_cs_send_cond_change(struct ovsdb_cs *cs) |
1152 | 0 | { |
1153 | | /* When 'cs->request_id' is not NULL, there is an outstanding |
1154 | | * conditional monitoring update request that we have not heard |
1155 | | * from the server yet. Don't generate another request in this case. */ |
1156 | 0 | if (!jsonrpc_session_is_connected(cs->session) |
1157 | 0 | || cs->data.monitor_version == 1 |
1158 | 0 | || cs->request_id) { |
1159 | 0 | return; |
1160 | 0 | } |
1161 | | |
1162 | 0 | struct jsonrpc_msg *msg = ovsdb_cs_db_compose_cond_change(&cs->data); |
1163 | 0 | if (msg) { |
1164 | 0 | cs->request_id = json_clone(msg->id); |
1165 | 0 | jsonrpc_session_send(cs->session, msg); |
1166 | 0 | } |
1167 | 0 | } |
1168 | | |
1169 | | /* Database change awareness. */ |
1170 | | |
1171 | | /* By default, or if 'set_db_change_aware' is true, 'cs' will send |
1172 | | * 'set_db_change_aware' request to the server after receiving the _SERVER data |
1173 | | * (when the server supports it), which is useful for clients that intends to |
1174 | | * keep long connections to the server. Otherwise, 'cs' will not send the |
1175 | | * 'set_db_change_aware' request, which is more reasonable for short-lived |
1176 | | * connections to avoid unnecessary processing at the server side and possible |
1177 | | * error handling due to connections being closed by the clients before the |
1178 | | * responses are sent by the server. */ |
1179 | | void |
1180 | | ovsdb_cs_set_db_change_aware(struct ovsdb_cs *cs, bool set_db_change_aware) |
1181 | 0 | { |
1182 | 0 | cs->set_db_change_aware = set_db_change_aware; |
1183 | 0 | } |
1184 | | |
1185 | | /* Clustered servers. */ |
1186 | | |
1187 | | /* By default, or if 'leader_only' is true, when 'cs' connects to a clustered |
1188 | | * database, the CS layer will avoid servers other than the cluster |
1189 | | * leader. This ensures that any data that it reads and reports is up-to-date. |
1190 | | * If 'leader_only' is false, the CS layer will accept any server in the |
1191 | | * cluster, which means that for read-only transactions it can report and act |
1192 | | * on stale data (transactions that modify the database are always serialized |
1193 | | * even with false 'leader_only'). Refer to Understanding Cluster Consistency |
1194 | | * in ovsdb(7) for more information. */ |
1195 | | void |
1196 | | ovsdb_cs_set_leader_only(struct ovsdb_cs *cs, bool leader_only) |
1197 | 0 | { |
1198 | 0 | cs->leader_only = leader_only; |
1199 | 0 | if (leader_only && cs->server.monitor_version) { |
1200 | 0 | ovsdb_cs_check_server_db(cs); |
1201 | 0 | } |
1202 | 0 | } |
1203 | | |
1204 | | /* Set whether the order of remotes should be shuffled, when there is more than |
1205 | | * one remote. The setting doesn't take effect until the next time when |
1206 | | * ovsdb_cs_set_remote() is called. */ |
1207 | | void |
1208 | | ovsdb_cs_set_shuffle_remotes(struct ovsdb_cs *cs, bool shuffle) |
1209 | 0 | { |
1210 | 0 | cs->shuffle_remotes = shuffle; |
1211 | 0 | } |
1212 | | |
1213 | | /* Reset min_index to 0. This prevents a situation where the client |
1214 | | * thinks all databases have stale data, when they actually have all |
1215 | | * been destroyed and rebuilt from scratch. |
1216 | | */ |
1217 | | void |
1218 | | ovsdb_cs_reset_min_index(struct ovsdb_cs *cs) |
1219 | 0 | { |
1220 | 0 | cs->min_index = 0; |
1221 | 0 | } |
1222 | | |
1223 | | /* Database locks. */ |
1224 | | |
1225 | | static struct jsonrpc_msg * |
1226 | | ovsdb_cs_db_set_lock(struct ovsdb_cs_db *db, const char *lock_name) |
1227 | 0 | { |
1228 | 0 | if (db->lock_name |
1229 | 0 | && (!lock_name || strcmp(lock_name, db->lock_name))) { |
1230 | | /* Release previous lock. */ |
1231 | 0 | struct jsonrpc_msg *msg = ovsdb_cs_db_compose_unlock_request(db); |
1232 | 0 | free(db->lock_name); |
1233 | 0 | db->lock_name = NULL; |
1234 | 0 | db->is_lock_contended = false; |
1235 | 0 | return msg; |
1236 | 0 | } |
1237 | | |
1238 | 0 | if (lock_name && !db->lock_name) { |
1239 | | /* Acquire new lock. */ |
1240 | 0 | db->lock_name = xstrdup(lock_name); |
1241 | 0 | return ovsdb_cs_db_compose_lock_request(db); |
1242 | 0 | } |
1243 | | |
1244 | 0 | return NULL; |
1245 | 0 | } |
1246 | | |
1247 | | /* If 'lock_name' is nonnull, configures 'cs' to obtain the named lock from the |
1248 | | * database server and to prevent modifying the database when the lock cannot |
1249 | | * be acquired (that is, when another client has the same lock). |
1250 | | * |
1251 | | * If 'lock_name' is NULL, drops the locking requirement and releases the |
1252 | | * lock. */ |
1253 | | void |
1254 | | ovsdb_cs_set_lock(struct ovsdb_cs *cs, const char *lock_name) |
1255 | 0 | { |
1256 | 0 | for (;;) { |
1257 | 0 | struct jsonrpc_msg *msg = ovsdb_cs_db_set_lock(&cs->data, lock_name); |
1258 | 0 | if (!msg) { |
1259 | 0 | break; |
1260 | 0 | } |
1261 | 0 | if (cs->session) { |
1262 | 0 | jsonrpc_session_send(cs->session, msg); |
1263 | 0 | } else { |
1264 | 0 | jsonrpc_msg_destroy(msg); |
1265 | 0 | } |
1266 | 0 | } |
1267 | 0 | } |
1268 | | |
1269 | | /* Returns the name of the lock that 'cs' is trying to obtain, or NULL if none |
1270 | | * is configured. */ |
1271 | | const char * |
1272 | | ovsdb_cs_get_lock(const struct ovsdb_cs *cs) |
1273 | 0 | { |
1274 | 0 | return cs->data.lock_name; |
1275 | 0 | } |
1276 | | |
1277 | | /* Returns true if 'cs' is configured to obtain a lock and owns that lock, |
1278 | | * false if it doesn't own the lock or isn't configured to obtain one. |
1279 | | * |
1280 | | * Locking and unlocking happens asynchronously from the database client's |
1281 | | * point of view, so the information is only useful for optimization (e.g. if |
1282 | | * the client doesn't have the lock then there's no point in trying to write to |
1283 | | * the database). */ |
1284 | | bool |
1285 | | ovsdb_cs_has_lock(const struct ovsdb_cs *cs) |
1286 | 0 | { |
1287 | 0 | return cs->data.has_lock; |
1288 | 0 | } |
1289 | | |
1290 | | /* Returns true if 'cs' is configured to obtain a lock but the database server |
1291 | | * has indicated that some other client already owns the requested lock. */ |
1292 | | bool |
1293 | | ovsdb_cs_is_lock_contended(const struct ovsdb_cs *cs) |
1294 | 0 | { |
1295 | 0 | return cs->data.is_lock_contended; |
1296 | 0 | } |
1297 | | |
1298 | | static void |
1299 | | ovsdb_cs_db_update_has_lock(struct ovsdb_cs_db *db, bool new_has_lock) |
1300 | 0 | { |
1301 | 0 | if (new_has_lock && !db->has_lock) { |
1302 | 0 | ovsdb_cs_db_add_event(db, OVSDB_CS_EVENT_TYPE_LOCKED); |
1303 | 0 | db->is_lock_contended = false; |
1304 | 0 | } |
1305 | 0 | db->has_lock = new_has_lock; |
1306 | 0 | } |
1307 | | |
1308 | | static bool |
1309 | | ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *db, |
1310 | | const struct jsonrpc_msg *msg) |
1311 | 0 | { |
1312 | 0 | if (msg->type == JSONRPC_REPLY |
1313 | 0 | && db->lock_request_id |
1314 | 0 | && json_equal(db->lock_request_id, msg->id)) { |
1315 | | /* Reply to our "lock" request. */ |
1316 | 0 | ovsdb_cs_db_parse_lock_reply(db, msg->result); |
1317 | 0 | return true; |
1318 | 0 | } |
1319 | | |
1320 | 0 | if (msg->type == JSONRPC_NOTIFY) { |
1321 | 0 | if (!strcmp(msg->method, "locked")) { |
1322 | | /* We got our lock. */ |
1323 | 0 | return ovsdb_cs_db_parse_lock_notify(db, msg->params, true); |
1324 | 0 | } else if (!strcmp(msg->method, "stolen")) { |
1325 | | /* Someone else stole our lock. */ |
1326 | 0 | return ovsdb_cs_db_parse_lock_notify(db, msg->params, false); |
1327 | 0 | } |
1328 | 0 | } |
1329 | | |
1330 | 0 | return false; |
1331 | 0 | } |
1332 | | |
1333 | | static struct jsonrpc_msg * |
1334 | | ovsdb_cs_db_compose_lock_request__(struct ovsdb_cs_db *db, |
1335 | | const char *method) |
1336 | 0 | { |
1337 | 0 | ovsdb_cs_db_update_has_lock(db, false); |
1338 | |
|
1339 | 0 | json_destroy(db->lock_request_id); |
1340 | 0 | db->lock_request_id = NULL; |
1341 | |
|
1342 | 0 | struct json *params = json_array_create_1(json_string_create( |
1343 | 0 | db->lock_name)); |
1344 | 0 | return jsonrpc_create_request(method, params, NULL); |
1345 | 0 | } |
1346 | | |
1347 | | static struct jsonrpc_msg * |
1348 | | ovsdb_cs_db_compose_lock_request(struct ovsdb_cs_db *db) |
1349 | 0 | { |
1350 | 0 | struct jsonrpc_msg *msg = ovsdb_cs_db_compose_lock_request__(db, "lock"); |
1351 | 0 | db->lock_request_id = json_clone(msg->id); |
1352 | 0 | return msg; |
1353 | 0 | } |
1354 | | |
1355 | | static struct jsonrpc_msg * |
1356 | | ovsdb_cs_db_compose_unlock_request(struct ovsdb_cs_db *db) |
1357 | 0 | { |
1358 | 0 | return ovsdb_cs_db_compose_lock_request__(db, "unlock"); |
1359 | 0 | } |
1360 | | |
1361 | | static void |
1362 | | ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *db, |
1363 | | const struct json *result) |
1364 | 0 | { |
1365 | 0 | bool got_lock; |
1366 | |
|
1367 | 0 | json_destroy(db->lock_request_id); |
1368 | 0 | db->lock_request_id = NULL; |
1369 | |
|
1370 | 0 | if (result->type == JSON_OBJECT) { |
1371 | 0 | const struct json *locked; |
1372 | |
|
1373 | 0 | locked = shash_find_data(json_object(result), "locked"); |
1374 | 0 | got_lock = locked && locked->type == JSON_TRUE; |
1375 | 0 | } else { |
1376 | 0 | got_lock = false; |
1377 | 0 | } |
1378 | |
|
1379 | 0 | ovsdb_cs_db_update_has_lock(db, got_lock); |
1380 | 0 | if (!got_lock) { |
1381 | 0 | db->is_lock_contended = true; |
1382 | 0 | } |
1383 | 0 | } |
1384 | | |
1385 | | static bool |
1386 | | ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *db, |
1387 | | const struct json *params, |
1388 | | bool new_has_lock) |
1389 | 0 | { |
1390 | 0 | if (db->lock_name |
1391 | 0 | && params->type == JSON_ARRAY |
1392 | 0 | && json_array_size(params) > 0 |
1393 | 0 | && json_array_at(params, 0)->type == JSON_STRING) { |
1394 | 0 | const char *lock_name = json_string(json_array_at(params, 0)); |
1395 | |
|
1396 | 0 | if (!strcmp(db->lock_name, lock_name)) { |
1397 | 0 | ovsdb_cs_db_update_has_lock(db, new_has_lock); |
1398 | 0 | if (!new_has_lock) { |
1399 | 0 | db->is_lock_contended = true; |
1400 | 0 | } |
1401 | 0 | return true; |
1402 | 0 | } |
1403 | 0 | } |
1404 | 0 | return false; |
1405 | 0 | } |
1406 | | |
1407 | | /* Transactions. */ |
1408 | | |
1409 | | static bool |
1410 | | ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *cs, |
1411 | | const struct jsonrpc_msg *reply) |
1412 | 0 | { |
1413 | 0 | bool found = ovsdb_cs_forget_transaction(cs, reply->id); |
1414 | 0 | if (found) { |
1415 | 0 | struct ovsdb_cs_event *event |
1416 | 0 | = ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_TXN_REPLY); |
1417 | 0 | event->txn_reply = jsonrpc_msg_clone(reply); |
1418 | 0 | } |
1419 | 0 | return found; |
1420 | 0 | } |
1421 | | |
1422 | | /* Returns true if 'cs' can be sent a transaction now, false otherwise. This |
1423 | | * is useful for optimization: there is no point in composing and sending a |
1424 | | * transaction if it returns false. */ |
1425 | | bool |
1426 | | ovsdb_cs_may_send_transaction(const struct ovsdb_cs *cs) |
1427 | 0 | { |
1428 | 0 | return (cs->session != NULL |
1429 | 0 | && cs->state == CS_S_MONITORING |
1430 | 0 | && (!cs->data.lock_name || ovsdb_cs_has_lock(cs))); |
1431 | 0 | } |
1432 | | |
1433 | | /* Attempts to send a transaction with the specified 'operations' to 'cs''s |
1434 | | * server. On success, returns the request ID; the caller must eventually free |
1435 | | * it. On failure, returns NULL. */ |
1436 | | struct json * OVS_WARN_UNUSED_RESULT |
1437 | | ovsdb_cs_send_transaction(struct ovsdb_cs *cs, struct json *operations) |
1438 | 0 | { |
1439 | 0 | if (!ovsdb_cs_may_send_transaction(cs)) { |
1440 | 0 | json_destroy(operations); |
1441 | 0 | return NULL; |
1442 | 0 | } |
1443 | | |
1444 | 0 | if (cs->data.lock_name) { |
1445 | 0 | struct json *assertion = json_object_create(); |
1446 | 0 | json_object_put_string(assertion, "op", "assert"); |
1447 | 0 | json_object_put_string(assertion, "lock", cs->data.lock_name); |
1448 | 0 | json_array_add(operations, assertion); |
1449 | 0 | } |
1450 | |
|
1451 | 0 | struct json *request_id; |
1452 | 0 | struct jsonrpc_msg *request = jsonrpc_create_request( |
1453 | 0 | "transact", operations, &request_id); |
1454 | 0 | int error = jsonrpc_session_send(cs->session, request); |
1455 | 0 | if (error) { |
1456 | 0 | json_destroy(request_id); |
1457 | 0 | return NULL; |
1458 | 0 | } |
1459 | | |
1460 | 0 | if (cs->n_txns >= cs->allocated_txns) { |
1461 | 0 | cs->txns = x2nrealloc(cs->txns, &cs->allocated_txns, |
1462 | 0 | sizeof *cs->txns); |
1463 | 0 | } |
1464 | 0 | cs->txns[cs->n_txns++] = request_id; |
1465 | 0 | return json_clone(request_id); |
1466 | 0 | } |
1467 | | |
1468 | | /* Makes 'cs' drop its record of transaction 'request_id'. If a reply arrives |
1469 | | * for it later (which it will, unless the connection drops in the meantime), |
1470 | | * it won't be reported through an event. |
1471 | | * |
1472 | | * Returns true if 'request_id' was known, false otherwise. */ |
1473 | | bool |
1474 | | ovsdb_cs_forget_transaction(struct ovsdb_cs *cs, const struct json *request_id) |
1475 | 0 | { |
1476 | 0 | for (size_t i = 0; i < cs->n_txns; i++) { |
1477 | 0 | if (json_equal(request_id, cs->txns[i])) { |
1478 | 0 | json_destroy(cs->txns[i]); |
1479 | 0 | cs->txns[i] = cs->txns[--cs->n_txns]; |
1480 | 0 | return true; |
1481 | 0 | } |
1482 | 0 | } |
1483 | 0 | return false; |
1484 | 0 | } |
1485 | | |
1486 | | static void |
1487 | | ovsdb_cs_send_schema_request(struct ovsdb_cs *cs, |
1488 | | struct ovsdb_cs_db *db) |
1489 | 0 | { |
1490 | 0 | ovsdb_cs_send_request(cs, jsonrpc_create_request( |
1491 | 0 | "get_schema", |
1492 | 0 | json_array_create_1(json_string_create( |
1493 | 0 | db->db_name)), |
1494 | 0 | NULL)); |
1495 | 0 | } |
1496 | | |
1497 | | static void |
1498 | | ovsdb_cs_send_db_change_aware(struct ovsdb_cs *cs) |
1499 | 0 | { |
1500 | 0 | struct jsonrpc_msg *msg = jsonrpc_create_request( |
1501 | 0 | "set_db_change_aware", json_array_create_1(json_boolean_create(true)), |
1502 | 0 | NULL); |
1503 | 0 | jsonrpc_session_send(cs->session, msg); |
1504 | 0 | } |
1505 | | |
1506 | | static void |
1507 | | ovsdb_cs_send_monitor_request(struct ovsdb_cs *cs, struct ovsdb_cs_db *db, |
1508 | | int version) |
1509 | 0 | { |
1510 | 0 | struct json *mrs = db->ops->compose_monitor_requests( |
1511 | 0 | db->schema, db->ops_aux); |
1512 | | /* XXX handle failure */ |
1513 | 0 | ovs_assert(mrs->type == JSON_OBJECT); |
1514 | |
|
1515 | 0 | if (version > 1) { |
1516 | 0 | struct ovsdb_cs_db_table *table; |
1517 | 0 | HMAP_FOR_EACH (table, hmap_node, &db->tables) { |
1518 | 0 | if (table->ack_cond) { |
1519 | 0 | struct json *mr = shash_find_data(json_object(mrs), |
1520 | 0 | table->name); |
1521 | 0 | if (!mr) { |
1522 | 0 | mr = json_array_create_empty(); |
1523 | 0 | json_object_put(mrs, table->name, mr); |
1524 | 0 | } |
1525 | 0 | ovs_assert(mr->type == JSON_ARRAY); |
1526 | |
|
1527 | 0 | struct json *mr0; |
1528 | 0 | if (json_array_size(mr) == 0) { |
1529 | 0 | mr0 = json_object_create(); |
1530 | 0 | json_object_put(mr0, "columns", json_array_create_empty()); |
1531 | 0 | json_array_add(mr, mr0); |
1532 | 0 | } else { |
1533 | 0 | mr0 = CONST_CAST(struct json *, json_array_at(mr, 0)); |
1534 | 0 | } |
1535 | 0 | ovs_assert(mr0->type == JSON_OBJECT); |
1536 | |
|
1537 | 0 | json_object_put(mr0, "where", |
1538 | 0 | json_clone(table->ack_cond)); |
1539 | 0 | } |
1540 | 0 | } |
1541 | 0 | } |
1542 | |
|
1543 | 0 | const char *method = (version == 1 ? "monitor" |
1544 | 0 | : version == 2 ? "monitor_cond" |
1545 | 0 | : "monitor_cond_since"); |
1546 | 0 | struct json *params = json_array_create_3( |
1547 | 0 | json_string_create(db->db_name), |
1548 | 0 | json_clone(db->monitor_id), |
1549 | 0 | mrs); |
1550 | 0 | if (version == 3) { |
1551 | 0 | json_array_add(params, json_string_create_uuid(&db->last_id)); |
1552 | 0 | } |
1553 | 0 | ovsdb_cs_send_request(cs, jsonrpc_create_request(method, params, NULL)); |
1554 | 0 | } |
1555 | | |
1556 | | static void |
1557 | | log_parse_update_error(struct ovsdb_error *error) |
1558 | 0 | { |
1559 | 0 | if (!VLOG_DROP_WARN(&syntax_rl)) { |
1560 | 0 | char *s = ovsdb_error_to_string(error); |
1561 | 0 | VLOG_WARN_RL(&syntax_rl, "%s", s); |
1562 | 0 | free(s); |
1563 | 0 | } |
1564 | 0 | ovsdb_error_destroy(error); |
1565 | 0 | } |
1566 | | |
1567 | | static void |
1568 | | ovsdb_cs_db_add_update(struct ovsdb_cs_db *db, |
1569 | | const struct json *table_updates, int version, |
1570 | | bool clear, bool monitor_reply) |
1571 | 0 | { |
1572 | 0 | struct ovsdb_cs_event *event = ovsdb_cs_db_add_event( |
1573 | 0 | db, OVSDB_CS_EVENT_TYPE_UPDATE); |
1574 | 0 | event->update = (struct ovsdb_cs_update_event) { |
1575 | 0 | .table_updates = json_clone(table_updates), |
1576 | 0 | .clear = clear, |
1577 | 0 | .monitor_reply = monitor_reply, |
1578 | 0 | .version = version, |
1579 | 0 | .last_id = db->last_id, |
1580 | 0 | }; |
1581 | 0 | } |
1582 | | |
1583 | | static void |
1584 | | ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *db, |
1585 | | const struct json *result, int version) |
1586 | 0 | { |
1587 | 0 | const struct json *table_updates; |
1588 | 0 | bool clear; |
1589 | 0 | if (version == 3) { |
1590 | 0 | if (result->type != JSON_ARRAY || json_array_size(result) != 3 |
1591 | 0 | || (json_array_at(result, 0)->type != JSON_TRUE && |
1592 | 0 | json_array_at(result, 0)->type != JSON_FALSE) |
1593 | 0 | || json_array_at(result, 1)->type != JSON_STRING |
1594 | 0 | || !uuid_from_string(&db->last_id, |
1595 | 0 | json_string(json_array_at(result, 1)))) { |
1596 | 0 | struct ovsdb_error *error = ovsdb_syntax_error( |
1597 | 0 | result, NULL, "bad monitor_cond_since reply format"); |
1598 | 0 | log_parse_update_error(error); |
1599 | 0 | return; |
1600 | 0 | } |
1601 | | |
1602 | 0 | bool found = json_boolean(json_array_at(result, 0)); |
1603 | 0 | clear = !found; |
1604 | 0 | table_updates = json_array_at(result, 2); |
1605 | 0 | } else { |
1606 | 0 | clear = true; |
1607 | 0 | table_updates = result; |
1608 | 0 | } |
1609 | | |
1610 | 0 | ovsdb_cs_db_add_update(db, table_updates, version, clear, true); |
1611 | 0 | } |
1612 | | |
1613 | | static bool |
1614 | | ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *db, |
1615 | | const struct jsonrpc_msg *msg) |
1616 | 0 | { |
1617 | 0 | if (msg->type != JSONRPC_NOTIFY) { |
1618 | 0 | return false; |
1619 | 0 | } |
1620 | | |
1621 | 0 | int version = (!strcmp(msg->method, "update") ? 1 |
1622 | 0 | : !strcmp(msg->method, "update2") ? 2 |
1623 | 0 | : !strcmp(msg->method, "update3") ? 3 |
1624 | 0 | : 0); |
1625 | 0 | if (!version) { |
1626 | 0 | return false; |
1627 | 0 | } |
1628 | | |
1629 | 0 | struct json *params = msg->params; |
1630 | 0 | int n = version == 3 ? 3 : 2; |
1631 | 0 | if (params->type != JSON_ARRAY || json_array_size(params) != n) { |
1632 | 0 | struct ovsdb_error *error = ovsdb_syntax_error( |
1633 | 0 | params, NULL, "%s must be an array with %u elements.", |
1634 | 0 | msg->method, n); |
1635 | 0 | log_parse_update_error(error); |
1636 | 0 | return false; |
1637 | 0 | } |
1638 | | |
1639 | 0 | if (!json_equal(json_array_at(params, 0), db->monitor_id)) { |
1640 | 0 | return false; |
1641 | 0 | } |
1642 | | |
1643 | 0 | if (version == 3) { |
1644 | 0 | const char *last_id = json_string(json_array_at(params, 1)); |
1645 | 0 | if (!uuid_from_string(&db->last_id, last_id)) { |
1646 | 0 | struct ovsdb_error *error = ovsdb_syntax_error( |
1647 | 0 | params, NULL, "Last-id %s is not in UUID format.", last_id); |
1648 | 0 | log_parse_update_error(error); |
1649 | 0 | return false; |
1650 | 0 | } |
1651 | 0 | } |
1652 | | |
1653 | 0 | const struct json *table_updates = json_array_at(params, |
1654 | 0 | version == 3 ? 2 : 1); |
1655 | 0 | ovsdb_cs_db_add_update(db, table_updates, version, false, false); |
1656 | 0 | return true; |
1657 | 0 | } |
1658 | | |
1659 | | static bool |
1660 | | ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *cs, |
1661 | | struct ovsdb_cs_db *db, |
1662 | | const struct jsonrpc_msg *msg) |
1663 | 0 | { |
1664 | 0 | if (msg->type != JSONRPC_NOTIFY |
1665 | 0 | || strcmp(msg->method, "monitor_canceled") |
1666 | 0 | || msg->params->type != JSON_ARRAY |
1667 | 0 | || json_array_size(msg->params) != 1 |
1668 | 0 | || !json_equal(json_array_at(msg->params, 0), db->monitor_id)) { |
1669 | 0 | return false; |
1670 | 0 | } |
1671 | | |
1672 | 0 | db->monitor_version = 0; |
1673 | | |
1674 | | /* Cancel the other monitor and restart the FSM from the top. |
1675 | | * |
1676 | | * Maybe a more sophisticated response would be better in some cases, but |
1677 | | * it doesn't seem worth optimizing yet. (Although this is already more |
1678 | | * sophisticated than just dropping the connection and reconnecting.) */ |
1679 | 0 | struct ovsdb_cs_db *other_db |
1680 | 0 | = db == &cs->data ? &cs->server : &cs->data; |
1681 | 0 | if (other_db->monitor_version) { |
1682 | 0 | jsonrpc_session_send( |
1683 | 0 | cs->session, |
1684 | 0 | jsonrpc_create_request( |
1685 | 0 | "monitor_cancel", |
1686 | 0 | json_array_create_1(json_clone(other_db->monitor_id)), NULL)); |
1687 | 0 | other_db->monitor_version = 0; |
1688 | 0 | } |
1689 | 0 | ovsdb_cs_restart_fsm(cs); |
1690 | |
|
1691 | 0 | return true; |
1692 | 0 | } |
1693 | | |
1694 | | /* The _Server database. |
1695 | | * |
1696 | | * We replicate the Database table in the _Server database because this is the |
1697 | | * only way to find out properties we need to know for clustering, such as |
1698 | | * whether a database is clustered at all and whether this server is the |
1699 | | * leader. |
1700 | | * |
1701 | | * This code implements a kind of simple IDL-like layer. */ |
1702 | | |
1703 | | struct server_column { |
1704 | | const char *name; |
1705 | | struct ovsdb_type type; |
1706 | | }; |
1707 | | enum server_column_index { |
1708 | | COL_NAME, |
1709 | | COL_MODEL, |
1710 | | COL_CONNECTED, |
1711 | | COL_LEADER, |
1712 | | COL_SCHEMA, |
1713 | | COL_CID, |
1714 | | COL_INDEX, |
1715 | | }; |
1716 | | #define OPTIONAL_COLUMN(TYPE) \ |
1717 | | { \ |
1718 | | .key = OVSDB_BASE_##TYPE##_INIT, \ |
1719 | | .value = OVSDB_BASE_VOID_INIT, \ |
1720 | | .n_min = 0, \ |
1721 | | .n_max = 1 \ |
1722 | | } |
1723 | | static const struct server_column server_columns[] = { |
1724 | | [COL_NAME] = {"name", OPTIONAL_COLUMN(STRING) }, |
1725 | | [COL_MODEL] = {"model", OPTIONAL_COLUMN(STRING) }, |
1726 | | [COL_CONNECTED] = {"connected", OPTIONAL_COLUMN(BOOLEAN) }, |
1727 | | [COL_LEADER] = {"leader", OPTIONAL_COLUMN(BOOLEAN) }, |
1728 | | [COL_SCHEMA] = {"schema", OPTIONAL_COLUMN(STRING) }, |
1729 | | [COL_CID] = {"cid", OPTIONAL_COLUMN(UUID) }, |
1730 | | [COL_INDEX] = {"index", OPTIONAL_COLUMN(INTEGER) }, |
1731 | | }; |
1732 | 0 | #define N_SERVER_COLUMNS ARRAY_SIZE(server_columns) |
1733 | | struct server_row { |
1734 | | struct hmap_node hmap_node; |
1735 | | struct uuid uuid; |
1736 | | struct ovsdb_datum data[N_SERVER_COLUMNS]; |
1737 | | }; |
1738 | | |
1739 | | static void |
1740 | | server_row_destroy(struct server_row *row) |
1741 | 0 | { |
1742 | 0 | if (row) { |
1743 | 0 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { |
1744 | 0 | ovsdb_datum_destroy(&row->data[i], &server_columns[i].type); |
1745 | 0 | } |
1746 | 0 | free(row); |
1747 | 0 | } |
1748 | 0 | } |
1749 | | |
1750 | | static struct server_row * |
1751 | | ovsdb_cs_find_server_row(struct ovsdb_cs *cs, const struct uuid *uuid) |
1752 | 0 | { |
1753 | 0 | struct server_row *row; |
1754 | 0 | HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) { |
1755 | 0 | if (uuid_equals(uuid, &row->uuid)) { |
1756 | 0 | return row; |
1757 | 0 | } |
1758 | 0 | } |
1759 | 0 | return NULL; |
1760 | 0 | } |
1761 | | |
1762 | | static void |
1763 | | ovsdb_cs_delete_server_row(struct ovsdb_cs *cs, struct server_row *row) |
1764 | 0 | { |
1765 | 0 | hmap_remove(&cs->server_rows, &row->hmap_node); |
1766 | 0 | server_row_destroy(row); |
1767 | 0 | } |
1768 | | |
1769 | | static struct server_row * |
1770 | | ovsdb_cs_insert_server_row(struct ovsdb_cs *cs, const struct uuid *uuid) |
1771 | 0 | { |
1772 | 0 | struct server_row *row = xmalloc(sizeof *row); |
1773 | 0 | hmap_insert(&cs->server_rows, &row->hmap_node, uuid_hash(uuid)); |
1774 | 0 | row->uuid = *uuid; |
1775 | 0 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { |
1776 | 0 | ovsdb_datum_init_default(&row->data[i], &server_columns[i].type); |
1777 | 0 | } |
1778 | 0 | return row; |
1779 | 0 | } |
1780 | | |
1781 | | static void |
1782 | | ovsdb_cs_update_server_row(struct server_row *row, |
1783 | | const struct shash *update, bool xor) |
1784 | 0 | { |
1785 | 0 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { |
1786 | 0 | const struct server_column *column = &server_columns[i]; |
1787 | 0 | struct shash_node *node = shash_find(update, column->name); |
1788 | 0 | if (!node) { |
1789 | 0 | continue; |
1790 | 0 | } |
1791 | 0 | const struct json *json = node->data; |
1792 | |
|
1793 | 0 | struct ovsdb_datum *old = &row->data[i]; |
1794 | 0 | struct ovsdb_datum new; |
1795 | 0 | if (!xor) { |
1796 | 0 | struct ovsdb_error *error = ovsdb_datum_from_json( |
1797 | 0 | &new, &column->type, json, NULL); |
1798 | 0 | if (error) { |
1799 | 0 | ovsdb_error_destroy(error); |
1800 | 0 | continue; |
1801 | 0 | } |
1802 | 0 | } else { |
1803 | 0 | struct ovsdb_datum diff; |
1804 | 0 | struct ovsdb_error *error = ovsdb_transient_datum_from_json( |
1805 | 0 | &diff, &column->type, json); |
1806 | 0 | if (error) { |
1807 | 0 | ovsdb_error_destroy(error); |
1808 | 0 | continue; |
1809 | 0 | } |
1810 | | |
1811 | 0 | error = ovsdb_datum_apply_diff(&new, old, &diff, &column->type); |
1812 | 0 | if (error) { |
1813 | 0 | ovsdb_error_destroy(error); |
1814 | 0 | ovsdb_datum_destroy(&new, &column->type); |
1815 | 0 | continue; |
1816 | 0 | } |
1817 | 0 | ovsdb_datum_destroy(&diff, &column->type); |
1818 | 0 | } |
1819 | | |
1820 | 0 | ovsdb_datum_destroy(&row->data[i], &column->type); |
1821 | 0 | row->data[i] = new; |
1822 | 0 | } |
1823 | 0 | } |
1824 | | |
1825 | | static void |
1826 | | ovsdb_cs_clear_server_rows(struct ovsdb_cs *cs) |
1827 | 0 | { |
1828 | 0 | struct server_row *row; |
1829 | 0 | HMAP_FOR_EACH_SAFE (row, hmap_node, &cs->server_rows) { |
1830 | 0 | ovsdb_cs_delete_server_row(cs, row); |
1831 | 0 | } |
1832 | 0 | } |
1833 | | |
1834 | | static void log_parse_update_error(struct ovsdb_error *); |
1835 | | |
1836 | | static void |
1837 | | ovsdb_cs_process_server_event(struct ovsdb_cs *cs, |
1838 | | const struct ovsdb_cs_event *event) |
1839 | 0 | { |
1840 | 0 | ovs_assert(event->type == OVSDB_CS_EVENT_TYPE_UPDATE); |
1841 | |
|
1842 | 0 | const struct ovsdb_cs_update_event *update = &event->update; |
1843 | 0 | struct ovsdb_cs_db_update *du; |
1844 | 0 | struct ovsdb_error *error = ovsdb_cs_parse_db_update( |
1845 | 0 | update->table_updates, update->version, &du); |
1846 | 0 | if (error) { |
1847 | 0 | log_parse_update_error(error); |
1848 | 0 | return; |
1849 | 0 | } |
1850 | | |
1851 | 0 | if (update->clear) { |
1852 | 0 | ovsdb_cs_clear_server_rows(cs); |
1853 | 0 | } |
1854 | |
|
1855 | 0 | const struct ovsdb_cs_table_update *tu = ovsdb_cs_db_update_find_table( |
1856 | 0 | du, "Database"); |
1857 | 0 | if (tu) { |
1858 | 0 | for (size_t i = 0; i < tu->n; i++) { |
1859 | 0 | const struct ovsdb_cs_row_update *ru = &tu->row_updates[i]; |
1860 | 0 | struct server_row *row |
1861 | 0 | = ovsdb_cs_find_server_row(cs, &ru->row_uuid); |
1862 | 0 | if (ru->type == OVSDB_CS_ROW_DELETE) { |
1863 | 0 | ovsdb_cs_delete_server_row(cs, row); |
1864 | 0 | } else { |
1865 | 0 | if (!row) { |
1866 | 0 | row = ovsdb_cs_insert_server_row(cs, &ru->row_uuid); |
1867 | 0 | } |
1868 | 0 | ovsdb_cs_update_server_row(row, ru->columns, |
1869 | 0 | ru->type == OVSDB_CS_ROW_XOR); |
1870 | 0 | } |
1871 | 0 | } |
1872 | 0 | } |
1873 | |
|
1874 | 0 | ovsdb_cs_db_update_destroy(du); |
1875 | 0 | } |
1876 | | |
1877 | | static const char * |
1878 | | server_column_get_string(const struct server_row *row, |
1879 | | enum server_column_index index, |
1880 | | const char *default_value) |
1881 | 0 | { |
1882 | 0 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_STRING); |
1883 | 0 | const struct ovsdb_datum *d = &row->data[index]; |
1884 | 0 | return d->n == 1 ? json_string(d->keys[0].s) : default_value; |
1885 | 0 | } |
1886 | | |
1887 | | static bool |
1888 | | server_column_get_bool(const struct server_row *row, |
1889 | | enum server_column_index index, |
1890 | | bool default_value) |
1891 | 0 | { |
1892 | 0 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_BOOLEAN); |
1893 | 0 | const struct ovsdb_datum *d = &row->data[index]; |
1894 | 0 | return d->n == 1 ? d->keys[0].boolean : default_value; |
1895 | 0 | } |
1896 | | |
1897 | | static uint64_t |
1898 | | server_column_get_int(const struct server_row *row, |
1899 | | enum server_column_index index, |
1900 | | uint64_t default_value) |
1901 | 0 | { |
1902 | 0 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_INTEGER); |
1903 | 0 | const struct ovsdb_datum *d = &row->data[index]; |
1904 | 0 | return d->n == 1 ? d->keys[0].integer : default_value; |
1905 | 0 | } |
1906 | | |
1907 | | static const struct uuid * |
1908 | | server_column_get_uuid(const struct server_row *row, |
1909 | | enum server_column_index index, |
1910 | | const struct uuid *default_value) |
1911 | 0 | { |
1912 | 0 | ovs_assert(server_columns[index].type.key.type == OVSDB_TYPE_UUID); |
1913 | 0 | const struct ovsdb_datum *d = &row->data[index]; |
1914 | 0 | return d->n == 1 ? &d->keys[0].uuid : default_value; |
1915 | 0 | } |
1916 | | |
1917 | | static const struct server_row * |
1918 | | ovsdb_find_server_row(struct ovsdb_cs *cs) |
1919 | 0 | { |
1920 | 0 | const struct server_row *row; |
1921 | 0 | HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) { |
1922 | 0 | const struct uuid *cid = server_column_get_uuid(row, COL_CID, NULL); |
1923 | 0 | const char *name = server_column_get_string(row, COL_NAME, NULL); |
1924 | 0 | if (uuid_is_zero(&cs->cid) |
1925 | 0 | ? (name && !strcmp(cs->data.db_name, name)) |
1926 | 0 | : (cid && uuid_equals(cid, &cs->cid))) { |
1927 | 0 | return row; |
1928 | 0 | } |
1929 | 0 | } |
1930 | 0 | return NULL; |
1931 | 0 | } |
1932 | | |
1933 | | static void OVS_UNUSED |
1934 | | ovsdb_log_server_rows(const struct ovsdb_cs *cs) |
1935 | 0 | { |
1936 | 0 | int row_num = 0; |
1937 | 0 | const struct server_row *row; |
1938 | 0 | HMAP_FOR_EACH (row, hmap_node, &cs->server_rows) { |
1939 | 0 | struct ds s = DS_EMPTY_INITIALIZER; |
1940 | 0 | for (size_t i = 0; i < N_SERVER_COLUMNS; i++) { |
1941 | 0 | ds_put_format(&s, " %s=", server_columns[i].name); |
1942 | 0 | if (i == COL_SCHEMA) { |
1943 | 0 | ds_put_format(&s, "..."); |
1944 | 0 | } else { |
1945 | 0 | ovsdb_datum_to_string(&row->data[i], &server_columns[i].type, |
1946 | 0 | &s); |
1947 | 0 | } |
1948 | 0 | } |
1949 | 0 | VLOG_INFO("row %d:%s", row_num++, ds_cstr(&s)); |
1950 | 0 | ds_destroy(&s); |
1951 | 0 | } |
1952 | 0 | } |
1953 | | |
1954 | | static bool |
1955 | | ovsdb_cs_check_server_db__(struct ovsdb_cs *cs) |
1956 | 0 | { |
1957 | 0 | struct ovsdb_cs_event *event; |
1958 | 0 | LIST_FOR_EACH_POP (event, list_node, &cs->server.events) { |
1959 | 0 | ovsdb_cs_process_server_event(cs, event); |
1960 | 0 | ovsdb_cs_event_destroy(event); |
1961 | 0 | } |
1962 | |
|
1963 | 0 | const struct server_row *db_row = ovsdb_find_server_row(cs); |
1964 | 0 | static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); |
1965 | 0 | const char *server_name = jsonrpc_session_get_name(cs->session); |
1966 | 0 | if (!db_row) { |
1967 | 0 | VLOG_INFO_RL(&rl, "%s: server does not have %s database", |
1968 | 0 | server_name, cs->data.db_name); |
1969 | 0 | return false; |
1970 | 0 | } |
1971 | | |
1972 | 0 | bool ok = false; |
1973 | 0 | const char *model = server_column_get_string(db_row, COL_MODEL, ""); |
1974 | 0 | const char *schema = server_column_get_string(db_row, COL_SCHEMA, NULL); |
1975 | 0 | bool connected = server_column_get_bool(db_row, COL_CONNECTED, false); |
1976 | 0 | if (!strcmp(model, "clustered")) { |
1977 | 0 | bool leader = server_column_get_bool(db_row, COL_LEADER, false); |
1978 | 0 | uint64_t index = server_column_get_int(db_row, COL_INDEX, 0); |
1979 | |
|
1980 | 0 | if (!schema) { |
1981 | 0 | VLOG_INFO("%s: clustered database server has not yet joined " |
1982 | 0 | "cluster; trying another server", server_name); |
1983 | 0 | } else if (!connected) { |
1984 | 0 | VLOG_INFO("%s: clustered database server is disconnected " |
1985 | 0 | "from cluster; trying another server", server_name); |
1986 | 0 | } else if (cs->leader_only && !leader) { |
1987 | 0 | VLOG_INFO("%s: clustered database server is not cluster " |
1988 | 0 | "leader; trying another server", server_name); |
1989 | 0 | } else if (index < cs->min_index) { |
1990 | 0 | VLOG_WARN("%s: clustered database server has stale data; " |
1991 | 0 | "trying another server", server_name); |
1992 | 0 | } else { |
1993 | 0 | cs->min_index = index; |
1994 | 0 | ok = true; |
1995 | 0 | } |
1996 | 0 | } else if (!strcmp(model, "relay")) { |
1997 | 0 | if (!schema) { |
1998 | 0 | VLOG_INFO("%s: relay database server has not yet connected to the " |
1999 | 0 | "relay source; trying another server", server_name); |
2000 | 0 | } else if (!connected) { |
2001 | 0 | VLOG_INFO("%s: relay database server is disconnected from the " |
2002 | 0 | "relay source; trying another server", server_name); |
2003 | 0 | } else if (cs->leader_only) { |
2004 | 0 | VLOG_INFO("%s: relay database server cannot be a leader; " |
2005 | 0 | "trying another server", server_name); |
2006 | 0 | } else { |
2007 | 0 | ok = true; |
2008 | 0 | } |
2009 | 0 | } else { |
2010 | 0 | if (!schema) { |
2011 | 0 | VLOG_INFO("%s: missing database schema", server_name); |
2012 | 0 | } else { |
2013 | 0 | ok = true; |
2014 | 0 | } |
2015 | 0 | } |
2016 | 0 | if (!ok) { |
2017 | 0 | return false; |
2018 | 0 | } |
2019 | | |
2020 | 0 | if (cs->state == CS_S_SERVER_MONITOR_REQUESTED) { |
2021 | 0 | json_destroy(cs->data.schema); |
2022 | 0 | cs->data.schema = json_from_string(schema); |
2023 | 0 | if (cs->data.max_version >= 3) { |
2024 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->data, 3); |
2025 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_SINCE_REQUESTED); |
2026 | 0 | } else if (cs->data.max_version >= 2) { |
2027 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->data, 2); |
2028 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED); |
2029 | 0 | } else { |
2030 | 0 | ovsdb_cs_send_monitor_request(cs, &cs->data, 1); |
2031 | 0 | ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED); |
2032 | 0 | } |
2033 | 0 | } |
2034 | 0 | return true; |
2035 | 0 | } |
2036 | | |
2037 | | static bool |
2038 | | ovsdb_cs_check_server_db(struct ovsdb_cs *cs) |
2039 | 0 | { |
2040 | 0 | bool ok = ovsdb_cs_check_server_db__(cs); |
2041 | 0 | if (!ok) { |
2042 | 0 | ovsdb_cs_retry(cs); |
2043 | 0 | } |
2044 | 0 | return ok; |
2045 | 0 | } |
2046 | | |
2047 | | static struct json * |
2048 | | ovsdb_cs_compose_server_monitor_request(const struct json *schema_json, |
2049 | | void *cs_) |
2050 | 0 | { |
2051 | 0 | struct ovsdb_cs *cs = cs_; |
2052 | 0 | struct shash *schema = ovsdb_cs_parse_schema(schema_json); |
2053 | 0 | struct json *monitor_requests = json_object_create(); |
2054 | |
|
2055 | 0 | const char *table_name = "Database"; |
2056 | 0 | const struct sset *table_schema |
2057 | 0 | = schema ? shash_find_data(schema, table_name) : NULL; |
2058 | 0 | if (!table_schema) { |
2059 | 0 | VLOG_WARN("%s database lacks %s table " |
2060 | 0 | "(database needs upgrade?)", |
2061 | 0 | cs->server.db_name, table_name); |
2062 | | /* XXX return failure? */ |
2063 | 0 | } else { |
2064 | 0 | struct json *columns = json_array_create_empty(); |
2065 | 0 | for (size_t j = 0; j < N_SERVER_COLUMNS; j++) { |
2066 | 0 | const struct server_column *column = &server_columns[j]; |
2067 | 0 | bool db_has_column = (table_schema && |
2068 | 0 | sset_contains(table_schema, column->name)); |
2069 | 0 | if (table_schema && !db_has_column) { |
2070 | 0 | VLOG_WARN("%s table in %s database lacks %s column " |
2071 | 0 | "(database needs upgrade?)", |
2072 | 0 | table_name, cs->server.db_name, column->name); |
2073 | 0 | continue; |
2074 | 0 | } |
2075 | 0 | json_array_add(columns, json_string_create(column->name)); |
2076 | 0 | } |
2077 | |
|
2078 | 0 | struct json *monitor_request = json_object_create(); |
2079 | 0 | json_object_put(monitor_request, "columns", columns); |
2080 | 0 | json_object_put(monitor_requests, table_name, |
2081 | 0 | json_array_create_1(monitor_request)); |
2082 | 0 | } |
2083 | 0 | ovsdb_cs_free_schema(schema); |
2084 | |
|
2085 | 0 | return monitor_requests; |
2086 | 0 | } |
2087 | | |
2088 | | static const struct ovsdb_cs_ops ovsdb_cs_server_ops = { |
2089 | | ovsdb_cs_compose_server_monitor_request |
2090 | | }; |
2091 | | |
2092 | | static void |
2093 | | log_error(struct ovsdb_error *error) |
2094 | 0 | { |
2095 | 0 | char *s = ovsdb_error_to_string_free(error); |
2096 | 0 | VLOG_WARN("error parsing database schema: %s", s); |
2097 | 0 | free(s); |
2098 | 0 | } |
2099 | | |
2100 | | /* Parses 'schema_json', an OVSDB schema in JSON format as described in RFC |
2101 | | * 7047, to obtain the names of its rows and columns. If successful, returns |
2102 | | * an shash whose keys are table names and whose values are ssets, where each |
2103 | | * sset contains the names of its table's columns. On failure (due to a parse |
2104 | | * error), returns NULL. |
2105 | | * |
2106 | | * It would also be possible to use the general-purpose OVSDB schema parser in |
2107 | | * ovsdb-server, but that's overkill, possibly too strict for the current use |
2108 | | * case, and would require restructuring ovsdb-server to separate the schema |
2109 | | * code from the rest. */ |
2110 | | struct shash * |
2111 | | ovsdb_cs_parse_schema(const struct json *schema_json) |
2112 | 0 | { |
2113 | 0 | struct ovsdb_parser parser; |
2114 | 0 | const struct json *tables_json; |
2115 | 0 | struct ovsdb_error *error; |
2116 | 0 | struct shash_node *node; |
2117 | 0 | struct shash *schema; |
2118 | |
|
2119 | 0 | ovsdb_parser_init(&parser, schema_json, "database schema"); |
2120 | 0 | tables_json = ovsdb_parser_member(&parser, "tables", OP_OBJECT); |
2121 | 0 | error = ovsdb_parser_destroy(&parser); |
2122 | 0 | if (error) { |
2123 | 0 | log_error(error); |
2124 | 0 | return NULL; |
2125 | 0 | } |
2126 | | |
2127 | 0 | schema = xmalloc(sizeof *schema); |
2128 | 0 | shash_init(schema); |
2129 | 0 | SHASH_FOR_EACH (node, json_object(tables_json)) { |
2130 | 0 | const char *table_name = node->name; |
2131 | 0 | const struct json *json = node->data; |
2132 | 0 | const struct json *columns_json; |
2133 | |
|
2134 | 0 | ovsdb_parser_init(&parser, json, "table schema for table %s", |
2135 | 0 | table_name); |
2136 | 0 | columns_json = ovsdb_parser_member(&parser, "columns", OP_OBJECT); |
2137 | 0 | error = ovsdb_parser_destroy(&parser); |
2138 | 0 | if (error) { |
2139 | 0 | log_error(error); |
2140 | 0 | ovsdb_cs_free_schema(schema); |
2141 | 0 | return NULL; |
2142 | 0 | } |
2143 | | |
2144 | 0 | struct sset *columns = xmalloc(sizeof *columns); |
2145 | 0 | sset_init(columns); |
2146 | |
|
2147 | 0 | struct shash_node *node2; |
2148 | 0 | SHASH_FOR_EACH (node2, json_object(columns_json)) { |
2149 | 0 | const char *column_name = node2->name; |
2150 | 0 | sset_add(columns, column_name); |
2151 | 0 | } |
2152 | 0 | shash_add(schema, table_name, columns); |
2153 | 0 | } |
2154 | 0 | return schema; |
2155 | 0 | } |
2156 | | |
2157 | | /* Frees 'schema', which is in the format returned by |
2158 | | * ovsdb_cs_parse_schema(). */ |
2159 | | void |
2160 | | ovsdb_cs_free_schema(struct shash *schema) |
2161 | 0 | { |
2162 | 0 | if (schema) { |
2163 | 0 | struct shash_node *node; |
2164 | |
|
2165 | 0 | SHASH_FOR_EACH_SAFE (node, schema) { |
2166 | 0 | struct sset *sset = node->data; |
2167 | 0 | sset_destroy(sset); |
2168 | 0 | free(sset); |
2169 | 0 | shash_delete(schema, node); |
2170 | 0 | } |
2171 | 0 | shash_destroy(schema); |
2172 | 0 | free(schema); |
2173 | 0 | } |
2174 | 0 | } |
2175 | | |
2176 | | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT |
2177 | | ovsdb_cs_parse_row_update1(const struct json *in, |
2178 | | struct ovsdb_cs_row_update *out) |
2179 | 0 | { |
2180 | 0 | const struct json *old_json, *new_json; |
2181 | |
|
2182 | 0 | old_json = shash_find_data(json_object(in), "old"); |
2183 | 0 | new_json = shash_find_data(json_object(in), "new"); |
2184 | 0 | if (old_json && old_json->type != JSON_OBJECT) { |
2185 | 0 | return ovsdb_syntax_error(old_json, NULL, |
2186 | 0 | "\"old\" <row> is not object"); |
2187 | 0 | } else if (new_json && new_json->type != JSON_OBJECT) { |
2188 | 0 | return ovsdb_syntax_error(new_json, NULL, |
2189 | 0 | "\"new\" <row> is not object"); |
2190 | 0 | } else if ((old_json != NULL) + (new_json != NULL) |
2191 | 0 | != shash_count(json_object(in))) { |
2192 | 0 | return ovsdb_syntax_error(in, NULL, |
2193 | 0 | "<row-update> contains " |
2194 | 0 | "unexpected member"); |
2195 | 0 | } else if (!old_json && !new_json) { |
2196 | 0 | return ovsdb_syntax_error(in, NULL, |
2197 | 0 | "<row-update> missing \"old\" " |
2198 | 0 | "and \"new\" members"); |
2199 | 0 | } |
2200 | | |
2201 | 0 | if (!new_json) { |
2202 | 0 | out->type = OVSDB_CS_ROW_DELETE; |
2203 | 0 | out->columns = json_object(old_json); |
2204 | 0 | } else if (!old_json) { |
2205 | 0 | out->type = OVSDB_CS_ROW_INSERT; |
2206 | 0 | out->columns = json_object(new_json); |
2207 | 0 | } else { |
2208 | 0 | out->type = OVSDB_CS_ROW_UPDATE; |
2209 | 0 | out->columns = json_object(new_json); |
2210 | 0 | } |
2211 | 0 | return NULL; |
2212 | 0 | } |
2213 | | |
2214 | | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT |
2215 | | ovsdb_cs_parse_row_update2(const struct json *in, |
2216 | | struct ovsdb_cs_row_update *out) |
2217 | 0 | { |
2218 | 0 | const struct shash *object = json_object(in); |
2219 | 0 | if (shash_count(object) != 1) { |
2220 | 0 | return ovsdb_syntax_error( |
2221 | 0 | in, NULL, "<row-update2> has %"PRIuSIZE" members " |
2222 | 0 | "instead of expected 1", shash_count(object)); |
2223 | 0 | } |
2224 | | |
2225 | 0 | struct shash_node *node = shash_first(object); |
2226 | 0 | const struct json *columns = node->data; |
2227 | 0 | if (!strcmp(node->name, "insert") || !strcmp(node->name, "initial")) { |
2228 | 0 | out->type = OVSDB_CS_ROW_INSERT; |
2229 | 0 | } else if (!strcmp(node->name, "modify")) { |
2230 | 0 | out->type = OVSDB_CS_ROW_XOR; |
2231 | 0 | } else if (!strcmp(node->name, "delete")) { |
2232 | 0 | out->type = OVSDB_CS_ROW_DELETE; |
2233 | 0 | if (columns->type != JSON_NULL) { |
2234 | 0 | return ovsdb_syntax_error( |
2235 | 0 | in, NULL, |
2236 | 0 | "<row-update2> delete operation has unexpected value"); |
2237 | 0 | } |
2238 | 0 | return NULL; |
2239 | 0 | } else { |
2240 | 0 | return ovsdb_syntax_error(in, NULL, |
2241 | 0 | "<row-update2> has unknown member \"%s\"", |
2242 | 0 | node->name); |
2243 | 0 | } |
2244 | | |
2245 | 0 | if (columns->type != JSON_OBJECT) { |
2246 | 0 | return ovsdb_syntax_error( |
2247 | 0 | in, NULL, |
2248 | 0 | "<row-update2> \"%s\" operation has unexpected value", |
2249 | 0 | node->name); |
2250 | 0 | } |
2251 | 0 | out->columns = json_object(columns); |
2252 | |
|
2253 | 0 | return NULL; |
2254 | 0 | } |
2255 | | |
2256 | | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT |
2257 | | ovsdb_cs_parse_row_update(const char *table_name, |
2258 | | const struct json *in, int version, |
2259 | | struct ovsdb_cs_row_update *out) |
2260 | 0 | { |
2261 | 0 | if (in->type != JSON_OBJECT) { |
2262 | 0 | const char *suffix = version > 1 ? "2" : ""; |
2263 | 0 | return ovsdb_syntax_error( |
2264 | 0 | in, NULL, |
2265 | 0 | "<table-update%s> for table \"%s\" contains <row-update%s> " |
2266 | 0 | "that is not an object", |
2267 | 0 | suffix, table_name, suffix); |
2268 | 0 | } |
2269 | | |
2270 | 0 | return (version == 1 |
2271 | 0 | ? ovsdb_cs_parse_row_update1(in, out) |
2272 | 0 | : ovsdb_cs_parse_row_update2(in, out)); |
2273 | 0 | } |
2274 | | |
2275 | | static struct ovsdb_error * OVS_WARN_UNUSED_RESULT |
2276 | | ovsdb_cs_parse_table_update(const char *table_name, |
2277 | | const struct json *in, int version, |
2278 | | struct ovsdb_cs_table_update *out) |
2279 | 0 | { |
2280 | 0 | const char *suffix = version > 1 ? "2" : ""; |
2281 | |
|
2282 | 0 | if (in->type != JSON_OBJECT) { |
2283 | 0 | return ovsdb_syntax_error( |
2284 | 0 | in, NULL, "<table-update%s> for table \"%s\" is not an object", |
2285 | 0 | suffix, table_name); |
2286 | 0 | } |
2287 | 0 | struct shash *in_rows = json_object(in); |
2288 | |
|
2289 | 0 | out->row_updates = xmalloc(shash_count(in_rows) * sizeof *out->row_updates); |
2290 | |
|
2291 | 0 | const struct shash_node *node; |
2292 | 0 | SHASH_FOR_EACH (node, in_rows) { |
2293 | 0 | const char *row_uuid_string = node->name; |
2294 | 0 | struct uuid row_uuid; |
2295 | 0 | if (!uuid_from_string(&row_uuid, row_uuid_string)) { |
2296 | 0 | return ovsdb_syntax_error( |
2297 | 0 | in, NULL, |
2298 | 0 | "<table-update%s> for table \"%s\" contains " |
2299 | 0 | "bad UUID \"%s\" as member name", |
2300 | 0 | suffix, table_name, row_uuid_string); |
2301 | 0 | } |
2302 | | |
2303 | 0 | const struct json *in_ru = node->data; |
2304 | 0 | struct ovsdb_cs_row_update *out_ru = &out->row_updates[out->n++]; |
2305 | 0 | *out_ru = (struct ovsdb_cs_row_update) { .row_uuid = row_uuid }; |
2306 | |
|
2307 | 0 | struct ovsdb_error *error = ovsdb_cs_parse_row_update( |
2308 | 0 | table_name, in_ru, version, out_ru); |
2309 | 0 | if (error) { |
2310 | 0 | return error; |
2311 | 0 | } |
2312 | 0 | } |
2313 | | |
2314 | 0 | return NULL; |
2315 | 0 | } |
2316 | | |
2317 | | /* Parses OVSDB <table-updates> or <table-updates2> object 'in' into '*outp'. |
2318 | | * If successful, sets '*outp' to the new object and returns NULL. On failure, |
2319 | | * returns the error and sets '*outp' to NULL. |
2320 | | * |
2321 | | * On success, the caller must eventually free '*outp', with |
2322 | | * ovsdb_cs_db_update_destroy(). |
2323 | | * |
2324 | | * 'version' should be 1 if 'in' is a <table-updates>, 2 or 3 if it is a |
2325 | | * <table-updates2>. */ |
2326 | | struct ovsdb_error * OVS_WARN_UNUSED_RESULT |
2327 | | ovsdb_cs_parse_db_update(const struct json *in, int version, |
2328 | | struct ovsdb_cs_db_update **outp) |
2329 | 0 | { |
2330 | 0 | const char *suffix = version > 1 ? "2" : ""; |
2331 | |
|
2332 | 0 | *outp = NULL; |
2333 | 0 | if (in->type != JSON_OBJECT) { |
2334 | 0 | return ovsdb_syntax_error(in, NULL, |
2335 | 0 | "<table-updates%s> is not an object", suffix); |
2336 | 0 | } |
2337 | | |
2338 | 0 | struct ovsdb_cs_db_update *out = xzalloc(sizeof *out); |
2339 | 0 | out->table_updates = xmalloc(shash_count(json_object(in)) |
2340 | 0 | * sizeof *out->table_updates); |
2341 | 0 | const struct shash_node *node; |
2342 | 0 | SHASH_FOR_EACH (node, json_object(in)) { |
2343 | 0 | const char *table_name = node->name; |
2344 | 0 | const struct json *in_tu = node->data; |
2345 | |
|
2346 | 0 | struct ovsdb_cs_table_update *out_tu = &out->table_updates[out->n++]; |
2347 | 0 | *out_tu = (struct ovsdb_cs_table_update) { .table_name = table_name }; |
2348 | |
|
2349 | 0 | struct ovsdb_error *error = ovsdb_cs_parse_table_update( |
2350 | 0 | table_name, in_tu, version, out_tu); |
2351 | 0 | if (error) { |
2352 | 0 | ovsdb_cs_db_update_destroy(out); |
2353 | 0 | return error; |
2354 | 0 | } |
2355 | 0 | } |
2356 | | |
2357 | 0 | *outp = out; |
2358 | 0 | return NULL; |
2359 | 0 | } |
2360 | | |
2361 | | /* Frees 'du', which was presumably allocated by ovsdb_cs_parse_db_update(). */ |
2362 | | void |
2363 | | ovsdb_cs_db_update_destroy(struct ovsdb_cs_db_update *du) |
2364 | 0 | { |
2365 | 0 | if (!du) { |
2366 | 0 | return; |
2367 | 0 | } |
2368 | | |
2369 | 0 | for (size_t i = 0; i < du->n; i++) { |
2370 | 0 | struct ovsdb_cs_table_update *tu = &du->table_updates[i]; |
2371 | 0 | free(tu->row_updates); |
2372 | 0 | } |
2373 | 0 | free(du->table_updates); |
2374 | 0 | free(du); |
2375 | 0 | } |
2376 | | |
2377 | | const struct ovsdb_cs_table_update * |
2378 | | ovsdb_cs_db_update_find_table(const struct ovsdb_cs_db_update *du, |
2379 | | const char *table_name) |
2380 | 0 | { |
2381 | 0 | for (size_t i = 0; i < du->n; i++) { |
2382 | 0 | const struct ovsdb_cs_table_update *tu = &du->table_updates[i]; |
2383 | 0 | if (!strcmp(tu->table_name, table_name)) { |
2384 | 0 | return tu; |
2385 | 0 | } |
2386 | 0 | } |
2387 | 0 | return NULL; |
2388 | 0 | } |
2389 | | |