Coverage Report

Created: 2025-07-11 06:25

/src/h2o/deps/libyrmcds/recv.c
Line
Count
Source (jump to first uncovered line)
1
// (C) 2013 Cybozu et al.
2
3
#include "yrmcds.h"
4
#include "yrmcds_portability.h"
5
6
#ifdef LIBYRMCDS_USE_LZ4
7
#  include "lz4/lib/lz4.h"
8
#endif
9
10
#include <errno.h>
11
#include <limits.h>
12
#include <stdio.h>
13
#include <stdlib.h>
14
#include <string.h>
15
#include <sys/socket.h>
16
#include <sys/types.h>
17
18
static const size_t BINARY_HEADER_SIZE = 24;
19
static const size_t RECV_SIZE = 256 << 10;
20
static const size_t MAX_CAPACITY = 50 << 20; // 50 MiB
21
22
0
static inline yrmcds_error recv_data(yrmcds* c) {
23
0
    if( (c->capacity - c->used) < RECV_SIZE ) {
24
0
        size_t new_capacity = c->capacity * 2;
25
0
        char* new_buffer = (char*)realloc(c->recvbuf, new_capacity);
26
0
        if( new_buffer == NULL )
27
0
            return YRMCDS_OUT_OF_MEMORY;
28
0
        c->recvbuf = new_buffer;
29
0
        c->capacity = new_capacity;
30
0
    }
31
32
0
    ssize_t n;
33
0
  AGAIN:
34
0
    n = recv(c->sock, c->recvbuf + c->used, RECV_SIZE, 0);
35
0
    if( n == -1 ) {
36
0
        if( errno == EINTR ) goto AGAIN;
37
0
        return YRMCDS_SYSTEM_ERROR;
38
0
    }
39
0
    if( n == 0 )
40
0
        return YRMCDS_DISCONNECTED;
41
0
    c->used += (size_t)n;
42
0
    return YRMCDS_OK;
43
0
}
44
45
0
static inline uint64_t ntoh64(const char* p) {
46
0
    uint64_t n;
47
0
    memcpy(&n, p, sizeof(n));
48
0
    return be64toh(n);
49
0
}
50
51
0
static inline uint32_t ntoh32(const char* p) {
52
0
    uint32_t n;
53
0
    memcpy(&n, p, sizeof(n));
54
0
    return be32toh(n);
55
0
}
56
57
0
static inline uint16_t ntoh16(const char* p) {
58
0
    uint16_t n;
59
0
    memcpy(&n, p, sizeof(n));
60
0
    return be16toh(n);
61
0
}
62
63
static yrmcds_error text_recv(yrmcds* c, yrmcds_response* r);
64
65
0
yrmcds_error yrmcds_recv(yrmcds* c, yrmcds_response* r) {
66
0
    if( c == NULL || r == NULL )
67
0
        return YRMCDS_BAD_ARGUMENT;
68
0
    if( c->invalid )
69
0
        return YRMCDS_PROTOCOL_ERROR;
70
71
0
    if( c->last_size > 0 ) {
72
0
        size_t remain = c->used - c->last_size;
73
0
        if( remain > 0 )
74
0
            memmove(c->recvbuf, c->recvbuf + c->last_size, remain);
75
0
        c->used = remain;
76
0
        c->last_size = 0;
77
0
        free(c->decompressed);
78
0
        c->decompressed = NULL;
79
0
    }
80
81
0
    if( c->text_mode ) {
82
0
        return text_recv(c, r);
83
0
    }
84
85
0
    while( c->used < BINARY_HEADER_SIZE ) {
86
0
        yrmcds_error e = recv_data(c);
87
0
        if( e != 0 ) return e;
88
0
    }
89
90
0
    if( *c->recvbuf != '\x81' ) {
91
0
        c->invalid = 1;
92
0
        return YRMCDS_PROTOCOL_ERROR;
93
0
    }
94
0
    uint32_t total_len = ntoh32(c->recvbuf + 8);
95
0
    if( total_len > MAX_CAPACITY ) {
96
0
        c->invalid = 1;
97
0
        return YRMCDS_PROTOCOL_ERROR;
98
0
    }
99
0
    while( c->used < (BINARY_HEADER_SIZE + total_len) ) {
100
0
        yrmcds_error e = recv_data(c);
101
0
        if( e != 0 ) return e;
102
0
    }
103
104
0
    uint16_t key_len = ntoh16(c->recvbuf + 2);
105
0
    uint8_t extras_len = *(unsigned char*)(c->recvbuf + 4);
106
0
    if( total_len < (key_len + extras_len) ) {
107
0
        c->invalid = 1;
108
0
        return YRMCDS_PROTOCOL_ERROR;
109
0
    }
110
111
0
    const char* pkey = c->recvbuf + (BINARY_HEADER_SIZE + extras_len);
112
0
    r->length = BINARY_HEADER_SIZE + total_len;
113
0
    r->command = *(unsigned char*)(c->recvbuf + 1);
114
0
    r->key = key_len ? pkey : NULL;
115
0
    r->key_len = key_len;
116
0
    r->status = ntoh16(c->recvbuf + 6);
117
0
    memcpy(&(r->serial), c->recvbuf + 12, 4);
118
0
    r->cas_unique = ntoh64(c->recvbuf + 16);
119
0
    r->flags = 0;
120
0
    if( extras_len > 0 ) {
121
0
        if( extras_len != 4 ) {
122
0
            c->invalid = 1;
123
0
            return YRMCDS_PROTOCOL_ERROR;
124
0
        }
125
0
        r->flags = ntoh32(c->recvbuf + BINARY_HEADER_SIZE);
126
0
    }
127
128
0
    size_t data_len = total_len - key_len - extras_len;
129
0
    const char* pdata = pkey + key_len;
130
131
0
    if( (r->command == YRMCDS_CMD_INCREMENT ||
132
0
         r->command == YRMCDS_CMD_DECREMENT) &&
133
0
        (r->status == YRMCDS_STATUS_OK) ) {
134
0
        r->data = NULL;
135
0
        r->data_len = 0;
136
0
        if( data_len != 8 ) {
137
0
            c->invalid = 1;
138
0
            return YRMCDS_PROTOCOL_ERROR;
139
0
        }
140
0
        r->value = ntoh64(pdata);
141
0
        c->last_size = r->length;
142
0
        return YRMCDS_OK;
143
0
    }
144
0
    r->value = 0;
145
0
    r->data = data_len ? pdata : NULL;
146
0
    r->data_len = data_len;
147
148
#ifdef LIBYRMCDS_USE_LZ4
149
    if( c->compress_size && (r->flags & YRMCDS_FLAG_COMPRESS) ) {
150
        if( data_len == 0 ) {
151
            c->invalid = 1;
152
            return YRMCDS_PROTOCOL_ERROR;
153
        }
154
        r->flags &= ~(uint32_t)YRMCDS_FLAG_COMPRESS;
155
        uint32_t decompress_size = ntoh32(pdata);
156
        if( UINT32_MAX > INT_MAX ) {
157
            if( decompress_size > INT_MAX ) {
158
                c->invalid = 1;
159
                return YRMCDS_PROTOCOL_ERROR;
160
            }
161
        }
162
        c->decompressed = (char*)malloc(decompress_size);
163
        if( c->decompressed == NULL )
164
            return YRMCDS_OUT_OF_MEMORY;
165
        int d = LZ4_decompress_safe(pdata + sizeof(uint32_t),
166
                                    c->decompressed,
167
                                    (int)(data_len - sizeof(uint32_t)),
168
                                    (int)decompress_size);
169
        if( d != decompress_size ) {
170
            c->invalid = 1;
171
            return YRMCDS_PROTOCOL_ERROR;
172
        }
173
        r->data = c->decompressed;
174
        r->data_len = decompress_size;
175
    }
176
#endif // LIBYRMCDS_USE_LZ4
177
178
0
    c->last_size = r->length;
179
0
    return YRMCDS_OK;
180
0
}
181
182
183
// text protocol
184
#define PARSE_UINT(name)                        \
185
0
    uint64_t name = 0;                          \
186
0
    while( *p == ' ' ) p++;                     \
187
0
    while( '0' <= *p && *p <= '9' ) {           \
188
0
        name *= 10;                             \
189
0
        name += (uint64_t)(*p - '0');           \
190
0
        p++;                                    \
191
0
    }
192
193
0
static yrmcds_error text_recv(yrmcds* c, yrmcds_response* r) {
194
0
    char* pos;
195
0
    while( c->used == 0 ||
196
0
           (pos = (char*)memchr(c->recvbuf, '\n', c->used)) == NULL ) {
197
0
        yrmcds_error e = recv_data(c);
198
0
        if( e != 0 ) return e;
199
0
    }
200
    // make sure the buffer contains CRLF.
201
0
    if( (pos - c->recvbuf) < 2 || *(pos-1) != '\r' ) {
202
0
        c->invalid = 1;
203
0
        return YRMCDS_PROTOCOL_ERROR;
204
0
    }
205
0
    pos--;
206
0
    size_t resp_len = (size_t)(pos - c->recvbuf);
207
208
0
    memset(r, 0, sizeof(yrmcds_response));
209
0
    r->serial = ++c->rserial;
210
0
    r->length = resp_len + 2;
211
0
    r->status = YRMCDS_STATUS_OK;
212
0
    r->command = YRMCDS_CMD_BOTTOM;  // dummy for emulating binary protocol
213
214
0
    if( resp_len == 2 && memcmp(c->recvbuf, "OK", 2) == 0 ) {
215
        // successful response for flush_all
216
0
        goto FINISH;
217
0
    }
218
0
    if( resp_len == 3 && memcmp(c->recvbuf, "END", 3) == 0 ) {
219
        // get failed for non-existing object.
220
0
        r->status = YRMCDS_STATUS_NOTFOUND;
221
0
        goto FINISH;
222
0
    }
223
0
    if( resp_len == 5 && memcmp(c->recvbuf, "ERROR", 5) == 0 ) {
224
0
        r->status = YRMCDS_STATUS_UNKNOWNCOMMAND;
225
0
        goto FINISH;
226
0
    }
227
0
    if( resp_len == 6 ) {
228
0
        if( memcmp(c->recvbuf, "STORED", 6) == 0 ) {
229
            // successful response for storage commands.
230
0
            goto FINISH;
231
0
        }
232
0
        if( memcmp(c->recvbuf, "EXISTS", 6) == 0 ) {
233
            // failure response for cas.
234
0
            r->status = YRMCDS_STATUS_EXISTS;
235
0
            goto FINISH;
236
0
        }
237
0
    }
238
0
    if( resp_len == 7 ) {
239
0
        if( memcmp(c->recvbuf, "DELETED", 7) == 0 )
240
            // successful response for delete.
241
0
            goto FINISH;
242
0
        if( memcmp(c->recvbuf, "TOUCHED", 7) == 0 )
243
            // successful response for touch.
244
0
            goto FINISH;
245
0
    }
246
0
    if( resp_len == 9 && memcmp(c->recvbuf, "NOT_FOUND", 9) == 0 ) {
247
        // failure response for cas, delete, incr, decr, or touch.
248
0
        r->status = YRMCDS_STATUS_NOTFOUND;
249
0
        goto FINISH;
250
0
    }
251
0
    if( resp_len == 10 && memcmp(c->recvbuf, "NOT_STORED", 10) == 0 ) {
252
        // failure response for add, replace, append, or prepend.
253
0
        r->status = YRMCDS_STATUS_NOTSTORED;
254
0
        goto FINISH;
255
0
    }
256
0
    if( resp_len > 0 && '0' <= c->recvbuf[0] && c->recvbuf[0] <= '9' ) {
257
        // successful response for incr or decr.
258
0
        const char* p = c->recvbuf;
259
0
        PARSE_UINT(value);
260
0
        r->value = value;
261
0
        goto FINISH;
262
0
    }
263
0
    if( resp_len > 8 && memcmp(c->recvbuf, "VERSION ", 8) == 0 ) {
264
        // successful response for version.
265
0
        r->data_len = resp_len - 8;
266
0
        r->data = c->recvbuf + 8;
267
0
        goto FINISH;
268
0
    }
269
0
    if( resp_len > 6 && memcmp(c->recvbuf, "VALUE ", 6) == 0 ) {
270
        // successful response for gets.
271
0
        const char* p = c->recvbuf + 6;
272
0
        while( *p == ' ' ) p++;
273
0
        if( p == pos ) goto UNKNOWN;
274
275
0
        const char* key_end = memchr(p, ' ', (size_t)(pos - p));
276
0
        if( key_end == NULL ) goto UNKNOWN;
277
0
        r->key = p;
278
0
        r->key_len = (size_t)(key_end - p);
279
280
0
        p = key_end;
281
0
        PARSE_UINT(flags);
282
0
        if( *p != ' ' ) goto UNKNOWN;
283
0
        r->flags = (uint32_t)flags;
284
285
0
        PARSE_UINT(bytes);
286
0
        if( bytes > MAX_CAPACITY ) {
287
0
            c->invalid = 1;
288
0
            return YRMCDS_PROTOCOL_ERROR;
289
0
        }
290
0
        size_t data_len = (size_t)bytes;
291
292
0
        while( *p == ' ' ) p++;
293
0
        if( *p < '0' || '9' < *p ) goto UNKNOWN;
294
0
        PARSE_UINT(cas);
295
296
0
        size_t required = resp_len + 2 + data_len + 7; // CRLF "END" CRLF
297
0
        while( c->used < required ) {
298
0
            yrmcds_error e = recv_data(c);
299
0
            if( e != 0 ) return e;
300
0
        }
301
302
0
        const char* data = c->recvbuf + (resp_len + 2);
303
0
        if( memcmp(data + data_len, "\r\nEND\r\n", 7) != 0 ) {
304
0
            c->invalid = 1;
305
0
            return YRMCDS_PROTOCOL_ERROR;
306
0
        }
307
0
        r->length = required;
308
0
        r->flags = (uint32_t)flags;
309
310
#ifdef LIBYRMCDS_USE_LZ4
311
        if( c->compress_size && (r->flags & YRMCDS_FLAG_COMPRESS) ) {
312
            if( data_len == 0 ) {
313
                c->invalid = 1;
314
                return YRMCDS_PROTOCOL_ERROR;
315
            }
316
            r->flags &= ~(uint32_t)YRMCDS_FLAG_COMPRESS;
317
            uint32_t decompress_size = ntoh32(data);
318
            if( UINT32_MAX > INT_MAX ) {
319
                if( decompress_size > INT_MAX ) {
320
                    c->invalid = 1;
321
                    return YRMCDS_PROTOCOL_ERROR;
322
                }
323
            }
324
            c->decompressed = (char*)malloc(decompress_size);
325
            if( c->decompressed == NULL )
326
                return YRMCDS_OUT_OF_MEMORY;
327
            int d = LZ4_decompress_safe(data + sizeof(uint32_t),
328
                                        c->decompressed,
329
                                        (int)(data_len - sizeof(uint32_t)),
330
                                        (int)decompress_size);
331
            if( d != decompress_size ) {
332
                c->invalid = 1;
333
                return YRMCDS_PROTOCOL_ERROR;
334
            }
335
            data = c->decompressed;
336
            data_len = (size_t)decompress_size;
337
        }
338
#endif // LIBYRMCDS_USE_LZ4
339
0
        r->data = data;
340
0
        r->data_len = data_len;
341
0
        r->cas_unique = cas;
342
0
        goto FINISH;
343
0
    }
344
345
0
  UNKNOWN:
346
0
    r->status = YRMCDS_STATUS_OTHER;
347
0
    fprintf(stderr, "[libyrmcds] unknown response: %.*s\n",
348
0
            (int)resp_len, c->recvbuf);
349
350
0
  FINISH:
351
0
    c->last_size = r->length;
352
0
    return YRMCDS_OK;
353
0
}
354