Coverage Report

Created: 2025-08-11 07:00

/src/librabbitmq/librabbitmq/amqp_api.c
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2
// SPDX-License-Identifier: mit
3
4
#ifdef HAVE_CONFIG_H
5
#include "config.h"
6
#endif
7
8
#ifdef _MSC_VER
9
/* MSVC complains about sprintf being deprecated in favor of sprintf_s */
10
#define _CRT_SECURE_NO_WARNINGS
11
/* MSVC complains about strdup being deprecated in favor of _strdup */
12
#define _CRT_NONSTDC_NO_DEPRECATE
13
#endif
14
15
#include "amqp_private.h"
16
#include "amqp_time.h"
17
#include <stdarg.h>
18
#include <stdint.h>
19
#include <stdio.h>
20
#include <stdlib.h>
21
#include <string.h>
22
23
0
/* Selects the low byte of a negated status code: the index of the error
 * inside its category's message table (see amqp_error_string2). */
#define ERROR_MASK (0x00FF)
24
0
/* Selects the second byte of a negated status code: the error category. */
#define ERROR_CATEGORY_MASK (0xFF00)
25
26
/* Error categories; amqp_error_string2() switches on these to pick the
 * message table to index. */
enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 };
27
28
/* Messages for the EC_base category. amqp_error_string2() indexes this table
 * with the low byte of the negated status code, so the order of the entries
 * must follow the numeric codes noted in the comments. */
static const char *base_error_strings[] = {
29
    /* AMQP_STATUS_OK 0x0 */
30
    "operation completed successfully",
31
    /* AMQP_STATUS_NO_MEMORY                  -0x0001 */
32
    "could not allocate memory",
33
    /* AMQP_STATUS_BAD_AMQP_DATA              -0x0002 */
34
    "invalid AMQP data",
35
    /* AMQP_STATUS_UNKNOWN_CLASS              -0x0003 */
36
    "unknown AMQP class id",
37
    /* AMQP_STATUS_UNKNOWN_METHOD             -0x0004 */
38
    "unknown AMQP method id",
39
    /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED -0x0005 */
40
    "hostname lookup failed",
41
    /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION  -0x0006 */
42
    "incompatible AMQP version",
43
    /* AMQP_STATUS_CONNECTION_CLOSED          -0x0007 */
44
    "connection closed unexpectedly",
45
    /* AMQP_STATUS_BAD_AMQP_URL               -0x0008 */
46
    "could not parse AMQP URL",
47
    /* AMQP_STATUS_SOCKET_ERROR               -0x0009 */
48
    "a socket error occurred",
49
    /* AMQP_STATUS_INVALID_PARAMETER          -0x000A */
50
    "invalid parameter",
51
    /* AMQP_STATUS_TABLE_TOO_BIG              -0x000B */
52
    "table too large for buffer",
53
    /* AMQP_STATUS_WRONG_METHOD               -0x000C */
54
    "unexpected method received",
55
    /* AMQP_STATUS_TIMEOUT                    -0x000D */
56
    "request timed out",
57
    /* AMQP_STATUS_TIMER_FAILURE              -0x000E */
58
    "system timer has failed",
59
    /* AMQP_STATUS_HEARTBEAT_TIMEOUT          -0x000F */
60
    "heartbeat timeout, connection closed",
61
    /* AMQP_STATUS_UNEXPECTED_STATE           -0x0010 */
62
    "unexpected protocol state",
63
    /* AMQP_STATUS_SOCKET_CLOSED              -0x0011 */
64
    "socket is closed",
65
    /* AMQP_STATUS_SOCKET_INUSE               -0x0012 */
66
    "socket already open",
67
    /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD -0x0013 */
68
    "unsupported sasl method requested",
69
    /* AMQP_STATUS_UNSUPPORTED                -0x0014 */
70
    "parameter value is unsupported"};
71
72
/* Messages for the EC_tcp category, indexed by the low byte of the negated
 * status code (see amqp_error_string2). */
static const char *tcp_error_strings[] = {
73
    /* AMQP_STATUS_TCP_ERROR                  -0x0100 */
74
    "a socket error occurred",
75
    /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR   -0x0101 */
76
    "socket library initialization failed"};
77
78
/* Messages for the EC_ssl category, indexed by the low byte of the negated
 * status code (see amqp_error_string2). */
static const char *ssl_error_strings[] = {
79
    /* AMQP_STATUS_SSL_ERROR                  -0x0200 */
80
    "a SSL error occurred",
81
    /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED -0x0201 */
82
    "SSL hostname verification failed",
83
    /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED     -0x0202 */
84
    "SSL peer cert verification failed",
85
    /* AMQP_STATUS_SSL_CONNECTION_FAILED      -0x0203 */
86
    "SSL handshake failed",
87
    /* AMQP_STATUS_SSL_SET_ENGINE_FAILED      -0x0204 */
88
    "SSL setting engine failed",
89
    /* AMQP_STATUS_SSL_UNIMPLEMENTED          -0x0205 */
90
    "SSL API is not implemented"};
91
92
/* Fallback message amqp_error_string2() returns for a code that has no entry
 * in any of the message tables. */
static const char *unknown_error_string = "(unknown error)";
93
94
0
const char *amqp_error_string2(int code) {
95
0
  const char *error_string;
96
0
  size_t category = (((-code) & ERROR_CATEGORY_MASK) >> 8);
97
0
  size_t error = (-code) & ERROR_MASK;
98
99
0
  switch (category) {
100
0
    case EC_base:
101
0
      if (error < (sizeof(base_error_strings) / sizeof(char *))) {
102
0
        error_string = base_error_strings[error];
103
0
      } else {
104
0
        error_string = unknown_error_string;
105
0
      }
106
0
      break;
107
108
0
    case EC_tcp:
109
0
      if (error < (sizeof(tcp_error_strings) / sizeof(char *))) {
110
0
        error_string = tcp_error_strings[error];
111
0
      } else {
112
0
        error_string = unknown_error_string;
113
0
      }
114
0
      break;
115
116
0
    case EC_ssl:
117
0
      if (error < (sizeof(ssl_error_strings) / sizeof(char *))) {
118
0
        error_string = ssl_error_strings[error];
119
0
      } else {
120
0
        error_string = unknown_error_string;
121
0
      }
122
123
0
      break;
124
125
0
    default:
126
0
      error_string = unknown_error_string;
127
0
      break;
128
0
  }
129
130
0
  return error_string;
131
0
}
132
133
0
/*
 * Deprecated variant of amqp_error_string2() that returns a strdup()'d copy
 * of the message. The copy is heap-allocated, so the caller must free() it;
 * the result is NULL if strdup() fails.
 */
char *amqp_error_string(int code) {
  /* Older releases sometimes required callers to flip the sign of a return
   * value to obtain the error code, whereas every error code is negative
   * now. So that legacy callers keep working, a positive code is folded onto
   * its negative counterpart before the lookup.
   *
   * Only this deprecated entry point performs that mapping.
   */
  const int normalized = (code > 0) ? -code : code;

  return strdup(amqp_error_string2(normalized));
}
146
147
0
/*
 * Print a printf-style formatted message followed by a newline to stderr,
 * then terminate the process with abort(). Does not return.
 */
void amqp_abort(const char *fmt, ...) {
  va_list args;

  va_start(args, fmt);
  vfprintf(stderr, fmt, args);
  va_end(args);

  fputs("\n", stderr);
  abort();
}
155
156
/* Shared constant "empty" values for the bytes, table and array types; each
 * is initialised to {0, NULL}. */
const amqp_bytes_t amqp_empty_bytes = {0, NULL};
157
const amqp_table_t amqp_empty_table = {0, NULL};
158
const amqp_array_t amqp_empty_array = {0, NULL};
159
160
int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel,
161
                       amqp_bytes_t exchange, amqp_bytes_t routing_key,
162
                       amqp_boolean_t mandatory, amqp_boolean_t immediate,
163
                       amqp_basic_properties_t const *properties,
164
0
                       amqp_bytes_t body) {
165
0
  amqp_frame_t f;
166
0
  size_t body_offset;
167
0
  size_t usable_body_payload_size =
168
0
      state->frame_max - (HEADER_SIZE + FOOTER_SIZE);
169
0
  int res;
170
0
  int flagz;
171
172
0
  amqp_basic_publish_t m;
173
0
  amqp_basic_properties_t default_properties;
174
175
0
  m.exchange = exchange;
176
0
  m.routing_key = routing_key;
177
0
  m.mandatory = mandatory;
178
0
  m.immediate = immediate;
179
0
  m.ticket = 0;
180
181
  /* TODO(alanxz): this heartbeat check is happening in the wrong place, it
182
   * should really be done in amqp_try_send/writev */
183
0
  res = amqp_time_has_past(state->next_recv_heartbeat);
184
0
  if (AMQP_STATUS_TIMER_FAILURE == res) {
185
0
    return res;
186
0
  } else if (AMQP_STATUS_TIMEOUT == res) {
187
0
    res = amqp_try_recv(state);
188
0
    if (AMQP_STATUS_TIMEOUT == res) {
189
0
      return AMQP_STATUS_HEARTBEAT_TIMEOUT;
190
0
    } else if (AMQP_STATUS_OK != res) {
191
0
      return res;
192
0
    }
193
0
  }
194
195
0
  res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m,
196
0
                               AMQP_SF_MORE, amqp_time_infinite());
197
0
  if (res < 0) {
198
0
    return res;
199
0
  }
200
201
0
  if (properties == NULL) {
202
0
    memset(&default_properties, 0, sizeof(default_properties));
203
0
    properties = &default_properties;
204
0
  }
205
206
0
  f.frame_type = AMQP_FRAME_HEADER;
207
0
  f.channel = channel;
208
0
  f.payload.properties.class_id = AMQP_BASIC_CLASS;
209
0
  f.payload.properties.body_size = body.len;
210
0
  f.payload.properties.decoded = (void *)properties;
211
212
0
  if (body.len > 0) {
213
0
    flagz = AMQP_SF_MORE;
214
0
  } else {
215
0
    flagz = AMQP_SF_NONE;
216
0
  }
217
0
  res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
218
0
  if (res < 0) {
219
0
    return res;
220
0
  }
221
222
0
  body_offset = 0;
223
0
  while (body_offset < body.len) {
224
0
    size_t remaining = body.len - body_offset;
225
226
0
    if (remaining == 0) {
227
0
      break;
228
0
    }
229
230
0
    f.frame_type = AMQP_FRAME_BODY;
231
0
    f.channel = channel;
232
0
    f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset);
233
0
    if (remaining >= usable_body_payload_size) {
234
0
      f.payload.body_fragment.len = usable_body_payload_size;
235
0
      flagz = AMQP_SF_MORE;
236
0
    } else {
237
0
      f.payload.body_fragment.len = remaining;
238
0
      flagz = AMQP_SF_NONE;
239
0
    }
240
241
0
    body_offset += f.payload.body_fragment.len;
242
0
    res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite());
243
0
    if (res < 0) {
244
0
      return res;
245
0
    }
246
0
  }
247
248
0
  return AMQP_STATUS_OK;
249
0
}
250
251
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state,
252
0
                                    amqp_channel_t channel, int code) {
253
0
  char codestr[13];
254
0
  amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0};
255
0
  amqp_channel_close_t req;
256
257
0
  if (code < 0 || code > UINT16_MAX) {
258
0
    return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
259
0
  }
260
261
0
  req.reply_code = (uint16_t)code;
262
0
  req.reply_text.bytes = codestr;
263
0
  req.reply_text.len = sprintf(codestr, "%d", code);
264
0
  req.class_id = 0;
265
0
  req.method_id = 0;
266
267
0
  return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies,
268
0
                         &req);
269
0
}
270
271
/*
 * Close the connection: sends connection.close on channel 0 with the given
 * reply code and waits for connection.close-ok via amqp_simple_rpc().
 *
 * code must lie in [0, UINT16_MAX]; otherwise the reply produced by
 * amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER) is returned and
 * nothing is sent. The reply text is the decimal rendering of code.
 */
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state,
                                       int code) {
  amqp_method_number_t expected_replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD,
                                              0};
  /* NOTE(review): the request is declared as amqp_channel_close_t although
   * the method sent is AMQP_CONNECTION_CLOSE_METHOD; presumably both request
   * structs share the same layout -- confirm against the framing header. */
  amqp_channel_close_t close_req;
  /* Large enough for any int rendered with "%d" plus the terminator. */
  char code_text[13];

  if (!(code >= 0 && code <= UINT16_MAX)) {
    return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER);
  }

  close_req.class_id = 0;
  close_req.method_id = 0;
  close_req.reply_code = (uint16_t)code;
  close_req.reply_text.len = sprintf(code_text, "%d", code);
  close_req.reply_text.bytes = code_text;

  return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD,
                         expected_replies, &close_req);
}
289
290
/*
 * Send basic.ack for delivery_tag on the given channel.
 *
 * multiple is passed through unchanged in the method's "multiple" field.
 * Returns the status reported by amqp_send_method().
 */
int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel,
                   uint64_t delivery_tag, amqp_boolean_t multiple) {
  amqp_basic_ack_t ack = {.delivery_tag = delivery_tag, .multiple = multiple};

  return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &ack);
}
297
298
amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state,
299
                                amqp_channel_t channel, amqp_bytes_t queue,
300
0
                                amqp_boolean_t no_ack) {
301
0
  amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD,
302
0
                                    AMQP_BASIC_GET_EMPTY_METHOD, 0};
303
0
  amqp_basic_get_t req;
304
0
  req.ticket = 0;
305
0
  req.queue = queue;
306
0
  req.no_ack = no_ack;
307
308
0
  state->most_recent_api_result =
309
0
      amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req);
310
0
  return state->most_recent_api_result;
311
0
}
312
313
/*
 * Send basic.reject for delivery_tag on the given channel.
 *
 * requeue is passed through unchanged in the method's "requeue" field.
 * Returns the status reported by amqp_send_method().
 */
int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel,
                      uint64_t delivery_tag, amqp_boolean_t requeue) {
  amqp_basic_reject_t reject = {.delivery_tag = delivery_tag,
                                .requeue = requeue};

  return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &reject);
}
320
321
/*
 * Send basic.nack for delivery_tag on the given channel.
 *
 * multiple and requeue are passed through unchanged in the method's fields
 * of the same names. Returns the status reported by amqp_send_method().
 */
int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel,
                    uint64_t delivery_tag, amqp_boolean_t multiple,
                    amqp_boolean_t requeue) {
  amqp_basic_nack_t nack = {.delivery_tag = delivery_tag,
                            .multiple = multiple,
                            .requeue = requeue};

  return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &nack);
}
330
331
0
/*
 * Return the connection's handshake timeout pointer as currently stored in
 * the state; it is NULL when amqp_set_handshake_timeout() was last called
 * with a NULL timeout.
 */
struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) {
  struct timeval *current = state->handshake_timeout;

  return current;
}
334
335
/*
 * Set or clear the connection's handshake timeout.
 *
 * A NULL timeout clears it. Otherwise the value is copied into the state's
 * own storage, so the caller's struct need not outlive the call.
 *
 * Returns AMQP_STATUS_OK, or AMQP_STATUS_INVALID_PARAMETER (leaving the state
 * untouched) when tv_sec or tv_usec is negative.
 */
int amqp_set_handshake_timeout(amqp_connection_state_t state,
                               const struct timeval *timeout) {
  if (timeout == NULL) {
    state->handshake_timeout = NULL;
    return AMQP_STATUS_OK;
  }

  if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    return AMQP_STATUS_INVALID_PARAMETER;
  }

  state->internal_handshake_timeout = *timeout;
  state->handshake_timeout = &state->internal_handshake_timeout;
  return AMQP_STATUS_OK;
}
349
350
0
/*
 * Return the connection's RPC timeout pointer as currently stored in the
 * state; it is NULL when amqp_set_rpc_timeout() was last called with a NULL
 * timeout.
 */
struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) {
  struct timeval *current = state->rpc_timeout;

  return current;
}
353
354
/*
 * Set or clear the connection's RPC timeout.
 *
 * A NULL timeout clears it. Otherwise the value is copied into the state's
 * own storage, so the caller's struct need not outlive the call.
 *
 * Returns AMQP_STATUS_OK, or AMQP_STATUS_INVALID_PARAMETER (leaving the state
 * untouched) when tv_sec or tv_usec is negative.
 */
int amqp_set_rpc_timeout(amqp_connection_state_t state,
                         const struct timeval *timeout) {
  if (timeout == NULL) {
    state->rpc_timeout = NULL;
    return AMQP_STATUS_OK;
  }

  if (timeout->tv_sec < 0 || timeout->tv_usec < 0) {
    return AMQP_STATUS_INVALID_PARAMETER;
  }

  state->internal_rpc_timeout = *timeout;
  state->rpc_timeout = &state->internal_rpc_timeout;
  return AMQP_STATUS_OK;
}
367
368
amqp_rpc_reply_t amqp_publisher_confirm_wait(amqp_connection_state_t state,
369
                                             const struct timeval *timeout,
370
0
                                             amqp_publisher_confirm_t *result) {
371
0
  int res;
372
0
  amqp_frame_t frame;
373
0
  amqp_rpc_reply_t ret;
374
375
0
  memset(&ret, 0x0, sizeof(ret));
376
0
  memset(result, 0x0, sizeof(amqp_publisher_confirm_t));
377
378
0
  res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
379
380
0
  if (AMQP_STATUS_OK != res) {
381
0
    ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
382
0
    ret.library_error = res;
383
0
    return ret;
384
0
  } else if (AMQP_FRAME_METHOD != frame.frame_type ||
385
0
             (AMQP_BASIC_ACK_METHOD != frame.payload.method.id &&
386
0
              AMQP_BASIC_NACK_METHOD != frame.payload.method.id &&
387
0
              AMQP_BASIC_REJECT_METHOD != frame.payload.method.id)) {
388
0
    amqp_put_back_frame(state, &frame);
389
0
    ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
390
0
    ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
391
0
    return ret;
392
0
  }
393
394
0
  switch (frame.payload.method.id) {
395
0
    case AMQP_BASIC_ACK_METHOD:
396
0
      memcpy(&(result->payload.ack), frame.payload.method.decoded,
397
0
             sizeof(amqp_basic_ack_t));
398
0
      break;
399
400
0
    case AMQP_BASIC_NACK_METHOD:
401
0
      memcpy(&(result->payload.nack), frame.payload.method.decoded,
402
0
             sizeof(amqp_basic_nack_t));
403
0
      break;
404
405
0
    case AMQP_BASIC_REJECT_METHOD:
406
0
      memcpy(&(result->payload.reject), frame.payload.method.decoded,
407
0
             sizeof(amqp_basic_reject_t));
408
0
      break;
409
410
0
    default:
411
0
      amqp_put_back_frame(state, &frame);
412
0
      ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
413
0
      ret.library_error = AMQP_STATUS_UNSUPPORTED;
414
0
      return ret;
415
0
  }
416
0
  result->method = frame.payload.method.id;
417
0
  result->channel = frame.channel;
418
0
  ret.reply_type = AMQP_RESPONSE_NORMAL;
419
420
0
  return ret;
421
0
}