Coverage Report

Created: 2024-02-25 06:15

/src/h2o/lib/common/redis.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2016 DeNA Co., Ltd., Ichito Nagata
3
 *
4
 * Permission is hereby granted, free of charge, to any person obtaining a copy
5
 * of this software and associated documentation files (the "Software"), to
6
 * deal in the Software without restriction, including without limitation the
7
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
8
 * sell copies of the Software, and to permit persons to whom the Software is
9
 * furnished to do so, subject to the following conditions:
10
 *
11
 * The above copyright notice and this permission notice shall be included in
12
 * all copies or substantial portions of the Software.
13
 *
14
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
19
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20
 * IN THE SOFTWARE.
21
 */
22
#include <errno.h>
23
#include "h2o/redis.h"
24
#include "h2o/hiredis_.h"
25
#include "h2o/socket.h"
26
27
const char h2o_redis_error_connection[] = "Connection Error";
28
const char h2o_redis_error_protocol[] = "Protocol Error";
29
const char h2o_redis_error_connect_timeout[] = "Connection Timeout";
30
const char h2o_redis_error_command_timeout[] = "Command Timeout";
31
32
struct st_redis_socket_data_t {
33
    redisAsyncContext *context;
34
    const char *errstr;
35
    h2o_socket_t *socket;
36
};
37
38
static void attach_loop(redisAsyncContext *ac, h2o_loop_t *loop);
39
40
static void invoke_deferred(h2o_redis_client_t *client, uint64_t tick, h2o_timer_t *entry, h2o_timer_cb cb)
41
0
{
42
0
    entry->cb = cb;
43
0
    h2o_timer_link(client->loop, tick, entry);
44
0
}
45
46
static void close_and_detach_connection(h2o_redis_client_t *client, const char *errstr)
47
0
{
48
0
    assert(client->_redis != NULL);
49
0
    client->state = H2O_REDIS_CONNECTION_STATE_CLOSED;
50
0
    if (client->on_close != NULL)
51
0
        client->on_close(errstr);
52
53
0
    client->_redis->data = NULL;
54
0
    client->_redis = NULL;
55
0
    h2o_timer_unlink(&client->_timeout_entry);
56
0
}
57
58
static void disconnect(h2o_redis_client_t *client, const char *errstr)
59
0
{
60
0
    assert(client->state != H2O_REDIS_CONNECTION_STATE_CLOSED);
61
0
    assert(client->_redis != NULL);
62
63
0
    redisAsyncContext *redis = client->_redis;
64
0
    struct st_redis_socket_data_t *data = redis->ev.data;
65
0
    data->errstr = errstr;
66
0
    close_and_detach_connection(client, errstr);
67
0
    redisAsyncFree(redis); /* immediately call all callbacks of pending commands with nil replies */
68
0
}
69
70
static const char *get_error(const redisAsyncContext *redis)
71
0
{
72
0
    switch (redis->err) {
73
0
    case REDIS_OK:
74
0
        return NULL;
75
0
    case REDIS_ERR_IO:
76
        /* hiredis internally checks socket error and set errno */
77
0
        if (errno == ETIMEDOUT) {
78
0
            return h2o_redis_error_connect_timeout;
79
0
        } else {
80
0
            return h2o_redis_error_connection;
81
0
        }
82
0
    case REDIS_ERR_EOF:
83
0
        return h2o_redis_error_connection;
84
0
    case REDIS_ERR_PROTOCOL:
85
0
        return h2o_redis_error_protocol;
86
0
    case REDIS_ERR_OOM:
87
0
    case REDIS_ERR_OTHER:
88
0
        return redis->errstr;
89
0
    default:
90
0
        h2o_fatal("FIXME");
91
0
    }
92
0
}
93
94
static void on_connect(const redisAsyncContext *redis, int status)
95
0
{
96
0
    h2o_redis_client_t *client = (h2o_redis_client_t *)redis->data;
97
0
    if (client == NULL)
98
0
        return;
99
100
0
    if (status != REDIS_OK) {
101
0
        close_and_detach_connection(client, h2o_redis_error_connection);
102
0
        return;
103
0
    }
104
0
    h2o_timer_unlink(&client->_timeout_entry);
105
106
0
    client->state = H2O_REDIS_CONNECTION_STATE_CONNECTED;
107
0
    if (client->on_connect != NULL)
108
0
        client->on_connect();
109
0
}
110
111
static void on_disconnect(const redisAsyncContext *redis, int status)
112
0
{
113
0
    h2o_redis_client_t *client = (h2o_redis_client_t *)redis->data;
114
0
    if (client == NULL)
115
0
        return;
116
117
0
    close_and_detach_connection(client, get_error(redis));
118
0
}
119
120
static void on_connect_timeout(h2o_timer_t *entry)
121
0
{
122
0
    h2o_redis_client_t *client = H2O_STRUCT_FROM_MEMBER(h2o_redis_client_t, _timeout_entry, entry);
123
0
    assert((client->_redis->c.flags & REDIS_CONNECTED) == 0);
124
0
    assert(client->state != H2O_REDIS_CONNECTION_STATE_CLOSED);
125
126
0
    disconnect(client, h2o_redis_error_connect_timeout);
127
0
}
128
129
h2o_redis_client_t *h2o_redis_create_client(h2o_loop_t *loop, size_t sz)
130
0
{
131
0
    h2o_redis_client_t *client = h2o_mem_alloc(sz);
132
0
    memset(client, 0, sz);
133
134
0
    client->loop = loop;
135
0
    client->state = H2O_REDIS_CONNECTION_STATE_CLOSED;
136
0
    h2o_timer_init(&client->_timeout_entry, on_connect_timeout);
137
138
0
    return client;
139
0
}
140
141
void h2o_redis_connect(h2o_redis_client_t *client, const char *host, uint16_t port)
142
0
{
143
0
    if (client->state != H2O_REDIS_CONNECTION_STATE_CLOSED) {
144
0
        return;
145
0
    }
146
147
0
    redisAsyncContext *redis = redisAsyncConnect(host, port);
148
0
    if (redis == NULL) {
149
0
        h2o_fatal("no memory");
150
0
    }
151
152
0
    client->_redis = redis;
153
0
    client->_redis->data = client;
154
0
    client->state = H2O_REDIS_CONNECTION_STATE_CONNECTING;
155
156
0
    attach_loop(redis, client->loop);
157
0
    redisAsyncSetConnectCallback(redis, on_connect);
158
0
    redisAsyncSetDisconnectCallback(redis, on_disconnect);
159
160
0
    if (redis->err != REDIS_OK) {
161
        /* some connection failures can be detected at this time */
162
0
        disconnect(client, h2o_redis_error_connection);
163
0
        return;
164
0
    }
165
166
0
    if (client->connect_timeout != 0)
167
0
        h2o_timer_link(client->loop, client->connect_timeout, &client->_timeout_entry);
168
0
}
169
170
void h2o_redis_disconnect(h2o_redis_client_t *client)
171
0
{
172
0
    if (client->state != H2O_REDIS_CONNECTION_STATE_CLOSED)
173
0
        disconnect(client, NULL);
174
0
}
175
176
static void dispose_command(h2o_redis_command_t *command)
177
0
{
178
0
    h2o_timer_unlink(&command->_command_timeout);
179
0
    free(command);
180
0
}
181
182
static void handle_reply(h2o_redis_command_t *command, redisReply *reply, const char *errstr)
183
0
{
184
0
    if (command->cb != NULL)
185
0
        command->cb(reply, command->data, errstr);
186
187
0
    switch (command->type) {
188
0
    case H2O_REDIS_COMMAND_TYPE_SUBSCRIBE:
189
0
    case H2O_REDIS_COMMAND_TYPE_PSUBSCRIBE:
190
0
        if (reply != NULL && reply->type == REDIS_REPLY_ARRAY) {
191
0
            char *unsub = command->type == H2O_REDIS_COMMAND_TYPE_SUBSCRIBE ? "unsubscribe" : "punsubscribe";
192
0
            if (strncasecmp(reply->element[0]->str, unsub, reply->element[0]->len) == 0) {
193
0
                dispose_command(command);
194
0
            } else {
195
                /* (p)subscribe commands doesn't get freed until (p)unsubscribe or disconnect */
196
0
            }
197
0
        } else {
198
0
            dispose_command(command);
199
0
        }
200
0
        break;
201
0
    default:
202
0
        dispose_command(command);
203
0
    }
204
0
}
205
206
static void on_command(redisAsyncContext *redis, void *_reply, void *privdata)
207
0
{
208
0
    redisReply *reply = (redisReply *)_reply;
209
0
    h2o_redis_command_t *command = (h2o_redis_command_t *)privdata;
210
0
    const char *errstr = ((struct st_redis_socket_data_t *)redis->ev.data)->errstr;
211
0
    if (errstr == NULL)
212
0
        errstr = get_error(redis);
213
0
    handle_reply(command, reply, errstr);
214
0
}
215
216
static void on_command_timeout(h2o_timer_t *entry)
217
0
{
218
0
    h2o_redis_command_t *command = H2O_STRUCT_FROM_MEMBER(h2o_redis_command_t, _command_timeout, entry);
219
220
    /* invoke disconnect to finalize inflight commands */
221
0
    disconnect(command->client, h2o_redis_error_command_timeout);
222
0
}
223
224
static h2o_redis_command_t *create_command(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data,
225
                                           h2o_redis_command_type_t type)
226
0
{
227
0
    h2o_redis_command_t *command = h2o_mem_alloc(sizeof(h2o_redis_command_t));
228
0
    *command = (h2o_redis_command_t){NULL};
229
0
    command->client = client;
230
0
    command->cb = cb;
231
0
    command->data = cb_data;
232
0
    command->type = type;
233
0
    h2o_timer_init(&command->_command_timeout, on_command_timeout);
234
235
0
    if (client->command_timeout != 0 && (type == H2O_REDIS_COMMAND_TYPE_NORMAL || type == H2O_REDIS_COMMAND_TYPE_UNSUBSCRIBE ||
236
0
                                         type == H2O_REDIS_COMMAND_TYPE_PUNSUBSCRIBE))
237
0
        h2o_timer_link(client->loop, client->command_timeout, &command->_command_timeout);
238
239
0
    return command;
240
0
}
241
242
static void send_command(h2o_redis_client_t *client, h2o_redis_command_t *command, const char *cmd, size_t len)
243
0
{
244
0
    if (cmd == NULL) {
245
0
        handle_reply(command, NULL, "Failed to create command");
246
0
        return;
247
0
    }
248
249
0
    if (client->state == H2O_REDIS_CONNECTION_STATE_CLOSED) {
250
0
        handle_reply(command, NULL, h2o_redis_error_connection);
251
0
        return;
252
0
    }
253
254
0
    if (command->type == H2O_REDIS_COMMAND_TYPE_MONITOR) {
255
        /* monitor command implementation in hiredis asynchronous API is absolutely dangerous, so don't use it! */
256
0
        handle_reply(command, NULL, "Unsupported command");
257
0
        return;
258
0
    }
259
260
0
    int ret = redisAsyncFormattedCommand(client->_redis, on_command, command, cmd, len);
261
0
    if (ret != REDIS_OK) {
262
0
        handle_reply(command, NULL, "Failed to send command");
263
0
    }
264
0
}
265
266
/*
267
  hiredis doesn't expose any information about the command, so parse here.
268
  this function assumes that formatted is NULL-terminated
269
 */
270
static h2o_redis_command_type_t detect_command_type(const char *formatted)
271
0
{
272
0
#define CHECK(c)                                                                                                                   \
273
0
    if (c == NULL)                                                                                                                 \
274
0
    return H2O_REDIS_COMMAND_TYPE_ERROR
275
276
0
    char *p = (char *)formatted;
277
0
    CHECK(p);
278
279
0
    assert(p[0] == '*');
280
281
0
    p = strchr(p, '$');
282
0
    CHECK(p);
283
0
    p = strchr(p, '\n');
284
0
    CHECK(p);
285
0
    ++p;
286
0
    CHECK(p);
287
288
0
#define MATCH(c, target) strncasecmp(c, target, sizeof(target) - 1) == 0
289
0
    if (MATCH(p, "subscribe\r\n"))
290
0
        return H2O_REDIS_COMMAND_TYPE_SUBSCRIBE;
291
0
    if (MATCH(p, "unsubscribe\r\n"))
292
0
        return H2O_REDIS_COMMAND_TYPE_UNSUBSCRIBE;
293
0
    if (MATCH(p, "psubscribe\r\n"))
294
0
        return H2O_REDIS_COMMAND_TYPE_PSUBSCRIBE;
295
0
    if (MATCH(p, "punsubscribe\r\n"))
296
0
        return H2O_REDIS_COMMAND_TYPE_PUNSUBSCRIBE;
297
0
    if (MATCH(p, "monitor\r\n"))
298
0
        return H2O_REDIS_COMMAND_TYPE_MONITOR;
299
0
#undef MATCH
300
0
    return H2O_REDIS_COMMAND_TYPE_NORMAL;
301
0
#undef CHECK
302
0
}
303
304
h2o_redis_command_t *h2o_redis_command(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data, const char *format, ...)
305
0
{
306
0
    char *cmd;
307
0
    int len;
308
0
    va_list ap;
309
0
    va_start(ap, format);
310
0
    len = redisvFormatCommand(&cmd, format, ap);
311
0
    va_end(ap);
312
0
    if (len <= 0) {
313
0
        cmd = NULL;
314
0
        len = 0;
315
0
    }
316
317
0
    h2o_redis_command_t *command = create_command(client, cb, cb_data, detect_command_type(cmd));
318
0
    send_command(client, command, cmd, len);
319
0
    free(cmd);
320
0
    return command;
321
0
}
322
323
h2o_redis_command_t *h2o_redis_command_argv(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data, int argc,
324
                                            const char **argv, const size_t *argvlen)
325
0
{
326
0
    sds sdscmd;
327
0
    int len;
328
0
    len = redisFormatSdsCommandArgv(&sdscmd, argc, argv, argvlen);
329
0
    if (len < 0) {
330
0
        sdscmd = NULL;
331
0
        len = 0;
332
0
    }
333
334
0
    h2o_redis_command_t *command = create_command(client, cb, cb_data, detect_command_type(sdscmd));
335
0
    send_command(client, command, sdscmd, len);
336
0
    sdsfree(sdscmd);
337
0
    return command;
338
0
}
339
340
void h2o_redis_free(h2o_redis_client_t *client)
341
0
{
342
0
    if (client->state != H2O_REDIS_CONNECTION_STATE_CLOSED)
343
0
        disconnect(client, NULL);
344
0
    h2o_timer_unlink(&client->_timeout_entry);
345
0
    free(client);
346
0
}
347
348
/* redis socket adapter */
349
350
static void on_read(h2o_socket_t *sock, const char *err)
351
0
{
352
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)sock->data;
353
0
    redisAsyncHandleRead(p->context);
354
0
}
355
356
static void on_write(h2o_socket_t *sock, const char *err)
357
0
{
358
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)sock->data;
359
0
    redisAsyncHandleWrite(p->context);
360
0
}
361
362
static void socket_add_read(void *privdata)
363
0
{
364
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
365
0
    h2o_socket_read_start(p->socket, on_read);
366
0
}
367
368
static void socket_del_read(void *privdata)
369
0
{
370
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
371
0
    h2o_socket_read_stop(p->socket);
372
0
}
373
374
static void socket_add_write(void *privdata)
375
0
{
376
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
377
0
    if (!h2o_socket_is_writing(p->socket)) {
378
0
        h2o_socket_notify_write(p->socket, on_write);
379
0
    }
380
0
}
381
382
static void socket_cleanup(void *privdata)
383
0
{
384
0
    struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata;
385
0
    h2o_socket_close(p->socket);
386
0
    p->context->c.fd = -1; /* prevent hiredis from closing fd twice */
387
0
    free(p);
388
0
}
389
390
static void attach_loop(redisAsyncContext *ac, h2o_loop_t *loop)
391
0
{
392
0
    redisContext *c = &(ac->c);
393
394
0
    struct st_redis_socket_data_t *p = h2o_mem_alloc(sizeof(*p));
395
0
    *p = (struct st_redis_socket_data_t){NULL};
396
397
0
    ac->ev.addRead = socket_add_read;
398
0
    ac->ev.delRead = socket_del_read;
399
0
    ac->ev.addWrite = socket_add_write;
400
0
    ac->ev.cleanup = socket_cleanup;
401
0
    ac->ev.data = p;
402
403
#if H2O_USE_LIBUV
404
    p->socket = h2o_uv__poll_create(loop, c->fd, (uv_close_cb)free);
405
#else
406
0
    p->socket = h2o_evloop_socket_create(loop, c->fd, H2O_SOCKET_FLAG_DONT_READ);
407
0
#endif
408
409
0
    p->socket->data = p;
410
0
    p->context = ac;
411
0
}