Coverage Report

Created: 2025-07-11 06:26

/src/h2o/lib/common/memcached.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2015 DeNA Co., Ltd., Kazuho Oku
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <errno.h>
23
#include <inttypes.h>
24
#include <unistd.h>
25
#include "yrmcds.h"
26
#include "h2o/linklist.h"
27
#include "h2o/memcached.h"
28
#include "h2o/rand.h"
29
#include "h2o/string_.h"
30
31
/* State shared by all I/O threads of one memcached client: the queue of not-yet-sent requests and the connection settings. */
struct st_h2o_memcached_context_t {
32
    pthread_mutex_t mutex; /* held while touching `pending` and `num_threads_connected` */
33
    pthread_cond_t cond; /* signalled when a request is queued; broadcast when a writer thread is asked to exit */
34
    h2o_linklist_t pending; /* anchor of the requests waiting to be picked up by a writer thread */
35
    size_t num_threads_connected; /* number of reader threads that currently hold a connection */
36
    char *host; /* host name handed to yrmcds_connect */
37
    uint16_t port; /* port handed to yrmcds_connect */
38
    int text_protocol; /* when non-zero, connections are switched to text mode right after connecting */
39
    h2o_iovec_t prefix; /* bytes prepended to every key built by create_req */
40
};
41
42
/*
 * Per-connection state shared between a reader thread and its writer thread.
 * reader_main initialises this struct positionally, so the member order must not change.
 */
struct st_h2o_memcached_conn_t {
43
    h2o_memcached_context_t *ctx; /* owning context (source of pending requests) */
44
    yrmcds yrmcds; /* connection handle used for both sending and receiving */
45
    pthread_mutex_t mutex; /* held while touching `inflight` */
46
    h2o_linklist_t inflight; /* anchor of GET requests that have been handed to the writer and await a response */
47
    int writer_exit_requested; /* set (atomically) by the reader to ask the writer thread to return */
48
};
49
50
/* Kind of operation carried by a request; selects which member of the request's `data` union is valid. */
enum en_h2o_memcached_req_type_t { REQ_TYPE_GET, REQ_TYPE_SET, REQ_TYPE_DELETE };
51
52
/*
 * A single queued operation. The struct is over-allocated by create_req so that `key.base` can hold the
 * whole (prefixed, optionally encoded) key; `key` must therefore stay the last member.
 */
struct st_h2o_memcached_req_t {
53
    enum en_h2o_memcached_req_type_t type;
54
    h2o_linklist_t pending; /* link into the context's `pending` list */
55
    h2o_linklist_t inflight; /* link into a connection's `inflight` list (GET only) */
56
    union {
57
        struct {
58
            h2o_multithread_receiver_t *receiver; /* receiver the completion message is sent to */
59
            h2o_multithread_message_t message; /* message object used to deliver the completion */
60
            h2o_memcached_get_cb cb; /* completion callback; NULL once the request has been cancelled */
61
            void *cb_data; /* opaque argument passed to `cb` */
62
            int value_is_encoded; /* non-zero if the received value must be base64url-decoded before calling `cb` */
63
            h2o_iovec_t value; /* heap copy of the received value; stays zeroed when no value was received */
64
            uint32_t serial; /* serial filled in by yrmcds_get, used to match the response */
65
        } get;
66
        struct {
67
            h2o_iovec_t value; /* heap copy of the value to store (already encoded if requested) */
68
            uint32_t expiration; /* expiration passed through to yrmcds_set */
69
        } set;
70
    } data;
71
    struct {
72
        size_t len; /* number of key bytes in `base` */
73
        char base[1]; /* first byte of the variable-length key storage */
74
    } key;
75
};
76
77
/*
 * Allocates a request of the given type and builds its key as `ctx->prefix` followed by `key`
 * (base64-encoded through h2o_base64_encode when `encode_key` is non-zero).
 * The list links and the `data` union are zero-initialised; type-specific fields are left to the caller.
 * The returned request is owned by the caller until it is dispatched.
 */
static h2o_memcached_req_t *create_req(h2o_memcached_context_t *ctx, enum en_h2o_memcached_req_type_t type, h2o_iovec_t key,
                                       int encode_key)
{
    /* room reserved for the key payload: 4 output bytes per 3 input bytes plus one spare byte when encoding
     * (presumably for a terminator written by the encoder -- confirm against h2o_base64_encode), raw length otherwise */
    size_t key_capacity = encode_key ? (key.len + 2) / 3 * 4 + 1 : key.len;
    h2o_memcached_req_t *req = h2o_mem_alloc(offsetof(h2o_memcached_req_t, key.base) + ctx->prefix.len + key_capacity);
    char *key_dst;

    req->type = type;
    req->pending = (h2o_linklist_t){NULL};
    req->inflight = (h2o_linklist_t){NULL};
    memset(&req->data, 0, sizeof(req->data));

    /* the key starts with the context-wide prefix (if any) */
    if (ctx->prefix.len != 0)
        memcpy(req->key.base, ctx->prefix.base, ctx->prefix.len);
    key_dst = req->key.base + ctx->prefix.len;

    if (encode_key) {
        req->key.len = ctx->prefix.len + h2o_base64_encode(key_dst, key.base, key.len, 1);
    } else {
        memcpy(key_dst, key.base, key.len);
        req->key.len = ctx->prefix.len + key.len;
    }

    return req;
}
97
98
/*
 * Releases a request together with the value buffer it owns (GET and SET only).
 * The value buffer is overwritten with zeros via h2o_mem_set_secure before being freed.
 * The request must not be linked to the pending list (nor, for GET, have its message queued).
 */
static void free_req(h2o_memcached_req_t *req)
{
    h2o_iovec_t *owned_value = NULL;

    assert(!h2o_linklist_is_linked(&req->pending));

    if (req->type == REQ_TYPE_GET) {
        assert(!h2o_linklist_is_linked(&req->data.get.message.link));
        owned_value = &req->data.get.value;
    } else if (req->type == REQ_TYPE_SET) {
        owned_value = &req->data.set.value;
    } else if (req->type != REQ_TYPE_DELETE) {
        /* unknown request type */
        assert(!"FIXME");
    }

    if (owned_value != NULL) {
        h2o_mem_set_secure(owned_value->base, 0, owned_value->len);
        free(owned_value->base);
    }

    free(req);
}
119
120
/*
 * Gives up on a request that will not (or no longer needs to) be tracked.
 * A GET is handed to its receiver so that the completion path runs (and frees it) there;
 * every other type is freed on the spot.
 */
static void discard_req(h2o_memcached_req_t *req)
{
    if (req->type != REQ_TYPE_GET) {
        free_req(req);
        return;
    }

    h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
}
131
132
static h2o_memcached_req_t *pop_inflight(struct st_h2o_memcached_conn_t *conn, uint32_t serial)
133
0
{
134
0
    h2o_memcached_req_t *req;
135
136
0
    pthread_mutex_lock(&conn->mutex);
137
138
0
    if (conn->yrmcds.text_mode) {
139
        /* in text mode, responses are returned in order (and we may receive responses for commands other than GET) */
140
0
        if (!h2o_linklist_is_empty(&conn->inflight)) {
141
0
            req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn->inflight.next);
142
0
            assert(req->type == REQ_TYPE_GET);
143
0
            if (req->data.get.serial == serial)
144
0
                goto Found;
145
0
        }
146
0
    } else {
147
        /* in binary mode, responses are received out-of-order (and we would only recieve responses for GET) */
148
0
        h2o_linklist_t *node;
149
0
        for (node = conn->inflight.next; node != &conn->inflight; node = node->next) {
150
0
            req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, node);
151
0
            assert(req->type == REQ_TYPE_GET);
152
0
            if (req->data.get.serial == serial)
153
0
                goto Found;
154
0
        }
155
0
    }
156
157
    /* not found */
158
0
    pthread_mutex_unlock(&conn->mutex);
159
0
    return NULL;
160
161
0
Found:
162
0
    h2o_linklist_unlink(&req->inflight);
163
0
    pthread_mutex_unlock(&conn->mutex);
164
0
    return req;
165
0
}
166
167
static void *writer_main(void *_conn)
168
0
{
169
0
    struct st_h2o_memcached_conn_t *conn = _conn;
170
0
    yrmcds_error err;
171
172
0
    pthread_mutex_lock(&conn->ctx->mutex);
173
174
0
    while (!__sync_add_and_fetch(&conn->writer_exit_requested, 0)) {
175
0
        while (!h2o_linklist_is_empty(&conn->ctx->pending)) {
176
0
            h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn->ctx->pending.next);
177
0
            h2o_linklist_unlink(&req->pending);
178
0
            pthread_mutex_unlock(&conn->ctx->mutex);
179
180
0
            switch (req->type) {
181
0
            case REQ_TYPE_GET:
182
0
                pthread_mutex_lock(&conn->mutex);
183
0
                h2o_linklist_insert(&conn->inflight, &req->inflight);
184
0
                pthread_mutex_unlock(&conn->mutex);
185
0
                if ((err = yrmcds_get(&conn->yrmcds, req->key.base, req->key.len, 0, &req->data.get.serial)) != YRMCDS_OK)
186
0
                    goto Error;
187
0
                break;
188
0
            case REQ_TYPE_SET:
189
0
                err = yrmcds_set(&conn->yrmcds, req->key.base, req->key.len, req->data.set.value.base, req->data.set.value.len, 0,
190
0
                                 req->data.set.expiration, 0, !conn->yrmcds.text_mode, NULL);
191
0
                discard_req(req);
192
0
                if (err != YRMCDS_OK)
193
0
                    goto Error;
194
0
                break;
195
0
            case REQ_TYPE_DELETE:
196
0
                err = yrmcds_remove(&conn->yrmcds, req->key.base, req->key.len, !conn->yrmcds.text_mode, NULL);
197
0
                discard_req(req);
198
0
                if (err != YRMCDS_OK)
199
0
                    goto Error;
200
0
                break;
201
0
            default:
202
0
                h2o_error_printf("[lib/common/memcached.c] unknown type:%d\n", (int)req->type);
203
0
                err = YRMCDS_NOT_IMPLEMENTED;
204
0
                goto Error;
205
0
            }
206
207
0
            pthread_mutex_lock(&conn->ctx->mutex);
208
0
        }
209
0
        pthread_cond_wait(&conn->ctx->cond, &conn->ctx->mutex);
210
0
    }
211
212
0
    pthread_mutex_unlock(&conn->ctx->mutex);
213
0
    return NULL;
214
215
0
Error:
216
0
    h2o_error_printf("[lib/common/memcached.c] failed to send request; %s\n", yrmcds_strerror(err));
217
    /* doc says the call can be used to interrupt yrmcds_recv */
218
0
    yrmcds_shutdown(&conn->yrmcds);
219
220
0
    return NULL;
221
0
}
222
223
/*
 * Connects `yrmcds` to the server configured in `ctx`, retrying until yrmcds_connect succeeds.
 * Only the first failure of a retry sequence is logged; between attempts the thread sleeps for a randomised
 * interval. Once connected, the connection is switched to text mode if the context asks for it.
 */
static void connect_to_server(h2o_memcached_context_t *ctx, yrmcds *yrmcds)
{
    size_t failcnt; /* number of failed attempts so far; advanced only by the loop header */
    yrmcds_error err;

    for (failcnt = 0; (err = yrmcds_connect(yrmcds, ctx->host, ctx->port)) != YRMCDS_OK; ++failcnt) {
        if (failcnt == 0) {
            h2o_error_printf("[lib/common/memcached.c] failed to connect to memcached at %s:%" PRIu16 ", %s\n", ctx->host,
                             ctx->port, yrmcds_strerror(err));
        }
        usleep(2000000 + h2o_rand() % 3000000); /* sleep 2 to 5 seconds */
    }
    /* connected */
    if (ctx->text_protocol)
        yrmcds_text_mode(yrmcds);
    h2o_error_printf("[lib/common/memcached.c] connected to memcached at %s:%" PRIu16 "\n", ctx->host, ctx->port);
}
241
242
static void reader_main(h2o_memcached_context_t *ctx)
243
0
{
244
0
    struct st_h2o_memcached_conn_t conn = {ctx, {0}, PTHREAD_MUTEX_INITIALIZER, {&conn.inflight, &conn.inflight}, 0};
245
0
    pthread_t writer_thread;
246
0
    yrmcds_response resp;
247
0
    yrmcds_error err;
248
0
    int ret;
249
250
    /* connect to server and start the writer thread */
251
0
    connect_to_server(conn.ctx, &conn.yrmcds);
252
0
    if ((ret = pthread_create(&writer_thread, NULL, writer_main, &conn)) != 0) {
253
0
        char buf[128];
254
0
        h2o_fatal("pthread_create: %s", h2o_strerror_r(ret, buf, sizeof(buf)));
255
0
    }
256
257
0
    pthread_mutex_lock(&conn.ctx->mutex);
258
0
    ++conn.ctx->num_threads_connected;
259
0
    pthread_mutex_unlock(&conn.ctx->mutex);
260
261
    /* receive data until an error occurs */
262
0
    while (1) {
263
0
        if ((err = yrmcds_recv(&conn.yrmcds, &resp)) != YRMCDS_OK) {
264
0
            h2o_error_printf("[lib/common/memcached.c] yrmcds_recv:%s\n", yrmcds_strerror(err));
265
0
            break;
266
0
        }
267
0
        h2o_memcached_req_t *req = pop_inflight(&conn, resp.serial);
268
0
        if (req == NULL) {
269
0
            if (conn.yrmcds.text_mode)
270
0
                continue;
271
0
            h2o_error_printf("[lib/common/memcached.c] received unexpected serial\n");
272
0
            break;
273
0
        }
274
0
        if (resp.status == YRMCDS_STATUS_OK) {
275
0
            req->data.get.value = h2o_iovec_init(h2o_mem_alloc(resp.data_len), resp.data_len);
276
0
            memcpy(req->data.get.value.base, resp.data, resp.data_len);
277
0
            h2o_mem_set_secure((void *)resp.data, 0, resp.data_len);
278
0
        }
279
0
        h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
280
0
    }
281
282
    /* send error to all the reqs in-flight */
283
0
    pthread_mutex_lock(&conn.mutex);
284
0
    while (!h2o_linklist_is_empty(&conn.inflight)) {
285
0
        h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, inflight, conn.inflight.next);
286
0
        h2o_linklist_unlink(&req->inflight);
287
0
        assert(req->type == REQ_TYPE_GET);
288
0
        h2o_multithread_send_message(req->data.get.receiver, &req->data.get.message);
289
0
    }
290
0
    pthread_mutex_unlock(&conn.mutex);
291
292
    /* stop the writer thread */
293
0
    __sync_add_and_fetch(&conn.writer_exit_requested, 1);
294
0
    pthread_mutex_lock(&conn.ctx->mutex);
295
0
    pthread_cond_broadcast(&conn.ctx->cond);
296
0
    pthread_mutex_unlock(&conn.ctx->mutex);
297
0
    pthread_join(writer_thread, NULL);
298
299
    /* decrement num_threads_connected, and discard all the pending requests if no connections are alive */
300
0
    pthread_mutex_lock(&conn.ctx->mutex);
301
0
    if (--conn.ctx->num_threads_connected == 0) {
302
0
        while (!h2o_linklist_is_empty(&conn.ctx->pending)) {
303
0
            h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, pending, conn.ctx->pending.next);
304
0
            h2o_linklist_unlink(&req->pending);
305
0
            discard_req(req);
306
0
        }
307
0
    }
308
0
    pthread_mutex_unlock(&conn.ctx->mutex);
309
310
    /* close the connection */
311
0
    yrmcds_close(&conn.yrmcds);
312
0
}
313
314
static void *thread_main(void *_ctx)
315
0
{
316
0
    h2o_memcached_context_t *ctx = _ctx;
317
318
0
    while (1)
319
0
        reader_main(ctx);
320
0
    return NULL;
321
0
}
322
323
/*
 * Hands a freshly created request over to the I/O threads.
 * If no connection is alive, the request is discarded right away; otherwise it is linked to the pending
 * list and one waiter on the condition variable is woken up.
 */
static void dispatch(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req)
{
    pthread_mutex_lock(&ctx->mutex);

    if (ctx->num_threads_connected == 0) {
        /* nobody would ever send it */
        discard_req(req);
    } else {
        h2o_linklist_insert(&ctx->pending, &req->pending);
        pthread_cond_signal(&ctx->cond);
    }

    pthread_mutex_unlock(&ctx->mutex);
}
336
337
/*
 * Message handler for completed (or failed) GET requests.
 * For every message in `messages`: unless the request has been cancelled (callback set to NULL), the value is
 * base64url-decoded if the request asked for it and is non-empty, and the callback is invoked with the value
 * and the user-supplied argument. The request is freed afterwards in either case.
 */
void h2o_memcached_receiver(h2o_multithread_receiver_t *receiver, h2o_linklist_t *messages)
{
    while (!h2o_linklist_is_empty(messages)) {
        h2o_memcached_req_t *req = H2O_STRUCT_FROM_MEMBER(h2o_memcached_req_t, data.get.message.link, messages->next);
        h2o_iovec_t *value = &req->data.get.value;

        h2o_linklist_unlink(&req->data.get.message.link);
        assert(req->type == REQ_TYPE_GET);

        if (req->data.get.cb != NULL) {
            if (req->data.get.value_is_encoded && value->len != 0) {
                /* swap in the decoded value, then wipe and release the encoded buffer */
                h2o_iovec_t encoded = *value;
                *value = h2o_decode_base64url(NULL, encoded.base, encoded.len);
                h2o_mem_set_secure(encoded.base, 0, encoded.len);
                free(encoded.base);
            }
            req->data.get.cb(*value, req->data.get.cb_data);
        }

        free_req(req);
    }
}
355
356
h2o_memcached_req_t *h2o_memcached_get(h2o_memcached_context_t *ctx, h2o_multithread_receiver_t *receiver, h2o_iovec_t key,
357
                                       h2o_memcached_get_cb cb, void *cb_data, int flags)
358
0
{
359
0
    h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_GET, key, (flags & H2O_MEMCACHED_ENCODE_KEY) != 0);
360
0
    req->data.get.receiver = receiver;
361
0
    req->data.get.cb = cb;
362
0
    req->data.get.cb_data = cb_data;
363
0
    req->data.get.value_is_encoded = (flags & H2O_MEMCACHED_ENCODE_VALUE) != 0;
364
0
    dispatch(ctx, req);
365
0
    return req;
366
0
}
367
368
/*
 * Cancels a GET request: its callback is cleared so that it will not be invoked.
 * If the request is still waiting in the pending list, it is removed from the list and freed here;
 * otherwise it is left alone (the receiver frees it when its message arrives).
 */
void h2o_memcached_cancel_get(h2o_memcached_context_t *ctx, h2o_memcached_req_t *req)
{
    int was_pending;

    pthread_mutex_lock(&ctx->mutex);
    req->data.get.cb = NULL;
    was_pending = h2o_linklist_is_linked(&req->pending);
    if (was_pending)
        h2o_linklist_unlink(&req->pending);
    pthread_mutex_unlock(&ctx->mutex);

    /* freeing is done outside of the critical section */
    if (was_pending)
        free_req(req);
}
383
384
/*
 * Issues a fire-and-forget SET of `value` under `key` with the given expiration.
 * The value is copied (base64-encoded through h2o_base64_encode when H2O_MEMCACHED_ENCODE_VALUE is set in
 * `flags`), so the caller keeps ownership of its buffers. H2O_MEMCACHED_ENCODE_KEY requests key encoding.
 */
void h2o_memcached_set(h2o_memcached_context_t *ctx, h2o_iovec_t key, h2o_iovec_t value, uint32_t expiration, int flags)
{
    int encode_key = (flags & H2O_MEMCACHED_ENCODE_KEY) != 0;
    int encode_value = (flags & H2O_MEMCACHED_ENCODE_VALUE) != 0;
    h2o_memcached_req_t *req = create_req(ctx, REQ_TYPE_SET, key, encode_key);

    if (!encode_value) {
        /* store a verbatim copy */
        req->data.set.value = h2o_iovec_init(h2o_mem_alloc(value.len), value.len);
        memcpy(req->data.set.value.base, value.base, value.len);
    } else {
        /* 4 output bytes per 3 input bytes, plus one spare byte */
        req->data.set.value.base = h2o_mem_alloc((value.len + 2) / 3 * 4 + 1);
        req->data.set.value.len = h2o_base64_encode(req->data.set.value.base, value.base, value.len, 1);
    }
    req->data.set.expiration = expiration;

    dispatch(ctx, req);
}
397
398
/*
 * Issues a fire-and-forget DELETE of `key`; the key is encoded when H2O_MEMCACHED_ENCODE_KEY is set in `flags`.
 */
void h2o_memcached_delete(h2o_memcached_context_t *ctx, h2o_iovec_t key, int flags)
{
    int encode_key = (flags & H2O_MEMCACHED_ENCODE_KEY) != 0;

    dispatch(ctx, create_req(ctx, REQ_TYPE_DELETE, key, encode_key));
}
403
404
h2o_memcached_context_t *h2o_memcached_create_context(const char *host, uint16_t port, int text_protocol, size_t num_threads,
405
                                                      const char *prefix)
406
0
{
407
0
    h2o_memcached_context_t *ctx = h2o_mem_alloc(sizeof(*ctx));
408
409
0
    pthread_mutex_init(&ctx->mutex, NULL);
410
0
    pthread_cond_init(&ctx->cond, NULL);
411
0
    h2o_linklist_init_anchor(&ctx->pending);
412
0
    ctx->num_threads_connected = 0;
413
0
    ctx->host = h2o_strdup(NULL, host, SIZE_MAX).base;
414
0
    ctx->port = port;
415
0
    ctx->text_protocol = text_protocol;
416
0
    ctx->prefix = h2o_strdup(NULL, prefix, SIZE_MAX);
417
418
0
    { /* start the threads */
419
0
        pthread_t tid;
420
0
        pthread_attr_t attr;
421
0
        size_t i;
422
0
        pthread_attr_init(&attr);
423
0
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
424
0
        for (i = 0; i != num_threads; ++i)
425
0
            h2o_multithread_create_thread(&tid, &attr, thread_main, ctx);
426
0
        pthread_attr_destroy(&attr);
427
0
    }
428
429
0
    return ctx;
430
0
}