Coverage Report

Created: 2024-02-25 06:30

/src/tarantool/src/box/iproto.cc
Line
Count
Source
1
/*
2
 * Copyright 2010-2016, Tarantool AUTHORS, please see AUTHORS file.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 * 1. Redistributions of source code must retain the above
9
 *    copyright notice, this list of conditions and the
10
 *    following disclaimer.
11
 *
12
 * 2. Redistributions in binary form must reproduce the above
13
 *    copyright notice, this list of conditions and the following
14
 *    disclaimer in the documentation and/or other materials
15
 *    provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
18
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
21
 * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
22
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
25
 * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
28
 * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
29
 * SUCH DAMAGE.
30
 */
31
#include "iproto.h"
32
#include <string.h>
33
#include <stdarg.h>
34
#include <stdio.h>
35
#include <fcntl.h>
36
#include <ctype.h>
37
38
#include <msgpuck.h>
39
#include <small/ibuf.h>
40
#include <small/obuf.h>
41
#include <base64.h>
42
43
#include "version.h"
44
#include "event.h"
45
#include "func_adapter.h"
46
#include "fiber.h"
47
#include "fiber_cond.h"
48
#include "cbus.h"
49
#include "say.h"
50
#include "sio.h"
51
#include "evio.h"
52
#include "iostream.h"
53
#include "scoped_guard.h"
54
#include "memory.h"
55
#include "random.h"
56
57
#include "bind.h"
58
#include "port.h"
59
#include "box.h"
60
#include "call.h"
61
#include "tuple_convert.h"
62
#include "session.h"
63
#include "xrow.h"
64
#include "schema.h" /* schema_version */
65
#include "replication.h" /* instance_uuid */
66
#include "iproto_constants.h"
67
#include "iproto_features.h"
68
#include "rmean.h"
69
#include "execute.h"
70
#include "errinj.h"
71
#include "tt_static.h"
72
#include "trivia/util.h"
73
#include "salad/stailq.h"
74
#include "txn.h"
75
#include "on_shutdown.h"
76
#include "flightrec.h"
77
#include "security.h"
78
#include "watcher.h"
79
#include "box/mp_box_ctx.h"
80
#include "box/tuple.h"
81
#include "mpstream/mpstream.h"
82
83
enum {
84
  IPROTO_PACKET_SIZE_MAX = 2UL * 1024 * 1024 * 1024,
85
};
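/*
 * Editorial note (not part of the measured source): 2UL * 1024 * 1024 *
 * 1024 is 2 GiB, the hard cap on a single iproto packet. Requests whose
 * declared length exceeds it are rejected with ER_INVALID_MSGPACK in
 * iproto_enqueue_batch() below.
 */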
86
87
enum {
88
   ENDPOINT_NAME_MAX = 10
89
};
90
91
struct iproto_connection;
92
struct iproto_msg;
93
94
struct iproto_stream {
95
  /** Currently active stream transaction or NULL */
96
  struct txn *txn;
97
  /**
98
   * Queue of pending requests (iproto messages) for this stream,
99
   * processed sequentially. This field is accessible only from
100
   * iproto thread. Queue items have iproto_msg type.
101
   */
102
  struct stailq pending_requests;
103
  /** Id of this stream, used as a key in streams hash table */
104
  uint64_t id;
105
  /** This stream connection */
106
  struct iproto_connection *connection;
107
  /**
108
   * Pre-allocated disconnect msg to gracefully rollback stream
109
   * transaction and destroy stream object.
110
   */
111
  struct cmsg on_disconnect;
112
  /**
113
   * Message currently being processed in the tx thread.
114
   * This field is accessible only from iproto thread.
115
   */
116
  struct iproto_msg *current;
117
};
118
119
/**
120
 * A position in connection output buffer.
121
 * Since we use rotating buffers to recycle memory,
122
 * it includes not only a position in obuf, but also
123
 * a pointer to obuf the position is for.
124
 */
125
struct iproto_wpos {
126
  struct obuf *obuf;
127
  struct obuf_svp svp;
128
};
129
130
static void
131
iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
132
0
{
133
0
  wpos->obuf = out;
134
0
  wpos->svp = obuf_create_svp(out);
135
0
}
136
137
/**
138
 * Message sent when iproto thread dropped all connections that requested
139
 * to be dropped.
140
 */
141
struct iproto_drop_finished {
142
  /** Base structure. */
143
  struct cmsg base;
144
  /**
145
   * Generation a is a sequence number of iproto_drop_connections()
146
   * invocation.
147
   *
148
   * Generation is used to handle racy situation when previous invocation
149
   * of iproto_drop_connections() was failed and there is new invocation.
150
   * Message from previous invocation may be delivired and account
151
   * iproto thread as finished dropping connection which is not true.
152
   */
153
  unsigned generation;
154
};
155
156
struct iproto_thread {
157
  /**
158
   * Slab cache used for allocating memory for output network buffers
159
   * in the tx thread.
160
   */
161
  struct slab_cache net_slabc;
162
  /**
163
   * Network thread execution unit.
164
   */
165
  struct cord net_cord;
166
  /**
167
   * A single global queue for all requests in all connections. All
168
   * requests from all connections are processed concurrently.
169
   * It is also used as a queue for just established connections and to
170
   * execute disconnect triggers. A few notes about these triggers:
171
   * - they need to be run in a fiber
172
   * - unlike an ordinary request failure, on_connect trigger
173
   *   failure must lead to connection close.
174
   * - on_connect trigger must be processed before any other
175
   *   request on this connection.
176
   */
177
  struct cpipe tx_pipe;
178
  struct cpipe net_pipe;
179
  /**
180
   * Static routes for this iproto thread
181
   */
182
  struct cmsg_hop begin_route[2];
183
  struct cmsg_hop commit_route[2];
184
  struct cmsg_hop rollback_route[2];
185
  struct cmsg_hop rollback_on_disconnect_route[2];
186
  struct cmsg_hop destroy_route[2];
187
  struct cmsg_hop disconnect_route[2];
188
  struct cmsg_hop misc_route[2];
189
  struct cmsg_hop call_route[2];
190
  struct cmsg_hop select_route[2];
191
  struct cmsg_hop process1_route[2];
192
  struct cmsg_hop sql_route[2];
193
  struct cmsg_hop join_route[2];
194
  struct cmsg_hop subscribe_route[2];
195
  struct cmsg_hop error_route[2];
196
  struct cmsg_hop push_route[2];
197
  struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX];
198
  struct cmsg_hop connect_route[2];
199
  struct cmsg_hop override_route[2];
200
  /*
201
   * Set of overridden request handlers. Used by IPROTO thread to skip
202
   * request preprocessing and use the 'override' route.
203
   */
204
  mh_i32_t *req_handlers;
205
  /*
206
   * Iproto thread memory pools
207
   */
208
  struct mempool iproto_msg_pool;
209
  struct mempool iproto_connection_pool;
210
  struct mempool iproto_stream_pool;
211
  /*
212
   * List of stopped connections
213
   */
214
  struct rlist stopped_connections;
215
  /*
216
   * Iproto thread stat
217
   */
218
  struct rmean *rmean;
219
  /*
220
   * Iproto thread id
221
   */
222
  uint32_t id;
223
  /** Array of iproto binary listeners */
224
  struct evio_service binary;
225
  /** Request count currently pending in stream queue. */
226
  size_t requests_in_stream_queue;
227
  /** List of all connections. */
228
  struct rlist connections;
229
  /** Number of connections pending drop. */
230
  size_t drop_pending_connection_count;
231
  /**
232
   * Message used to notify TX thread that all connections marked
233
   * to be dropped are dropped.
234
   */
235
  struct iproto_drop_finished drop_finished_msg;
236
  /**
237
   * If set then iproto thread shutdown is started and we should not
238
   * accept new connections.
239
   */
240
  bool is_shutting_down;
241
  /**
242
   * The following fields are used exclusively by the tx thread.
243
   * Align them to prevent false-sharing.
244
   */
245
  struct {
246
    alignas(CACHELINE_SIZE)
247
    /** Request count currently processed by tx thread. */
248
    size_t requests_in_progress;
249
    /** Iproto thread stat collected in tx thread. */
250
    struct rmean *rmean;
251
  } tx;
252
};
253
254
/** Condition for drop finished. */
255
static struct fiber_cond drop_finished_cond;
256
/** Count of iproto threads that have not finished dropping connections yet. */
257
static size_t drop_pending_thread_count;
258
/**
259
 * Generation is a sequence number of a connection-dropping invocation.
260
 *
261
 * See also `struct iproto_drop_finished`.
262
 */
263
static unsigned drop_generation;
264
265
/**
266
 * IPROTO listen URIs. Set by box.cfg.listen.
267
 */
268
static struct uri_set iproto_uris;
269
270
static struct iproto_thread *iproto_threads;
271
int iproto_threads_count;
272
/**
273
 * This binary contains all bind socket properties, like
274
 * the addresses iproto listens on. It is kept in TX to be
275
 * shown in box.info. It should be global, because it contains
276
 * properties, and should be accessible from different functions
277
 * in tx thread.
278
 */
279
static struct evio_service tx_binary;
280
281
/**
282
 * In Greek mythology, Kharon is the ferryman who carries souls
283
 * of the newly deceased across the river Styx that divided the
284
 * world of the living from the world of the dead. Here Kharon is
285
 * a cbus message and does similar work. It notifies the iproto
286
 * thread about new data in a connection output buffer and carries
287
 * back to tx thread the position in the output buffer which has
288
 * been successfully flushed to the socket. Styx here is cpipe,
289
 * and the boat is cbus message.
290
 */
291
struct iproto_kharon {
292
  struct cmsg base;
293
  /**
294
   * Tx thread sets wpos to the current position in the
295
   * output buffer and sends the message to iproto thread.
296
   * Iproto returns the message to tx after setting wpos
297
   * to the last flushed position (similarly to
298
   * iproto_msg.wpos).
299
   */
300
  struct iproto_wpos wpos;
301
};
302
303
/**
304
 * Network readahead. A signed integer to avoid
305
 * automatic type coercion to an unsigned type.
306
 * We assign it without locks in txn thread and
307
 * use it in iproto thread -- it's OK that
308
 * readahead has a stale value until the thread
309
 * caches have synchronized, after all, it's used
310
 * in new connections only.
311
 *
312
 * Notice that the default is not a strict power of two.
313
 * slab metadata takes some space, and we want
314
 * allocation steps to be correlated to slab buddy
315
 * sizes, so when we ask slab cache for 16320 bytes,
316
 * we get a slab of size 16384, not 32768.
317
 */
318
unsigned iproto_readahead = 16320;
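/*
 * Editorial sketch, not part of the measured source: the buddy-size
 * arithmetic behind the 16320 default, assuming a hypothetical 64-byte
 * per-slab metadata overhead (the real overhead depends on the small
 * allocator's slab header). round_up_pow2() is an illustrative helper.
 */
#include <assert.h>
#include <stddef.h>

/* Round n up to the next power of two. */
static size_t
round_up_pow2(size_t n)
{
	size_t p = 1;
	while (p < n)
		p <<= 1;
	return p;
}

static void
readahead_sizing_example(void)
{
	const size_t slab_meta = 64; /* assumed, for illustration only */
	/* 16320 plus metadata still fits the 16384-byte buddy... */
	assert(round_up_pow2(16320 + slab_meta) == 16384);
	/* ...while a full 16384-byte request would spill into 32768. */
	assert(round_up_pow2(16384 + slab_meta) == 32768);
}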
319
320
/* The maximal number of iproto messages in flight. */
321
static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
322
323
/**
324
 * Request handlers meta information. The IPROTO request of each type can be
325
 * overridden by the following types of handlers (listed in priority order):
326
 *  1. Lua handlers, set in the event registry by request type id;
327
 *  2. Lua handlers, set in the event registry by request type name;
328
 *  3. C handler, set by `iproto_override()'.
329
 */
330
struct iproto_req_handlers {
331
  /**
332
   * Triggers from the event registry, set by request type id.
333
   * NULL if no such triggers.
334
   */
335
  struct event *event_by_id;
336
  /**
337
   * Triggers from the event registry, set by request type name.
338
   * NULL if no such triggers.
339
   */
340
  struct event *event_by_name;
341
  /**
342
   * C request handler.
343
   */
344
  struct {
345
    /** C request handler. NULL if not set. */
346
    iproto_handler_t cb;
347
    /** C request handler destructor, can be NULL. */
348
    iproto_handler_destroy_t destroy;
349
    /** Context passed to the handler and destructor. */
350
    void *ctx;
351
  } c;
352
};
353
354
/**
355
 * Request handler table used in TX thread for processing requests with
356
 * overridden handlers.
357
 */
358
static mh_i32ptr_t *tx_req_handlers;
359
360
/**
361
 * If set then iproto shutdown is started and we should not accept new
362
 * connections.
363
 */
364
static bool iproto_is_shutting_down;
365
366
/** Available iproto configuration changes. */
367
enum iproto_cfg_op {
368
  /** Command code to set max input for iproto thread */
369
  IPROTO_CFG_MSG_MAX,
370
  /**
371
   * Command code to start listen socket contained
372
   * in evio_service object
373
   */
374
  IPROTO_CFG_START,
375
  /**
376
   * Command code to stop listen socket contained
377
   * in evio_service object. When the user sets
378
   * new parameters for iproto, it is necessary to stop
379
   * listen sockets in iproto threads before reconfiguration.
380
   */
381
  IPROTO_CFG_STOP,
382
  /**
383
   * Equivalent to IPROTO_CFG_STOP followed by IPROTO_CFG_START.
384
   */
385
  IPROTO_CFG_RESTART,
386
  /**
387
   * Command code to get statistics from the iproto thread
388
   */
389
  IPROTO_CFG_STAT,
390
  /**
391
   * Command code to notify IPROTO threads that a new handler has been
392
   * set or reset.
393
   */
394
  IPROTO_CFG_OVERRIDE,
395
  /**
396
   * Command code to create a new IPROTO session.
397
   */
398
  IPROTO_CFG_SESSION_NEW,
399
  /**
400
   * Command code to drop all current connections.
401
   */
402
  IPROTO_CFG_DROP_CONNECTIONS,
403
  IPROTO_CFG_SHUTDOWN,
404
};
405
406
/**
407
 * Since there is no way to "synchronously" change the
408
 * state of the io thread, to change the listen port or max
409
 * message count in flight, send a special message to the iproto
410
 * thread.
411
 */
412
struct iproto_cfg_msg: public cbus_call_msg
413
{
414
  /** Operation to execute in iproto thread. */
415
  enum iproto_cfg_op op;
416
  union {
417
    /** Pointer to the statistic structure. */
418
    struct iproto_stats *stats;
419
    /** New iproto max message count. */
420
    int iproto_msg_max;
421
    struct {
422
      /** New connection IO stream. */
423
      struct iostream io;
424
      /** New connection session. */
425
      struct session *session;
426
    } session_new;
427
    struct {
428
      /** Overridden request type. */
429
      uint32_t req_type;
430
      /** Whether the request handler is set or reset. */
431
      bool is_set;
432
    } override;
433
    struct {
434
      /**
435
       * Connection that is executing iproto_drop_connections.
436
       * NULL if the function is not called from a connection.
437
       */
438
      struct iproto_connection *owner;
439
      /**
440
       * Generation is sequence number of dropping
441
       * connection invocation.
442
       *
443
       * See also `struct iproto_drop_finished`.
444
       */
445
      unsigned generation;
446
    } drop_connections;
447
  };
448
  struct iproto_thread *iproto_thread;
449
};
450
451
static inline void
452
iproto_cfg_msg_create(struct iproto_cfg_msg *msg, enum iproto_cfg_op op)
453
0
{
454
0
  memset(msg, 0, sizeof(*msg));
455
0
  msg->op = op;
456
0
}
457
458
/**
459
 * Sends a configuration message to an IPROTO thread and waits for completion.
460
 *
461
 * The message may be allocated on stack.
462
 */
463
static void
464
iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg);
465
466
int
467
iproto_addr_count(void)
468
0
{
469
0
  return evio_service_count(&tx_binary);
470
0
}
471
472
const char *
473
iproto_addr_str(char *buf, int idx)
474
0
{
475
0
  socklen_t size;
476
0
  const struct sockaddr *addr = evio_service_addr(&tx_binary, idx, &size);
477
0
  sio_addr_snprintf(buf, SERVICE_NAME_MAXLEN, addr, size);
478
0
  return buf;
479
0
}
480
481
/**
482
 * The size threshold above which a buffer is shrunk before
483
 * it is put back into the buffer cache.
484
 */
485
static inline unsigned
486
iproto_max_input_size(void)
487
0
{
488
0
  return 18 * iproto_readahead;
489
0
}
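/*
 * Editorial note (not part of the measured source): with the default
 * readahead of 16320 bytes this threshold is 18 * 16320 = 293760 bytes
 * (~287 KiB); iproto_reset_input() below recreates bigger buffers at the
 * base readahead size instead of merely resetting them.
 */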
490
491
void
492
iproto_reset_input(struct ibuf *ibuf)
493
0
{
494
  /*
495
   * If we happen to have fully processed the input,
496
   * move the pos to the start of the input buffer.
497
   */
498
0
  assert(ibuf_used(ibuf) == 0);
499
0
  if (ibuf_capacity(ibuf) < iproto_max_input_size()) {
500
0
    ibuf_reset(ibuf);
501
0
  } else {
502
0
    struct slab_cache *slabc = ibuf->slabc;
503
0
    ibuf_destroy(ibuf);
504
0
    ibuf_create(ibuf, slabc, iproto_readahead);
505
0
  }
506
0
}
507
508
/* {{{ iproto_msg - declaration */
509
510
/**
511
 * A single msg from io thread. All requests
512
 * from all connections are queued into a single queue
513
 * and processed in FIFO order.
514
 */
515
struct iproto_msg
516
{
517
  struct cmsg base;
518
  struct iproto_connection *connection;
519
520
  /* --- Box msgs - actual requests for the transaction processor --- */
521
  /* Request message code and sync. */
522
  struct xrow_header header;
523
  union {
524
    /** Connect. */
525
    struct {
526
      union {
527
        /** Peer address. */
528
        struct sockaddr addr;
529
        /** Peer address storage. */
530
        struct sockaddr_storage addrstorage;
531
      };
532
      /** Peer address size. */
533
      socklen_t addrlen;
534
      /**
535
       * Session to use for the new connection.
536
       * Optional. If omitted, a new session object
537
       * will be created in the TX thread.
538
       */
539
      struct session *session;
540
    } connect;
541
    /** Box request, if this is a DML */
542
    struct request dml;
543
    /** Box request, if this is a call or eval. */
544
    struct call_request call;
545
    /** Watch request. */
546
    struct watch_request watch;
547
    /** Authentication request. */
548
    struct auth_request auth;
549
    /** Features request. */
550
    struct id_request id;
551
    /** SQL request, if this is the EXECUTE/PREPARE request. */
552
    struct sql_request sql;
553
    /** BEGIN request */
554
    struct begin_request begin;
555
    /** COMMIT request */
556
    struct commit_request commit;
557
    /** In case of iproto parse error, saved diagnostics. */
558
    struct diag diag;
559
  };
560
  /**
561
   * Input buffer which stores the request data. It can be
562
   * discarded only when the message returns to iproto thread.
563
   */
564
  struct ibuf *p_ibuf;
565
  /**
566
   * How much space the request takes in the
567
   * input buffer (len, header and body - all of it).
568
   * This also works as a reference counter to
569
   * ibuf object.
570
   */
571
  size_t len;
572
  /**
573
   * Pointer to the start of unparsed request stored in @a p_ibuf.
574
   * It is used to dump request to flight recorder (if available) in
575
   * TX thread. It is guaranteed that @a reqstart points to a valid
576
   * position: rpos of input buffer is moved after processing the message;
577
   * meanwhile requests are handled in the order they are stored in
578
   * the buffer.
579
   */
580
  const char *reqstart;
581
  /**
582
   * Position in the connection output buffer. When sending a
583
   * message to the tx thread, iproto sets it to its current
584
   * flush position so that tx can reuse a buffer that has been
585
   * flushed. The tx thread, in turn, sets it to the end of the
586
   * data it has just written, to let iproto know that there is
587
   * more output to flush.
588
   */
589
  struct iproto_wpos wpos;
590
  /**
591
   * Message sent by the tx thread to notify iproto that input has
592
   * been processed and can be discarded before request completion.
593
   * Used by long (yielding) CALL/EVAL requests.
594
   */
595
  struct cmsg discard_input;
596
  /**
597
   * Used in "connect" msgs, true if connect trigger failed
598
   * and the connection must be closed.
599
   */
600
  bool close_connection;
601
  /**
602
   * A stailq_entry to hold message in stream.
603
   * All messages in a stream are processed sequentially. Before
604
   * processing, each message is added to the queue of pending requests.
605
   * If this queue was empty, the message is processed right away;
606
   * otherwise it waits until all previous messages are processed.
607
   */
608
  struct stailq_entry in_stream;
609
  /** Stream that owns this message, or NULL. */
610
  struct iproto_stream *stream;
611
  /** Link in connection->tx.inprogress. */
612
  struct rlist in_inprogress;
613
  /** TX thread fiber that is processing this message. */
614
  struct fiber *fiber;
615
};
616
617
/**
618
 * Resume stopped connections, if any.
619
 */
620
static void
621
iproto_resume(struct iproto_thread *iproto_thread);
622
623
/**
624
 * Prepares IPROTO message: decodes the message header, checks the message's
625
 * stream identifier, and sets the message's cbus route.
626
 */
627
static void
628
iproto_msg_prepare(struct iproto_msg *msg, const char **pos,
629
       const char *reqend);
630
631
enum rmean_net_name {
632
  IPROTO_SENT,
633
  IPROTO_RECEIVED,
634
  IPROTO_CONNECTIONS,
635
  IPROTO_REQUESTS,
636
  IPROTO_STREAMS,
637
  REQUESTS_IN_STREAM_QUEUE,
638
  RMEAN_NET_LAST,
639
};
640
641
const char *rmean_net_strings[RMEAN_NET_LAST] = {
642
  "SENT",
643
  "RECEIVED",
644
  "CONNECTIONS",
645
  "REQUESTS",
646
  "STREAMS",
647
  "REQUESTS_IN_STREAM_QUEUE",
648
};
649
650
enum rmean_tx_name {
651
  REQUESTS_IN_PROGRESS,
652
  RMEAN_TX_LAST,
653
};
654
655
const char *rmean_tx_strings[RMEAN_TX_LAST] = {
656
  "REQUESTS_IN_PROGRESS",
657
};
658
659
static void
660
tx_process_destroy(struct cmsg *m);
661
662
static void
663
net_finish_destroy(struct cmsg *m);
664
665
/** Fire on_disconnect triggers in the tx thread. */
666
static void
667
tx_process_disconnect(struct cmsg *m);
668
669
/** Send destroy message to tx thread. */
670
static void
671
net_finish_disconnect(struct cmsg *m);
672
673
/**
674
 * Kharon is in the dead world (iproto). Schedule an event to
675
 * flush new obuf as reflected in the fresh wpos.
676
 * @param m Kharon.
677
 */
678
static void
679
iproto_process_push(struct cmsg *m);
680
681
/**
682
 * Kharon returns to the living world (tx) back from the dead one
683
 * (iproto). Check if a new push is pending and make a new trip
684
 * to iproto if necessary.
685
 * @param m Kharon.
686
 */
687
static void
688
tx_end_push(struct cmsg *m);
689
690
/* }}} */
691
692
/* {{{ iproto_connection - declaration and definition */
693
694
/** Connection life cycle stages. */
695
enum iproto_connection_state {
696
  /**
697
   * A connection is always alive in the beginning because
698
   * takes an already active socket in a constructor.
699
   */
700
  IPROTO_CONNECTION_ALIVE,
701
  /**
702
   * Socket was closed, a notification is sent to the TX
703
   * thread to close the session.
704
   */
705
  IPROTO_CONNECTION_CLOSED,
706
  /**
707
   * TX thread was notified about close, but some requests
708
   * are still not finished. That state may be skipped in
709
   * case the connection was already idle (not having
710
   * unfinished requests) at the moment of closing.
711
   */
712
  IPROTO_CONNECTION_PENDING_DESTROY,
713
  /**
714
   * All requests are finished, a destroy request is sent to
715
   * the TX thread.
716
   */
717
  IPROTO_CONNECTION_DESTROYED,
718
};
719
720
/**
721
 * Context of a single client connection.
722
 * Interaction scheme:
723
 *
724
 *  Receive from the network.
725
 *     |
726
 * +---|---------------------+   +------------+
727
 * |   |      iproto thread  |   | tx thread  |
728
 * |   v                     |   |            |
729
 * | ibuf[0]- - - - - - - - -|- -|- - >+      |
730
 * |                         |   |     |      |
731
 * |           ibuf[1]       |   |     |      |
732
 * |                         |   |     |      |
733
 * | obuf[0] <- - - - - - - -|- -|- - -+      |
734
 * |    |                    |   |     |      |
735
 * |    |      obuf[1] <- - -|- -|- - -+      |
736
 * +----|-----------|--------+   +------------+
737
 *      |           v
738
 *      |        Send to
739
 *      |        network.
740
 *      v
741
 * Send to network after obuf[1], i.e. older responses are sent first.
742
 *
743
 * ibuf structure:
744
 *                   rpos             wpos           end
745
 * +-------------------|----------------|-------------+
746
 * \________/\________/ \________/\____/
747
 *  \  msg       msg /    msg     parse
748
 *   \______________/             size
749
 *   response is sent,
750
 *     messages are
751
 *      discarded
752
 */
753
struct iproto_connection
754
{
755
  /**
756
   * Two rotating buffers for input. Input is first read into
757
   * ibuf[0]. As soon as this buffer becomes full, the buffers are
758
   * rotated. When all input buffers are used up, the input
759
   * is suspended. The buffer becomes available for use
760
   * again when tx thread completes processing the messages
761
   * stored in the buffer.
762
   */
763
  struct ibuf ibuf[2];
764
  /** Pointer to the current buffer. */
765
  struct ibuf *p_ibuf;
766
  /**
767
   * Number of not yet processed messages in the corresponding
768
   * input buffer.
769
   */
770
  size_t input_msg_count[2];
771
  /**
772
   * Two rotating buffers for output. The tx thread switches to
773
   * another buffer if it finds it to be empty (flushed out).
774
   * This guarantees that memory gets recycled as soon as output
775
   * is flushed by the iproto thread.
776
   */
777
  struct obuf obuf[2];
778
  /**
779
   * Position in the output buffer that points to the beginning
780
   * of the data awaiting to be flushed. Advanced by the iproto
781
   * thread upon successful flush.
782
   */
783
  struct iproto_wpos wpos;
784
  /**
785
   * Position in the output buffer that points to the end of the
786
   * data awaiting to be flushed. Advanced by the iproto thread
787
   * upon receiving a message from the tx thread telling that more
788
   * output is available (see iproto_msg::wpos).
789
   */
790
  struct iproto_wpos wend;
791
  /*
792
   * Size of readahead which is not parsed yet, i.e. size of
793
   * a piece of request which is not fully read. Is always
794
   * relative to ibuf.wpos. In other words, ibuf.wpos -
795
   * parse_size gives the start of the unparsed request.
796
   * A size rather than a pointer is used to be safe in case
797
   * ibuf.buf is reallocated. Being relative to ibuf.wpos,
798
   * rather than to ibuf.rpos is helpful to make sure
799
   * ibuf_reserve() or buffers rotation don't make the value
800
   * meaningless.
801
   */
802
  size_t parse_size;
803
  /**
804
   * Number of active long polling requests that have already
805
   * discarded their arguments in order not to stall other
806
   * connections.
807
   */
808
  int long_poll_count;
809
  /** I/O stream used for communication with the client. */
810
  struct iostream io;
811
  struct ev_io input;
812
  struct ev_io output;
813
  /** Logical session. */
814
  struct session *session;
815
  ev_loop *loop;
816
  /**
817
   * Pre-allocated disconnect msg. Is sent right after
818
   * actual disconnect has happened. Does not destroy the
819
   * connection. Used to notify existing requests about the
820
   * occasion.
821
   */
822
  struct cmsg disconnect_msg;
823
  /**
824
   * Pre-allocated destroy msg. Is sent after disconnect has
825
   * happened and a last request has finished. Firstly
826
   * destroys tx-related resources and then deletes the
827
   * connection.
828
   */
829
  struct cmsg destroy_msg;
830
  /**
831
   * Connection state. Mainly it is used to determine when
832
   * the connection can be destroyed, and for debug purposes
833
   * to assert on a double destroy, for example.
834
   */
835
  enum iproto_connection_state state;
836
  struct rlist in_stop_list;
837
  /**
838
   * Flag indicating that the client sent SHUT_RDWR or the connection
839
   * was closed on the client side. When it is set to false, we
840
   * should not write to the socket.
841
   */
842
  bool can_write;
843
  /**
844
   * Hash table that holds all streams for this connection.
845
   * This field is accessible only from iproto thread.
846
   */
847
  struct mh_i64ptr_t *streams;
848
  /**
849
   * Kharon is used to implement box.session.push().
850
   * When a new push is ready, tx uses kharon to notify
851
   * iproto about new data in connection output buffer.
852
   *
853
   * Kharon cannot be in two places at the same time. When
854
   * kharon leaves tx, is_push_sent is set to true. After
855
   * that new pushes can not use it. Instead, they set
856
   * is_push_pending flag. When Kharon is back to tx it
857
   * clears is_push_sent, checks is_push_pending and departs
858
   * immediately back to iproto if it is set.
859
   *
860
   * This design makes it easy to use a single message per
861
   * connection for pushes while new pushes do not wait for
862
   * the message to become available.
863
   *
864
   * iproto                                               tx
865
   * -------------------------------------------------------
866
   *                                        + [push message]
867
   *                 <--- notification ----
868
   *                                        + [push message]
869
   * [feed event]
870
   *             --- kharon travels back ---->
871
   * [write to socket]
872
   *                                        + [push message]
873
   *                                        [new push found]
874
   *                 <--- notification ----
875
   * [write ends]
876
   *                          ...
877
   */
878
  struct iproto_kharon kharon;
879
  /**
880
   * The following fields are used exclusively by the tx thread.
881
   * Align them to prevent false-sharing.
882
   */
883
  struct {
884
    alignas(CACHELINE_SIZE)
885
    /** Pointer to the current output buffer. */
886
    struct obuf *p_obuf;
887
    /** True if Kharon is in use/travelling. */
888
    bool is_push_sent;
889
    /**
890
     * True if new pushes are waiting for Kharon
891
     * return.
892
     */
893
    bool is_push_pending;
894
    /** List of inprogress messages. */
895
    struct rlist inprogress;
896
  } tx;
897
  /** Authentication salt. */
898
  char salt[IPROTO_SALT_SIZE];
899
  /** Iproto connection thread */
900
  struct iproto_thread *iproto_thread;
901
  /**
902
   * The connection is processing a replication command, so that
903
   * IO is handled by relay code.
904
   */
905
  bool is_in_replication;
906
  /** Link in iproto_thread->connections. */
907
  struct rlist in_connections;
908
  /** Set if connection is being dropped. */
909
  bool is_drop_pending;
910
  /**
911
   * Generation is a sequence number of a connection-dropping invocation.
912
   *
913
   * See also `struct iproto_drop_finished`.
914
   */
915
  unsigned drop_generation;
916
  /**
917
   * Message sent to TX to cancel all in-progress requests of the
918
   * connection.
919
   */
920
  struct cmsg cancel_msg;
921
  /** Set if connection is accepted in TX. */
922
  bool is_established;
923
};
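/*
 * Editorial sketch, not part of the measured source: the
 * is_push_sent/is_push_pending protocol described in the Kharon comment
 * above, reduced to a minimal state machine. The names push_state,
 * tx_push() and kharon_return() are hypothetical; the real code moves
 * con->kharon through cpipes between the tx and iproto threads instead.
 */
struct push_state {
	bool is_push_sent;    /* Kharon is travelling to iproto. */
	bool is_push_pending; /* A push arrived while it was away. */
};

/* Called in tx whenever new push data is written to the output buffer. */
static void
tx_push(struct push_state *s)
{
	if (!s->is_push_sent) {
		s->is_push_sent = true;
		/* ... send the kharon message to the iproto thread ... */
	} else {
		s->is_push_pending = true;
	}
}

/* Called in tx when the kharon message returns from iproto. */
static void
kharon_return(struct push_state *s)
{
	s->is_push_sent = false;
	if (s->is_push_pending) {
		s->is_push_pending = false;
		tx_push(s); /* Departs immediately back to iproto. */
	}
}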
924
925
/** Returns a string suitable for logging. */
926
static inline const char *
927
iproto_connection_name(const struct iproto_connection *con)
928
0
{
929
0
  return sio_socketname(con->io.fd);
930
0
}
931
932
#ifdef NDEBUG
933
#define iproto_write_error(io, e, schema_version, sync)                         \
934
  iproto_do_write_error(io, e, schema_version, sync);
935
#else
936
0
#define iproto_write_error(io, e, schema_version, sync) do {                    \
937
0
  int fd = (io)->fd;                                                      \
938
0
  int flags = fcntl(fd, F_GETFL, 0);                                      \
939
0
  if (flags >= 0)                                                         \
940
0
    fcntl(fd, F_SETFL, flags & (~O_NONBLOCK));                      \
941
0
  iproto_do_write_error(io, e, schema_version, sync);                     \
942
0
  if (flags >= 0)                                                         \
943
0
    fcntl(fd, F_SETFL, flags);                                      \
944
0
} while (0);
945
#endif
946
947
static struct iproto_stream *
948
iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
949
0
{
950
0
  struct iproto_thread *iproto_thread = connection->iproto_thread;
951
0
  struct iproto_stream *stream = (struct iproto_stream *)
952
0
    xmempool_alloc(&iproto_thread->iproto_stream_pool);
953
0
  rmean_collect(connection->iproto_thread->rmean, IPROTO_STREAMS, 1);
954
0
  stream->txn = NULL;
955
0
  stream->current = NULL;
956
0
  stailq_create(&stream->pending_requests);
957
0
  stream->id = stream_id;
958
0
  stream->connection = connection;
959
0
  return stream;
960
0
}
961
962
static inline void
963
iproto_stream_rollback_on_disconnect(struct iproto_stream *stream)
964
0
{
965
0
  struct iproto_connection *conn = stream->connection;
966
0
  struct iproto_thread *iproto_thread = conn->iproto_thread;
967
0
  struct cmsg_hop *route =
968
0
    iproto_thread->rollback_on_disconnect_route;
969
0
  cmsg_init(&stream->on_disconnect, route);
970
0
  cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
971
0
}
972
973
/**
974
 * Return true if we have not enough spare messages
975
 * in the message pool.
976
 */
977
static inline bool
978
iproto_check_msg_max(struct iproto_thread *iproto_thread)
979
0
{
980
0
  size_t request_count = mempool_count(&iproto_thread->iproto_msg_pool);
981
0
  return request_count > (size_t) iproto_msg_max;
982
0
}
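/*
 * Editorial note (not part of the measured source): mempool_count() here
 * counts currently allocated, i.e. in-flight, messages, so the check
 * trips once more than iproto_msg_max messages are outstanding.
 */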
983
984
static inline void
985
iproto_msg_delete(struct iproto_msg *msg)
986
0
{
987
0
  struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
988
0
  mempool_free(&msg->connection->iproto_thread->iproto_msg_pool, msg);
989
0
  iproto_resume(iproto_thread);
990
0
}
991
992
static void
993
iproto_stream_delete(struct iproto_stream *stream)
994
0
{
995
0
  assert(stream->current == NULL);
996
0
  assert(stailq_empty(&stream->pending_requests));
997
0
  assert(stream->txn == NULL);
998
0
  mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
999
0
}
1000
1001
static struct iproto_msg *
1002
iproto_msg_new(struct iproto_connection *con)
1003
0
{
1004
0
  struct mempool *iproto_msg_pool = &con->iproto_thread->iproto_msg_pool;
1005
0
  struct iproto_msg *msg =
1006
0
    (struct iproto_msg *)xmempool_alloc(iproto_msg_pool);
1007
0
  msg->close_connection = false;
1008
0
  msg->connection = con;
1009
0
  msg->stream = NULL;
1010
0
  msg->fiber = NULL;
1011
0
  rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
1012
0
  return msg;
1013
0
}
1014
1015
/**
1016
 * Signal input unless it's blocked on I/O or stopped.
1017
 */
1018
static inline void
1019
iproto_connection_feed_input(struct iproto_connection *con)
1020
0
{
1021
0
  assert(con->state == IPROTO_CONNECTION_ALIVE);
1022
0
  if (!ev_is_active(&con->input) && rlist_empty(&con->in_stop_list))
1023
0
    ev_feed_event(con->loop, &con->input, EV_CUSTOM);
1024
0
}
1025
1026
/**
1027
 * Signal output unless it's blocked on I/O.
1028
 */
1029
static inline void
1030
iproto_connection_feed_output(struct iproto_connection *con)
1031
0
{
1032
0
  assert(con->state == IPROTO_CONNECTION_ALIVE);
1033
0
  if (!ev_is_active(&con->output))
1034
0
    ev_feed_event(con->loop, &con->output, EV_CUSTOM);
1035
0
}
1036
1037
/**
1038
 * A connection is idle when the client is gone
1039
 * and there are no outstanding msgs in the msg queue.
1040
 * An idle connection can be safely garbage collected.
1041
 *
1042
 * ibuf_size() provides an effective reference counter
1043
 * on connection use in the tx request queue. Any request
1044
 * in the request queue has a non-zero len, and ibuf_size()
1045
 * is therefore non-zero as long as there is at least
1046
 * one request in the tx queue.
1047
 */
1048
static inline bool
1049
iproto_connection_is_idle(struct iproto_connection *con)
1050
0
{
1051
  /*
1052
   * The check for 'mh_size(streams) == 0' was added because it is
1053
   * possible that when a disconnect occurs, there is an active transaction
1054
   * in a stream after processing all messages. In this case we send a
1055
   * special message to roll it back, and without this check we would
1056
   * immediately send a special message to destroy the connection. This
1057
   * would not lead to an error now, since the messages are processed
1058
   * strictly sequentially and rollback does not yield, but it is not safe,
1059
   * and if we add more complex logic, it may make errors hard to catch
1060
   * in the future.
1061
   */
1062
0
  return con->long_poll_count == 0 &&
1063
0
         mh_size(con->streams) == 0 &&
1064
0
         ibuf_used(&con->ibuf[0]) == 0 &&
1065
0
         ibuf_used(&con->ibuf[1]) == 0;
1066
0
}
1067
1068
/**
1069
 * Stop input when readahead limit is reached. When
1070
 * we process some messages *on this connection*, the input can be
1071
 * resumed.
1072
 */
1073
static inline void
1074
iproto_connection_stop_readahead_limit(struct iproto_connection *con)
1075
0
{
1076
0
  say_warn_ratelimited("stopping input on connection %s, "
1077
0
           "readahead limit is reached",
1078
0
           iproto_connection_name(con));
1079
0
  assert(rlist_empty(&con->in_stop_list));
1080
0
  ev_io_stop(con->loop, &con->input);
1081
0
}
1082
1083
static inline void
1084
iproto_connection_stop_msg_max_limit(struct iproto_connection *con)
1085
0
{
1086
0
  assert(rlist_empty(&con->in_stop_list));
1087
1088
0
  say_warn_ratelimited("stopping input on connection %s, "
1089
0
           "net_msg_max limit is reached",
1090
0
           iproto_connection_name(con));
1091
0
  ev_io_stop(con->loop, &con->input);
1092
  /*
1093
   * Important to add to tail and fetch from head to ensure
1094
   * strict lifo order (fairness) for stopped connections.
1095
   */
1096
0
  rlist_add_tail(&con->iproto_thread->stopped_connections,
1097
0
           &con->in_stop_list);
1098
0
}
1099
1100
/**
1101
 * Send a destroy message to TX thread in case all requests are
1102
 * finished.
1103
 */
1104
static inline void
1105
iproto_connection_try_to_start_destroy(struct iproto_connection *con)
1106
0
{
1107
0
  assert(con->state == IPROTO_CONNECTION_CLOSED ||
1108
0
         con->state == IPROTO_CONNECTION_PENDING_DESTROY);
1109
0
  if (!iproto_connection_is_idle(con)) {
1110
    /*
1111
     * Not all requests are finished. Let the last
1112
     * finished request destroy the connection.
1113
     */
1114
0
    con->state = IPROTO_CONNECTION_PENDING_DESTROY;
1115
0
    return;
1116
0
  }
1117
  /*
1118
   * If the connection has no outstanding requests in the
1119
   * input buffer, then no one (e.g. tx thread) is referring
1120
   * to it, so it must be destroyed. Firstly queue a msg to
1121
   * destroy the session and other resources owned by TX
1122
   * thread. When it is done, iproto thread will destroy
1123
   * other parts of the connection.
1124
   */
1125
0
  con->state = IPROTO_CONNECTION_DESTROYED;
1126
0
  cpipe_push(&con->iproto_thread->tx_pipe, &con->destroy_msg);
1127
0
}
1128
1129
/**
1130
 * Initiate a connection shutdown. This method may
1131
 * be invoked many times, and does the internal
1132
 * bookkeeping to clean up resources only once.
1133
 */
1134
static inline void
1135
iproto_connection_close(struct iproto_connection *con)
1136
0
{
1137
0
  if (con->state == IPROTO_CONNECTION_ALIVE) {
1138
    /* Clears all pending events. */
1139
0
    ev_io_stop(con->loop, &con->input);
1140
0
    ev_io_stop(con->loop, &con->output);
1141
    /*
1142
     * Invalidate fd to prevent undefined behavior in case
1143
     * we mistakenly try to use it after this point.
1144
     */
1145
0
    con->input.fd = con->output.fd = -1;
1146
0
    iostream_close(&con->io);
1147
    /*
1148
     * Discard unparsed data, to recycle the
1149
     * connection in net_send_msg() as soon as all
1150
     * parsed data is processed.  It's important this
1151
     * is done only once.
1152
     */
1153
0
    ibuf_discard(con->p_ibuf, con->parse_size);
1154
0
    con->parse_size = 0;
1155
0
    mh_int_t node;
1156
0
    mh_foreach(con->streams, node) {
1157
0
      struct iproto_stream *stream = (struct iproto_stream *)
1158
0
        mh_i64ptr_node(con->streams, node)->val;
1159
      /**
1160
       * If stream->current == NULL and stream requests
1161
       * queue is empty, it means that there is some active
1162
       * transaction which was not committed yet. We need to
1163
       * roll it back, so we push an on_disconnect message
1164
       * to tx thread here. Otherwise we destroy stream in
1165
       * `net_send_msg` after processing all requests.
1166
       */
1167
0
      if (stream->current == NULL &&
1168
0
          stailq_empty(&stream->pending_requests))
1169
0
        iproto_stream_rollback_on_disconnect(stream);
1170
0
    }
1171
0
    cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
1172
0
    assert(con->state == IPROTO_CONNECTION_ALIVE);
1173
0
    con->state = IPROTO_CONNECTION_CLOSED;
1174
0
  } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
1175
0
    iproto_connection_try_to_start_destroy(con);
1176
0
  } else {
1177
0
    assert(con->state == IPROTO_CONNECTION_CLOSED);
1178
0
  }
1179
0
  rlist_del(&con->in_stop_list);
1180
0
}
1181
1182
static inline struct ibuf *
1183
iproto_connection_next_input(struct iproto_connection *con)
1184
0
{
1185
0
  return &con->ibuf[con->p_ibuf == &con->ibuf[0]];
1186
0
}
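/*
 * Editorial note (not part of the measured source): the index expression
 * above selects the *other* buffer -- the comparison yields 1 when p_ibuf
 * points at ibuf[0] (selecting ibuf[1]) and 0 otherwise.
 */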
1187
1188
/**
1189
 * If there is no space for reading input, we can do one of the
1190
 * following:
1191
 * - try to get a new ibuf, so that it can fit the request.
1192
 *   Always getting a new input buffer when there is no space
1193
 *   makes the instance susceptible to input-flood attacks.
1194
 *   Therefore, at most 2 ibufs are used in a single connection,
1195
 *   one is "open", receiving input, and the other is closed,
1196
 *   waiting for flushing output from a corresponding obuf.
1197
 * - stop input and wait until the client reads piled up output,
1198
 *   so the input buffer can be reused. This complements
1199
 *   the previous strategy. It is only safe to stop input if it
1200
 *   is known that there is output. In this case input event
1201
 *   flow will be resumed when all replies to previous requests
1202
 *   are sent. Since there are two buffers, the input is only
1203
 *   stopped when both of them are fully used up.
1204
 *
1205
 * To make this strategy work, each ibuf in use must fit at least
1206
 * one request. Otherwise, both obufs may end up having no data to
1207
 * flush, while current ibuf is too small to fit a big incoming
1208
 * request.
1209
 */
1210
static struct ibuf *
1211
iproto_connection_input_buffer(struct iproto_connection *con)
1212
0
{
1213
0
  struct ibuf *old_ibuf = con->p_ibuf;
1214
1215
0
  size_t to_read = 3; /* Smallest possible valid request. */
1216
1217
  /* The type code is checked in iproto_enqueue_batch() */
1218
0
  if (con->parse_size) {
1219
0
    const char *pos = old_ibuf->wpos - con->parse_size;
1220
0
    if (mp_check_uint(pos, old_ibuf->wpos) <= 0)
1221
0
      to_read = mp_decode_uint(&pos);
1222
0
  }
1223
1224
0
  if (ibuf_unused(old_ibuf) >= to_read) {
1225
    /*
1226
     * If all read data is discarded, move read
1227
     * position to the start of the buffer, to
1228
     * reduce chances of unaccounted growth of the
1229
     * buffer as read position is shifted to the
1230
     * end of the buffer.
1231
     */
1232
0
    if (ibuf_used(old_ibuf) == 0)
1233
0
      ibuf_reset(old_ibuf);
1234
0
    return old_ibuf;
1235
0
  }
1236
1237
  /*
1238
   * Reuse the buffer if all requests are processed
1239
   * (it only has unparsed content).
1240
   */
1241
0
  if (ibuf_used(old_ibuf) == con->parse_size) {
1242
0
    xibuf_reserve(old_ibuf, to_read);
1243
0
    return old_ibuf;
1244
0
  }
1245
1246
0
  struct ibuf *new_ibuf = iproto_connection_next_input(con);
1247
0
  if (ibuf_used(new_ibuf) != 0) {
1248
    /*
1249
     * Wait until the second buffer is flushed
1250
     * and becomes available for reuse.
1251
     */
1252
0
    return NULL;
1253
0
  }
1254
  /* Update buffer size if readahead has changed. */
1255
0
  if (new_ibuf->start_capacity != iproto_readahead) {
1256
0
    ibuf_destroy(new_ibuf);
1257
0
    ibuf_create(new_ibuf, cord_slab_cache(), iproto_readahead);
1258
0
  }
1259
1260
0
  xibuf_reserve(new_ibuf, to_read + con->parse_size);
1261
0
  if (con->parse_size != 0) {
1262
    /* Move the cached request prefix to the new buffer. */
1263
0
    void *wpos = ibuf_alloc(new_ibuf, con->parse_size);
1264
0
    memcpy(wpos, old_ibuf->wpos - con->parse_size, con->parse_size);
1265
    /*
1266
     * Discard unparsed data in the old buffer, otherwise it
1267
     * won't be recycled when all parsed requests are processed.
1268
     */
1269
0
    ibuf_discard(old_ibuf, con->parse_size);
1270
    /*
1271
     * We made ibuf idle. If obuf was already idle it
1272
     * makes the both ibuf and obuf idle, time to trim
1273
     * them.
1274
     */
1275
0
    if (ibuf_used(old_ibuf) == 0)
1276
0
      iproto_reset_input(old_ibuf);
1277
0
  }
1278
  /*
1279
   * Rotate buffers. Not strictly necessary, but
1280
   * helps preserve response order.
1281
   */
1282
0
  con->p_ibuf = new_ibuf;
1283
0
  return new_ibuf;
1284
0
}
1285
1286
/**
1287
 * Check if the message belongs to a stream (stream_id != 0); if so,
1288
 * create a new stream or fetch an existing one from the connection's
1289
 * streams hash table, and put the message into the stream's pending list.
1290
 * @retval true  - the message is ready to push to TX thread (either if
1291
 *                 stream_id is not set (is zero) or the stream is not
1292
 *                 processing other messages).
1293
 *         false - the message is postponed because its stream is busy
1294
 *                 processing previous message(s).
1295
 */
1296
static bool
1297
iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
1298
0
{
1299
0
  uint64_t stream_id = msg->header.stream_id;
1300
0
  if (stream_id == 0)
1301
0
    return true;
1302
1303
0
  struct iproto_connection *con = msg->connection;
1304
0
  struct iproto_stream *stream = NULL;
1305
0
  mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
1306
0
  if (pos == mh_end(con->streams)) {
1307
0
    stream = iproto_stream_new(msg->connection, msg->header.stream_id);
1308
0
    struct mh_i64ptr_node_t node;
1309
0
    node.key = stream_id;
1310
0
    node.val = stream;
1311
0
    pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
1312
0
  }
1313
0
  stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
1314
0
  assert(stream != NULL);
1315
0
  msg->stream = stream;
1316
0
  if (stream->current == NULL) {
1317
0
    stream->current = msg;
1318
0
    return true;
1319
0
  }
1320
0
  con->iproto_thread->requests_in_stream_queue++;
1321
0
  rmean_collect(con->iproto_thread->rmean, REQUESTS_IN_STREAM_QUEUE, 1);
1322
0
  stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
1323
0
  return false;
1324
0
}
1325
1326
/**
1327
 * Enqueue all requests that have been fully read. If the request limit
1328
 * is reached, stop the connection input even if the whole batch is not
1329
 * enqueued. Otherwise try to read more by feeding a read event to the
1330
 * event loop.
1331
 * @param con Connection to enqueue in.
1332
 * @param in Buffer to parse.
1333
 *
1334
 * @retval  0 Success.
1335
 * @retval -1 Invalid MessagePack.
1336
 */
1337
static inline int
1338
iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
1339
0
{
1340
0
  assert(rlist_empty(&con->in_stop_list));
1341
0
  int n_requests = 0;
1342
0
  const char *errmsg;
1343
0
  while (con->parse_size != 0 && !con->is_in_replication) {
1344
0
    if (iproto_check_msg_max(con->iproto_thread)) {
1345
0
      iproto_connection_stop_msg_max_limit(con);
1346
0
      cpipe_flush_input(&con->iproto_thread->tx_pipe);
1347
0
      return 0;
1348
0
    }
1349
0
    const char *reqstart = in->wpos - con->parse_size;
1350
0
    const char *pos = reqstart;
1351
    /* Read request length. */
1352
0
    if (mp_typeof(*pos) != MP_UINT) {
1353
0
      errmsg = "packet length";
1354
0
err_msgpack:
1355
0
      cpipe_flush_input(&con->iproto_thread->tx_pipe);
1356
0
      diag_set(ClientError, ER_INVALID_MSGPACK,
1357
0
         errmsg);
1358
0
      return -1;
1359
0
    }
1360
0
    if (mp_check_uint(pos, in->wpos) >= 0)
1361
0
      break;
1362
0
    uint64_t len = mp_decode_uint(&pos);
1363
0
    if (len > IPROTO_PACKET_SIZE_MAX) {
1364
0
      errmsg = tt_sprintf("too big packet size in the "\
1365
0
              "header: %llu",
1366
0
              (unsigned long long) len);
1367
0
      goto err_msgpack;
1368
0
    }
1369
0
    const char *reqend = pos + len;
1370
0
    if (reqend > in->wpos)
1371
0
      break;
1372
0
    struct iproto_msg *msg = iproto_msg_new(con);
1373
0
    msg->p_ibuf = con->p_ibuf;
1374
0
    msg->reqstart = reqstart;
1375
0
    msg->wpos = con->wpos;
1376
0
    msg->len = reqend - reqstart; /* total request length */
1377
0
    con->input_msg_count[msg->p_ibuf == &con->ibuf[1]]++;
1378
1379
0
    iproto_msg_prepare(msg, &pos, reqend);
1380
0
    if (iproto_msg_start_processing_in_stream(msg)) {
1381
0
      cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
1382
0
      n_requests++;
1383
0
    }
1384
1385
    /* Request is parsed */
1386
0
    assert(reqend > reqstart);
1387
0
    assert(con->parse_size >= (size_t) (reqend - reqstart));
1388
0
    con->parse_size -= reqend - reqstart;
1389
0
  }
1390
0
  if (con->is_in_replication) {
1391
    /**
1392
     * Don't mess with the file descriptor
1393
     * while join is running. ev_io_stop()
1394
     * also clears any pending events, which
1395
     * is good, since their invocation may
1396
     * re-start the watcher, ruining our
1397
     * efforts.
1398
     */
1399
0
    ev_io_stop(con->loop, &con->output);
1400
0
    ev_io_stop(con->loop, &con->input);
1401
0
  } else if (n_requests != 1 || con->parse_size != 0) {
1402
    /*
1403
     * Keep reading input, as long as the socket
1404
     * supplies data, but don't waste CPU on an extra
1405
     * read() if dealing with a blocking client, it
1406
     * has nothing in the socket for us.
1407
     *
1408
     * We look at the amount of enqueued requests
1409
     * and presence of a partial request in the
1410
     * input buffer as hints to distinguish
1411
     * blocking and non-blocking clients:
1412
     *
1413
     * For blocking clients, a request typically
1414
     * is fully read and enqueued.
1415
     * If there is unparsed data, or 0 queued
1416
     * requests, keep reading input, if only to avoid
1417
     * a deadlock on this connection.
1418
     */
1419
0
    iproto_connection_feed_input(con);
1420
0
  }
1421
0
  cpipe_flush_input(&con->iproto_thread->tx_pipe);
1422
0
  return 0;
1423
0
}
1424
1425
/**
1426
 * Enqueue connection's pending requests. Completely resurrect the
1427
 * connection, if it has no more requests, and the limit still is
1428
 * not reached.
1429
 */
1430
static void
1431
iproto_connection_resume(struct iproto_connection *con)
1432
0
{
1433
0
  assert(! iproto_check_msg_max(con->iproto_thread));
1434
0
  rlist_del(&con->in_stop_list);
1435
  /*
1436
   * Enqueue_batch() stops the connection again, if the
1437
   * limit is reached again.
1438
   */
1439
0
  if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
1440
0
    struct error *e = box_error_last();
1441
0
    error_log(e);
1442
0
    iproto_write_error(&con->io, e, ::schema_version, 0);
1443
0
    iproto_connection_close(con);
1444
0
  }
1445
0
}
1446
1447
/**
1448
 * Resume as many connections as possible until a request limit is
1449
 * reached. By design of iproto_enqueue_batch(), a paused
1450
 * connection almost always has a pending request fully read up,
1451
 * so resuming a connection will immediately enqueue the request
1452
 * as an iproto message and exhaust the limit. Thus we aren't
1453
 * really resuming all connections here: only as many as is
1454
 * necessary to use up the limit.
1455
 */
1456
static void
1457
iproto_resume(struct iproto_thread *iproto_thread)
1458
0
{
1459
0
  while (!iproto_check_msg_max(iproto_thread) &&
1460
0
         !rlist_empty(&iproto_thread->stopped_connections)) {
1461
    /*
1462
     * Shift from list head to ensure strict FIFO
1463
     * (fairness) for resumed connections.
1464
     */
1465
0
    struct iproto_connection *con =
1466
0
      rlist_first_entry(&iproto_thread->stopped_connections,
1467
0
            struct iproto_connection,
1468
0
            in_stop_list);
1469
0
    iproto_connection_resume(con);
1470
0
  }
1471
0
}
1472
1473
static void
1474
iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
1475
         int /* revents */)
1476
0
{
1477
0
  struct iproto_connection *con =
1478
0
    (struct iproto_connection *) watcher->data;
1479
0
  struct iostream *io = &con->io;
1480
0
  assert(con->state == IPROTO_CONNECTION_ALIVE);
1481
0
  assert(rlist_empty(&con->in_stop_list));
1482
0
  assert(loop == con->loop);
1483
  /*
1484
   * Throttle if there are too many pending requests,
1485
   * otherwise we might deplete the fiber pool in tx
1486
   * thread and deadlock.
1487
   */
1488
0
  if (iproto_check_msg_max(con->iproto_thread)) {
1489
0
    iproto_connection_stop_msg_max_limit(con);
1490
0
    return;
1491
0
  }
1492
1493
  /* Ensure we have sufficient space for the next round.  */
1494
0
  struct ibuf *in = iproto_connection_input_buffer(con);
1495
0
  if (in == NULL) {
1496
0
    iproto_connection_stop_readahead_limit(con);
1497
0
    return;
1498
0
  }
1499
  /* Read input. */
1500
0
  ibuf_reserve(in, ibuf_unused(in));
1501
0
  ssize_t nrd = iostream_read(io, in->wpos, ibuf_unused(in));
1502
0
  if (nrd < 0) {                  /* Socket is not ready. */
1503
0
    if (nrd == IOSTREAM_ERROR)
1504
0
      goto error;
1505
0
    int events = iostream_status_to_events(nrd);
1506
0
    if (con->input.events != events) {
1507
0
      ev_io_stop(loop, &con->input);
1508
0
      ev_io_set(&con->input, con->io.fd, events);
1509
0
    }
1510
0
    ev_io_start(loop, &con->input);
1511
0
    return;
1512
0
  }
1513
0
  if (nrd == 0) {                 /* EOF */
1514
0
    iproto_connection_close(con);
1515
0
    return;
1516
0
  }
1517
  /* Count statistics */
1518
0
  rmean_collect(con->iproto_thread->rmean, IPROTO_RECEIVED, nrd);
1519
1520
  /* Update the read position and connection state. */
1521
0
  ibuf_alloc(in, nrd);
1522
0
  con->parse_size += nrd;
1523
  /* Enqueue all requests which are fully read up. */
1524
0
  if (iproto_enqueue_batch(con, in) != 0)
1525
0
    goto error;
1526
0
  return;
1527
0
error:;
1528
0
  struct error *e = diag_last_error(diag_get());
1529
0
  assert(e != NULL);
1530
0
  error_log(e);
1531
  /* Best effort at sending the error message to the client. */
1532
0
  iproto_write_error(io, e, ::schema_version, 0);
1533
0
  iproto_connection_close(con);
1534
0
}
1535
1536
/** writev() to the socket and handle the result. */
1537
static int
1538
iproto_flush(struct iproto_connection *con)
1539
0
{
1540
0
  struct obuf *obuf = con->wpos.obuf;
1541
0
  struct obuf_svp obuf_end = obuf_create_svp(obuf);
1542
0
  struct obuf_svp *begin = &con->wpos.svp;
1543
0
  struct obuf_svp *end = &con->wend.svp;
1544
0
  if (con->wend.obuf != obuf) {
1545
    /*
1546
     * Flush the current buffer before
1547
     * advancing to the next one.
1548
     */
1549
0
    if (begin->used == obuf_end.used) {
1550
0
      obuf = con->wpos.obuf = con->wend.obuf;
1551
0
      obuf_svp_reset(begin);
1552
0
    } else {
1553
0
      end = &obuf_end;
1554
0
    }
1555
0
  }
1556
0
  if (begin->used == end->used) {
1557
    /* Nothing to do. */
1558
0
    return 1;
1559
0
  }
1560
0
  if (!con->can_write) {
1561
    /* Receiving end was closed. Discard the output. */
1562
0
    *begin = *end;
1563
0
    return 0;
1564
0
  }
1565
0
  assert(begin->used < end->used);
1566
0
  struct iovec iov[SMALL_OBUF_IOV_MAX+1];
1567
0
  struct iovec *src = obuf->iov;
1568
0
  int iovcnt = end->pos - begin->pos + 1;
1569
  /*
1570
   * iov[i].iov_len may be concurrently modified in tx thread,
1571
   * but only for the last position.
1572
   */
1573
0
  memcpy(iov, src + begin->pos, iovcnt * sizeof(struct iovec));
1574
0
  sio_add_to_iov(iov, -begin->iov_len);
1575
  /* *Overwrite* iov_len of the last pos as it may be garbage. */
1576
0
  iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1);
1577
1578
0
  ssize_t nwr = iostream_writev(&con->io, iov, iovcnt);
1579
0
  if (nwr >= 0) {
1580
    /* Count statistics */
1581
0
    rmean_collect(con->iproto_thread->rmean, IPROTO_SENT, nwr);
1582
0
    if (begin->used + nwr == end->used) {
1583
0
      *begin = *end;
1584
0
      return 0;
1585
0
    }
1586
0
    size_t offset = 0;
1587
0
    int advance = 0;
1588
0
    advance = sio_move_iov(iov, nwr, &offset);
1589
0
    begin->used += nwr;             /* advance write position */
1590
0
    begin->iov_len = advance == 0 ? begin->iov_len + offset : offset;
1591
0
    begin->pos += advance;
1592
0
    assert(begin->pos <= end->pos);
1593
0
    return IOSTREAM_WANT_WRITE;
1594
0
  } else if (nwr == IOSTREAM_ERROR) {
1595
    /*
1596
     * Don't close the connection on write error. Log the error and
1597
     * don't write to the socket anymore. Continue processing
1598
     * requests as usual, because the client might have closed the
1599
     * socket, but still expect pending requests to complete.
1600
     */
1601
0
    diag_log();
1602
0
    con->can_write = false;
1603
0
    *begin = *end;
1604
0
    return 0;
1605
0
  }
1606
0
  return nwr;
1607
0
}
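The partial-write arithmetic above (sio_move_iov() plus the final iov_len adjustment) is easier to see in isolation. Below is a minimal sketch of the same bookkeeping, assuming plain POSIX writev() semantics rather than the actual sio implementation: after a short write, skip the fully written iovecs and remember the byte offset into the first unfinished one.

#include <sys/uio.h>
#include <stddef.h>

/* Return how many iovecs were fully written; store in *offset the
 * number of bytes already written into the first unfinished iovec. */
static int
iov_advance(const struct iovec *iov, int iovcnt, size_t nwr, size_t *offset)
{
  int skipped = 0;
  while (skipped < iovcnt && nwr >= iov[skipped].iov_len) {
    nwr -= iov[skipped].iov_len;
    skipped++;
  }
  *offset = nwr;
  return skipped;
}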
1608
1609
static void
1610
iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
1611
          int /* revents */)
1612
0
{
1613
0
  struct iproto_connection *con = (struct iproto_connection *) watcher->data;
1614
0
  assert(con->state == IPROTO_CONNECTION_ALIVE);
1615
0
  int rc;
1616
0
  while ((rc = iproto_flush(con)) <= 0) {
1617
0
    if (rc != 0) {
1618
0
      int events = iostream_status_to_events(rc);
1619
0
      if (con->output.events != events) {
1620
0
        ev_io_stop(loop, &con->output);
1621
0
        ev_io_set(&con->output, con->io.fd, events);
1622
0
      }
1623
0
      ev_io_start(loop, &con->output);
1624
0
      return;
1625
0
    }
1626
0
  }
1627
0
  if (ev_is_active(&con->output))
1628
0
    ev_io_stop(con->loop, &con->output);
1629
  /*
1630
   * If the out channel isn't clogged, we can read more requests.
1631
   * Note, we trigger input even if we didn't write any responses
1632
   * (iproto_flush returned 1 right away). This is intentional:
1633
   * some requests don't have responses (IPROTO_WATCH).
1634
   */
1635
0
  iproto_connection_feed_input(con);
1636
0
}
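Both the input and output handlers retarget their libev watcher whenever the wanted event mask changes. A hedged sketch of that idiom in plain libev (nothing Tarantool-specific): ev_io_set() must not be called on an active watcher, hence the stop/set/start sequence mirrored above.

#include <ev.h>

static void
watcher_want_events(struct ev_loop *loop, struct ev_io *w, int fd, int events)
{
  if (w->events != events) {
    ev_io_stop(loop, w);      /* ev_io_set() requires a stopped watcher. */
    ev_io_set(w, fd, events);
  }
  ev_io_start(loop, w);       /* No-op if already started with this mask. */
}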
1637
1638
static struct iproto_connection *
1639
iproto_connection_new(struct iproto_thread *iproto_thread)
1640
0
{
1641
0
  struct iproto_connection *con = (struct iproto_connection *)
1642
0
    xmempool_alloc(&iproto_thread->iproto_connection_pool);
1643
0
  con->streams = mh_i64ptr_new();
1644
0
  con->iproto_thread = iproto_thread;
1645
0
  con->input.data = con->output.data = con;
1646
0
  con->loop = loop();
1647
0
  iostream_clear(&con->io);
1648
0
  ev_io_init(&con->input, iproto_connection_on_input, -1, EV_NONE);
1649
0
  ev_io_init(&con->output, iproto_connection_on_output, -1, EV_NONE);
1650
0
  ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
1651
0
  ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
1652
0
  con->input_msg_count[0] = 0;
1653
0
  con->input_msg_count[1] = 0;
1654
0
  obuf_create(&con->obuf[0], &con->iproto_thread->net_slabc,
1655
0
        iproto_readahead);
1656
0
  obuf_create(&con->obuf[1], &con->iproto_thread->net_slabc,
1657
0
        iproto_readahead);
1658
0
  con->p_ibuf = &con->ibuf[0];
1659
0
  con->tx.p_obuf = &con->obuf[0];
1660
0
  iproto_wpos_create(&con->wpos, con->tx.p_obuf);
1661
0
  iproto_wpos_create(&con->wend, con->tx.p_obuf);
1662
0
  con->parse_size = 0;
1663
0
  con->can_write = true;
1664
0
  con->long_poll_count = 0;
1665
0
  con->session = NULL;
1666
0
  con->is_in_replication = false;
1667
0
  con->is_drop_pending = false;
1668
0
  con->is_established = false;
1669
0
  rlist_create(&con->in_stop_list);
1670
0
  rlist_create(&con->tx.inprogress);
1671
0
  rlist_add_entry(&iproto_thread->connections, con, in_connections);
1672
  /* It may be very awkward to allocate at close. */
1673
0
  cmsg_init(&con->destroy_msg, con->iproto_thread->destroy_route);
1674
0
  cmsg_init(&con->disconnect_msg, con->iproto_thread->disconnect_route);
1675
0
  con->state = IPROTO_CONNECTION_ALIVE;
1676
0
  con->tx.is_push_pending = false;
1677
0
  con->tx.is_push_sent = false;
1678
0
  rmean_collect(iproto_thread->rmean, IPROTO_CONNECTIONS, 1);
1679
0
  return con;
1680
0
}
1681
1682
/** Notify that connections drop is finished. */
1683
static void
1684
tx_process_drop_finished(struct cmsg *m)
1685
0
{
1686
0
  struct iproto_drop_finished *drop_finished =
1687
0
          (struct iproto_drop_finished *)m;
1688
0
  if (drop_finished->generation == drop_generation &&
1689
0
      --drop_pending_thread_count == 0)
1690
0
    fiber_cond_signal(&drop_finished_cond);
1691
0
}
1692
1693
/** Send message to TX thread to notify that connections drop is finished. */
1694
static void
1695
iproto_send_drop_finished(struct iproto_thread *iproto_thread,
1696
        unsigned generation)
1697
0
{
1698
0
  static const struct cmsg_hop drop_finished_route[1] =
1699
0
          {{ tx_process_drop_finished, NULL }};
1700
1701
0
  cmsg_init(&iproto_thread->drop_finished_msg.base, drop_finished_route);
1702
0
  iproto_thread->drop_finished_msg.generation = generation;
1703
0
  cpipe_push(&iproto_thread->tx_pipe,
1704
0
       &iproto_thread->drop_finished_msg.base);
1705
0
}
1706
1707
/** Recycle a connection. */
1708
static inline void
1709
iproto_connection_delete(struct iproto_connection *con)
1710
0
{
1711
0
  assert(iproto_connection_is_idle(con));
1712
0
  assert(!iostream_is_initialized(&con->io));
1713
0
  assert(con->session == NULL);
1714
0
  assert(con->state == IPROTO_CONNECTION_DESTROYED);
1715
  /*
1716
   * The output buffers must have been deleted
1717
   * in tx thread.
1718
   */
1719
0
  ibuf_destroy(&con->ibuf[0]);
1720
0
  ibuf_destroy(&con->ibuf[1]);
1721
0
  assert(!obuf_is_initialized(&con->obuf[0]));
1722
0
  assert(!obuf_is_initialized(&con->obuf[1]));
1723
1724
0
  assert(mh_size(con->streams) == 0);
1725
0
  mh_i64ptr_delete(con->streams);
1726
0
  rlist_del(&con->in_connections);
1727
0
  if (con->is_drop_pending) {
1728
0
    struct iproto_thread *iproto_thread = con->iproto_thread;
1729
1730
0
    assert(iproto_thread->drop_pending_connection_count > 0);
1731
0
    if (--iproto_thread->drop_pending_connection_count == 0)
1732
0
      iproto_send_drop_finished(iproto_thread,
1733
0
              con->drop_generation);
1734
0
  }
1735
0
  mempool_free(&con->iproto_thread->iproto_connection_pool, con);
1736
0
}
1737
1738
/* }}} iproto_connection */
1739
1740
/* {{{ iproto_msg - methods and routes */
1741
1742
static void
1743
tx_process_misc(struct cmsg *msg);
1744
1745
static void
1746
tx_process_call(struct cmsg *msg);
1747
1748
static void
1749
tx_process1(struct cmsg *msg);
1750
1751
static void
1752
tx_process_select(struct cmsg *msg);
1753
1754
static void
1755
tx_process_sql(struct cmsg *msg);
1756
1757
static void
1758
tx_reply_error(struct iproto_msg *msg);
1759
1760
static void
1761
tx_reply_iproto_error(struct cmsg *m);
1762
1763
static void
1764
net_send_msg(struct cmsg *msg);
1765
1766
static void
1767
net_send_error(struct cmsg *msg);
1768
1769
static void
1770
tx_process_replication(struct cmsg *msg);
1771
1772
static void
1773
net_end_join(struct cmsg *msg);
1774
1775
static void
1776
net_end_subscribe(struct cmsg *msg);
1777
1778
/**
1779
 * Decodes the IPROTO message and returns the route corresponding to the message
1780
 * type.
1781
 * Can be called from both IPROTO and TX threads.
1782
 */
1783
static int
1784
iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route);
1785
1786
static void
1787
iproto_msg_prepare(struct iproto_msg *msg, const char **pos, const char *reqend)
1788
0
{
1789
0
  uint64_t stream_id;
1790
0
  uint32_t type;
1791
0
  bool request_is_not_for_stream;
1792
0
  bool request_is_only_for_stream;
1793
0
  struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
1794
0
  mh_i32_t *handlers = iproto_thread->req_handlers;
1795
0
  mh_int_t handler;
1796
0
  struct cmsg_hop *route;
1797
0
  int rc;
1798
1799
0
  if (xrow_header_decode(&msg->header, pos, reqend, true) != 0)
1800
0
    goto error;
1801
0
  assert(*pos == reqend);
1802
1803
0
  type = msg->header.type;
1804
0
  stream_id = msg->header.stream_id;
1805
0
  request_is_not_for_stream =
1806
0
    ((type > IPROTO_TYPE_STAT_MAX &&
1807
0
     type != IPROTO_PING) || type == IPROTO_AUTH);
1808
0
  request_is_only_for_stream =
1809
0
    (type == IPROTO_BEGIN ||
1810
0
     type == IPROTO_COMMIT ||
1811
0
     type == IPROTO_ROLLBACK);
1812
1813
0
  if (stream_id != 0 && request_is_not_for_stream) {
1814
0
    diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
1815
0
       iproto_type_name(type));
1816
0
    goto error;
1817
0
  } else if (stream_id == 0 && request_is_only_for_stream) {
1818
0
    diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
1819
0
       iproto_type_name(type));
1820
0
    goto error;
1821
0
  }
1822
1823
0
  msg->connection->is_in_replication = type == IPROTO_JOIN ||
1824
0
               type == IPROTO_FETCH_SNAPSHOT ||
1825
0
               type == IPROTO_REGISTER ||
1826
0
               type == IPROTO_SUBSCRIBE;
1827
1828
0
  handler = mh_i32_find(handlers, type, NULL);
1829
0
  if (handler != mh_end(handlers)) {
1830
0
    assert(!msg->connection->is_in_replication);
1831
0
    cmsg_init(&msg->base, iproto_thread->override_route);
1832
0
    return;
1833
0
  }
1834
1835
0
  rc = iproto_msg_decode(msg, &route);
1836
0
  if (rc == 0) {
1837
0
    assert(route != NULL);
1838
0
    cmsg_init(&msg->base, route);
1839
0
    return;
1840
0
  }
1841
0
  if (route == NULL) {
1842
0
    handler = mh_i32_find(handlers, IPROTO_UNKNOWN, NULL);
1843
0
    if (handler != mh_end(handlers)) {
1844
0
      cmsg_init(&msg->base, iproto_thread->override_route);
1845
0
      return;
1846
0
    }
1847
0
    diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t)type);
1848
0
  }
1849
0
error:
1850
  /** Log and send the error. */
1851
0
  diag_log();
1852
0
  diag_create(&msg->diag);
1853
0
  diag_move(&fiber()->diag, &msg->diag);
1854
0
  cmsg_init(&msg->base, iproto_thread->error_route);
1855
0
}
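The stream checks above mean that BEGIN/COMMIT/ROLLBACK are valid only when the header carries a non-zero IPROTO_STREAM_ID, while most other request types must not carry one. As a hedged sketch of what such a header looks like on the wire, using msgpuck and the key constants from iproto_constants.h (the sync and stream id values are made up for illustration):

#include <msgpuck.h>
#include <stdint.h>
#include "iproto_constants.h"

/* Encode the header map of an IPROTO_BEGIN request that opens a
 * stream transaction; returns the advanced write position. */
static char *
encode_begin_header(char *pos, uint64_t sync, uint64_t stream_id)
{
  pos = mp_encode_map(pos, 3);
  pos = mp_encode_uint(pos, IPROTO_REQUEST_TYPE);
  pos = mp_encode_uint(pos, IPROTO_BEGIN);
  pos = mp_encode_uint(pos, IPROTO_SYNC);
  pos = mp_encode_uint(pos, sync);
  pos = mp_encode_uint(pos, IPROTO_STREAM_ID);
  pos = mp_encode_uint(pos, stream_id);
  return pos;
}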
1856
1857
static int
1858
iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
1859
0
{
1860
0
  uint32_t type = msg->header.type;
1861
0
  struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
1862
0
  switch (type) {
1863
0
  case IPROTO_SELECT:
1864
0
  case IPROTO_INSERT:
1865
0
  case IPROTO_REPLACE:
1866
0
  case IPROTO_UPDATE:
1867
0
  case IPROTO_DELETE:
1868
0
  case IPROTO_UPSERT:
1869
0
    assert(type < sizeof(iproto_thread->dml_route) /
1870
0
            sizeof(*iproto_thread->dml_route));
1871
0
    *route = iproto_thread->dml_route[type];
1872
0
    if (xrow_decode_dml_iproto(&msg->header, &msg->dml,
1873
0
             dml_request_key_map(type)) != 0)
1874
0
      return -1;
1875
    /*
1876
     * In contrast to replication requests, for a client request
1877
     * the xrow header is set by WAL, which generates LSNs and sets
1878
     * replica id. Ignore the header received over network.
1879
     */
1880
0
    msg->dml.header = NULL;
1881
0
    return 0;
1882
0
  case IPROTO_BEGIN:
1883
0
    *route = iproto_thread->begin_route;
1884
0
    if (xrow_decode_begin(&msg->header, &msg->begin) != 0)
1885
0
      return -1;
1886
0
    return 0;
1887
0
  case IPROTO_COMMIT:
1888
0
    *route = iproto_thread->commit_route;
1889
0
    if (xrow_decode_commit(&msg->header, &msg->commit) != 0)
1890
0
      return -1;
1891
0
    return 0;
1892
0
  case IPROTO_ROLLBACK:
1893
0
    *route = iproto_thread->rollback_route;
1894
0
    return 0;
1895
0
  case IPROTO_CALL_16:
1896
0
  case IPROTO_CALL:
1897
0
  case IPROTO_EVAL:
1898
0
    *route = iproto_thread->call_route;
1899
0
    if (xrow_decode_call(&msg->header, &msg->call))
1900
0
      return -1;
1901
0
    return 0;
1902
0
  case IPROTO_WATCH:
1903
0
  case IPROTO_UNWATCH:
1904
0
  case IPROTO_WATCH_ONCE:
1905
0
    *route = iproto_thread->misc_route;
1906
0
    ERROR_INJECT(ERRINJ_IPROTO_DISABLE_WATCH, {
1907
0
      *route = NULL;
1908
0
      return -1;
1909
0
    });
1910
0
    if (xrow_decode_watch(&msg->header, &msg->watch) != 0)
1911
0
      return -1;
1912
0
    return 0;
1913
0
  case IPROTO_EXECUTE:
1914
0
  case IPROTO_PREPARE:
1915
0
    *route = iproto_thread->sql_route;
1916
0
    if (xrow_decode_sql(&msg->header, &msg->sql) != 0)
1917
0
      return -1;
1918
0
    return 0;
1919
0
  case IPROTO_PING:
1920
0
    *route = iproto_thread->misc_route;
1921
0
    return 0;
1922
0
  case IPROTO_ID:
1923
0
    *route = iproto_thread->misc_route;
1924
0
    ERROR_INJECT(ERRINJ_IPROTO_DISABLE_ID, {
1925
0
      *route = NULL;
1926
0
      return -1;
1927
0
    });
1928
0
    if (xrow_decode_id(&msg->header, &msg->id) != 0)
1929
0
      return -1;
1930
0
    return 0;
1931
0
  case IPROTO_JOIN:
1932
0
  case IPROTO_FETCH_SNAPSHOT:
1933
0
  case IPROTO_REGISTER:
1934
0
    *route = iproto_thread->join_route;
1935
0
    return 0;
1936
0
  case IPROTO_SUBSCRIBE:
1937
0
    *route = iproto_thread->subscribe_route;
1938
0
    return 0;
1939
0
  case IPROTO_VOTE_DEPRECATED:
1940
0
  case IPROTO_VOTE:
1941
0
    *route = iproto_thread->misc_route;
1942
0
    return 0;
1943
0
  case IPROTO_AUTH:
1944
0
    *route = iproto_thread->misc_route;
1945
0
    if (xrow_decode_auth(&msg->header, &msg->auth))
1946
0
      return -1;
1947
0
    return 0;
1948
0
  default:
1949
0
    *route = NULL;
1950
0
    return -1;
1951
0
  }
1952
0
}
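Every route resolved by the switch above is a two-hop cbus path: hop one processes the request in the tx thread, hop two completes it back in the net thread. A hedged sketch of the shape only - in the real code the hop arrays live in struct iproto_thread and their pipe pointers are wired to the per-thread pipes at init time, so the NULL below is a stand-in, not working routing:

static const struct cmsg_hop example_dml_route[2] = {
  /* Hop 1: run in the tx thread, then travel via the net pipe
   * (NULL stands in for the per-thread pipe pointer). */
  { tx_process1, NULL },
  /* Hop 2: deliver the reply from the net thread. */
  { net_send_msg, NULL },
};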
1953
1954
static void
1955
tx_fiber_init(struct session *session, uint64_t sync)
1956
0
{
1957
0
  struct fiber *f = fiber();
1958
  /*
1959
   * There should be no unexecuted on_stop triggers left over
1960
   * from a previous request executed in this fiber.
1961
   */
1962
0
  assert(rlist_empty(&f->on_stop));
1963
0
  f->storage.net.sync = sync;
1964
  /*
1965
   * We do not clean up fiber keys at the end of each request.
1966
   * This does not lead to privilege escalation as long as
1967
   * fibers used to serve iproto requests never mingle with
1968
   * fibers used to serve background tasks without going
1969
   * through the purification of fiber_recycle(), which
1970
   * resets the fiber-local storage. Fibers used to run
1971
   * background tasks clean up their session in an on_stop
1972
   * trigger as well.
1973
   */
1974
0
  fiber_set_session(f, session);
1975
0
  fiber_set_user(f, &session->credentials);
1976
0
}
1977
1978
static void
1979
tx_process_rollback_on_disconnect(struct cmsg *m)
1980
0
{
1981
0
  struct iproto_stream *stream =
1982
0
    container_of(m, struct iproto_stream,
1983
0
           on_disconnect);
1984
1985
0
  if (stream->txn != NULL) {
1986
0
    tx_fiber_init(stream->connection->session, 0);
1987
0
    txn_attach(stream->txn);
1988
0
    if (box_txn_rollback() != 0)
1989
0
      panic("failed to rollback transaction on disconnect");
1990
0
    stream->txn = NULL;
1991
0
  }
1992
0
}
1993
1994
static void
1995
net_finish_rollback_on_disconnect(struct cmsg *m)
1996
0
{
1997
0
  struct iproto_stream *stream =
1998
0
    container_of(m, struct iproto_stream,
1999
0
           on_disconnect);
2000
0
  struct iproto_connection *con = stream->connection;
2001
2002
0
  struct mh_i64ptr_node_t node = { stream->id, NULL };
2003
0
  mh_i64ptr_remove(con->streams, &node, 0);
2004
0
  iproto_stream_delete(stream);
2005
0
  assert(con->state != IPROTO_CONNECTION_ALIVE);
2006
0
  if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
2007
0
    iproto_connection_try_to_start_destroy(con);
2008
0
}
2009
2010
/** Cancel all inprogress requests of the connection. */
2011
static void
2012
tx_process_cancel_inprogress(struct cmsg *m)
2013
0
{
2014
0
  struct iproto_connection *con =
2015
0
    container_of(m, struct iproto_connection, cancel_msg);
2016
0
  struct iproto_msg *msg;
2017
0
  rlist_foreach_entry(msg, &con->tx.inprogress, in_inprogress)
2018
0
    fiber_cancel(msg->fiber);
2019
0
}
2020
2021
static void
2022
tx_process_disconnect(struct cmsg *m)
2023
0
{
2024
0
  struct iproto_connection *con =
2025
0
    container_of(m, struct iproto_connection, disconnect_msg);
2026
0
  if (con->session != NULL) {
2027
0
    session_close(con->session);
2028
    /*
2029
     * When kharon returns, it should not go back - the socket is
2030
     * already dead anyway, and soon the connection itself will be
2031
     * deleted. More pushes can't come, because after the session is
2032
     * closed, its push() method is replaced with a stub.
2033
     */
2034
0
    con->tx.is_push_pending = false;
2035
0
    tx_fiber_init(con->session, 0);
2036
0
    session_run_on_disconnect_triggers(con->session);
2037
0
  }
2038
0
}
2039
2040
static void
2041
net_finish_disconnect(struct cmsg *m)
2042
0
{
2043
0
  struct iproto_connection *con =
2044
0
    container_of(m, struct iproto_connection, disconnect_msg);
2045
0
  iproto_connection_try_to_start_destroy(con);
2046
0
}
2047
2048
/**
2049
 * Destroy the session object, as well as output buffers of the
2050
 * connection.
2051
 */
2052
static void
2053
tx_process_destroy(struct cmsg *m)
2054
0
{
2055
0
  struct iproto_connection *con =
2056
0
    container_of(m, struct iproto_connection, destroy_msg);
2057
0
  assert(con->state == IPROTO_CONNECTION_DESTROYED);
2058
0
  if (con->session) {
2059
0
    session_delete(con->session);
2060
0
    con->session = NULL; /* safety */
2061
0
  }
2062
  /*
2063
   * obuf is destroyed in the tx thread because that is where
2064
   * it was allocated.
2065
   */
2066
0
  obuf_destroy(&con->obuf[0]);
2067
0
  obuf_destroy(&con->obuf[1]);
2068
0
}
2069
2070
/**
2071
 * Cleanup the net thread resources of a connection
2072
 * and close the connection.
2073
 */
2074
static void
2075
net_finish_destroy(struct cmsg *m)
2076
0
{
2077
0
  struct iproto_connection *con =
2078
0
    container_of(m, struct iproto_connection, destroy_msg);
2079
  /* Runs the trigger, which may yield. */
2080
0
  iproto_connection_delete(con);
2081
0
}
2082
2083
/** Account msg data in connection input buffer as processed. */
2084
static void
2085
iproto_msg_finish_input(iproto_msg *msg)
2086
0
{
2087
0
  struct iproto_connection *con = msg->connection;
2088
0
  struct ibuf *ibuf = msg->p_ibuf;
2089
0
  size_t *count = &con->input_msg_count[msg->p_ibuf == &con->ibuf[1]];
2090
  /*
2091
   * Consume data from the input buffer only once the data of all
2092
   * messages has been processed, because the message processing
2093
   * order may differ from the order of messages in the buffer.
2094
   */
2095
0
  assert(*count != 0);
2096
0
  if (--(*count) == 0) {
2097
0
    size_t processed = ibuf_used(ibuf);
2098
0
    if (ibuf == con->p_ibuf) {
2099
0
      assert(processed >= con->parse_size);
2100
0
      processed -= con->parse_size;
2101
0
    }
2102
0
    ibuf_consume(ibuf, processed);
2103
0
  }
2104
0
}
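The consume-only-when-the-last-message-finishes rule above is, in effect, a reference count per input buffer. A standalone toy model of the same idea (hypothetical types, not the Tarantool ibuf API):

#include <assert.h>
#include <stddef.h>

struct inbuf_ref {
  size_t msg_count;  /* Messages still referencing the buffer. */
  size_t used;       /* Bytes occupied by those messages. */
};

/* Returns how many bytes may now be reclaimed: zero while any message
 * is still in flight, everything once the last one finishes. */
static size_t
inbuf_msg_finish(struct inbuf_ref *b)
{
  assert(b->msg_count > 0);
  if (--b->msg_count > 0)
    return 0;
  size_t reclaim = b->used;
  b->used = 0;
  return reclaim;
}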
2105
2106
static void
2107
net_discard_input(struct cmsg *m)
2108
0
{
2109
0
  struct iproto_msg *msg = container_of(m, struct iproto_msg,
2110
0
                discard_input);
2111
0
  struct iproto_connection *con = msg->connection;
2112
0
  iproto_msg_finish_input(msg);
2113
0
  msg->len = 0;
2114
0
  con->long_poll_count++;
2115
0
  if (con->state == IPROTO_CONNECTION_ALIVE)
2116
0
    iproto_connection_feed_input(con);
2117
0
}
2118
2119
static void
2120
tx_discard_input(struct iproto_msg *msg)
2121
0
{
2122
0
  struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
2123
0
  static const struct cmsg_hop discard_input_route[] = {
2124
0
    { net_discard_input, NULL },
2125
0
  };
2126
0
  cmsg_init(&msg->discard_input, discard_input_route);
2127
0
  cpipe_push(&iproto_thread->net_pipe, &msg->discard_input);
2128
0
}
2129
2130
/**
2131
 * The goal of this function is to maintain the state of
2132
 * two rotating connection output buffers in tx thread.
2133
 *
2134
 * The function enforces the following rules:
2135
 * - if both out buffers are empty, any one is selected;
2136
 * - if one of the buffers is empty, and the other is
2137
 *   not, the empty buffer is selected.
2138
 * - if neither of the buffers is empty, the function
2139
 *   does not rotate the buffer.
2140
 *
2141
 * @param con iproto connection.
2142
 * @param wpos Last flushed write position, received from iproto
2143
 *        thread.
2144
 */
2145
static void
2146
tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
2147
0
{
2148
0
  struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
2149
0
  if (wpos->obuf == con->tx.p_obuf) {
2150
    /*
2151
     * We got a message advancing the buffer which
2152
     * is being appended to. The previous buffer is
2153
     * guaranteed to have been flushed first, since
2154
     * buffers are never flushed out of order.
2155
     */
2156
0
    if (obuf_size(prev) != 0)
2157
0
      obuf_reset(prev);
2158
0
  }
2159
0
  if (obuf_size(con->tx.p_obuf) != 0 && obuf_size(prev) == 0) {
2160
    /*
2161
     * If the current buffer is not empty, and the
2162
     * previous buffer has been flushed, rotate
2163
     * the current buffer.
2164
     */
2165
0
    con->tx.p_obuf = prev;
2166
0
  }
2167
0
}
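The rotation rules in the comment above are compact enough to model standalone. A hedged toy model (not the obuf API) of the decision tx_accept_wpos() makes on every accepted message:

#include <stddef.h>

struct two_obuf {
  size_t used[2]; /* Models obuf_size() of each buffer. */
  int cur;        /* Index of the buffer being appended to. */
};

/* Rotate to the other buffer only when the current one holds data
 * and the other one has been fully flushed (size 0). */
static void
maybe_rotate(struct two_obuf *b)
{
  int other = 1 - b->cur;
  if (b->used[b->cur] != 0 && b->used[other] == 0)
    b->cur = other;
}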
2168
2169
/**
2170
 * Since the processing of requests within a transaction
2171
 * for a stream can occur in different fibers, we store
2172
 * a pointer to the transaction in the stream structure.
2173
 * Check whether the message belongs to a stream that has an
2174
 * active transaction; if so, attach that transaction to the
2175
 * current fiber.
2176
 */
2177
static inline void
2178
tx_prepare_transaction_for_request(struct iproto_msg *msg)
2179
0
{
2180
0
  if (msg->stream != NULL && msg->stream->txn != NULL) {
2181
0
    txn_attach(msg->stream->txn);
2182
0
    msg->stream->txn = NULL;
2183
0
  }
2184
0
  assert(!in_txn() || msg->stream != NULL);
2185
0
}
2186
2187
static inline struct iproto_msg *
2188
tx_accept_msg(struct cmsg *m)
2189
0
{
2190
0
  struct iproto_msg *msg = (struct iproto_msg *) m;
2191
0
  if (msg->fiber != NULL)
2192
0
    return msg;
2193
0
  tx_accept_wpos(msg->connection, &msg->wpos);
2194
0
  tx_fiber_init(msg->connection->session, msg->header.sync);
2195
0
  tx_prepare_transaction_for_request(msg);
2196
0
  msg->connection->iproto_thread->tx.requests_in_progress++;
2197
0
  rlist_add_entry(&msg->connection->tx.inprogress, msg,
2198
0
      in_inprogress);
2199
0
  msg->fiber = fiber();
2200
0
  rmean_collect(msg->connection->iproto_thread->tx.rmean,
2201
0
          REQUESTS_IN_PROGRESS, 1);
2202
0
  flightrec_write_request(msg->reqstart, msg->len);
2203
0
  return msg;
2204
0
}
2205
2206
/**
2207
 * Check if the watch request key is in the whitelist of keys that don't need
2208
 * additional checks.
2209
 * The only allowed subscription is to "internal.ballot" event - the one used by
2210
 * replication instead of IPROTO_VOTE on Tarantool 2.11+.
2211
 */
2212
static bool
2213
check_watch_key(const char *key, uint32_t len)
2214
0
{
2215
0
  if (len != strlen(box_ballot_event_key))
2216
0
    return false;
2217
0
  return strncmp(key, box_ballot_event_key, len) == 0;
2218
0
}
2219
2220
/**
2221
 * Check if the tx thread may continue with processing an accepted message.
2222
 * If something's wrong, returns -1 and sets diag, otherwise returns 0.
2223
 */
2224
static int
2225
tx_check_msg(struct iproto_msg *msg)
2226
0
{
2227
0
  uint64_t new_schema_version = msg->header.schema_version;
2228
0
  if (new_schema_version != 0 && new_schema_version != schema_version) {
2229
0
    diag_set(ClientError, ER_WRONG_SCHEMA_VERSION,
2230
0
       new_schema_version, schema_version);
2231
0
    return -1;
2232
0
  }
2233
0
  enum iproto_type type = (enum iproto_type)msg->header.type;
2234
0
  if (type != IPROTO_AUTH && type != IPROTO_PING && type != IPROTO_ID &&
2235
0
      type != IPROTO_VOTE && type != IPROTO_VOTE_DEPRECATED &&
2236
0
      (type != IPROTO_WATCH ||
2237
0
       !check_watch_key(msg->watch.key, msg->watch.key_len)) &&
2238
0
      security_check_session() != 0)
2239
0
    return -1;
2240
0
  return 0;
2241
0
}
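The schema-version guard above lets a client pin a request to the schema it was built against: a stale non-zero version is rejected with ER_WRONG_SCHEMA_VERSION rather than executed. A hedged sketch of the client-side reaction; struct client, send_request() and refetch_schema() are hypothetical stand-ins, not a real connector API:

#include <stdint.h>

struct client {
  uint64_t schema_version;
};

/* Hypothetical helpers: send a request stamped with a schema version,
 * and re-read the current schema version from the server. */
extern int send_request(struct client *c, const void *req,
                        uint64_t schema_version);
extern uint64_t refetch_schema(struct client *c);

enum { RC_WRONG_SCHEMA_VERSION = -2 }; /* Made-up error code. */

static int
call_with_schema_retry(struct client *c, const void *req)
{
  for (int attempt = 0; attempt < 3; attempt++) {
    int rc = send_request(c, req, c->schema_version);
    if (rc != RC_WRONG_SCHEMA_VERSION)
      return rc;
    c->schema_version = refetch_schema(c); /* Refresh, then retry. */
  }
  return -1;
}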
2242
2243
static inline void
2244
tx_end_msg(struct iproto_msg *msg, struct obuf_svp *svp)
2245
0
{
2246
0
  if (msg->stream != NULL) {
2247
0
    assert(msg->stream->txn == NULL);
2248
0
    msg->stream->txn = txn_detach();
2249
0
  }
2250
0
  msg->connection->iproto_thread->tx.requests_in_progress--;
2251
0
  rlist_del(&msg->in_inprogress);
2252
0
  msg->fiber = NULL;
2253
0
  struct obuf *out = msg->connection->tx.p_obuf;
2254
0
  if (msg->connection->tx.p_obuf->used != svp->used)
2255
    /* Log response to the flight recorder. */
2256
0
    flightrec_write_response(out, svp);
2257
0
}
2258
2259
/**
2260
 * Write error message to the output buffer and advance write position.
2261
 */
2262
static void
2263
tx_reply_error(struct iproto_msg *msg)
2264
0
{
2265
0
  struct obuf *out = msg->connection->tx.p_obuf;
2266
0
  iproto_reply_error(out, diag_last_error(&fiber()->diag),
2267
0
         msg->header.sync, ::schema_version);
2268
0
  iproto_wpos_create(&msg->wpos, out);
2269
0
}
2270
2271
/**
2272
 * Write error from iproto thread to the output buffer and advance
2273
 * write position.
2274
 */
2275
static void
2276
tx_reply_iproto_error(struct cmsg *m)
2277
0
{
2278
0
  struct iproto_msg *msg = tx_accept_msg(m);
2279
0
  struct obuf *out = msg->connection->tx.p_obuf;
2280
0
  struct obuf_svp header = obuf_create_svp(out);
2281
0
  iproto_reply_error(out, diag_last_error(&msg->diag),
2282
0
         msg->header.sync, ::schema_version);
2283
0
  iproto_wpos_create(&msg->wpos, out);
2284
0
  tx_end_msg(msg, &header);
2285
0
}
2286
2287
/** Inject a short delay on tx request processing for testing. */
2288
static inline void
2289
tx_inject_delay(void)
2290
0
{
2291
0
  ERROR_INJECT(ERRINJ_IPROTO_TX_DELAY, {
2292
0
    if (rand() % 100 < 10)
2293
0
      fiber_sleep(0.001);
2294
0
  });
2295
0
}
2296
2297
static void
2298
tx_process_begin(struct cmsg *m)
2299
0
{
2300
0
  struct iproto_msg *msg = tx_accept_msg(m);
2301
0
  struct obuf *out;
2302
0
  struct obuf_svp header;
2303
0
  uint32_t txn_isolation = msg->begin.txn_isolation;
2304
0
  bool is_sync = msg->begin.is_sync;
2305
2306
0
  if (tx_check_msg(msg) != 0)
2307
0
    goto error;
2308
2309
0
  if (box_txn_begin() != 0)
2310
0
    goto error;
2311
2312
0
  if (msg->begin.timeout != 0 &&
2313
0
      box_txn_set_timeout(msg->begin.timeout) != 0) {
2314
0
    int rc = box_txn_rollback();
2315
0
    assert(rc == 0);
2316
0
    (void)rc;
2317
0
    goto error;
2318
0
  }
2319
0
  if (box_txn_set_isolation(txn_isolation) != 0) {
2320
0
    int rc = box_txn_rollback();
2321
0
    assert(rc == 0);
2322
0
    (void)rc;
2323
0
    goto error;
2324
0
  }
2325
0
  if (is_sync)
2326
0
    box_txn_make_sync();
2327
2328
0
  out = msg->connection->tx.p_obuf;
2329
0
  header = obuf_create_svp(out);
2330
0
  iproto_reply_ok(out, msg->header.sync, ::schema_version);
2331
0
  iproto_wpos_create(&msg->wpos, out);
2332
0
  tx_end_msg(msg, &header);
2333
0
  return;
2334
0
error:
2335
0
  out = msg->connection->tx.p_obuf;
2336
0
  header = obuf_create_svp(out);
2337
0
  tx_reply_error(msg);
2338
0
  tx_end_msg(msg, &header);
2339
0
}
2340
2341
static void
2342
tx_process_commit(struct cmsg *m)
2343
0
{
2344
0
  struct iproto_msg *msg = tx_accept_msg(m);
2345
0
  struct obuf *out;
2346
0
  struct obuf_svp header;
2347
0
  bool is_sync = msg->commit.is_sync;
2348
2349
0
  if (tx_check_msg(msg) != 0)
2350
0
    goto error;
2351
2352
0
  if (is_sync)
2353
0
    box_txn_make_sync();
2354
2355
0
  if (box_txn_commit() != 0)
2356
0
    goto error;
2357
2358
0
  out = msg->connection->tx.p_obuf;
2359
0
  header = obuf_create_svp(out);
2360
0
  iproto_reply_ok(out, msg->header.sync, ::schema_version);
2361
0
  iproto_wpos_create(&msg->wpos, out);
2362
0
  tx_end_msg(msg, &header);
2363
0
  return;
2364
0
error:
2365
0
  out = msg->connection->tx.p_obuf;
2366
0
  header = obuf_create_svp(out);
2367
0
  tx_reply_error(msg);
2368
0
  tx_end_msg(msg, &header);
2369
0
}
2370
2371
static void
2372
tx_process_rollback(struct cmsg *m)
2373
0
{
2374
0
  struct iproto_msg *msg = tx_accept_msg(m);
2375
0
  struct obuf *out;
2376
0
  struct obuf_svp header;
2377
2378
0
  if (tx_check_msg(msg) != 0)
2379
0
    goto error;
2380
2381
0
  if (box_txn_rollback() != 0)
2382
0
    goto error;
2383
2384
0
  out = msg->connection->tx.p_obuf;
2385
0
  header = obuf_create_svp(out);
2386
0
  iproto_reply_ok(out, msg->header.sync, ::schema_version);
2387
0
  iproto_wpos_create(&msg->wpos, out);
2388
0
  tx_end_msg(msg, &header);
2389
0
  return;
2390
0
error:
2391
0
  out = msg->connection->tx.p_obuf;
2392
0
  header = obuf_create_svp(out);
2393
0
  tx_reply_error(msg);
2394
0
  tx_end_msg(msg, &header);
2395
0
}
2396
2397
/*
2398
 * In case the request does not contain a space or index identifier but
2399
 * contains a corresponding name, try to resolve the identifier by name.
2400
 */
2401
static int
2402
tx_resolve_space_and_index_name(struct request *dml)
2403
0
{
2404
0
  struct space *space = NULL;
2405
0
  if (dml->space_name != NULL) {
2406
0
    space = space_by_name(dml->space_name, dml->space_name_len);
2407
0
    if (space == NULL) {
2408
0
      diag_set(ClientError, ER_NO_SUCH_SPACE,
2409
0
         tt_cstr(dml->space_name, dml->space_name_len));
2410
0
      return -1;
2411
0
    }
2412
0
    dml->space_id = space->def->id;
2413
0
  }
2414
0
  if ((dml->type == IPROTO_SELECT || dml->type == IPROTO_UPDATE ||
2415
0
       dml->type == IPROTO_DELETE) && dml->index_name != NULL) {
2416
0
    if (space == NULL)
2417
0
      space = space_cache_find(dml->space_id);
2418
0
    if (space == NULL)
2419
0
      return -1;
2420
0
    struct index *idx = space_index_by_name(space, dml->index_name,
2421
0
              dml->index_name_len);
2422
0
    if (idx == NULL) {
2423
0
      diag_set(ClientError, ER_NO_SUCH_INDEX_NAME,
2424
0
         tt_cstr(dml->index_name, dml->index_name_len),
2425
0
         space->def->name);
2426
0
      return -1;
2427
0
    }
2428
0
    dml->index_id = idx->dense_id;
2429
0
  }
2430
0
  return 0;
2431
0
}
2432
2433
static void
2434
tx_process1(struct cmsg *m)
2435
0
{
2436
0
  struct iproto_msg *msg = tx_accept_msg(m);
2437
0
  bool box_tuple_as_ext =
2438
0
    iproto_features_test(&msg->connection->session->meta.features,
2439
0
             IPROTO_FEATURE_DML_TUPLE_EXTENSION);
2440
0
  struct tuple_format_map format_map;
2441
0
  tuple_format_map_create_empty(&format_map);
2442
0
  auto format_map_guard = make_scoped_guard([&format_map] {
2443
0
    tuple_format_map_destroy(&format_map);
2444
0
  });
2445
0
  if (tx_check_msg(msg) != 0)
2446
0
    goto error;
2447
2448
0
  struct tuple *tuple;
2449
0
  struct obuf_svp svp;
2450
0
  struct obuf *out;
2451
0
  tx_inject_delay();
2452
0
  if (tx_resolve_space_and_index_name(&msg->dml) != 0)
2453
0
    goto error;
2454
0
  if (box_process1(&msg->dml, &tuple) != 0)
2455
0
    goto error;
2456
0
  out = msg->connection->tx.p_obuf;
2457
0
  iproto_prepare_select(out, &svp);
2458
0
  if (tuple != NULL) {
2459
0
    if (box_tuple_as_ext) {
2460
0
      tuple_format_map_add_format(&format_map,
2461
0
                tuple->format_id);
2462
0
      if (tuple_to_obuf_as_ext(tuple, out) != 0)
2463
0
        goto error;
2464
0
    } else if (tuple_to_obuf(tuple, out) != 0) {
2465
0
      goto error;
2466
0
    }
2467
0
  }
2468
  /*
2469
   * Even if there is no tuple, we still need to send an empty tuple
2470
   * format map.
2471
   */
2472
0
  if (box_tuple_as_ext &&
2473
0
      tuple_format_map_to_iproto_obuf(&format_map, out) != 0)
2474
0
    goto error;
2475
0
  iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
2476
0
          tuple != NULL, box_tuple_as_ext);
2477
0
  iproto_wpos_create(&msg->wpos, out);
2478
0
  tx_end_msg(msg, &svp);
2479
0
  return;
2480
0
error:
2481
0
  out = msg->connection->tx.p_obuf;
2482
0
  svp = obuf_create_svp(out);
2483
0
  tx_reply_error(msg);
2484
0
  tx_end_msg(msg, &svp);
2485
0
}
2486
2487
static void
2488
tx_process_select(struct cmsg *m)
2489
0
{
2490
0
  struct iproto_msg *msg = tx_accept_msg(m);
2491
0
  bool box_tuple_as_ext =
2492
0
    iproto_features_test(&msg->connection->session->meta.features,
2493
0
             IPROTO_FEATURE_DML_TUPLE_EXTENSION);
2494
0
  struct obuf *out;
2495
0
  struct obuf_svp svp;
2496
0
  struct port port;
2497
2498
0
  struct mp_box_ctx ctx;
2499
0
  struct mp_ctx *ctx_ref = NULL;
2500
0
  if (box_tuple_as_ext) {
2501
0
    mp_box_ctx_create(&ctx, NULL, NULL);
2502
0
    ctx_ref = (struct mp_ctx *)&ctx;
2503
0
  }
2504
0
  auto ctx_guard = make_scoped_guard([ctx_ref] {
2505
0
    mp_ctx_destroy(ctx_ref);
2506
0
  });
2507
0
  ctx_guard.is_active = box_tuple_as_ext;
2508
2509
0
  int count;
2510
0
  int rc;
2511
0
  const char *packed_pos, *packed_pos_end;
2512
0
  bool reply_position;
2513
0
  struct request *req = &msg->dml;
2514
0
  uint32_t region_svp = region_used(&fiber()->gc);
2515
0
  if (tx_check_msg(msg) != 0)
2516
0
    goto error;
2517
2518
0
  tx_inject_delay();
2519
0
  if (tx_resolve_space_and_index_name(&msg->dml) != 0)
2520
0
    goto error;
2521
0
  packed_pos = req->after_position;
2522
0
  packed_pos_end = req->after_position_end;
2523
0
  if (packed_pos != NULL) {
2524
0
    mp_decode_strl(&packed_pos);
2525
0
  } else if (req->after_tuple != NULL) {
2526
0
    rc = box_index_tuple_position(req->space_id, req->index_id,
2527
0
                req->after_tuple,
2528
0
                req->after_tuple_end,
2529
0
                &packed_pos, &packed_pos_end);
2530
0
    if (rc < 0)
2531
0
      goto error;
2532
0
  }
2533
0
  rc = box_select(req->space_id, req->index_id,
2534
0
      req->iterator, req->offset, req->limit,
2535
0
      req->key, req->key_end, &packed_pos, &packed_pos_end,
2536
0
      req->fetch_position, &port);
2537
0
  if (rc < 0)
2538
0
    goto error;
2539
2540
0
  out = msg->connection->tx.p_obuf;
2541
0
  reply_position = req->fetch_position && packed_pos != NULL;
2542
0
  if (reply_position)
2543
0
    iproto_prepare_select_with_position(out, &svp);
2544
0
  else
2545
0
    iproto_prepare_select(out, &svp);
2546
  /*
2547
   * SELECT output format has not changed since Tarantool 1.6
2548
   */
2549
0
  count = port_dump_msgpack_16_with_ctx(&port, out, ctx_ref);
2550
0
  port_destroy(&port);
2551
0
  if (count < 0 || (box_tuple_as_ext &&
2552
0
        tuple_format_map_to_iproto_obuf(&ctx.tuple_format_map,
2553
0
                out) != 0)) {
2554
0
    goto discard;
2555
0
  }
2556
0
  if (reply_position) {
2557
0
    assert(packed_pos != NULL);
2558
0
    iproto_reply_select_with_position(out, &svp, msg->header.sync,
2559
0
              ::schema_version, count,
2560
0
              packed_pos, packed_pos_end,
2561
0
              box_tuple_as_ext);
2562
0
  } else {
2563
0
    iproto_reply_select(out, &svp, msg->header.sync,
2564
0
            ::schema_version, count, box_tuple_as_ext);
2565
0
  }
2566
0
  region_truncate(&fiber()->gc, region_svp);
2567
0
  iproto_wpos_create(&msg->wpos, out);
2568
0
  tx_end_msg(msg, &svp);
2569
0
  return;
2570
0
discard:
2571
  /* Discard the prepared select. */
2572
0
  obuf_rollback_to_svp(out, &svp);
2573
0
error:
2574
0
  region_truncate(&fiber()->gc, region_svp);
2575
0
  out = msg->connection->tx.p_obuf;
2576
0
  svp = obuf_create_svp(out);
2577
0
  tx_reply_error(msg);
2578
0
  tx_end_msg(msg, &svp);
2579
0
}
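The fetch_position/after_position handling above implements cursor pagination: a SELECT reply may carry an opaque packed position, and echoing it back in the next request resumes the scan after the last returned tuple. A hedged sketch of the resulting client loop; struct conn, struct page and select_page() are hypothetical stand-ins, not a real connector API:

#include <stddef.h>
#include <stdint.h>

struct conn;
struct page {
  const char *pos;     /* Packed position; NULL at end of index. */
  const char *pos_end;
};
extern struct page select_page(struct conn *c, const char *after,
                               const char *after_end, uint32_t limit);

static void
scan_all(struct conn *c)
{
  const char *after = NULL, *after_end = NULL;
  do {
    struct page p = select_page(c, after, after_end, 100);
    after = p.pos;      /* Resume point for the next page. */
    after_end = p.pos_end;
  } while (after != NULL);
}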
2580
2581
static int
2582
tx_process_call_on_yield(struct trigger *trigger, void *event)
2583
0
{
2584
0
  (void)event;
2585
0
  struct iproto_msg *msg = (struct iproto_msg *)trigger->data;
2586
0
  TRASH(&msg->call);
2587
0
  tx_discard_input(msg);
2588
0
  trigger_clear(trigger);
2589
0
  return 0;
2590
0
}
2591
2592
static void
2593
tx_process_call(struct cmsg *m)
2594
0
{
2595
0
  struct iproto_msg *msg = tx_accept_msg(m);
2596
2597
0
  bool box_tuple_as_ext =
2598
0
    iproto_features_test(&msg->connection->session->meta.features,
2599
0
             IPROTO_FEATURE_CALL_RET_TUPLE_EXTENSION);
2600
0
  struct mp_box_ctx ctx;
2601
0
  struct mp_ctx *ctx_ref = NULL;
2602
0
  if (box_tuple_as_ext) {
2603
0
    mp_box_ctx_create(&ctx, NULL, NULL);
2604
0
    ctx_ref = (struct mp_ctx *)&ctx;
2605
0
  }
2606
0
  auto ctx_guard = make_scoped_guard([ctx_ref] {
2607
0
    mp_ctx_destroy(ctx_ref);
2608
0
  });
2609
0
  ctx_guard.is_active = box_tuple_as_ext;
2610
2611
0
  if (tx_check_msg(msg) != 0)
2612
0
    goto error;
2613
2614
  /*
2615
   * CALL/EVAL should copy its arguments so we can discard
2616
   * input on yield to avoid stalling other connections by
2617
   * a long polling request.
2618
   */
2619
0
  struct trigger fiber_on_yield;
2620
0
  trigger_create(&fiber_on_yield, tx_process_call_on_yield, msg, NULL);
2621
0
  trigger_add(&fiber()->on_yield, &fiber_on_yield);
2622
2623
0
  int rc;
2624
0
  struct port port;
2625
2626
0
  switch (msg->header.type) {
2627
0
  case IPROTO_CALL:
2628
0
  case IPROTO_CALL_16:
2629
0
    rc = box_process_call(&msg->call, &port);
2630
0
    break;
2631
0
  case IPROTO_EVAL:
2632
0
    rc = box_process_eval(&msg->call, &port);
2633
0
    break;
2634
0
  default:
2635
0
    unreachable();
2636
0
  }
2637
2638
0
  trigger_clear(&fiber_on_yield);
2639
2640
0
  if (rc != 0)
2641
0
    goto error;
2642
2643
0
  if (in_txn() != NULL && msg->header.stream_id == 0) {
2644
0
    diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
2645
0
    port_destroy(&port);
2646
0
    goto error;
2647
0
  }
2648
2649
  /*
2650
   * Add all elements returned by the function to iproto.
2651
   *
2652
   * To allow clients to understand a complex return from
2653
   * a procedure, we are compatible with SELECT protocol,
2654
   * and return the number of return values first, and
2655
   * then each return value as a tuple.
2656
   *
2657
   * (!) Note that a savepoint for the output buffer
2658
   * must be taken only after the Lua function has finished
2659
   * executing, because Lua can yield and leave the
2660
   * buffer in an inconsistent state (a parallel request
2661
   * from the same connection would break the protocol).
2662
   */
2663
2664
0
  int count;
2665
0
  struct obuf *out;
2666
0
  struct obuf_svp svp;
2667
2668
0
  out = msg->connection->tx.p_obuf;
2669
0
  iproto_prepare_select(out, &svp);
2670
2671
0
  if (msg->header.type == IPROTO_CALL_16)
2672
0
    count = port_dump_msgpack_16_with_ctx(&port, out, ctx_ref);
2673
0
  else
2674
0
    count = port_dump_msgpack_with_ctx(&port, out, ctx_ref);
2675
2676
0
  port_destroy(&port);
2677
0
  if (count < 0 || (box_tuple_as_ext &&
2678
0
        tuple_format_map_to_iproto_obuf(&ctx.tuple_format_map,
2679
0
                out) != 0)) {
2680
0
    obuf_rollback_to_svp(out, &svp);
2681
0
    goto error;
2682
0
  }
2683
0
  iproto_reply_select(out, &svp, msg->header.sync,
2684
0
          ::schema_version, count, box_tuple_as_ext);
2685
0
  iproto_wpos_create(&msg->wpos, out);
2686
0
  tx_end_msg(msg, &svp);
2687
0
  return;
2688
0
error:
2689
0
  out = msg->connection->tx.p_obuf;
2690
0
  svp = obuf_create_svp(out);
2691
0
  tx_reply_error(msg);
2692
0
  tx_end_msg(msg, &svp);
2693
0
}
2694
2695
static void
2696
tx_process_id(struct iproto_connection *con, const struct id_request *id)
2697
0
{
2698
0
  extern bool box_tuple_extension;
2699
0
  con->session->meta.features = id->features;
2700
0
  if (!box_tuple_extension)
2701
0
    iproto_features_clear(&con->session->meta.features,
2702
0
              IPROTO_FEATURE_CALL_RET_TUPLE_EXTENSION);
2703
0
}
2704
2705
/** Callback passed to session_watch. */
2706
static void
2707
iproto_session_notify(struct session *session, uint64_t sync,
2708
          const char *key, size_t key_len,
2709
          const char *data, const char *data_end);
2710
2711
static void
2712
tx_process_misc(struct cmsg *m)
2713
0
{
2714
0
  struct iproto_msg *msg = tx_accept_msg(m);
2715
0
  struct iproto_connection *con = msg->connection;
2716
0
  struct obuf *out = con->tx.p_obuf;
2717
0
  struct obuf_svp header;
2718
0
  assert(!(msg->header.type != IPROTO_PING && in_txn()));
2719
0
  if (tx_check_msg(msg) != 0)
2720
0
    goto error;
2721
2722
0
  struct ballot ballot;
2723
0
  header = obuf_create_svp(out);
2724
0
  switch (msg->header.type) {
2725
0
  case IPROTO_AUTH:
2726
0
    if (box_process_auth(&msg->auth, con->salt,
2727
0
             IPROTO_SALT_SIZE) != 0)
2728
0
      goto error;
2729
0
    iproto_reply_ok(out, msg->header.sync, ::schema_version);
2730
0
    break;
2731
0
  case IPROTO_PING:
2732
0
    iproto_reply_ok(out, msg->header.sync, ::schema_version);
2733
0
    break;
2734
0
  case IPROTO_ID:
2735
0
    tx_process_id(con, &msg->id);
2736
0
    iproto_reply_id(out, box_auth_type, msg->header.sync,
2737
0
        ::schema_version);
2738
0
    break;
2739
0
  case IPROTO_VOTE_DEPRECATED:
2740
0
    iproto_reply_vclock(out, &replicaset.vclock, msg->header.sync,
2741
0
            ::schema_version);
2742
0
    break;
2743
0
  case IPROTO_VOTE:
2744
0
    box_process_vote(&ballot);
2745
0
    iproto_reply_vote(out, &ballot, msg->header.sync,
2746
0
          ::schema_version);
2747
0
    break;
2748
0
  case IPROTO_WATCH:
2749
0
    session_watch(con->session, msg->header.sync,
2750
0
            msg->watch.key, msg->watch.key_len,
2751
0
            iproto_session_notify);
2752
    /* Sic: no reply. */
2753
0
    break;
2754
0
  case IPROTO_UNWATCH:
2755
0
    session_unwatch(con->session, msg->watch.key,
2756
0
        msg->watch.key_len);
2757
    /* Sic: no reply. */
2758
0
    break;
2759
0
  case IPROTO_WATCH_ONCE: {
2760
0
    const char *data, *data_end;
2761
0
    data = box_watch_once(msg->watch.key, msg->watch.key_len,
2762
0
              &data_end);
2763
0
    iproto_prepare_select(out, &header);
2764
0
    xobuf_dup(out, data, data_end - data);
2765
0
    iproto_reply_select(out, &header, msg->header.sync,
2766
0
            ::schema_version, data != NULL ? 1 : 0,
2767
0
            /*box_tuple_as_ext=*/false);
2768
0
    break;
2769
0
  }
2770
0
  default:
2771
0
    unreachable();
2772
0
  }
2773
0
  iproto_wpos_create(&msg->wpos, out);
2774
0
  tx_end_msg(msg, &header);
2775
0
  return;
2776
0
error:
2777
0
  header = obuf_create_svp(out);
2778
0
  tx_reply_error(msg);
2779
0
  tx_end_msg(msg, &header);
2780
0
}
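The two "Sic: no reply" notes above reflect the asymmetry of the watcher protocol: WATCH and UNWATCH are never answered directly; the server instead pushes an IPROTO_EVENT packet whenever the key's value changes, and the client re-sends WATCH to acknowledge and re-arm the subscription. A hedged sketch of the exchange (key and payload are illustrative):

/*
 *   client -> {IPROTO_REQUEST_TYPE: IPROTO_WATCH,   key: "foo"}
 *   server -> {IPROTO_REQUEST_TYPE: IPROTO_EVENT,   key: "foo", data: ...}
 *   client -> {IPROTO_REQUEST_TYPE: IPROTO_WATCH,   key: "foo"}  (ack/re-arm)
 *   ...
 *   client -> {IPROTO_REQUEST_TYPE: IPROTO_UNWATCH, key: "foo"}
 */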
2781
2782
static void
2783
tx_process_sql(struct cmsg *m)
2784
0
{
2785
0
  struct iproto_msg *msg = tx_accept_msg(m);
2786
0
  struct obuf *out;
2787
0
  struct port port;
2788
0
  RegionGuard region_guard(&fiber()->gc);
2789
2790
0
  if (tx_check_msg(msg) != 0)
2791
0
    goto error;
2792
0
  assert(msg->header.type == IPROTO_EXECUTE ||
2793
0
         msg->header.type == IPROTO_PREPARE);
2794
0
  tx_inject_delay();
2795
0
  if (box_process_sql(&msg->sql, &port) != 0)
2796
0
    goto error;
2797
  /*
2798
   * Take the obuf only after execute(); otherwise the buffer
2799
   * can become stale during a yield.
2800
   */
2801
0
  out = msg->connection->tx.p_obuf;
2802
0
  struct obuf_svp header_svp;
2803
0
  iproto_prepare_header(out, &header_svp, IPROTO_HEADER_LEN);
2804
0
  if (port_dump_msgpack(&port, out) != 0) {
2805
0
    port_destroy(&port);
2806
0
    obuf_rollback_to_svp(out, &header_svp);
2807
0
    goto error;
2808
0
  }
2809
0
  port_destroy(&port);
2810
0
  iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
2811
0
  iproto_wpos_create(&msg->wpos, out);
2812
0
  tx_end_msg(msg, &header_svp);
2813
0
  return;
2814
0
error:
2815
0
  out = msg->connection->tx.p_obuf;
2816
0
  header_svp = obuf_create_svp(out);
2817
0
  tx_reply_error(msg);
2818
0
  tx_end_msg(msg, &header_svp);
2819
0
}
2820
2821
static void
2822
tx_process_replication(struct cmsg *m)
2823
0
{
2824
0
  struct iproto_msg *msg = tx_accept_msg(m);
2825
0
  struct iproto_connection *con = msg->connection;
2826
0
  struct iostream *io = &con->io;
2827
0
  assert(!in_txn());
2828
0
  try {
2829
0
    if (tx_check_msg(msg) != 0)
2830
0
      diag_raise();
2831
0
    switch (msg->header.type) {
2832
0
    case IPROTO_JOIN:
2833
      /*
2834
       * As soon as box_process_subscribe() returns,
2835
       * the lambda at the beginning of the block
2836
       * will re-activate the watchers for us.
2837
       */
2838
0
      box_process_join(io, &msg->header);
2839
0
      break;
2840
0
    case IPROTO_FETCH_SNAPSHOT:
2841
0
      box_process_fetch_snapshot(io, &msg->header);
2842
0
      break;
2843
0
    case IPROTO_REGISTER:
2844
0
      box_process_register(io, &msg->header);
2845
0
      break;
2846
0
    case IPROTO_SUBSCRIBE:
2847
      /*
2848
       * Subscribe never returns - unless there
2849
       * is an error/exception. In that case
2850
       * the write watcher will be re-activated
2851
       * the same way as for JOIN.
2852
       */
2853
0
      box_process_subscribe(io, &msg->header);
2854
0
      break;
2855
0
    default:
2856
0
      unreachable();
2857
0
    }
2858
0
  } catch (SocketError *e) {
2859
    /* don't write error response to prevent SIGPIPE */
2860
0
  } catch (TimedOut *e) {
2861
     /*
2862
      * In case of a timeout the error could come after a partially
2863
      * written row. Do not push it on top.
2864
      */
2865
0
  } catch (FiberIsCancelled *e) {
2866
    /* Do not write into connection on connection drop. */
2867
0
  } catch (Exception *e) {
2868
0
    iproto_write_error(io, e, ::schema_version, msg->header.sync);
2869
0
  }
2870
0
  struct obuf_svp empty = obuf_create_svp(msg->connection->tx.p_obuf);
2871
0
  tx_end_msg(msg, &empty);
2872
0
}
2873
2874
/**
2875
 * Allocates a new `iproto_req_handlers'. The memory is set to zero.
2876
 */
2877
static struct iproto_req_handlers *
2878
iproto_req_handlers_new(void)
2879
0
{
2880
0
  struct iproto_req_handlers *handlers;
2881
0
  handlers = (struct iproto_req_handlers *)xmalloc(sizeof(*handlers));
2882
0
  memset(handlers, 0, sizeof(*handlers));
2883
0
  return handlers;
2884
0
}
2885
2886
/**
2887
 * Destroys all handlers and deallocates the `handlers' structure.
2888
 */
2889
static void
2890
iproto_req_handlers_delete(struct iproto_req_handlers *handlers)
2891
0
{
2892
0
  if (handlers->event_by_id != NULL)
2893
0
    event_unref(handlers->event_by_id);
2894
0
  if (handlers->event_by_name != NULL)
2895
0
    event_unref(handlers->event_by_name);
2896
0
  if (handlers->c.destroy != NULL)
2897
0
    handlers->c.destroy(handlers->c.ctx);
2898
0
  TRASH(handlers);
2899
0
  free(handlers);
2900
0
}
2901
2902
/**
2903
 * Inserts `handlers' for the given `req_type' into the `tx_req_handlers' table.
2904
 * There must be no previous entries in the table for this key.
2905
 */
2906
static void
2907
mh_req_handlers_put(uint32_t req_type, struct iproto_req_handlers *handlers)
2908
0
{
2909
0
  struct mh_i32ptr_node_t old;
2910
0
  struct mh_i32ptr_node_t *replaced = &old;
2911
0
  struct mh_i32ptr_node_t node = {
2912
0
    /* .key = */ req_type,
2913
0
    /* .val = */ handlers,
2914
0
  };
2915
0
  mh_i32ptr_put(tx_req_handlers, &node, &replaced, NULL);
2916
0
  assert(replaced == NULL);
2917
0
}
2918
2919
/**
2920
 * Returns a pointer to `iproto_req_handlers' for the given IPROTO request
2921
 * `req_type', or NULL if there are no such handlers.
2922
 */
2923
static struct iproto_req_handlers *
2924
mh_req_handlers_get(uint32_t req_type)
2925
0
{
2926
0
  mh_int_t k = mh_i32ptr_find(tx_req_handlers, req_type, NULL);
2927
0
  if (k == mh_end(tx_req_handlers))
2928
0
    return NULL;
2929
0
  struct mh_i32ptr_node_t *node = mh_i32ptr_node(tx_req_handlers, k);
2930
0
  return (struct iproto_req_handlers *)node->val;
2931
0
}
2932
2933
/**
2934
 * Deletes the handlers of IPROTO request `req_type' from the `tx_req_handlers'
2935
 * hash table. The entry must be present in the table.
2936
 */
2937
static void
2938
mh_req_handlers_del(uint32_t req_type)
2939
0
{
2940
0
  mh_int_t k = mh_i32ptr_find(tx_req_handlers, req_type, NULL);
2941
0
  assert(k != mh_end(tx_req_handlers));
2942
0
  mh_i32ptr_del(tx_req_handlers, k, NULL);
2943
0
}
2944
2945
/**
2946
 * Replaces an event in `handlers' by the new `event'. If `is_by_id', the
2947
 * handler is set by request type id, otherwise it is set by request type name.
2948
 */
2949
static void
2950
iproto_req_handlers_set_event(struct iproto_req_handlers *handlers,
2951
            struct event *event, bool is_by_id)
2952
0
{
2953
0
  assert(handlers != NULL);
2954
0
  assert(event != NULL);
2955
2956
0
  if (is_by_id) {
2957
0
    if (handlers->event_by_id == NULL) {
2958
0
      event_ref(event);
2959
0
      handlers->event_by_id = event;
2960
0
    } else {
2961
0
      assert(handlers->event_by_id == event);
2962
0
    }
2963
0
  } else {
2964
0
    if (handlers->event_by_name == NULL) {
2965
0
      event_ref(event);
2966
0
      handlers->event_by_name = event;
2967
0
    } else {
2968
0
      assert(handlers->event_by_name == event);
2969
0
    }
2970
0
  }
2971
0
}
2972
2973
/**
2974
 * Deletes an event, which is set in `handlers' by request type id (if
2975
 * `is_by_id'), or by request type name.
2976
 */
2977
static void
2978
iproto_req_handlers_del_event(struct iproto_req_handlers *handlers,
2979
            bool is_by_id)
2980
0
{
2981
0
  assert(handlers != NULL);
2982
2983
0
  if (is_by_id) {
2984
0
    event_unref(handlers->event_by_id);
2985
0
    handlers->event_by_id = NULL;
2986
0
  } else {
2987
0
    event_unref(handlers->event_by_name);
2988
0
    handlers->event_by_name = NULL;
2989
0
  }
2990
0
}
2991
2992
/**
2993
 * Returns `true' if there is at least one handler in `handlers'.
2994
 */
2995
static bool
2996
iproto_req_handler_is_set(struct iproto_req_handlers *handlers)
2997
0
{
2998
0
  if (handlers == NULL)
2999
0
    return false;
3000
3001
0
  return handlers->event_by_id != NULL ||
3002
0
         handlers->event_by_name != NULL ||
3003
0
         handlers->c.cb != NULL;
3004
0
}
3005
3006
/**
3007
 * Returns `enum iproto_type' if `name' is a valid IPROTO type name or equals
3008
 * "unknown". Otherwise, iproto_type_MAX is returned. The name is expected to
3009
 * be in lowercase.
3010
 */
3011
static enum iproto_type
3012
get_iproto_type_by_name(const char *name)
3013
0
{
3014
0
  for (uint32_t i = 0; i < iproto_type_MAX; i++) {
3015
0
    const char *type_name = iproto_type_name_lower(i);
3016
0
    if (type_name != NULL && strcmp(type_name, name) == 0)
3017
0
      return (enum iproto_type)i;
3018
0
  }
3019
0
  if (strcmp(name, "unknown") == 0)
3020
0
    return IPROTO_UNKNOWN;
3021
0
  return iproto_type_MAX;
3022
0
}
3023
3024
/**
3025
 * Runs triggers registered for the `event'.
3026
 * The given header and body of the IPROTO packet are passed as trigger args.
3027
 * Returns IPROTO_HANDLER_OK if some trigger successfully handled the request,
3028
 * IPROTO_HANDLER_FALLBACK if no triggers handled the request, or
3029
 * IPROTO_HANDLER_ERROR on failure.
3030
 */
3031
static enum iproto_handler_status
3032
tx_run_override_triggers(struct event *event, const char *header,
3033
       const char *header_end, const char *body,
3034
       const char *body_end)
3035
0
{
3036
0
  enum iproto_handler_status rc = IPROTO_HANDLER_FALLBACK;
3037
0
  const char *name = NULL;
3038
0
  struct func_adapter *trigger = NULL;
3039
0
  struct func_adapter_ctx ctx;
3040
0
  struct event_trigger_iterator it;
3041
0
  event_trigger_iterator_create(&it, event);
3042
3043
0
  while (event_trigger_iterator_next(&it, &trigger, &name)) {
3044
0
    struct mp_ctx mp_ctx_header, mp_ctx_body;
3045
0
    mp_ctx_create_default(&mp_ctx_header, iproto_key_translation);
3046
0
    mp_ctx_create_default(&mp_ctx_body, iproto_key_translation);
3047
3048
0
    func_adapter_begin(trigger, &ctx);
3049
0
    func_adapter_push_msgpack_with_ctx(trigger, &ctx, header,
3050
0
               header_end, &mp_ctx_header);
3051
0
    func_adapter_push_msgpack_with_ctx(trigger, &ctx, body,
3052
0
               body_end, &mp_ctx_body);
3053
0
    if (func_adapter_call(trigger, &ctx) == 0) {
3054
0
      if (func_adapter_is_bool(trigger, &ctx)) {
3055
0
        bool ok = false;
3056
0
        func_adapter_pop_bool(trigger, &ctx, &ok);
3057
0
        if (ok)
3058
0
          rc = IPROTO_HANDLER_OK;
3059
0
      } else {
3060
0
        diag_set(ClientError, ER_PROC_LUA,
3061
0
           "Invalid Lua IPROTO handler return "
3062
0
           "type: expected boolean");
3063
0
        rc = IPROTO_HANDLER_ERROR;
3064
0
      }
3065
0
    } else {
3066
0
      rc = IPROTO_HANDLER_ERROR;
3067
0
    }
3068
0
    func_adapter_end(trigger, &ctx);
3069
0
    if (rc != IPROTO_HANDLER_FALLBACK)
3070
0
      break;
3071
0
  }
3072
0
  event_trigger_iterator_destroy(&it);
3073
0
  return rc;
3074
0
}
3075
3076
/**
3077
 * Process a request using overridden handlers (or the unknown request handler
3078
 * as a last resort).
3079
 */
3080
static void
3081
tx_process_override(struct cmsg *m)
3082
0
{
3083
0
  struct iproto_msg *msg = tx_accept_msg(m);
3084
0
  const char *header = msg->reqstart;
3085
0
  mp_decode_uint(&header);
3086
3087
0
  const char *header_end = msg->reqstart + msg->len;
3088
0
  const char *body = "\x80"; /* Empty MsgPack map encoding. */
3089
0
  const char *body_end = body + 1;
3090
0
  if (msg->header.bodycnt != 0) {
3091
0
    assert(msg->header.bodycnt == 1);
3092
0
    header_end -= msg->header.body[0].iov_len;
3093
0
    body = (const char *)msg->header.body[0].iov_base;
3094
0
    body_end = body + msg->header.body[0].iov_len;
3095
0
  }
3096
3097
  /*
3098
   * If we took the `override_route', there must exist either a
3099
   * request type-specific handler or an unknown-request-type handler.
3100
   * Their availability is checked by the IPROTO thread.
3101
   */
3102
0
  struct iproto_req_handlers *handlers;
3103
0
  handlers = mh_req_handlers_get(msg->header.type);
3104
0
  if (handlers == NULL)
3105
0
    handlers = mh_req_handlers_get(IPROTO_UNKNOWN);
3106
0
  assert(handlers != NULL);
3107
0
  enum iproto_handler_status rc = IPROTO_HANDLER_FALLBACK;
3108
3109
  /*
3110
   * Run handlers from the event registry, set by request type id.
3111
   */
3112
0
  if (handlers->event_by_id != NULL) {
3113
0
    rc = tx_run_override_triggers(handlers->event_by_id, header,
3114
0
                header_end, body, body_end);
3115
0
  }
3116
  /*
3117
   * Run handlers from the event registry, set by request type name.
3118
   */
3119
0
  if (rc == IPROTO_HANDLER_FALLBACK && handlers->event_by_name != NULL) {
3120
0
    rc = tx_run_override_triggers(handlers->event_by_name, header,
3121
0
                header_end, body, body_end);
3122
0
  }
3123
  /*
3124
   * Run C handlers.
3125
   */
3126
0
  if (rc == IPROTO_HANDLER_FALLBACK && handlers->c.cb != NULL) {
3127
0
    rc = handlers->c.cb(header, header_end, body, body_end,
3128
0
            handlers->c.ctx);
3129
0
  }
3130
3131
0
  struct cmsg_hop *route = NULL;
3132
0
  switch (rc) {
3133
0
  case IPROTO_HANDLER_OK: {
3134
0
    struct obuf *out = msg->connection->tx.p_obuf;
3135
0
    iproto_wpos_create(&msg->wpos, out);
3136
0
    struct obuf_svp empty = obuf_create_svp(out);
3137
0
    tx_end_msg(msg, &empty);
3138
0
    return;
3139
0
  }
3140
0
  case IPROTO_HANDLER_FALLBACK: {
3141
0
    int rc = iproto_msg_decode(msg, &route);
3142
0
    assert(route != NULL);
3143
0
    if (rc != 0)
3144
0
      route = NULL;
3145
0
    FALLTHROUGH;
3146
0
  }
3147
0
  case IPROTO_HANDLER_ERROR:
3148
0
    break;
3149
0
  default:
3150
0
    unreachable();
3151
0
  }
3152
0
  if (route != NULL) {
3153
0
    assert(m->hop[1].f == route[1].f);
3154
0
    route->f(m);
3155
0
    return;
3156
0
  }
3157
0
  struct obuf_svp svp = obuf_create_svp(msg->connection->tx.p_obuf);
3158
0
  tx_reply_error(msg);
3159
0
  tx_end_msg(msg, &svp);
3160
0
}
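For the C branch above (handlers->c.cb), an override handler receives the raw MsgPack header and body and reports one of three statuses. A hedged sketch of such a handler; registration is assumed to go through the box_iproto_override() module API, so verify the exact signature in your module.h before relying on it:

static enum iproto_handler_status
my_override_handler(const char *header, const char *header_end,
                    const char *body, const char *body_end, void *ctx)
{
  (void)header; (void)header_end;
  (void)body; (void)body_end; (void)ctx;
  /*
   * IPROTO_HANDLER_OK       - the request was fully handled here;
   * IPROTO_HANDLER_FALLBACK - let the built-in route process it;
   * IPROTO_HANDLER_ERROR    - diag is set, an error reply is sent.
   */
  return IPROTO_HANDLER_FALLBACK;
}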
3161
3162
static void
3163
iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
3164
0
{
3165
0
  struct iproto_connection *con = msg->connection;
3166
0
  struct iproto_stream *stream = msg->stream;
3167
3168
0
  if (stream == NULL)
3169
0
    return;
3170
3171
0
  assert(stream->current == msg);
3172
0
  stream->current = NULL;
3173
3174
0
  if (stailq_empty(&stream->pending_requests)) {
3175
    /*
3176
     * If there are no more messages for the current stream
3177
     * and no transaction has been started, delete the stream.
3178
     */
3179
0
    if (stream->txn == NULL) {
3180
0
      struct mh_i64ptr_node_t node = { stream->id, NULL };
3181
0
      mh_i64ptr_remove(con->streams, &node, 0);
3182
0
      iproto_stream_delete(stream);
3183
0
    } else if (con->state != IPROTO_CONNECTION_ALIVE) {
3184
      /*
3185
       * We get here when the connection has been closed:
3186
       * there are no messages in the stream queue, but the
3187
       * stream still has an active transaction.
3188
       * Send a disconnect message to roll back this
3189
       * transaction.
3190
       */
3191
0
      iproto_stream_rollback_on_disconnect(stream);
3192
0
    }
3193
0
  } else {
3194
    /*
3195
     * If there are new messages for this stream
3196
     * then schedule their processing.
3197
     */
3198
0
    stream->current =
3199
0
      stailq_shift_entry(&stream->pending_requests,
3200
0
             struct iproto_msg,
3201
0
             in_stream);
3202
0
    assert(stream->current != NULL);
3203
0
    stream->current->wpos = con->wpos;
3204
0
    con->iproto_thread->requests_in_stream_queue--;
3205
0
    cpipe_push_input(&con->iproto_thread->tx_pipe,
3206
0
         &stream->current->base);
3207
0
    cpipe_flush_input(&con->iproto_thread->tx_pipe);
3208
0
  }
3209
0
}
3210
3211
static void
3212
net_send_msg(struct cmsg *m)
3213
0
{
3214
0
  struct iproto_msg *msg = (struct iproto_msg *) m;
3215
0
  struct iproto_connection *con = msg->connection;
3216
3217
0
  iproto_msg_finish_processing_in_stream(msg);
3218
0
  if (msg->len != 0) {
3219
    /* Discard request (see iproto_enqueue_batch()). */
3220
0
    iproto_msg_finish_input(msg);
3221
0
  } else {
3222
    /* Already discarded by net_discard_input(). */
3223
0
    assert(con->long_poll_count > 0);
3224
0
    con->long_poll_count--;
3225
0
  }
3226
0
  con->wend = msg->wpos;
3227
3228
0
  if (con->state == IPROTO_CONNECTION_ALIVE) {
3229
0
    iproto_connection_feed_output(con);
3230
0
  } else if (iproto_connection_is_idle(con)) {
3231
0
    iproto_connection_close(con);
3232
0
  }
3233
0
  iproto_msg_delete(msg);
3234
0
}
3235
3236
/**
3237
 * Complete sending an iproto error:
3238
 * recycle the error object and flush output.
3239
 */
3240
static void
3241
net_send_error(struct cmsg *m)
3242
0
{
3243
0
  struct iproto_msg *msg = (struct iproto_msg *) m;
3244
  /* Recycle the exception. */
3245
0
  diag_move(&msg->diag, &fiber()->diag);
3246
0
  net_send_msg(m);
3247
0
}
3248
3249
static void
3250
net_end_join(struct cmsg *m)
3251
0
{
3252
0
  struct iproto_msg *msg = (struct iproto_msg *) m;
3253
0
  struct iproto_connection *con = msg->connection;
3254
0
  struct ibuf *ibuf = msg->p_ibuf;
3255
3256
0
  iproto_msg_finish_input(msg);
3257
0
  iproto_msg_delete(msg);
3258
3259
0
  assert(! ev_is_active(&con->input));
3260
0
  con->is_in_replication = false;
3261
3262
0
  if (con->is_drop_pending) {
3263
0
    iproto_connection_close(con);
3264
0
    return;
3265
0
  }
3266
  /*
3267
   * Enqueue any messages that are in the readahead
3268
   * queue; otherwise simply start input.
3269
   */
3270
0
  if (iproto_enqueue_batch(con, ibuf) != 0)
3271
0
    iproto_connection_close(con);
3272
0
}
3273
3274
static void
3275
net_end_subscribe(struct cmsg *m)
3276
0
{
3277
0
  struct iproto_msg *msg = (struct iproto_msg *) m;
3278
0
  struct iproto_connection *con = msg->connection;
3279
3280
0
  iproto_msg_finish_input(msg);
3281
0
  iproto_msg_delete(msg);
3282
3283
0
  assert(! ev_is_active(&con->input));
3284
3285
0
  iproto_connection_close(con);
3286
0
}
3287
3288
/**
3289
 * Handshake a connection: invoke the on-connect trigger
3290
 * and possibly authenticate. Try to send the client an error
3291
 * upon a failure.
3292
 */
3293
static void
3294
tx_process_connect(struct cmsg *m)
3295
0
{
3296
0
  struct iproto_msg *msg = (struct iproto_msg *) m;
3297
0
  struct iproto_connection *con = msg->connection;
3298
0
  struct obuf *out = msg->connection->tx.p_obuf;
3299
0
  if (msg->connect.session != NULL) {
3300
0
    con->session = msg->connect.session;
3301
0
    session_set_type(con->session, SESSION_TYPE_BINARY);
3302
0
  } else {
3303
0
    con->session = session_new(SESSION_TYPE_BINARY);
3304
0
  }
3305
0
  con->session->meta.connection = con;
3306
0
  session_set_peer_addr(con->session, &msg->connect.addr,
3307
0
            msg->connect.addrlen);
3308
0
  iproto_features_create(&con->session->meta.features);
3309
0
  tx_fiber_init(con->session, 0);
3310
0
  char *greeting = (char *)static_alloc(IPROTO_GREETING_SIZE);
3311
  /* TODO: dirty read from tx thread */
3312
0
  struct tt_uuid uuid = INSTANCE_UUID;
3313
0
  random_bytes(con->salt, IPROTO_SALT_SIZE);
3314
0
  greeting_encode(greeting, tarantool_version_id(), &uuid,
3315
0
      con->salt, IPROTO_SALT_SIZE);
3316
0
  xobuf_dup(out, greeting, IPROTO_GREETING_SIZE);
3317
0
  if (session_run_on_connect_triggers(con->session) != 0)
3318
0
    goto error;
3319
0
  iproto_wpos_create(&msg->wpos, out);
3320
0
  return;
3321
0
error:
3322
0
  tx_reply_error(msg);
3323
0
  msg->close_connection = true;
3324
0
}
3325
3326
/**
3327
 * Send the connect response to the client, or close the
3328
 * connection if the on_connect trigger failed.
3329
 */
3330
static void
3331
net_send_greeting(struct cmsg *m)
3332
0
{
3333
0
  struct iproto_msg *msg = (struct iproto_msg *) m;
3334
0
  struct iproto_connection *con = msg->connection;
3335
0
  if (con->is_drop_pending) {
3336
0
    iproto_connection_close(con);
3337
0
    iproto_msg_delete(msg);
3338
0
    return;
3339
0
  }
3340
0
  if (msg->close_connection) {
3341
0
    struct obuf *out = msg->wpos.obuf;
3342
0
    int64_t nwr = iostream_writev(&con->io, out->iov,
3343
0
                obuf_iovcnt(out));
3344
0
    if (nwr > 0) {
3345
      /* Count statistics. */
3346
0
      rmean_collect(con->iproto_thread->rmean,
3347
0
              IPROTO_SENT, nwr);
3348
0
    } else if (nwr == IOSTREAM_ERROR) {
3349
0
      diag_log();
3350
0
    }
3351
0
    assert(iproto_connection_is_idle(con));
3352
0
    iproto_connection_close(con);
3353
0
    iproto_msg_delete(msg);
3354
0
    return;
3355
0
  }
3356
0
  con->is_established = true;
3357
0
  con->wend = msg->wpos;
3358
  /*
3359
   * Connect is synchronous, so no one could have been
3360
   * messing with the connection while the handshake was
3361
   * in progress.
3362
   */
3363
0
  assert(con->state == IPROTO_CONNECTION_ALIVE);
3364
  /* Handshake OK, start reading input. */
3365
0
  iproto_connection_feed_output(con);
3366
0
  iproto_msg_delete(msg);
3367
0
}
3368
3369
/** }}} */
3370
3371
/**
3372
 * Create a connection and start input.
3373
 *
3374
 * If session is NULL, a new session object will be created for the connection
3375
 * in the TX thread.
3376
 *
3377
 * The function takes ownership of the passed IO stream and session.
3378
 */
3379
static void
3380
iproto_thread_accept(struct iproto_thread *iproto_thread, struct iostream *io,
3381
         struct sockaddr *addr, socklen_t addrlen,
3382
         struct session *session)
3383
0
{
3384
0
  struct iproto_connection *con = iproto_connection_new(iproto_thread);
3385
0
  struct iproto_msg *msg = iproto_msg_new(con);
3386
0
  assert(addrlen <= sizeof(msg->connect.addrstorage));
3387
0
  memcpy(&msg->connect.addrstorage, addr, addrlen);
3388
0
  msg->connect.addrlen = addrlen;
3389
0
  msg->connect.session = session;
3390
0
  iostream_move(&con->io, io);
3391
0
  cmsg_init(&msg->base, iproto_thread->connect_route);
3392
0
  msg->p_ibuf = con->p_ibuf;
3393
0
  msg->wpos = con->wpos;
3394
0
  cpipe_push(&iproto_thread->tx_pipe, &msg->base);
3395
0
}
3396
3397
static void
3398
iproto_on_accept_cb(struct evio_service *service, struct iostream *io,
3399
        struct sockaddr *addr, socklen_t addrlen)
3400
0
{
3401
0
  struct iproto_thread *iproto_thread =
3402
0
    (struct iproto_thread *)service->on_accept_param;
3403
0
  iproto_thread_accept(iproto_thread, io, addr, addrlen,
3404
           /*session=*/NULL);
3405
0
}
3406
3407
/**
3408
 * The network io thread main function:
3409
 * begin serving the message bus.
3410
 */
3411
static int
3412
net_cord_f(va_list ap)
3413
0
{
3414
0
  struct iproto_thread *iproto_thread =
3415
0
    va_arg(ap, struct iproto_thread *);
3416
3417
0
  mempool_create(&iproto_thread->iproto_msg_pool, &cord()->slabc,
3418
0
           sizeof(struct iproto_msg));
3419
0
  mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
3420
0
           sizeof(struct iproto_connection));
3421
0
  mempool_create(&iproto_thread->iproto_stream_pool, &cord()->slabc,
3422
0
           sizeof(struct iproto_stream));
3423
3424
0
  evio_service_create(loop(), &iproto_thread->binary, "binary",
3425
0
          iproto_on_accept_cb, iproto_thread);
3426
3427
0
  char endpoint_name[ENDPOINT_NAME_MAX];
3428
0
  snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u",
3429
0
     iproto_thread->id);
3430
3431
0
  struct cbus_endpoint endpoint;
3432
  /* Create "net" endpoint. */
3433
0
  cbus_endpoint_create(&endpoint, endpoint_name,
3434
0
           fiber_schedule_cb, fiber());
3435
  /* Create a pipe to "tx" thread. */
3436
0
  cpipe_create(&iproto_thread->tx_pipe, "tx");
3437
0
  cpipe_set_max_input(&iproto_thread->tx_pipe, iproto_msg_max / 2);
3438
3439
  /* Process incoming messages. */
3440
0
  cbus_loop(&endpoint);
3441
3442
0
  cbus_endpoint_destroy(&endpoint, cbus_process);
3443
0
  cpipe_destroy(&iproto_thread->tx_pipe);
3444
0
  evio_service_detach(&iproto_thread->binary);
3445
3446
0
  mempool_destroy(&iproto_thread->iproto_stream_pool);
3447
0
  mempool_destroy(&iproto_thread->iproto_connection_pool);
3448
0
  mempool_destroy(&iproto_thread->iproto_msg_pool);
3449
0
  return 0;
3450
0
}
3451
3452
int
3453
iproto_session_fd(struct session *session)
3454
0
{
3455
0
  struct iproto_connection *con =
3456
0
    (struct iproto_connection *) session->meta.connection;
3457
0
  return con->io.fd;
3458
0
}
3459
3460
int64_t
3461
iproto_session_sync(struct session *session)
3462
0
{
3463
0
  (void) session;
3464
0
  assert(session == fiber()->storage.session);
3465
0
  return fiber()->storage.net.sync;
3466
0
}
3467
3468
/** {{{ IPROTO_PUSH implementation. */
3469
3470
static void
3471
iproto_process_push(struct cmsg *m)
3472
0
{
3473
0
  struct iproto_kharon *kharon = (struct iproto_kharon *) m;
3474
0
  struct iproto_connection *con =
3475
0
    container_of(kharon, struct iproto_connection, kharon);
3476
0
  con->wend = kharon->wpos;
3477
0
  kharon->wpos = con->wpos;
3478
0
  if (con->state == IPROTO_CONNECTION_ALIVE)
3479
0
    iproto_connection_feed_output(con);
3480
0
}
3481
3482
/**
3483
 * Send a notification about new pushes to the iproto thread.
3484
 * @param con iproto connection.
3485
 */
3486
static void
3487
tx_begin_push(struct iproto_connection *con)
3488
0
{
3489
0
  assert(! con->tx.is_push_sent);
3490
0
  cmsg_init(&con->kharon.base, con->iproto_thread->push_route);
3491
0
  iproto_wpos_create(&con->kharon.wpos, con->tx.p_obuf);
3492
0
  con->tx.is_push_pending = false;
3493
0
  con->tx.is_push_sent = true;
3494
0
  cpipe_push(&con->iproto_thread->net_pipe,
3495
0
       (struct cmsg *) &con->kharon);
3496
0
}
3497
3498
static void
3499
tx_end_push(struct cmsg *m)
3500
0
{
3501
0
  struct iproto_kharon *kharon = (struct iproto_kharon *) m;
3502
0
  struct iproto_connection *con =
3503
0
    container_of(kharon, struct iproto_connection, kharon);
3504
0
  tx_accept_wpos(con, &kharon->wpos);
3505
0
  con->tx.is_push_sent = false;
3506
0
  if (con->tx.is_push_pending)
3507
0
    tx_begin_push(con);
3508
0
}
3509
3510
/**
3511
 * Asynchronously send response message using Kharon facility.
3512
 */
3513
static void
3514
tx_push(struct iproto_connection *con, struct obuf_svp *svp)
3515
0
{
3516
0
  flightrec_write_response(con->tx.p_obuf, svp);
3517
0
  if (!con->tx.is_push_sent)
3518
0
    tx_begin_push(con);
3519
0
  else
3520
0
    con->tx.is_push_pending = true;
3521
0
}
3522
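The push machinery reads more easily as a round trip; the comment-only sketch below restates the flags used by tx_begin_push(), iproto_process_push() and tx_end_push() above (no new behavior is assumed):

/*
 * Kharon round trip (tx thread on the left, net thread on the right):
 *
 *   tx_push():
 *     is_push_sent == false -> tx_begin_push():
 *       is_push_sent = true, kharon sent ----> iproto_process_push():
 *                                                feed output to client
 *     is_push_sent == true  -> is_push_pending = true
 *
 *   tx_end_push() (kharon came back) <-------- second route hop
 *     is_push_sent = false;
 *     if (is_push_pending) -> tx_begin_push() again.
 */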
3523
/**
3524
 * Push a message from @a port to a remote client.
3525
 * @param session iproto session.
3526
 * @param port Port with data to send.
3527
 *
3528
 * @retval -1 Memory error.
3529
 * @retval  0 Success, a message is written to the output buffer.
3530
 *            We do not wait here for the push to reach the
3531
 *            client: the output buffer is flushed asynchronously.
3532
 */
3533
static int
3534
iproto_session_push(struct session *session, struct port *port)
3535
0
{
3536
0
  struct iproto_connection *con =
3537
0
    (struct iproto_connection *) session->meta.connection;
3538
0
  struct obuf_svp svp;
3539
0
  iproto_prepare_select(con->tx.p_obuf, &svp);
3540
0
  if (port_dump_msgpack(port, con->tx.p_obuf) < 0) {
3541
0
    obuf_rollback_to_svp(con->tx.p_obuf, &svp);
3542
0
    return -1;
3543
0
  }
3544
0
  iproto_reply_chunk(con->tx.p_obuf, &svp, iproto_session_sync(session),
3545
0
         ::schema_version);
3546
0
  tx_push(con, &svp);
3547
0
  return 0;
3548
0
}
3549
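A minimal caller-side sketch of the contract documented above. The session_push() dispatch helper is assumed to route through session_vtab_registry[] to iproto_session_push(); the return-value handling follows the @retval contract:

/* Hypothetical TX-thread caller (a sketch, not part of this file). */
static int
push_to_client(struct session *session, struct port *port)
{
  /* Assumed to dispatch to iproto_session_push() for binary sessions. */
  if (session_push(session, port) != 0)
    return -1; /* -1: no memory for the obuf chunk. */
  /* 0: the chunk is in the output buffer; it is flushed
   * asynchronously, so we do not wait for delivery here. */
  return 0;
}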
3550
/**
3551
 * Sends a notification to a remote watcher when a key is updated.
3552
 * Uses IPROTO_PUSH (kharon) infrastructure to signal the iproto thread
3553
 * about new data.
3554
 */
3555
static void
3556
iproto_session_notify(struct session *session, uint64_t sync,
3557
          const char *key, size_t key_len,
3558
          const char *data, const char *data_end)
3559
0
{
3560
0
  struct iproto_connection *con =
3561
0
    (struct iproto_connection *)session->meta.connection;
3562
0
  struct obuf *out = con->tx.p_obuf;
3563
0
  struct obuf_svp svp = obuf_create_svp(out);
3564
0
  iproto_send_event(out, sync, key, key_len, data, data_end);
3565
0
  tx_push(con, &svp);
3566
0
}
3567
3568
/** }}} */
3569
3570
/**
3571
 * Stops accepting new connections on shutdown.
3572
 */
3573
static int
3574
iproto_on_shutdown_f(void *arg)
3575
0
{
3576
0
  (void)arg;
3577
0
  fiber_set_name(fiber_self(), "iproto.shutdown");
3578
0
  iproto_is_shutting_down = true;
3579
0
  struct iproto_cfg_msg cfg_msg;
3580
0
  iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_SHUTDOWN);
3581
0
  for (int i = 0; i < iproto_threads_count; i++)
3582
0
    iproto_do_cfg(&iproto_threads[i], &cfg_msg);
3583
0
  evio_service_stop(&tx_binary);
3584
0
  return 0;
3585
0
}
3586
3587
static inline void
3588
iproto_thread_init_routes(struct iproto_thread *iproto_thread)
3589
0
{
3590
0
  iproto_thread->begin_route[0] =
3591
0
    { tx_process_begin, &iproto_thread->net_pipe };
3592
0
  iproto_thread->begin_route[1] =
3593
0
    { net_send_msg, NULL };
3594
0
  iproto_thread->commit_route[0] =
3595
0
    { tx_process_commit, &iproto_thread->net_pipe };
3596
0
  iproto_thread->commit_route[1] =
3597
0
    { net_send_msg, NULL };
3598
0
  iproto_thread->rollback_route[0] =
3599
0
    { tx_process_rollback, &iproto_thread->net_pipe };
3600
0
  iproto_thread->rollback_route[1] =
3601
0
    { net_send_msg, NULL };
3602
0
  iproto_thread->rollback_on_disconnect_route[0] =
3603
0
    { tx_process_rollback_on_disconnect,
3604
0
      &iproto_thread->net_pipe };
3605
0
  iproto_thread->rollback_on_disconnect_route[1] =
3606
0
    { net_finish_rollback_on_disconnect, NULL };
3607
0
  iproto_thread->destroy_route[0] =
3608
0
    { tx_process_destroy, &iproto_thread->net_pipe };
3609
0
  iproto_thread->destroy_route[1] =
3610
0
    { net_finish_destroy, NULL };
3611
0
  iproto_thread->disconnect_route[0] =
3612
0
    { tx_process_disconnect, &iproto_thread->net_pipe };
3613
0
  iproto_thread->disconnect_route[1] =
3614
0
    { net_finish_disconnect, NULL };
3615
0
  iproto_thread->misc_route[0] =
3616
0
    { tx_process_misc, &iproto_thread->net_pipe };
3617
0
  iproto_thread->misc_route[1] = { net_send_msg, NULL };
3618
0
  iproto_thread->call_route[0] =
3619
0
    { tx_process_call, &iproto_thread->net_pipe };
3620
0
  iproto_thread->call_route[1] = { net_send_msg, NULL };
3621
0
  iproto_thread->select_route[0] =
3622
0
    { tx_process_select, &iproto_thread->net_pipe };
3623
0
  iproto_thread->select_route[1] = { net_send_msg, NULL };
3624
0
  iproto_thread->process1_route[0] =
3625
0
    { tx_process1, &iproto_thread->net_pipe };
3626
0
  iproto_thread->process1_route[1] = { net_send_msg, NULL };
3627
0
  iproto_thread->sql_route[0] =
3628
0
    { tx_process_sql, &iproto_thread->net_pipe };
3629
0
  iproto_thread->sql_route[1] = { net_send_msg, NULL };
3630
0
  iproto_thread->join_route[0] =
3631
0
    { tx_process_replication, &iproto_thread->net_pipe };
3632
0
  iproto_thread->join_route[1] = { net_end_join, NULL };
3633
0
  iproto_thread->subscribe_route[0] =
3634
0
    { tx_process_replication, &iproto_thread->net_pipe };
3635
0
  iproto_thread->subscribe_route[1] = { net_end_subscribe, NULL };
3636
0
  iproto_thread->error_route[0] =
3637
0
    { tx_reply_iproto_error, &iproto_thread->net_pipe };
3638
0
  iproto_thread->error_route[1] = { net_send_error, NULL };
3639
0
  iproto_thread->push_route[0] =
3640
0
    { iproto_process_push, &iproto_thread->tx_pipe };
3641
0
  iproto_thread->push_route[1] = { tx_end_push, NULL };
3642
  /* IPROTO_OK */
3643
0
  iproto_thread->dml_route[0] = NULL;
3644
  /* IPROTO_SELECT */
3645
0
  iproto_thread->dml_route[1] = iproto_thread->select_route;
3646
  /* IPROTO_INSERT */
3647
0
  iproto_thread->dml_route[2] = iproto_thread->process1_route;
3648
  /* IPROTO_REPLACE */
3649
0
  iproto_thread->dml_route[3] = iproto_thread->process1_route;
3650
  /* IPROTO_UPDATE */
3651
0
  iproto_thread->dml_route[4] = iproto_thread->process1_route;
3652
  /* IPROTO_DELETE */
3653
0
  iproto_thread->dml_route[5] = iproto_thread->process1_route;
3654
  /* IPROTO_CALL_16 */
3655
0
  iproto_thread->dml_route[6] = iproto_thread->call_route;
3656
  /* IPROTO_AUTH */
3657
0
  iproto_thread->dml_route[7] = iproto_thread->misc_route;
3658
  /* IPROTO_EVAL */
3659
0
  iproto_thread->dml_route[8] = iproto_thread->call_route;
3660
  /* IPROTO_UPSERT */
3661
0
  iproto_thread->dml_route[9] = iproto_thread->process1_route;
3662
  /* IPROTO_CALL */
3663
0
  iproto_thread->dml_route[10] = iproto_thread->call_route;
3664
  /* IPROTO_EXECUTE */
3665
0
  iproto_thread->dml_route[11] = iproto_thread->sql_route;
3666
  /* IPROTO_NOP */
3667
0
  iproto_thread->dml_route[12] = NULL;
3668
  /* IPROTO_PREPARE */
3669
0
  iproto_thread->dml_route[13] = iproto_thread->sql_route;
3670
0
  iproto_thread->connect_route[0] =
3671
0
    { tx_process_connect, &iproto_thread->net_pipe };
3672
0
  iproto_thread->connect_route[1] = { net_send_greeting, NULL };
3673
0
  iproto_thread->override_route[0] =
3674
0
    { tx_process_override, &iproto_thread->net_pipe };
3675
0
  iproto_thread->override_route[1] = { net_send_msg, NULL };
3676
0
}
3677
3678
static inline void
3679
iproto_thread_init(struct iproto_thread *iproto_thread)
3680
0
{
3681
0
  iproto_thread_init_routes(iproto_thread);
3682
0
  iproto_thread->req_handlers = mh_i32_new();
3683
0
  slab_cache_create(&iproto_thread->net_slabc, &runtime);
3684
  /* Init statistics counter */
3685
0
  iproto_thread->rmean = rmean_new(rmean_net_strings, RMEAN_NET_LAST);
3686
0
  iproto_thread->tx.rmean = rmean_new(rmean_tx_strings, RMEAN_TX_LAST);
3687
0
  rlist_create(&iproto_thread->stopped_connections);
3688
0
  iproto_thread->tx.requests_in_progress = 0;
3689
0
  iproto_thread->requests_in_stream_queue = 0;
3690
0
  rlist_create(&iproto_thread->connections);
3691
0
}
3692
3693
/**
3694
 * True for IPROTO request types that can be overridden.
3695
 */
3696
static bool
3697
is_iproto_override_supported(uint32_t req_type)
3698
0
{
3699
0
  switch (req_type) {
3700
0
  case IPROTO_JOIN:
3701
0
  case IPROTO_SUBSCRIBE:
3702
0
  case IPROTO_FETCH_SNAPSHOT:
3703
0
  case IPROTO_REGISTER:
3704
0
    return false;
3705
0
  default:
3706
0
    return true;
3707
0
  }
3708
0
}
3709
3710
/**
3711
 * If `name' contains a valid name of an IPROTO overriding event, sets
3712
 * `req_type' and returns true. If the name contains the correct prefix but
3713
 * the request type is invalid, an error is logged at the CRIT log level.
3714
 * `is_by_id' is set to true if the request is overridden by id, false if by name.
3715
 */
3716
static bool
3717
get_iproto_type_from_event_name(const char *name, uint32_t *req_type,
3718
        bool *is_by_id)
3719
0
{
3720
0
  const char *prefix = "box.iproto.override";
3721
0
  const size_t prefix_len = strlen(prefix);
3722
0
  if (strncmp(name, prefix, prefix_len) != 0)
3723
0
    return false;
3724
3725
0
  const char *req_name = name + prefix_len;
3726
0
  const char *req_name_err = req_name;
3727
0
  if (*req_name == '.') {
3728
0
    *is_by_id = false;
3729
    /* Skip the dot. */
3730
0
    req_name++;
3731
0
    req_name_err = req_name;
3732
0
    *req_type = get_iproto_type_by_name(req_name);
3733
0
    if (*req_type == iproto_type_MAX)
3734
0
      goto err_bad_type;
3735
0
  } else if (*req_name == '[') {
3736
0
    *is_by_id = true;
3737
    /* Skip open bracket. */
3738
0
    req_name++;
3739
0
    if (!isdigit(*req_name) && *req_name != '-')
3740
0
      goto err_bad_type;
3741
0
    char *endptr;
3742
0
    *req_type = strtol(req_name, &endptr, 10);
3743
0
    if (endptr == req_name)
3744
0
      goto err_bad_type;
3745
    /*
3746
     * At least one digit is parsed.
3747
     * Check that the rest of the string equals "]".
3748
     */
3749
0
    if (*endptr != ']' || endptr[1] != 0)
3750
0
      goto err_bad_type;
3751
0
  } else {
3752
    /* Not in IPROTO override namespace. */
3753
0
    return false;
3754
0
  }
3755
3756
0
  if (!is_iproto_override_supported(*req_type)) {
3757
0
    say_crit("IPROTO request handler overriding does not support "
3758
0
       "`%s' request type", iproto_type_name(*req_type));
3759
0
    return false;
3760
0
  }
3761
0
  return true;
3762
3763
0
err_bad_type:
3764
0
  say_crit("The event `%s' is in IPROTO override namespace, but `%s' is "
3765
0
     "not a valid request type", name, req_name_err);
3766
0
  return false;
3767
0
}
3768
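A few illustrative inputs for the parser above, derived from its branches (the concrete results are examples, not a test suite):

/*
 *   "box.iproto.override.select" -> *req_type = IPROTO_SELECT, *is_by_id = false
 *   "box.iproto.override[1]"     -> *req_type = 1,             *is_by_id = true
 *   "box.iproto.override[1x]"    -> CRIT log, false (junk after the digits)
 *   "box.iproto.override_foo"    -> false (next char is neither '.' nor '[')
 *   "something.else"             -> false (prefix does not match)
 */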
3769
/**
3770
 * Gets an arbitrary `event', checks its name, and adds it to `req_handlers' if
3771
 * it is a valid IPROTO overriding event.
3772
 * If the event name contains the correct IPROTO overriding prefix but the
3773
 * request type is invalid, an error is logged at the CRIT log level.
3774
 */
3775
static bool
3776
iproto_override_event_init(struct event *event, void *arg)
3777
0
{
3778
0
  (void)arg;
3779
0
  uint32_t type;
3780
0
  bool is_by_id;
3781
0
  if (!get_iproto_type_from_event_name(event->name, &type, &is_by_id))
3782
0
    return true;
3783
3784
0
  struct iproto_req_handlers *handlers = mh_req_handlers_get(type);
3785
0
  if (handlers == NULL) {
3786
0
    handlers = iproto_req_handlers_new();
3787
0
    mh_req_handlers_put(type, handlers);
3788
0
  }
3789
0
  iproto_req_handlers_set_event(handlers, event, is_by_id);
3790
3791
0
  for (int i = 0; i < iproto_threads_count; i++) {
3792
0
    struct iproto_thread *iproto_thread = &iproto_threads[i];
3793
0
    mh_i32_put(iproto_thread->req_handlers, &type, NULL, NULL);
3794
0
  }
3795
0
  return true;
3796
0
}
3797
3798
/**
3799
 * Notifies IPROTO threads that a new request handler has been set.
3800
 */
3801
static void
3802
iproto_cfg_override(uint32_t req_type, bool is_set);
3803
3804
/**
3805
 * Calls iproto_cfg_override() and destroys the handlers when necessary.
3806
 */
3807
static void
3808
iproto_override_finish(struct iproto_req_handlers *handlers, uint32_t req_type,
3809
           bool old_is_set)
3810
0
{
3811
0
  bool new_is_set = iproto_req_handler_is_set(handlers);
3812
0
  if (new_is_set != old_is_set)
3813
0
    iproto_cfg_override(req_type, new_is_set);
3814
3815
0
  if (!new_is_set && handlers != NULL) {
3816
0
    mh_req_handlers_del(req_type);
3817
0
    iproto_req_handlers_delete(handlers);
3818
0
  }
3819
0
}
3820
3821
/**
3822
 * Trigger which is fired on any change in the event registry.
3823
 */
3824
static int
3825
trigger_on_change_iproto_notify(struct trigger *trigger, void *arg)
3826
0
{
3827
0
  (void)trigger;
3828
0
  uint32_t type;
3829
0
  bool is_by_id;
3830
0
  struct event *event = (struct event *)arg;
3831
0
  if (!get_iproto_type_from_event_name(event->name, &type, &is_by_id))
3832
0
    return 0;
3833
3834
0
  struct iproto_req_handlers *handlers;
3835
0
  handlers = mh_req_handlers_get(type);
3836
0
  bool is_set = iproto_req_handler_is_set(handlers);
3837
3838
0
  if (event_has_triggers(event)) {
3839
0
    if (handlers == NULL) {
3840
0
      handlers = iproto_req_handlers_new();
3841
0
      mh_req_handlers_put(type, handlers);
3842
0
    }
3843
0
    iproto_req_handlers_set_event(handlers, event, is_by_id);
3844
0
  } else {
3845
0
    iproto_req_handlers_del_event(handlers, is_by_id);
3846
0
  }
3847
3848
0
  iproto_override_finish(handlers, type, is_set);
3849
0
  return 0;
3850
0
}
3851
3852
TRIGGER(trigger_on_change, trigger_on_change_iproto_notify);
3853
3854
/** Initialize the iproto subsystem and start network io thread */
3855
void
3856
iproto_init(int threads_count)
3857
0
{
3858
0
  iproto_features_init();
3859
3860
0
  iproto_threads_count = 0;
3861
0
  struct session_vtab iproto_session_vtab = {
3862
0
    /* .push = */ iproto_session_push,
3863
0
    /* .fd = */ iproto_session_fd,
3864
0
    /* .sync = */ iproto_session_sync,
3865
0
  };
3866
  /*
3867
   * We use this tx_binary only for bind, not for listen, so
3868
   * we don't need any accept functions.
3869
   */
3870
0
  evio_service_create(loop(), &tx_binary, "tx_binary", NULL, NULL);
3871
0
  iproto_threads = (struct iproto_thread *)
3872
0
    xcalloc(threads_count, sizeof(struct iproto_thread));
3873
0
  fiber_cond_create(&drop_finished_cond);
3874
3875
0
  for (int i = 0; i < threads_count; i++, iproto_threads_count++) {
3876
0
    struct iproto_thread *iproto_thread = &iproto_threads[i];
3877
0
    iproto_thread->id = i;
3878
0
    iproto_thread_init(iproto_thread);
3879
0
  }
3880
3881
  /*
3882
   * Go through all events with triggers, and initialize overridden
3883
   * request handlers that were registered before IPROTO initialization.
3884
   */
3885
0
  tx_req_handlers = mh_i32ptr_new();
3886
0
  event_foreach(iproto_override_event_init, NULL);
3887
3888
0
  for (int i = 0; i < threads_count; i++) {
3889
0
    struct iproto_thread *iproto_thread = &iproto_threads[i];
3890
0
    if (cord_costart(&iproto_thread->net_cord, "iproto",
3891
0
         net_cord_f, iproto_thread))
3892
0
      panic("failed to start iproto thread");
3893
    /* Create a pipe to "net" thread. */
3894
0
    char endpoint_name[ENDPOINT_NAME_MAX];
3895
0
    snprintf(endpoint_name, ENDPOINT_NAME_MAX, "net%u",
3896
0
       iproto_thread->id);
3897
0
    cpipe_create(&iproto_thread->net_pipe, endpoint_name);
3898
0
    cpipe_set_max_input(&iproto_thread->net_pipe,
3899
0
            iproto_msg_max / 2);
3900
0
  }
3901
3902
0
  session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;
3903
3904
0
  event_on_change(&trigger_on_change);
3905
0
  if (box_on_shutdown(NULL, iproto_on_shutdown_f, NULL) != 0)
3906
0
    panic("failed to set iproto shutdown trigger");
3907
0
}
3908
3909
static void
3910
iproto_fill_stat(struct iproto_thread *iproto_thread,
3911
     struct iproto_cfg_msg *cfg_msg)
3912
0
{
3913
0
  assert(cfg_msg->stats != NULL);
3914
0
  cfg_msg->stats->mem_used =
3915
0
    slab_cache_used(&iproto_thread->net_cord.slabc) +
3916
0
    slab_cache_used(&iproto_thread->net_slabc);
3917
0
  cfg_msg->stats->connections =
3918
0
    mempool_count(&iproto_thread->iproto_connection_pool);
3919
0
  cfg_msg->stats->streams =
3920
0
    mempool_count(&iproto_thread->iproto_stream_pool);
3921
0
  cfg_msg->stats->requests =
3922
0
    mempool_count(&iproto_thread->iproto_msg_pool);
3923
0
  cfg_msg->stats->requests_in_stream_queue =
3924
0
    iproto_thread->requests_in_stream_queue;
3925
0
}
3926
3927
static int
3928
iproto_do_cfg_f(struct cbus_call_msg *m)
3929
0
{
3930
0
  struct iproto_cfg_msg *cfg_msg = (struct iproto_cfg_msg *) m;
3931
0
  struct iproto_thread *iproto_thread = cfg_msg->iproto_thread;
3932
0
  struct mh_i32_t *req_handlers = iproto_thread->req_handlers;
3933
0
  struct evio_service *binary = &iproto_thread->binary;
3934
0
  switch (cfg_msg->op) {
3935
0
  case IPROTO_CFG_MSG_MAX: {
3936
0
    cpipe_set_max_input(&iproto_thread->tx_pipe,
3937
0
            cfg_msg->iproto_msg_max / 2);
3938
0
    int old = iproto_msg_max;
3939
0
    iproto_msg_max = cfg_msg->iproto_msg_max;
3940
0
    if (old < iproto_msg_max)
3941
0
      iproto_resume(iproto_thread);
3942
0
    break;
3943
0
  }
3944
0
  case IPROTO_CFG_START:
3945
0
    if (iproto_thread->is_shutting_down)
3946
0
      break;
3947
0
    evio_service_attach(binary, &tx_binary);
3948
0
    break;
3949
0
  case IPROTO_CFG_SHUTDOWN:
3950
0
    iproto_thread->is_shutting_down = true;
3951
0
    FALLTHROUGH;
3952
0
  case IPROTO_CFG_STOP:
3953
0
    evio_service_detach(binary);
3954
0
    break;
3955
0
  case IPROTO_CFG_RESTART:
3956
0
    evio_service_detach(binary);
3957
0
    evio_service_attach(binary, &tx_binary);
3958
0
    break;
3959
0
  case IPROTO_CFG_STAT:
3960
0
    iproto_fill_stat(iproto_thread, cfg_msg);
3961
0
    break;
3962
0
  case IPROTO_CFG_OVERRIDE:
3963
0
    if (cfg_msg->override.is_set) {
3964
0
      uint32_t old;
3965
0
      uint32_t *replaced = &old;
3966
0
      mh_i32_put(req_handlers, &cfg_msg->override.req_type,
3967
0
           &replaced, NULL);
3968
0
      assert(replaced == NULL);
3969
0
    } else {
3970
0
      mh_int_t k = mh_i32_find(req_handlers,
3971
0
             cfg_msg->override.req_type,
3972
0
             NULL);
3973
0
      assert(k != mh_end(req_handlers));
3974
0
      mh_i32_del(req_handlers, k, NULL);
3975
0
    }
3976
0
    break;
3977
0
  case IPROTO_CFG_SESSION_NEW: {
3978
0
    struct iostream *io = &cfg_msg->session_new.io;
3979
0
    struct session *session = cfg_msg->session_new.session;
3980
0
    struct sockaddr_storage addrstorage;
3981
0
    struct sockaddr *addr = (struct sockaddr *)&addrstorage;
3982
0
    socklen_t addrlen = sizeof(addrstorage);
3983
0
    if (sio_getpeername(io->fd, addr, &addrlen) != 0)
3984
0
      addrlen = 0;
3985
0
    iproto_thread_accept(iproto_thread, io, addr, addrlen, session);
3986
0
    break;
3987
0
  }
3988
0
  case IPROTO_CFG_DROP_CONNECTIONS: {
3989
0
    struct iproto_connection *con;
3990
0
    static const struct cmsg_hop cancel_route[1] =
3991
0
        {{ tx_process_cancel_inprogress, NULL }};
3992
0
    iproto_thread->drop_pending_connection_count = 0;
3993
0
    rlist_foreach_entry(con, &iproto_thread->connections,
3994
0
            in_connections) {
3995
      /*
3996
       * Replication IO is done outside iproto so we
3997
       * cannot close them as usual. Anyway we cancel
3998
       * replication fibers as well and close connection
3999
       * after replication is breaked.
4000
       *
4001
       * Do not close connection that is not yet
4002
       * established. Otherwise session
4003
       * on_connect/on_disconnect callbacks may be
4004
       * executed in reverse order in case of yields
4005
       * in on_connect callbacks.
4006
       */
4007
0
      if (!con->is_in_replication &&
4008
0
          con->state == IPROTO_CONNECTION_ALIVE &&
4009
0
          con->is_established)
4010
0
        iproto_connection_close(con);
4011
      /*
4012
       * Do not wait for deletion of the connection that called
4013
       * iproto_drop_connections() to avoid a deadlock.
4014
       */
4015
0
      if (con != cfg_msg->drop_connections.owner) {
4016
0
        con->is_drop_pending = true;
4017
0
        con->drop_generation =
4018
0
          cfg_msg->drop_connections.generation;
4019
0
        iproto_thread->drop_pending_connection_count++;
4020
0
      }
4021
0
      if (con->state != IPROTO_CONNECTION_DESTROYED) {
4022
0
        cmsg_init(&con->cancel_msg, cancel_route);
4023
0
        cpipe_push(&iproto_thread->tx_pipe,
4024
0
             &con->cancel_msg);
4025
0
      }
4026
0
    }
4027
0
    if (iproto_thread->drop_pending_connection_count == 0)
4028
0
      iproto_send_drop_finished(
4029
0
        iproto_thread,
4030
0
        cfg_msg->drop_connections.generation);
4031
0
    break;
4032
0
  }
4033
0
  default:
4034
0
    unreachable();
4035
0
  }
4036
0
  return 0;
4037
0
}
4038
4039
static void
4040
iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg)
4041
0
{
4042
0
  msg->iproto_thread = iproto_thread;
4043
0
  int rc = cbus_call(&iproto_thread->net_pipe, &iproto_thread->tx_pipe,
4044
0
         msg, iproto_do_cfg_f);
4045
0
  assert(rc == 0);
4046
0
  (void)rc;
4047
0
}
4048
4049
static int
4050
iproto_do_cfg_async_free_f(struct cbus_call_msg *m)
4051
0
{
4052
0
  free(m);
4053
0
  return 0;
4054
0
}
4055
4056
/**
4057
 * Sends a configuration message to an IPROTO thread without waiting for
4058
 * completion.
4059
 *
4060
 * The message must be allocated with malloc.
4061
 */
4062
static void
4063
iproto_do_cfg_async(struct iproto_thread *iproto_thread,
4064
        struct iproto_cfg_msg *msg)
4065
0
{
4066
0
  msg->iproto_thread = iproto_thread;
4067
0
  cbus_call_async(&iproto_thread->net_pipe, &iproto_thread->tx_pipe,
4068
0
      msg, iproto_do_cfg_f, iproto_do_cfg_async_free_f);
4069
0
}
4070
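A short sketch of the ownership rule stated above: the message is heap-allocated because iproto_do_cfg_async_free_f() frees it once the call completes. The IPROTO_CFG_STOP opcode here is illustrative; iproto_drop_connections() below uses the same pattern with IPROTO_CFG_DROP_CONNECTIONS:

struct iproto_cfg_msg *msg =
  (struct iproto_cfg_msg *)xmalloc(sizeof(*msg));
iproto_cfg_msg_create(msg, IPROTO_CFG_STOP);
/* Fire and forget: ownership of msg passes to the cbus machinery,
 * which frees it via iproto_do_cfg_async_free_f(). */
iproto_do_cfg_async(&iproto_threads[0], msg);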
4071
/** Send IPROTO_CFG_STOP to all threads. */
4072
static void
4073
iproto_send_stop_msg(void)
4074
0
{
4075
0
  struct iproto_cfg_msg cfg_msg;
4076
0
  iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STOP);
4077
0
  for (int i = 0; i < iproto_threads_count; i++)
4078
0
    iproto_do_cfg(&iproto_threads[i], &cfg_msg);
4079
0
}
4080
4081
/** Send IPROTO_CFG_START to all threads. */
4082
static void
4083
iproto_send_start_msg(void)
4084
0
{
4085
0
  struct iproto_cfg_msg cfg_msg;
4086
0
  iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_START);
4087
0
  for (int i = 0; i < iproto_threads_count; i++)
4088
0
    iproto_do_cfg(&iproto_threads[i], &cfg_msg);
4089
0
}
4090
4091
int
4092
iproto_drop_connections(double timeout)
4093
0
{
4094
0
  static struct latch latch = LATCH_INITIALIZER(latch);
4095
0
  latch_lock(&latch);
4096
0
  struct iproto_connection *owner = NULL;
4097
0
  struct session *session = fiber_get_session(fiber());
4098
0
  if (session != NULL && session->type == SESSION_TYPE_BINARY)
4099
0
    owner = (struct iproto_connection *)session->meta.connection;
4100
0
  drop_generation++;
4101
0
  drop_pending_thread_count = iproto_threads_count;
4102
0
  for (int i = 0; i < iproto_threads_count; i++) {
4103
0
    struct iproto_cfg_msg *cfg_msg =
4104
0
      (struct iproto_cfg_msg *)xmalloc(sizeof(*cfg_msg));
4105
0
    iproto_cfg_msg_create(cfg_msg, IPROTO_CFG_DROP_CONNECTIONS);
4106
0
    cfg_msg->drop_connections.owner = owner;
4107
0
    cfg_msg->drop_connections.generation = drop_generation;
4108
0
    iproto_do_cfg_async(&iproto_threads[i], cfg_msg);
4109
0
  }
4110
4111
0
  double deadline = ev_monotonic_now(loop()) + timeout;
4112
0
  while (drop_pending_thread_count != 0) {
4113
0
    if (fiber_cond_wait_deadline(&drop_finished_cond,
4114
0
               deadline) != 0)
4115
0
      break;
4116
0
  }
4117
0
  latch_unlock(&latch);
4118
0
  return drop_pending_thread_count == 0 ? 0 : -1;
4119
0
}
4120
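A caller-side sketch of the timeout contract above (the 10-second value is an arbitrary example):

/* Ask every iproto thread to drop its connections and wait up to
 * 10 seconds for all of them to report back. */
if (iproto_drop_connections(/*timeout=*/10.0) != 0)
  say_warn("timed out waiting for iproto connections to be dropped");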
4121
/** Send IPROTO_CFG_RESTART to all threads. */
4122
static void
4123
iproto_send_restart_msg(void)
4124
0
{
4125
0
  struct iproto_cfg_msg cfg_msg;
4126
0
  iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_RESTART);
4127
0
  for (int i = 0; i < iproto_threads_count; i++)
4128
0
    iproto_do_cfg(&iproto_threads[i], &cfg_msg);
4129
0
}
4130
4131
int
4132
iproto_listen(const struct uri_set *uri_set)
4133
0
{
4134
  /*
4135
   * No need to rebind IPROTO ports in case the configuration is
4136
   * the same. However, we should still reload the URIs because
4137
   * a URI parameter may store a path to a file (for example,
4138
   * an SSL certificate), which could change.
4139
   */
4140
0
  if (uri_set_is_equal(uri_set, &iproto_uris)) {
4141
0
    if (evio_service_reload_uris(&tx_binary) != 0)
4142
0
      return -1;
4143
0
    iproto_send_restart_msg();
4144
0
    return 0;
4145
0
  }
4146
  /*
4147
   * Note that we set iproto_uris before trying to bind so even if
4148
   * we fail, iproto_uris will still contain the new configuration.
4149
   * It's okay because box.cfg.listen is reverted on failure at
4150
   * the box.cfg level.
4151
   */
4152
0
  uri_set_destroy(&iproto_uris);
4153
0
  uri_set_copy(&iproto_uris, uri_set);
4154
0
  iproto_send_stop_msg();
4155
0
  evio_service_stop(&tx_binary);
4156
0
  struct errinj *inj = errinj(ERRINJ_IPROTO_CFG_LISTEN, ERRINJ_INT);
4157
0
  if (inj != NULL && inj->iparam > 0) {
4158
0
    inj->iparam--;
4159
0
    diag_set(ClientError, ER_INJECTION, "iproto listen");
4160
0
    return -1;
4161
0
  }
4162
  /*
4163
   * Please note, we bind the sockets in the main thread, and then
4164
   * listen on these sockets in all iproto threads! With this
4165
   * implementation, we rely on the Linux kernel to distribute
4166
   * incoming connections across iproto threads.
4167
   */
4168
0
  if (evio_service_start(&tx_binary, uri_set) != 0)
4169
0
    return -1;
4170
0
  iproto_send_start_msg();
4171
0
  return 0;
4172
0
}
4173
4174
static void
4175
iproto_stats_add(struct iproto_stats *total_stats,
4176
     struct iproto_stats *thread_stats)
4177
0
{
4178
0
  total_stats->mem_used += thread_stats->mem_used;
4179
0
  total_stats->connections += thread_stats->connections;
4180
0
  total_stats->streams += thread_stats->streams;
4181
0
  total_stats->requests += thread_stats->requests;
4182
0
  total_stats->requests_in_stream_queue +=
4183
0
    thread_stats->requests_in_stream_queue;
4184
0
  total_stats->requests_in_progress +=
4185
0
    thread_stats->requests_in_progress;
4186
0
}
4187
4188
void
4189
iproto_stats_get(struct iproto_stats *stats)
4190
0
{
4191
0
  struct iproto_stats thread_stats;
4192
0
  memset(stats, 0, sizeof(iproto_stats));
4193
0
  for (int i = 0; i < iproto_threads_count; i++) {
4194
0
    iproto_thread_stats_get(&thread_stats, i);
4195
0
    iproto_stats_add(stats, &thread_stats);
4196
0
  }
4197
0
}
4198
4199
void
4200
iproto_thread_stats_get(struct iproto_stats *stats, int thread_id)
4201
0
{
4202
0
  memset(stats, 0, sizeof(iproto_stats));
4203
0
  struct iproto_cfg_msg cfg_msg;
4204
0
  iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_STAT);
4205
0
  assert(thread_id >= 0 && thread_id < iproto_threads_count);
4206
0
  cfg_msg.stats = stats;
4207
0
  iproto_do_cfg(&iproto_threads[thread_id], &cfg_msg);
4208
0
  stats->requests_in_progress =
4209
0
    iproto_threads[thread_id].tx.requests_in_progress;
4210
0
}
4211
4212
void
4213
iproto_reset_stat(void)
4214
0
{
4215
0
  for (int i = 0; i < iproto_threads_count; i++) {
4216
0
    rmean_cleanup(iproto_threads[i].rmean);
4217
0
    rmean_cleanup(iproto_threads[i].tx.rmean);
4218
0
  }
4219
0
}
4220
4221
int
4222
iproto_set_msg_max(int new_iproto_msg_max)
4223
0
{
4224
0
  if (new_iproto_msg_max < IPROTO_MSG_MAX_MIN) {
4225
0
    diag_set(ClientError, ER_CFG, "net_msg_max",
4226
0
       tt_sprintf("minimal value is %d", IPROTO_MSG_MAX_MIN));
4227
0
    return -1;
4228
0
  }
4229
0
  struct iproto_cfg_msg cfg_msg;
4230
0
  iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_MSG_MAX);
4231
0
  cfg_msg.iproto_msg_max = new_iproto_msg_max;
4232
0
  for (int i = 0; i < iproto_threads_count; i++) {
4233
0
    iproto_do_cfg(&iproto_threads[i], &cfg_msg);
4234
0
    cpipe_set_max_input(&iproto_threads[i].net_pipe,
4235
0
            new_iproto_msg_max / 2);
4236
0
  }
4237
0
  return 0;
4238
0
}
4239
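A worked example of the halving above (the value is illustrative):

/*
 * With new_iproto_msg_max = 768:
 *   - each thread's net_pipe max input becomes 768 / 2 = 384 (set here);
 *   - each thread's tx_pipe max input becomes 768 / 2 = 384 (set in
 *     iproto_do_cfg_f() for IPROTO_CFG_MSG_MAX);
 * presumably so the in-flight request budget is split evenly between
 * the two directions of the cbus.
 */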
4240
int
4241
iproto_session_new(struct iostream *io, struct user *user, uint64_t *sid)
4242
0
{
4243
0
  assert(iostream_is_initialized(io));
4244
0
  if (iproto_is_shutting_down) {
4245
0
    diag_set(ClientError, ER_SHUTDOWN);
4246
0
    return -1;
4247
0
  }
4248
0
  struct session *session = session_new(SESSION_TYPE_BACKGROUND);
4249
0
  if (user != NULL)
4250
0
    credentials_reset(&session->credentials, user);
4251
0
  struct iproto_cfg_msg *cfg_msg =
4252
0
    (struct iproto_cfg_msg *)xmalloc(sizeof(*cfg_msg));
4253
0
  iproto_cfg_msg_create(cfg_msg, IPROTO_CFG_SESSION_NEW);
4254
0
  iostream_move(&cfg_msg->session_new.io, io);
4255
0
  cfg_msg->session_new.session = session;
4256
0
  static int thread = 0;
4257
0
  thread = (thread + 1) % iproto_threads_count;
4258
0
  iproto_do_cfg_async(&iproto_threads[thread], cfg_msg);
4259
0
  *sid = session->id;
4260
0
  return 0;
4261
0
}
4262
4263
static void
4264
iproto_cfg_override(uint32_t req_type, bool is_set)
4265
0
{
4266
0
  struct iproto_cfg_msg cfg_msg;
4267
0
  iproto_cfg_msg_create(&cfg_msg, IPROTO_CFG_OVERRIDE);
4268
0
  cfg_msg.override.req_type = req_type;
4269
0
  cfg_msg.override.is_set = is_set;
4270
0
  for (int i = 0; i < iproto_threads_count; ++i)
4271
0
    iproto_do_cfg(&iproto_threads[i], &cfg_msg);
4272
0
}
4273
4274
int
4275
iproto_session_send(struct session *session,
4276
        const char *header, const char *header_end,
4277
        const char *body, const char *body_end)
4278
0
{
4279
0
  assert(session->type == SESSION_TYPE_BINARY);
4280
0
  struct iproto_connection *con =
4281
0
    (struct iproto_connection *)session->meta.connection;
4282
0
  if (con->state != IPROTO_CONNECTION_ALIVE) {
4283
0
    diag_set(ClientError, ER_SESSION_CLOSED);
4284
0
    return -1;
4285
0
  }
4286
4287
0
  struct obuf *out = con->tx.p_obuf;
4288
0
  struct obuf_svp svp = obuf_create_svp(out);
4289
0
  ptrdiff_t header_size = header_end - header;
4290
0
  ptrdiff_t body_size = body_end - body;
4291
0
  ptrdiff_t packet_size = 5 + header_size + body_size;
4292
0
  char *p = (char *)xobuf_alloc(out, packet_size);
4293
0
  *(p++) = INT8_C(0xce);
4294
0
  p = mp_store_u32(p, packet_size - 5);
4295
0
  memcpy(p, header, header_size);
4296
0
  p += header_size;
4297
0
  memcpy(p, body, body_size);
4298
0
  tx_push(con, &svp);
4299
  /*
4300
   * The control yield solely enforces the fact that this function
4301
   * yields; in the future we may implement back pressure based on this.
4302
   */
4303
0
  fiber_sleep(0);
4304
0
  return 0;
4305
0
}
4306
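A worked example of the framing built above (the sizes are arbitrary):

/*
 * For header_size = 7 and body_size = 10:
 *   packet_size         = 5 + 7 + 10 = 22 bytes allocated in the obuf;
 *   first byte          = 0xce (MessagePack uint32 marker);
 *   next 4 bytes (be32) = packet_size - 5 = 17 (header + body length);
 *   followed by the 7 header bytes and the 10 body bytes.
 */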
4307
int
4308
iproto_shutdown(double timeout)
4309
0
{
4310
0
  assert(iproto_is_shutting_down);
4311
0
  return iproto_drop_connections(timeout);
4312
0
}
4313
4314
void
4315
iproto_free(void)
4316
0
{
4317
0
  for (int i = 0; i < iproto_threads_count; i++) {
4318
0
    cbus_stop_loop(&iproto_threads[i].net_pipe);
4319
0
    cpipe_destroy(&iproto_threads[i].net_pipe);
4320
0
    if (cord_join(&iproto_threads[i].net_cord) != 0)
4321
0
      panic_syserror("iproto cord join failed");
4322
0
    mh_i32_delete(iproto_threads[i].req_handlers);
4323
    /*
4324
     * Close socket descriptor to prevent hot standby instance
4325
     * failing to bind in case it tries to bind before socket
4326
     * is closed by OS.
4327
     */
4328
0
    evio_service_detach(&iproto_threads[i].binary);
4329
0
    rmean_delete(iproto_threads[i].rmean);
4330
0
    rmean_delete(iproto_threads[i].tx.rmean);
4331
0
    slab_cache_destroy(&iproto_threads[i].net_slabc);
4332
0
  }
4333
0
  free(iproto_threads);
4334
4335
0
  mh_int_t i;
4336
0
  mh_foreach(tx_req_handlers, i) {
4337
0
    struct mh_i32ptr_node_t *node =
4338
0
      mh_i32ptr_node(tx_req_handlers, i);
4339
0
    struct iproto_req_handlers *handlers =
4340
0
      (struct iproto_req_handlers *)node->val;
4341
0
    iproto_req_handlers_delete(handlers);
4342
0
  }
4343
0
  mh_i32ptr_delete(tx_req_handlers);
4344
0
  fiber_cond_destroy(&drop_finished_cond);
4345
4346
  /*
4347
   * Here we close the sockets and, in the case of unix
4348
   * sockets, unlink their paths.
4349
   */
4350
0
  evio_service_stop(&tx_binary);
4351
0
}
4352
4353
static int
4354
iproto_thread_rmean_foreach_impl(struct rmean *rmean, void *cb, void *cb_ctx)
4355
0
{
4356
0
  int rc = 0;
4357
0
  for (size_t i = 0; i < rmean->stats_n; i++) {
4358
0
    int64_t mean = rmean_mean(rmean, i);
4359
0
    int64_t total = rmean_total(rmean, i);
4360
0
    if (((rmean_cb)cb)(rmean->stats[i].name, mean,
4361
0
           total, cb_ctx) != 0)
4362
0
      rc = 1;
4363
0
  }
4364
0
  return rc;
4365
0
}
4366
4367
/**
4368
 * We use the offset of the rmean in struct iproto_thread, instead of a
4369
 * pointer to the rmean, because we must iterate over the same rmean in
4370
 * every iproto thread.
4371
 */
4372
static int
4373
iproto_rmean_foreach_impl(ptrdiff_t rmean_offset, void *cb, void *cb_ctx)
4374
0
{
4375
0
  struct rmean *rmean0 =
4376
0
    *(struct rmean **)((char *)&iproto_threads[0] + rmean_offset);
4377
0
  for (size_t i = 0; i < rmean0->stats_n; i++) {
4378
0
    int64_t mean = 0;
4379
0
    int64_t total = 0;
4380
0
    for (int j = 0; j < iproto_threads_count; j++) {
4381
0
      struct rmean *rmean =
4382
0
        *(struct rmean **)
4383
0
        ((char *)&iproto_threads[j] + rmean_offset);
4384
0
      assert(rmean == iproto_threads[j].rmean ||
4385
0
             rmean == iproto_threads[j].tx.rmean);
4386
0
      mean += rmean_mean(rmean, i);
4387
0
      total += rmean_total(rmean, i);
4388
0
    }
4389
0
    int rc = ((rmean_cb)cb)(rmean0->stats[i].name, mean,
4390
0
          total, cb_ctx);
4391
0
    if (rc != 0)
4392
0
      return rc;
4393
0
  }
4394
0
  return 0;
4395
0
}
4396
4397
int
4398
iproto_rmean_foreach(void *cb, void *cb_ctx)
4399
0
{
4400
0
  int rc;
4401
0
  rc = iproto_rmean_foreach_impl(offsetof(struct iproto_thread, rmean),
4402
0
               cb, cb_ctx);
4403
0
  if (rc != 0)
4404
0
    return rc;
4405
0
  rc = iproto_rmean_foreach_impl(offsetof(struct iproto_thread, tx.rmean),
4406
0
               cb, cb_ctx);
4407
0
  if (rc != 0)
4408
0
    return rc;
4409
0
  return 0;
4410
0
}
4411
4412
int
4413
iproto_thread_rmean_foreach(int thread_id, void *cb, void *cb_ctx)
4414
0
{
4415
0
  assert(thread_id >= 0 && thread_id < iproto_threads_count);
4416
0
  int rc = 0;
4417
0
  if (iproto_thread_rmean_foreach_impl(iproto_threads[thread_id].rmean,
4418
0
          cb, cb_ctx) != 0)
4419
0
    rc = 1;
4420
0
  if (iproto_thread_rmean_foreach_impl(iproto_threads[thread_id].tx.rmean,
4421
0
          cb, cb_ctx) != 0)
4422
0
    rc = 1;
4423
0
  return rc;
4424
0
}
4425
4426
int
4427
iproto_override(uint32_t req_type, iproto_handler_t cb,
4428
    iproto_handler_destroy_t destroy, void *ctx)
4429
0
{
4430
0
  if (!is_iproto_override_supported(req_type)) {
4431
0
    const char *feature = tt_sprintf("%s request type",
4432
0
             iproto_type_name(req_type));
4433
0
    diag_set(ClientError, ER_UNSUPPORTED,
4434
0
       "IPROTO request handler overriding", feature);
4435
0
    return -1;
4436
0
  }
4437
4438
0
  struct iproto_req_handlers *handlers;
4439
0
  handlers = mh_req_handlers_get(req_type);
4440
0
  bool is_set = iproto_req_handler_is_set(handlers);
4441
4442
0
  if (handlers != NULL && handlers->c.destroy != NULL)
4443
0
    handlers->c.destroy(handlers->c.ctx);
4444
4445
0
  if (cb != NULL) {
4446
0
    if (handlers == NULL) {
4447
0
      handlers = iproto_req_handlers_new();
4448
0
      mh_req_handlers_put(req_type, handlers);
4449
0
    }
4450
0
    handlers->c.cb = cb;
4451
0
    handlers->c.destroy = destroy;
4452
0
    handlers->c.ctx = ctx;
4453
0
  } else if (handlers != NULL) {
4454
0
    handlers->c.cb = NULL;
4455
0
    handlers->c.destroy = NULL;
4456
0
    handlers->c.ctx = NULL;
4457
0
  }
4458
4459
0
  iproto_override_finish(handlers, req_type, is_set);
4460
0
  return 0;
4461
0
}
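To close the section, a minimal registration sketch for the API above. The callback signature matches iproto_handler_t as it is invoked in tx_process_override(); the IPROTO_PING request type and the handler body are illustrative:

/* Illustrative handler: inspect the request, then let the stock
 * route process it by returning IPROTO_HANDLER_FALLBACK. */
static enum iproto_handler_status
my_ping_handler(const char *header, const char *header_end,
    const char *body, const char *body_end, void *ctx)
{
  (void)header; (void)header_end;
  (void)body; (void)body_end; (void)ctx;
  return IPROTO_HANDLER_FALLBACK;
}

/* Registration; passing cb = NULL later removes the override
 * (see the else-branch in iproto_override() above). */
if (iproto_override(IPROTO_PING, my_ping_handler,
        /*destroy=*/NULL, /*ctx=*/NULL) != 0)
  diag_log();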