Coverage Report

Created: 2025-10-13 07:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/librabbitmq/librabbitmq/amqp_tcp_socket.c
Line
Count
Source
1
// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors.
2
// SPDX-License-Identifier: mit
3
4
#ifdef HAVE_CONFIG_H
5
#include "config.h"
6
#endif
7
8
#include "amqp_private.h"
9
#include "rabbitmq-c/tcp_socket.h"
10
11
#include <errno.h>
12
#if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__)))
13
#ifndef WIN32_LEAN_AND_MEAN
14
#define WIN32_LEAN_AND_MEAN
15
#endif
16
#include <winsock2.h>
17
#else
18
#include <netinet/in.h>
19
#include <netinet/tcp.h>
20
#include <sys/socket.h>
21
#endif
22
#include <stdio.h>
23
#include <stdlib.h>
24
25
/*
 * Per-connection state for a plain (non-TLS) TCP socket.
 *
 * Objects of this type are handed around as amqp_socket_t pointers and cast
 * back inside this file, so the field order must not change.
 */
struct amqp_tcp_socket_t {
  /* Dispatch table for this socket type; also used as a type tag when a
   * generic socket pointer is checked before being cast back.
   * NOTE(review): presumably has to stay the first member to match the
   * layout of amqp_socket_t -- confirm against amqp_private.h. */
  const struct amqp_socket_class_t *klass;

  /* OS socket descriptor, or -1 when no descriptor is attached. */
  int sockfd;

  /* Error value recorded by the most recent failing socket call;
   * reset to 0 after a successful send. */
  int internal_error;

  /* AMQP_SF_* bits describing options currently applied to the descriptor
   * (AMQP_SF_MORE is kept set while TCP_NOPUSH is enabled). */
  int state;
};
31
32
static ssize_t amqp_tcp_socket_send(void *base, const void *buf, size_t len,
33
0
                                    int flags) {
34
0
  struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
35
0
  ssize_t res;
36
0
  int flagz = 0;
37
38
0
  if (-1 == self->sockfd) {
39
0
    return AMQP_STATUS_SOCKET_CLOSED;
40
0
  }
41
42
0
#ifdef MSG_NOSIGNAL
43
0
  flagz |= MSG_NOSIGNAL;
44
0
#endif
45
46
0
#if defined(MSG_MORE)
47
0
  if (flags & AMQP_SF_MORE) {
48
0
    flagz |= MSG_MORE;
49
0
  }
50
/* Cygwin defines TCP_NOPUSH, but trying to use it will return not
51
 * implemented. Disable it here. */
52
#elif defined(TCP_NOPUSH) && !defined(__CYGWIN__)
53
  if (flags & AMQP_SF_MORE && !(self->state & AMQP_SF_MORE)) {
54
    int one = 1;
55
    res = setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &one, sizeof(one));
56
    if (0 != res) {
57
      self->internal_error = res;
58
      return AMQP_STATUS_SOCKET_ERROR;
59
    }
60
    self->state |= AMQP_SF_MORE;
61
  } else if (!(flags & AMQP_SF_MORE) && self->state & AMQP_SF_MORE) {
62
    int zero = 0;
63
    res =
64
        setsockopt(self->sockfd, IPPROTO_TCP, TCP_NOPUSH, &zero, sizeof(&zero));
65
    if (0 != res) {
66
      self->internal_error = res;
67
      res = AMQP_STATUS_SOCKET_ERROR;
68
    } else {
69
      self->state &= ~AMQP_SF_MORE;
70
    }
71
  }
72
#endif
73
74
0
start:
75
#ifdef _WIN32
76
  res = send(self->sockfd, buf, (int)len, flagz);
77
#else
78
0
  res = send(self->sockfd, buf, len, flagz);
79
0
#endif
80
81
0
  if (res < 0) {
82
0
    self->internal_error = amqp_os_socket_error();
83
0
    switch (self->internal_error) {
84
0
      case EINTR:
85
0
        goto start;
86
#ifdef _WIN32
87
      case WSAEWOULDBLOCK:
88
#else
89
0
      case EWOULDBLOCK:
90
0
#endif
91
#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
92
      case EAGAIN:
93
#endif
94
0
        res = AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
95
0
        break;
96
0
      default:
97
0
        res = AMQP_STATUS_SOCKET_ERROR;
98
0
    }
99
0
  } else {
100
0
    self->internal_error = 0;
101
0
  }
102
103
0
  return res;
104
0
}
105
106
static ssize_t amqp_tcp_socket_recv(void *base, void *buf, size_t len,
107
0
                                    int flags) {
108
0
  struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
109
0
  ssize_t ret;
110
0
  if (-1 == self->sockfd) {
111
0
    return AMQP_STATUS_SOCKET_CLOSED;
112
0
  }
113
114
0
start:
115
#ifdef _WIN32
116
  ret = recv(self->sockfd, buf, (int)len, flags);
117
#else
118
0
  ret = recv(self->sockfd, buf, len, flags);
119
0
#endif
120
121
0
  if (0 > ret) {
122
0
    self->internal_error = amqp_os_socket_error();
123
0
    switch (self->internal_error) {
124
0
      case EINTR:
125
0
        goto start;
126
#ifdef _WIN32
127
      case WSAEWOULDBLOCK:
128
#else
129
0
      case EWOULDBLOCK:
130
0
#endif
131
#if defined(EAGAIN) && EAGAIN != EWOULDBLOCK
132
      case EAGAIN:
133
#endif
134
0
        ret = AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD;
135
0
        break;
136
0
      default:
137
0
        ret = AMQP_STATUS_SOCKET_ERROR;
138
0
    }
139
0
  } else if (0 == ret) {
140
0
    ret = AMQP_STATUS_CONNECTION_CLOSED;
141
0
  }
142
143
0
  return ret;
144
0
}
145
146
static int amqp_tcp_socket_open(void *base, const char *host, int port,
147
0
                                const struct timeval *timeout) {
148
0
  struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
149
0
  if (-1 != self->sockfd) {
150
0
    return AMQP_STATUS_SOCKET_INUSE;
151
0
  }
152
0
  self->sockfd = amqp_open_socket_noblock(host, port, timeout);
153
0
  if (0 > self->sockfd) {
154
0
    int err = self->sockfd;
155
0
    self->sockfd = -1;
156
0
    return err;
157
0
  }
158
0
  return AMQP_STATUS_OK;
159
0
}
160
161
static int amqp_tcp_socket_close(void *base,
162
0
                                 AMQP_UNUSED amqp_socket_close_enum force) {
163
0
  struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
164
0
  if (-1 == self->sockfd) {
165
0
    return AMQP_STATUS_SOCKET_CLOSED;
166
0
  }
167
168
0
  if (amqp_os_socket_close(self->sockfd)) {
169
0
    return AMQP_STATUS_SOCKET_ERROR;
170
0
  }
171
0
  self->sockfd = -1;
172
173
0
  return AMQP_STATUS_OK;
174
0
}
175
176
0
static int amqp_tcp_socket_get_sockfd(void *base) {
177
0
  struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
178
0
  return self->sockfd;
179
0
}
180
181
0
static void amqp_tcp_socket_delete(void *base) {
182
0
  struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
183
184
0
  if (self) {
185
0
    amqp_tcp_socket_close(self, AMQP_SC_NONE);
186
0
    free(self);
187
0
  }
188
0
}
189
190
static const struct amqp_socket_class_t amqp_tcp_socket_class = {
191
    amqp_tcp_socket_send,       /* send */
192
    amqp_tcp_socket_recv,       /* recv */
193
    amqp_tcp_socket_open,       /* open */
194
    amqp_tcp_socket_close,      /* close */
195
    amqp_tcp_socket_get_sockfd, /* get_sockfd */
196
    amqp_tcp_socket_delete      /* delete */
197
};
198
199
0
/*
 * Allocate a TCP socket object with no descriptor attached and register it
 * on the given connection via amqp_set_socket().
 *
 * state - connection the new socket is registered with.
 *
 * Returns the new object as a generic amqp_socket_t pointer, or NULL when
 * the allocation fails (in which case the connection is not touched).
 */
amqp_socket_t *amqp_tcp_socket_new(amqp_connection_state_t state) {
  struct amqp_tcp_socket_t *sock = calloc(1, sizeof *sock);
  amqp_socket_t *generic;

  if (sock == NULL) {
    return NULL;
  }

  /* calloc zeroed every field; only the non-zero defaults are set here. */
  sock->sockfd = -1;
  sock->klass = &amqp_tcp_socket_class;

  generic = (amqp_socket_t *)sock;
  amqp_set_socket(state, generic);

  return generic;
}
211
212
0
/*
 * Attach an already-open descriptor to a TCP socket object.
 *
 * base   - socket object; must have been created as a TCP socket (its klass
 *          must be this file's dispatch table), otherwise amqp_abort() is
 *          called.
 * sockfd - descriptor to store. Any descriptor previously stored in the
 *          object is overwritten without being closed.
 */
void amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd) {
  struct amqp_tcp_socket_t *self;

  if (base->klass != &amqp_tcp_socket_class) {
    /* The %p conversion requires a void * argument, hence the cast.
     * NOTE(review): amqp_abort() is expected not to return -- confirm. */
    amqp_abort("<%p> is not of type amqp_tcp_socket_t", (void *)base);
  }

  self = (struct amqp_tcp_socket_t *)base;
  self->sockfd = sockfd;
}