Coverage Report

Created: 2025-10-10 06:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/librabbitmq/librabbitmq/amqp_socket.c
Line
Count
Source
1
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2
// SPDX-License-Identifier: mit
3
4
#ifdef HAVE_CONFIG_H
5
#include "config.h"
6
#endif
7
8
#ifdef _MSC_VER
9
#define _CRT_SECURE_NO_WARNINGS
10
#endif
11
12
#include "amqp_private.h"
13
#include "amqp_socket.h"
14
#include "amqp_table.h"
15
#include "amqp_time.h"
16
17
#include <assert.h>
18
#include <limits.h>
19
#include <stdarg.h>
20
#include <stdint.h>
21
#include <stdio.h>
22
#include <stdlib.h>
23
#include <string.h>
24
25
#include <errno.h>
26
27
#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
28
#ifndef WIN32_LEAN_AND_MEAN
29
#define WIN32_LEAN_AND_MEAN
30
#endif
31
#include <winsock2.h>
32
#include <ws2tcpip.h>
33
#else
34
#include <sys/types.h>
35
/* On older BSD types.h must come before net includes */
36
#include <netinet/in.h>
37
#include <netinet/tcp.h>
38
#ifdef HAVE_SELECT
39
#include <sys/select.h>
40
#endif
41
#include <fcntl.h>
42
#include <netdb.h>
43
#include <sys/socket.h>
44
#include <sys/uio.h>
45
#ifdef HAVE_POLL
46
#include <poll.h>
47
#endif
48
#include <unistd.h>
49
#endif
50
51
static int amqp_id_in_reply_list(amqp_method_number_t expected,
52
                                 amqp_method_number_t *list);
53
54
0
static int amqp_os_socket_init(void) {
55
#ifdef _WIN32
56
  static int called_wsastartup = 0;
57
  if (!called_wsastartup) {
58
    WSADATA data;
59
    int res = WSAStartup(0x0202, &data);
60
    if (res) {
61
      return AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR;
62
    }
63
64
    called_wsastartup = 1;
65
  }
66
  return AMQP_STATUS_OK;
67
68
#else
69
0
  return AMQP_STATUS_OK;
70
0
#endif
71
0
}
72
73
0
int amqp_os_socket_error(void) {
74
#ifdef _WIN32
75
  return WSAGetLastError();
76
#else
77
0
  return errno;
78
0
#endif
79
0
}
80
81
0
int amqp_os_socket_close(int sockfd) {
82
#ifdef _WIN32
83
  return closesocket(sockfd);
84
#else
85
0
  return close(sockfd);
86
0
#endif
87
0
}
88
89
ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len,
90
0
                         int flags) {
91
0
  assert(self);
92
0
  assert(self->klass->send);
93
0
  return self->klass->send(self, buf, len, flags);
94
0
}
95
96
ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len,
97
0
                         int flags) {
98
0
  assert(self);
99
0
  assert(self->klass->recv);
100
0
  return self->klass->recv(self, buf, len, flags);
101
0
}
102
103
0
int amqp_socket_open(amqp_socket_t *self, const char *host, int port) {
104
0
  assert(self);
105
0
  assert(self->klass->open);
106
0
  return self->klass->open(self, host, port, NULL);
107
0
}
108
109
int amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port,
110
0
                             const struct timeval *timeout) {
111
0
  assert(self);
112
0
  assert(self->klass->open);
113
0
  return self->klass->open(self, host, port, timeout);
114
0
}
115
116
0
int amqp_socket_close(amqp_socket_t *self, amqp_socket_close_enum force) {
117
0
  assert(self);
118
0
  assert(self->klass->close);
119
0
  return self->klass->close(self, force);
120
0
}
121
122
0
void amqp_socket_delete(amqp_socket_t *self) {
123
0
  if (self) {
124
0
    assert(self->klass->delete);
125
0
    self->klass->delete (self);
126
0
  }
127
0
}
128
129
0
int amqp_socket_get_sockfd(amqp_socket_t *self) {
130
0
  assert(self);
131
0
  assert(self->klass->get_sockfd);
132
0
  return self->klass->get_sockfd(self);
133
0
}
134
135
0
int amqp_poll(int fd, int event, amqp_time_t deadline) {
136
0
#ifdef HAVE_POLL
137
0
  struct pollfd pfd;
138
0
  int res;
139
0
  int timeout_ms;
140
141
  /* Function should only ever be called with one of these two */
142
0
  assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT);
143
144
0
start_poll:
145
0
  pfd.fd = fd;
146
0
  switch (event) {
147
0
    case AMQP_SF_POLLIN:
148
0
      pfd.events = POLLIN;
149
0
      break;
150
0
    case AMQP_SF_POLLOUT:
151
0
      pfd.events = POLLOUT;
152
0
      break;
153
0
  }
154
155
0
  timeout_ms = amqp_time_ms_until(deadline);
156
0
  if (-1 > timeout_ms) {
157
0
    return timeout_ms;
158
0
  }
159
160
0
  res = poll(&pfd, 1, timeout_ms);
161
162
0
  if (0 < res) {
163
    /* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or
164
     * equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall
165
     * doesn't need to be made. */
166
0
    return AMQP_STATUS_OK;
167
0
  } else if (0 == res) {
168
0
    return AMQP_STATUS_TIMEOUT;
169
0
  } else {
170
0
    switch (amqp_os_socket_error()) {
171
0
      case EINTR:
172
0
        goto start_poll;
173
0
      default:
174
0
        return AMQP_STATUS_SOCKET_ERROR;
175
0
    }
176
0
  }
177
#elif defined(HAVE_SELECT)
178
  fd_set fds;
179
  fd_set exceptfds;
180
  fd_set *exceptfdsp;
181
  int res;
182
  struct timeval tv;
183
  struct timeval *tvp;
184
185
  assert((0 != (event & AMQP_SF_POLLIN)) || (0 != (event & AMQP_SF_POLLOUT)));
186
#ifndef _WIN32
187
  /* On Win32 connect() failure is indicated through the exceptfds, it does not
188
   * make any sense to allow POLLERR on any other platform or condition */
189
  assert(0 == (event & AMQP_SF_POLLERR));
190
#endif
191
192
start_select:
193
  FD_ZERO(&fds);
194
  FD_SET(fd, &fds);
195
196
  if (event & AMQP_SF_POLLERR) {
197
    FD_ZERO(&exceptfds);
198
    FD_SET(fd, &exceptfds);
199
    exceptfdsp = &exceptfds;
200
  } else {
201
    exceptfdsp = NULL;
202
  }
203
204
  res = amqp_time_tv_until(deadline, &tv, &tvp);
205
  if (res != AMQP_STATUS_OK) {
206
    return res;
207
  }
208
209
  if (event & AMQP_SF_POLLIN) {
210
    res = select(fd + 1, &fds, NULL, exceptfdsp, tvp);
211
  } else if (event & AMQP_SF_POLLOUT) {
212
    res = select(fd + 1, NULL, &fds, exceptfdsp, tvp);
213
  }
214
215
  if (0 < res) {
216
    return AMQP_STATUS_OK;
217
  } else if (0 == res) {
218
    return AMQP_STATUS_TIMEOUT;
219
  } else {
220
    switch (amqp_os_socket_error()) {
221
      case EINTR:
222
        goto start_select;
223
      default:
224
        return AMQP_STATUS_SOCKET_ERROR;
225
    }
226
  }
227
#else
228
#error "poll() or select() is needed to compile rabbitmq-c"
229
#endif
230
0
}
231
232
static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
233
0
                       amqp_time_t deadline) {
234
0
  int fd = amqp_get_sockfd(state);
235
0
  if (-1 == fd) {
236
0
    return AMQP_STATUS_SOCKET_CLOSED;
237
0
  }
238
0
  switch (res) {
239
0
    case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
240
0
      res = amqp_poll(fd, AMQP_SF_POLLIN, deadline);
241
0
      break;
242
0
    case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
243
0
      res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline);
244
0
      break;
245
0
  }
246
0
  return res;
247
0
}
248
249
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
250
0
                      size_t len, amqp_time_t deadline, int flags) {
251
0
  ssize_t res;
252
0
  void *buf_left = (void *)buf;
253
  /* Assume that len is not going to be larger than ssize_t can hold. */
254
0
  ssize_t len_left = (size_t)len;
255
256
0
start_send:
257
0
  res = amqp_socket_send(state->socket, buf_left, len_left, flags);
258
259
0
  if (res > 0) {
260
0
    len_left -= res;
261
0
    buf_left = (char *)buf_left + res;
262
0
    if (0 == len_left) {
263
0
      return (ssize_t)len;
264
0
    }
265
0
    goto start_send;
266
0
  }
267
0
  res = do_poll(state, res, deadline);
268
0
  if (AMQP_STATUS_OK == res) {
269
0
    goto start_send;
270
0
  }
271
0
  if (AMQP_STATUS_TIMEOUT == res) {
272
0
    return (ssize_t)len - len_left;
273
0
  }
274
0
  return res;
275
0
}
276
277
0
int amqp_open_socket(char const *hostname, int portnumber) {
278
0
  return amqp_open_socket_inner(hostname, portnumber, amqp_time_infinite());
279
0
}
280
281
int amqp_open_socket_noblock(char const *hostname, int portnumber,
282
0
                             const struct timeval *timeout) {
283
0
  amqp_time_t deadline;
284
0
  int res = amqp_time_from_now(&deadline, timeout);
285
0
  if (AMQP_STATUS_OK != res) {
286
0
    return res;
287
0
  }
288
0
  return amqp_open_socket_inner(hostname, portnumber, deadline);
289
0
}
290
291
#ifdef _WIN32
292
static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
293
  int one = 1;
294
  SOCKET sockfd;
295
  int last_error;
296
297
  /*
298
   * This cast is to squash warnings on Win64, see:
299
   * http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
300
   */
301
302
  sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
303
  if (INVALID_SOCKET == sockfd) {
304
    return AMQP_STATUS_SOCKET_ERROR;
305
  }
306
307
  /* Set the socket to be non-blocking */
308
  if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &one)) {
309
    last_error = AMQP_STATUS_SOCKET_ERROR;
310
    goto err;
311
  }
312
313
  /* Disable nagle */
314
  if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY,
315
                                 (const char *)&one, sizeof(one))) {
316
    last_error = AMQP_STATUS_SOCKET_ERROR;
317
    goto err;
318
  }
319
320
  /* Enable TCP keepalives */
321
  if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE,
322
                                 (const char *)&one, sizeof(one))) {
323
    last_error = AMQP_STATUS_SOCKET_ERROR;
324
    goto err;
325
  }
326
327
  if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) {
328
    return (int)sockfd;
329
  }
330
331
  if (WSAEWOULDBLOCK != WSAGetLastError()) {
332
    last_error = AMQP_STATUS_SOCKET_ERROR;
333
    goto err;
334
  }
335
336
  last_error =
337
      amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline);
338
  if (AMQP_STATUS_OK != last_error) {
339
    goto err;
340
  }
341
342
  {
343
    int result;
344
    int result_len = sizeof(result);
345
346
    if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR,
347
                                   (char *)&result, &result_len) ||
348
        result != 0) {
349
      last_error = AMQP_STATUS_SOCKET_ERROR;
350
      goto err;
351
    }
352
  }
353
354
  return (int)sockfd;
355
356
err:
357
  closesocket(sockfd);
358
  return last_error;
359
}
360
#else
361
0
static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
362
0
  int one = 1;
363
0
  int sockfd;
364
0
  int flags;
365
0
  int last_error;
366
367
0
  sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
368
0
  if (-1 == sockfd) {
369
0
    return AMQP_STATUS_SOCKET_ERROR;
370
0
  }
371
372
  /* Enable CLOEXEC on socket */
373
0
  flags = fcntl(sockfd, F_GETFD);
374
0
  if (flags == -1 || fcntl(sockfd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
375
0
    last_error = AMQP_STATUS_SOCKET_ERROR;
376
0
    goto err;
377
0
  }
378
379
  /* Set the socket as non-blocking */
380
0
  flags = fcntl(sockfd, F_GETFL);
381
0
  if (flags == -1 || fcntl(sockfd, F_SETFL, (long)(flags | O_NONBLOCK)) == -1) {
382
0
    last_error = AMQP_STATUS_SOCKET_ERROR;
383
0
    goto err;
384
0
  }
385
386
#ifdef SO_NOSIGPIPE
387
  /* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */
388
  if (0 != setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
389
    last_error = AMQP_STATUS_SOCKET_ERROR;
390
    goto err;
391
  }
392
#endif /* SO_NOSIGPIPE */
393
394
  /* Disable nagle */
395
0
  if (0 != setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
396
0
    last_error = AMQP_STATUS_SOCKET_ERROR;
397
0
    goto err;
398
0
  }
399
400
  /* Enable TCP keepalives */
401
0
  if (0 != setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one))) {
402
0
    last_error = AMQP_STATUS_SOCKET_ERROR;
403
0
    goto err;
404
0
  }
405
406
0
  if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
407
0
    return sockfd;
408
0
  }
409
410
0
  if (EINPROGRESS != errno) {
411
0
    last_error = AMQP_STATUS_SOCKET_ERROR;
412
0
    goto err;
413
0
  }
414
415
0
  last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline);
416
0
  if (AMQP_STATUS_OK != last_error) {
417
0
    goto err;
418
0
  }
419
420
0
  {
421
0
    int result;
422
0
    socklen_t result_len = sizeof(result);
423
424
0
    if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) ||
425
0
        result != 0) {
426
0
      last_error = AMQP_STATUS_SOCKET_ERROR;
427
0
      goto err;
428
0
    }
429
0
  }
430
431
0
  return sockfd;
432
433
0
err:
434
0
  close(sockfd);
435
0
  return last_error;
436
0
}
437
#endif
438
439
int amqp_open_socket_inner(char const *hostname, int portnumber,
440
0
                           amqp_time_t deadline) {
441
0
  struct addrinfo hint;
442
0
  struct addrinfo *address_list;
443
0
  struct addrinfo *addr;
444
0
  char portnumber_string[33];
445
0
  int sockfd = -1;
446
0
  int last_error;
447
448
0
  last_error = amqp_os_socket_init();
449
0
  if (AMQP_STATUS_OK != last_error) {
450
0
    return last_error;
451
0
  }
452
453
0
  memset(&hint, 0, sizeof(hint));
454
0
  hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */
455
0
  hint.ai_socktype = SOCK_STREAM;
456
0
  hint.ai_protocol = IPPROTO_TCP;
457
458
0
  (void)sprintf(portnumber_string, "%d", portnumber);
459
460
0
  last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list);
461
0
  if (0 != last_error) {
462
0
    return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED;
463
0
  }
464
465
0
  for (addr = address_list; addr; addr = addr->ai_next) {
466
0
    sockfd = connect_socket(addr, deadline);
467
468
0
    if (sockfd >= 0) {
469
0
      last_error = AMQP_STATUS_OK;
470
0
      break;
471
0
    } else if (sockfd == AMQP_STATUS_TIMEOUT) {
472
0
      last_error = sockfd;
473
0
      break;
474
0
    }
475
0
  }
476
477
0
  freeaddrinfo(address_list);
478
0
  if (last_error != AMQP_STATUS_OK || sockfd == -1) {
479
0
    return last_error;
480
0
  }
481
0
  return sockfd;
482
0
}
483
484
static int send_header_inner(amqp_connection_state_t state,
485
0
                             amqp_time_t deadline) {
486
0
  ssize_t res;
487
0
  static const uint8_t header[8] = {'A',
488
0
                                    'M',
489
0
                                    'Q',
490
0
                                    'P',
491
0
                                    0,
492
0
                                    AMQP_PROTOCOL_VERSION_MAJOR,
493
0
                                    AMQP_PROTOCOL_VERSION_MINOR,
494
0
                                    AMQP_PROTOCOL_VERSION_REVISION};
495
0
  res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE);
496
0
  if (sizeof(header) == res) {
497
0
    return AMQP_STATUS_OK;
498
0
  }
499
0
  return (int)res;
500
0
}
501
502
0
int amqp_send_header(amqp_connection_state_t state) {
503
0
  return send_header_inner(state, amqp_time_infinite());
504
0
}
505
506
0
static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) {
507
0
  amqp_bytes_t res;
508
509
0
  switch (method) {
510
0
    case AMQP_SASL_METHOD_PLAIN:
511
0
      res = amqp_literal_bytes("PLAIN");
512
0
      break;
513
0
    case AMQP_SASL_METHOD_EXTERNAL:
514
0
      res = amqp_literal_bytes("EXTERNAL");
515
0
      break;
516
517
0
    default:
518
0
      amqp_abort("Invalid SASL method: %d", (int)method);
519
0
  }
520
521
0
  return res;
522
0
}
523
524
0
static int bytes_equal(amqp_bytes_t l, amqp_bytes_t r) {
525
0
  if (l.len == r.len) {
526
0
    if (l.bytes && r.bytes) {
527
0
      if (0 == memcmp(l.bytes, r.bytes, l.len)) {
528
0
        return 1;
529
0
      }
530
0
    }
531
0
  }
532
0
  return 0;
533
0
}
534
535
int sasl_mechanism_in_list(amqp_bytes_t mechanisms,
536
0
                           amqp_sasl_method_enum method) {
537
0
  amqp_bytes_t mechanism;
538
0
  amqp_bytes_t supported_mechanism;
539
0
  uint8_t *start;
540
0
  uint8_t *end;
541
0
  uint8_t *current;
542
543
0
  assert(NULL != mechanisms.bytes);
544
545
0
  mechanism = sasl_method_name(method);
546
547
0
  start = (uint8_t *)mechanisms.bytes;
548
0
  current = start;
549
0
  end = start + mechanisms.len;
550
551
0
  for (; current != end; start = current + 1) {
552
    /* HACK: SASL states that we should be parsing this string as a UTF-8
553
     * string, which we're plainly not doing here. At this point its not worth
554
     * dragging an entire UTF-8 parser for this one case, and this should work
555
     * most of the time */
556
0
    current = memchr(start, ' ', end - start);
557
0
    if (NULL == current) {
558
0
      current = end;
559
0
    }
560
0
    supported_mechanism.bytes = start;
561
0
    supported_mechanism.len = current - start;
562
0
    if (bytes_equal(mechanism, supported_mechanism)) {
563
0
      return 1;
564
0
    }
565
0
  }
566
567
0
  return 0;
568
0
}
569
570
static amqp_bytes_t sasl_response(amqp_pool_t *pool,
571
0
                                  amqp_sasl_method_enum method, va_list args) {
572
0
  amqp_bytes_t response;
573
574
0
  switch (method) {
575
0
    case AMQP_SASL_METHOD_PLAIN: {
576
0
      char *username = va_arg(args, char *);
577
0
      size_t username_len = strlen(username);
578
0
      char *password = va_arg(args, char *);
579
0
      size_t password_len = strlen(password);
580
0
      char *response_buf;
581
582
0
      amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2,
583
0
                            &response);
584
0
      if (response.bytes == NULL)
585
      /* We never request a zero-length block, because of the +2
586
         above, so a NULL here really is ENOMEM. */
587
0
      {
588
0
        return response;
589
0
      }
590
591
0
      response_buf = response.bytes;
592
0
      response_buf[0] = 0;
593
0
      memcpy(response_buf + 1, username, username_len);
594
0
      response_buf[username_len + 1] = 0;
595
0
      memcpy(response_buf + username_len + 2, password, password_len);
596
0
      break;
597
0
    }
598
0
    case AMQP_SASL_METHOD_EXTERNAL: {
599
0
      char *identity = va_arg(args, char *);
600
0
      size_t identity_len = strlen(identity);
601
602
0
      amqp_pool_alloc_bytes(pool, identity_len, &response);
603
0
      if (response.bytes == NULL) {
604
0
        return response;
605
0
      }
606
607
0
      memcpy(response.bytes, identity, identity_len);
608
0
      break;
609
0
    }
610
0
    default:
611
0
      amqp_abort("Invalid SASL method: %d", (int)method);
612
0
  }
613
614
0
  return response;
615
0
}
616
617
0
amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) {
618
0
  return (state->first_queued_frame != NULL);
619
0
}
620
621
/*
622
 * Check to see if we have data in our buffer. If this returns 1, we
623
 * will avoid an immediate blocking read in amqp_simple_wait_frame.
624
 */
625
0
amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) {
626
0
  return (state->sock_inbound_offset < state->sock_inbound_limit);
627
0
}
628
629
static int consume_one_frame(amqp_connection_state_t state,
630
0
                             amqp_frame_t *decoded_frame) {
631
0
  int res;
632
633
0
  amqp_bytes_t buffer;
634
0
  buffer.len = state->sock_inbound_limit - state->sock_inbound_offset;
635
0
  buffer.bytes =
636
0
      ((char *)state->sock_inbound_buffer.bytes) + state->sock_inbound_offset;
637
638
0
  res = amqp_handle_input(state, buffer, decoded_frame);
639
0
  if (res < 0) {
640
0
    return res;
641
0
  }
642
643
0
  state->sock_inbound_offset += res;
644
645
0
  return AMQP_STATUS_OK;
646
0
}
647
648
static int recv_with_timeout(amqp_connection_state_t state,
649
0
                             amqp_time_t timeout) {
650
0
  ssize_t res;
651
0
  int fd;
652
653
0
start_recv:
654
0
  res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes,
655
0
                         state->sock_inbound_buffer.len, 0);
656
657
0
  if (res < 0) {
658
0
    fd = amqp_get_sockfd(state);
659
0
    if (-1 == fd) {
660
0
      return AMQP_STATUS_CONNECTION_CLOSED;
661
0
    }
662
0
    switch (res) {
663
0
      default:
664
0
        return (int)res;
665
0
      case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
666
0
        res = amqp_poll(fd, AMQP_SF_POLLIN, timeout);
667
0
        break;
668
0
      case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
669
0
        res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout);
670
0
        break;
671
0
    }
672
0
    if (AMQP_STATUS_OK == res) {
673
0
      goto start_recv;
674
0
    }
675
0
    return (int)res;
676
0
  }
677
678
0
  state->sock_inbound_limit = res;
679
0
  state->sock_inbound_offset = 0;
680
681
0
  res = amqp_time_s_from_now(&state->next_recv_heartbeat,
682
0
                             amqp_heartbeat_recv(state));
683
0
  if (AMQP_STATUS_OK != res) {
684
0
    return (int)res;
685
0
  }
686
0
  return AMQP_STATUS_OK;
687
0
}
688
689
0
int amqp_try_recv(amqp_connection_state_t state) {
690
0
  amqp_time_t timeout;
691
0
  int res;
692
693
0
  while (amqp_data_in_buffer(state)) {
694
0
    amqp_frame_t frame;
695
0
    res = consume_one_frame(state, &frame);
696
697
0
    if (AMQP_STATUS_OK != res) {
698
0
      return res;
699
0
    }
700
701
0
    if (frame.frame_type != 0) {
702
0
      amqp_pool_t *channel_pool;
703
0
      amqp_frame_t *frame_copy;
704
0
      amqp_link_t *link;
705
706
0
      channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
707
0
      if (NULL == channel_pool) {
708
0
        return AMQP_STATUS_NO_MEMORY;
709
0
      }
710
711
0
      frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
712
0
      link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
713
714
0
      if (frame_copy == NULL || link == NULL) {
715
0
        return AMQP_STATUS_NO_MEMORY;
716
0
      }
717
718
0
      *frame_copy = frame;
719
720
0
      link->next = NULL;
721
0
      link->data = frame_copy;
722
723
0
      if (state->last_queued_frame == NULL) {
724
0
        state->first_queued_frame = link;
725
0
      } else {
726
0
        state->last_queued_frame->next = link;
727
0
      }
728
0
      state->last_queued_frame = link;
729
0
    }
730
0
  }
731
0
  res = amqp_time_from_now(&timeout, &(struct timeval){0});
732
0
  if (AMQP_STATUS_OK != res) {
733
0
    return res;
734
0
  }
735
736
0
  return recv_with_timeout(state, timeout);
737
0
}
738
739
static int wait_frame_inner(amqp_connection_state_t state,
740
                            amqp_frame_t *decoded_frame,
741
0
                            amqp_time_t timeout_deadline) {
742
0
  amqp_time_t deadline;
743
0
  int res;
744
745
0
  for (;;) {
746
0
    while (amqp_data_in_buffer(state)) {
747
0
      res = consume_one_frame(state, decoded_frame);
748
749
0
      if (AMQP_STATUS_OK != res) {
750
0
        return res;
751
0
      }
752
753
0
      if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) {
754
0
        amqp_maybe_release_buffers_on_channel(state, 0);
755
0
        continue;
756
0
      }
757
758
0
      if (decoded_frame->frame_type != 0) {
759
        /* Complete frame was read. Return it. */
760
0
        return AMQP_STATUS_OK;
761
0
      }
762
0
    }
763
764
0
  beginrecv:
765
0
    res = amqp_time_has_past(state->next_send_heartbeat);
766
0
    if (AMQP_STATUS_TIMER_FAILURE == res) {
767
0
      return res;
768
0
    } else if (AMQP_STATUS_TIMEOUT == res) {
769
0
      amqp_frame_t heartbeat;
770
0
      heartbeat.channel = 0;
771
0
      heartbeat.frame_type = AMQP_FRAME_HEARTBEAT;
772
773
0
      res = amqp_send_frame(state, &heartbeat);
774
0
      if (AMQP_STATUS_OK != res) {
775
0
        return res;
776
0
      }
777
0
    }
778
0
    deadline = amqp_time_first(timeout_deadline,
779
0
                               amqp_time_first(state->next_recv_heartbeat,
780
0
                                               state->next_send_heartbeat));
781
782
    /* TODO this needs to wait for a _frame_ and not anything written from the
783
     * socket */
784
0
    res = recv_with_timeout(state, deadline);
785
786
0
    if (AMQP_STATUS_TIMEOUT == res) {
787
0
      if (amqp_time_equal(deadline, state->next_recv_heartbeat)) {
788
0
        amqp_socket_close(state->socket, AMQP_SC_FORCE);
789
0
        return AMQP_STATUS_HEARTBEAT_TIMEOUT;
790
0
      } else if (amqp_time_equal(deadline, timeout_deadline)) {
791
0
        return AMQP_STATUS_TIMEOUT;
792
0
      } else if (amqp_time_equal(deadline, state->next_send_heartbeat)) {
793
        /* send heartbeat happens before we do recv_with_timeout */
794
0
        goto beginrecv;
795
0
      } else {
796
0
        amqp_abort("Internal error: unable to determine timeout reason");
797
0
      }
798
0
    } else if (AMQP_STATUS_OK != res) {
799
0
      return res;
800
0
    }
801
0
  }
802
0
}
803
804
static amqp_link_t *amqp_create_link_for_frame(amqp_connection_state_t state,
805
0
                                               amqp_frame_t *frame) {
806
0
  amqp_link_t *link;
807
0
  amqp_frame_t *frame_copy;
808
809
0
  amqp_pool_t *channel_pool =
810
0
      amqp_get_or_create_channel_pool(state, frame->channel);
811
812
0
  if (NULL == channel_pool) {
813
0
    return NULL;
814
0
  }
815
816
0
  link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
817
0
  frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
818
819
0
  if (NULL == link || NULL == frame_copy) {
820
0
    return NULL;
821
0
  }
822
823
0
  *frame_copy = *frame;
824
0
  link->data = frame_copy;
825
826
0
  return link;
827
0
}
828
829
0
int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
830
0
  amqp_link_t *link = amqp_create_link_for_frame(state, frame);
831
0
  if (NULL == link) {
832
0
    return AMQP_STATUS_NO_MEMORY;
833
0
  }
834
835
0
  if (NULL == state->first_queued_frame) {
836
0
    state->first_queued_frame = link;
837
0
  } else {
838
0
    state->last_queued_frame->next = link;
839
0
  }
840
841
0
  link->next = NULL;
842
0
  state->last_queued_frame = link;
843
844
0
  return AMQP_STATUS_OK;
845
0
}
846
847
0
int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) {
848
0
  amqp_link_t *link = amqp_create_link_for_frame(state, frame);
849
0
  if (NULL == link) {
850
0
    return AMQP_STATUS_NO_MEMORY;
851
0
  }
852
853
0
  if (NULL == state->first_queued_frame) {
854
0
    state->first_queued_frame = link;
855
0
    state->last_queued_frame = link;
856
0
    link->next = NULL;
857
0
  } else {
858
0
    link->next = state->first_queued_frame;
859
0
    state->first_queued_frame = link;
860
0
  }
861
862
0
  return AMQP_STATUS_OK;
863
0
}
864
865
int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
866
                                      amqp_channel_t channel,
867
0
                                      amqp_frame_t *decoded_frame) {
868
0
  amqp_frame_t *frame_ptr;
869
0
  amqp_link_t *cur;
870
0
  int res;
871
872
0
  for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
873
0
    frame_ptr = cur->data;
874
875
0
    if (channel == frame_ptr->channel) {
876
0
      state->first_queued_frame = cur->next;
877
0
      if (NULL == state->first_queued_frame) {
878
0
        state->last_queued_frame = NULL;
879
0
      }
880
881
0
      *decoded_frame = *frame_ptr;
882
883
0
      return AMQP_STATUS_OK;
884
0
    }
885
0
  }
886
887
0
  for (;;) {
888
0
    res = wait_frame_inner(state, decoded_frame, amqp_time_infinite());
889
890
0
    if (AMQP_STATUS_OK != res) {
891
0
      return res;
892
0
    }
893
894
0
    if (channel == decoded_frame->channel) {
895
0
      return AMQP_STATUS_OK;
896
0
    } else {
897
0
      res = amqp_queue_frame(state, decoded_frame);
898
0
      if (res != AMQP_STATUS_OK) {
899
0
        return res;
900
0
      }
901
0
    }
902
0
  }
903
0
}
904
905
int amqp_simple_wait_frame(amqp_connection_state_t state,
906
0
                           amqp_frame_t *decoded_frame) {
907
0
  return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL);
908
0
}
909
910
int amqp_simple_wait_frame_noblock(amqp_connection_state_t state,
911
                                   amqp_frame_t *decoded_frame,
912
0
                                   const struct timeval *timeout) {
913
0
  amqp_time_t deadline;
914
915
0
  int res = amqp_time_from_now(&deadline, timeout);
916
0
  if (AMQP_STATUS_OK != res) {
917
0
    return res;
918
0
  }
919
920
0
  if (state->first_queued_frame != NULL) {
921
0
    amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data;
922
0
    state->first_queued_frame = state->first_queued_frame->next;
923
0
    if (state->first_queued_frame == NULL) {
924
0
      state->last_queued_frame = NULL;
925
0
    }
926
0
    *decoded_frame = *f;
927
0
    return AMQP_STATUS_OK;
928
0
  } else {
929
0
    return wait_frame_inner(state, decoded_frame, deadline);
930
0
  }
931
0
}
932
933
static int amqp_simple_wait_method_list(amqp_connection_state_t state,
934
                                        amqp_channel_t expected_channel,
935
                                        amqp_method_number_t *expected_methods,
936
                                        amqp_time_t deadline,
937
0
                                        amqp_method_t *output) {
938
0
  amqp_frame_t frame;
939
0
  struct timeval tv;
940
0
  struct timeval *tvp;
941
942
0
  int res = amqp_time_tv_until(deadline, &tv, &tvp);
943
0
  if (res != AMQP_STATUS_OK) {
944
0
    return res;
945
0
  }
946
947
0
  res = amqp_simple_wait_frame_noblock(state, &frame, tvp);
948
0
  if (AMQP_STATUS_OK != res) {
949
0
    return res;
950
0
  }
951
952
0
  if (AMQP_FRAME_METHOD != frame.frame_type ||
953
0
      expected_channel != frame.channel ||
954
0
      !amqp_id_in_reply_list(frame.payload.method.id, expected_methods)) {
955
0
    return AMQP_STATUS_WRONG_METHOD;
956
0
  }
957
0
  *output = frame.payload.method;
958
0
  return AMQP_STATUS_OK;
959
0
}
960
961
static int simple_wait_method_inner(amqp_connection_state_t state,
962
                                    amqp_channel_t expected_channel,
963
                                    amqp_method_number_t expected_method,
964
                                    amqp_time_t deadline,
965
0
                                    amqp_method_t *output) {
966
0
  amqp_method_number_t expected_methods[2];
967
0
  expected_methods[0] = expected_method;
968
0
  expected_methods[1] = 0;
969
0
  return amqp_simple_wait_method_list(state, expected_channel, expected_methods,
970
0
                                      deadline, output);
971
0
}
972
973
int amqp_simple_wait_method(amqp_connection_state_t state,
974
                            amqp_channel_t expected_channel,
975
                            amqp_method_number_t expected_method,
976
0
                            amqp_method_t *output) {
977
0
  return simple_wait_method_inner(state, expected_channel, expected_method,
978
0
                                  amqp_time_infinite(), output);
979
0
}
980
981
int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel,
982
0
                     amqp_method_number_t id, void *decoded) {
983
0
  return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE,
984
0
                                amqp_time_infinite());
985
0
}
986
987
int amqp_send_method_inner(amqp_connection_state_t state,
988
                           amqp_channel_t channel, amqp_method_number_t id,
989
0
                           void *decoded, int flags, amqp_time_t deadline) {
990
0
  amqp_frame_t frame;
991
992
0
  frame.frame_type = AMQP_FRAME_METHOD;
993
0
  frame.channel = channel;
994
0
  frame.payload.method.id = id;
995
0
  frame.payload.method.decoded = decoded;
996
0
  return amqp_send_frame_inner(state, &frame, flags, deadline);
997
0
}
998
999
static int amqp_id_in_reply_list(amqp_method_number_t expected,
1000
0
                                 amqp_method_number_t *list) {
1001
0
  while (*list != 0) {
1002
0
    if (*list == expected) {
1003
0
      return 1;
1004
0
    }
1005
0
    list++;
1006
0
  }
1007
0
  return 0;
1008
0
}
1009
1010
static amqp_rpc_reply_t simple_rpc_inner(
1011
    amqp_connection_state_t state, amqp_channel_t channel,
1012
    amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids,
1013
0
    void *decoded_request_method, amqp_time_t deadline) {
1014
0
  int status;
1015
0
  amqp_rpc_reply_t result;
1016
1017
0
  memset(&result, 0, sizeof(result));
1018
1019
0
  status = amqp_send_method(state, channel, request_id, decoded_request_method);
1020
0
  if (status < 0) {
1021
0
    return amqp_rpc_reply_error(status);
1022
0
  }
1023
1024
0
  {
1025
0
    amqp_frame_t frame;
1026
1027
0
  retry:
1028
0
    status = wait_frame_inner(state, &frame, deadline);
1029
0
    if (status != AMQP_STATUS_OK) {
1030
0
      if (status == AMQP_STATUS_TIMEOUT) {
1031
0
        amqp_socket_close(state->socket, AMQP_SC_FORCE);
1032
0
      }
1033
0
      return amqp_rpc_reply_error(status);
1034
0
    }
1035
1036
    /*
1037
     * We store the frame for later processing unless it's something
1038
     * that directly affects us here, namely a method frame that is
1039
     * either
1040
     *  - on the channel we want, and of the expected type, or
1041
     *  - on the channel we want, and a channel.close frame, or
1042
     *  - on channel zero, and a connection.close frame.
1043
     */
1044
0
    if (!((frame.frame_type == AMQP_FRAME_METHOD) &&
1045
0
          (((frame.channel == channel) &&
1046
0
            (amqp_id_in_reply_list(frame.payload.method.id,
1047
0
                                   expected_reply_ids) ||
1048
0
             (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) ||
1049
0
           ((frame.channel == 0) &&
1050
0
            (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))))) {
1051
0
      amqp_pool_t *channel_pool;
1052
0
      amqp_frame_t *frame_copy;
1053
0
      amqp_link_t *link;
1054
1055
0
      channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
1056
0
      if (NULL == channel_pool) {
1057
0
        return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY);
1058
0
      }
1059
1060
0
      frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
1061
0
      link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
1062
1063
0
      if (frame_copy == NULL || link == NULL) {
1064
0
        return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY);
1065
0
      }
1066
1067
0
      *frame_copy = frame;
1068
1069
0
      link->next = NULL;
1070
0
      link->data = frame_copy;
1071
1072
0
      if (state->last_queued_frame == NULL) {
1073
0
        state->first_queued_frame = link;
1074
0
      } else {
1075
0
        state->last_queued_frame->next = link;
1076
0
      }
1077
0
      state->last_queued_frame = link;
1078
1079
0
      goto retry;
1080
0
    }
1081
1082
0
    result.reply_type =
1083
0
        (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids))
1084
0
            ? AMQP_RESPONSE_NORMAL
1085
0
            : AMQP_RESPONSE_SERVER_EXCEPTION;
1086
1087
0
    result.reply = frame.payload.method;
1088
0
    return result;
1089
0
  }
1090
0
}
1091
1092
amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
1093
                                 amqp_channel_t channel,
1094
                                 amqp_method_number_t request_id,
1095
                                 amqp_method_number_t *expected_reply_ids,
1096
0
                                 void *decoded_request_method) {
1097
0
  amqp_time_t deadline;
1098
0
  int res;
1099
1100
0
  res = amqp_time_from_now(&deadline, state->rpc_timeout);
1101
0
  if (res != AMQP_STATUS_OK) {
1102
0
    return amqp_rpc_reply_error(res);
1103
0
  }
1104
1105
0
  return simple_rpc_inner(state, channel, request_id, expected_reply_ids,
1106
0
                          decoded_request_method, deadline);
1107
0
}
1108
1109
void *amqp_simple_rpc_decoded(amqp_connection_state_t state,
1110
                              amqp_channel_t channel,
1111
                              amqp_method_number_t request_id,
1112
                              amqp_method_number_t reply_id,
1113
0
                              void *decoded_request_method) {
1114
0
  amqp_time_t deadline;
1115
0
  int res;
1116
0
  amqp_method_number_t replies[2];
1117
1118
0
  res = amqp_time_from_now(&deadline, state->rpc_timeout);
1119
0
  if (res != AMQP_STATUS_OK) {
1120
0
    state->most_recent_api_result = amqp_rpc_reply_error(res);
1121
0
    return NULL;
1122
0
  }
1123
1124
0
  replies[0] = reply_id;
1125
0
  replies[1] = 0;
1126
1127
0
  state->most_recent_api_result = simple_rpc_inner(
1128
0
      state, channel, request_id, replies, decoded_request_method, deadline);
1129
1130
0
  if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) {
1131
0
    return state->most_recent_api_result.reply.decoded;
1132
0
  } else {
1133
0
    return NULL;
1134
0
  }
1135
0
}
1136
1137
0
amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) {
1138
0
  return state->most_recent_api_result;
1139
0
}
1140
1141
/*
1142
 * Merge base and add tables. If the two tables contain an entry with the same
1143
 * key, the entry from the add table takes precedence. For entries that are both
1144
 * tables with the same key, the table is recursively merged.
1145
 */
1146
int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add,
1147
0
                            amqp_table_t *result, amqp_pool_t *pool) {
1148
0
  int i;
1149
0
  int res;
1150
0
  amqp_pool_t temp_pool;
1151
0
  amqp_table_t temp_result;
1152
0
  assert(base != NULL);
1153
0
  assert(result != NULL);
1154
0
  assert(pool != NULL);
1155
1156
0
  if (NULL == add) {
1157
0
    return amqp_table_clone(base, result, pool);
1158
0
  }
1159
1160
0
  init_amqp_pool(&temp_pool, 4096);
1161
0
  temp_result.num_entries = 0;
1162
0
  temp_result.entries =
1163
0
      amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) *
1164
0
                                      (base->num_entries + add->num_entries));
1165
0
  if (NULL == temp_result.entries) {
1166
0
    res = AMQP_STATUS_NO_MEMORY;
1167
0
    goto error_out;
1168
0
  }
1169
0
  for (i = 0; i < base->num_entries; ++i) {
1170
0
    temp_result.entries[temp_result.num_entries] = base->entries[i];
1171
0
    temp_result.num_entries++;
1172
0
  }
1173
0
  for (i = 0; i < add->num_entries; ++i) {
1174
0
    amqp_table_entry_t *e =
1175
0
        amqp_table_get_entry_by_key(&temp_result, add->entries[i].key);
1176
0
    if (NULL != e) {
1177
0
      if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind &&
1178
0
          AMQP_FIELD_KIND_TABLE == e->value.kind) {
1179
0
        amqp_table_entry_t *be =
1180
0
            amqp_table_get_entry_by_key(base, add->entries[i].key);
1181
1182
0
        res = amqp_merge_capabilities(&be->value.value.table,
1183
0
                                      &add->entries[i].value.value.table,
1184
0
                                      &e->value.value.table, &temp_pool);
1185
0
        if (AMQP_STATUS_OK != res) {
1186
0
          goto error_out;
1187
0
        }
1188
0
      } else {
1189
0
        e->value = add->entries[i].value;
1190
0
      }
1191
0
    } else {
1192
0
      temp_result.entries[temp_result.num_entries] = add->entries[i];
1193
0
      temp_result.num_entries++;
1194
0
    }
1195
0
  }
1196
0
  res = amqp_table_clone(&temp_result, result, pool);
1197
0
error_out:
1198
0
  empty_amqp_pool(&temp_pool);
1199
0
  return res;
1200
0
}
1201
1202
static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
1203
                                         char const *vhost, int channel_max,
1204
                                         int frame_max, int heartbeat,
1205
                                         const amqp_table_t *client_properties,
1206
                                         const struct timeval *timeout,
1207
                                         amqp_sasl_method_enum sasl_method,
1208
0
                                         va_list vl) {
1209
0
  int res;
1210
0
  amqp_method_t method;
1211
1212
0
  uint16_t client_channel_max;
1213
0
  uint32_t client_frame_max;
1214
0
  uint16_t client_heartbeat;
1215
1216
0
  uint16_t server_channel_max;
1217
0
  uint32_t server_frame_max;
1218
0
  uint16_t server_heartbeat;
1219
1220
0
  amqp_rpc_reply_t result;
1221
0
  amqp_time_t deadline;
1222
1223
0
  if (channel_max < 0 || channel_max > UINT16_MAX) {
1224
0
    return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1225
0
  }
1226
0
  client_channel_max = (uint16_t)channel_max;
1227
1228
0
  if (frame_max < 0) {
1229
0
    return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1230
0
  }
1231
0
  client_frame_max = (uint32_t)frame_max;
1232
1233
0
  if (heartbeat < 0 || heartbeat > UINT16_MAX) {
1234
0
    return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
1235
0
  }
1236
0
  client_heartbeat = (uint16_t)heartbeat;
1237
1238
0
  res = amqp_time_from_now(&deadline, timeout);
1239
0
  if (AMQP_STATUS_OK != res) {
1240
0
    goto error_res;
1241
0
  }
1242
1243
0
  res = send_header_inner(state, deadline);
1244
0
  if (AMQP_STATUS_OK != res) {
1245
0
    goto error_res;
1246
0
  }
1247
1248
0
  res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD,
1249
0
                                 deadline, &method);
1250
0
  if (AMQP_STATUS_OK != res) {
1251
0
    goto error_res;
1252
0
  }
1253
1254
0
  {
1255
0
    amqp_connection_start_t *s = (amqp_connection_start_t *)method.decoded;
1256
0
    if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) ||
1257
0
        (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) {
1258
0
      res = AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION;
1259
0
      goto error_res;
1260
0
    }
1261
1262
0
    res = amqp_table_clone(&s->server_properties, &state->server_properties,
1263
0
                           &state->properties_pool);
1264
1265
0
    if (AMQP_STATUS_OK != res) {
1266
0
      goto error_res;
1267
0
    }
1268
1269
    /* TODO: check that our chosen SASL mechanism is in the list of
1270
       acceptable mechanisms. Or even let the application choose from
1271
       the list! */
1272
0
    if (!sasl_mechanism_in_list(s->mechanisms, sasl_method)) {
1273
0
      res = AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD;
1274
0
      goto error_res;
1275
0
    }
1276
0
  }
1277
1278
0
  {
1279
0
    amqp_table_entry_t default_properties[6];
1280
0
    amqp_table_t default_table;
1281
0
    amqp_table_entry_t client_capabilities[2];
1282
0
    amqp_table_t client_capabilities_table;
1283
0
    amqp_connection_start_ok_t s;
1284
0
    amqp_pool_t *channel_pool;
1285
0
    amqp_bytes_t response_bytes;
1286
1287
0
    channel_pool = amqp_get_or_create_channel_pool(state, 0);
1288
0
    if (NULL == channel_pool) {
1289
0
      res = AMQP_STATUS_NO_MEMORY;
1290
0
      goto error_res;
1291
0
    }
1292
1293
0
    response_bytes = sasl_response(channel_pool, sasl_method, vl);
1294
0
    if (response_bytes.bytes == NULL) {
1295
0
      res = AMQP_STATUS_NO_MEMORY;
1296
0
      goto error_res;
1297
0
    }
1298
1299
0
    client_capabilities[0] =
1300
0
        amqp_table_construct_bool_entry("authentication_failure_close", 1);
1301
0
    client_capabilities[1] =
1302
0
        amqp_table_construct_bool_entry("exchange_exchange_bindings", 1);
1303
1304
0
    client_capabilities_table.entries = client_capabilities;
1305
0
    client_capabilities_table.num_entries =
1306
0
        sizeof(client_capabilities) / sizeof(amqp_table_entry_t);
1307
1308
0
    default_properties[0] =
1309
0
        amqp_table_construct_utf8_entry("product", "rabbitmq-c");
1310
0
    default_properties[1] =
1311
0
        amqp_table_construct_utf8_entry("version", AMQP_VERSION_STRING);
1312
0
    default_properties[2] =
1313
0
        amqp_table_construct_utf8_entry("platform", AMQ_PLATFORM);
1314
0
    default_properties[3] =
1315
0
        amqp_table_construct_utf8_entry("copyright", AMQ_COPYRIGHT);
1316
0
    default_properties[4] = amqp_table_construct_utf8_entry(
1317
0
        "information", "See https://github.com/alanxz/rabbitmq-c");
1318
0
    default_properties[5] = amqp_table_construct_table_entry(
1319
0
        "capabilities", &client_capabilities_table);
1320
1321
0
    default_table.entries = default_properties;
1322
0
    default_table.num_entries =
1323
0
        sizeof(default_properties) / sizeof(amqp_table_entry_t);
1324
1325
0
    res = amqp_merge_capabilities(&default_table, client_properties,
1326
0
                                  &state->client_properties, channel_pool);
1327
0
    if (AMQP_STATUS_OK != res) {
1328
0
      goto error_res;
1329
0
    }
1330
1331
0
    s.client_properties = state->client_properties;
1332
0
    s.mechanism = sasl_method_name(sasl_method);
1333
0
    s.response = response_bytes;
1334
0
    s.locale = amqp_literal_bytes("en_US");
1335
1336
0
    res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s,
1337
0
                                 AMQP_SF_NONE, deadline);
1338
0
    if (res < 0) {
1339
0
      goto error_res;
1340
0
    }
1341
0
  }
1342
1343
0
  amqp_release_buffers(state);
1344
1345
0
  {
1346
0
    amqp_method_number_t expected[] = {AMQP_CONNECTION_TUNE_METHOD,
1347
0
                                       AMQP_CONNECTION_CLOSE_METHOD, 0};
1348
1349
0
    res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method);
1350
0
    if (AMQP_STATUS_OK != res) {
1351
0
      goto error_res;
1352
0
    }
1353
0
  }
1354
1355
0
  if (AMQP_CONNECTION_CLOSE_METHOD == method.id) {
1356
0
    result.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
1357
0
    result.reply = method;
1358
0
    result.library_error = 0;
1359
0
    goto out;
1360
0
  }
1361
1362
0
  {
1363
0
    amqp_connection_tune_t *s = (amqp_connection_tune_t *)method.decoded;
1364
0
    server_channel_max = s->channel_max;
1365
0
    server_frame_max = s->frame_max;
1366
0
    server_heartbeat = s->heartbeat;
1367
0
  }
1368
1369
0
  if (server_channel_max != 0 &&
1370
0
      (server_channel_max < client_channel_max || client_channel_max == 0)) {
1371
0
    client_channel_max = server_channel_max;
1372
0
  } else if (server_channel_max == 0 && client_channel_max == 0) {
1373
0
    client_channel_max = UINT16_MAX;
1374
0
  }
1375
1376
0
  if (server_frame_max != 0 && server_frame_max < client_frame_max) {
1377
0
    client_frame_max = server_frame_max;
1378
0
  }
1379
1380
0
  if (server_heartbeat != 0 && server_heartbeat < client_heartbeat) {
1381
0
    client_heartbeat = server_heartbeat;
1382
0
  }
1383
1384
0
  res = amqp_tune_connection(state, client_channel_max, client_frame_max,
1385
0
                             client_heartbeat);
1386
0
  if (res < 0) {
1387
0
    goto error_res;
1388
0
  }
1389
1390
0
  {
1391
0
    amqp_connection_tune_ok_t s;
1392
0
    s.frame_max = client_frame_max;
1393
0
    s.channel_max = client_channel_max;
1394
0
    s.heartbeat = client_heartbeat;
1395
1396
0
    res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s,
1397
0
                                 AMQP_SF_NONE, deadline);
1398
0
    if (res < 0) {
1399
0
      goto error_res;
1400
0
    }
1401
0
  }
1402
1403
0
  amqp_release_buffers(state);
1404
1405
0
  {
1406
0
    amqp_method_number_t replies[] = {AMQP_CONNECTION_OPEN_OK_METHOD, 0};
1407
0
    amqp_connection_open_t s;
1408
0
    s.virtual_host = amqp_cstring_bytes(vhost);
1409
0
    s.capabilities = amqp_empty_bytes;
1410
0
    s.insist = 1;
1411
1412
0
    result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies,
1413
0
                              &s, deadline);
1414
0
    if (result.reply_type != AMQP_RESPONSE_NORMAL) {
1415
0
      goto out;
1416
0
    }
1417
0
  }
1418
1419
0
  result.reply_type = AMQP_RESPONSE_NORMAL;
1420
0
  result.reply.id = 0;
1421
0
  result.reply.decoded = NULL;
1422
0
  result.library_error = 0;
1423
0
  amqp_maybe_release_buffers(state);
1424
1425
0
out:
1426
0
  return result;
1427
1428
0
error_res:
1429
0
  amqp_socket_close(state->socket, AMQP_SC_FORCE);
1430
0
  result = amqp_rpc_reply_error(res);
1431
1432
0
  goto out;
1433
0
}
1434
1435
amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost,
1436
                            int channel_max, int frame_max, int heartbeat,
1437
0
                            amqp_sasl_method_enum sasl_method, ...) {
1438
1439
0
  va_list vl;
1440
0
  amqp_rpc_reply_t ret;
1441
1442
0
  va_start(vl, sasl_method);
1443
1444
0
  ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
1445
0
                         &amqp_empty_table, state->handshake_timeout,
1446
0
                         sasl_method, vl);
1447
1448
0
  va_end(vl);
1449
1450
0
  return ret;
1451
0
}
1452
1453
amqp_rpc_reply_t amqp_login_with_properties(
1454
    amqp_connection_state_t state, char const *vhost, int channel_max,
1455
    int frame_max, int heartbeat, const amqp_table_t *client_properties,
1456
0
    amqp_sasl_method_enum sasl_method, ...) {
1457
0
  va_list vl;
1458
0
  amqp_rpc_reply_t ret;
1459
1460
0
  va_start(vl, sasl_method);
1461
1462
0
  ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat,
1463
0
                         client_properties, state->handshake_timeout,
1464
0
                         sasl_method, vl);
1465
1466
0
  va_end(vl);
1467
1468
0
  return ret;
1469
0
}