/src/tarantool/src/box/iproto.cc
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file. |
3 | | * |
4 | | * Redistribution and use in source and binary forms, with or |
5 | | * without modification, are permitted provided that the following |
6 | | * conditions are met: |
7 | | * |
8 | | * 1. Redistributions of source code must retain the above |
9 | | * copyright notice, this list of conditions and the |
10 | | * following disclaimer. |
11 | | * |
12 | | * 2. Redistributions in binary form must reproduce the above |
13 | | * copyright notice, this list of conditions and the following |
14 | | * disclaimer in the documentation and/or other materials |
15 | | * provided with the distribution. |
16 | | * |
17 | | * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND |
18 | | * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED |
19 | | * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
20 | | * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL |
21 | | * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, |
22 | | * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
23 | | * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
24 | | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR |
25 | | * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
26 | | * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
27 | | * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF |
28 | | * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
29 | | * SUCH DAMAGE. |
30 | | */ |
31 | | #include "iproto.h" |
32 | | #include <string.h> |
33 | | #include <stdarg.h> |
34 | | #include <stdio.h> |
35 | | #include <fcntl.h> |
36 | | #include <ctype.h> |
37 | | |
38 | | #include <msgpuck.h> |
39 | | #include <small/ibuf.h> |
40 | | #include <small/obuf.h> |
41 | | #include <base64.h> |
42 | | |
43 | | #include "version.h" |
44 | | #include "event.h" |
45 | | #include "func_adapter.h" |
46 | | #include "fiber.h" |
47 | | #include "fiber_cond.h" |
48 | | #include "cbus.h" |
49 | | #include "say.h" |
50 | | #include "sio.h" |
51 | | #include "evio.h" |
52 | | #include "iostream.h" |
53 | | #include "scoped_guard.h" |
54 | | #include "memory.h" |
55 | | #include "random.h" |
56 | | |
57 | | #include "bind.h" |
58 | | #include "port.h" |
59 | | #include "box.h" |
60 | | #include "call.h" |
61 | | #include "tuple_convert.h" |
62 | | #include "session.h" |
63 | | #include "xrow.h" |
64 | | #include "schema.h" /* schema_version */ |
65 | | #include "replication.h" /* instance_uuid */ |
66 | | #include "iproto_constants.h" |
67 | | #include "iproto_features.h" |
68 | | #include "rmean.h" |
69 | | #include "execute.h" |
70 | | #include "errinj.h" |
71 | | #include "tt_static.h" |
72 | | #include "trivia/util.h" |
73 | | #include "salad/stailq.h" |
74 | | #include "txn.h" |
75 | | #include "on_shutdown.h" |
76 | | #include "flightrec.h" |
77 | | #include "security.h" |
78 | | #include "watcher.h" |
79 | | #include "box/mp_box_ctx.h" |
80 | | #include "box/tuple.h" |
81 | | #include "mpstream/mpstream.h" |
82 | | |
enum {
	/** Hard limit on a single IPROTO packet size: 2 GiB. */
	IPROTO_PACKET_SIZE_MAX = 2UL * 1024 * 1024 * 1024,
};

enum {
	/*
	 * Maximum size of a cbus endpoint name buffer used by iproto
	 * threads. NOTE(review): presumably sized for "net<N>"-style
	 * names -- confirm against the endpoint naming code.
	 */
	ENDPOINT_NAME_MAX = 10
};

struct iproto_connection;
struct iproto_msg;
93 | | |
struct iproto_stream {
	/** Currently active stream transaction or NULL */
	struct txn *txn;
	/**
	 * Queue of pending requests (iproto messages) for this stream,
	 * processed sequentially. This field is accessible only from
	 * the iproto thread. Queue items have iproto_msg type.
	 */
	struct stailq pending_requests;
	/** Id of this stream, used as a key in streams hash table */
	uint64_t id;
	/** This stream connection */
	struct iproto_connection *connection;
	/**
	 * Pre-allocated disconnect msg to gracefully rollback stream
	 * transaction and destroy stream object.
	 */
	struct cmsg on_disconnect;
	/**
	 * Message currently being processed in the tx thread.
	 * This field is accessible only from the iproto thread.
	 */
	struct iproto_msg *current;
};
118 | | |
/**
 * A position in connection output buffer.
 * Since we use rotating buffers to recycle memory,
 * it includes not only a position in obuf, but also
 * a pointer to the obuf the position is for.
 */
struct iproto_wpos {
	/** Output buffer this position refers to. */
	struct obuf *obuf;
	/** Savepoint within @a obuf marking the position. */
	struct obuf_svp svp;
};
129 | | |
130 | | static void |
131 | | iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out) |
132 | 0 | { |
133 | 0 | wpos->obuf = out; |
134 | 0 | wpos->svp = obuf_create_svp(out); |
135 | 0 | } |
136 | | |
/**
 * Message sent when the iproto thread has dropped all connections that
 * requested to be dropped.
 */
struct iproto_drop_finished {
	/** Base structure. */
	struct cmsg base;
	/**
	 * Generation is a sequence number of an iproto_drop_connections()
	 * invocation.
	 *
	 * Generation is used to handle a racy situation when a previous
	 * invocation of iproto_drop_connections() failed and there is a new
	 * invocation. A message from the previous invocation may be delivered
	 * and account the iproto thread as finished dropping connections,
	 * which is not true.
	 */
	unsigned generation;
};
155 | | |
struct iproto_thread {
	/**
	 * Slab cache used for allocating memory for output network buffers
	 * in the tx thread.
	 */
	struct slab_cache net_slabc;
	/**
	 * Network thread execution unit.
	 */
	struct cord net_cord;
	/**
	 * A single global queue for all requests in all connections. All
	 * requests from all connections are processed concurrently.
	 * Is also used as a queue for just established connections and to
	 * execute disconnect triggers. A few notes about these triggers:
	 * - they need to be run in a fiber
	 * - unlike an ordinary request failure, on_connect trigger
	 *   failure must lead to connection close.
	 * - on_connect trigger must be processed before any other
	 *   request on this connection.
	 */
	struct cpipe tx_pipe;
	struct cpipe net_pipe;
	/**
	 * Static cbus routes for this iproto thread.
	 */
	struct cmsg_hop begin_route[2];
	struct cmsg_hop commit_route[2];
	struct cmsg_hop rollback_route[2];
	struct cmsg_hop rollback_on_disconnect_route[2];
	struct cmsg_hop destroy_route[2];
	struct cmsg_hop disconnect_route[2];
	struct cmsg_hop misc_route[2];
	struct cmsg_hop call_route[2];
	struct cmsg_hop select_route[2];
	struct cmsg_hop process1_route[2];
	struct cmsg_hop sql_route[2];
	struct cmsg_hop join_route[2];
	struct cmsg_hop subscribe_route[2];
	struct cmsg_hop error_route[2];
	struct cmsg_hop push_route[2];
	struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX];
	struct cmsg_hop connect_route[2];
	struct cmsg_hop override_route[2];
	/*
	 * Set of overridden request handlers. Used by IPROTO thread to skip
	 * request preprocessing and use the 'override' route.
	 */
	mh_i32_t *req_handlers;
	/*
	 * Iproto thread memory pools.
	 */
	struct mempool iproto_msg_pool;
	struct mempool iproto_connection_pool;
	struct mempool iproto_stream_pool;
	/*
	 * List of stopped connections.
	 */
	struct rlist stopped_connections;
	/*
	 * Iproto thread stat.
	 */
	struct rmean *rmean;
	/*
	 * Iproto thread id.
	 */
	uint32_t id;
	/** Array of iproto binary listeners */
	struct evio_service binary;
	/** Requests count currently pending in stream queue. */
	size_t requests_in_stream_queue;
	/** List of all connections. */
	struct rlist connections;
	/** Number of connections that are pending drop. */
	size_t drop_pending_connection_count;
	/**
	 * Message used to notify TX thread that all connections marked
	 * to be dropped are dropped.
	 */
	struct iproto_drop_finished drop_finished_msg;
	/**
	 * If set then iproto thread shutdown is started and we should not
	 * accept new connections.
	 */
	bool is_shutting_down;
	/**
	 * The following fields are used exclusively by the tx thread.
	 * Align them to prevent false-sharing.
	 */
	struct {
		alignas(CACHELINE_SIZE)
		/** Request count currently processed by tx thread. */
		size_t requests_in_progress;
		/** Iproto thread stat collected in tx thread. */
		struct rmean *rmean;
	} tx;
};
253 | | |
/** Condition for drop finished. */
static struct fiber_cond drop_finished_cond;
/** Count of iproto threads that have not finished connections drop yet. */
static size_t drop_pending_thread_count;
/**
 * Generation is a sequence number of a dropping connections invocation.
 *
 * See also `struct iproto_drop_finished`.
 */
static unsigned drop_generation;

/**
 * IPROTO listen URIs. Set by box.cfg.listen.
 */
static struct uri_set iproto_uris;

/** Per-thread iproto state, one element per iproto thread. */
static struct iproto_thread *iproto_threads;
/** Number of iproto threads. */
int iproto_threads_count;
/**
 * This binary contains all bind socket properties, like the
 * address the iproto listens for. Is kept in TX to be
 * shown in box.info. It should be global, because it contains
 * properties, and should be accessible from different functions
 * in the tx thread.
 */
static struct evio_service tx_binary;
280 | | |
/**
 * In Greek mythology, Kharon is the ferryman who carries souls
 * of the newly deceased across the river Styx that divided the
 * world of the living from the world of the dead. Here Kharon is
 * a cbus message and does similar work. It notifies the iproto
 * thread about new data in a connection output buffer and carries
 * back to the tx thread the position in the output buffer which
 * has been successfully flushed to the socket. Styx here is cpipe,
 * and the boat is the cbus message.
 */
struct iproto_kharon {
	struct cmsg base;
	/**
	 * Tx thread sets wpos to the current position in the
	 * output buffer and sends the message to the iproto thread.
	 * Iproto returns the message to tx after setting wpos
	 * to the last flushed position (similarly to
	 * iproto_msg.wpos).
	 */
	struct iproto_wpos wpos;
};
302 | | |
/**
 * Network readahead. We assign it without locks in the tx
 * thread and use it in the iproto thread -- it's OK that
 * readahead has a stale value until the thread caches have
 * synchronized; after all, it's used in new connections only.
 *
 * NOTE(review): the original comment said "a signed integer to
 * avoid automatic type coercion to an unsigned type", yet the
 * variable is declared unsigned -- confirm which is intended.
 *
 * Notice that the default is not a strict power of two.
 * slab metadata takes some space, and we want
 * allocation steps to be correlated to slab buddy
 * sizes, so when we ask slab cache for 16320 bytes,
 * we get a slab of size 16384, not 32768.
 */
unsigned iproto_readahead = 16320;

/* The maximal number of iproto messages in flight. */
static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
322 | | |
/**
 * Request handlers meta information. The IPROTO request of each type can be
 * overridden by the following types of handlers (listed in priority order):
 * 1. Lua handlers, set in the event registry by request type id;
 * 2. Lua handlers, set in the event registry by request type name;
 * 3. C handler, set by `iproto_override()'.
 */
struct iproto_req_handlers {
	/**
	 * Triggers from the event registry, set by request type id.
	 * NULL if no such triggers.
	 */
	struct event *event_by_id;
	/**
	 * Triggers from the event registry, set by request type name.
	 * NULL if no such triggers.
	 */
	struct event *event_by_name;
	/**
	 * C request handler.
	 */
	struct {
		/** C request handler. NULL if not set. */
		iproto_handler_t cb;
		/** C request handler destructor, can be NULL. */
		iproto_handler_destroy_t destroy;
		/** Context passed to the handler and destructor. */
		void *ctx;
	} c;
};
353 | | |
/**
 * Request handler table used in the TX thread for processing requests
 * with overridden handlers.
 */
static mh_i32ptr_t *tx_req_handlers;

/**
 * If set then iproto shutdown is started and we should not accept new
 * connections.
 */
static bool iproto_is_shutting_down;
365 | | |
/** Available iproto configuration changes. */
enum iproto_cfg_op {
	/** Command code to set max input for iproto thread */
	IPROTO_CFG_MSG_MAX,
	/**
	 * Command code to start the listen socket contained
	 * in an evio_service object.
	 */
	IPROTO_CFG_START,
	/**
	 * Command code to stop the listen socket contained
	 * in an evio_service object. In case when a user sets
	 * new parameters for iproto, it is necessary to stop
	 * listen sockets in iproto threads before reconfiguration.
	 */
	IPROTO_CFG_STOP,
	/**
	 * Equivalent to IPROTO_CFG_STOP followed by IPROTO_CFG_START.
	 */
	IPROTO_CFG_RESTART,
	/**
	 * Command code to get statistics from an iproto thread.
	 */
	IPROTO_CFG_STAT,
	/**
	 * Command code to notify IPROTO threads a new handler has been set or
	 * reset.
	 */
	IPROTO_CFG_OVERRIDE,
	/**
	 * Command code to create a new IPROTO session.
	 */
	IPROTO_CFG_SESSION_NEW,
	/**
	 * Command code to drop all current connections.
	 */
	IPROTO_CFG_DROP_CONNECTIONS,
	/** Command code to start iproto thread shutdown. */
	IPROTO_CFG_SHUTDOWN,
};
405 | | |
/**
 * Since there is no way to "synchronously" change the
 * state of the io thread, to change the listen port or max
 * message count in flight send a special message to the iproto
 * thread.
 */
struct iproto_cfg_msg: public cbus_call_msg
{
	/** Operation to execute in iproto thread. */
	enum iproto_cfg_op op;
	union {
		/** Pointer to the statistic structure. */
		struct iproto_stats *stats;
		/** New iproto max message count. */
		int iproto_msg_max;
		struct {
			/** New connection IO stream. */
			struct iostream io;
			/** New connection session. */
			struct session *session;
		} session_new;
		struct {
			/** Overridden request type. */
			uint32_t req_type;
			/** Whether the request handler is set or reset. */
			bool is_set;
		} override;
		struct {
			/**
			 * Connection that is executing
			 * iproto_drop_connections(). NULL if the function
			 * is called not from a connection.
			 */
			struct iproto_connection *owner;
			/**
			 * Generation is a sequence number of a dropping
			 * connections invocation.
			 *
			 * See also `struct iproto_drop_finished`.
			 */
			unsigned generation;
		} drop_connections;
	};
	/** Iproto thread the message is addressed to. */
	struct iproto_thread *iproto_thread;
};
450 | | |
451 | | static inline void |
452 | | iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op) |
453 | 0 | { |
454 | 0 | memset(msg, 0, sizeof(*msg)); |
455 | 0 | msg->op = op; |
456 | 0 | } |
457 | | |
/**
 * Sends a configuration message to an IPROTO thread and waits for
 * completion.
 *
 * The message may be allocated on the stack.
 */
static void
iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg);
465 | | |
466 | | int |
467 | | iproto_addr_count(void) |
468 | 0 | { |
469 | 0 | return evio_service_count(&tx_binary); |
470 | 0 | } |
471 | | |
472 | | const char * |
473 | | iproto_addr_str(char *buf, int idx) |
474 | 0 | { |
475 | 0 | socklen_t size; |
476 | 0 | const struct sockaddr *addr = evio_service_addr(&tx_binary, idx, &size); |
477 | 0 | sio_addr_snprintf(buf, SERVICE_NAME_MAXLEN, addr, size); |
478 | 0 | return buf; |
479 | 0 | } |
480 | | |
481 | | /** |
482 | | * How big is a buffer which needs to be shrunk before |
483 | | * it is put back into buffer cache. |
484 | | */ |
485 | | static inline unsigned |
486 | | iproto_max_input_size(void) |
487 | 0 | { |
488 | 0 | return 18 * iproto_readahead; |
489 | 0 | } |
490 | | |
491 | | void |
492 | | iproto_reset_input(struct ibuf *ibuf) |
493 | 0 | { |
494 | | /* |
495 | | * If we happen to have fully processed the input, |
496 | | * move the pos to the start of the input buffer. |
497 | | */ |
498 | 0 | assert(ibuf_used(ibuf) == 0); |
499 | 0 | if (ibuf_capacity(ibuf) < iproto_max_input_size()) { |
500 | 0 | ibuf_reset(ibuf); |
501 | 0 | } else { |
502 | 0 | struct slab_cache *slabc = ibuf->slabc; |
503 | 0 | ibuf_destroy(ibuf); |
504 | 0 | ibuf_create(ibuf, slabc, iproto_readahead); |
505 | 0 | } |
506 | 0 | } |
507 | | |
/* {{{ iproto_msg - declaration */

/**
 * A single msg from the io thread. All requests
 * from all connections are queued into a single queue
 * and processed in FIFO order.
 */
struct iproto_msg
{
	struct cmsg base;
	struct iproto_connection *connection;

	/* --- Box msgs - actual requests for the transaction processor --- */
	/* Request message code and sync. */
	struct xrow_header header;
	union {
		/** Connect. */
		struct {
			union {
				/** Peer address. */
				struct sockaddr addr;
				/** Peer address storage. */
				struct sockaddr_storage addrstorage;
			};
			/** Peer address size. */
			socklen_t addrlen;
			/**
			 * Session to use for the new connection.
			 * Optional. If omitted, a new session object
			 * will be created in the TX thread.
			 */
			struct session *session;
		} connect;
		/** Box request, if this is a DML */
		struct request dml;
		/** Box request, if this is a call or eval. */
		struct call_request call;
		/** Watch request. */
		struct watch_request watch;
		/** Authentication request. */
		struct auth_request auth;
		/** Features request. */
		struct id_request id;
		/** SQL request, if this is the EXECUTE/PREPARE request. */
		struct sql_request sql;
		/** BEGIN request */
		struct begin_request begin;
		/** COMMIT request */
		struct commit_request commit;
		/** In case of iproto parse error, saved diagnostics. */
		struct diag diag;
	};
	/**
	 * Input buffer which stores the request data. It can be
	 * discarded only when the message returns to the iproto thread.
	 */
	struct ibuf *p_ibuf;
	/**
	 * How much space the request takes in the
	 * input buffer (len, header and body - all of it).
	 * This also works as a reference counter to
	 * the ibuf object.
	 */
	size_t len;
	/**
	 * Pointer to the start of the unparsed request stored in
	 * @a p_ibuf. It is used to dump the request to the flight
	 * recorder (if available) in the TX thread. It is guaranteed
	 * that @a reqstart points to a valid position: rpos of the
	 * input buffer is moved after processing the message;
	 * meanwhile requests are handled in the order they are stored
	 * in the buffer.
	 */
	const char *reqstart;
	/**
	 * Position in the connection output buffer. When sending a
	 * message to the tx thread, iproto sets it to its current
	 * flush position so that tx can reuse a buffer that has been
	 * flushed. The tx thread, in turn, sets it to the end of the
	 * data it has just written, to let iproto know that there is
	 * more output to flush.
	 */
	struct iproto_wpos wpos;
	/**
	 * Message sent by the tx thread to notify iproto that input has
	 * been processed and can be discarded before request completion.
	 * Used by long (yielding) CALL/EVAL requests.
	 */
	struct cmsg discard_input;
	/**
	 * Used in "connect" msgs, true if the connect trigger failed
	 * and the connection must be closed.
	 */
	bool close_connection;
	/**
	 * A stailq_entry to hold the message in a stream.
	 * All messages in a stream are processed sequentially. Before
	 * processing, all messages are added to the queue of pending
	 * requests. If this queue was empty the message begins to be
	 * processed, otherwise it waits until all previous messages
	 * are processed.
	 */
	struct stailq_entry in_stream;
	/** Stream that owns this message, or NULL. */
	struct iproto_stream *stream;
	/** Link in connection->tx.inprogress. */
	struct rlist in_inprogress;
	/** TX thread fiber that is processing this message. */
	struct fiber *fiber;
};
616 | | |
/**
 * Resume stopped connections, if any.
 */
static void
iproto_resume(struct iproto_thread *iproto_thread);

/**
 * Prepares an IPROTO message: decodes the message header, checks the
 * message's stream identifier, and sets the message's cbus route.
 */
static void
iproto_msg_prepare(struct iproto_msg *msg, const char **pos,
		   const char *reqend);

/** Indices of per-network-thread rmean counters. */
enum rmean_net_name {
	IPROTO_SENT,
	IPROTO_RECEIVED,
	IPROTO_CONNECTIONS,
	IPROTO_REQUESTS,
	IPROTO_STREAMS,
	REQUESTS_IN_STREAM_QUEUE,
	RMEAN_NET_LAST,
};

/** Human-readable names matching enum rmean_net_name. */
const char *rmean_net_strings[RMEAN_NET_LAST] = {
	"SENT",
	"RECEIVED",
	"CONNECTIONS",
	"REQUESTS",
	"STREAMS",
	"REQUESTS_IN_STREAM_QUEUE",
};

/** Indices of TX-thread rmean counters. */
enum rmean_tx_name {
	REQUESTS_IN_PROGRESS,
	RMEAN_TX_LAST,
};

/** Human-readable names matching enum rmean_tx_name. */
const char *rmean_tx_strings[RMEAN_TX_LAST] = {
	"REQUESTS_IN_PROGRESS",
};
658 | | |
/** Destroy tx-side resources of a connection (runs in the tx thread). */
static void
tx_process_destroy(struct cmsg *m);

/** Finish connection destruction (runs in the iproto thread). */
static void
net_finish_destroy(struct cmsg *m);

/** Fire on_disconnect triggers in the tx thread. */
static void
tx_process_disconnect(struct cmsg *m);

/** Send destroy message to tx thread. */
static void
net_finish_disconnect(struct cmsg *m);

/**
 * Kharon is in the dead world (iproto). Schedule an event to
 * flush new obuf as reflected in the fresh wpos.
 * @param m Kharon.
 */
static void
iproto_process_push(struct cmsg *m);

/**
 * Kharon returns to the living world (tx) back from the dead one
 * (iproto). Check if a new push is pending and make a new trip
 * to iproto if necessary.
 * @param m Kharon.
 */
static void
tx_end_push(struct cmsg *m);

/* }}} */
691 | | |
/* {{{ iproto_connection - declaration and definition */

/** Connection life cycle stages. */
enum iproto_connection_state {
	/**
	 * A connection is always alive in the beginning because
	 * it takes an already active socket in a constructor.
	 */
	IPROTO_CONNECTION_ALIVE,
	/**
	 * Socket was closed, a notification is sent to the TX
	 * thread to close the session.
	 */
	IPROTO_CONNECTION_CLOSED,
	/**
	 * TX thread was notified about close, but some requests
	 * are still not finished. That state may be skipped in
	 * case the connection was already idle (not having
	 * unfinished requests) at the moment of closing.
	 */
	IPROTO_CONNECTION_PENDING_DESTROY,
	/**
	 * All requests are finished, a destroy request is sent to
	 * the TX thread.
	 */
	IPROTO_CONNECTION_DESTROYED,
};
719 | | |
720 | | /** |
721 | | * Context of a single client connection. |
722 | | * Interaction scheme: |
723 | | * |
724 | | * Receive from the network. |
725 | | * | |
726 | | * +---|---------------------+ +------------+ |
727 | | * | | iproto thread | | tx thread | |
728 | | * | v | | | |
729 | | * | ibuf[0]- - - - - - - - -|- -|- - >+ | |
730 | | * | | | | | |
731 | | * | ibuf[1] | | | | |
732 | | * | | | | | |
733 | | * | obuf[0] <- - - - - - - -|- -|- - -+ | |
734 | | * | | | | | | |
735 | | * | | obuf[1] <- - -|- -|- - -+ | |
736 | | * +----|-----------|--------+ +------------+ |
737 | | * | v |
738 | | * | Send to |
739 | | * | network. |
740 | | * v |
741 | | * Send to network after obuf[1], i.e. older responses are sent first. |
742 | | * |
743 | | * ibuf structure: |
744 | | * rpos wpos end |
745 | | * +-------------------|----------------|-------------+ |
746 | | * \________/\________/ \________/\____/ |
747 | | * \ msg msg / msg parse |
748 | | * \______________/ size |
749 | | * response is sent, |
750 | | * messages are |
751 | | * discarded |
752 | | */ |
753 | | struct iproto_connection |
754 | | { |
755 | | /** |
756 | | * Two rotating buffers for input. Input is first read into |
 757 | | * ibuf[0]. As soon as this buffer becomes full, the buffers are |
758 | | * rotated. When all input buffers are used up, the input |
759 | | * is suspended. The buffer becomes available for use |
760 | | * again when tx thread completes processing the messages |
761 | | * stored in the buffer. |
762 | | */ |
763 | | struct ibuf ibuf[2]; |
764 | | /** Pointer to the current buffer. */ |
765 | | struct ibuf *p_ibuf; |
766 | | /** |
767 | | * Number of not yet processed messages in the corresponding |
768 | | * input buffer. |
769 | | */ |
770 | | size_t input_msg_count[2]; |
771 | | /** |
772 | | * Two rotating buffers for output. The tx thread switches to |
773 | | * another buffer if it finds it to be empty (flushed out). |
774 | | * This guarantees that memory gets recycled as soon as output |
775 | | * is flushed by the iproto thread. |
776 | | */ |
777 | | struct obuf obuf[2]; |
778 | | /** |
779 | | * Position in the output buffer that points to the beginning |
780 | | * of the data awaiting to be flushed. Advanced by the iproto |
 781 | | * thread upon successful flush. |
782 | | */ |
783 | | struct iproto_wpos wpos; |
784 | | /** |
785 | | * Position in the output buffer that points to the end of the |
786 | | * data awaiting to be flushed. Advanced by the iproto thread |
787 | | * upon receiving a message from the tx thread telling that more |
788 | | * output is available (see iproto_msg::wpos). |
789 | | */ |
790 | | struct iproto_wpos wend; |
791 | | /* |
792 | | * Size of readahead which is not parsed yet, i.e. size of |
793 | | * a piece of request which is not fully read. Is always |
794 | | * relative to ibuf.wpos. In other words, ibuf.wpos - |
795 | | * parse_size gives the start of the unparsed request. |
796 | | * A size rather than a pointer is used to be safe in case |
797 | | * ibuf.buf is reallocated. Being relative to ibuf.wpos, |
798 | | * rather than to ibuf.rpos is helpful to make sure |
799 | | * ibuf_reserve() or buffers rotation don't make the value |
800 | | * meaningless. |
801 | | */ |
802 | | size_t parse_size; |
803 | | /** |
 804 | | * Number of active long polling requests that have already |
805 | | * discarded their arguments in order not to stall other |
806 | | * connections. |
807 | | */ |
808 | | int long_poll_count; |
809 | | /** I/O stream used for communication with the client. */ |
810 | | struct iostream io; |
811 | | struct ev_io input; |
812 | | struct ev_io output; |
813 | | /** Logical session. */ |
814 | | struct session *session; |
815 | | ev_loop *loop; |
816 | | /** |
817 | | * Pre-allocated disconnect msg. Is sent right after |
818 | | * actual disconnect has happened. Does not destroy the |
819 | | * connection. Used to notify existing requests about the |
820 | | * occasion. |
821 | | */ |
822 | | struct cmsg disconnect_msg; |
823 | | /** |
824 | | * Pre-allocated destroy msg. Is sent after disconnect has |
825 | | * happened and a last request has finished. Firstly |
826 | | * destroys tx-related resources and then deletes the |
827 | | * connection. |
828 | | */ |
829 | | struct cmsg destroy_msg; |
830 | | /** |
831 | | * Connection state. Mainly it is used to determine when |
832 | | * the connection can be destroyed, and for debug purposes |
833 | | * to assert on a double destroy, for example. |
834 | | */ |
835 | | enum iproto_connection_state state; |
836 | | struct rlist in_stop_list; |
837 | | /** |
838 | | * Flag indicates, that client sent SHUT_RDWR or connection |
839 | | * is closed from client side. When it is set to false, we |
840 | | * should not write to the socket. |
841 | | */ |
842 | | bool can_write; |
843 | | /** |
844 | | * Hash table that holds all streams for this connection. |
 845 | | * This field is accessible only from iproto thread. |
846 | | */ |
847 | | struct mh_i64ptr_t *streams; |
848 | | /** |
849 | | * Kharon is used to implement box.session.push(). |
850 | | * When a new push is ready, tx uses kharon to notify |
851 | | * iproto about new data in connection output buffer. |
852 | | * |
853 | | * Kharon can not be in two places at the time. When |
854 | | * kharon leaves tx, is_push_sent is set to true. After |
855 | | * that new pushes can not use it. Instead, they set |
856 | | * is_push_pending flag. When Kharon is back to tx it |
857 | | * clears is_push_sent, checks is_push_pending and departs |
858 | | * immediately back to iproto if it is set. |
859 | | * |
860 | | * This design makes it easy to use a single message per |
861 | | * connection for pushes while new pushes do not wait for |
862 | | * the message to become available. |
863 | | * |
864 | | * iproto tx |
865 | | * ------------------------------------------------------- |
866 | | * + [push message] |
867 | | * <--- notification ---- |
868 | | * + [push message] |
869 | | * [feed event] |
870 | | * --- kharon travels back ----> |
871 | | * [write to socket] |
872 | | * + [push message] |
873 | | * [new push found] |
874 | | * <--- notification ---- |
875 | | * [write ends] |
876 | | * ... |
877 | | */ |
878 | | struct iproto_kharon kharon; |
879 | | /** |
880 | | * The following fields are used exclusively by the tx thread. |
881 | | * Align them to prevent false-sharing. |
882 | | */ |
883 | | struct { |
884 | | alignas(CACHELINE_SIZE) |
885 | | /** Pointer to the current output buffer. */ |
886 | | struct obuf *p_obuf; |
887 | | /** True if Kharon is in use/travelling. */ |
888 | | bool is_push_sent; |
889 | | /** |
890 | | * True if new pushes are waiting for Kharon |
891 | | * return. |
892 | | */ |
893 | | bool is_push_pending; |
894 | | /** List of inprogress messages. */ |
895 | | struct rlist inprogress; |
896 | | } tx; |
897 | | /** Authentication salt. */ |
898 | | char salt[IPROTO_SALT_SIZE]; |
899 | | /** Iproto connection thread */ |
900 | | struct iproto_thread *iproto_thread; |
901 | | /** |
902 | | * The connection is processing replication command so that |
903 | | * IO is handled by relay code. |
904 | | */ |
905 | | bool is_in_replication; |
906 | | /** Link in iproto_thread->connections. */ |
907 | | struct rlist in_connections; |
908 | | /** Set if connection is being dropped. */ |
909 | | bool is_drop_pending; |
910 | | /** |
911 | | * Generation is sequence number of dropping connection invocation. |
912 | | * |
913 | | * See also `struct iproto_drop_finished`. |
914 | | */ |
915 | | unsigned drop_generation; |
916 | | /** |
917 | | * Messaged sent to TX to cancel all inprogress requests of the |
918 | | * connection. |
919 | | */ |
920 | | struct cmsg cancel_msg; |
921 | | /** Set if connection is accepted in TX. */ |
922 | | bool is_established; |
923 | | }; |
924 | | |
/**
 * Return a string describing the connection's socket, suitable
 * for logging. Delegates to sio_socketname() on the connection's
 * fd. NOTE(review): the returned buffer is presumably static and
 * overwritten by subsequent calls — use it immediately; confirm
 * against sio_socketname().
 */
static inline const char *
iproto_connection_name(const struct iproto_connection *con)
{
	return sio_socketname(con->io.fd);
}
931 | | |
#ifdef NDEBUG
#define iproto_write_error(io, e, schema_version, sync) \
	iproto_do_write_error(io, e, schema_version, sync)
#else
/*
 * In debug builds, temporarily switch the socket to blocking mode
 * so the error reaches the client in full even when the connection
 * is about to be closed. If fcntl() fails, write as-is.
 *
 * Note: no trailing semicolon inside either macro variant — the
 * caller supplies it, which keeps an unbraced
 * `if (...) iproto_write_error(...); else ...` well-formed
 * (the original trailing `;` expanded call sites into two
 * statements).
 */
#define iproto_write_error(io, e, schema_version, sync) do { \
	int fd = (io)->fd; \
	int flags = fcntl(fd, F_GETFL, 0); \
	if (flags >= 0) \
		fcntl(fd, F_SETFL, flags & (~O_NONBLOCK)); \
	iproto_do_write_error(io, e, schema_version, sync); \
	if (flags >= 0) \
		fcntl(fd, F_SETFL, flags); \
} while (0)
#endif
946 | | |
947 | | static struct iproto_stream * |
948 | | iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id) |
949 | 0 | { |
950 | 0 | struct iproto_thread *iproto_thread = connection->iproto_thread; |
951 | 0 | struct iproto_stream *stream = (struct iproto_stream *) |
952 | 0 | xmempool_alloc(&iproto_thread->iproto_stream_pool); |
953 | 0 | rmean_collect(connection->iproto_thread->rmean, IPROTO_STREAMS, 1); |
954 | 0 | stream->txn = NULL; |
955 | 0 | stream->current = NULL; |
956 | 0 | stailq_create(&stream->pending_requests); |
957 | 0 | stream->id = stream_id; |
958 | 0 | stream->connection = connection; |
959 | 0 | return stream; |
960 | 0 | } |
961 | | |
962 | | static inline void |
963 | | iproto_stream_rollback_on_disconnect(struct iproto_stream *stream) |
964 | 0 | { |
965 | 0 | struct iproto_connection *conn = stream->connection; |
966 | 0 | struct iproto_thread *iproto_thread = conn->iproto_thread; |
967 | 0 | struct cmsg_hop *route = |
968 | 0 | iproto_thread->rollback_on_disconnect_route; |
969 | 0 | cmsg_init(&stream->on_disconnect, route); |
970 | 0 | cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect); |
971 | 0 | } |
972 | | |
973 | | /** |
974 | | * Return true if we have not enough spare messages |
975 | | * in the message pool. |
976 | | */ |
977 | | static inline bool |
978 | | iproto_check_msg_max(struct iproto_thread *iproto_thread) |
979 | 0 | { |
980 | 0 | size_t request_count = mempool_count(&iproto_thread->iproto_msg_pool); |
981 | 0 | return request_count > (size_t) iproto_msg_max; |
982 | 0 | } |
983 | | |
984 | | static inline void |
985 | | iproto_msg_delete(struct iproto_msg *msg) |
986 | 0 | { |
987 | 0 | struct iproto_thread *iproto_thread = msg->connection->iproto_thread; |
988 | 0 | mempool_free(&msg->connection->iproto_thread->iproto_msg_pool, msg); |
989 | 0 | iproto_resume(iproto_thread); |
990 | 0 | } |
991 | | |
992 | | static void |
993 | | iproto_stream_delete(struct iproto_stream *stream) |
994 | 0 | { |
995 | 0 | assert(stream->current == NULL); |
996 | 0 | assert(stailq_empty(&stream->pending_requests)); |
997 | 0 | assert(stream->txn == NULL); |
998 | 0 | mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream); |
999 | 0 | } |
1000 | | |
1001 | | static struct iproto_msg * |
1002 | | iproto_msg_new(struct iproto_connection *con) |
1003 | 0 | { |
1004 | 0 | struct mempool *iproto_msg_pool = &con->iproto_thread->iproto_msg_pool; |
1005 | 0 | struct iproto_msg *msg = |
1006 | 0 | (struct iproto_msg *)xmempool_alloc(iproto_msg_pool); |
1007 | 0 | msg->close_connection = false; |
1008 | 0 | msg->connection = con; |
1009 | 0 | msg->stream = NULL; |
1010 | 0 | msg->fiber = NULL; |
1011 | 0 | rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1); |
1012 | 0 | return msg; |
1013 | 0 | } |
1014 | | |
1015 | | /** |
1016 | | * Signal input unless it's blocked on I/O or stopped. |
1017 | | */ |
1018 | | static inline void |
1019 | | iproto_connection_feed_input(struct iproto_connection *con) |
1020 | 0 | { |
1021 | 0 | assert(con->state == IPROTO_CONNECTION_ALIVE); |
1022 | 0 | if (!ev_is_active(&con->input) && rlist_empty(&con->in_stop_list)) |
1023 | 0 | ev_feed_event(con->loop, &con->input, EV_CUSTOM); |
1024 | 0 | } |
1025 | | |
1026 | | /** |
1027 | | * Signal output unless it's blocked on I/O. |
1028 | | */ |
1029 | | static inline void |
1030 | | iproto_connection_feed_output(struct iproto_connection *con) |
1031 | 0 | { |
1032 | 0 | assert(con->state == IPROTO_CONNECTION_ALIVE); |
1033 | 0 | if (!ev_is_active(&con->output)) |
1034 | 0 | ev_feed_event(con->loop, &con->output, EV_CUSTOM); |
1035 | 0 | } |
1036 | | |
1037 | | /** |
1038 | | * A connection is idle when the client is gone |
1039 | | * and there are no outstanding msgs in the msg queue. |
1040 | | * An idle connection can be safely garbage collected. |
1041 | | * |
1042 | | * ibuf_size() provides an effective reference counter |
1043 | | * on connection use in the tx request queue. Any request |
1044 | | * in the request queue has a non-zero len, and ibuf_size() |
1045 | | * is therefore non-zero as long as there is at least |
1046 | | * one request in the tx queue. |
1047 | | */ |
1048 | | static inline bool |
1049 | | iproto_connection_is_idle(struct iproto_connection *con) |
1050 | 0 | { |
1051 | | /* |
1052 | | * The check for 'mh_size (streams) == 0' was added, because it is |
1053 | | * possible that when disconnect occurs, there is active transaction |
1054 | | * in stream after processing all messages. In this case we send |
1055 | | * special message to rollback it, and without this check we would |
1056 | | * immediately send special message to destroy connection. This would |
1057 | | * not lead to error now, since the messages are processed strictly |
1058 | | * sequentially and rollback does not yield, but it is not safely and |
1059 | | * if we add some more complex logic, it may lead to difficulty catching |
1060 | | * errors in the future. |
1061 | | */ |
1062 | 0 | return con->long_poll_count == 0 && |
1063 | 0 | mh_size(con->streams) == 0 && |
1064 | 0 | ibuf_used(&con->ibuf[0]) == 0 && |
1065 | 0 | ibuf_used(&con->ibuf[1]) == 0; |
1066 | 0 | } |
1067 | | |
1068 | | /** |
1069 | | * Stop input when readahead limit is reached. When |
1070 | | * we process some messages *on this connection*, the input can be |
1071 | | * resumed. |
1072 | | */ |
1073 | | static inline void |
1074 | | iproto_connection_stop_readahead_limit(struct iproto_connection *con) |
1075 | 0 | { |
1076 | 0 | say_warn_ratelimited("stopping input on connection %s, " |
1077 | 0 | "readahead limit is reached", |
1078 | 0 | iproto_connection_name(con)); |
1079 | 0 | assert(rlist_empty(&con->in_stop_list)); |
1080 | 0 | ev_io_stop(con->loop, &con->input); |
1081 | 0 | } |
1082 | | |
/**
 * Stop reading input on @a con because the global net_msg_max
 * message limit has been reached, and park the connection in the
 * thread's stopped list until iproto_resume() revives it.
 */
static inline void
iproto_connection_stop_msg_max_limit(struct iproto_connection *con)
{
	assert(rlist_empty(&con->in_stop_list));

	say_warn_ratelimited("stopping input on connection %s, "
			     "net_msg_max limit is reached",
			     iproto_connection_name(con));
	ev_io_stop(con->loop, &con->input);
	/*
	 * Important to add to tail and fetch from head to ensure
	 * strict FIFO order (fairness) for stopped connections:
	 * the connection stopped first is resumed first by
	 * iproto_resume().
	 */
	rlist_add_tail(&con->iproto_thread->stopped_connections,
		       &con->in_stop_list);
}
1099 | | |
1100 | | /** |
1101 | | * Send a destroy message to TX thread in case all requests are |
1102 | | * finished. |
1103 | | */ |
1104 | | static inline void |
1105 | | iproto_connection_try_to_start_destroy(struct iproto_connection *con) |
1106 | 0 | { |
1107 | 0 | assert(con->state == IPROTO_CONNECTION_CLOSED || |
1108 | 0 | con->state == IPROTO_CONNECTION_PENDING_DESTROY); |
1109 | 0 | if (!iproto_connection_is_idle(con)) { |
1110 | | /* |
1111 | | * Not all requests are finished. Let the last |
1112 | | * finished request destroy the connection. |
1113 | | */ |
1114 | 0 | con->state = IPROTO_CONNECTION_PENDING_DESTROY; |
1115 | 0 | return; |
1116 | 0 | } |
1117 | | /* |
1118 | | * If the connection has no outstanding requests in the |
1119 | | * input buffer, then no one (e.g. tx thread) is referring |
1120 | | * to it, so it must be destroyed. Firstly queue a msg to |
1121 | | * destroy the session and other resources owned by TX |
1122 | | * thread. When it is done, iproto thread will destroy |
1123 | | * other parts of the connection. |
1124 | | */ |
1125 | 0 | con->state = IPROTO_CONNECTION_DESTROYED; |
1126 | 0 | cpipe_push(&con->iproto_thread->tx_pipe, &con->destroy_msg); |
1127 | 0 | } |
1128 | | |
/**
 * Initiate a connection shutdown. This method may be invoked many
 * times, and does the internal bookkeeping to only cleanup
 * resources once: the ALIVE branch runs exactly once, later calls
 * fall into the CLOSED/PENDING_DESTROY branches.
 */
static inline void
iproto_connection_close(struct iproto_connection *con)
{
	if (con->state == IPROTO_CONNECTION_ALIVE) {
		/* Clears all pending events. */
		ev_io_stop(con->loop, &con->input);
		ev_io_stop(con->loop, &con->output);
		/*
		 * Invalidate fd to prevent undefined behavior in case
		 * we mistakenly try to use it after this point.
		 */
		con->input.fd = con->output.fd = -1;
		iostream_close(&con->io);
		/*
		 * Discard unparsed data, to recycle the
		 * connection in net_send_msg() as soon as all
		 * parsed data is processed. It's important this
		 * is done only once.
		 */
		ibuf_discard(con->p_ibuf, con->parse_size);
		con->parse_size = 0;
		mh_int_t node;
		mh_foreach(con->streams, node) {
			struct iproto_stream *stream = (struct iproto_stream *)
				mh_i64ptr_node(con->streams, node)->val;
			/**
			 * If stream->current == NULL and the stream's
			 * request queue is empty, it means that there is
			 * some active transaction which was not committed
			 * yet. We need to roll it back, so we push an
			 * on_disconnect message to the tx thread here.
			 * Otherwise the stream is destroyed in
			 * `net_send_msg` after processing all requests.
			 */
			if (stream->current == NULL &&
			    stailq_empty(&stream->pending_requests))
				iproto_stream_rollback_on_disconnect(stream);
		}
		cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
		assert(con->state == IPROTO_CONNECTION_ALIVE);
		con->state = IPROTO_CONNECTION_CLOSED;
	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
		/* All requests finished since the close: destroy now. */
		iproto_connection_try_to_start_destroy(con);
	} else {
		assert(con->state == IPROTO_CONNECTION_CLOSED);
	}
	/* In all cases the connection leaves the stopped list. */
	rlist_del(&con->in_stop_list);
}
1181 | | |
1182 | | static inline struct ibuf * |
1183 | | iproto_connection_next_input(struct iproto_connection *con) |
1184 | 0 | { |
1185 | 0 | return &con->ibuf[con->p_ibuf == &con->ibuf[0]]; |
1186 | 0 | } |
1187 | | |
/**
 * If there is no space for reading input, we can do one of the
 * following:
 * - try to get a new ibuf, so that it can fit the request.
 *   Always getting a new input buffer when there is no space
 *   makes the instance susceptible to input-flood attacks.
 *   Therefore, at most 2 ibufs are used in a single connection,
 *   one is "open", receiving input, and the other is closed,
 *   waiting for flushing output from a corresponding obuf.
 * - stop input and wait until the client reads piled up output,
 *   so the input buffer can be reused. This complements
 *   the previous strategy. It is only safe to stop input if it
 *   is known that there is output. In this case input event
 *   flow will be resumed when all replies to previous requests
 *   are sent. Since there are two buffers, the input is only
 *   stopped when both of them are fully used up.
 *
 * To make this strategy work, each ibuf in use must fit at least
 * one request. Otherwise, both obufs may end up having no data to
 * flush, while current ibuf is too small to fit a big incoming
 * request.
 *
 * @return the buffer to read into, or NULL if input must be
 *         stopped until the second buffer is flushed.
 */
static struct ibuf *
iproto_connection_input_buffer(struct iproto_connection *con)
{
	struct ibuf *old_ibuf = con->p_ibuf;

	size_t to_read = 3; /* Smallest possible valid request. */

	/* The type code is checked in iproto_enqueue_batch() */
	if (con->parse_size) {
		/* A partial request length prefix may already be here. */
		const char *pos = old_ibuf->wpos - con->parse_size;
		if (mp_check_uint(pos, old_ibuf->wpos) <= 0)
			to_read = mp_decode_uint(&pos);
	}

	if (ibuf_unused(old_ibuf) >= to_read) {
		/*
		 * If all read data is discarded, move read
		 * position to the start of the buffer, to
		 * reduce chances of unaccounted growth of the
		 * buffer as read position is shifted to the
		 * end of the buffer.
		 */
		if (ibuf_used(old_ibuf) == 0)
			ibuf_reset(old_ibuf);
		return old_ibuf;
	}

	/*
	 * Reuse the buffer if all requests are processed
	 * (it only has unparsed content).
	 */
	if (ibuf_used(old_ibuf) == con->parse_size) {
		xibuf_reserve(old_ibuf, to_read);
		return old_ibuf;
	}

	struct ibuf *new_ibuf = iproto_connection_next_input(con);
	if (ibuf_used(new_ibuf) != 0) {
		/*
		 * Wait until the second buffer is flushed
		 * and becomes available for reuse.
		 */
		return NULL;
	}
	/* Update buffer size if readahead has changed. */
	if (new_ibuf->start_capacity != iproto_readahead) {
		ibuf_destroy(new_ibuf);
		ibuf_create(new_ibuf, cord_slab_cache(), iproto_readahead);
	}

	xibuf_reserve(new_ibuf, to_read + con->parse_size);
	if (con->parse_size != 0) {
		/* Move the cached request prefix to the new buffer. */
		void *wpos = ibuf_alloc(new_ibuf, con->parse_size);
		memcpy(wpos, old_ibuf->wpos - con->parse_size, con->parse_size);
		/*
		 * Discard unparsed data in the old buffer, otherwise it
		 * won't be recycled when all parsed requests are processed.
		 */
		ibuf_discard(old_ibuf, con->parse_size);
		/*
		 * We made ibuf idle. If obuf was already idle it
		 * makes the both ibuf and obuf idle, time to trim
		 * them.
		 */
		if (ibuf_used(old_ibuf) == 0)
			iproto_reset_input(old_ibuf);
	}
	/*
	 * Rotate buffers. Not strictly necessary, but
	 * helps preserve response order.
	 */
	con->p_ibuf = new_ibuf;
	return new_ibuf;
}
1285 | | |
1286 | | /** |
1287 | | * Check if message belongs to stream (stream_id != 0), and if it |
1288 | | * is so create new stream or get stream from connection streams |
1289 | | * hash table. Put message to stream pending messages list. |
1290 | | * @retval true - the message is ready to push to TX thread (either if |
1291 | | * stream_id is not set (is zero) or the stream is not |
1292 | | * processing other messages). |
1293 | | * false - the message is postponed because its stream is busy |
1294 | | * processing previous message(s). |
1295 | | */ |
1296 | | static bool |
1297 | | iproto_msg_start_processing_in_stream(struct iproto_msg *msg) |
1298 | 0 | { |
1299 | 0 | uint64_t stream_id = msg->header.stream_id; |
1300 | 0 | if (stream_id == 0) |
1301 | 0 | return true; |
1302 | | |
1303 | 0 | struct iproto_connection *con = msg->connection; |
1304 | 0 | struct iproto_stream *stream = NULL; |
1305 | 0 | mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0); |
1306 | 0 | if (pos == mh_end(con->streams)) { |
1307 | 0 | stream = iproto_stream_new(msg->connection, msg->header.stream_id); |
1308 | 0 | struct mh_i64ptr_node_t node; |
1309 | 0 | node.key = stream_id; |
1310 | 0 | node.val = stream; |
1311 | 0 | pos = mh_i64ptr_put(con->streams, &node, NULL, NULL); |
1312 | 0 | } |
1313 | 0 | stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val; |
1314 | 0 | assert(stream != NULL); |
1315 | 0 | msg->stream = stream; |
1316 | 0 | if (stream->current == NULL) { |
1317 | 0 | stream->current = msg; |
1318 | 0 | return true; |
1319 | 0 | } |
1320 | 0 | con->iproto_thread->requests_in_stream_queue++; |
1321 | 0 | rmean_collect(con->iproto_thread->rmean, REQUESTS_IN_STREAM_QUEUE, 1); |
1322 | 0 | stailq_add_tail_entry(&stream->pending_requests, msg, in_stream); |
1323 | 0 | return false; |
1324 | 0 | } |
1325 | | |
/**
 * Enqueue all requests which were read up. If a request limit is
 * reached - stop the connection input even if not the whole batch
 * is enqueued. Else try to read more feeding read event to the
 * event loop.
 * @param con Connection to enqueue in.
 * @param in Buffer to parse.
 *
 * @retval 0 Success.
 * @retval -1 Invalid MessagePack (diag is set to ER_INVALID_MSGPACK).
 */
static inline int
iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
{
	assert(rlist_empty(&con->in_stop_list));
	int n_requests = 0;
	const char *errmsg;
	while (con->parse_size != 0 && !con->is_in_replication) {
		if (iproto_check_msg_max(con->iproto_thread)) {
			iproto_connection_stop_msg_max_limit(con);
			cpipe_flush_input(&con->iproto_thread->tx_pipe);
			return 0;
		}
		const char *reqstart = in->wpos - con->parse_size;
		const char *pos = reqstart;
		/* Read request length. */
		if (mp_typeof(*pos) != MP_UINT) {
			errmsg = "packet length";
err_msgpack:
			/* Flush what was enqueued before failing. */
			cpipe_flush_input(&con->iproto_thread->tx_pipe);
			diag_set(ClientError, ER_INVALID_MSGPACK,
				 errmsg);
			return -1;
		}
		/* The length prefix is incomplete: wait for more input. */
		if (mp_check_uint(pos, in->wpos) >= 0)
			break;
		uint64_t len = mp_decode_uint(&pos);
		if (len > IPROTO_PACKET_SIZE_MAX) {
			errmsg = tt_sprintf("too big packet size in the "\
					    "header: %llu",
					    (unsigned long long) len);
			goto err_msgpack;
		}
		const char *reqend = pos + len;
		/* The request body is incomplete: wait for more input. */
		if (reqend > in->wpos)
			break;
		struct iproto_msg *msg = iproto_msg_new(con);
		msg->p_ibuf = con->p_ibuf;
		msg->reqstart = reqstart;
		msg->wpos = con->wpos;
		msg->len = reqend - reqstart; /* total request length */
		con->input_msg_count[msg->p_ibuf == &con->ibuf[1]]++;

		iproto_msg_prepare(msg, &pos, reqend);
		/* Stream-bound messages may be postponed if the stream is busy. */
		if (iproto_msg_start_processing_in_stream(msg)) {
			cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
			n_requests++;
		}

		/* Request is parsed */
		assert(reqend > reqstart);
		assert(con->parse_size >= (size_t) (reqend - reqstart));
		con->parse_size -= reqend - reqstart;
	}
	if (con->is_in_replication) {
		/**
		 * Don't mess with the file descriptor
		 * while join is running. ev_io_stop()
		 * also clears any pending events, which
		 * is good, since their invocation may
		 * re-start the watcher, ruining our
		 * efforts.
		 */
		ev_io_stop(con->loop, &con->output);
		ev_io_stop(con->loop, &con->input);
	} else if (n_requests != 1 || con->parse_size != 0) {
		/*
		 * Keep reading input, as long as the socket
		 * supplies data, but don't waste CPU on an extra
		 * read() if dealing with a blocking client, it
		 * has nothing in the socket for us.
		 *
		 * We look at the amount of enqueued requests
		 * and presence of a partial request in the
		 * input buffer as hints to distinguish
		 * blocking and non-blocking clients:
		 *
		 * For blocking clients, a request typically
		 * is fully read and enqueued.
		 * If there is unparsed data, or 0 queued
		 * requests, keep reading input, if only to avoid
		 * a deadlock on this connection.
		 */
		iproto_connection_feed_input(con);
	}
	cpipe_flush_input(&con->iproto_thread->tx_pipe);
	return 0;
}
1424 | | |
1425 | | /** |
1426 | | * Enqueue connection's pending requests. Completely resurrect the |
1427 | | * connection, if it has no more requests, and the limit still is |
1428 | | * not reached. |
1429 | | */ |
1430 | | static void |
1431 | | iproto_connection_resume(struct iproto_connection *con) |
1432 | 0 | { |
1433 | 0 | assert(! iproto_check_msg_max(con->iproto_thread)); |
1434 | 0 | rlist_del(&con->in_stop_list); |
1435 | | /* |
1436 | | * Enqueue_batch() stops the connection again, if the |
1437 | | * limit is reached again. |
1438 | | */ |
1439 | 0 | if (iproto_enqueue_batch(con, con->p_ibuf) != 0) { |
1440 | 0 | struct error *e = box_error_last(); |
1441 | 0 | error_log(e); |
1442 | 0 | iproto_write_error(&con->io, e, ::schema_version, 0); |
1443 | 0 | iproto_connection_close(con); |
1444 | 0 | } |
1445 | 0 | } |
1446 | | |
1447 | | /** |
1448 | | * Resume as many connections as possible until a request limit is |
1449 | | * reached. By design of iproto_enqueue_batch(), a paused |
1450 | | * connection almost always has a pending request fully read up, |
1451 | | * so resuming a connection will immediately enqueue the request |
1452 | | * as an iproto message and exhaust the limit. Thus we aren't |
1453 | | * really resuming all connections here: only as many as is |
1454 | | * necessary to use up the limit. |
1455 | | */ |
1456 | | static void |
1457 | | iproto_resume(struct iproto_thread *iproto_thread) |
1458 | 0 | { |
1459 | 0 | while (!iproto_check_msg_max(iproto_thread) && |
1460 | 0 | !rlist_empty(&iproto_thread->stopped_connections)) { |
1461 | | /* |
1462 | | * Shift from list head to ensure strict FIFO |
1463 | | * (fairness) for resumed connections. |
1464 | | */ |
1465 | 0 | struct iproto_connection *con = |
1466 | 0 | rlist_first_entry(&iproto_thread->stopped_connections, |
1467 | 0 | struct iproto_connection, |
1468 | 0 | in_stop_list); |
1469 | 0 | iproto_connection_resume(con); |
1470 | 0 | } |
1471 | 0 | } |
1472 | | |
/**
 * Input watcher callback: read as much data from the socket as
 * fits into the current input buffer, then parse and enqueue the
 * fully read requests. Handles throttling (msg/readahead limits),
 * would-block statuses, EOF and read/parse errors.
 */
static void
iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
			   int /* revents */)
{
	struct iproto_connection *con =
		(struct iproto_connection *) watcher->data;
	struct iostream *io = &con->io;
	assert(con->state == IPROTO_CONNECTION_ALIVE);
	assert(rlist_empty(&con->in_stop_list));
	assert(loop == con->loop);
	/*
	 * Throttle if there are too many pending requests,
	 * otherwise we might deplete the fiber pool in tx
	 * thread and deadlock.
	 */
	if (iproto_check_msg_max(con->iproto_thread)) {
		iproto_connection_stop_msg_max_limit(con);
		return;
	}

	/* Ensure we have sufficient space for the next round. */
	struct ibuf *in = iproto_connection_input_buffer(con);
	if (in == NULL) {
		/* Both buffers busy: wait for output to drain. */
		iproto_connection_stop_readahead_limit(con);
		return;
	}
	/* Read input. */
	ibuf_reserve(in, ibuf_unused(in));
	ssize_t nrd = iostream_read(io, in->wpos, ibuf_unused(in));
	if (nrd < 0) { /* Socket is not ready. */
		if (nrd == IOSTREAM_ERROR)
			goto error;
		/* Re-arm the watcher with the events the stream wants. */
		int events = iostream_status_to_events(nrd);
		if (con->input.events != events) {
			ev_io_stop(loop, &con->input);
			ev_io_set(&con->input, con->io.fd, events);
		}
		ev_io_start(loop, &con->input);
		return;
	}
	if (nrd == 0) { /* EOF */
		iproto_connection_close(con);
		return;
	}
	/* Count statistics */
	rmean_collect(con->iproto_thread->rmean, IPROTO_RECEIVED, nrd);

	/* Update the read position and connection state. */
	ibuf_alloc(in, nrd);
	con->parse_size += nrd;
	/* Enqueue all requests which are fully read up. */
	if (iproto_enqueue_batch(con, in) != 0)
		goto error;
	return;
error:;
	struct error *e = diag_last_error(diag_get());
	assert(e != NULL);
	error_log(e);
	/* Best effort at sending the error message to the client. */
	iproto_write_error(io, e, ::schema_version, 0);
	iproto_connection_close(con);
}
1535 | | |
/**
 * writev() pending output to the socket and handle the result.
 *
 * @retval 1  Nothing to flush.
 * @retval 0  Everything up to the flush boundary was written (or
 *            discarded because the peer closed the receiving end /
 *            a write error occurred).
 * @retval <0 iostream status (e.g. IOSTREAM_WANT_WRITE): partial
 *            write or would-block, caller must wait on the socket.
 */
static int
iproto_flush(struct iproto_connection *con)
{
	struct obuf *obuf = con->wpos.obuf;
	struct obuf_svp obuf_end = obuf_create_svp(obuf);
	struct obuf_svp *begin = &con->wpos.svp;
	struct obuf_svp *end = &con->wend.svp;
	if (con->wend.obuf != obuf) {
		/*
		 * Flush the current buffer before
		 * advancing to the next one.
		 */
		if (begin->used == obuf_end.used) {
			/* Current obuf is drained: switch to the next. */
			obuf = con->wpos.obuf = con->wend.obuf;
			obuf_svp_reset(begin);
		} else {
			end = &obuf_end;
		}
	}
	if (begin->used == end->used) {
		/* Nothing to do. */
		return 1;
	}
	if (!con->can_write) {
		/* Receiving end was closed. Discard the output. */
		*begin = *end;
		return 0;
	}
	assert(begin->used < end->used);
	struct iovec iov[SMALL_OBUF_IOV_MAX+1];
	struct iovec *src = obuf->iov;
	int iovcnt = end->pos - begin->pos + 1;
	/*
	 * iov[i].iov_len may be concurrently modified in tx thread,
	 * but only for the last position.
	 */
	memcpy(iov, src + begin->pos, iovcnt * sizeof(struct iovec));
	sio_add_to_iov(iov, -begin->iov_len);
	/* *Overwrite* iov_len of the last pos as it may be garbage. */
	iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1);

	ssize_t nwr = iostream_writev(&con->io, iov, iovcnt);
	if (nwr >= 0) {
		/* Count statistics */
		rmean_collect(con->iproto_thread->rmean, IPROTO_SENT, nwr);
		if (begin->used + nwr == end->used) {
			/* Fully flushed up to the boundary. */
			*begin = *end;
			return 0;
		}
		/* Partial write: advance the saved write position. */
		size_t offset = 0;
		int advance = 0;
		advance = sio_move_iov(iov, nwr, &offset);
		begin->used += nwr; /* advance write position */
		begin->iov_len = advance == 0 ? begin->iov_len + offset: offset;
		begin->pos += advance;
		assert(begin->pos <= end->pos);
		return IOSTREAM_WANT_WRITE;
	} else if (nwr == IOSTREAM_ERROR) {
		/*
		 * Don't close the connection on write error. Log the error and
		 * don't write to the socket anymore. Continue processing
		 * requests as usual, because the client might have closed the
		 * socket, but still expect pending requests to complete.
		 */
		diag_log();
		con->can_write = false;
		*begin = *end;
		return 0;
	}
	return nwr;
}
1608 | | |
/**
 * Output watcher callback: flush as much pending output as the
 * socket accepts. On a would-block status, re-arm the watcher with
 * the events the iostream asked for; once everything is flushed,
 * stop the watcher and re-trigger input.
 */
static void
iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
			    int /* revents */)
{
	struct iproto_connection *con = (struct iproto_connection *) watcher->data;
	assert(con->state == IPROTO_CONNECTION_ALIVE);
	int rc;
	while ((rc = iproto_flush(con)) <= 0) {
		if (rc != 0) {
			/* Would block: wait for the socket. */
			int events = iostream_status_to_events(rc);
			if (con->output.events != events) {
				ev_io_stop(loop, &con->output);
				ev_io_set(&con->output, con->io.fd, events);
			}
			ev_io_start(loop, &con->output);
			return;
		}
	}
	if (ev_is_active(&con->output))
		ev_io_stop(con->loop, &con->output);
	/*
	 * If the out channel isn't clogged, we can read more requests.
	 * Note, we trigger input even if we didn't write any responses
	 * (iproto_flush returned 1 right away). This is intentional:
	 * some requests don't have responses (IPROTO_WATCH).
	 */
	iproto_connection_feed_input(con);
}
1637 | | |
/**
 * Allocate and initialize a new iproto connection object from the
 * thread's connection pool. The connection starts in the ALIVE state
 * with both rotating input and output buffer pairs created, libev
 * watchers initialized (fd is set later, on accept), and the
 * destroy/disconnect messages pre-initialized so no allocation is
 * needed at close time.
 */
static struct iproto_connection *
iproto_connection_new(struct iproto_thread *iproto_thread)
{
	struct iproto_connection *con = (struct iproto_connection *)
		xmempool_alloc(&iproto_thread->iproto_connection_pool);
	con->streams = mh_i64ptr_new();
	con->iproto_thread = iproto_thread;
	con->input.data = con->output.data = con;
	con->loop = loop();
	iostream_clear(&con->io);
	/* fd = -1: watchers are armed only after the socket is known. */
	ev_io_init(&con->input, iproto_connection_on_input, -1, EV_NONE);
	ev_io_init(&con->output, iproto_connection_on_output, -1, EV_NONE);
	/* Two rotating input buffers, sized by the readahead setting. */
	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
	con->input_msg_count[0] = 0;
	con->input_msg_count[1] = 0;
	/* Output buffers live on the thread's slab cache (freed in tx). */
	obuf_create(&con->obuf[0], &con->iproto_thread->net_slabc,
		    iproto_readahead);
	obuf_create(&con->obuf[1], &con->iproto_thread->net_slabc,
		    iproto_readahead);
	con->p_ibuf = &con->ibuf[0];
	con->tx.p_obuf = &con->obuf[0];
	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
	iproto_wpos_create(&con->wend, con->tx.p_obuf);
	con->parse_size = 0;
	con->can_write = true;
	con->long_poll_count = 0;
	con->session = NULL;
	con->is_in_replication = false;
	con->is_drop_pending = false;
	con->is_established = false;
	rlist_create(&con->in_stop_list);
	rlist_create(&con->tx.inprogress);
	rlist_add_entry(&iproto_thread->connections, con, in_connections);
	/* It may be very awkward to allocate at close. */
	cmsg_init(&con->destroy_msg, con->iproto_thread->destroy_route);
	cmsg_init(&con->disconnect_msg, con->iproto_thread->disconnect_route);
	con->state = IPROTO_CONNECTION_ALIVE;
	con->tx.is_push_pending = false;
	con->tx.is_push_sent = false;
	rmean_collect(iproto_thread->rmean, IPROTO_CONNECTIONS, 1);
	return con;
}
1681 | | |
/**
 * Notify that connections drop is finished (runs in the TX thread).
 * Signals the waiter once the last pending iproto thread reports in.
 * Note that the counter is decremented only when the message's
 * generation matches the current one — a stale notification from a
 * previous drop round is ignored entirely (short-circuit evaluation).
 */
static void
tx_process_drop_finished(struct cmsg *m)
{
	struct iproto_drop_finished *drop_finished =
		(struct iproto_drop_finished *)m;
	if (drop_finished->generation == drop_generation &&
	    --drop_pending_thread_count == 0)
		fiber_cond_signal(&drop_finished_cond);
}
1692 | | |
/**
 * Send message to TX thread to notify that connections drop is finished.
 * Reuses the thread-owned drop_finished_msg, so at most one such
 * notification per thread can be in flight at a time.
 *
 * @param iproto_thread the reporting iproto thread.
 * @param generation drop round this notification belongs to; checked
 *        against the current generation on the TX side.
 */
static void
iproto_send_drop_finished(struct iproto_thread *iproto_thread,
			  unsigned generation)
{
	static const struct cmsg_hop drop_finished_route[1] =
		{{ tx_process_drop_finished, NULL }};

	cmsg_init(&iproto_thread->drop_finished_msg.base, drop_finished_route);
	iproto_thread->drop_finished_msg.generation = generation;
	cpipe_push(&iproto_thread->tx_pipe,
		   &iproto_thread->drop_finished_msg.base);
}
1706 | | |
/**
 * Recycle a connection: final teardown in the net thread.
 * Requires that the connection is idle, its socket is closed, the
 * session is gone and the state machine has reached DESTROYED.
 * Frees net-side resources (input buffers, stream hash, list link),
 * accounts the connection in a pending drop round if needed, and
 * returns the object to the connection pool.
 */
static inline void
iproto_connection_delete(struct iproto_connection *con)
{
	assert(iproto_connection_is_idle(con));
	assert(!iostream_is_initialized(&con->io));
	assert(con->session == NULL);
	assert(con->state == IPROTO_CONNECTION_DESTROYED);
	/*
	 * The output buffers must have been deleted
	 * in tx thread (see tx_process_destroy).
	 */
	ibuf_destroy(&con->ibuf[0]);
	ibuf_destroy(&con->ibuf[1]);
	assert(!obuf_is_initialized(&con->obuf[0]));
	assert(!obuf_is_initialized(&con->obuf[1]));

	assert(mh_size(con->streams) == 0);
	mh_i64ptr_delete(con->streams);
	rlist_del(&con->in_connections);
	if (con->is_drop_pending) {
		struct iproto_thread *iproto_thread = con->iproto_thread;

		/* Last dropped connection reports to the TX thread. */
		assert(iproto_thread->drop_pending_connection_count > 0);
		if (--iproto_thread->drop_pending_connection_count == 0)
			iproto_send_drop_finished(iproto_thread,
						  con->drop_generation);
	}
	mempool_free(&con->iproto_thread->iproto_connection_pool, con);
}
1737 | | |
1738 | | /* }}} iproto_connection */ |
1739 | | |
1740 | | /* {{{ iproto_msg - methods and routes */ |
1741 | | |
1742 | | static void |
1743 | | tx_process_misc(struct cmsg *msg); |
1744 | | |
1745 | | static void |
1746 | | tx_process_call(struct cmsg *msg); |
1747 | | |
1748 | | static void |
1749 | | tx_process1(struct cmsg *msg); |
1750 | | |
1751 | | static void |
1752 | | tx_process_select(struct cmsg *msg); |
1753 | | |
1754 | | static void |
1755 | | tx_process_sql(struct cmsg *msg); |
1756 | | |
1757 | | static void |
1758 | | tx_reply_error(struct iproto_msg *msg); |
1759 | | |
1760 | | static void |
1761 | | tx_reply_iproto_error(struct cmsg *m); |
1762 | | |
1763 | | static void |
1764 | | net_send_msg(struct cmsg *msg); |
1765 | | |
1766 | | static void |
1767 | | net_send_error(struct cmsg *msg); |
1768 | | |
1769 | | static void |
1770 | | tx_process_replication(struct cmsg *msg); |
1771 | | |
1772 | | static void |
1773 | | net_end_join(struct cmsg *msg); |
1774 | | |
1775 | | static void |
1776 | | net_end_subscribe(struct cmsg *msg); |
1777 | | |
1778 | | /** |
1779 | | * Decodes the IPROTO message and returns the route corresponding to the message |
1780 | | * type. |
1781 | | * Can be called from both IPROTO and TX threads. |
1782 | | */ |
1783 | | static int |
1784 | | iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route); |
1785 | | |
/**
 * Decode the request header from [*pos, reqend) and select the cmsg
 * route the message will travel: an override route if a request
 * handler is registered for the type, the type-specific route from
 * iproto_msg_decode(), or the error route when decoding fails or the
 * type is unknown. On any failure the diag is moved into the message
 * so the error can be delivered from the TX thread.
 */
static void
iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
{
	uint64_t stream_id;
	uint32_t type;
	bool request_is_not_for_stream;
	bool request_is_only_for_stream;
	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
	mh_i32_t *handlers = iproto_thread->req_handlers;
	mh_int_t handler;
	struct cmsg_hop *route;
	int rc;

	if (xrow_header_decode(&msg->header, pos, reqend, true) != 0)
		goto error;
	assert(*pos == reqend);

	type = msg->header.type;
	stream_id = msg->header.stream_id;
	/*
	 * Classify the request: some types may never be sent inside a
	 * stream, others (transaction control) make sense only there.
	 */
	request_is_not_for_stream =
		((type > IPROTO_TYPE_STAT_MAX &&
		 type != IPROTO_PING) || type == IPROTO_AUTH);
	request_is_only_for_stream =
		(type == IPROTO_BEGIN ||
		 type == IPROTO_COMMIT ||
		 type == IPROTO_ROLLBACK);

	if (stream_id != 0 && request_is_not_for_stream) {
		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
			 iproto_type_name(type));
		goto error;
	} else if (stream_id == 0 && request_is_only_for_stream) {
		diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
			 iproto_type_name(type));
		goto error;
	}

	msg->connection->is_in_replication = type == IPROTO_JOIN ||
					     type == IPROTO_FETCH_SNAPSHOT ||
					     type == IPROTO_REGISTER ||
					     type == IPROTO_SUBSCRIBE;

	/* A registered per-type handler overrides normal processing. */
	handler = mh_i32_find(handlers, type, NULL);
	if (handler != mh_end(handlers)) {
		assert(!msg->connection->is_in_replication);
		cmsg_init(&msg->base, iproto_thread->override_route);
		return;
	}

	rc = iproto_msg_decode(msg, &route);
	if (rc == 0) {
		assert(route != NULL);
		cmsg_init(&msg->base, route);
		return;
	}
	/*
	 * route == NULL means the type itself is unknown; give a
	 * catch-all IPROTO_UNKNOWN handler a chance before erroring.
	 * Otherwise the decoder already set the diag and we fall
	 * through to the error path.
	 */
	if (route == NULL) {
		handler = mh_i32_find(handlers, IPROTO_UNKNOWN, NULL);
		if (handler != mh_end(handlers)) {
			cmsg_init(&msg->base, iproto_thread->override_route);
			return;
		}
		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t)type);
	}
error:
	/** Log the error and route the message so it is sent to the client. */
	diag_log();
	diag_create(&msg->diag);
	diag_move(&fiber()->diag, &msg->diag);
	cmsg_init(&msg->base, iproto_thread->error_route);
}
1856 | | |
/**
 * Decode the request body according to its type and pick the cmsg
 * route. Returns 0 on success. On failure returns -1: with *route
 * set when the type is known but the body failed to decode (diag is
 * set by the decoder), or with *route == NULL when the type is
 * unknown or disabled by error injection.
 */
static int
iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
{
	uint32_t type = msg->header.type;
	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
	switch (type) {
	case IPROTO_SELECT:
	case IPROTO_INSERT:
	case IPROTO_REPLACE:
	case IPROTO_UPDATE:
	case IPROTO_DELETE:
	case IPROTO_UPSERT:
		assert(type < sizeof(iproto_thread->dml_route) /
			      sizeof(*iproto_thread->dml_route));
		*route = iproto_thread->dml_route[type];
		if (xrow_decode_dml_iproto(&msg->header, &msg->dml,
					   dml_request_key_map(type)) != 0)
			return -1;
		/*
		 * In contrast to replication requests, for a client request
		 * the xrow header is set by WAL, which generates LSNs and sets
		 * replica id. Ignore the header received over network.
		 */
		msg->dml.header = NULL;
		return 0;
	case IPROTO_BEGIN:
		*route = iproto_thread->begin_route;
		if (xrow_decode_begin(&msg->header, &msg->begin) != 0)
			return -1;
		return 0;
	case IPROTO_COMMIT:
		*route = iproto_thread->commit_route;
		if (xrow_decode_commit(&msg->header, &msg->commit) != 0)
			return -1;
		return 0;
	case IPROTO_ROLLBACK:
		/* Rollback carries no body to decode. */
		*route = iproto_thread->rollback_route;
		return 0;
	case IPROTO_CALL_16:
	case IPROTO_CALL:
	case IPROTO_EVAL:
		*route = iproto_thread->call_route;
		if (xrow_decode_call(&msg->header, &msg->call))
			return -1;
		return 0;
	case IPROTO_WATCH:
	case IPROTO_UNWATCH:
	case IPROTO_WATCH_ONCE:
		*route = iproto_thread->misc_route;
		/* Testing hook: pretend WATCH requests are unknown. */
		ERROR_INJECT(ERRINJ_IPROTO_DISABLE_WATCH, {
			*route = NULL;
			return -1;
		});
		if (xrow_decode_watch(&msg->header, &msg->watch) != 0)
			return -1;
		return 0;
	case IPROTO_EXECUTE:
	case IPROTO_PREPARE:
		*route = iproto_thread->sql_route;
		if (xrow_decode_sql(&msg->header, &msg->sql) != 0)
			return -1;
		return 0;
	case IPROTO_PING:
		*route = iproto_thread->misc_route;
		return 0;
	case IPROTO_ID:
		*route = iproto_thread->misc_route;
		/* Testing hook: pretend ID requests are unknown. */
		ERROR_INJECT(ERRINJ_IPROTO_DISABLE_ID, {
			*route = NULL;
			return -1;
		});
		if (xrow_decode_id(&msg->header, &msg->id) != 0)
			return -1;
		return 0;
	case IPROTO_JOIN:
	case IPROTO_FETCH_SNAPSHOT:
	case IPROTO_REGISTER:
		*route = iproto_thread->join_route;
		return 0;
	case IPROTO_SUBSCRIBE:
		*route = iproto_thread->subscribe_route;
		return 0;
	case IPROTO_VOTE_DEPRECATED:
	case IPROTO_VOTE:
		*route = iproto_thread->misc_route;
		return 0;
	case IPROTO_AUTH:
		*route = iproto_thread->misc_route;
		if (xrow_decode_auth(&msg->header, &msg->auth))
			return -1;
		return 0;
	default:
		/* Unknown type: no route, caller may try the catch-all. */
		*route = NULL;
		return -1;
	}
}
1953 | | |
/**
 * Prepare the current TX fiber for serving a request of the given
 * session: store the request sync in fiber-local storage and bind the
 * fiber to the session and its credentials.
 */
static void
tx_fiber_init(struct session *session, uint64_t sync)
{
	struct fiber *f = fiber();
	/*
	 * There should not be any not executed on_stop triggers
	 * from a previous request executed in that fiber.
	 */
	assert(rlist_empty(&f->on_stop));
	f->storage.net.sync = sync;
	/*
	 * We do not cleanup fiber keys at the end of each request.
	 * This does not lead to privilege escalation as long as
	 * fibers used to serve iproto requests never mingle with
	 * fibers used to serve background tasks without going
	 * through the purification of fiber_recycle(), which
	 * resets the fiber local storage. Fibers, used to run
	 * background tasks clean up their session in on_stop
	 * trigger as well.
	 */
	fiber_set_session(f, session);
	fiber_set_user(f, &session->credentials);
}
1977 | | |
/**
 * TX-thread hop of stream disconnect: if the stream still owns an
 * active transaction, attach it to the current fiber and roll it
 * back. A rollback failure here is unrecoverable, hence panic.
 */
static void
tx_process_rollback_on_disconnect(struct cmsg *m)
{
	struct iproto_stream *stream =
		container_of(m, struct iproto_stream,
			     on_disconnect);

	if (stream->txn != NULL) {
		tx_fiber_init(stream->connection->session, 0);
		txn_attach(stream->txn);
		if (box_txn_rollback() != 0)
			panic("failed to rollback transaction on disconnect");
		stream->txn = NULL;
	}
}
1993 | | |
/**
 * Net-thread hop following tx_process_rollback_on_disconnect: remove
 * the stream from the connection's stream hash, free it, and resume
 * the deferred connection destruction if the state machine was
 * waiting for streams to drain.
 */
static void
net_finish_rollback_on_disconnect(struct cmsg *m)
{
	struct iproto_stream *stream =
		container_of(m, struct iproto_stream,
			     on_disconnect);
	struct iproto_connection *con = stream->connection;

	struct mh_i64ptr_node_t node = { stream->id, NULL };
	mh_i64ptr_remove(con->streams, &node, 0);
	iproto_stream_delete(stream);
	assert(con->state != IPROTO_CONNECTION_ALIVE);
	if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
		iproto_connection_try_to_start_destroy(con);
}
2009 | | |
/**
 * Cancel all inprogress requests of the connection (TX thread).
 * Each in-progress message remembers the fiber serving it; cancelling
 * that fiber interrupts the request.
 */
static void
tx_process_cancel_inprogress(struct cmsg *m)
{
	struct iproto_connection *con =
		container_of(m, struct iproto_connection, cancel_msg);
	struct iproto_msg *msg;
	rlist_foreach_entry(msg, &con->tx.inprogress, in_inprogress)
		fiber_cancel(msg->fiber);
}
2020 | | |
/**
 * TX-thread hop of connection disconnect: close the session, stop
 * pending pushes and run the user's on_disconnect triggers.
 */
static void
tx_process_disconnect(struct cmsg *m)
{
	struct iproto_connection *con =
		container_of(m, struct iproto_connection, disconnect_msg);
	if (con->session != NULL) {
		session_close(con->session);
		/*
		 * When kharon returns, it should not go back - the socket is
		 * already dead anyway, and soon the connection itself will be
		 * deleted. More pushes can't come, because after the session is
		 * closed, its push() method is replaced with a stub.
		 */
		con->tx.is_push_pending = false;
		tx_fiber_init(con->session, 0);
		session_run_on_disconnect_triggers(con->session);
	}
}
2039 | | |
/**
 * Net-thread hop following tx_process_disconnect: kick off the
 * destroy sequence if the connection became eligible for it.
 */
static void
net_finish_disconnect(struct cmsg *m)
{
	struct iproto_connection *con =
		container_of(m, struct iproto_connection, disconnect_msg);
	iproto_connection_try_to_start_destroy(con);
}
2047 | | |
2048 | | /** |
2049 | | * Destroy the session object, as well as output buffers of the |
2050 | | * connection. |
2051 | | */ |
2052 | | static void |
2053 | | tx_process_destroy(struct cmsg *m) |
2054 | 0 | { |
2055 | 0 | struct iproto_connection *con = |
2056 | 0 | container_of(m, struct iproto_connection, destroy_msg); |
2057 | 0 | assert(con->state == IPROTO_CONNECTION_DESTROYED); |
2058 | 0 | if (con->session) { |
2059 | 0 | session_delete(con->session); |
2060 | 0 | con->session = NULL; /* safety */ |
2061 | 0 | } |
2062 | | /* |
2063 | | * obuf is being destroyed in tx thread cause it is where |
2064 | | * it was allocated. |
2065 | | */ |
2066 | 0 | obuf_destroy(&con->obuf[0]); |
2067 | 0 | obuf_destroy(&con->obuf[1]); |
2068 | 0 | } |
2069 | | |
2070 | | /** |
2071 | | * Cleanup the net thread resources of a connection |
2072 | | * and close the connection. |
2073 | | */ |
2074 | | static void |
2075 | | net_finish_destroy(struct cmsg *m) |
2076 | 0 | { |
2077 | 0 | struct iproto_connection *con = |
2078 | 0 | container_of(m, struct iproto_connection, destroy_msg); |
2079 | | /* Runs the trigger, which may yield. */ |
2080 | 0 | iproto_connection_delete(con); |
2081 | 0 | } |
2082 | | |
/** Account msg data in connection input buffer as processed. */
static void
iproto_msg_finish_input(iproto_msg *msg)
{
	struct iproto_connection *con = msg->connection;
	struct ibuf *ibuf = msg->p_ibuf;
	/* Per-buffer counter: index 1 iff the msg came from ibuf[1]. */
	size_t *count = &con->input_msg_count[msg->p_ibuf == &con->ibuf[1]];
	/*
	 * Consume data from input buffer only when data of all messages
	 * is processed because messages process order and order of messages
	 * in the buffer may differ.
	 */
	assert(*count != 0);
	if (--(*count) == 0) {
		size_t processed = ibuf_used(ibuf);
		/*
		 * The buffer currently being parsed may hold a tail of
		 * not-yet-parsed bytes; those are not processed yet.
		 */
		if (ibuf == con->p_ibuf) {
			assert(processed >= con->parse_size);
			processed -= con->parse_size;
		}
		ibuf_consume(ibuf, processed);
	}
}
2105 | | |
/**
 * Net-thread handler of a long-poll request's input discard: the
 * request's input bytes are accounted as processed so the connection
 * can keep reading while the request is still being served in tx.
 */
static void
net_discard_input(struct cmsg *m)
{
	struct iproto_msg *msg = container_of(m, struct iproto_msg,
					      discard_input);
	struct iproto_connection *con = msg->connection;
	iproto_msg_finish_input(msg);
	/* The input is gone; the message no longer pins buffer bytes. */
	msg->len = 0;
	con->long_poll_count++;
	if (con->state == IPROTO_CONNECTION_ALIVE)
		iproto_connection_feed_input(con);
}
2118 | | |
2119 | | static void |
2120 | | tx_discard_input(struct iproto_msg *msg) |
2121 | 0 | { |
2122 | 0 | struct iproto_thread *iproto_thread = msg->connection->iproto_thread; |
2123 | 0 | static const struct cmsg_hop discard_input_route[] = { |
2124 | 0 | { net_discard_input, NULL }, |
2125 | 0 | }; |
2126 | 0 | cmsg_init(&msg->discard_input, discard_input_route); |
2127 | 0 | cpipe_push(&iproto_thread->net_pipe, &msg->discard_input); |
2128 | 0 | } |
2129 | | |
2130 | | /** |
2131 | | * The goal of this function is to maintain the state of |
2132 | | * two rotating connection output buffers in tx thread. |
2133 | | * |
2134 | | * The function enforces the following rules: |
2135 | | * - if both out buffers are empty, any one is selected; |
2136 | | * - if one of the buffers is empty, and the other is |
2137 | | * not, the empty buffer is selected. |
2138 | | * - if neither of the buffers are empty, the function |
2139 | | * does not rotate the buffer. |
2140 | | * |
2141 | | * @param con iproto connection. |
2142 | | * @param wpos Last flushed write position, received from iproto |
2143 | | * thread. |
2144 | | */ |
2145 | | static void |
2146 | | tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos) |
2147 | 0 | { |
2148 | 0 | struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf]; |
2149 | 0 | if (wpos->obuf == con->tx.p_obuf) { |
2150 | | /* |
2151 | | * We got a message advancing the buffer which |
2152 | | * is being appended to. The previous buffer is |
2153 | | * guaranteed to have been flushed first, since |
2154 | | * buffers are never flushed out of order. |
2155 | | */ |
2156 | 0 | if (obuf_size(prev) != 0) |
2157 | 0 | obuf_reset(prev); |
2158 | 0 | } |
2159 | 0 | if (obuf_size(con->tx.p_obuf) != 0 && obuf_size(prev) == 0) { |
2160 | | /* |
2161 | | * If the current buffer is not empty, and the |
2162 | | * previous buffer has been flushed, rotate |
2163 | | * the current buffer. |
2164 | | */ |
2165 | 0 | con->tx.p_obuf = prev; |
2166 | 0 | } |
2167 | 0 | } |
2168 | | |
2169 | | /** |
2170 | | * Since the processing of requests within a transaction |
2171 | | * for a stream can occur in different fibers, we store |
2172 | | * a pointer to transaction in the stream structure. |
2173 | | * Check if message belongs to stream and there is active |
2174 | | * transaction for this stream. In case it is so, sets this |
2175 | | * transaction for current fiber. |
2176 | | */ |
2177 | | static inline void |
2178 | | tx_prepare_transaction_for_request(struct iproto_msg *msg) |
2179 | 0 | { |
2180 | 0 | if (msg->stream != NULL && msg->stream->txn != NULL) { |
2181 | 0 | txn_attach(msg->stream->txn); |
2182 | 0 | msg->stream->txn = NULL; |
2183 | 0 | } |
2184 | 0 | assert(!in_txn() || msg->stream != NULL); |
2185 | 0 | } |
2186 | | |
/**
 * Accept an incoming message in the TX thread: rotate output buffers,
 * bind the current fiber to the session and any pending stream
 * transaction, register the request as in-progress, and collect
 * stats. If the message was already accepted (msg->fiber is set —
 * presumably a re-entry after a yield; TODO confirm), it is returned
 * as-is without re-registering.
 */
static inline struct iproto_msg *
tx_accept_msg(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	if (msg->fiber != NULL)
		return msg;
	tx_accept_wpos(msg->connection, &msg->wpos);
	tx_fiber_init(msg->connection->session, msg->header.sync);
	tx_prepare_transaction_for_request(msg);
	msg->connection->iproto_thread->tx.requests_in_progress++;
	rlist_add_entry(&msg->connection->tx.inprogress, msg,
			in_inprogress);
	msg->fiber = fiber();
	rmean_collect(msg->connection->iproto_thread->tx.rmean,
		      REQUESTS_IN_PROGRESS, 1);
	/* Record the raw request for the flight recorder. */
	flightrec_write_request(msg->reqstart, msg->len);
	return msg;
}
2205 | | |
2206 | | /** |
2207 | | * Check if the watch request key is in the white list which doesn't need |
2208 | | * additional checks. |
2209 | | * The only allowed subscription is to "internal.ballot" event - the one used by |
2210 | | * replication instead of IPROTO_VOTE on Tarantool 2.11+. |
2211 | | */ |
2212 | | static bool |
2213 | | check_watch_key(const char *key, uint32_t len) |
2214 | 0 | { |
2215 | 0 | if (len != strlen(box_ballot_event_key)) |
2216 | 0 | return false; |
2217 | 0 | return strncmp(key, box_ballot_event_key, len) == 0; |
2218 | 0 | } |
2219 | | |
2220 | | /** |
2221 | | * Check if the tx thread may continue with processing an accepted message. |
2222 | | * If something's wrong, returns -1 and sets diag, otherwise returns 0. |
2223 | | */ |
2224 | | static int |
2225 | | tx_check_msg(struct iproto_msg *msg) |
2226 | 0 | { |
2227 | 0 | uint64_t new_schema_version = msg->header.schema_version; |
2228 | 0 | if (new_schema_version != 0 && new_schema_version != schema_version) { |
2229 | 0 | diag_set(ClientError, ER_WRONG_SCHEMA_VERSION, |
2230 | 0 | new_schema_version, schema_version); |
2231 | 0 | return -1; |
2232 | 0 | } |
2233 | 0 | enum iproto_type type = (enum iproto_type)msg->header.type; |
2234 | 0 | if (type != IPROTO_AUTH && type != IPROTO_PING && type != IPROTO_ID && |
2235 | 0 | type != IPROTO_VOTE && type != IPROTO_VOTE_DEPRECATED && |
2236 | 0 | (type != IPROTO_WATCH || |
2237 | 0 | !check_watch_key(msg->watch.key, msg->watch.key_len)) && |
2238 | 0 | security_check_session() != 0) |
2239 | 0 | return -1; |
2240 | 0 | return 0; |
2241 | 0 | } |
2242 | | |
/**
 * Finish serving a message in the TX thread: hand an active
 * transaction back to the stream, unregister the request from the
 * in-progress accounting and, if the response advanced the output
 * buffer past @a svp, log the response to the flight recorder.
 */
static inline void
tx_end_msg(struct iproto_msg *msg, struct obuf_svp *svp)
{
	if (msg->stream != NULL) {
		assert(msg->stream->txn == NULL);
		/* Detach the txn so another fiber may continue it later. */
		msg->stream->txn = txn_detach();
	}
	msg->connection->iproto_thread->tx.requests_in_progress--;
	rlist_del(&msg->in_inprogress);
	msg->fiber = NULL;
	struct obuf *out = msg->connection->tx.p_obuf;
	if (msg->connection->tx.p_obuf->used != svp->used)
		/* Log response to the flight recorder. */
		flightrec_write_response(out, svp);
}
2258 | | |
2259 | | /** |
2260 | | * Write error message to the output buffer and advance write position. |
2261 | | */ |
2262 | | static void |
2263 | | tx_reply_error(struct iproto_msg *msg) |
2264 | 0 | { |
2265 | 0 | struct obuf *out = msg->connection->tx.p_obuf; |
2266 | 0 | iproto_reply_error(out, diag_last_error(&fiber()->diag), |
2267 | 0 | msg->header.sync, ::schema_version); |
2268 | 0 | iproto_wpos_create(&msg->wpos, out); |
2269 | 0 | } |
2270 | | |
2271 | | /** |
2272 | | * Write error from iproto thread to the output buffer and advance |
2273 | | * write position. |
2274 | | */ |
2275 | | static void |
2276 | | tx_reply_iproto_error(struct cmsg *m) |
2277 | 0 | { |
2278 | 0 | struct iproto_msg *msg = tx_accept_msg(m); |
2279 | 0 | struct obuf *out = msg->connection->tx.p_obuf; |
2280 | 0 | struct obuf_svp header = obuf_create_svp(out); |
2281 | 0 | iproto_reply_error(out, diag_last_error(&msg->diag), |
2282 | 0 | msg->header.sync, ::schema_version); |
2283 | 0 | iproto_wpos_create(&msg->wpos, out); |
2284 | 0 | tx_end_msg(msg, &header); |
2285 | 0 | } |
2286 | | |
/** Inject a short delay on tx request processing for testing. */
static inline void
tx_inject_delay(void)
{
	/* ~10% of requests sleep for 1ms when the injection is armed. */
	ERROR_INJECT(ERRINJ_IPROTO_TX_DELAY, {
		if (rand() % 100 < 10)
			fiber_sleep(0.001);
	});
}
2296 | | |
/**
 * TX handler of IPROTO_BEGIN: start a transaction for the stream,
 * apply the optional timeout and isolation level, mark it synchronous
 * if requested, and reply OK. A failure to configure the freshly
 * started transaction rolls it back before replying with the error.
 */
static void
tx_process_begin(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	struct obuf *out;
	struct obuf_svp header;
	uint32_t txn_isolation = msg->begin.txn_isolation;
	bool is_sync = msg->begin.is_sync;

	if (tx_check_msg(msg) != 0)
		goto error;

	if (box_txn_begin() != 0)
		goto error;

	if (msg->begin.timeout != 0 &&
	    box_txn_set_timeout(msg->begin.timeout) != 0) {
		/* Undo the begin; rollback of a fresh txn can't fail. */
		int rc = box_txn_rollback();
		assert(rc == 0);
		(void)rc;
		goto error;
	}
	if (box_txn_set_isolation(txn_isolation) != 0) {
		/* Same: configuration failed, roll the fresh txn back. */
		int rc = box_txn_rollback();
		assert(rc == 0);
		(void)rc;
		goto error;
	}
	if (is_sync)
		box_txn_make_sync();

	out = msg->connection->tx.p_obuf;
	header = obuf_create_svp(out);
	iproto_reply_ok(out, msg->header.sync, ::schema_version);
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &header);
	return;
error:
	out = msg->connection->tx.p_obuf;
	header = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &header);
}
2340 | | |
/**
 * TX handler of IPROTO_COMMIT: optionally mark the stream's
 * transaction synchronous, commit it, and reply OK or the error.
 */
static void
tx_process_commit(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	struct obuf *out;
	struct obuf_svp header;
	bool is_sync = msg->commit.is_sync;

	if (tx_check_msg(msg) != 0)
		goto error;

	/* Must be set before the commit takes effect. */
	if (is_sync)
		box_txn_make_sync();

	if (box_txn_commit() != 0)
		goto error;

	out = msg->connection->tx.p_obuf;
	header = obuf_create_svp(out);
	iproto_reply_ok(out, msg->header.sync, ::schema_version);
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &header);
	return;
error:
	out = msg->connection->tx.p_obuf;
	header = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &header);
}
2370 | | |
/**
 * TX handler of IPROTO_ROLLBACK: roll back the stream's transaction
 * and reply OK or the error.
 */
static void
tx_process_rollback(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	struct obuf *out;
	struct obuf_svp header;

	if (tx_check_msg(msg) != 0)
		goto error;

	if (box_txn_rollback() != 0)
		goto error;

	out = msg->connection->tx.p_obuf;
	header = obuf_create_svp(out);
	iproto_reply_ok(out, msg->header.sync, ::schema_version);
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &header);
	return;
error:
	out = msg->connection->tx.p_obuf;
	header = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &header);
}
2396 | | |
2397 | | /* |
2398 | | * In case the request does not contain a space or identifier but contains a |
2399 | | * corresponding name, tries to resolve the name. |
2400 | | */ |
2401 | | static int |
2402 | | tx_resolve_space_and_index_name(struct request *dml) |
2403 | 0 | { |
2404 | 0 | struct space *space = NULL; |
2405 | 0 | if (dml->space_name != NULL) { |
2406 | 0 | space = space_by_name(dml->space_name, dml->space_name_len); |
2407 | 0 | if (space == NULL) { |
2408 | 0 | diag_set(ClientError, ER_NO_SUCH_SPACE, |
2409 | 0 | tt_cstr(dml->space_name, dml->space_name_len)); |
2410 | 0 | return -1; |
2411 | 0 | } |
2412 | 0 | dml->space_id = space->def->id; |
2413 | 0 | } |
2414 | 0 | if ((dml->type == IPROTO_SELECT || dml->type == IPROTO_UPDATE || |
2415 | 0 | dml->type == IPROTO_DELETE) && dml->index_name != NULL) { |
2416 | 0 | if (space == NULL) |
2417 | 0 | space = space_cache_find(dml->space_id); |
2418 | 0 | if (space == NULL) |
2419 | 0 | return -1; |
2420 | 0 | struct index *idx = space_index_by_name(space, dml->index_name, |
2421 | 0 | dml->index_name_len); |
2422 | 0 | if (idx == NULL) { |
2423 | 0 | diag_set(ClientError, ER_NO_SUCH_INDEX_NAME, |
2424 | 0 | tt_cstr(dml->index_name, dml->index_name_len), |
2425 | 0 | space->def->name); |
2426 | 0 | return -1; |
2427 | 0 | } |
2428 | 0 | dml->index_id = idx->dense_id; |
2429 | 0 | } |
2430 | 0 | return 0; |
2431 | 0 | } |
2432 | | |
/**
 * TX handler for single-tuple DML requests (INSERT, REPLACE, UPDATE,
 * DELETE, UPSERT and point paths of the dml route): execute the
 * request via box_process1() and reply with the resulting tuple, if
 * any. When the session supports the DML tuple extension, the tuple
 * is encoded as an MP extension and accompanied by a format map
 * (sent even when there is no tuple).
 */
static void
tx_process1(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	bool box_tuple_as_ext =
		iproto_features_test(&msg->connection->session->meta.features,
				     IPROTO_FEATURE_DML_TUPLE_EXTENSION);
	struct tuple_format_map format_map;
	tuple_format_map_create_empty(&format_map);
	/* Destroy the map on all exits, including the goto error paths. */
	auto format_map_guard = make_scoped_guard([&format_map] {
		tuple_format_map_destroy(&format_map);
	});
	if (tx_check_msg(msg) != 0)
		goto error;

	struct tuple *tuple;
	struct obuf_svp svp;
	struct obuf *out;
	tx_inject_delay();
	if (tx_resolve_space_and_index_name(&msg->dml) != 0)
		goto error;
	if (box_process1(&msg->dml, &tuple) != 0)
		goto error;
	out = msg->connection->tx.p_obuf;
	iproto_prepare_select(out, &svp);
	if (tuple != NULL) {
		if (box_tuple_as_ext) {
			tuple_format_map_add_format(&format_map,
						    tuple->format_id);
			if (tuple_to_obuf_as_ext(tuple, out) != 0)
				goto error;
		} else if (tuple_to_obuf(tuple, out) != 0) {
			goto error;
		}
	}
	/*
	 * Even if there is no tuple, we still need to send an empty tuple
	 * format map.
	 */
	if (box_tuple_as_ext &&
	    tuple_format_map_to_iproto_obuf(&format_map, out) != 0)
		goto error;
	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
			    tuple != 0, box_tuple_as_ext);
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &svp);
	return;
error:
	out = msg->connection->tx.p_obuf;
	svp = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &svp);
}
2486 | | |
/**
 * TX-thread handler for IPROTO_SELECT. Resolves the target space/index,
 * optionally converts an "after tuple" into an iterator position, runs
 * box_select() and dumps the resulting port into the connection's output
 * buffer in the legacy (1.6) SELECT wire format.
 */
static void
tx_process_select(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	/* True if the client can decode tuples sent as MP_EXT. */
	bool box_tuple_as_ext =
		iproto_features_test(&msg->connection->session->meta.features,
				     IPROTO_FEATURE_DML_TUPLE_EXTENSION);
	struct obuf *out;
	struct obuf_svp svp;
	struct port port;

	/*
	 * Dump context collecting tuple formats; only created (and its
	 * guard only armed) when the tuple extension is in use.
	 */
	struct mp_box_ctx ctx;
	struct mp_ctx *ctx_ref = NULL;
	if (box_tuple_as_ext) {
		mp_box_ctx_create(&ctx, NULL, NULL);
		ctx_ref = (struct mp_ctx *)&ctx;
	}
	auto ctx_guard = make_scoped_guard([ctx_ref] {
		mp_ctx_destroy(ctx_ref);
	});
	ctx_guard.is_active = box_tuple_as_ext;

	int count;
	int rc;
	const char *packed_pos, *packed_pos_end;
	bool reply_position;
	struct request *req = &msg->dml;
	/* Remember region usage to free temporary allocations at the end. */
	uint32_t region_svp = region_used(&fiber()->gc);
	if (tx_check_msg(msg) != 0)
		goto error;

	tx_inject_delay();
	if (tx_resolve_space_and_index_name(&msg->dml) != 0)
		goto error;
	packed_pos = req->after_position;
	packed_pos_end = req->after_position_end;
	if (packed_pos != NULL) {
		/* Skip the MsgPack string header of the packed position. */
		mp_decode_strl(&packed_pos);
	} else if (req->after_tuple != NULL) {
		/* Derive the iterator position from the "after" tuple. */
		rc = box_index_tuple_position(req->space_id, req->index_id,
					      req->after_tuple,
					      req->after_tuple_end,
					      &packed_pos, &packed_pos_end);
		if (rc < 0)
			goto error;
	}
	rc = box_select(req->space_id, req->index_id,
			req->iterator, req->offset, req->limit,
			req->key, req->key_end, &packed_pos, &packed_pos_end,
			req->fetch_position, &port);
	if (rc < 0)
		goto error;

	out = msg->connection->tx.p_obuf;
	reply_position = req->fetch_position && packed_pos != NULL;
	if (reply_position)
		iproto_prepare_select_with_position(out, &svp);
	else
		iproto_prepare_select(out, &svp);
	/*
	 * SELECT output format has not changed since Tarantool 1.6
	 */
	count = port_dump_msgpack_16_with_ctx(&port, out, ctx_ref);
	port_destroy(&port);
	if (count < 0 || (box_tuple_as_ext &&
			  tuple_format_map_to_iproto_obuf(&ctx.tuple_format_map,
							  out) != 0)) {
		goto discard;
	}
	if (reply_position) {
		assert(packed_pos != NULL);
		iproto_reply_select_with_position(out, &svp, msg->header.sync,
						  ::schema_version, count,
						  packed_pos, packed_pos_end,
						  box_tuple_as_ext);
	} else {
		iproto_reply_select(out, &svp, msg->header.sync,
				    ::schema_version, count, box_tuple_as_ext);
	}
	region_truncate(&fiber()->gc, region_svp);
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &svp);
	return;
discard:
	/* Discard the prepared select. */
	obuf_rollback_to_svp(out, &svp);
error:
	region_truncate(&fiber()->gc, region_svp);
	out = msg->connection->tx.p_obuf;
	svp = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &svp);
}
2580 | | |
2581 | | static int |
2582 | | tx_process_call_on_yield(struct trigger *trigger, void *event) |
2583 | 0 | { |
2584 | 0 | (void)event; |
2585 | 0 | struct iproto_msg *msg = (struct iproto_msg *)trigger->data; |
2586 | 0 | TRASH(&msg->call); |
2587 | 0 | tx_discard_input(msg); |
2588 | 0 | trigger_clear(trigger); |
2589 | 0 | return 0; |
2590 | 0 | } |
2591 | | |
/**
 * TX-thread handler for IPROTO_CALL, IPROTO_CALL_16 and IPROTO_EVAL.
 * Runs the stored procedure / Lua expression and replies with a
 * SELECT-compatible multi-return response.
 */
static void
tx_process_call(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);

	/* True if the client can decode returned tuples sent as MP_EXT. */
	bool box_tuple_as_ext =
		iproto_features_test(&msg->connection->session->meta.features,
				     IPROTO_FEATURE_CALL_RET_TUPLE_EXTENSION);
	/*
	 * Dump context collecting tuple formats; only created (and its
	 * guard only armed) when the tuple extension is in use.
	 */
	struct mp_box_ctx ctx;
	struct mp_ctx *ctx_ref = NULL;
	if (box_tuple_as_ext) {
		mp_box_ctx_create(&ctx, NULL, NULL);
		ctx_ref = (struct mp_ctx *)&ctx;
	}
	auto ctx_guard = make_scoped_guard([ctx_ref] {
		mp_ctx_destroy(ctx_ref);
	});
	ctx_guard.is_active = box_tuple_as_ext;

	if (tx_check_msg(msg) != 0)
		goto error;

	/*
	 * CALL/EVAL should copy its arguments so we can discard
	 * input on yield to avoid stalling other connections by
	 * a long polling request.
	 */
	struct trigger fiber_on_yield;
	trigger_create(&fiber_on_yield, tx_process_call_on_yield, msg, NULL);
	trigger_add(&fiber()->on_yield, &fiber_on_yield);

	int rc;
	struct port port;

	switch (msg->header.type) {
	case IPROTO_CALL:
	case IPROTO_CALL_16:
		rc = box_process_call(&msg->call, &port);
		break;
	case IPROTO_EVAL:
		rc = box_process_eval(&msg->call, &port);
		break;
	default:
		unreachable();
	}

	/* Disarm the on_yield trigger: the call has finished. */
	trigger_clear(&fiber_on_yield);

	if (rc != 0)
		goto error;

	/*
	 * A function may not leave a transaction open unless it runs
	 * inside a stream (stream_id != 0), where transactions are
	 * allowed to span requests.
	 */
	if (in_txn() != NULL && msg->header.stream_id == 0) {
		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
		port_destroy(&port);
		goto error;
	}

	/*
	 * Add all elements returned by the function to iproto.
	 *
	 * To allow clients to understand a complex return from
	 * a procedure, we are compatible with SELECT protocol,
	 * and return the number of return values first, and
	 * then each return value as a tuple.
	 *
	 * (!) Please note that a save point for output buffer
	 * must be taken only after finishing executing of Lua
	 * function because Lua can yield and leave the
	 * buffer in inconsistent state (a parallel request
	 * from the same connection will break the protocol).
	 */

	int count;
	struct obuf *out;
	struct obuf_svp svp;

	out = msg->connection->tx.p_obuf;
	iproto_prepare_select(out, &svp);

	/* CALL_16 keeps the legacy (1.6) dump format. */
	if (msg->header.type == IPROTO_CALL_16)
		count = port_dump_msgpack_16_with_ctx(&port, out, ctx_ref);
	else
		count = port_dump_msgpack_with_ctx(&port, out, ctx_ref);

	port_destroy(&port);
	if (count < 0 || (box_tuple_as_ext &&
			  tuple_format_map_to_iproto_obuf(&ctx.tuple_format_map,
							  out) != 0)) {
		obuf_rollback_to_svp(out, &svp);
		goto error;
	}
	iproto_reply_select(out, &svp, msg->header.sync,
			    ::schema_version, count, box_tuple_as_ext);
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &svp);
	return;
error:
	out = msg->connection->tx.p_obuf;
	svp = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &svp);
}
2694 | | |
2695 | | static void |
2696 | | tx_process_id(struct iproto_connection *con, const struct id_request *id) |
2697 | 0 | { |
2698 | 0 | extern bool box_tuple_extension; |
2699 | 0 | con->session->meta.features = id->features; |
2700 | 0 | if (!box_tuple_extension) |
2701 | 0 | iproto_features_clear(&con->session->meta.features, |
2702 | 0 | IPROTO_FEATURE_CALL_RET_TUPLE_EXTENSION); |
2703 | 0 | } |
2704 | | |
2705 | | /** Callback passed to session_watch. */ |
2706 | | static void |
2707 | | iproto_session_notify(struct session *session, uint64_t sync, |
2708 | | const char *key, size_t key_len, |
2709 | | const char *data, const char *data_end); |
2710 | | |
/**
 * TX-thread handler for miscellaneous lightweight requests: AUTH, PING,
 * ID, VOTE, WATCH/UNWATCH/WATCH_ONCE. All replies share the `header'
 * savepoint taken before the reply is written.
 */
static void
tx_process_misc(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	struct iproto_connection *con = msg->connection;
	struct obuf *out = con->tx.p_obuf;
	struct obuf_svp header;
	/* Only PING is permitted inside an open stream transaction. */
	assert(!(msg->header.type != IPROTO_PING && in_txn()));
	if (tx_check_msg(msg) != 0)
		goto error;

	struct ballot ballot;
	header = obuf_create_svp(out);
	switch (msg->header.type) {
	case IPROTO_AUTH:
		if (box_process_auth(&msg->auth, con->salt,
				     IPROTO_SALT_SIZE) != 0)
			goto error;
		iproto_reply_ok(out, msg->header.sync, ::schema_version);
		break;
	case IPROTO_PING:
		iproto_reply_ok(out, msg->header.sync, ::schema_version);
		break;
	case IPROTO_ID:
		tx_process_id(con, &msg->id);
		iproto_reply_id(out, box_auth_type, msg->header.sync,
				::schema_version);
		break;
	case IPROTO_VOTE_DEPRECATED:
		iproto_reply_vclock(out, &replicaset.vclock, msg->header.sync,
				    ::schema_version);
		break;
	case IPROTO_VOTE:
		box_process_vote(&ballot);
		iproto_reply_vote(out, &ballot, msg->header.sync,
				  ::schema_version);
		break;
	case IPROTO_WATCH:
		session_watch(con->session, msg->header.sync,
			      msg->watch.key, msg->watch.key_len,
			      iproto_session_notify);
		/* Sic: no reply. */
		break;
	case IPROTO_UNWATCH:
		session_unwatch(con->session, msg->watch.key,
				msg->watch.key_len);
		/* Sic: no reply. */
		break;
	case IPROTO_WATCH_ONCE: {
		/* One-shot read of the current value of a watched key. */
		const char *data, *data_end;
		data = box_watch_once(msg->watch.key, msg->watch.key_len,
				      &data_end);
		iproto_prepare_select(out, &header);
		xobuf_dup(out, data, data_end - data);
		iproto_reply_select(out, &header, msg->header.sync,
				    ::schema_version, data != NULL ? 1 : 0,
				    /*box_tuple_as_ext=*/false);
		break;
	}
	default:
		unreachable();
	}
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &header);
	return;
error:
	header = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &header);
}
2781 | | |
/**
 * TX-thread handler for IPROTO_EXECUTE and IPROTO_PREPARE: runs the SQL
 * request and dumps its result port into the connection output buffer.
 */
static void
tx_process_sql(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	struct obuf *out;
	struct port port;
	/* Free fiber-region allocations made while serving the request. */
	RegionGuard region_guard(&fiber()->gc);

	if (tx_check_msg(msg) != 0)
		goto error;
	assert(msg->header.type == IPROTO_EXECUTE ||
	       msg->header.type == IPROTO_PREPARE);
	tx_inject_delay();
	if (box_process_sql(&msg->sql, &port) != 0)
		goto error;
	/*
	 * Take an obuf only after execute(). Else the buffer can
	 * become out of date during yield.
	 */
	out = msg->connection->tx.p_obuf;
	struct obuf_svp header_svp;
	iproto_prepare_header(out, &header_svp, IPROTO_HEADER_LEN);
	if (port_dump_msgpack(&port, out) != 0) {
		/* Roll back the partially written reply on dump failure. */
		port_destroy(&port);
		obuf_rollback_to_svp(out, &header_svp);
		goto error;
	}
	port_destroy(&port);
	iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
	iproto_wpos_create(&msg->wpos, out);
	tx_end_msg(msg, &header_svp);
	return;
error:
	out = msg->connection->tx.p_obuf;
	header_svp = obuf_create_svp(out);
	tx_reply_error(msg);
	tx_end_msg(msg, &header_svp);
}
2820 | | |
/**
 * TX-thread handler for replication requests (JOIN, FETCH_SNAPSHOT,
 * REGISTER, SUBSCRIBE). These take over the connection socket directly,
 * so error reporting is done by writing to the iostream rather than to
 * the regular output buffer.
 */
static void
tx_process_replication(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	struct iproto_connection *con = msg->connection;
	struct iostream *io = &con->io;
	assert(!in_txn());
	try {
		if (tx_check_msg(msg) != 0)
			diag_raise();
		switch (msg->header.type) {
		case IPROTO_JOIN:
			/*
			 * As soon as box_process_subscribe() returns
			 * the lambda in the beginning of the block
			 * will re-activate the watchers for us.
			 */
			box_process_join(io, &msg->header);
			break;
		case IPROTO_FETCH_SNAPSHOT:
			box_process_fetch_snapshot(io, &msg->header);
			break;
		case IPROTO_REGISTER:
			box_process_register(io, &msg->header);
			break;
		case IPROTO_SUBSCRIBE:
			/*
			 * Subscribe never returns - unless there
			 * is an error/exception. In that case
			 * the write watcher will be re-activated
			 * the same way as for JOIN.
			 */
			box_process_subscribe(io, &msg->header);
			break;
		default:
			unreachable();
		}
	} catch (SocketError *e) {
		/* don't write error response to prevent SIGPIPE */
	} catch (TimedOut *e) {
		 /*
		  * In case of a timeout the error could come after a partially
		  * written row. Do not push it on top.
		  */
	} catch (FiberIsCancelled *e) {
		/* Do not write into connection on connection drop. */
	} catch (Exception *e) {
		iproto_write_error(io, e, ::schema_version, msg->header.sync);
	}
	/* No buffered reply: finish the message with an empty savepoint. */
	struct obuf_svp empty = obuf_create_svp(msg->connection->tx.p_obuf);
	tx_end_msg(msg, &empty);
}
2873 | | |
2874 | | /** |
2875 | | * Allocates a new `iproto_req_handlers'. The memory is set to zero. |
2876 | | */ |
2877 | | static struct iproto_req_handlers * |
2878 | | iproto_req_handlers_new(void) |
2879 | 0 | { |
2880 | 0 | struct iproto_req_handlers *handlers; |
2881 | 0 | handlers = (struct iproto_req_handlers *)xmalloc(sizeof(*handlers)); |
2882 | 0 | memset(handlers, 0, sizeof(*handlers)); |
2883 | 0 | return handlers; |
2884 | 0 | } |
2885 | | |
2886 | | /** |
2887 | | * Destroys all handlers and deallocates the `handlers' structure. |
2888 | | */ |
2889 | | static void |
2890 | | iproto_req_handlers_delete(struct iproto_req_handlers *handlers) |
2891 | 0 | { |
2892 | 0 | if (handlers->event_by_id != NULL) |
2893 | 0 | event_unref(handlers->event_by_id); |
2894 | 0 | if (handlers->event_by_name != NULL) |
2895 | 0 | event_unref(handlers->event_by_name); |
2896 | 0 | if (handlers->c.destroy != NULL) |
2897 | 0 | handlers->c.destroy(handlers->c.ctx); |
2898 | 0 | TRASH(handlers); |
2899 | 0 | free(handlers); |
2900 | 0 | } |
2901 | | |
2902 | | /** |
2903 | | * Inserts `handlers' for the given `req_type' into the `tx_req_handlers' table. |
2904 | | * There must be no previous entries in the table for this key. |
2905 | | */ |
2906 | | static void |
2907 | | mh_req_handlers_put(uint32_t req_type, struct iproto_req_handlers *handlers) |
2908 | 0 | { |
2909 | 0 | struct mh_i32ptr_node_t old; |
2910 | 0 | struct mh_i32ptr_node_t *replaced = &old; |
2911 | 0 | struct mh_i32ptr_node_t node = { |
2912 | 0 | /* .key = */ req_type, |
2913 | 0 | /* .val = */ handlers, |
2914 | 0 | }; |
2915 | 0 | mh_i32ptr_put(tx_req_handlers, &node, &replaced, NULL); |
2916 | 0 | assert(replaced == NULL); |
2917 | 0 | } |
2918 | | |
2919 | | /** |
2920 | | * Returns a pointer to `iproto_req_handlers' for the given IPROTO request |
2921 | | * `req_type', or NULL if there are no such handlers. |
2922 | | */ |
2923 | | static struct iproto_req_handlers * |
2924 | | mh_req_handlers_get(uint32_t req_type) |
2925 | 0 | { |
2926 | 0 | mh_int_t k = mh_i32ptr_find(tx_req_handlers, req_type, NULL); |
2927 | 0 | if (k == mh_end(tx_req_handlers)) |
2928 | 0 | return NULL; |
2929 | 0 | struct mh_i32ptr_node_t *node = mh_i32ptr_node(tx_req_handlers, k); |
2930 | 0 | return (struct iproto_req_handlers *)node->val; |
2931 | 0 | } |
2932 | | |
2933 | | /** |
2934 | | * Deletes the handlers of IPROTO request `req_type' from the `tx_req_handlers' |
2935 | | * hash table. The entry must be present in the table. |
2936 | | */ |
2937 | | static void |
2938 | | mh_req_handlers_del(uint32_t req_type) |
2939 | 0 | { |
2940 | 0 | mh_int_t k = mh_i32ptr_find(tx_req_handlers, req_type, NULL); |
2941 | 0 | assert(k != mh_end(tx_req_handlers)); |
2942 | 0 | mh_i32ptr_del(tx_req_handlers, k, NULL); |
2943 | 0 | } |
2944 | | |
2945 | | /** |
2946 | | * Replaces an event in `handlers' by the new `event'. If `is_by_id', the |
2947 | | * handler is set by request type id, otherwise it is set by request type name. |
2948 | | */ |
2949 | | static void |
2950 | | iproto_req_handlers_set_event(struct iproto_req_handlers *handlers, |
2951 | | struct event *event, bool is_by_id) |
2952 | 0 | { |
2953 | 0 | assert(handlers != NULL); |
2954 | 0 | assert(event != NULL); |
2955 | | |
2956 | 0 | if (is_by_id) { |
2957 | 0 | if (handlers->event_by_id == NULL) { |
2958 | 0 | event_ref(event); |
2959 | 0 | handlers->event_by_id = event; |
2960 | 0 | } else { |
2961 | 0 | assert(handlers->event_by_id == event); |
2962 | 0 | } |
2963 | 0 | } else { |
2964 | 0 | if (handlers->event_by_name == NULL) { |
2965 | 0 | event_ref(event); |
2966 | 0 | handlers->event_by_name = event; |
2967 | 0 | } else { |
2968 | 0 | assert(handlers->event_by_name == event); |
2969 | 0 | } |
2970 | 0 | } |
2971 | 0 | } |
2972 | | |
2973 | | /** |
2974 | | * Deletes an event, which is set in `handlers' by request type id (if |
2975 | | * `is_by_id'), or by request type name. |
2976 | | */ |
2977 | | static void |
2978 | | iproto_req_handlers_del_event(struct iproto_req_handlers *handlers, |
2979 | | bool is_by_id) |
2980 | 0 | { |
2981 | 0 | assert(handlers != NULL); |
2982 | | |
2983 | 0 | if (is_by_id) { |
2984 | 0 | event_unref(handlers->event_by_id); |
2985 | 0 | handlers->event_by_id = NULL; |
2986 | 0 | } else { |
2987 | 0 | event_unref(handlers->event_by_name); |
2988 | 0 | handlers->event_by_name = NULL; |
2989 | 0 | } |
2990 | 0 | } |
2991 | | |
2992 | | /** |
2993 | | * Returns `true' if there is at least one handler in `handlers'. |
2994 | | */ |
2995 | | static bool |
2996 | | iproto_req_handler_is_set(struct iproto_req_handlers *handlers) |
2997 | 0 | { |
2998 | 0 | if (handlers == NULL) |
2999 | 0 | return false; |
3000 | | |
3001 | 0 | return handlers->event_by_id != NULL || |
3002 | 0 | handlers->event_by_name != NULL || |
3003 | 0 | handlers->c.cb != NULL; |
3004 | 0 | } |
3005 | | |
3006 | | /** |
3007 | | * Returns `enum iproto_type' if `name' is a valid IPROTO type name or equals |
3008 | | * "unknown". Otherwise, iproto_type_MAX is returned. The name is expected to |
3009 | | * be in lowercase. |
3010 | | */ |
3011 | | static enum iproto_type |
3012 | | get_iproto_type_by_name(const char *name) |
3013 | 0 | { |
3014 | 0 | for (uint32_t i = 0; i < iproto_type_MAX; i++) { |
3015 | 0 | const char *type_name = iproto_type_name_lower(i); |
3016 | 0 | if (type_name != NULL && strcmp(type_name, name) == 0) |
3017 | 0 | return (enum iproto_type)i; |
3018 | 0 | } |
3019 | 0 | if (strcmp(name, "unknown") == 0) |
3020 | 0 | return IPROTO_UNKNOWN; |
3021 | 0 | return iproto_type_MAX; |
3022 | 0 | } |
3023 | | |
3024 | | /** |
3025 | | * Runs triggers registered for the `event'. |
3026 | | * The given header and body the IPROTO packet are passed as trigger args. |
3027 | | * Returns IPROTO_HANDLER_OK if some trigger successfully handled the request, |
3028 | | * IPROTO_HANDLER_FALLBACK if no triggers handled the request, or |
3029 | | * IPROTO_HANDLER_ERROR on failure. |
3030 | | */ |
/**
 * Runs triggers registered for the `event'.
 * The given header and body of the IPROTO packet are passed as trigger
 * args. Returns IPROTO_HANDLER_OK if some trigger successfully handled
 * the request, IPROTO_HANDLER_FALLBACK if no triggers handled the
 * request, or IPROTO_HANDLER_ERROR on failure. Iteration stops at the
 * first trigger that either handles the request or fails.
 */
static enum iproto_handler_status
tx_run_override_triggers(struct event *event, const char *header,
			 const char *header_end, const char *body,
			 const char *body_end)
{
	enum iproto_handler_status rc = IPROTO_HANDLER_FALLBACK;
	const char *name = NULL;
	struct func_adapter *trigger = NULL;
	struct func_adapter_ctx ctx;
	struct event_trigger_iterator it;
	event_trigger_iterator_create(&it, event);

	while (event_trigger_iterator_next(&it, &trigger, &name)) {
		/*
		 * Fresh MsgPack contexts per trigger so IPROTO keys are
		 * translated for each handler invocation.
		 */
		struct mp_ctx mp_ctx_header, mp_ctx_body;
		mp_ctx_create_default(&mp_ctx_header, iproto_key_translation);
		mp_ctx_create_default(&mp_ctx_body, iproto_key_translation);

		func_adapter_begin(trigger, &ctx);
		func_adapter_push_msgpack_with_ctx(trigger, &ctx, header,
						   header_end, &mp_ctx_header);
		func_adapter_push_msgpack_with_ctx(trigger, &ctx, body,
						   body_end, &mp_ctx_body);
		if (func_adapter_call(trigger, &ctx) == 0) {
			/*
			 * The handler's contract: return true if the
			 * request was handled, false to fall through.
			 */
			if (func_adapter_is_bool(trigger, &ctx)) {
				bool ok = false;
				func_adapter_pop_bool(trigger, &ctx, &ok);
				if (ok)
					rc = IPROTO_HANDLER_OK;
			} else {
				diag_set(ClientError, ER_PROC_LUA,
					 "Invalid Lua IPROTO handler return "
					 "type: expected boolean");
				rc = IPROTO_HANDLER_ERROR;
			}
		} else {
			rc = IPROTO_HANDLER_ERROR;
		}
		func_adapter_end(trigger, &ctx);
		if (rc != IPROTO_HANDLER_FALLBACK)
			break;
	}
	event_trigger_iterator_destroy(&it);
	return rc;
}
3075 | | |
3076 | | /** |
3077 | | * Process a request using overridden handlers (or the unknown request handler |
3078 | | * as a last resort). |
3079 | | */ |
/**
 * Process a request using overridden handlers (or the unknown request
 * handler as a last resort). Handlers are tried in order: event by
 * request type id, event by request type name, then the C handler. On
 * FALLBACK the message is decoded and routed through the standard
 * processing path.
 */
static void
tx_process_override(struct cmsg *m)
{
	struct iproto_msg *msg = tx_accept_msg(m);
	const char *header = msg->reqstart;
	/* Skip the MsgPack-encoded packet length prefix. */
	mp_decode_uint(&header);

	const char *header_end = msg->reqstart + msg->len;
	const char *body = "\x80"; /* Empty MsgPack map encoding. */
	const char *body_end = body + 1;
	if (msg->header.bodycnt != 0) {
		assert(msg->header.bodycnt == 1);
		header_end -= msg->header.body[0].iov_len;
		body = (const char *)msg->header.body[0].iov_base;
		body_end = body + msg->header.body[0].iov_len;
	}

	/*
	 * If we took the `override_route', there must exist either request
	 * type-specific or unknown request type handler. Their availability
	 * is checked by the IPROTO thread.
	 */
	struct iproto_req_handlers *handlers;
	handlers = mh_req_handlers_get(msg->header.type);
	if (handlers == NULL)
		handlers = mh_req_handlers_get(IPROTO_UNKNOWN);
	assert(handlers != NULL);
	enum iproto_handler_status rc = IPROTO_HANDLER_FALLBACK;

	/*
	 * Run handlers from the event registry, set by request type id.
	 */
	if (handlers->event_by_id != NULL) {
		rc = tx_run_override_triggers(handlers->event_by_id, header,
					      header_end, body, body_end);
	}
	/*
	 * Run handlers from the event registry, set by request type name.
	 */
	if (rc == IPROTO_HANDLER_FALLBACK && handlers->event_by_name != NULL) {
		rc = tx_run_override_triggers(handlers->event_by_name, header,
					      header_end, body, body_end);
	}
	/*
	 * Run C handlers.
	 */
	if (rc == IPROTO_HANDLER_FALLBACK && handlers->c.cb != NULL) {
		rc = handlers->c.cb(header, header_end, body, body_end,
				    handlers->c.ctx);
	}

	struct cmsg_hop *route = NULL;
	switch (rc) {
	case IPROTO_HANDLER_OK: {
		/* Handled by an override: reply with an empty savepoint. */
		struct obuf *out = msg->connection->tx.p_obuf;
		iproto_wpos_create(&msg->wpos, out);
		struct obuf_svp empty = obuf_create_svp(out);
		tx_end_msg(msg, &empty);
		return;
	}
	case IPROTO_HANDLER_FALLBACK: {
		/* Decode and dispatch via the standard route. */
		int rc = iproto_msg_decode(msg, &route);
		assert(route != NULL);
		if (rc != 0)
			route = NULL;
		FALLTHROUGH;
	}
	case IPROTO_HANDLER_ERROR:
		break;
	default:
		unreachable();
	}
	if (route != NULL) {
		/* The second hop of the route must stay the same. */
		assert(m->hop[1].f == route[1].f);
		route->f(m);
		return;
	}
	/* Decode failure or handler error: report via the diag. */
	struct obuf_svp svp = obuf_create_svp(msg->connection->tx.p_obuf);
	tx_reply_error(msg);
	tx_end_msg(msg, &svp);
}
3161 | | |
/**
 * Finishes processing of `msg' within its stream, if any: releases the
 * stream's `current' slot and either destroys the idle stream, rolls
 * back its dangling transaction on a dead connection, or schedules the
 * next pending request of the stream for TX processing.
 */
static void
iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
{
	struct iproto_connection *con = msg->connection;
	struct iproto_stream *stream = msg->stream;

	/* A message outside of any stream needs no bookkeeping. */
	if (stream == NULL)
		return;

	assert(stream->current == msg);
	stream->current = NULL;

	if (stailq_empty(&stream->pending_requests)) {
		/*
		 * If no more messages for the current stream
		 * and no transaction started, then delete it.
		 */
		if (stream->txn == NULL) {
			struct mh_i64ptr_node_t node = { stream->id, NULL };
			mh_i64ptr_remove(con->streams, &node, 0);
			iproto_stream_delete(stream);
		} else if (con->state != IPROTO_CONNECTION_ALIVE) {
			/*
			 * Here we are in case when connection was closed,
			 * there is no messages in stream queue, but there
			 * is some active transaction in stream.
			 * Send disconnect message to rollback this
			 * transaction.
			 */
			iproto_stream_rollback_on_disconnect(stream);
		}
	} else {
		/*
		 * If there are new messages for this stream
		 * then schedule their processing.
		 */
		stream->current =
			stailq_shift_entry(&stream->pending_requests,
					   struct iproto_msg,
					   in_stream);
		assert(stream->current != NULL);
		stream->current->wpos = con->wpos;
		con->iproto_thread->requests_in_stream_queue--;
		cpipe_push_input(&con->iproto_thread->tx_pipe,
				 &stream->current->base);
		cpipe_flush_input(&con->iproto_thread->tx_pipe);
	}
}
3210 | | |
/**
 * Network-thread completion of a processed request: accounts for the
 * consumed input, advances the connection's write-end position and
 * kicks output (or closes an idle dead connection).
 */
static void
net_send_msg(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;

	iproto_msg_finish_processing_in_stream(msg);
	if (msg->len != 0) {
		/* Discard request (see iproto_enqueue_batch()). */
		iproto_msg_finish_input(msg);
	} else {
		/* Already discarded by net_discard_input(). */
		assert(con->long_poll_count > 0);
		con->long_poll_count--;
	}
	/* Make the reply visible to the output writer. */
	con->wend = msg->wpos;

	if (con->state == IPROTO_CONNECTION_ALIVE) {
		iproto_connection_feed_output(con);
	} else if (iproto_connection_is_idle(con)) {
		iproto_connection_close(con);
	}
	iproto_msg_delete(msg);
}
3235 | | |
3236 | | /** |
3237 | | * Complete sending an iproto error: |
3238 | | * recycle the error object and flush output. |
3239 | | */ |
3240 | | static void |
3241 | | net_send_error(struct cmsg *m) |
3242 | 0 | { |
3243 | 0 | struct iproto_msg *msg = (struct iproto_msg *) m; |
3244 | | /* Recycle the exception. */ |
3245 | 0 | diag_move(&msg->diag, &fiber()->diag); |
3246 | 0 | net_send_msg(m); |
3247 | 0 | } |
3248 | | |
/**
 * Network-thread completion of a JOIN/FETCH_SNAPSHOT/REGISTER request:
 * releases the message and resumes normal request processing on the
 * connection (or closes it if a drop is pending).
 */
static void
net_end_join(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;
	/* Save the input buffer: `msg' is deleted before it is used. */
	struct ibuf *ibuf = msg->p_ibuf;

	iproto_msg_finish_input(msg);
	iproto_msg_delete(msg);

	assert(! ev_is_active(&con->input));
	con->is_in_replication = false;

	if (con->is_drop_pending) {
		iproto_connection_close(con);
		return;
	}
	/*
	 * Enqueue any messages if they are in the readahead
	 * queue. Will simply start input otherwise.
	 */
	if (iproto_enqueue_batch(con, ibuf) != 0)
		iproto_connection_close(con);
}
3273 | | |
3274 | | static void |
3275 | | net_end_subscribe(struct cmsg *m) |
3276 | 0 | { |
3277 | 0 | struct iproto_msg *msg = (struct iproto_msg *) m; |
3278 | 0 | struct iproto_connection *con = msg->connection; |
3279 | |
|
3280 | 0 | iproto_msg_finish_input(msg); |
3281 | 0 | iproto_msg_delete(msg); |
3282 | |
|
3283 | 0 | assert(! ev_is_active(&con->input)); |
3284 | | |
3285 | 0 | iproto_connection_close(con); |
3286 | 0 | } |
3287 | | |
3288 | | /** |
3289 | | * Handshake a connection: invoke the on-connect trigger |
3290 | | * and possibly authenticate. Try to send the client an error |
3291 | | * upon a failure. |
3292 | | */ |
/**
 * Handshake a connection: invoke the on-connect trigger
 * and possibly authenticate. Try to send the client an error
 * upon a failure.
 */
static void
tx_process_connect(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;
	struct obuf *out = msg->connection->tx.p_obuf;
	/* Reuse a pre-created session when one was handed over. */
	if (msg->connect.session != NULL) {
		con->session = msg->connect.session;
		session_set_type(con->session, SESSION_TYPE_BINARY);
	} else {
		con->session = session_new(SESSION_TYPE_BINARY);
	}
	con->session->meta.connection = con;
	session_set_peer_addr(con->session, &msg->connect.addr,
			      msg->connect.addrlen);
	/* Features start empty until the client sends IPROTO_ID. */
	iproto_features_create(&con->session->meta.features);
	tx_fiber_init(con->session, 0);
	char *greeting = (char *)static_alloc(IPROTO_GREETING_SIZE);
	/* TODO: dirty read from tx thread */
	struct tt_uuid uuid = INSTANCE_UUID;
	/* Fresh salt per connection, used later by IPROTO_AUTH. */
	random_bytes(con->salt, IPROTO_SALT_SIZE);
	greeting_encode(greeting, tarantool_version_id(), &uuid,
			con->salt, IPROTO_SALT_SIZE);
	xobuf_dup(out, greeting, IPROTO_GREETING_SIZE);
	if (session_run_on_connect_triggers(con->session) != 0)
		goto error;
	iproto_wpos_create(&msg->wpos, out);
	return;
error:
	/* Reply with the trigger error and ask net to drop the peer. */
	tx_reply_error(msg);
	msg->close_connection = true;
}
3325 | | |
3326 | | /** |
3327 | | * Send a response to connect to the client or close the |
3328 | | * connection in case on_connect trigger failed. |
3329 | | */ |
static void
net_send_greeting(struct cmsg *m)
{
	struct iproto_msg *msg = (struct iproto_msg *) m;
	struct iproto_connection *con = msg->connection;
	/* A pending drop request takes priority over the handshake. */
	if (con->is_drop_pending) {
		iproto_connection_close(con);
		iproto_msg_delete(msg);
		return;
	}
	/*
	 * on_connect trigger failed: try to flush the error reply
	 * synchronously, then close the connection.
	 */
	if (msg->close_connection) {
		struct obuf *out = msg->wpos.obuf;
		int64_t nwr = iostream_writev(&con->io, out->iov,
					      obuf_iovcnt(out));
		if (nwr > 0) {
			/* Count statistics. */
			rmean_collect(con->iproto_thread->rmean,
				      IPROTO_SENT, nwr);
		} else if (nwr == IOSTREAM_ERROR) {
			diag_log();
		}
		assert(iproto_connection_is_idle(con));
		iproto_connection_close(con);
		iproto_msg_delete(msg);
		return;
	}
	con->is_established = true;
	con->wend = msg->wpos;
	/*
	 * Connect is synchronous, so no one could have been
	 * messing up with the connection while it was in
	 * progress.
	 */
	assert(con->state == IPROTO_CONNECTION_ALIVE);
	/* Handshake OK, start reading input. */
	iproto_connection_feed_output(con);
	iproto_msg_delete(msg);
}
3368 | | |
3369 | | /** }}} */ |
3370 | | |
3371 | | /** |
3372 | | * Create a connection and start input. |
3373 | | * |
3374 | | * If session is NULL, a new session object will be created for the connection |
3375 | | * in the TX thread. |
3376 | | * |
3377 | | * The function takes ownership of the passed IO stream and session. |
3378 | | */ |
static void
iproto_thread_accept(struct iproto_thread *iproto_thread, struct iostream *io,
		     struct sockaddr *addr, socklen_t addrlen,
		     struct session *session)
{
	struct iproto_connection *con = iproto_connection_new(iproto_thread);
	struct iproto_msg *msg = iproto_msg_new(con);
	/* Stash the peer address inside the connect message. */
	assert(addrlen <= sizeof(msg->connect.addrstorage));
	memcpy(&msg->connect.addrstorage, addr, addrlen);
	msg->connect.addrlen = addrlen;
	msg->connect.session = session;
	/* Take ownership of the I/O stream. */
	iostream_move(&con->io, io);
	/* Route: tx_process_connect in TX, then net_send_greeting here. */
	cmsg_init(&msg->base, iproto_thread->connect_route);
	msg->p_ibuf = con->p_ibuf;
	msg->wpos = con->wpos;
	cpipe_push(&iproto_thread->tx_pipe, &msg->base);
}
3396 | | |
3397 | | static void |
3398 | | iproto_on_accept_cb(struct evio_service *service, struct iostream *io, |
3399 | | struct sockaddr *addr, socklen_t addrlen) |
3400 | 0 | { |
3401 | 0 | struct iproto_thread *iproto_thread = |
3402 | 0 | (struct iproto_thread *)service->on_accept_param; |
3403 | 0 | iproto_thread_accept(iproto_thread, io, addr, addrlen, |
3404 | | /*session=*/NULL); |
3405 | 0 | } |
3406 | | |
3407 | | /** |
3408 | | * The network io thread main function: |
3409 | | * begin serving the message bus. |
3410 | | */ |
static int
net_cord_f(va_list ap)
{
	struct iproto_thread *iproto_thread =
		va_arg(ap, struct iproto_thread *);

	/* Per-thread pools for messages, connections and streams. */
	mempool_create(&iproto_thread->iproto_msg_pool, &cord()->slabc,
		       sizeof(struct iproto_msg));
	mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
		       sizeof(struct iproto_connection));
	mempool_create(&iproto_thread->iproto_stream_pool, &cord()->slabc,
		       sizeof(struct iproto_stream));

	/* Listener service; attached to the bound socket on CFG_START. */
	evio_service_create(loop(), &iproto_thread->binary, "binary",
			    iproto_on_accept_cb, iproto_thread);

	char endpoint_name[ENDPOINT_NAME_MAX];
	snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u",
		 iproto_thread->id);

	struct cbus_endpoint endpoint;
	/* Create "net" endpoint. */
	cbus_endpoint_create(&endpoint, endpoint_name,
			     fiber_schedule_cb, fiber());
	/* Create a pipe to "tx" thread. */
	cpipe_create(&iproto_thread->tx_pipe, "tx");
	cpipe_set_max_input(&iproto_thread->tx_pipe, iproto_msg_max / 2);

	/* Process incoming messages until the bus loop is stopped. */
	cbus_loop(&endpoint);

	/* Teardown mirrors setup, in reverse order. */
	cbus_endpoint_destroy(&endpoint, cbus_process);
	cpipe_destroy(&iproto_thread->tx_pipe);
	evio_service_detach(&iproto_thread->binary);

	mempool_destroy(&iproto_thread->iproto_stream_pool);
	mempool_destroy(&iproto_thread->iproto_connection_pool);
	mempool_destroy(&iproto_thread->iproto_msg_pool);
	return 0;
}
3451 | | |
3452 | | int |
3453 | | iproto_session_fd(struct session *session) |
3454 | 0 | { |
3455 | 0 | struct iproto_connection *con = |
3456 | 0 | (struct iproto_connection *) session->meta.connection; |
3457 | 0 | return con->io.fd; |
3458 | 0 | } |
3459 | | |
3460 | | int64_t |
3461 | | iproto_session_sync(struct session *session) |
3462 | 0 | { |
3463 | 0 | (void) session; |
3464 | 0 | assert(session == fiber()->storage.session); |
3465 | 0 | return fiber()->storage.net.sync; |
3466 | 0 | } |
3467 | | |
3468 | | /** {{{ IPROTO_PUSH implementation. */ |
3469 | | |
/**
 * Net-thread half of the push protocol: advance the connection's
 * write-end to the position recorded by TX and remember the
 * current write position in the kharon for the return trip.
 */
static void
iproto_process_push(struct cmsg *m)
{
	struct iproto_kharon *kharon = (struct iproto_kharon *) m;
	struct iproto_connection *con =
		container_of(kharon, struct iproto_connection, kharon);
	/* Swap positions: new data to send out, old position back to TX. */
	con->wend = kharon->wpos;
	kharon->wpos = con->wpos;
	if (con->state == IPROTO_CONNECTION_ALIVE)
		iproto_connection_feed_output(con);
}
3481 | | |
3482 | | /** |
3483 | | * Send to iproto thread a notification about new pushes. |
3484 | | * @param con iproto connection. |
3485 | | */ |
static void
tx_begin_push(struct iproto_connection *con)
{
	/* Only one kharon round-trip may be in flight at a time. */
	assert(! con->tx.is_push_sent);
	cmsg_init(&con->kharon.base, con->iproto_thread->push_route);
	iproto_wpos_create(&con->kharon.wpos, con->tx.p_obuf);
	con->tx.is_push_pending = false;
	con->tx.is_push_sent = true;
	cpipe_push(&con->iproto_thread->net_pipe,
		   (struct cmsg *) &con->kharon);
}
3497 | | |
/**
 * TX-thread completion of a kharon round trip: account the write
 * position returned by net and, if new pushes accumulated while
 * the kharon was away, immediately send it again.
 */
static void
tx_end_push(struct cmsg *m)
{
	struct iproto_kharon *kharon = (struct iproto_kharon *) m;
	struct iproto_connection *con =
		container_of(kharon, struct iproto_connection, kharon);
	tx_accept_wpos(con, &kharon->wpos);
	con->tx.is_push_sent = false;
	if (con->tx.is_push_pending)
		tx_begin_push(con);
}
3509 | | |
3510 | | /** |
3511 | | * Asynchronously send response message using Kharon facility. |
3512 | | */ |
static void
tx_push(struct iproto_connection *con, struct obuf_svp *svp)
{
	/* Record the response in the flight recorder first. */
	flightrec_write_response(con->tx.p_obuf, svp);
	/*
	 * Start a kharon round trip, or mark a push as pending if
	 * one is already in flight (tx_end_push picks it up).
	 */
	if (!con->tx.is_push_sent)
		tx_begin_push(con);
	else
		con->tx.is_push_pending = true;
}
3522 | | |
3523 | | /** |
3524 | | * Push a message from @a port to a remote client. |
3525 | | * @param session iproto session. |
3526 | | * @param port Port with data to send. |
3527 | | * |
3528 | | * @retval -1 Memory error. |
3529 | | * @retval 0 Success, a message is written to the output buffer. |
3530 | | * We don't wait here that the push has reached the |
3531 | | * client: the output buffer is flushed asynchronously. |
3532 | | */ |
static int
iproto_session_push(struct session *session, struct port *port)
{
	struct iproto_connection *con =
		(struct iproto_connection *) session->meta.connection;
	struct obuf_svp svp;
	/* Reserve header space; rolled back on dump failure. */
	iproto_prepare_select(con->tx.p_obuf, &svp);
	if (port_dump_msgpack(port, con->tx.p_obuf) < 0) {
		obuf_rollback_to_svp(con->tx.p_obuf, &svp);
		return -1;
	}
	/* Fill in the IPROTO_CHUNK header over the reserved space. */
	iproto_reply_chunk(con->tx.p_obuf, &svp, iproto_session_sync(session),
			   ::schema_version);
	tx_push(con, &svp);
	return 0;
}
3549 | | |
3550 | | /** |
3551 | | * Sends a notification to a remote watcher when a key is updated. |
3552 | | * Uses IPROTO_PUSH (kharon) infrastructure to signal the iproto thread |
3553 | | * about new data. |
3554 | | */ |
static void
iproto_session_notify(struct session *session, uint64_t sync,
		      const char *key, size_t key_len,
		      const char *data, const char *data_end)
{
	struct iproto_connection *con =
		(struct iproto_connection *)session->meta.connection;
	struct obuf *out = con->tx.p_obuf;
	/* Savepoint marks the start of the event packet for tx_push. */
	struct obuf_svp svp = obuf_create_svp(out);
	iproto_send_event(out, sync, key, key_len, data, data_end);
	tx_push(con, &svp);
}
3567 | | |
3568 | | /** }}} */ |
3569 | | |
3570 | | /** |
3571 | | * Stops accepting new connections on shutdown. |
3572 | | */ |
3573 | | static int |
3574 | | iproto_on_shutdown_f(void *arg) |
3575 | 0 | { |
3576 | 0 | (void)arg; |
3577 | 0 | fiber_set_name(fiber_self(), "iproto.shutdown"); |
3578 | 0 | iproto_is_shutting_down = true; |
3579 | 0 | struct iproto_cfg_msg cfg_msg; |
3580 | 0 | iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_SHUTDOWN); |
3581 | 0 | for (int i = 0; i < iproto_threads_count; i++) |
3582 | 0 | iproto_do_cfg(&iproto_threads[i], &cfg_msg); |
3583 | 0 | evio_service_stop(&tx_binary); |
3584 | 0 | return 0; |
3585 | 0 | } |
3586 | | |
3587 | | static inline void |
3588 | | iproto_thread_init_routes(struct iproto_thread *iproto_thread) |
3589 | 0 | { |
3590 | 0 | iproto_thread->begin_route[0] = |
3591 | 0 | { tx_process_begin, &iproto_thread->net_pipe }; |
3592 | 0 | iproto_thread->begin_route[1] = |
3593 | 0 | { net_send_msg, NULL }; |
3594 | 0 | iproto_thread->commit_route[0] = |
3595 | 0 | { tx_process_commit, &iproto_thread->net_pipe }; |
3596 | 0 | iproto_thread->commit_route[1] = |
3597 | 0 | { net_send_msg, NULL }; |
3598 | 0 | iproto_thread->rollback_route[0] = |
3599 | 0 | { tx_process_rollback, &iproto_thread->net_pipe }; |
3600 | 0 | iproto_thread->rollback_route[1] = |
3601 | 0 | { net_send_msg, NULL }; |
3602 | 0 | iproto_thread->rollback_on_disconnect_route[0] = |
3603 | 0 | { tx_process_rollback_on_disconnect, |
3604 | 0 | &iproto_thread->net_pipe }; |
3605 | 0 | iproto_thread->rollback_on_disconnect_route[1] = |
3606 | 0 | { net_finish_rollback_on_disconnect, NULL }; |
3607 | 0 | iproto_thread->destroy_route[0] = |
3608 | 0 | { tx_process_destroy, &iproto_thread->net_pipe }; |
3609 | 0 | iproto_thread->destroy_route[1] = |
3610 | 0 | { net_finish_destroy, NULL }; |
3611 | 0 | iproto_thread->disconnect_route[0] = |
3612 | 0 | { tx_process_disconnect, &iproto_thread->net_pipe }; |
3613 | 0 | iproto_thread->disconnect_route[1] = |
3614 | 0 | { net_finish_disconnect, NULL }; |
3615 | 0 | iproto_thread->misc_route[0] = |
3616 | 0 | { tx_process_misc, &iproto_thread->net_pipe }; |
3617 | 0 | iproto_thread->misc_route[1] = { net_send_msg, NULL }; |
3618 | 0 | iproto_thread->call_route[0] = |
3619 | 0 | { tx_process_call, &iproto_thread->net_pipe }; |
3620 | 0 | iproto_thread->call_route[1] = { net_send_msg, NULL }; |
3621 | 0 | iproto_thread->select_route[0] = |
3622 | 0 | { tx_process_select, &iproto_thread->net_pipe }; |
3623 | 0 | iproto_thread->select_route[1] = { net_send_msg, NULL }; |
3624 | 0 | iproto_thread->process1_route[0] = |
3625 | 0 | { tx_process1, &iproto_thread->net_pipe }; |
3626 | 0 | iproto_thread->process1_route[1] = { net_send_msg, NULL }; |
3627 | 0 | iproto_thread->sql_route[0] = |
3628 | 0 | { tx_process_sql, &iproto_thread->net_pipe }; |
3629 | 0 | iproto_thread->sql_route[1] = { net_send_msg, NULL }; |
3630 | 0 | iproto_thread->join_route[0] = |
3631 | 0 | { tx_process_replication, &iproto_thread->net_pipe }; |
3632 | 0 | iproto_thread->join_route[1] = { net_end_join, NULL }; |
3633 | 0 | iproto_thread->subscribe_route[0] = |
3634 | 0 | { tx_process_replication, &iproto_thread->net_pipe }; |
3635 | 0 | iproto_thread->subscribe_route[1] = { net_end_subscribe, NULL }; |
3636 | 0 | iproto_thread->error_route[0] = |
3637 | 0 | { tx_reply_iproto_error, &iproto_thread->net_pipe }; |
3638 | 0 | iproto_thread->error_route[1] = { net_send_error, NULL }; |
3639 | 0 | iproto_thread->push_route[0] = |
3640 | 0 | { iproto_process_push, &iproto_thread->tx_pipe }; |
3641 | 0 | iproto_thread->push_route[1] = { tx_end_push, NULL }; |
3642 | | /* IPROTO_OK */ |
3643 | 0 | iproto_thread->dml_route[0] = NULL; |
3644 | | /* IPROTO_SELECT */ |
3645 | 0 | iproto_thread->dml_route[1] = iproto_thread->select_route; |
3646 | | /* IPROTO_INSERT */ |
3647 | 0 | iproto_thread->dml_route[2] = iproto_thread->process1_route; |
3648 | | /* IPROTO_REPLACE */ |
3649 | 0 | iproto_thread->dml_route[3] = iproto_thread->process1_route; |
3650 | | /* IPROTO_UPDATE */ |
3651 | 0 | iproto_thread->dml_route[4] = iproto_thread->process1_route; |
3652 | | /* IPROTO_DELETE */ |
3653 | 0 | iproto_thread->dml_route[5] = iproto_thread->process1_route; |
3654 | | /* IPROTO_CALL_16 */ |
3655 | 0 | iproto_thread->dml_route[6] = iproto_thread->call_route; |
3656 | | /* IPROTO_AUTH */ |
3657 | 0 | iproto_thread->dml_route[7] = iproto_thread->misc_route; |
3658 | | /* IPROTO_EVAL */ |
3659 | 0 | iproto_thread->dml_route[8] = iproto_thread->call_route; |
3660 | | /* IPROTO_UPSERT */ |
3661 | 0 | iproto_thread->dml_route[9] = iproto_thread->process1_route; |
3662 | | /* IPROTO_CALL */ |
3663 | 0 | iproto_thread->dml_route[10] = iproto_thread->call_route; |
3664 | | /* IPROTO_EXECUTE */ |
3665 | 0 | iproto_thread->dml_route[11] = iproto_thread->sql_route; |
3666 | | /* IPROTO_NOP */ |
3667 | 0 | iproto_thread->dml_route[12] = NULL; |
3668 | | /* IPROTO_PREPARE */ |
3669 | 0 | iproto_thread->dml_route[13] = iproto_thread->sql_route; |
3670 | 0 | iproto_thread->connect_route[0] = |
3671 | 0 | { tx_process_connect, &iproto_thread->net_pipe }; |
3672 | 0 | iproto_thread->connect_route[1] = { net_send_greeting, NULL }; |
3673 | 0 | iproto_thread->override_route[0] = |
3674 | 0 | { tx_process_override, &iproto_thread->net_pipe }; |
3675 | 0 | iproto_thread->override_route[1] = { net_send_msg, NULL }; |
3676 | 0 | }; |
3677 | | |
/**
 * One-time initialization of an iproto thread descriptor: routes,
 * override-handler map, slab cache, statistics counters, and
 * connection lists. Runs in the TX thread before the net cord
 * is started.
 */
static inline void
iproto_thread_init(struct iproto_thread *iproto_thread)
{
	iproto_thread_init_routes(iproto_thread);
	/* Set of request types with overridden handlers (see CFG_OVERRIDE). */
	iproto_thread->req_handlers = mh_i32_new();
	slab_cache_create(&iproto_thread->net_slabc, &runtime);
	/* Init statistics counter */
	iproto_thread->rmean = rmean_new(rmean_net_strings, RMEAN_NET_LAST);
	iproto_thread->tx.rmean = rmean_new(rmean_tx_strings, RMEAN_TX_LAST);
	rlist_create(&iproto_thread->stopped_connections);
	iproto_thread->tx.requests_in_progress = 0;
	iproto_thread->requests_in_stream_queue = 0;
	rlist_create(&iproto_thread->connections);
}
3692 | | |
3693 | | /** |
3694 | | * True for IPROTO request types that can be overridden. |
3695 | | */ |
3696 | | static bool |
3697 | | is_iproto_override_supported(uint32_t req_type) |
3698 | 0 | { |
3699 | 0 | switch (req_type) { |
3700 | 0 | case IPROTO_JOIN: |
3701 | 0 | case IPROTO_SUBSCRIBE: |
3702 | 0 | case IPROTO_FETCH_SNAPSHOT: |
3703 | 0 | case IPROTO_REGISTER: |
3704 | 0 | return false; |
3705 | 0 | default: |
3706 | 0 | return true; |
3707 | 0 | } |
3708 | 0 | } |
3709 | | |
3710 | | /** |
3711 | | * If the `name' contains a valid name of an IPROTO overriding event, sets |
3712 | | * `req_type' and returns True. If the name contains correct prefix, but |
3713 | | * the request type is invalid, the error is logged with CRIT log level. |
3714 | | * `is_by_id' set to True if the request is overridden by id, False if by name. |
3715 | | */ |
3716 | | static bool |
3717 | | get_iproto_type_from_event_name(const char *name, uint32_t *req_type, |
3718 | | bool *is_by_id) |
3719 | 0 | { |
3720 | 0 | const char *prefix = "box.iproto.override"; |
3721 | 0 | const size_t prefix_len = strlen(prefix); |
3722 | 0 | if (strncmp(name, prefix, prefix_len) != 0) |
3723 | 0 | return false; |
3724 | | |
3725 | 0 | const char *req_name = name + prefix_len; |
3726 | 0 | const char *req_name_err = req_name; |
3727 | 0 | if (*req_name == '.') { |
3728 | 0 | *is_by_id = false; |
3729 | | /* Skip the dot. */ |
3730 | 0 | req_name++; |
3731 | 0 | req_name_err = req_name; |
3732 | 0 | *req_type = get_iproto_type_by_name(req_name); |
3733 | 0 | if (*req_type == iproto_type_MAX) |
3734 | 0 | goto err_bad_type; |
3735 | 0 | } else if (*req_name == '[') { |
3736 | 0 | *is_by_id = true; |
3737 | | /* Skip open bracket. */ |
3738 | 0 | req_name++; |
3739 | 0 | if (!isdigit(*req_name) && *req_name != '-') |
3740 | 0 | goto err_bad_type; |
3741 | 0 | char *endptr; |
3742 | 0 | *req_type = strtol(req_name, &endptr, 10); |
3743 | 0 | if (endptr == req_name) |
3744 | 0 | goto err_bad_type; |
3745 | | /* |
3746 | | * At least one digit is parsed. |
3747 | | * Check that the rest of the string equals "]". |
3748 | | */ |
3749 | 0 | if (*endptr != ']' || endptr[1] != 0) |
3750 | 0 | goto err_bad_type; |
3751 | 0 | } else { |
3752 | | /* Not in IPROTO override namespace. */ |
3753 | 0 | return false; |
3754 | 0 | } |
3755 | | |
3756 | 0 | if (!is_iproto_override_supported(*req_type)) { |
3757 | 0 | say_crit("IPROTO request handler overriding does not support " |
3758 | 0 | "`%s' request type", iproto_type_name(*req_type)); |
3759 | 0 | return false; |
3760 | 0 | } |
3761 | 0 | return true; |
3762 | | |
3763 | 0 | err_bad_type: |
3764 | 0 | say_crit("The event `%s' is in IPROTO override namespace, but `%s' is " |
3765 | 0 | "not a valid request type", name, req_name_err); |
3766 | 0 | return false; |
3767 | 0 | } |
3768 | | |
3769 | | /** |
3770 | | * Gets an arbitrary `event', checks its name, and adds it to `req_handlers' if |
3771 | | * it is a valid IPROTO overriding event. |
3772 | | * If the event name contains correct IPROTO overriding prefix, but the request |
3773 | | * type is invalid, the error is logged with CRIT log level. |
3774 | | */ |
static bool
iproto_override_event_init(struct event *event, void *arg)
{
	(void)arg;
	uint32_t type;
	bool is_by_id;
	/* Returning true continues event_foreach() iteration. */
	if (!get_iproto_type_from_event_name(event->name, &type, &is_by_id))
		return true;

	/* Create the per-type handler set lazily. */
	struct iproto_req_handlers *handlers = mh_req_handlers_get(type);
	if (handlers == NULL) {
		handlers = iproto_req_handlers_new();
		mh_req_handlers_put(type, handlers);
	}
	iproto_req_handlers_set_event(handlers, event, is_by_id);

	/* Mark the type as overridden in every iproto thread's map. */
	for (int i = 0; i < iproto_threads_count; i++) {
		struct iproto_thread *iproto_thread = &iproto_threads[i];
		mh_i32_put(iproto_thread->req_handlers, &type, NULL, NULL);
	}
	return true;
}
3797 | | |
3798 | | /** |
3799 | | * Notifies IPROTO threads that a new request handler has been set. |
3800 | | */ |
3801 | | static void |
3802 | | iproto_cfg_override(uint32_t req_type, bool is_set); |
3803 | | |
3804 | | /** |
3805 | | * Calls iproto_cfg_override() and destroys the handlers when necessary. |
3806 | | */ |
static void
iproto_override_finish(struct iproto_req_handlers *handlers, uint32_t req_type,
		       bool old_is_set)
{
	/* Notify iproto threads only when the set/unset state flips. */
	bool new_is_set = iproto_req_handler_is_set(handlers);
	if (new_is_set != old_is_set)
		iproto_cfg_override(req_type, new_is_set);

	/* Drop the now-empty handler set, if any. */
	if (!new_is_set && handlers != NULL) {
		mh_req_handlers_del(req_type);
		iproto_req_handlers_delete(handlers);
	}
}
3820 | | |
3821 | | /** |
3822 | | * Trigger which is fired on any change in the event registry. |
3823 | | */ |
static int
trigger_on_change_iproto_notify(struct trigger *trigger, void *arg)
{
	(void)trigger;
	uint32_t type;
	bool is_by_id;
	struct event *event = (struct event *)arg;
	/* Ignore events outside the IPROTO override namespace. */
	if (!get_iproto_type_from_event_name(event->name, &type, &is_by_id))
		return 0;

	struct iproto_req_handlers *handlers;
	handlers = mh_req_handlers_get(type);
	/* Remember the old state to detect a set/unset transition. */
	bool is_set = iproto_req_handler_is_set(handlers);

	if (event_has_triggers(event)) {
		/* Handler added: create the handler set lazily. */
		if (handlers == NULL) {
			handlers = iproto_req_handlers_new();
			mh_req_handlers_put(type, handlers);
		}
		iproto_req_handlers_set_event(handlers, event, is_by_id);
	} else {
		/* Last trigger removed from the event. */
		iproto_req_handlers_del_event(handlers, is_by_id);
	}

	iproto_override_finish(handlers, type, is_set);
	return 0;
}
3851 | | |
3852 | | TRIGGER(trigger_on_change, trigger_on_change_iproto_notify); |
3853 | | |
3854 | | /** Initialize the iproto subsystem and start network io thread */ |
/** Initialize the iproto subsystem and start network io thread */
void
iproto_init(int threads_count)
{
	iproto_features_init();

	iproto_threads_count = 0;
	struct session_vtab iproto_session_vtab = {
		/* .push = */ iproto_session_push,
		/* .fd = */ iproto_session_fd,
		/* .sync = */ iproto_session_sync,
	};
	/*
	 * We use this tx_binary only for bind, not for listen, so
	 * we don't need any accept functions.
	 */
	evio_service_create(loop(), &tx_binary, "tx_binary", NULL, NULL);
	iproto_threads = (struct iproto_thread *)
		xcalloc(threads_count, sizeof(struct iproto_thread));
	fiber_cond_create(&drop_finished_cond);

	/* Initialize thread state before starting any net cord. */
	for (int i = 0; i < threads_count; i++, iproto_threads_count++) {
		struct iproto_thread *iproto_thread = &iproto_threads[i];
		iproto_thread->id = i;
		iproto_thread_init(iproto_thread);
	}

	/*
	 * Go through all events with triggers, and initialize overridden
	 * request handlers that were registered before IPROTO initialization.
	 */
	tx_req_handlers = mh_i32ptr_new();
	event_foreach(iproto_override_event_init, NULL);

	for (int i = 0; i < threads_count; i++) {
		struct iproto_thread *iproto_thread = &iproto_threads[i];
		if (cord_costart(&iproto_thread->net_cord, "iproto",
				 net_cord_f, iproto_thread))
			panic("failed to start iproto thread");
		/* Create a pipe to "net" thread. */
		char endpoint_name[ENDPOINT_NAME_MAX];
		snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u",
			 iproto_thread->id);
		cpipe_create(&iproto_thread->net_pipe, endpoint_name);
		cpipe_set_max_input(&iproto_thread->net_pipe,
				    iproto_msg_max / 2);
	}

	/* Make binary sessions use the iproto vtab from now on. */
	session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;

	event_on_change(&trigger_on_change);
	if (box_on_shutdown(NULL, iproto_on_shutdown_f, NULL) != 0)
		panic("failed to set iproto shutdown trigger");
}
3908 | | |
/**
 * Fill the stats structure of an IPROTO_CFG_STAT message with
 * this thread's memory and object counters. Runs in the net
 * thread (called from iproto_do_cfg_f).
 */
static void
iproto_fill_stat(struct iproto_thread *iproto_thread,
		 struct iproto_cfg_msg *cfg_msg)
{
	assert(cfg_msg->stats != NULL);
	/* Memory used by both the cord slab cache and the net slab cache. */
	cfg_msg->stats->mem_used =
		slab_cache_used(&iproto_thread->net_cord.slabc) +
		slab_cache_used(&iproto_thread->net_slabc);
	cfg_msg->stats->connections =
		mempool_count(&iproto_thread->iproto_connection_pool);
	cfg_msg->stats->streams =
		mempool_count(&iproto_thread->iproto_stream_pool);
	cfg_msg->stats->requests =
		mempool_count(&iproto_thread->iproto_msg_pool);
	cfg_msg->stats->requests_in_stream_queue =
		iproto_thread->requests_in_stream_queue;
}
3926 | | |
/**
 * Net-thread executor for all iproto configuration messages
 * (delivered via cbus_call from the TX thread). Always returns 0;
 * each operation either succeeds or asserts.
 */
static int
iproto_do_cfg_f(struct cbus_call_msg *m)
{
	struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
	struct iproto_thread *iproto_thread = cfg_msg->iproto_thread;
	struct mh_i32_t *req_handlers = iproto_thread->req_handlers;
	struct evio_service *binary = &iproto_thread->binary;
	switch (cfg_msg->op) {
	case IPROTO_CFG_MSG_MAX: {
		cpipe_set_max_input(&iproto_thread->tx_pipe,
				    cfg_msg->iproto_msg_max / 2);
		int old = iproto_msg_max;
		iproto_msg_max = cfg_msg->iproto_msg_max;
		/* A raised limit may unblock stopped connections. */
		if (old < iproto_msg_max)
			iproto_resume(iproto_thread);
		break;
	}
	case IPROTO_CFG_START:
		/* Never re-attach once shutdown has begun. */
		if (iproto_thread->is_shutting_down)
			break;
		evio_service_attach(binary, &tx_binary);
		break;
	case IPROTO_CFG_SHUTDOWN:
		iproto_thread->is_shutting_down = true;
		FALLTHROUGH;
	case IPROTO_CFG_STOP:
		evio_service_detach(binary);
		break;
	case IPROTO_CFG_RESTART:
		evio_service_detach(binary);
		evio_service_attach(binary, &tx_binary);
		break;
	case IPROTO_CFG_STAT:
		iproto_fill_stat(iproto_thread, cfg_msg);
		break;
	case IPROTO_CFG_OVERRIDE:
		/* Keep the thread-local overridden-type set in sync. */
		if (cfg_msg->override.is_set) {
			uint32_t old;
			uint32_t *replaced = &old;
			mh_i32_put(req_handlers, &cfg_msg->override.req_type,
				   &replaced, NULL);
			/* The type must not have been registered before. */
			assert(replaced == NULL);
		} else {
			mh_int_t k = mh_i32_find(req_handlers,
						 cfg_msg->override.req_type,
						 NULL);
			assert(k != mh_end(req_handlers));
			mh_i32_del(req_handlers, k, NULL);
		}
		break;
	case IPROTO_CFG_SESSION_NEW: {
		struct iostream *io = &cfg_msg->session_new.io;
		struct session *session = cfg_msg->session_new.session;
		struct sockaddr_storage addrstorage;
		struct sockaddr *addr = (struct sockaddr *)&addrstorage;
		socklen_t addrlen = sizeof(addrstorage);
		/* Peer address is best-effort: zero length on failure. */
		if (sio_getpeername(io->fd, addr, &addrlen) != 0)
			addrlen = 0;
		iproto_thread_accept(iproto_thread, io, addr, addrlen, session);
		break;
	}
	case IPROTO_CFG_DROP_CONNECTIONS: {
		struct iproto_connection *con;
		static const struct cmsg_hop cancel_route[1] =
			{{ tx_process_cancel_inprogress, NULL }};
		iproto_thread->drop_pending_connection_count = 0;
		rlist_foreach_entry(con, &iproto_thread->connections,
				    in_connections) {
			/*
			 * Replication IO is done outside iproto so we
			 * cannot close them as usual. Anyway we cancel
			 * replication fibers as well and close connection
			 * after replication is broken.
			 *
			 * Do not close connection that is not yet
			 * established. Otherwise session
			 * on_connect/on_disconnect callbacks may be
			 * executed in reverse order in case of yields
			 * in on_connect callbacks.
			 */
			if (!con->is_in_replication &&
			    con->state == IPROTO_CONNECTION_ALIVE &&
			    con->is_established)
				iproto_connection_close(con);
			/*
			 * Do not wait deletion of connection that called
			 * iproto_drop_connections to avoid deadlock.
			 */
			if (con != cfg_msg->drop_connections.owner) {
				con->is_drop_pending = true;
				con->drop_generation =
					cfg_msg->drop_connections.generation;
				iproto_thread->drop_pending_connection_count++;
			}
			if (con->state != IPROTO_CONNECTION_DESTROYED) {
				cmsg_init(&con->cancel_msg, cancel_route);
				cpipe_push(&iproto_thread->tx_pipe,
					   &con->cancel_msg);
			}
		}
		/* Nothing to wait for: report completion immediately. */
		if (iproto_thread->drop_pending_connection_count == 0)
			iproto_send_drop_finished(
				iproto_thread,
				cfg_msg->drop_connections.generation);
		break;
	}
	default:
		unreachable();
	}
	return 0;
}
4038 | | |
4039 | | static void |
4040 | | iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg) |
4041 | 0 | { |
4042 | 0 | msg->iproto_thread = iproto_thread; |
4043 | 0 | int rc = cbus_call(&iproto_thread->net_pipe, &iproto_thread->tx_pipe, |
4044 | 0 | msg, iproto_do_cfg_f); |
4045 | 0 | assert(rc == 0); |
4046 | 0 | (void)rc; |
4047 | 0 | } |
4048 | | |
/**
 * Completion callback of an asynchronous cfg call: the message
 * was malloc'ed by the sender, so release it here.
 */
static int
iproto_do_cfg_async_free_f(struct cbus_call_msg *m)
{
	free(m);
	return 0;
}
4055 | | |
4056 | | /** |
4057 | | * Sends a configuration message to an IPROTO thread without waiting for |
4058 | | * completion. |
4059 | | * |
4060 | | * The message must be allocated with malloc. |
4061 | | */ |
static void
iproto_do_cfg_async(struct iproto_thread *iproto_thread,
		    struct iproto_cfg_msg *msg)
{
	msg->iproto_thread = iproto_thread;
	/* The free callback releases the malloc'ed message on completion. */
	cbus_call_async(&iproto_thread->net_pipe, &iproto_thread->tx_pipe,
			msg, iproto_do_cfg_f, iproto_do_cfg_async_free_f);
}
4070 | | |
4071 | | /** Send IPROTO_CFG_STOP to all threads. */ |
4072 | | static void |
4073 | | iproto_send_stop_msg(void) |
4074 | 0 | { |
4075 | 0 | struct iproto_cfg_msg cfg_msg; |
4076 | 0 | iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STOP); |
4077 | 0 | for (int i = 0; i < iproto_threads_count; i++) |
4078 | 0 | iproto_do_cfg(&iproto_threads[i], &cfg_msg); |
4079 | 0 | } |
4080 | | |
4081 | | /** Send IPROTO_CFG_START to all threads. */ |
4082 | | static void |
4083 | | iproto_send_start_msg(void) |
4084 | 0 | { |
4085 | 0 | struct iproto_cfg_msg cfg_msg; |
4086 | 0 | iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_START); |
4087 | 0 | for (int i = 0; i < iproto_threads_count; i++) |
4088 | 0 | iproto_do_cfg(&iproto_threads[i], &cfg_msg); |
4089 | 0 | } |
4090 | | |
/**
 * Asks every IPROTO thread to drop its binary connections and waits up
 * to @a timeout seconds for all of them to finish.
 *
 * Returns 0 if all threads reported completion in time, -1 on timeout.
 */
int
iproto_drop_connections(double timeout)
{
	/*
	 * Serialize concurrent drop rounds: the generation counter and
	 * the pending-thread counter below are shared globals.
	 */
	static struct latch latch = LATCH_INITIALIZER(latch);
	latch_lock(&latch);
	/*
	 * If the caller itself runs in a binary session, pass its
	 * connection along so the threads can treat it as the "owner"
	 * of the drop round (see the IPROTO_CFG_DROP_CONNECTIONS
	 * handler for how the owner connection is handled).
	 */
	struct iproto_connection *owner = NULL;
	struct session *session = fiber_get_session(fiber());
	if (session != NULL && session->type == SESSION_TYPE_BINARY)
		owner = (struct iproto_connection *)session->meta.connection;
	drop_generation++;
	drop_pending_thread_count = iproto_threads_count;
	for (int i = 0; i < iproto_threads_count; i++) {
		/* Freed by iproto_do_cfg_async_free_f() after delivery. */
		struct iproto_cfg_msg *cfg_msg =
			(struct iproto_cfg_msg *)xmalloc(sizeof(*cfg_msg));
		iproto_cfg_msg_create(cfg_msg, IPROTO_CFG_DROP_CONNECTIONS);
		cfg_msg->drop_connections.owner = owner;
		cfg_msg->drop_connections.generation = drop_generation;
		iproto_do_cfg_async(&iproto_threads[i], cfg_msg);
	}

	/*
	 * Each thread decrements drop_pending_thread_count and signals
	 * drop_finished_cond when it is done; wait until all have
	 * reported or the deadline passes.
	 */
	double deadline = ev_monotonic_now(loop()) + timeout;
	while (drop_pending_thread_count != 0) {
		if (fiber_cond_wait_deadline(&drop_finished_cond,
					     deadline) != 0)
			break;
	}
	latch_unlock(&latch);
	return drop_pending_thread_count == 0 ? 0 : -1;
}
4120 | | |
4121 | | /** Send IPROTO_CFG_RESTART to all threads. */ |
4122 | | static void |
4123 | | iproto_send_restart_msg(void) |
4124 | 0 | { |
4125 | 0 | struct iproto_cfg_msg cfg_msg; |
4126 | 0 | iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_RESTART); |
4127 | 0 | for (int i = 0; i < iproto_threads_count; i++) |
4128 | 0 | iproto_do_cfg(&iproto_threads[i], &cfg_msg); |
4129 | 0 | } |
4130 | | |
/**
 * (Re)configures the set of URIs IPROTO listens on.
 *
 * Returns 0 on success, -1 on error (diag is set). On failure the
 * listening sockets may already be stopped; box.cfg is expected to
 * revert the configuration.
 */
int
iproto_listen(const struct uri_set *uri_set)
{
	/*
	 * No need to rebind IPROTO ports in case the configuration is
	 * the same. However, we should still reload the URIs because
	 * a URI parameter may store a path to a file (for example,
	 * an SSL certificate), which could change.
	 */
	if (uri_set_is_equal(uri_set, &iproto_uris)) {
		if (evio_service_reload_uris(&tx_binary) != 0)
			return -1;
		iproto_send_restart_msg();
		return 0;
	}
	/*
	 * Note that we set iproto_uris before trying to bind so even if
	 * we fail, iproto_uris will still contain the new configuration.
	 * It's okay because box.cfg.listen is reverted on failure at
	 * the box.cfg level.
	 */
	uri_set_destroy(&iproto_uris);
	uri_set_copy(&iproto_uris, uri_set);
	/* Stop all threads' listeners before rebinding in tx. */
	iproto_send_stop_msg();
	evio_service_stop(&tx_binary);
	/* Test-only failure point between unbind and rebind. */
	struct errinj *inj = errinj(ERRINJ_IPROTO_CFG_LISTEN, ERRINJ_INT);
	if (inj != NULL && inj->iparam > 0) {
		inj->iparam--;
		diag_set(ClientError, ER_INJECTION, "iproto listen");
		return -1;
	}
	/*
	 * Please note, we bind sockets in main thread, and then
	 * listen these sockets in all iproto threads! With this
	 * implementation, we rely on the Linux kernel to distribute
	 * incoming connections across iproto threads.
	 */
	if (evio_service_start(&tx_binary, uri_set) != 0)
		return -1;
	iproto_send_start_msg();
	return 0;
}
4173 | | |
4174 | | static void |
4175 | | iproto_stats_add(struct iproto_stats *total_stats, |
4176 | | struct iproto_stats *thread_stats) |
4177 | 0 | { |
4178 | 0 | total_stats->mem_used += thread_stats->mem_used; |
4179 | 0 | total_stats->connections += thread_stats->connections; |
4180 | 0 | total_stats->streams += thread_stats->streams; |
4181 | 0 | total_stats->requests += thread_stats->requests; |
4182 | 0 | total_stats->requests_in_stream_queue += |
4183 | 0 | thread_stats->requests_in_stream_queue; |
4184 | 0 | total_stats->requests_in_progress += |
4185 | 0 | thread_stats->requests_in_progress; |
4186 | 0 | } |
4187 | | |
4188 | | void |
4189 | | iproto_stats_get(struct iproto_stats *stats) |
4190 | 0 | { |
4191 | 0 | struct iproto_stats thread_stats; |
4192 | 0 | memset(stats, 0, sizeof(iproto_stats)); |
4193 | 0 | for (int i = 0; i < iproto_threads_count; i++) { |
4194 | 0 | iproto_thread_stats_get(&thread_stats, i); |
4195 | 0 | iproto_stats_add(stats, &thread_stats); |
4196 | 0 | } |
4197 | 0 | } |
4198 | | |
4199 | | void |
4200 | | iproto_thread_stats_get(struct iproto_stats *stats, int thread_id) |
4201 | 0 | { |
4202 | 0 | memset(stats, 0, sizeof(iproto_stats)); |
4203 | 0 | struct iproto_cfg_msg cfg_msg; |
4204 | 0 | iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT); |
4205 | 0 | assert(thread_id >= 0 && thread_id < iproto_threads_count); |
4206 | 0 | cfg_msg.stats = stats; |
4207 | 0 | iproto_do_cfg(&iproto_threads[thread_id], &cfg_msg); |
4208 | 0 | stats->requests_in_progress = |
4209 | 0 | iproto_threads[thread_id].tx.requests_in_progress; |
4210 | 0 | } |
4211 | | |
4212 | | void |
4213 | | iproto_reset_stat(void) |
4214 | 0 | { |
4215 | 0 | for (int i = 0; i < iproto_threads_count; i++) { |
4216 | 0 | rmean_cleanup(iproto_threads[i].rmean); |
4217 | 0 | rmean_cleanup(iproto_threads[i].tx.rmean); |
4218 | 0 | } |
4219 | 0 | } |
4220 | | |
4221 | | int |
4222 | | iproto_set_msg_max(int new_iproto_msg_max) |
4223 | 0 | { |
4224 | 0 | if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) { |
4225 | 0 | diag_set(ClientError, ER_CFG, "net_msg_max", |
4226 | 0 | tt_sprintf("minimal value is %d", IPROTO_MSG_MAX_MIN)); |
4227 | 0 | return -1; |
4228 | 0 | } |
4229 | 0 | struct iproto_cfg_msg cfg_msg; |
4230 | 0 | iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX); |
4231 | 0 | cfg_msg.iproto_msg_max = new_iproto_msg_max; |
4232 | 0 | for (int i = 0; i < iproto_threads_count; i++) { |
4233 | 0 | iproto_do_cfg(&iproto_threads[i], &cfg_msg); |
4234 | 0 | cpipe_set_max_input(&iproto_threads[i].net_pipe, |
4235 | 0 | new_iproto_msg_max / 2); |
4236 | 0 | } |
4237 | 0 | return 0; |
4238 | 0 | } |
4239 | | |
/**
 * Creates a new IPROTO session on top of an already-established
 * iostream @a io and hands the connection over to one of the IPROTO
 * threads. On success stores the new session id into @a sid and
 * returns 0; returns -1 if the instance is shutting down (diag set).
 *
 * Ownership of @a io is transferred to the IPROTO thread.
 */
int
iproto_session_new(struct iostream *io, struct user *user, uint64_t *sid)
{
	assert(iostream_is_initialized(io));
	if (iproto_is_shutting_down) {
		diag_set(ClientError, ER_SHUTDOWN);
		return -1;
	}
	struct session *session = session_new(SESSION_TYPE_BACKGROUND);
	if (user != NULL)
		credentials_reset(&session->credentials, user);
	/* Freed by iproto_do_cfg_async_free_f() after delivery. */
	struct iproto_cfg_msg *cfg_msg =
		(struct iproto_cfg_msg *)xmalloc(sizeof(*cfg_msg));
	iproto_cfg_msg_create(cfg_msg, IPROTO_CFG_SESSION_NEW);
	/* The message now owns the stream; io is left reset. */
	iostream_move(&cfg_msg->session_new.io, io);
	cfg_msg->session_new.session = session;
	/* Distribute new sessions across threads round-robin. */
	static int thread = 0;
	thread = (thread + 1) % iproto_threads_count;
	iproto_do_cfg_async(&iproto_threads[thread], cfg_msg);
	*sid = session->id;
	return 0;
}
4262 | | |
4263 | | static void |
4264 | | iproto_cfg_override(uint32_t req_type, bool is_set) |
4265 | 0 | { |
4266 | 0 | struct iproto_cfg_msg cfg_msg; |
4267 | 0 | iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_OVERRIDE); |
4268 | 0 | cfg_msg.override.req_type = req_type; |
4269 | 0 | cfg_msg.override.is_set = is_set; |
4270 | 0 | for (int i = 0; i < iproto_threads_count; ++i) |
4271 | 0 | iproto_do_cfg(&iproto_threads[i], &cfg_msg); |
4272 | 0 | } |
4273 | | |
4274 | | int |
4275 | | iproto_session_send(struct session *session, |
4276 | | const char *header, const char *header_end, |
4277 | | const char *body, const char *body_end) |
4278 | 0 | { |
4279 | 0 | assert(session->type == SESSION_TYPE_BINARY); |
4280 | 0 | struct iproto_connection *con = |
4281 | 0 | (struct iproto_connection *)session->meta.connection; |
4282 | 0 | if (con->state != IPROTO_CONNECTION_ALIVE) { |
4283 | 0 | diag_set(ClientError, ER_SESSION_CLOSED); |
4284 | 0 | return -1; |
4285 | 0 | } |
4286 | | |
4287 | 0 | struct obuf *out = con->tx.p_obuf; |
4288 | 0 | struct obuf_svp svp = obuf_create_svp(out); |
4289 | 0 | ptrdiff_t header_size = header_end - header; |
4290 | 0 | ptrdiff_t body_size = body_end - body; |
4291 | 0 | ptrdiff_t packet_size = 5 + header_size + body_size; |
4292 | 0 | char *p = (char *)xobuf_alloc(out, packet_size); |
4293 | 0 | *(p++) = INT8_C(0xce); |
4294 | 0 | p = mp_store_u32(p, packet_size - 5); |
4295 | 0 | memcpy(p, header, header_size); |
4296 | 0 | p += header_size; |
4297 | 0 | memcpy(p, body, body_size); |
4298 | 0 | tx_push(con, &svp); |
4299 | | /* |
4300 | | * The control yield is solely for enforcing the fact this function |
4301 | | * yields — in the future we may implement back pressure based on this. |
4302 | | */ |
4303 | 0 | fiber_sleep(0); |
4304 | 0 | return 0; |
4305 | 0 | } |
4306 | | |
4307 | | int |
4308 | | iproto_shutdown(double timeout) |
4309 | 0 | { |
4310 | 0 | assert(iproto_is_shutting_down); |
4311 | 0 | return iproto_drop_connections(timeout); |
4312 | 0 | } |
4313 | | |
4314 | | void |
4315 | | iproto_free(void) |
4316 | 0 | { |
4317 | 0 | for (int i = 0; i < iproto_threads_count; i++) { |
4318 | 0 | cbus_stop_loop(&iproto_threads[i].net_pipe); |
4319 | 0 | cpipe_destroy(&iproto_threads[i].net_pipe); |
4320 | 0 | if (cord_join(&iproto_threads[i].net_cord) != 0) |
4321 | 0 | panic_syserror("iproto cord join failed"); |
4322 | 0 | mh_i32_delete(iproto_threads[i].req_handlers); |
4323 | | /* |
4324 | | * Close socket descriptor to prevent hot standby instance |
4325 | | * failing to bind in case it tries to bind before socket |
4326 | | * is closed by OS. |
4327 | | */ |
4328 | 0 | evio_service_detach(&iproto_threads[i].binary); |
4329 | 0 | rmean_delete(iproto_threads[i].rmean); |
4330 | 0 | rmean_delete(iproto_threads[i].tx.rmean); |
4331 | 0 | slab_cache_destroy(&iproto_threads[i].net_slabc); |
4332 | 0 | } |
4333 | 0 | free(iproto_threads); |
4334 | |
|
4335 | 0 | mh_int_t i; |
4336 | 0 | mh_foreach(tx_req_handlers, i) { |
4337 | 0 | struct mh_i32ptr_node_t *node = |
4338 | 0 | mh_i32ptr_node(tx_req_handlers, i); |
4339 | 0 | struct iproto_req_handlers *handlers = |
4340 | 0 | (struct iproto_req_handlers *)node->val; |
4341 | 0 | iproto_req_handlers_delete(handlers); |
4342 | 0 | } |
4343 | 0 | mh_i32ptr_delete(tx_req_handlers); |
4344 | 0 | fiber_cond_destroy(&drop_finished_cond); |
4345 | | |
4346 | | /* |
4347 | | * Here we close sockets and unlink all unix socket paths. |
4348 | | * in case it's unix sockets. |
4349 | | */ |
4350 | 0 | evio_service_stop(&tx_binary); |
4351 | 0 | } |
4352 | | |
4353 | | static int |
4354 | | iproto_thread_rmean_foreach_impl(struct rmean *rmean, void *cb, void *cb_ctx) |
4355 | 0 | { |
4356 | 0 | int rc = 0; |
4357 | 0 | for (size_t i = 0; i < rmean->stats_n; i++) { |
4358 | 0 | int64_t mean = rmean_mean(rmean, i); |
4359 | 0 | int64_t total = rmean_total(rmean, i); |
4360 | 0 | if (((rmean_cb)cb)(rmean->stats[i].name, mean, |
4361 | 0 | total, cb_ctx) != 0) |
4362 | 0 | rc = 1; |
4363 | 0 | } |
4364 | 0 | return rc; |
4365 | 0 | } |
4366 | | |
/**
 * We use offset of rmean in struct iproto_thread, instead of pointer to
 * rmean, because we should iterate over all same rmeans for all iproto
 * threads.
 *
 * For each stat index, the per-thread values are summed across all
 * threads and reported once via @a cb (an rmean_cb). Iteration stops
 * at the first callback that returns non-zero; that value is returned.
 */
static int
iproto_rmean_foreach_impl(ptrdiff_t rmean_offset, void *cb, void *cb_ctx)
{
	/* Thread 0's rmean provides the stat count and names. */
	struct rmean *rmean0 =
		*(struct rmean **)((char *)&iproto_threads[0] + rmean_offset);
	for (size_t i = 0; i < rmean0->stats_n; i++) {
		int64_t mean = 0;
		int64_t total = 0;
		for (int j = 0; j < iproto_threads_count; j++) {
			/* Same field, read from thread j's struct. */
			struct rmean *rmean =
				*(struct rmean **)
				((char *)&iproto_threads[j] + rmean_offset);
			assert(rmean == iproto_threads[j].rmean ||
			       rmean == iproto_threads[j].tx.rmean);
			mean += rmean_mean(rmean, i);
			total += rmean_total(rmean, i);
		}
		int rc = ((rmean_cb)cb)(rmean0->stats[i].name, mean,
					total, cb_ctx);
		if (rc != 0)
			return rc;
	}
	return 0;
}
4396 | | |
4397 | | int |
4398 | | iproto_rmean_foreach(void *cb, void *cb_ctx) |
4399 | 0 | { |
4400 | 0 | int rc; |
4401 | 0 | rc = iproto_rmean_foreach_impl(offsetof(struct iproto_thread, rmean), |
4402 | 0 | cb, cb_ctx); |
4403 | 0 | if (rc != 0) |
4404 | 0 | return rc; |
4405 | 0 | rc = iproto_rmean_foreach_impl(offsetof(struct iproto_thread, tx.rmean), |
4406 | 0 | cb, cb_ctx); |
4407 | 0 | if (rc != 0) |
4408 | 0 | return rc; |
4409 | 0 | return 0; |
4410 | 0 | } |
4411 | | |
4412 | | int |
4413 | | iproto_thread_rmean_foreach(int thread_id, void *cb, void *cb_ctx) |
4414 | 0 | { |
4415 | 0 | assert(thread_id >= 0 && thread_id < iproto_threads_count); |
4416 | 0 | int rc = 0; |
4417 | 0 | if (iproto_thread_rmean_foreach_impl(iproto_threads[thread_id].rmean, |
4418 | 0 | cb, cb_ctx) != 0) |
4419 | 0 | rc = 1; |
4420 | 0 | if (iproto_thread_rmean_foreach_impl(iproto_threads[thread_id].tx.rmean, |
4421 | 0 | cb, cb_ctx) != 0) |
4422 | 0 | rc = 1; |
4423 | 0 | return rc; |
4424 | 0 | } |
4425 | | |
4426 | | int |
4427 | | iproto_override(uint32_t req_type, iproto_handler_t cb, |
4428 | | iproto_handler_destroy_t destroy, void *ctx) |
4429 | 0 | { |
4430 | 0 | if (!is_iproto_override_supported(req_type)) { |
4431 | 0 | const char *feature = tt_sprintf("%s request type", |
4432 | 0 | iproto_type_name(req_type)); |
4433 | 0 | diag_set(ClientError, ER_UNSUPPORTED, |
4434 | 0 | "IPROTO request handler overriding", feature); |
4435 | 0 | return -1; |
4436 | 0 | } |
4437 | | |
4438 | 0 | struct iproto_req_handlers *handlers; |
4439 | 0 | handlers = mh_req_handlers_get(req_type); |
4440 | 0 | bool is_set = iproto_req_handler_is_set(handlers); |
4441 | |
|
4442 | 0 | if (handlers != NULL && handlers->c.destroy != NULL) |
4443 | 0 | handlers->c.destroy(handlers->c.ctx); |
4444 | |
|
4445 | 0 | if (cb != NULL) { |
4446 | 0 | if (handlers == NULL) { |
4447 | 0 | handlers = iproto_req_handlers_new(); |
4448 | 0 | mh_req_handlers_put(req_type, handlers); |
4449 | 0 | } |
4450 | 0 | handlers->c.cb = cb; |
4451 | 0 | handlers->c.destroy = destroy; |
4452 | 0 | handlers->c.ctx = ctx; |
4453 | 0 | } else if (handlers != NULL) { |
4454 | 0 | handlers->c.cb = NULL; |
4455 | 0 | handlers->c.destroy = NULL; |
4456 | 0 | handlers->c.ctx = NULL; |
4457 | 0 | } |
4458 | |
|
4459 | 0 | iproto_override_finish(handlers, req_type, is_set); |
4460 | 0 | return 0; |
4461 | 0 | } |