/src/h2o/deps/hiredis/async.c
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | | * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com> |
4 | | * |
5 | | * All rights reserved. |
6 | | * |
7 | | * Redistribution and use in source and binary forms, with or without |
8 | | * modification, are permitted provided that the following conditions are met: |
9 | | * |
10 | | * * Redistributions of source code must retain the above copyright notice, |
11 | | * this list of conditions and the following disclaimer. |
12 | | * * Redistributions in binary form must reproduce the above copyright |
13 | | * notice, this list of conditions and the following disclaimer in the |
14 | | * documentation and/or other materials provided with the distribution. |
15 | | * * Neither the name of Redis nor the names of its contributors may be used |
16 | | * to endorse or promote products derived from this software without |
17 | | * specific prior written permission. |
18 | | * |
19 | | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
20 | | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
21 | | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
22 | | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
23 | | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
24 | | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
25 | | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
26 | | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
27 | | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
28 | | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
29 | | * POSSIBILITY OF SUCH DAMAGE. |
30 | | */ |
31 | | |
32 | | #include "fmacros.h" |
33 | | #include <stdlib.h> |
34 | | #include <string.h> |
35 | | #include <strings.h> |
36 | | #include <assert.h> |
37 | | #include <ctype.h> |
38 | | #include <errno.h> |
39 | | #include "async.h" |
40 | | #include "net.h" |
41 | | #include "dict.c" |
42 | | #include "sds.h" |
43 | | |
/* Event-loop hook helpers. Each macro calls the matching optional hook stored
 * in (ctx)->ev, if one is set, passing (ctx)->ev.data as its only argument.
 * Every macro expands to exactly one statement WITHOUT a trailing semicolon,
 * so "if (x) _EL_FOO(ac); else ..." parses as intended. */
#define _EL_ADD_READ(ctx) do { \
        if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_READ(ctx) do { \
        if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
    } while(0)
#define _EL_ADD_WRITE(ctx) do { \
        if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_WRITE(ctx) do { \
        if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
    } while(0)
/* Note: no ';' after while(0) -- the caller supplies it. */
#define _EL_CLEANUP(ctx) do { \
        if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
    } while(0)
59 | | |
60 | | /* Forward declaration of function in hiredis.c */ |
61 | | int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); |
62 | | |
63 | | /* Functions managing dictionary of callbacks for pub/sub. */ |
64 | 0 | static unsigned int callbackHash(const void *key) { |
65 | 0 | return dictGenHashFunction((const unsigned char *)key, |
66 | 0 | sdslen((const sds)key)); |
67 | 0 | } |
68 | | |
69 | 0 | static void *callbackValDup(void *privdata, const void *src) { |
70 | 0 | ((void) privdata); |
71 | 0 | redisCallback *dup = malloc(sizeof(*dup)); |
72 | 0 | memcpy(dup,src,sizeof(*dup)); |
73 | 0 | return dup; |
74 | 0 | } |
75 | | |
76 | 0 | static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) { |
77 | 0 | int l1, l2; |
78 | 0 | ((void) privdata); |
79 | |
|
80 | 0 | l1 = sdslen((const sds)key1); |
81 | 0 | l2 = sdslen((const sds)key2); |
82 | 0 | if (l1 != l2) return 0; |
83 | 0 | return memcmp(key1,key2,l1) == 0; |
84 | 0 | } |
85 | | |
86 | 0 | static void callbackKeyDestructor(void *privdata, void *key) { |
87 | 0 | ((void) privdata); |
88 | 0 | sdsfree((sds)key); |
89 | 0 | } |
90 | | |
/* Value destructor for the callback dictionaries: values are heap copies
 * made by callbackValDup(), so a plain free() is enough. */
static void callbackValDestructor(void *privdata, void *val) {
    (void)privdata;
    free(val);
}
95 | | |
/* Dictionary type passed to dictCreate() for the pub/sub callback tables
 * (ac->sub.channels and ac->sub.patterns). Keys are sds strings and values
 * are redisCallback copies. The initializer is positional; the second slot is
 * left NULL (presumably the key-duplication hook -- confirm against dict.h),
 * which means the dictionary stores the key pointer it is given. */
static dictType callbackDict = {
    callbackHash,
    NULL,
    callbackValDup,
    callbackKeyCompare,
    callbackKeyDestructor,
    callbackValDestructor
};
104 | | |
105 | 0 | static redisAsyncContext *redisAsyncInitialize(redisContext *c) { |
106 | 0 | redisAsyncContext *ac; |
107 | |
|
108 | 0 | ac = realloc(c,sizeof(redisAsyncContext)); |
109 | 0 | if (ac == NULL) |
110 | 0 | return NULL; |
111 | | |
112 | 0 | c = &(ac->c); |
113 | | |
114 | | /* The regular connect functions will always set the flag REDIS_CONNECTED. |
115 | | * For the async API, we want to wait until the first write event is |
116 | | * received up before setting this flag, so reset it here. */ |
117 | 0 | c->flags &= ~REDIS_CONNECTED; |
118 | |
|
119 | 0 | ac->err = 0; |
120 | 0 | ac->errstr = NULL; |
121 | 0 | ac->data = NULL; |
122 | |
|
123 | 0 | ac->ev.data = NULL; |
124 | 0 | ac->ev.addRead = NULL; |
125 | 0 | ac->ev.delRead = NULL; |
126 | 0 | ac->ev.addWrite = NULL; |
127 | 0 | ac->ev.delWrite = NULL; |
128 | 0 | ac->ev.cleanup = NULL; |
129 | |
|
130 | 0 | ac->onConnect = NULL; |
131 | 0 | ac->onDisconnect = NULL; |
132 | |
|
133 | 0 | ac->replies.head = NULL; |
134 | 0 | ac->replies.tail = NULL; |
135 | 0 | ac->sub.invalid.head = NULL; |
136 | 0 | ac->sub.invalid.tail = NULL; |
137 | 0 | ac->sub.channels = dictCreate(&callbackDict,NULL); |
138 | 0 | ac->sub.patterns = dictCreate(&callbackDict,NULL); |
139 | 0 | return ac; |
140 | 0 | } |
141 | | |
142 | | /* We want the error field to be accessible directly instead of requiring |
143 | | * an indirection to the redisContext struct. */ |
144 | 0 | static void __redisAsyncCopyError(redisAsyncContext *ac) { |
145 | 0 | if (!ac) |
146 | 0 | return; |
147 | | |
148 | 0 | redisContext *c = &(ac->c); |
149 | 0 | ac->err = c->err; |
150 | 0 | ac->errstr = c->errstr; |
151 | 0 | } |
152 | | |
153 | 0 | redisAsyncContext *redisAsyncConnect(const char *ip, int port) { |
154 | 0 | redisContext *c; |
155 | 0 | redisAsyncContext *ac; |
156 | |
|
157 | 0 | c = redisConnectNonBlock(ip,port); |
158 | 0 | if (c == NULL) |
159 | 0 | return NULL; |
160 | | |
161 | 0 | ac = redisAsyncInitialize(c); |
162 | 0 | if (ac == NULL) { |
163 | 0 | redisFree(c); |
164 | 0 | return NULL; |
165 | 0 | } |
166 | | |
167 | 0 | __redisAsyncCopyError(ac); |
168 | 0 | return ac; |
169 | 0 | } |
170 | | |
171 | | redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, |
172 | 0 | const char *source_addr) { |
173 | 0 | redisContext *c = redisConnectBindNonBlock(ip,port,source_addr); |
174 | 0 | redisAsyncContext *ac = redisAsyncInitialize(c); |
175 | 0 | __redisAsyncCopyError(ac); |
176 | 0 | return ac; |
177 | 0 | } |
178 | | |
179 | | redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, |
180 | 0 | const char *source_addr) { |
181 | 0 | redisContext *c = redisConnectBindNonBlockWithReuse(ip,port,source_addr); |
182 | 0 | redisAsyncContext *ac = redisAsyncInitialize(c); |
183 | 0 | __redisAsyncCopyError(ac); |
184 | 0 | return ac; |
185 | 0 | } |
186 | | |
187 | 0 | redisAsyncContext *redisAsyncConnectUnix(const char *path) { |
188 | 0 | redisContext *c; |
189 | 0 | redisAsyncContext *ac; |
190 | |
|
191 | 0 | c = redisConnectUnixNonBlock(path); |
192 | 0 | if (c == NULL) |
193 | 0 | return NULL; |
194 | | |
195 | 0 | ac = redisAsyncInitialize(c); |
196 | 0 | if (ac == NULL) { |
197 | 0 | redisFree(c); |
198 | 0 | return NULL; |
199 | 0 | } |
200 | | |
201 | 0 | __redisAsyncCopyError(ac); |
202 | 0 | return ac; |
203 | 0 | } |
204 | | |
205 | 0 | int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { |
206 | 0 | if (ac->onConnect == NULL) { |
207 | 0 | ac->onConnect = fn; |
208 | | |
209 | | /* The common way to detect an established connection is to wait for |
210 | | * the first write event to be fired. This assumes the related event |
211 | | * library functions are already set. */ |
212 | 0 | _EL_ADD_WRITE(ac); |
213 | 0 | return REDIS_OK; |
214 | 0 | } |
215 | 0 | return REDIS_ERR; |
216 | 0 | } |
217 | | |
218 | 0 | int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { |
219 | 0 | if (ac->onDisconnect == NULL) { |
220 | 0 | ac->onDisconnect = fn; |
221 | 0 | return REDIS_OK; |
222 | 0 | } |
223 | 0 | return REDIS_ERR; |
224 | 0 | } |
225 | | |
226 | | /* Helper functions to push/shift callbacks */ |
227 | 0 | static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { |
228 | 0 | redisCallback *cb; |
229 | | |
230 | | /* Copy callback from stack to heap */ |
231 | 0 | cb = malloc(sizeof(*cb)); |
232 | 0 | if (cb == NULL) |
233 | 0 | return REDIS_ERR_OOM; |
234 | | |
235 | 0 | if (source != NULL) { |
236 | 0 | memcpy(cb,source,sizeof(*cb)); |
237 | 0 | cb->next = NULL; |
238 | 0 | } |
239 | | |
240 | | /* Store callback in list */ |
241 | 0 | if (list->head == NULL) |
242 | 0 | list->head = cb; |
243 | 0 | if (list->tail != NULL) |
244 | 0 | list->tail->next = cb; |
245 | 0 | list->tail = cb; |
246 | 0 | return REDIS_OK; |
247 | 0 | } |
248 | | |
249 | 0 | static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) { |
250 | 0 | redisCallback *cb = list->head; |
251 | 0 | if (cb != NULL) { |
252 | 0 | list->head = cb->next; |
253 | 0 | if (cb == list->tail) |
254 | 0 | list->tail = NULL; |
255 | | |
256 | | /* Copy callback from heap to stack */ |
257 | 0 | if (target != NULL) |
258 | 0 | memcpy(target,cb,sizeof(*cb)); |
259 | 0 | free(cb); |
260 | 0 | return REDIS_OK; |
261 | 0 | } |
262 | 0 | return REDIS_ERR; |
263 | 0 | } |
264 | | |
265 | 0 | static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) { |
266 | 0 | redisContext *c = &(ac->c); |
267 | 0 | if (cb->fn != NULL) { |
268 | 0 | c->flags |= REDIS_IN_CALLBACK; |
269 | 0 | cb->fn(ac,reply,cb->privdata); |
270 | 0 | c->flags &= ~REDIS_IN_CALLBACK; |
271 | 0 | } |
272 | 0 | } |
273 | | |
274 | | /* Helper function to free the context. */ |
275 | 0 | static void __redisAsyncFree(redisAsyncContext *ac) { |
276 | 0 | redisContext *c = &(ac->c); |
277 | 0 | redisCallback cb; |
278 | 0 | dictIterator *it; |
279 | 0 | dictEntry *de; |
280 | | |
281 | | /* Execute pending callbacks with NULL reply. */ |
282 | 0 | while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) |
283 | 0 | __redisRunCallback(ac,&cb,NULL); |
284 | | |
285 | | /* Execute callbacks for invalid commands */ |
286 | 0 | while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) |
287 | 0 | __redisRunCallback(ac,&cb,NULL); |
288 | | |
289 | | /* Run subscription callbacks callbacks with NULL reply */ |
290 | 0 | it = dictGetIterator(ac->sub.channels); |
291 | 0 | while ((de = dictNext(it)) != NULL) |
292 | 0 | __redisRunCallback(ac,dictGetEntryVal(de),NULL); |
293 | 0 | dictReleaseIterator(it); |
294 | 0 | dictRelease(ac->sub.channels); |
295 | |
|
296 | 0 | it = dictGetIterator(ac->sub.patterns); |
297 | 0 | while ((de = dictNext(it)) != NULL) |
298 | 0 | __redisRunCallback(ac,dictGetEntryVal(de),NULL); |
299 | 0 | dictReleaseIterator(it); |
300 | 0 | dictRelease(ac->sub.patterns); |
301 | | |
302 | | /* Signal event lib to clean up */ |
303 | 0 | _EL_CLEANUP(ac); |
304 | | |
305 | | /* Execute disconnect callback. When redisAsyncFree() initiated destroying |
306 | | * this context, the status will always be REDIS_OK. */ |
307 | 0 | if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) { |
308 | 0 | if (c->flags & REDIS_FREEING) { |
309 | 0 | ac->onDisconnect(ac,REDIS_OK); |
310 | 0 | } else { |
311 | 0 | ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR); |
312 | 0 | } |
313 | 0 | } |
314 | | |
315 | | /* Cleanup self */ |
316 | 0 | redisFree(c); |
317 | 0 | } |
318 | | |
319 | | /* Free the async context. When this function is called from a callback, |
320 | | * control needs to be returned to redisProcessCallbacks() before actual |
321 | | * free'ing. To do so, a flag is set on the context which is picked up by |
322 | | * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */ |
323 | 0 | void redisAsyncFree(redisAsyncContext *ac) { |
324 | 0 | redisContext *c = &(ac->c); |
325 | 0 | c->flags |= REDIS_FREEING; |
326 | 0 | if (!(c->flags & REDIS_IN_CALLBACK)) |
327 | 0 | __redisAsyncFree(ac); |
328 | 0 | } |
329 | | |
330 | | /* Helper function to make the disconnect happen and clean up. */ |
331 | 0 | static void __redisAsyncDisconnect(redisAsyncContext *ac) { |
332 | 0 | redisContext *c = &(ac->c); |
333 | | |
334 | | /* Make sure error is accessible if there is any */ |
335 | 0 | __redisAsyncCopyError(ac); |
336 | |
|
337 | 0 | if (ac->err == 0) { |
338 | | /* For clean disconnects, there should be no pending callbacks. */ |
339 | 0 | int ret = __redisShiftCallback(&ac->replies,NULL); |
340 | 0 | assert(ret == REDIS_ERR); |
341 | 0 | } else { |
342 | | /* Disconnection is caused by an error, make sure that pending |
343 | | * callbacks cannot call new commands. */ |
344 | 0 | c->flags |= REDIS_DISCONNECTING; |
345 | 0 | } |
346 | | |
347 | | /* For non-clean disconnects, __redisAsyncFree() will execute pending |
348 | | * callbacks with a NULL-reply. */ |
349 | 0 | __redisAsyncFree(ac); |
350 | 0 | } |
351 | | |
352 | | /* Tries to do a clean disconnect from Redis, meaning it stops new commands |
353 | | * from being issued, but tries to flush the output buffer and execute |
354 | | * callbacks for all remaining replies. When this function is called from a |
355 | | * callback, there might be more replies and we can safely defer disconnecting |
356 | | * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately |
357 | | * when there are no pending callbacks. */ |
358 | 0 | void redisAsyncDisconnect(redisAsyncContext *ac) { |
359 | 0 | redisContext *c = &(ac->c); |
360 | 0 | c->flags |= REDIS_DISCONNECTING; |
361 | 0 | if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL) |
362 | 0 | __redisAsyncDisconnect(ac); |
363 | 0 | } |
364 | | |
365 | 0 | static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { |
366 | 0 | redisContext *c = &(ac->c); |
367 | 0 | dict *callbacks; |
368 | 0 | dictEntry *de; |
369 | 0 | int pvariant; |
370 | 0 | char *stype; |
371 | 0 | sds sname; |
372 | | |
373 | | /* Custom reply functions are not supported for pub/sub. This will fail |
374 | | * very hard when they are used... */ |
375 | 0 | if (reply->type == REDIS_REPLY_ARRAY) { |
376 | 0 | assert(reply->elements >= 2); |
377 | 0 | assert(reply->element[0]->type == REDIS_REPLY_STRING); |
378 | 0 | stype = reply->element[0]->str; |
379 | 0 | pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; |
380 | |
|
381 | 0 | if (pvariant) |
382 | 0 | callbacks = ac->sub.patterns; |
383 | 0 | else |
384 | 0 | callbacks = ac->sub.channels; |
385 | | |
386 | | /* Locate the right callback */ |
387 | 0 | assert(reply->element[1]->type == REDIS_REPLY_STRING); |
388 | 0 | sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); |
389 | 0 | de = dictFind(callbacks,sname); |
390 | 0 | if (de != NULL) { |
391 | 0 | memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb)); |
392 | | |
393 | | /* If this is an unsubscribe message, remove it. */ |
394 | 0 | if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { |
395 | 0 | dictDelete(callbacks,sname); |
396 | | |
397 | | /* If this was the last unsubscribe message, revert to |
398 | | * non-subscribe mode. */ |
399 | 0 | assert(reply->element[2]->type == REDIS_REPLY_INTEGER); |
400 | 0 | if (reply->element[2]->integer == 0) |
401 | 0 | c->flags &= ~REDIS_SUBSCRIBED; |
402 | 0 | } |
403 | 0 | } |
404 | 0 | sdsfree(sname); |
405 | 0 | } else { |
406 | | /* Shift callback for invalid commands. */ |
407 | 0 | __redisShiftCallback(&ac->sub.invalid,dstcb); |
408 | 0 | } |
409 | 0 | return REDIS_OK; |
410 | 0 | } |
411 | | |
412 | 0 | void redisProcessCallbacks(redisAsyncContext *ac) { |
413 | 0 | redisContext *c = &(ac->c); |
414 | 0 | redisCallback cb = {NULL, NULL, NULL}; |
415 | 0 | void *reply = NULL; |
416 | 0 | int status; |
417 | |
|
418 | 0 | while((status = redisGetReply(c,&reply)) == REDIS_OK) { |
419 | 0 | if (reply == NULL) { |
420 | | /* When the connection is being disconnected and there are |
421 | | * no more replies, this is the cue to really disconnect. */ |
422 | 0 | if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0 |
423 | 0 | && ac->replies.head == NULL) { |
424 | 0 | __redisAsyncDisconnect(ac); |
425 | 0 | return; |
426 | 0 | } |
427 | | |
428 | | /* If monitor mode, repush callback */ |
429 | 0 | if(c->flags & REDIS_MONITORING) { |
430 | 0 | __redisPushCallback(&ac->replies,&cb); |
431 | 0 | } |
432 | | |
433 | | /* When the connection is not being disconnected, simply stop |
434 | | * trying to get replies and wait for the next loop tick. */ |
435 | 0 | break; |
436 | 0 | } |
437 | | |
438 | | /* Even if the context is subscribed, pending regular callbacks will |
439 | | * get a reply before pub/sub messages arrive. */ |
440 | 0 | if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { |
441 | | /* |
442 | | * A spontaneous reply in a not-subscribed context can be the error |
443 | | * reply that is sent when a new connection exceeds the maximum |
444 | | * number of allowed connections on the server side. |
445 | | * |
446 | | * This is seen as an error instead of a regular reply because the |
447 | | * server closes the connection after sending it. |
448 | | * |
449 | | * To prevent the error from being overwritten by an EOF error the |
450 | | * connection is closed here. See issue #43. |
451 | | * |
452 | | * Another possibility is that the server is loading its dataset. |
453 | | * In this case we also want to close the connection, and have the |
454 | | * user wait until the server is ready to take our request. |
455 | | */ |
456 | 0 | if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) { |
457 | 0 | c->err = REDIS_ERR_OTHER; |
458 | 0 | snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str); |
459 | 0 | c->reader->fn->freeObject(reply); |
460 | 0 | __redisAsyncDisconnect(ac); |
461 | 0 | return; |
462 | 0 | } |
463 | | /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ |
464 | 0 | assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); |
465 | 0 | if(c->flags & REDIS_SUBSCRIBED) |
466 | 0 | __redisGetSubscribeCallback(ac,reply,&cb); |
467 | 0 | } |
468 | | |
469 | 0 | if (cb.fn != NULL) { |
470 | 0 | __redisRunCallback(ac,&cb,reply); |
471 | 0 | c->reader->fn->freeObject(reply); |
472 | | |
473 | | /* Proceed with free'ing when redisAsyncFree() was called. */ |
474 | 0 | if (c->flags & REDIS_FREEING) { |
475 | 0 | __redisAsyncFree(ac); |
476 | 0 | return; |
477 | 0 | } |
478 | 0 | } else { |
479 | | /* No callback for this reply. This can either be a NULL callback, |
480 | | * or there were no callbacks to begin with. Either way, don't |
481 | | * abort with an error, but simply ignore it because the client |
482 | | * doesn't know what the server will spit out over the wire. */ |
483 | 0 | c->reader->fn->freeObject(reply); |
484 | 0 | } |
485 | 0 | } |
486 | | |
487 | | /* Disconnect when there was an error reading the reply */ |
488 | 0 | if (status != REDIS_OK) |
489 | 0 | __redisAsyncDisconnect(ac); |
490 | 0 | } |
491 | | |
492 | | /* Internal helper function to detect socket status the first time a read or |
493 | | * write event fires. When connecting was not successful, the connect callback |
494 | | * is called with a REDIS_ERR status and the context is free'd. */ |
495 | 0 | static int __redisAsyncHandleConnect(redisAsyncContext *ac) { |
496 | 0 | redisContext *c = &(ac->c); |
497 | |
|
498 | 0 | if (redisCheckSocketError(c) == REDIS_ERR) { |
499 | | /* Try again later when connect(2) is still in progress. */ |
500 | 0 | if (errno == EINPROGRESS) |
501 | 0 | return REDIS_OK; |
502 | | |
503 | 0 | if (ac->onConnect) ac->onConnect(ac,REDIS_ERR); |
504 | 0 | __redisAsyncDisconnect(ac); |
505 | 0 | return REDIS_ERR; |
506 | 0 | } |
507 | | |
508 | | /* Mark context as connected. */ |
509 | 0 | c->flags |= REDIS_CONNECTED; |
510 | 0 | if (ac->onConnect) ac->onConnect(ac,REDIS_OK); |
511 | 0 | return REDIS_OK; |
512 | 0 | } |
513 | | |
514 | | /* This function should be called when the socket is readable. |
515 | | * It processes all replies that can be read and executes their callbacks. |
516 | | */ |
517 | 0 | void redisAsyncHandleRead(redisAsyncContext *ac) { |
518 | 0 | redisContext *c = &(ac->c); |
519 | |
|
520 | 0 | if (!(c->flags & REDIS_CONNECTED)) { |
521 | | /* Abort connect was not successful. */ |
522 | 0 | if (__redisAsyncHandleConnect(ac) != REDIS_OK) |
523 | 0 | return; |
524 | | /* Try again later when the context is still not connected. */ |
525 | 0 | if (!(c->flags & REDIS_CONNECTED)) |
526 | 0 | return; |
527 | 0 | } |
528 | | |
529 | 0 | if (redisBufferRead(c) == REDIS_ERR) { |
530 | 0 | __redisAsyncDisconnect(ac); |
531 | 0 | } else { |
532 | | /* Always re-schedule reads */ |
533 | 0 | _EL_ADD_READ(ac); |
534 | 0 | redisProcessCallbacks(ac); |
535 | 0 | } |
536 | 0 | } |
537 | | |
538 | 0 | void redisAsyncHandleWrite(redisAsyncContext *ac) { |
539 | 0 | redisContext *c = &(ac->c); |
540 | 0 | int done = 0; |
541 | |
|
542 | 0 | if (!(c->flags & REDIS_CONNECTED)) { |
543 | | /* Abort connect was not successful. */ |
544 | 0 | if (__redisAsyncHandleConnect(ac) != REDIS_OK) |
545 | 0 | return; |
546 | | /* Try again later when the context is still not connected. */ |
547 | 0 | if (!(c->flags & REDIS_CONNECTED)) |
548 | 0 | return; |
549 | 0 | } |
550 | | |
551 | 0 | if (redisBufferWrite(c,&done) == REDIS_ERR) { |
552 | 0 | __redisAsyncDisconnect(ac); |
553 | 0 | } else { |
554 | | /* Continue writing when not done, stop writing otherwise */ |
555 | 0 | if (!done) |
556 | 0 | _EL_ADD_WRITE(ac); |
557 | 0 | else |
558 | 0 | _EL_DEL_WRITE(ac); |
559 | | |
560 | | /* Always schedule reads after writes */ |
561 | 0 | _EL_ADD_READ(ac); |
562 | 0 | } |
563 | 0 | } |
564 | | |
/* Locate the next bulk argument ("$<len>\r\n<data>\r\n") of a formatted
 * command, searching from `start`.
 *
 * On success *str points at the first byte of the argument data, *len holds
 * its length, and the return value points just past the argument's trailing
 * "\r\n", i.e. at the header of the following argument or at the end of the
 * command.
 *
 * Returns NULL, leaving *str and *len untouched, when no further '$' header
 * exists or when the header is malformed (missing or negative length,
 * missing '\r'). */
static const char *nextArgument(const char *start, const char **str, size_t *len) {
    const char *p = start;
    const char *cr;
    char *end;
    long n;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    /* Keep the full long value: narrowing to int would truncate large
     * lengths and turn negative ones into huge size_t values. */
    n = strtol(p+1,&end,10);
    if (end == p+1 || n < 0) return NULL;

    cr = strchr(end,'\r');
    if (cr == NULL) return NULL;

    *len = (size_t)n;
    *str = cr+2;
    return cr+2+(*len)+2;
}
580 | | |
581 | | /* Helper function for the redisAsyncCommand* family of functions. Writes a |
582 | | * formatted command to the output buffer and registers the provided callback |
583 | | * function with the context. */ |
584 | 0 | static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { |
585 | 0 | redisContext *c = &(ac->c); |
586 | 0 | redisCallback cb; |
587 | 0 | int pvariant, hasnext; |
588 | 0 | const char *cstr, *astr; |
589 | 0 | size_t clen, alen; |
590 | 0 | const char *p; |
591 | 0 | sds sname; |
592 | 0 | int ret; |
593 | | |
594 | | /* Don't accept new commands when the connection is about to be closed. */ |
595 | 0 | if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; |
596 | | |
597 | | /* Setup callback */ |
598 | 0 | cb.fn = fn; |
599 | 0 | cb.privdata = privdata; |
600 | | |
601 | | /* Find out which command will be appended. */ |
602 | 0 | p = nextArgument(cmd,&cstr,&clen); |
603 | 0 | assert(p != NULL); |
604 | 0 | hasnext = (p[0] == '$'); |
605 | 0 | pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0; |
606 | 0 | cstr += pvariant; |
607 | 0 | clen -= pvariant; |
608 | |
|
609 | 0 | if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) { |
610 | 0 | c->flags |= REDIS_SUBSCRIBED; |
611 | | |
612 | | /* Add every channel/pattern to the list of subscription callbacks. */ |
613 | 0 | while ((p = nextArgument(p,&astr,&alen)) != NULL) { |
614 | 0 | sname = sdsnewlen(astr,alen); |
615 | 0 | if (pvariant) |
616 | 0 | ret = dictReplace(ac->sub.patterns,sname,&cb); |
617 | 0 | else |
618 | 0 | ret = dictReplace(ac->sub.channels,sname,&cb); |
619 | |
|
620 | 0 | if (ret == 0) sdsfree(sname); |
621 | 0 | } |
622 | 0 | } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) { |
623 | | /* It is only useful to call (P)UNSUBSCRIBE when the context is |
624 | | * subscribed to one or more channels or patterns. */ |
625 | 0 | if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; |
626 | | |
627 | | /* (P)UNSUBSCRIBE does not have its own response: every channel or |
628 | | * pattern that is unsubscribed will receive a message. This means we |
629 | | * should not append a callback function for this command. */ |
630 | 0 | } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { |
631 | | /* Set monitor flag and push callback */ |
632 | 0 | c->flags |= REDIS_MONITORING; |
633 | 0 | __redisPushCallback(&ac->replies,&cb); |
634 | 0 | } else { |
635 | 0 | if (c->flags & REDIS_SUBSCRIBED) |
636 | | /* This will likely result in an error reply, but it needs to be |
637 | | * received and passed to the callback. */ |
638 | 0 | __redisPushCallback(&ac->sub.invalid,&cb); |
639 | 0 | else |
640 | 0 | __redisPushCallback(&ac->replies,&cb); |
641 | 0 | } |
642 | | |
643 | 0 | __redisAppendCommand(c,cmd,len); |
644 | | |
645 | | /* Always schedule a write when the write buffer is non-empty */ |
646 | 0 | _EL_ADD_WRITE(ac); |
647 | |
|
648 | 0 | return REDIS_OK; |
649 | 0 | } |
650 | | |
651 | 0 | int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { |
652 | 0 | char *cmd; |
653 | 0 | int len; |
654 | 0 | int status; |
655 | 0 | len = redisvFormatCommand(&cmd,format,ap); |
656 | | |
657 | | /* We don't want to pass -1 or -2 to future functions as a length. */ |
658 | 0 | if (len < 0) |
659 | 0 | return REDIS_ERR; |
660 | | |
661 | 0 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); |
662 | 0 | free(cmd); |
663 | 0 | return status; |
664 | 0 | } |
665 | | |
666 | 0 | int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) { |
667 | 0 | va_list ap; |
668 | 0 | int status; |
669 | 0 | va_start(ap,format); |
670 | 0 | status = redisvAsyncCommand(ac,fn,privdata,format,ap); |
671 | 0 | va_end(ap); |
672 | 0 | return status; |
673 | 0 | } |
674 | | |
675 | 0 | int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { |
676 | 0 | sds cmd; |
677 | 0 | int len; |
678 | 0 | int status; |
679 | 0 | len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); |
680 | 0 | if (len < 0) |
681 | 0 | return REDIS_ERR; |
682 | 0 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); |
683 | 0 | sdsfree(cmd); |
684 | 0 | return status; |
685 | 0 | } |
686 | | |
687 | 0 | int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { |
688 | 0 | int status = __redisAsyncCommand(ac,fn,privdata,cmd,len); |
689 | 0 | return status; |
690 | 0 | } |