/src/librabbitmq/librabbitmq/amqp_api.c
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors. |
2 | | // SPDX-License-Identifier: mit |
3 | | |
4 | | #ifdef HAVE_CONFIG_H |
5 | | #include "config.h" |
6 | | #endif |
7 | | |
8 | | #ifdef _MSC_VER |
9 | | /* MSVC complains about sprintf being deprecated in favor of sprintf_s */ |
10 | | #define _CRT_SECURE_NO_WARNINGS |
11 | | /* MSVC complains about strdup being deprecated in favor of _strdup */ |
12 | | #define _CRT_NONSTDC_NO_DEPRECATE |
13 | | #endif |
14 | | |
15 | | #include "amqp_private.h" |
16 | | #include "amqp_time.h" |
17 | | #include <stdarg.h> |
18 | | #include <stdint.h> |
19 | | #include <stdio.h> |
20 | | #include <stdlib.h> |
21 | | #include <string.h> |
22 | | |
/*
 * Library status codes are zero or negative. The negated code is split into
 * two bytes: bits 8-15 select an error category (see error_category_enum_)
 * and bits 0-7 index into that category's message table.
 */
#define ERROR_MASK (0x00FF)
#define ERROR_CATEGORY_MASK (0xFF00)

enum error_category_enum_ { EC_base = 0, EC_tcp = 1, EC_ssl = 2 };

/* Messages for category EC_base, indexed by the low byte of the negated code */
static const char *const base_error_strings[] = {
    /* AMQP_STATUS_OK                               0x0000 */
    "operation completed successfully",
    /* AMQP_STATUS_NO_MEMORY                       -0x0001 */
    "could not allocate memory",
    /* AMQP_STATUS_BAD_AMQP_DATA                   -0x0002 */
    "invalid AMQP data",
    /* AMQP_STATUS_UNKNOWN_CLASS                   -0x0003 */
    "unknown AMQP class id",
    /* AMQP_STATUS_UNKNOWN_METHOD                  -0x0004 */
    "unknown AMQP method id",
    /* AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED      -0x0005 */
    "hostname lookup failed",
    /* AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION       -0x0006 */
    "incompatible AMQP version",
    /* AMQP_STATUS_CONNECTION_CLOSED               -0x0007 */
    "connection closed unexpectedly",
    /* AMQP_STATUS_BAD_AMQP_URL                    -0x0008 */
    "could not parse AMQP URL",
    /* AMQP_STATUS_SOCKET_ERROR                    -0x0009 */
    "a socket error occurred",
    /* AMQP_STATUS_INVALID_PARAMETER               -0x000A */
    "invalid parameter",
    /* AMQP_STATUS_TABLE_TOO_BIG                   -0x000B */
    "table too large for buffer",
    /* AMQP_STATUS_WRONG_METHOD                    -0x000C */
    "unexpected method received",
    /* AMQP_STATUS_TIMEOUT                         -0x000D */
    "request timed out",
    /* AMQP_STATUS_TIMER_FAILURE                   -0x000E */
    "system timer has failed",
    /* AMQP_STATUS_HEARTBEAT_TIMEOUT               -0x000F */
    "heartbeat timeout, connection closed",
    /* AMQP_STATUS_UNEXPECTED_STATE                -0x0010 */
    "unexpected protocol state",
    /* AMQP_STATUS_SOCKET_CLOSED                   -0x0011 */
    "socket is closed",
    /* AMQP_STATUS_SOCKET_INUSE                    -0x0012 */
    "socket already open",
    /* AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD  -0x0013 */
    "unsupported sasl method requested",
    /* AMQP_STATUS_UNSUPPORTED                     -0x0014 */
    "parameter value is unsupported"};

/* Messages for category EC_tcp (codes -0x01xx) */
static const char *const tcp_error_strings[] = {
    /* AMQP_STATUS_TCP_ERROR                       -0x0100 */
    "a socket error occurred",
    /* AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR        -0x0101 */
    "socket library initialization failed"};

/* Messages for category EC_ssl (codes -0x02xx) */
static const char *const ssl_error_strings[] = {
    /* AMQP_STATUS_SSL_ERROR                       -0x0200 */
    "a SSL error occurred",
    /* AMQP_STATUS_SSL_HOSTNAME_VERIFY_FAILED      -0x0201 */
    "SSL hostname verification failed",
    /* AMQP_STATUS_SSL_PEER_VERIFY_FAILED          -0x0202 */
    "SSL peer cert verification failed",
    /* AMQP_STATUS_SSL_CONNECTION_FAILED           -0x0203 */
    "SSL handshake failed",
    /* AMQP_STATUS_SSL_SET_ENGINE_FAILED           -0x0204 */
    "SSL setting engine failed",
    /* AMQP_STATUS_SSL_UNIMPLEMENTED               -0x0205 */
    "SSL API is not implemented"};

static const char *const unknown_error_string = "(unknown error)";

/*
 * Map a library status code to a static, human readable message.
 *
 * code: a status code; valid codes are 0 or negative and have a magnitude
 *       that fits in 16 bits (category byte + index byte).
 *
 * Returns a pointer to a string with static storage duration; the caller
 * must not modify or free it. Any code that is positive, whose magnitude
 * does not fit in 16 bits, or that does not correspond to a table entry
 * yields "(unknown error)".
 */
const char *amqp_error_string2(int code) {
  const char *const *table;
  size_t table_len;
  size_t category;
  size_t error;

  /* Reject out-of-range values up front. Without this, bits above the
   * category byte would be silently masked away (so e.g. -0x10001 would be
   * reported as "could not allocate memory"), and negating INT_MIN below
   * would be signed overflow. */
  if (code > 0 || code < -(ERROR_CATEGORY_MASK | ERROR_MASK)) {
    return unknown_error_string;
  }

  /* code is now in [-0xFFFF, 0], so the negation cannot overflow */
  category = ((size_t)(-code) & ERROR_CATEGORY_MASK) >> 8;
  error = (size_t)(-code) & ERROR_MASK;

  switch (category) {
    case EC_base:
      table = base_error_strings;
      table_len = sizeof(base_error_strings) / sizeof(base_error_strings[0]);
      break;

    case EC_tcp:
      table = tcp_error_strings;
      table_len = sizeof(tcp_error_strings) / sizeof(tcp_error_strings[0]);
      break;

    case EC_ssl:
      table = ssl_error_strings;
      table_len = sizeof(ssl_error_strings) / sizeof(ssl_error_strings[0]);
      break;

    default:
      return unknown_error_string;
  }

  return (error < table_len) ? table[error] : unknown_error_string;
}
132 | | |
/*
 * Deprecated variant of amqp_error_string2() that returns a heap copy of the
 * message.
 *
 * Older versions of the API sometimes required callers to flip the sign of a
 * return value to obtain the error code; all error codes are negative now.
 * So that such legacy callers keep working, a positive code is negated
 * before the lookup. This compatibility mapping exists only here.
 *
 * Returns the result of strdup() on the message, so it may be NULL if the
 * copy cannot be allocated.
 */
char *amqp_error_string(int code) {
  const int normalized = (code > 0) ? -code : code;
  const char *message = amqp_error_string2(normalized);

  return strdup(message);
}
146 | | |
/*
 * Print a printf-style formatted message followed by a newline to stderr,
 * then terminate the process with abort(). Does not return.
 */
void amqp_abort(const char *fmt, ...) {
  va_list args;

  va_start(args, fmt);
  (void)vfprintf(stderr, fmt, args);
  va_end(args);

  (void)fputs("\n", stderr);
  abort();
}
155 | | |
156 | | const amqp_bytes_t amqp_empty_bytes = {0, NULL}; |
157 | | const amqp_table_t amqp_empty_table = {0, NULL}; |
158 | | const amqp_array_t amqp_empty_array = {0, NULL}; |
159 | | |
160 | | int amqp_basic_publish(amqp_connection_state_t state, amqp_channel_t channel, |
161 | | amqp_bytes_t exchange, amqp_bytes_t routing_key, |
162 | | amqp_boolean_t mandatory, amqp_boolean_t immediate, |
163 | | amqp_basic_properties_t const *properties, |
164 | 0 | amqp_bytes_t body) { |
165 | 0 | amqp_frame_t f; |
166 | 0 | size_t body_offset; |
167 | 0 | size_t usable_body_payload_size = |
168 | 0 | state->frame_max - (HEADER_SIZE + FOOTER_SIZE); |
169 | 0 | int res; |
170 | 0 | int flagz; |
171 | |
|
172 | 0 | amqp_basic_publish_t m; |
173 | 0 | amqp_basic_properties_t default_properties; |
174 | |
|
175 | 0 | m.exchange = exchange; |
176 | 0 | m.routing_key = routing_key; |
177 | 0 | m.mandatory = mandatory; |
178 | 0 | m.immediate = immediate; |
179 | 0 | m.ticket = 0; |
180 | | |
181 | | /* TODO(alanxz): this heartbeat check is happening in the wrong place, it |
182 | | * should really be done in amqp_try_send/writev */ |
183 | 0 | res = amqp_time_has_past(state->next_recv_heartbeat); |
184 | 0 | if (AMQP_STATUS_TIMER_FAILURE == res) { |
185 | 0 | return res; |
186 | 0 | } else if (AMQP_STATUS_TIMEOUT == res) { |
187 | 0 | res = amqp_try_recv(state); |
188 | 0 | if (AMQP_STATUS_TIMEOUT == res) { |
189 | 0 | return AMQP_STATUS_HEARTBEAT_TIMEOUT; |
190 | 0 | } else if (AMQP_STATUS_OK != res) { |
191 | 0 | return res; |
192 | 0 | } |
193 | 0 | } |
194 | | |
195 | 0 | res = amqp_send_method_inner(state, channel, AMQP_BASIC_PUBLISH_METHOD, &m, |
196 | 0 | AMQP_SF_MORE, amqp_time_infinite()); |
197 | 0 | if (res < 0) { |
198 | 0 | return res; |
199 | 0 | } |
200 | | |
201 | 0 | if (properties == NULL) { |
202 | 0 | memset(&default_properties, 0, sizeof(default_properties)); |
203 | 0 | properties = &default_properties; |
204 | 0 | } |
205 | |
|
206 | 0 | f.frame_type = AMQP_FRAME_HEADER; |
207 | 0 | f.channel = channel; |
208 | 0 | f.payload.properties.class_id = AMQP_BASIC_CLASS; |
209 | 0 | f.payload.properties.body_size = body.len; |
210 | 0 | f.payload.properties.decoded = (void *)properties; |
211 | |
|
212 | 0 | if (body.len > 0) { |
213 | 0 | flagz = AMQP_SF_MORE; |
214 | 0 | } else { |
215 | 0 | flagz = AMQP_SF_NONE; |
216 | 0 | } |
217 | 0 | res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite()); |
218 | 0 | if (res < 0) { |
219 | 0 | return res; |
220 | 0 | } |
221 | | |
222 | 0 | body_offset = 0; |
223 | 0 | while (body_offset < body.len) { |
224 | 0 | size_t remaining = body.len - body_offset; |
225 | |
|
226 | 0 | if (remaining == 0) { |
227 | 0 | break; |
228 | 0 | } |
229 | | |
230 | 0 | f.frame_type = AMQP_FRAME_BODY; |
231 | 0 | f.channel = channel; |
232 | 0 | f.payload.body_fragment.bytes = amqp_offset(body.bytes, body_offset); |
233 | 0 | if (remaining >= usable_body_payload_size) { |
234 | 0 | f.payload.body_fragment.len = usable_body_payload_size; |
235 | 0 | flagz = AMQP_SF_MORE; |
236 | 0 | } else { |
237 | 0 | f.payload.body_fragment.len = remaining; |
238 | 0 | flagz = AMQP_SF_NONE; |
239 | 0 | } |
240 | |
|
241 | 0 | body_offset += f.payload.body_fragment.len; |
242 | 0 | res = amqp_send_frame_inner(state, &f, flagz, amqp_time_infinite()); |
243 | 0 | if (res < 0) { |
244 | 0 | return res; |
245 | 0 | } |
246 | 0 | } |
247 | | |
248 | 0 | return AMQP_STATUS_OK; |
249 | 0 | } |
250 | | |
251 | | amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, |
252 | 0 | amqp_channel_t channel, int code) { |
253 | 0 | char codestr[13]; |
254 | 0 | amqp_method_number_t replies[2] = {AMQP_CHANNEL_CLOSE_OK_METHOD, 0}; |
255 | 0 | amqp_channel_close_t req; |
256 | |
|
257 | 0 | if (code < 0 || code > UINT16_MAX) { |
258 | 0 | return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
259 | 0 | } |
260 | | |
261 | 0 | req.reply_code = (uint16_t)code; |
262 | 0 | req.reply_text.bytes = codestr; |
263 | 0 | req.reply_text.len = sprintf(codestr, "%d", code); |
264 | 0 | req.class_id = 0; |
265 | 0 | req.method_id = 0; |
266 | |
|
267 | 0 | return amqp_simple_rpc(state, channel, AMQP_CHANNEL_CLOSE_METHOD, replies, |
268 | 0 | &req); |
269 | 0 | } |
270 | | |
271 | | amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, |
272 | 0 | int code) { |
273 | 0 | char codestr[13]; |
274 | 0 | amqp_method_number_t replies[2] = {AMQP_CONNECTION_CLOSE_OK_METHOD, 0}; |
275 | 0 | amqp_channel_close_t req; |
276 | |
|
277 | 0 | if (code < 0 || code > UINT16_MAX) { |
278 | 0 | return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); |
279 | 0 | } |
280 | | |
281 | 0 | req.reply_code = (uint16_t)code; |
282 | 0 | req.reply_text.bytes = codestr; |
283 | 0 | req.reply_text.len = sprintf(codestr, "%d", code); |
284 | 0 | req.class_id = 0; |
285 | 0 | req.method_id = 0; |
286 | |
|
287 | 0 | return amqp_simple_rpc(state, 0, AMQP_CONNECTION_CLOSE_METHOD, replies, &req); |
288 | 0 | } |
289 | | |
290 | | int amqp_basic_ack(amqp_connection_state_t state, amqp_channel_t channel, |
291 | 0 | uint64_t delivery_tag, amqp_boolean_t multiple) { |
292 | 0 | amqp_basic_ack_t m; |
293 | 0 | m.delivery_tag = delivery_tag; |
294 | 0 | m.multiple = multiple; |
295 | 0 | return amqp_send_method(state, channel, AMQP_BASIC_ACK_METHOD, &m); |
296 | 0 | } |
297 | | |
298 | | amqp_rpc_reply_t amqp_basic_get(amqp_connection_state_t state, |
299 | | amqp_channel_t channel, amqp_bytes_t queue, |
300 | 0 | amqp_boolean_t no_ack) { |
301 | 0 | amqp_method_number_t replies[] = {AMQP_BASIC_GET_OK_METHOD, |
302 | 0 | AMQP_BASIC_GET_EMPTY_METHOD, 0}; |
303 | 0 | amqp_basic_get_t req; |
304 | 0 | req.ticket = 0; |
305 | 0 | req.queue = queue; |
306 | 0 | req.no_ack = no_ack; |
307 | |
|
308 | 0 | state->most_recent_api_result = |
309 | 0 | amqp_simple_rpc(state, channel, AMQP_BASIC_GET_METHOD, replies, &req); |
310 | 0 | return state->most_recent_api_result; |
311 | 0 | } |
312 | | |
313 | | int amqp_basic_reject(amqp_connection_state_t state, amqp_channel_t channel, |
314 | 0 | uint64_t delivery_tag, amqp_boolean_t requeue) { |
315 | 0 | amqp_basic_reject_t req; |
316 | 0 | req.delivery_tag = delivery_tag; |
317 | 0 | req.requeue = requeue; |
318 | 0 | return amqp_send_method(state, channel, AMQP_BASIC_REJECT_METHOD, &req); |
319 | 0 | } |
320 | | |
321 | | int amqp_basic_nack(amqp_connection_state_t state, amqp_channel_t channel, |
322 | | uint64_t delivery_tag, amqp_boolean_t multiple, |
323 | 0 | amqp_boolean_t requeue) { |
324 | 0 | amqp_basic_nack_t req; |
325 | 0 | req.delivery_tag = delivery_tag; |
326 | 0 | req.multiple = multiple; |
327 | 0 | req.requeue = requeue; |
328 | 0 | return amqp_send_method(state, channel, AMQP_BASIC_NACK_METHOD, &req); |
329 | 0 | } |
330 | | |
331 | 0 | struct timeval *amqp_get_handshake_timeout(amqp_connection_state_t state) { |
332 | 0 | return state->handshake_timeout; |
333 | 0 | } |
334 | | |
335 | | int amqp_set_handshake_timeout(amqp_connection_state_t state, |
336 | 0 | const struct timeval *timeout) { |
337 | 0 | if (timeout) { |
338 | 0 | if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { |
339 | 0 | return AMQP_STATUS_INVALID_PARAMETER; |
340 | 0 | } |
341 | 0 | state->internal_handshake_timeout = *timeout; |
342 | 0 | state->handshake_timeout = &state->internal_handshake_timeout; |
343 | 0 | } else { |
344 | 0 | state->handshake_timeout = NULL; |
345 | 0 | } |
346 | | |
347 | 0 | return AMQP_STATUS_OK; |
348 | 0 | } |
349 | | |
350 | 0 | struct timeval *amqp_get_rpc_timeout(amqp_connection_state_t state) { |
351 | 0 | return state->rpc_timeout; |
352 | 0 | } |
353 | | |
354 | | int amqp_set_rpc_timeout(amqp_connection_state_t state, |
355 | 0 | const struct timeval *timeout) { |
356 | 0 | if (timeout) { |
357 | 0 | if (timeout->tv_sec < 0 || timeout->tv_usec < 0) { |
358 | 0 | return AMQP_STATUS_INVALID_PARAMETER; |
359 | 0 | } |
360 | 0 | state->rpc_timeout = &state->internal_rpc_timeout; |
361 | 0 | *state->rpc_timeout = *timeout; |
362 | 0 | } else { |
363 | 0 | state->rpc_timeout = NULL; |
364 | 0 | } |
365 | 0 | return AMQP_STATUS_OK; |
366 | 0 | } |
367 | | |
368 | | amqp_rpc_reply_t amqp_publisher_confirm_wait(amqp_connection_state_t state, |
369 | | const struct timeval *timeout, |
370 | 0 | amqp_publisher_confirm_t *result) { |
371 | 0 | int res; |
372 | 0 | amqp_frame_t frame; |
373 | 0 | amqp_rpc_reply_t ret; |
374 | |
|
375 | 0 | memset(&ret, 0x0, sizeof(ret)); |
376 | 0 | memset(result, 0x0, sizeof(amqp_publisher_confirm_t)); |
377 | |
|
378 | 0 | res = amqp_simple_wait_frame_noblock(state, &frame, timeout); |
379 | |
|
380 | 0 | if (AMQP_STATUS_OK != res) { |
381 | 0 | ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; |
382 | 0 | ret.library_error = res; |
383 | 0 | return ret; |
384 | 0 | } else if (AMQP_FRAME_METHOD != frame.frame_type || |
385 | 0 | (AMQP_BASIC_ACK_METHOD != frame.payload.method.id && |
386 | 0 | AMQP_BASIC_NACK_METHOD != frame.payload.method.id && |
387 | 0 | AMQP_BASIC_REJECT_METHOD != frame.payload.method.id)) { |
388 | 0 | amqp_put_back_frame(state, &frame); |
389 | 0 | ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; |
390 | 0 | ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; |
391 | 0 | return ret; |
392 | 0 | } |
393 | | |
394 | 0 | switch (frame.payload.method.id) { |
395 | 0 | case AMQP_BASIC_ACK_METHOD: |
396 | 0 | memcpy(&(result->payload.ack), frame.payload.method.decoded, |
397 | 0 | sizeof(amqp_basic_ack_t)); |
398 | 0 | break; |
399 | | |
400 | 0 | case AMQP_BASIC_NACK_METHOD: |
401 | 0 | memcpy(&(result->payload.nack), frame.payload.method.decoded, |
402 | 0 | sizeof(amqp_basic_nack_t)); |
403 | 0 | break; |
404 | | |
405 | 0 | case AMQP_BASIC_REJECT_METHOD: |
406 | 0 | memcpy(&(result->payload.reject), frame.payload.method.decoded, |
407 | 0 | sizeof(amqp_basic_reject_t)); |
408 | 0 | break; |
409 | | |
410 | 0 | default: |
411 | 0 | amqp_put_back_frame(state, &frame); |
412 | 0 | ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; |
413 | 0 | ret.library_error = AMQP_STATUS_UNSUPPORTED; |
414 | 0 | return ret; |
415 | 0 | } |
416 | 0 | result->method = frame.payload.method.id; |
417 | 0 | result->channel = frame.channel; |
418 | 0 | ret.reply_type = AMQP_RESPONSE_NORMAL; |
419 | |
|
420 | 0 | return ret; |
421 | 0 | } |