Coverage Report

Created: 2025-07-12 06:32

/src/h2o/lib/common/redis.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2016 DeNA Co., Ltd., Ichito Nagata
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <errno.h>
23
#include "h2o/redis.h"
24
#include "h2o/hiredis_.h"
25
#include "h2o/socket.h"
26
27
/* Error strings handed to user callbacks; defined as arrays so each has a single, stable address. */

/* generic connection failure (connect error, I/O error, EOF, or command issued while closed) */
const char h2o_redis_error_connection[] = "Connection Error";
/* hiredis reported REDIS_ERR_PROTOCOL */
const char h2o_redis_error_protocol[] = "Protocol Error";
/* the connect timer fired, or an I/O error occurred with errno == ETIMEDOUT */
const char h2o_redis_error_connect_timeout[] = "Connection Timeout";
/* a per-command timer fired before the reply arrived */
const char h2o_redis_error_command_timeout[] = "Command Timeout";
31
32
struct st_redis_socket_data_t {
33
    redisAsyncContext *context;
34
    const char *errstr;
35
    h2o_socket_t *socket;
36
};
37
38
/* forward declaration; the definition lives with the socket adapter functions further down */
static void attach_loop(redisAsyncContext *ac, h2o_loop_t *loop);
39
40
/*
 * Installs `cb` as the callback of the timer `entry` and links the timer to the loop of
 * `client`, passing `tick` through to h2o_timer_link.
 */
static void invoke_deferred(h2o_redis_client_t *client, uint64_t tick, h2o_timer_t *entry, h2o_timer_cb cb)
{
    h2o_loop_t *target_loop = client->loop;

    entry->cb = cb;
    h2o_timer_link(target_loop, tick, entry);
}
45
46
/*
 * Marks the client as closed, notifies the user through `on_close` (if set), and severs the
 * link between the client and its hiredis context in both directions. The hiredis context
 * itself is NOT freed here; callers that own it (see disconnect) free it afterwards.
 */
static void close_and_detach_connection(h2o_redis_client_t *client, const char *errstr)
{
    assert(client->_redis != NULL);

    /* the state is updated first so that the callback observes a closed client */
    client->state = H2O_REDIS_CONNECTION_STATE_CLOSED;
    if (client->on_close != NULL) {
        client->on_close(errstr);
    }

    /* read the context only after the callback has run, then drop both back-references */
    redisAsyncContext *attached = client->_redis;
    attached->data = NULL;
    client->_redis = NULL;

    h2o_timer_unlink(&client->_timeout_entry);
}
57
58
/*
 * Tears down an open (connecting or connected) client: records `errstr` on the socket adapter
 * so that pending command callbacks can report it, detaches the client, then frees the hiredis
 * context.
 */
static void disconnect(h2o_redis_client_t *client, const char *errstr)
{
    assert(client->state != H2O_REDIS_CONNECTION_STATE_CLOSED);
    assert(client->_redis != NULL);

    /* keep a local handle; close_and_detach_connection clears client->_redis */
    redisAsyncContext *ctx = client->_redis;
    struct st_redis_socket_data_t *sockdata = ctx->ev.data;

    sockdata->errstr = errstr;
    close_and_detach_connection(client, errstr);

    /* immediately call all callbacks of pending commands with nil replies */
    redisAsyncFree(ctx);
}
69
70
static const char *get_error(const redisAsyncContext *redis)
71
0
{
72
0
    switch (redis->err) {
73
0
    case REDIS_OK:
74
0
        return NULL;
75
0
    case REDIS_ERR_IO:
76
        /* hiredis internally checks socket error and set errno */
77
0
        if (errno == ETIMEDOUT) {
78
0
            return h2o_redis_error_connect_timeout;
79
0
        } else {
80
0
            return h2o_redis_error_connection;
81
0
        }
82
0
    case REDIS_ERR_EOF:
83
0
        return h2o_redis_error_connection;
84
0
    case REDIS_ERR_PROTOCOL:
85
0
        return h2o_redis_error_protocol;
86
0
    case REDIS_ERR_OOM:
87
0
    case REDIS_ERR_OTHER:
88
0
        return redis->errstr;
89
0
    default:
90
0
        h2o_fatal("FIXME");
91
0
    }
92
0
}
93
94
static void on_connect(const redisAsyncContext *redis, int status)
95
0
{
96
0
    h2o_redis_client_t *client = (h2o_redis_client_t *)redis->data;
97
0
    if (client == NULL)
98
0
        return;
99
100
0
    if (status != REDIS_OK) {
101
0
        close_and_detach_connection(client, h2o_redis_error_connection);
102
0
        return;
103
0
    }
104
0
    h2o_timer_unlink(&client->_timeout_entry);
105
106
0
    client->state = H2O_REDIS_CONNECTION_STATE_CONNECTED;
107
0
    if (client->on_connect != NULL)
108
0
        client->on_connect();
109
0
}
110
111
static void on_disconnect(const redisAsyncContext *redis, int status)
112
0
{
113
0
    h2o_redis_client_t *client = (h2o_redis_client_t *)redis->data;
114
0
    if (client == NULL)
115
0
        return;
116
117
0
    close_and_detach_connection(client, get_error(redis));
118
0
}
119
120
/*
 * Timer callback for the connect timeout: disconnects the owning client with
 * h2o_redis_error_connect_timeout.
 */
static void on_connect_timeout(h2o_timer_t *entry)
{
    h2o_redis_client_t *owner = H2O_STRUCT_FROM_MEMBER(h2o_redis_client_t, _timeout_entry, entry);

    /* on_connect unlinks this timer on success, so it is expected to fire only while unconnected */
    assert((owner->_redis->c.flags & REDIS_CONNECTED) == 0);
    assert(owner->state != H2O_REDIS_CONNECTION_STATE_CLOSED);

    disconnect(owner, h2o_redis_error_connect_timeout);
}
128
129
/*
 * Allocates a zero-filled client of `sz` bytes bound to `loop`. The client starts in the CLOSED
 * state with its connect-timeout timer initialised; release it with h2o_redis_free.
 */
h2o_redis_client_t *h2o_redis_create_client(h2o_loop_t *loop, size_t sz)
{
    h2o_redis_client_t *created = h2o_mem_alloc(sz);

    memset(created, 0, sz);
    h2o_timer_init(&created->_timeout_entry, on_connect_timeout);
    created->state = H2O_REDIS_CONNECTION_STATE_CLOSED;
    created->loop = loop;

    return created;
}
140
141
/*
 * Starts an asynchronous connection to `host`:`port`. The call is a no-op unless the client is
 * in the CLOSED state. Aborts via h2o_fatal if hiredis cannot allocate a context. If hiredis
 * already reports an error on the new context, the client is disconnected immediately;
 * otherwise the connect timer is armed when `connect_timeout` is non-zero.
 */
void h2o_redis_connect(h2o_redis_client_t *client, const char *host, uint16_t port)
{
    if (client->state != H2O_REDIS_CONNECTION_STATE_CLOSED)
        return;

    redisAsyncContext *ctx = redisAsyncConnect(host, port);
    if (ctx == NULL)
        h2o_fatal("no memory");

    /* link client and context in both directions before any callback can run */
    ctx->data = client;
    client->_redis = ctx;
    client->state = H2O_REDIS_CONNECTION_STATE_CONNECTING;

    /* the event adapter is attached before the hiredis callbacks are registered */
    attach_loop(ctx, client->loop);
    redisAsyncSetConnectCallback(ctx, on_connect);
    redisAsyncSetDisconnectCallback(ctx, on_disconnect);

    if (ctx->err != REDIS_OK) {
        /* some connection failures can be detected at this time */
        disconnect(client, h2o_redis_error_connection);
        return;
    }

    if (client->connect_timeout != 0) {
        h2o_timer_link(client->loop, client->connect_timeout, &client->_timeout_entry);
    }
}
169
170
/* Disconnects the client without an error string; does nothing if it is already closed. */
void h2o_redis_disconnect(h2o_redis_client_t *client)
{
    if (client->state == H2O_REDIS_CONNECTION_STATE_CLOSED) {
        return;
    }
    disconnect(client, NULL);
}
175
176
/* Cancels the command's timeout timer and releases the command object. */
static void dispose_command(h2o_redis_command_t *command)
{
    h2o_timer_t *timeout = &command->_command_timeout;

    h2o_timer_unlink(timeout);
    free(command);
}
181
182
/*
 * Delivers `reply` / `errstr` to the user callback of `command` (if any) and then decides
 * whether the command object can be released.
 *
 * - Commands other than (p)subscribe are always disposed after the callback.
 * - (p)subscribe commands stay alive across array replies and are disposed when the matching
 *   "(p)unsubscribe" array reply arrives, or when a NULL / non-array reply is delivered.
 *
 * The first element of an array reply is validated before it is inspected: an empty array, a
 * missing element, or an element without string data is treated as "not an unsubscribe" and
 * the command is kept alive instead of reading through an invalid pointer. The reply kind must
 * match the unsubscribe keyword in full (case-insensitively); a shorter prefix does not count.
 */
static void handle_reply(h2o_redis_command_t *command, redisReply *reply, const char *errstr)
{
    if (command->cb != NULL)
        command->cb(reply, command->data, errstr);

    const char *unsub;
    size_t unsub_len;

    switch (command->type) {
    case H2O_REDIS_COMMAND_TYPE_SUBSCRIBE:
        unsub = "unsubscribe";
        unsub_len = sizeof("unsubscribe") - 1;
        break;
    case H2O_REDIS_COMMAND_TYPE_PSUBSCRIBE:
        unsub = "punsubscribe";
        unsub_len = sizeof("punsubscribe") - 1;
        break;
    default:
        dispose_command(command);
        return;
    }

    if (reply == NULL || reply->type != REDIS_REPLY_ARRAY) {
        dispose_command(command);
        return;
    }

    /* malformed array reply: nothing to inspect, so conservatively keep the command alive */
    if (reply->elements == 0 || reply->element[0] == NULL || reply->element[0]->str == NULL)
        return;

    const redisReply *kind = reply->element[0];
    if ((size_t)kind->len == unsub_len && strncasecmp(kind->str, unsub, unsub_len) == 0) {
        dispose_command(command);
    }
    /* otherwise: (p)subscribe commands don't get freed until (p)unsubscribe or disconnect */
}
205
206
/*
 * hiredis reply callback registered by send_command. `privdata` is the h2o_redis_command_t the
 * reply belongs to. The error string recorded on the socket adapter takes precedence; if there
 * is none (or the adapter is already gone), the error is derived from the hiredis context.
 */
static void on_command(redisAsyncContext *redis, void *_reply, void *privdata)
{
    struct st_redis_socket_data_t *adapter = redis->ev.data;
    const char *errstr = adapter != NULL ? adapter->errstr : NULL;

    if (errstr == NULL) {
        errstr = get_error(redis);
    }

    h2o_redis_command_t *command = privdata;
    redisReply *reply = _reply;
    handle_reply(command, reply, errstr);
}
219
220
/* Timer callback for a command timeout: disconnects the client that issued the command. */
static void on_command_timeout(h2o_timer_t *entry)
{
    h2o_redis_command_t *expired = H2O_STRUCT_FROM_MEMBER(h2o_redis_command_t, _command_timeout, entry);
    h2o_redis_client_t *owner = expired->client;

    /* invoke disconnect to finalize inflight commands */
    disconnect(owner, h2o_redis_error_command_timeout);
}
227
228
static h2o_redis_command_t *create_command(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data,
229
                                           h2o_redis_command_type_t type)
230
0
{
231
0
    h2o_redis_command_t *command = h2o_mem_alloc(sizeof(h2o_redis_command_t));
232
0
    *command = (h2o_redis_command_t){NULL};
233
0
    command->client = client;
234
0
    command->cb = cb;
235
0
    command->data = cb_data;
236
0
    command->type = type;
237
0
    h2o_timer_init(&command->_command_timeout, on_command_timeout);
238
239
0
    if (client->command_timeout != 0 && (type == H2O_REDIS_COMMAND_TYPE_NORMAL || type == H2O_REDIS_COMMAND_TYPE_UNSUBSCRIBE ||
240
0
                                         type == H2O_REDIS_COMMAND_TYPE_PUNSUBSCRIBE))
241
0
        h2o_timer_link(client->loop, client->command_timeout, &command->_command_timeout);
242
243
0
    return command;
244
0
}
245
246
/*
 * Submits the formatted command `cmd` (of `len` bytes) to hiredis on behalf of `command`.
 * Any failure is reported synchronously through handle_reply with a NULL reply: a NULL `cmd`,
 * a closed client, a MONITOR command, or a rejection by redisAsyncFormattedCommand.
 * The checks are evaluated in that order and only the first failure is reported.
 */
static void send_command(h2o_redis_client_t *client, h2o_redis_command_t *command, const char *cmd, size_t len)
{
    const char *failure = NULL;

    if (cmd == NULL) {
        failure = "Failed to create command";
    } else if (client->state == H2O_REDIS_CONNECTION_STATE_CLOSED) {
        failure = h2o_redis_error_connection;
    } else if (command->type == H2O_REDIS_COMMAND_TYPE_MONITOR) {
        /* monitor command implementation in hiredis asynchronous API is absolutely dangerous, so don't use it! */
        failure = "Unsupported command";
    } else if (redisAsyncFormattedCommand(client->_redis, on_command, command, cmd, len) != REDIS_OK) {
        failure = "Failed to send command";
    }

    if (failure != NULL) {
        handle_reply(command, NULL, failure);
    }
}
269
270
/*
271
  hiredis doesn't expose any information about the command, so parse here.
272
  this function assumes that formatted is NULL-terminated
273
 */
274
static h2o_redis_command_type_t detect_command_type(const char *formatted)
275
0
{
276
0
#define CHECK(c)                                                                                                                   \
277
0
    if (c == NULL)                                                                                                                 \
278
0
    return H2O_REDIS_COMMAND_TYPE_ERROR
279
280
0
    char *p = (char *)formatted;
281
0
    CHECK(p);
282
283
0
    assert(p[0] == '*');
284
285
0
    p = strchr(p, '$');
286
0
    CHECK(p);
287
0
    p = strchr(p, '\n');
288
0
    CHECK(p);
289
0
    ++p;
290
0
    CHECK(p);
291
292
0
#define MATCH(c, target) strncasecmp(c, target, sizeof(target) - 1) == 0
293
0
    if (MATCH(p, "subscribe\r\n"))
294
0
        return H2O_REDIS_COMMAND_TYPE_SUBSCRIBE;
295
0
    if (MATCH(p, "unsubscribe\r\n"))
296
0
        return H2O_REDIS_COMMAND_TYPE_UNSUBSCRIBE;
297
0
    if (MATCH(p, "psubscribe\r\n"))
298
0
        return H2O_REDIS_COMMAND_TYPE_PSUBSCRIBE;
299
0
    if (MATCH(p, "punsubscribe\r\n"))
300
0
        return H2O_REDIS_COMMAND_TYPE_PUNSUBSCRIBE;
301
0
    if (MATCH(p, "monitor\r\n"))
302
0
        return H2O_REDIS_COMMAND_TYPE_MONITOR;
303
0
#undef MATCH
304
0
    return H2O_REDIS_COMMAND_TYPE_NORMAL;
305
0
#undef CHECK
306
0
}
307
308
/*
 * Formats a command from a printf-like `format` (as accepted by redisvFormatCommand) and
 * submits it. `cb` is invoked with `cb_data` when a reply or an error is available.
 *
 * Returns the command object created for the request.
 * NOTE: when submission fails synchronously, send_command reports the failure through
 * handle_reply, which disposes the command; in that case the returned pointer refers to
 * freed memory and must not be dereferenced.
 */
h2o_redis_command_t *h2o_redis_command(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data, const char *format, ...)
{
    char *wire;
    va_list args;

    va_start(args, format);
    int wire_len = redisvFormatCommand(&wire, format, args);
    va_end(args);

    /* formatting failed: pass NULL so that send_command reports the failure */
    if (wire_len <= 0) {
        wire = NULL;
        wire_len = 0;
    }

    h2o_redis_command_type_t type = detect_command_type(wire);
    h2o_redis_command_t *command = create_command(client, cb, cb_data, type);

    send_command(client, command, wire, wire_len);
    free(wire);

    return command;
}
326
327
/*
 * Same as h2o_redis_command, but the command is given as `argc` arguments in `argv` with
 * lengths `argvlen`, formatted through redisFormatSdsCommandArgv.
 *
 * Returns the command object created for the request.
 * NOTE: when submission fails synchronously, send_command reports the failure through
 * handle_reply, which disposes the command; in that case the returned pointer refers to
 * freed memory and must not be dereferenced.
 */
h2o_redis_command_t *h2o_redis_command_argv(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data, int argc,
                                            const char **argv, const size_t *argvlen)
{
    sds wire;
    int wire_len = redisFormatSdsCommandArgv(&wire, argc, argv, argvlen);

    /* formatting failed: pass NULL so that send_command reports the failure */
    if (wire_len < 0) {
        wire = NULL;
        wire_len = 0;
    }

    h2o_redis_command_type_t type = detect_command_type(wire);
    h2o_redis_command_t *command = create_command(client, cb, cb_data, type);

    send_command(client, command, wire, wire_len);
    sdsfree(wire);

    return command;
}
343
344
/*
 * Destroys a client: disconnects it first if it is not already closed, makes sure the connect
 * timer is unlinked, and releases the client memory.
 */
void h2o_redis_free(h2o_redis_client_t *client)
{
    int still_open = client->state != H2O_REDIS_CONNECTION_STATE_CLOSED;

    if (still_open) {
        disconnect(client, NULL);
    }

    h2o_timer_unlink(&client->_timeout_entry);
    free(client);
}
351
352
/* redis socket adapter */
353
354
/* h2o read callback: hands control to hiredis to read from its context. `err` is not inspected here. */
static void on_read(h2o_socket_t *sock, const char *err)
{
    struct st_redis_socket_data_t *adapter = sock->data;

    redisAsyncHandleRead(adapter->context);
}
359
360
/* h2o write callback: hands control to hiredis to write from its context. `err` is not inspected here. */
static void on_write(h2o_socket_t *sock, const char *err)
{
    struct st_redis_socket_data_t *adapter = sock->data;

    redisAsyncHandleWrite(adapter->context);
}
365
366
static void socket_add_read(void *privdata)
367
0
{
368
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
369
0
    h2o_socket_read_start(p->socket, on_read);
370
0
}
371
372
static void socket_del_read(void *privdata)
373
0
{
374
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
375
0
    h2o_socket_read_stop(p->socket);
376
0
}
377
378
static void socket_add_write(void *privdata)
379
0
{
380
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
381
0
    if (!h2o_socket_is_writing(p->socket)) {
382
0
        h2o_socket_notify_write(p->socket, on_write);
383
0
    }
384
0
}
385
386
static void socket_cleanup(void *privdata)
387
0
{
388
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
389
0
    h2o_socket_close(p->socket);
390
0
    p->context->c.fd = -1;      /* prevent hiredis from closing fd twice */
391
0
    p->context->ev.data = NULL; /* remove reference to `st_redis_socket_data_t` now that it is being freed */
392
0
    free(p);
393
0
}
394
395
/*
 * Binds the hiredis async context `ac` to the h2o event loop `loop`: allocates the socket
 * adapter, installs the read/write/cleanup hooks on the context, and wraps the context's file
 * descriptor in an h2o socket. The adapter is released by socket_cleanup.
 */
static void attach_loop(redisAsyncContext *ac, h2o_loop_t *loop)
{
    struct st_redis_socket_data_t *adapter = h2o_mem_alloc(sizeof(*adapter));
    *adapter = (struct st_redis_socket_data_t){NULL};

    /* event hooks that hiredis invokes with `ev.data` as their argument */
    ac->ev.data = adapter;
    ac->ev.addRead = socket_add_read;
    ac->ev.delRead = socket_del_read;
    ac->ev.addWrite = socket_add_write;
    ac->ev.cleanup = socket_cleanup;

#if H2O_USE_LIBUV
    adapter->socket = h2o_uv__poll_create(loop, ac->c.fd, (uv_close_cb)free);
#else
    adapter->socket = h2o_evloop_socket_create(loop, ac->c.fd, H2O_SOCKET_FLAG_DONT_READ);
#endif

    /* cross-link socket, adapter and context so each callback can find the others */
    adapter->socket->data = adapter;
    adapter->context = ac;
}