/src/h2o/deps/libyrmcds/recv.c
Line | Count | Source (jump to first uncovered line) |
1 | | // (C) 2013 Cybozu et al. |
2 | | |
3 | | #include "yrmcds.h" |
4 | | #include "yrmcds_portability.h" |
5 | | |
6 | | #ifdef LIBYRMCDS_USE_LZ4 |
7 | | # include "lz4/lib/lz4.h" |
8 | | #endif |
9 | | |
10 | | #include <errno.h> |
11 | | #include <limits.h> |
12 | | #include <stdio.h> |
13 | | #include <stdlib.h> |
14 | | #include <string.h> |
15 | | #include <sys/socket.h> |
16 | | #include <sys/types.h> |
17 | | |
18 | | static const size_t BINARY_HEADER_SIZE = 24; |
19 | | static const size_t RECV_SIZE = 256 << 10; |
20 | | static const size_t MAX_CAPACITY = 50 << 20; // 50 MiB |
21 | | |
22 | 0 | static inline yrmcds_error recv_data(yrmcds* c) { |
23 | 0 | if( (c->capacity - c->used) < RECV_SIZE ) { |
24 | 0 | size_t new_capacity = c->capacity * 2; |
25 | 0 | char* new_buffer = (char*)realloc(c->recvbuf, new_capacity); |
26 | 0 | if( new_buffer == NULL ) |
27 | 0 | return YRMCDS_OUT_OF_MEMORY; |
28 | 0 | c->recvbuf = new_buffer; |
29 | 0 | c->capacity = new_capacity; |
30 | 0 | } |
31 | | |
32 | 0 | ssize_t n; |
33 | 0 | AGAIN: |
34 | 0 | n = recv(c->sock, c->recvbuf + c->used, RECV_SIZE, 0); |
35 | 0 | if( n == -1 ) { |
36 | 0 | if( errno == EINTR ) goto AGAIN; |
37 | 0 | return YRMCDS_SYSTEM_ERROR; |
38 | 0 | } |
39 | 0 | if( n == 0 ) |
40 | 0 | return YRMCDS_DISCONNECTED; |
41 | 0 | c->used += (size_t)n; |
42 | 0 | return YRMCDS_OK; |
43 | 0 | } |
44 | | |
45 | 0 | static inline uint64_t ntoh64(const char* p) { |
46 | 0 | uint64_t n; |
47 | 0 | memcpy(&n, p, sizeof(n)); |
48 | 0 | return be64toh(n); |
49 | 0 | } |
50 | | |
51 | 0 | static inline uint32_t ntoh32(const char* p) { |
52 | 0 | uint32_t n; |
53 | 0 | memcpy(&n, p, sizeof(n)); |
54 | 0 | return be32toh(n); |
55 | 0 | } |
56 | | |
57 | 0 | static inline uint16_t ntoh16(const char* p) { |
58 | 0 | uint16_t n; |
59 | 0 | memcpy(&n, p, sizeof(n)); |
60 | 0 | return be16toh(n); |
61 | 0 | } |
62 | | |
63 | | static yrmcds_error text_recv(yrmcds* c, yrmcds_response* r); |
64 | | |
65 | 0 | yrmcds_error yrmcds_recv(yrmcds* c, yrmcds_response* r) { |
66 | 0 | if( c == NULL || r == NULL ) |
67 | 0 | return YRMCDS_BAD_ARGUMENT; |
68 | 0 | if( c->invalid ) |
69 | 0 | return YRMCDS_PROTOCOL_ERROR; |
70 | | |
71 | 0 | if( c->last_size > 0 ) { |
72 | 0 | size_t remain = c->used - c->last_size; |
73 | 0 | if( remain > 0 ) |
74 | 0 | memmove(c->recvbuf, c->recvbuf + c->last_size, remain); |
75 | 0 | c->used = remain; |
76 | 0 | c->last_size = 0; |
77 | 0 | free(c->decompressed); |
78 | 0 | c->decompressed = NULL; |
79 | 0 | } |
80 | |
|
81 | 0 | if( c->text_mode ) { |
82 | 0 | return text_recv(c, r); |
83 | 0 | } |
84 | | |
85 | 0 | while( c->used < BINARY_HEADER_SIZE ) { |
86 | 0 | yrmcds_error e = recv_data(c); |
87 | 0 | if( e != 0 ) return e; |
88 | 0 | } |
89 | | |
90 | 0 | if( *c->recvbuf != '\x81' ) { |
91 | 0 | c->invalid = 1; |
92 | 0 | return YRMCDS_PROTOCOL_ERROR; |
93 | 0 | } |
94 | 0 | uint32_t total_len = ntoh32(c->recvbuf + 8); |
95 | 0 | if( total_len > MAX_CAPACITY ) { |
96 | 0 | c->invalid = 1; |
97 | 0 | return YRMCDS_PROTOCOL_ERROR; |
98 | 0 | } |
99 | 0 | while( c->used < (BINARY_HEADER_SIZE + total_len) ) { |
100 | 0 | yrmcds_error e = recv_data(c); |
101 | 0 | if( e != 0 ) return e; |
102 | 0 | } |
103 | | |
104 | 0 | uint16_t key_len = ntoh16(c->recvbuf + 2); |
105 | 0 | uint8_t extras_len = *(unsigned char*)(c->recvbuf + 4); |
106 | 0 | if( total_len < (key_len + extras_len) ) { |
107 | 0 | c->invalid = 1; |
108 | 0 | return YRMCDS_PROTOCOL_ERROR; |
109 | 0 | } |
110 | | |
111 | 0 | const char* pkey = c->recvbuf + (BINARY_HEADER_SIZE + extras_len); |
112 | 0 | r->length = BINARY_HEADER_SIZE + total_len; |
113 | 0 | r->command = *(unsigned char*)(c->recvbuf + 1); |
114 | 0 | r->key = key_len ? pkey : NULL; |
115 | 0 | r->key_len = key_len; |
116 | 0 | r->status = ntoh16(c->recvbuf + 6); |
117 | 0 | memcpy(&(r->serial), c->recvbuf + 12, 4); |
118 | 0 | r->cas_unique = ntoh64(c->recvbuf + 16); |
119 | 0 | r->flags = 0; |
120 | 0 | if( extras_len > 0 ) { |
121 | 0 | if( extras_len != 4 ) { |
122 | 0 | c->invalid = 1; |
123 | 0 | return YRMCDS_PROTOCOL_ERROR; |
124 | 0 | } |
125 | 0 | r->flags = ntoh32(c->recvbuf + BINARY_HEADER_SIZE); |
126 | 0 | } |
127 | | |
128 | 0 | size_t data_len = total_len - key_len - extras_len; |
129 | 0 | const char* pdata = pkey + key_len; |
130 | |
|
131 | 0 | if( (r->command == YRMCDS_CMD_INCREMENT || |
132 | 0 | r->command == YRMCDS_CMD_DECREMENT) && |
133 | 0 | (r->status == YRMCDS_STATUS_OK) ) { |
134 | 0 | r->data = NULL; |
135 | 0 | r->data_len = 0; |
136 | 0 | if( data_len != 8 ) { |
137 | 0 | c->invalid = 1; |
138 | 0 | return YRMCDS_PROTOCOL_ERROR; |
139 | 0 | } |
140 | 0 | r->value = ntoh64(pdata); |
141 | 0 | c->last_size = r->length; |
142 | 0 | return YRMCDS_OK; |
143 | 0 | } |
144 | 0 | r->value = 0; |
145 | 0 | r->data = data_len ? pdata : NULL; |
146 | 0 | r->data_len = data_len; |
147 | |
|
148 | | #ifdef LIBYRMCDS_USE_LZ4 |
149 | | if( c->compress_size && (r->flags & YRMCDS_FLAG_COMPRESS) ) { |
150 | | if( data_len == 0 ) { |
151 | | c->invalid = 1; |
152 | | return YRMCDS_PROTOCOL_ERROR; |
153 | | } |
154 | | r->flags &= ~(uint32_t)YRMCDS_FLAG_COMPRESS; |
155 | | uint32_t decompress_size = ntoh32(pdata); |
156 | | if( UINT32_MAX > INT_MAX ) { |
157 | | if( decompress_size > INT_MAX ) { |
158 | | c->invalid = 1; |
159 | | return YRMCDS_PROTOCOL_ERROR; |
160 | | } |
161 | | } |
162 | | c->decompressed = (char*)malloc(decompress_size); |
163 | | if( c->decompressed == NULL ) |
164 | | return YRMCDS_OUT_OF_MEMORY; |
165 | | int d = LZ4_decompress_safe(pdata + sizeof(uint32_t), |
166 | | c->decompressed, |
167 | | (int)(data_len - sizeof(uint32_t)), |
168 | | (int)decompress_size); |
169 | | if( d != decompress_size ) { |
170 | | c->invalid = 1; |
171 | | return YRMCDS_PROTOCOL_ERROR; |
172 | | } |
173 | | r->data = c->decompressed; |
174 | | r->data_len = decompress_size; |
175 | | } |
176 | | #endif // LIBYRMCDS_USE_LZ4 |
177 | |
|
178 | 0 | c->last_size = r->length; |
179 | 0 | return YRMCDS_OK; |
180 | 0 | } |
181 | | |
182 | | |
183 | | // text protocol |
184 | | #define PARSE_UINT(name) \ |
185 | 0 | uint64_t name = 0; \ |
186 | 0 | while( *p == ' ' ) p++; \ |
187 | 0 | while( '0' <= *p && *p <= '9' ) { \ |
188 | 0 | name *= 10; \ |
189 | 0 | name += (uint64_t)(*p - '0'); \ |
190 | 0 | p++; \ |
191 | 0 | } |
192 | | |
193 | 0 | static yrmcds_error text_recv(yrmcds* c, yrmcds_response* r) { |
194 | 0 | char* pos; |
195 | 0 | while( c->used == 0 || |
196 | 0 | (pos = (char*)memchr(c->recvbuf, '\n', c->used)) == NULL ) { |
197 | 0 | yrmcds_error e = recv_data(c); |
198 | 0 | if( e != 0 ) return e; |
199 | 0 | } |
200 | | // make sure the buffer contains CRLF. |
201 | 0 | if( (pos - c->recvbuf) < 2 || *(pos-1) != '\r' ) { |
202 | 0 | c->invalid = 1; |
203 | 0 | return YRMCDS_PROTOCOL_ERROR; |
204 | 0 | } |
205 | 0 | pos--; |
206 | 0 | size_t resp_len = (size_t)(pos - c->recvbuf); |
207 | |
|
208 | 0 | memset(r, 0, sizeof(yrmcds_response)); |
209 | 0 | r->serial = ++c->rserial; |
210 | 0 | r->length = resp_len + 2; |
211 | 0 | r->status = YRMCDS_STATUS_OK; |
212 | 0 | r->command = YRMCDS_CMD_BOTTOM; // dummy for emulating binary protocol |
213 | |
|
214 | 0 | if( resp_len == 2 && memcmp(c->recvbuf, "OK", 2) == 0 ) { |
215 | | // successful response for flush_all |
216 | 0 | goto FINISH; |
217 | 0 | } |
218 | 0 | if( resp_len == 3 && memcmp(c->recvbuf, "END", 3) == 0 ) { |
219 | | // get failed for non-existing object. |
220 | 0 | r->status = YRMCDS_STATUS_NOTFOUND; |
221 | 0 | goto FINISH; |
222 | 0 | } |
223 | 0 | if( resp_len == 5 && memcmp(c->recvbuf, "ERROR", 5) == 0 ) { |
224 | 0 | r->status = YRMCDS_STATUS_UNKNOWNCOMMAND; |
225 | 0 | goto FINISH; |
226 | 0 | } |
227 | 0 | if( resp_len == 6 ) { |
228 | 0 | if( memcmp(c->recvbuf, "STORED", 6) == 0 ) { |
229 | | // successful response for storage commands. |
230 | 0 | goto FINISH; |
231 | 0 | } |
232 | 0 | if( memcmp(c->recvbuf, "EXISTS", 6) == 0 ) { |
233 | | // failure response for cas. |
234 | 0 | r->status = YRMCDS_STATUS_EXISTS; |
235 | 0 | goto FINISH; |
236 | 0 | } |
237 | 0 | } |
238 | 0 | if( resp_len == 7 ) { |
239 | 0 | if( memcmp(c->recvbuf, "DELETED", 7) == 0 ) |
240 | | // successful response for delete. |
241 | 0 | goto FINISH; |
242 | 0 | if( memcmp(c->recvbuf, "TOUCHED", 7) == 0 ) |
243 | | // successful response for touch. |
244 | 0 | goto FINISH; |
245 | 0 | } |
246 | 0 | if( resp_len == 9 && memcmp(c->recvbuf, "NOT_FOUND", 9) == 0 ) { |
247 | | // failure response for cas, delete, incr, decr, or touch. |
248 | 0 | r->status = YRMCDS_STATUS_NOTFOUND; |
249 | 0 | goto FINISH; |
250 | 0 | } |
251 | 0 | if( resp_len == 10 && memcmp(c->recvbuf, "NOT_STORED", 10) == 0 ) { |
252 | | // failure response for add, replace, append, or prepend. |
253 | 0 | r->status = YRMCDS_STATUS_NOTSTORED; |
254 | 0 | goto FINISH; |
255 | 0 | } |
256 | 0 | if( resp_len > 0 && '0' <= c->recvbuf[0] && c->recvbuf[0] <= '9' ) { |
257 | | // successful response for incr or decr. |
258 | 0 | const char* p = c->recvbuf; |
259 | 0 | PARSE_UINT(value); |
260 | 0 | r->value = value; |
261 | 0 | goto FINISH; |
262 | 0 | } |
263 | 0 | if( resp_len > 8 && memcmp(c->recvbuf, "VERSION ", 8) == 0 ) { |
264 | | // successful response for version. |
265 | 0 | r->data_len = resp_len - 8; |
266 | 0 | r->data = c->recvbuf + 8; |
267 | 0 | goto FINISH; |
268 | 0 | } |
269 | 0 | if( resp_len > 6 && memcmp(c->recvbuf, "VALUE ", 6) == 0 ) { |
270 | | // successful response for gets. |
271 | 0 | const char* p = c->recvbuf + 6; |
272 | 0 | while( *p == ' ' ) p++; |
273 | 0 | if( p == pos ) goto UNKNOWN; |
274 | | |
275 | 0 | const char* key_end = memchr(p, ' ', (size_t)(pos - p)); |
276 | 0 | if( key_end == NULL ) goto UNKNOWN; |
277 | 0 | r->key = p; |
278 | 0 | r->key_len = (size_t)(key_end - p); |
279 | |
|
280 | 0 | p = key_end; |
281 | 0 | PARSE_UINT(flags); |
282 | 0 | if( *p != ' ' ) goto UNKNOWN; |
283 | 0 | r->flags = (uint32_t)flags; |
284 | |
|
285 | 0 | PARSE_UINT(bytes); |
286 | 0 | if( bytes > MAX_CAPACITY ) { |
287 | 0 | c->invalid = 1; |
288 | 0 | return YRMCDS_PROTOCOL_ERROR; |
289 | 0 | } |
290 | 0 | size_t data_len = (size_t)bytes; |
291 | |
|
292 | 0 | while( *p == ' ' ) p++; |
293 | 0 | if( *p < '0' || '9' < *p ) goto UNKNOWN; |
294 | 0 | PARSE_UINT(cas); |
295 | |
|
296 | 0 | size_t required = resp_len + 2 + data_len + 7; // CRLF "END" CRLF |
297 | 0 | while( c->used < required ) { |
298 | 0 | yrmcds_error e = recv_data(c); |
299 | 0 | if( e != 0 ) return e; |
300 | 0 | } |
301 | | |
302 | 0 | const char* data = c->recvbuf + (resp_len + 2); |
303 | 0 | if( memcmp(data + data_len, "\r\nEND\r\n", 7) != 0 ) { |
304 | 0 | c->invalid = 1; |
305 | 0 | return YRMCDS_PROTOCOL_ERROR; |
306 | 0 | } |
307 | 0 | r->length = required; |
308 | 0 | r->flags = (uint32_t)flags; |
309 | |
|
310 | | #ifdef LIBYRMCDS_USE_LZ4 |
311 | | if( c->compress_size && (r->flags & YRMCDS_FLAG_COMPRESS) ) { |
312 | | if( data_len == 0 ) { |
313 | | c->invalid = 1; |
314 | | return YRMCDS_PROTOCOL_ERROR; |
315 | | } |
316 | | r->flags &= ~(uint32_t)YRMCDS_FLAG_COMPRESS; |
317 | | uint32_t decompress_size = ntoh32(data); |
318 | | if( UINT32_MAX > INT_MAX ) { |
319 | | if( decompress_size > INT_MAX ) { |
320 | | c->invalid = 1; |
321 | | return YRMCDS_PROTOCOL_ERROR; |
322 | | } |
323 | | } |
324 | | c->decompressed = (char*)malloc(decompress_size); |
325 | | if( c->decompressed == NULL ) |
326 | | return YRMCDS_OUT_OF_MEMORY; |
327 | | int d = LZ4_decompress_safe(data + sizeof(uint32_t), |
328 | | c->decompressed, |
329 | | (int)(data_len - sizeof(uint32_t)), |
330 | | (int)decompress_size); |
331 | | if( d != decompress_size ) { |
332 | | c->invalid = 1; |
333 | | return YRMCDS_PROTOCOL_ERROR; |
334 | | } |
335 | | data = c->decompressed; |
336 | | data_len = (size_t)decompress_size; |
337 | | } |
338 | | #endif // LIBYRMCDS_USE_LZ4 |
339 | 0 | r->data = data; |
340 | 0 | r->data_len = data_len; |
341 | 0 | r->cas_unique = cas; |
342 | 0 | goto FINISH; |
343 | 0 | } |
344 | | |
345 | 0 | UNKNOWN: |
346 | 0 | r->status = YRMCDS_STATUS_OTHER; |
347 | 0 | fprintf(stderr, "[libyrmcds] unknown response: %.*s\n", |
348 | 0 | (int)resp_len, c->recvbuf); |
349 | |
|
350 | 0 | FINISH: |
351 | 0 | c->last_size = r->length; |
352 | 0 | return YRMCDS_OK; |
353 | 0 | } |
354 | | |