/src/librabbitmq/librabbitmq/amqp_tcp_socket.c
Line | Count | Source |
1 | | // Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors. |
2 | | // SPDX-License-Identifier: mit |
3 | | |
4 | | #ifdef HAVE_CONFIG_H |
5 | | #include "config.h" |
6 | | #endif |
7 | | |
8 | | #include "amqp_private.h" |
9 | | #include "rabbitmq-c/tcp_socket.h" |
10 | | |
11 | | #include <errno.h> |
12 | | #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__))) |
13 | | #ifndef WIN32_LEAN_AND_MEAN |
14 | | #define WIN32_LEAN_AND_MEAN |
15 | | #endif |
16 | | #include <winsock2.h> |
17 | | #else |
18 | | #include <netinet/in.h> |
19 | | #include <netinet/tcp.h> |
20 | | #include <sys/socket.h> |
21 | | #endif |
22 | | #include <stdio.h> |
23 | | #include <stdlib.h> |
24 | | |
25 | | struct amqp_tcp_socket_t { |
26 | | const struct amqp_socket_class_t *klass; |
27 | | int sockfd; |
28 | | int internal_error; |
29 | | int state; |
30 | | }; |
31 | | |
32 | | static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len, |
33 | 0 | int flags) { |
34 | 0 | struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; |
35 | 0 | ssize_t res; |
36 | 0 | int flagz = 0; |
37 | |
|
38 | 0 | if (-1 == self->sockfd) { |
39 | 0 | return AMQP_STATUS_SOCKET_CLOSED; |
40 | 0 | } |
41 | | |
42 | 0 | #ifdef MSG_NOSIGNAL |
43 | 0 | flagz |= MSG_NOSIGNAL; |
44 | 0 | #endif |
45 | |
|
46 | 0 | #if defined(MSG_MORE) |
47 | 0 | if (flags & AMQP_SF_MORE) { |
48 | 0 | flagz |= MSG_MORE; |
49 | 0 | } |
50 | | /* Cygwin defines TCP_NOPUSH, but trying to use it will return not |
51 | | * implemented. Disable it here. */ |
52 | | #elif defined(TCP_NOPUSH) && !defined(__CYGWIN__) |
53 | | if (flags & AMQP_SF_MORE && !(self->state & AMQP_SF_MORE)) { |
54 | | int one = 1; |
55 | | res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &one, sizeof(one)); |
56 | | if (0 != res) { |
57 | | self->internal_error = res; |
58 | | return AMQP_STATUS_SOCKET_ERROR; |
59 | | } |
60 | | self->state |= AMQP_SF_MORE; |
61 | | } else if (!(flags & AMQP_SF_MORE) && self->state & AMQP_SF_MORE) { |
62 | | int zero = 0; |
63 | | res = |
64 | | setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero)); |
65 | | if (0 != res) { |
66 | | self->internal_error = res; |
67 | | res = AMQP_STATUS_SOCKET_ERROR; |
68 | | } else { |
69 | | self->state &= ~AMQP_SF_MORE; |
70 | | } |
71 | | } |
72 | | #endif |
73 | |
|
74 | 0 | start: |
75 | | #ifdef _WIN32 |
76 | | res = send(self->sockfd, buf, (int)len, flagz); |
77 | | #else |
78 | 0 | res = send(self->sockfd, buf, len, flagz); |
79 | 0 | #endif |
80 | |
|
81 | 0 | if (res < 0) { |
82 | 0 | self->internal_error = amqp_os_socket_error(); |
83 | 0 | switch (self->internal_error) { |
84 | 0 | case EINTR: |
85 | 0 | goto start; |
86 | | #ifdef _WIN32 |
87 | | case WSAEWOULDBLOCK: |
88 | | #else |
89 | 0 | case EWOULDBLOCK: |
90 | 0 | #endif |
91 | | #if defined(EAGAIN) && EAGAIN != EWOULDBLOCK |
92 | | case EAGAIN: |
93 | | #endif |
94 | 0 | res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE; |
95 | 0 | break; |
96 | 0 | default: |
97 | 0 | res = AMQP_STATUS_SOCKET_ERROR; |
98 | 0 | } |
99 | 0 | } else { |
100 | 0 | self->internal_error = 0; |
101 | 0 | } |
102 | | |
103 | 0 | return res; |
104 | 0 | } |
105 | | |
106 | | static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len, |
107 | 0 | int flags) { |
108 | 0 | struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; |
109 | 0 | ssize_t ret; |
110 | 0 | if (-1 == self->sockfd) { |
111 | 0 | return AMQP_STATUS_SOCKET_CLOSED; |
112 | 0 | } |
113 | | |
114 | 0 | start: |
115 | | #ifdef _WIN32 |
116 | | ret = recv(self->sockfd, buf, (int)len, flags); |
117 | | #else |
118 | 0 | ret = recv(self->sockfd, buf, len, flags); |
119 | 0 | #endif |
120 | |
|
121 | 0 | if (0 > ret) { |
122 | 0 | self->internal_error = amqp_os_socket_error(); |
123 | 0 | switch (self->internal_error) { |
124 | 0 | case EINTR: |
125 | 0 | goto start; |
126 | | #ifdef _WIN32 |
127 | | case WSAEWOULDBLOCK: |
128 | | #else |
129 | 0 | case EWOULDBLOCK: |
130 | 0 | #endif |
131 | | #if defined(EAGAIN) && EAGAIN != EWOULDBLOCK |
132 | | case EAGAIN: |
133 | | #endif |
134 | 0 | ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD; |
135 | 0 | break; |
136 | 0 | default: |
137 | 0 | ret = AMQP_STATUS_SOCKET_ERROR; |
138 | 0 | } |
139 | 0 | } else if (0 == ret) { |
140 | 0 | ret = AMQP_STATUS_CONNECTION_CLOSED; |
141 | 0 | } |
142 | | |
143 | 0 | return ret; |
144 | 0 | } |
145 | | |
146 | | static int amqp_tcp_socket_open(void *base, const char *host, int port, |
147 | 0 | const struct timeval *timeout) { |
148 | 0 | struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; |
149 | 0 | if (-1 != self->sockfd) { |
150 | 0 | return AMQP_STATUS_SOCKET_INUSE; |
151 | 0 | } |
152 | 0 | self->sockfd = amqp_open_socket_noblock(host, port, timeout); |
153 | 0 | if (0 > self->sockfd) { |
154 | 0 | int err = self->sockfd; |
155 | 0 | self->sockfd = -1; |
156 | 0 | return err; |
157 | 0 | } |
158 | 0 | return AMQP_STATUS_OK; |
159 | 0 | } |
160 | | |
161 | | static int amqp_tcp_socket_close(void *base, |
162 | 0 | AMQP_UNUSED amqp_socket_close_enum force) { |
163 | 0 | struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; |
164 | 0 | if (-1 == self->sockfd) { |
165 | 0 | return AMQP_STATUS_SOCKET_CLOSED; |
166 | 0 | } |
167 | | |
168 | 0 | if (amqp_os_socket_close(self->sockfd)) { |
169 | 0 | return AMQP_STATUS_SOCKET_ERROR; |
170 | 0 | } |
171 | 0 | self->sockfd = -1; |
172 | |
|
173 | 0 | return AMQP_STATUS_OK; |
174 | 0 | } |
175 | | |
176 | 0 | static int amqp_tcp_socket_get_sockfd(void *base) { |
177 | 0 | struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; |
178 | 0 | return self->sockfd; |
179 | 0 | } |
180 | | |
181 | 0 | static void amqp_tcp_socket_delete(void *base) { |
182 | 0 | struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; |
183 | |
|
184 | 0 | if (self) { |
185 | 0 | amqp_tcp_socket_close(self, AMQP_SC_NONE); |
186 | 0 | free(self); |
187 | 0 | } |
188 | 0 | } |
189 | | |
190 | | static const struct amqp_socket_class_t amqp_tcp_socket_class = { |
191 | | amqp_tcp_socket_send, /* send */ |
192 | | amqp_tcp_socket_recv, /* recv */ |
193 | | amqp_tcp_socket_open, /* open */ |
194 | | amqp_tcp_socket_close, /* close */ |
195 | | amqp_tcp_socket_get_sockfd, /* get_sockfd */ |
196 | | amqp_tcp_socket_delete /* delete */ |
197 | | }; |
198 | | |
199 | 0 | amqp_socket_t *amqp_tcp_socket_new(amqp_connection_state_t state) { |
200 | 0 | struct amqp_tcp_socket_t *self = calloc(1, sizeof(*self)); |
201 | 0 | if (!self) { |
202 | 0 | return NULL; |
203 | 0 | } |
204 | 0 | self->klass = &amqp_tcp_socket_class; |
205 | 0 | self->sockfd = -1; |
206 | |
|
207 | 0 | amqp_set_socket(state, (amqp_socket_t *)self); |
208 | |
|
209 | 0 | return (amqp_socket_t *)self; |
210 | 0 | } |
211 | | |
212 | 0 | void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) { |
213 | 0 | struct amqp_tcp_socket_t *self; |
214 | 0 | if (base->klass != &amqp_tcp_socket_class) { |
215 | 0 | amqp_abort("<%p> is not of type amqp_tcp_socket_t", base); |
216 | 0 | } |
217 | 0 | self = (struct amqp_tcp_socket_t *)base; |
218 | 0 | self->sockfd = sockfd; |
219 | 0 | } |