/src/h2o/lib/common/redis.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2016 DeNA Co., Ltd., Ichito Nagata |
3 | | * |
4 | | * Permission is hereby granted, free of charge, to any person obtaining a copy |
5 | | * of this software and associated documentation files (the "Software"), to |
6 | | * deal in the Software without restriction, including without limitation the |
7 | | * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
8 | | * sell copies of the Software, and to permit persons to whom the Software is |
9 | | * furnished to do so, subject to the following conditions: |
10 | | * |
11 | | * The above copyright notice and this permission notice shall be included in |
12 | | * all copies or substantial portions of the Software. |
13 | | * |
14 | | * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
15 | | * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
16 | | * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
17 | | * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
18 | | * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING |
19 | | * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
20 | | * IN THE SOFTWARE. |
21 | | */ |
22 | | #include <errno.h> |
23 | | #include "h2o/redis.h" |
24 | | #include "h2o/hiredis_.h" |
25 | | #include "h2o/socket.h" |
26 | | |
/*
 * Error strings reported by this module. Within this file they are passed to the
 * client's on_close callback and to per-command callbacks as `errstr`.
 */
const char h2o_redis_error_connection[] = "Connection Error";
const char h2o_redis_error_protocol[] = "Protocol Error";
const char h2o_redis_error_connect_timeout[] = "Connection Timeout";
const char h2o_redis_error_command_timeout[] = "Command Timeout";
31 | | |
32 | | struct st_redis_socket_data_t { |
33 | | redisAsyncContext *context; |
34 | | const char *errstr; |
35 | | h2o_socket_t *socket; |
36 | | }; |
37 | | |
38 | | static void attach_loop(redisAsyncContext *ac, h2o_loop_t *loop); |
39 | | |
40 | | static void invoke_deferred(h2o_redis_client_t *client, uint64_t tick, h2o_timer_t *entry, h2o_timer_cb cb) |
41 | 0 | { |
42 | 0 | entry->cb = cb; |
43 | 0 | h2o_timer_link(client->loop, tick, entry); |
44 | 0 | } |
45 | | |
46 | | static void close_and_detach_connection(h2o_redis_client_t *client, const char *errstr) |
47 | 0 | { |
48 | 0 | assert(client->_redis != NULL); |
49 | 0 | client->state = H2O_REDIS_CONNECTION_STATE_CLOSED; |
50 | 0 | if (client->on_close != NULL) |
51 | 0 | client->on_close(errstr); |
52 | |
|
53 | 0 | client->_redis->data = NULL; |
54 | 0 | client->_redis = NULL; |
55 | 0 | h2o_timer_unlink(&client->_timeout_entry); |
56 | 0 | } |
57 | | |
58 | | static void disconnect(h2o_redis_client_t *client, const char *errstr) |
59 | 0 | { |
60 | 0 | assert(client->state != H2O_REDIS_CONNECTION_STATE_CLOSED); |
61 | 0 | assert(client->_redis != NULL); |
62 | | |
63 | 0 | redisAsyncContext *redis = client->_redis; |
64 | 0 | struct st_redis_socket_data_t *data = redis->ev.data; |
65 | 0 | data->errstr = errstr; |
66 | 0 | close_and_detach_connection(client, errstr); |
67 | 0 | redisAsyncFree(redis); /* immediately call all callbacks of pending commands with nil replies */ |
68 | 0 | } |
69 | | |
70 | | static const char *get_error(const redisAsyncContext *redis) |
71 | 0 | { |
72 | 0 | switch (redis->err) { |
73 | 0 | case REDIS_OK: |
74 | 0 | return NULL; |
75 | 0 | case REDIS_ERR_IO: |
76 | | /* hiredis internally checks socket error and set errno */ |
77 | 0 | if (errno == ETIMEDOUT) { |
78 | 0 | return h2o_redis_error_connect_timeout; |
79 | 0 | } else { |
80 | 0 | return h2o_redis_error_connection; |
81 | 0 | } |
82 | 0 | case REDIS_ERR_EOF: |
83 | 0 | return h2o_redis_error_connection; |
84 | 0 | case REDIS_ERR_PROTOCOL: |
85 | 0 | return h2o_redis_error_protocol; |
86 | 0 | case REDIS_ERR_OOM: |
87 | 0 | case REDIS_ERR_OTHER: |
88 | 0 | return redis->errstr; |
89 | 0 | default: |
90 | 0 | h2o_fatal("FIXME"); |
91 | 0 | } |
92 | 0 | } |
93 | | |
94 | | static void on_connect(const redisAsyncContext *redis, int status) |
95 | 0 | { |
96 | 0 | h2o_redis_client_t *client = (h2o_redis_client_t *)redis->data; |
97 | 0 | if (client == NULL) |
98 | 0 | return; |
99 | | |
100 | 0 | if (status != REDIS_OK) { |
101 | 0 | close_and_detach_connection(client, h2o_redis_error_connection); |
102 | 0 | return; |
103 | 0 | } |
104 | 0 | h2o_timer_unlink(&client->_timeout_entry); |
105 | |
|
106 | 0 | client->state = H2O_REDIS_CONNECTION_STATE_CONNECTED; |
107 | 0 | if (client->on_connect != NULL) |
108 | 0 | client->on_connect(); |
109 | 0 | } |
110 | | |
111 | | static void on_disconnect(const redisAsyncContext *redis, int status) |
112 | 0 | { |
113 | 0 | h2o_redis_client_t *client = (h2o_redis_client_t *)redis->data; |
114 | 0 | if (client == NULL) |
115 | 0 | return; |
116 | | |
117 | 0 | close_and_detach_connection(client, get_error(redis)); |
118 | 0 | } |
119 | | |
120 | | static void on_connect_timeout(h2o_timer_t *entry) |
121 | 0 | { |
122 | 0 | h2o_redis_client_t *client = H2O_STRUCT_FROM_MEMBER(h2o_redis_client_t, _timeout_entry, entry); |
123 | 0 | assert((client->_redis->c.flags & REDIS_CONNECTED) == 0); |
124 | 0 | assert(client->state != H2O_REDIS_CONNECTION_STATE_CLOSED); |
125 | | |
126 | 0 | disconnect(client, h2o_redis_error_connect_timeout); |
127 | 0 | } |
128 | | |
129 | | h2o_redis_client_t *h2o_redis_create_client(h2o_loop_t *loop, size_t sz) |
130 | 0 | { |
131 | 0 | h2o_redis_client_t *client = h2o_mem_alloc(sz); |
132 | 0 | memset(client, 0, sz); |
133 | |
|
134 | 0 | client->loop = loop; |
135 | 0 | client->state = H2O_REDIS_CONNECTION_STATE_CLOSED; |
136 | 0 | h2o_timer_init(&client->_timeout_entry, on_connect_timeout); |
137 | |
|
138 | 0 | return client; |
139 | 0 | } |
140 | | |
141 | | void h2o_redis_connect(h2o_redis_client_t *client, const char *host, uint16_t port) |
142 | 0 | { |
143 | 0 | if (client->state != H2O_REDIS_CONNECTION_STATE_CLOSED) { |
144 | 0 | return; |
145 | 0 | } |
146 | | |
147 | 0 | redisAsyncContext *redis = redisAsyncConnect(host, port); |
148 | 0 | if (redis == NULL) { |
149 | 0 | h2o_fatal("no memory"); |
150 | 0 | } |
151 | | |
152 | 0 | client->_redis = redis; |
153 | 0 | client->_redis->data = client; |
154 | 0 | client->state = H2O_REDIS_CONNECTION_STATE_CONNECTING; |
155 | |
|
156 | 0 | attach_loop(redis, client->loop); |
157 | 0 | redisAsyncSetConnectCallback(redis, on_connect); |
158 | 0 | redisAsyncSetDisconnectCallback(redis, on_disconnect); |
159 | |
|
160 | 0 | if (redis->err != REDIS_OK) { |
161 | | /* some connection failures can be detected at this time */ |
162 | 0 | disconnect(client, h2o_redis_error_connection); |
163 | 0 | return; |
164 | 0 | } |
165 | | |
166 | 0 | if (client->connect_timeout != 0) |
167 | 0 | h2o_timer_link(client->loop, client->connect_timeout, &client->_timeout_entry); |
168 | 0 | } |
169 | | |
170 | | void h2o_redis_disconnect(h2o_redis_client_t *client) |
171 | 0 | { |
172 | 0 | if (client->state != H2O_REDIS_CONNECTION_STATE_CLOSED) |
173 | 0 | disconnect(client, NULL); |
174 | 0 | } |
175 | | |
176 | | static void dispose_command(h2o_redis_command_t *command) |
177 | 0 | { |
178 | 0 | h2o_timer_unlink(&command->_command_timeout); |
179 | 0 | free(command); |
180 | 0 | } |
181 | | |
182 | | static void handle_reply(h2o_redis_command_t *command, redisReply *reply, const char *errstr) |
183 | 0 | { |
184 | 0 | if (command->cb != NULL) |
185 | 0 | command->cb(reply, command->data, errstr); |
186 | |
|
187 | 0 | switch (command->type) { |
188 | 0 | case H2O_REDIS_COMMAND_TYPE_SUBSCRIBE: |
189 | 0 | case H2O_REDIS_COMMAND_TYPE_PSUBSCRIBE: |
190 | 0 | if (reply != NULL && reply->type == REDIS_REPLY_ARRAY) { |
191 | 0 | char *unsub = command->type == H2O_REDIS_COMMAND_TYPE_SUBSCRIBE ? "unsubscribe" : "punsubscribe"; |
192 | 0 | if (strncasecmp(reply->element[0]->str, unsub, reply->element[0]->len) == 0) { |
193 | 0 | dispose_command(command); |
194 | 0 | } else { |
195 | | /* (p)subscribe commands doesn't get freed until (p)unsubscribe or disconnect */ |
196 | 0 | } |
197 | 0 | } else { |
198 | 0 | dispose_command(command); |
199 | 0 | } |
200 | 0 | break; |
201 | 0 | default: |
202 | 0 | dispose_command(command); |
203 | 0 | } |
204 | 0 | } |
205 | | |
206 | | static void on_command(redisAsyncContext *redis, void *_reply, void *privdata) |
207 | 0 | { |
208 | 0 | redisReply *reply = (redisReply *)_reply; |
209 | 0 | h2o_redis_command_t *command = (h2o_redis_command_t *)privdata; |
210 | 0 | const char *errstr = ((struct st_redis_socket_data_t *)redis->ev.data)->errstr; |
211 | 0 | if (errstr == NULL) |
212 | 0 | errstr = get_error(redis); |
213 | 0 | handle_reply(command, reply, errstr); |
214 | 0 | } |
215 | | |
216 | | static void on_command_timeout(h2o_timer_t *entry) |
217 | 0 | { |
218 | 0 | h2o_redis_command_t *command = H2O_STRUCT_FROM_MEMBER(h2o_redis_command_t, _command_timeout, entry); |
219 | | |
220 | | /* invoke disconnect to finalize inflight commands */ |
221 | 0 | disconnect(command->client, h2o_redis_error_command_timeout); |
222 | 0 | } |
223 | | |
224 | | static h2o_redis_command_t *create_command(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data, |
225 | | h2o_redis_command_type_t type) |
226 | 0 | { |
227 | 0 | h2o_redis_command_t *command = h2o_mem_alloc(sizeof(h2o_redis_command_t)); |
228 | 0 | *command = (h2o_redis_command_t){NULL}; |
229 | 0 | command->client = client; |
230 | 0 | command->cb = cb; |
231 | 0 | command->data = cb_data; |
232 | 0 | command->type = type; |
233 | 0 | h2o_timer_init(&command->_command_timeout, on_command_timeout); |
234 | |
|
235 | 0 | if (client->command_timeout != 0 && (type == H2O_REDIS_COMMAND_TYPE_NORMAL || type == H2O_REDIS_COMMAND_TYPE_UNSUBSCRIBE || |
236 | 0 | type == H2O_REDIS_COMMAND_TYPE_PUNSUBSCRIBE)) |
237 | 0 | h2o_timer_link(client->loop, client->command_timeout, &command->_command_timeout); |
238 | |
|
239 | 0 | return command; |
240 | 0 | } |
241 | | |
242 | | static void send_command(h2o_redis_client_t *client, h2o_redis_command_t *command, const char *cmd, size_t len) |
243 | 0 | { |
244 | 0 | if (cmd == NULL) { |
245 | 0 | handle_reply(command, NULL, "Failed to create command"); |
246 | 0 | return; |
247 | 0 | } |
248 | | |
249 | 0 | if (client->state == H2O_REDIS_CONNECTION_STATE_CLOSED) { |
250 | 0 | handle_reply(command, NULL, h2o_redis_error_connection); |
251 | 0 | return; |
252 | 0 | } |
253 | | |
254 | 0 | if (command->type == H2O_REDIS_COMMAND_TYPE_MONITOR) { |
255 | | /* monitor command implementation in hiredis asynchronous API is absolutely dangerous, so don't use it! */ |
256 | 0 | handle_reply(command, NULL, "Unsupported command"); |
257 | 0 | return; |
258 | 0 | } |
259 | | |
260 | 0 | int ret = redisAsyncFormattedCommand(client->_redis, on_command, command, cmd, len); |
261 | 0 | if (ret != REDIS_OK) { |
262 | 0 | handle_reply(command, NULL, "Failed to send command"); |
263 | 0 | } |
264 | 0 | } |
265 | | |
266 | | /* |
267 | | hiredis doesn't expose any information about the command, so parse here. |
268 | | this function assumes that formatted is NULL-terminated |
269 | | */ |
270 | | static h2o_redis_command_type_t detect_command_type(const char *formatted) |
271 | 0 | { |
272 | 0 | #define CHECK(c) \ |
273 | 0 | if (c == NULL) \ |
274 | 0 | return H2O_REDIS_COMMAND_TYPE_ERROR |
275 | |
|
276 | 0 | char *p = (char *)formatted; |
277 | 0 | CHECK(p); |
278 | | |
279 | 0 | assert(p[0] == '*'); |
280 | | |
281 | 0 | p = strchr(p, '$'); |
282 | 0 | CHECK(p); |
283 | 0 | p = strchr(p, '\n'); |
284 | 0 | CHECK(p); |
285 | 0 | ++p; |
286 | 0 | CHECK(p); |
287 | | |
288 | 0 | #define MATCH(c, target) strncasecmp(c, target, sizeof(target) - 1) == 0 |
289 | 0 | if (MATCH(p, "subscribe\r\n")) |
290 | 0 | return H2O_REDIS_COMMAND_TYPE_SUBSCRIBE; |
291 | 0 | if (MATCH(p, "unsubscribe\r\n")) |
292 | 0 | return H2O_REDIS_COMMAND_TYPE_UNSUBSCRIBE; |
293 | 0 | if (MATCH(p, "psubscribe\r\n")) |
294 | 0 | return H2O_REDIS_COMMAND_TYPE_PSUBSCRIBE; |
295 | 0 | if (MATCH(p, "punsubscribe\r\n")) |
296 | 0 | return H2O_REDIS_COMMAND_TYPE_PUNSUBSCRIBE; |
297 | 0 | if (MATCH(p, "monitor\r\n")) |
298 | 0 | return H2O_REDIS_COMMAND_TYPE_MONITOR; |
299 | 0 | #undef MATCH |
300 | 0 | return H2O_REDIS_COMMAND_TYPE_NORMAL; |
301 | 0 | #undef CHECK |
302 | 0 | } |
303 | | |
304 | | h2o_redis_command_t *h2o_redis_command(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data, const char *format, ...) |
305 | 0 | { |
306 | 0 | char *cmd; |
307 | 0 | int len; |
308 | 0 | va_list ap; |
309 | 0 | va_start(ap, format); |
310 | 0 | len = redisvFormatCommand(&cmd, format, ap); |
311 | 0 | va_end(ap); |
312 | 0 | if (len <= 0) { |
313 | 0 | cmd = NULL; |
314 | 0 | len = 0; |
315 | 0 | } |
316 | |
|
317 | 0 | h2o_redis_command_t *command = create_command(client, cb, cb_data, detect_command_type(cmd)); |
318 | 0 | send_command(client, command, cmd, len); |
319 | 0 | free(cmd); |
320 | 0 | return command; |
321 | 0 | } |
322 | | |
323 | | h2o_redis_command_t *h2o_redis_command_argv(h2o_redis_client_t *client, h2o_redis_command_cb cb, void *cb_data, int argc, |
324 | | const char **argv, const size_t *argvlen) |
325 | 0 | { |
326 | 0 | sds sdscmd; |
327 | 0 | int len; |
328 | 0 | len = redisFormatSdsCommandArgv(&sdscmd, argc, argv, argvlen); |
329 | 0 | if (len < 0) { |
330 | 0 | sdscmd = NULL; |
331 | 0 | len = 0; |
332 | 0 | } |
333 | |
|
334 | 0 | h2o_redis_command_t *command = create_command(client, cb, cb_data, detect_command_type(sdscmd)); |
335 | 0 | send_command(client, command, sdscmd, len); |
336 | 0 | sdsfree(sdscmd); |
337 | 0 | return command; |
338 | 0 | } |
339 | | |
340 | | void h2o_redis_free(h2o_redis_client_t *client) |
341 | 0 | { |
342 | 0 | if (client->state != H2O_REDIS_CONNECTION_STATE_CLOSED) |
343 | 0 | disconnect(client, NULL); |
344 | 0 | h2o_timer_unlink(&client->_timeout_entry); |
345 | 0 | free(client); |
346 | 0 | } |
347 | | |
348 | | /* redis socket adapter */ |
349 | | |
350 | | static void on_read(h2o_socket_t *sock, const char *err) |
351 | 0 | { |
352 | 0 | struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)sock->data; |
353 | 0 | redisAsyncHandleRead(p->context); |
354 | 0 | } |
355 | | |
356 | | static void on_write(h2o_socket_t *sock, const char *err) |
357 | 0 | { |
358 | 0 | struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)sock->data; |
359 | 0 | redisAsyncHandleWrite(p->context); |
360 | 0 | } |
361 | | |
362 | | static void socket_add_read(void *privdata) |
363 | 0 | { |
364 | 0 | struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata; |
365 | 0 | h2o_socket_read_start(p->socket, on_read); |
366 | 0 | } |
367 | | |
368 | | static void socket_del_read(void *privdata) |
369 | 0 | { |
370 | 0 | struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata; |
371 | 0 | h2o_socket_read_stop(p->socket); |
372 | 0 | } |
373 | | |
374 | | static void socket_add_write(void *privdata) |
375 | 0 | { |
376 | 0 | struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata; |
377 | 0 | if (!h2o_socket_is_writing(p->socket)) { |
378 | 0 | h2o_socket_notify_write(p->socket, on_write); |
379 | 0 | } |
380 | 0 | } |
381 | | |
382 | | static void socket_cleanup(void *privdata) |
383 | 0 | { |
384 | 0 | struct st_redis_socket_data_t *p = (struct st_redis_socket_data_t *)privdata; |
385 | 0 | h2o_socket_close(p->socket); |
386 | 0 | p->context->c.fd = -1; /* prevent hiredis from closing fd twice */ |
387 | 0 | free(p); |
388 | 0 | } |
389 | | |
390 | | static void attach_loop(redisAsyncContext *ac, h2o_loop_t *loop) |
391 | 0 | { |
392 | 0 | redisContext *c = &(ac->c); |
393 | |
|
394 | 0 | struct st_redis_socket_data_t *p = h2o_mem_alloc(sizeof(*p)); |
395 | 0 | *p = (struct st_redis_socket_data_t){NULL}; |
396 | |
|
397 | 0 | ac->ev.addRead = socket_add_read; |
398 | 0 | ac->ev.delRead = socket_del_read; |
399 | 0 | ac->ev.addWrite = socket_add_write; |
400 | 0 | ac->ev.cleanup = socket_cleanup; |
401 | 0 | ac->ev.data = p; |
402 | |
|
403 | | #if H2O_USE_LIBUV |
404 | | p->socket = h2o_uv__poll_create(loop, c->fd, (uv_close_cb)free); |
405 | | #else |
406 | 0 | p->socket = h2o_evloop_socket_create(loop, c->fd, H2O_SOCKET_FLAG_DONT_READ); |
407 | 0 | #endif |
408 | |
|
409 | 0 | p->socket->data = p; |
410 | 0 | p->context = ac; |
411 | 0 | } |