Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com> |
3 | | * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com> |
4 | | * |
5 | | * All rights reserved. |
6 | | * |
7 | | * Redistribution and use in source and binary forms, with or without |
8 | | * modification, are permitted provided that the following conditions are met: |
9 | | * |
10 | | * * Redistributions of source code must retain the above copyright notice, |
11 | | * this list of conditions and the following disclaimer. |
12 | | * * Redistributions in binary form must reproduce the above copyright |
13 | | * notice, this list of conditions and the following disclaimer in the |
14 | | * documentation and/or other materials provided with the distribution. |
15 | | * * Neither the name of Redis nor the names of its contributors may be used |
16 | | * to endorse or promote products derived from this software without |
17 | | * specific prior written permission. |
18 | | * |
19 | | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
20 | | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
21 | | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
22 | | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE |
23 | | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
24 | | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
25 | | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
26 | | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
27 | | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
28 | | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
29 | | * POSSIBILITY OF SUCH DAMAGE. |
30 | | */ |
31 | | |
32 | | #include "fmacros.h" |
33 | | #include "alloc.h" |
34 | | #include <stdlib.h> |
35 | | #include <string.h> |
36 | | #ifndef _MSC_VER |
37 | | #include <strings.h> |
38 | | #endif |
39 | | #include <assert.h> |
40 | | #include <ctype.h> |
41 | | #include <errno.h> |
42 | | #include "async.h" |
43 | | #include "net.h" |
44 | | #include "dict.c" |
45 | | #include "sds.h" |
46 | | #include "win32.h" |
47 | | |
48 | | #include "async_private.h" |
49 | | |
50 | | #ifdef NDEBUG |
51 | | #undef assert |
52 | | #define assert(e) (void)(e) |
53 | | #endif |
54 | | |
55 | | /* Forward declarations of hiredis.c functions */ |
56 | | int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); |
57 | | void __redisSetError(redisContext *c, int type, const char *str); |
58 | | |
59 | | /* Functions managing dictionary of callbacks for pub/sub. */ |
60 | 0 | static unsigned int callbackHash(const void *key) { |
61 | 0 | return dictGenHashFunction((const unsigned char *)key, |
62 | 0 | sdslen((const sds)key)); |
63 | 0 | } |
64 | | |
65 | 0 | static void *callbackValDup(void *privdata, const void *src) { |
66 | 0 | ((void) privdata); |
67 | 0 | redisCallback *dup; |
68 | |
|
69 | 0 | dup = hi_malloc(sizeof(*dup)); |
70 | 0 | if (dup == NULL) |
71 | 0 | return NULL; |
72 | | |
73 | 0 | memcpy(dup,src,sizeof(*dup)); |
74 | 0 | return dup; |
75 | 0 | } |
76 | | |
77 | 0 | static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) { |
78 | 0 | int l1, l2; |
79 | 0 | ((void) privdata); |
80 | |
|
81 | 0 | l1 = sdslen((const sds)key1); |
82 | 0 | l2 = sdslen((const sds)key2); |
83 | 0 | if (l1 != l2) return 0; |
84 | 0 | return memcmp(key1,key2,l1) == 0; |
85 | 0 | } |
86 | | |
87 | 0 | static void callbackKeyDestructor(void *privdata, void *key) { |
88 | 0 | ((void) privdata); |
89 | 0 | sdsfree((sds)key); |
90 | 0 | } |
91 | | |
/* Value destructor hook: releases the stored value with hi_free(). privdata
 * is unused. */
static void callbackValDestructor(void *privdata, void *val) {
    (void) privdata;

    hi_free(val);
}
96 | | |
97 | | static dictType callbackDict = { |
98 | | callbackHash, |
99 | | NULL, |
100 | | callbackValDup, |
101 | | callbackKeyCompare, |
102 | | callbackKeyDestructor, |
103 | | callbackValDestructor |
104 | | }; |
105 | | |
106 | 0 | static redisAsyncContext *redisAsyncInitialize(redisContext *c) { |
107 | 0 | redisAsyncContext *ac; |
108 | 0 | dict *channels = NULL, *patterns = NULL; |
109 | |
|
110 | 0 | channels = dictCreate(&callbackDict,NULL); |
111 | 0 | if (channels == NULL) |
112 | 0 | goto oom; |
113 | | |
114 | 0 | patterns = dictCreate(&callbackDict,NULL); |
115 | 0 | if (patterns == NULL) |
116 | 0 | goto oom; |
117 | | |
118 | 0 | ac = hi_realloc(c,sizeof(redisAsyncContext)); |
119 | 0 | if (ac == NULL) |
120 | 0 | goto oom; |
121 | | |
122 | 0 | c = &(ac->c); |
123 | | |
124 | | /* The regular connect functions will always set the flag REDIS_CONNECTED. |
125 | | * For the async API, we want to wait until the first write event is |
126 | | * received up before setting this flag, so reset it here. */ |
127 | 0 | c->flags &= ~REDIS_CONNECTED; |
128 | |
|
129 | 0 | ac->err = 0; |
130 | 0 | ac->errstr = NULL; |
131 | 0 | ac->data = NULL; |
132 | 0 | ac->dataCleanup = NULL; |
133 | |
|
134 | 0 | ac->ev.data = NULL; |
135 | 0 | ac->ev.addRead = NULL; |
136 | 0 | ac->ev.delRead = NULL; |
137 | 0 | ac->ev.addWrite = NULL; |
138 | 0 | ac->ev.delWrite = NULL; |
139 | 0 | ac->ev.cleanup = NULL; |
140 | 0 | ac->ev.scheduleTimer = NULL; |
141 | |
|
142 | 0 | ac->onConnect = NULL; |
143 | 0 | ac->onConnectNC = NULL; |
144 | 0 | ac->onDisconnect = NULL; |
145 | |
|
146 | 0 | ac->replies.head = NULL; |
147 | 0 | ac->replies.tail = NULL; |
148 | 0 | ac->sub.replies.head = NULL; |
149 | 0 | ac->sub.replies.tail = NULL; |
150 | 0 | ac->sub.channels = channels; |
151 | 0 | ac->sub.patterns = patterns; |
152 | 0 | ac->sub.pending_unsubs = 0; |
153 | |
|
154 | 0 | return ac; |
155 | 0 | oom: |
156 | 0 | if (channels) dictRelease(channels); |
157 | 0 | if (patterns) dictRelease(patterns); |
158 | 0 | return NULL; |
159 | 0 | } |
160 | | |
161 | | /* We want the error field to be accessible directly instead of requiring |
162 | | * an indirection to the redisContext struct. */ |
163 | 0 | static void __redisAsyncCopyError(redisAsyncContext *ac) { |
164 | 0 | if (!ac) |
165 | 0 | return; |
166 | | |
167 | 0 | redisContext *c = &(ac->c); |
168 | 0 | ac->err = c->err; |
169 | 0 | ac->errstr = c->errstr; |
170 | 0 | } |
171 | | |
172 | 0 | redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) { |
173 | 0 | redisOptions myOptions = *options; |
174 | 0 | redisContext *c; |
175 | 0 | redisAsyncContext *ac; |
176 | | |
177 | | /* Clear any erroneously set sync callback and flag that we don't want to |
178 | | * use freeReplyObject by default. */ |
179 | 0 | myOptions.push_cb = NULL; |
180 | 0 | myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE; |
181 | |
|
182 | 0 | myOptions.options |= REDIS_OPT_NONBLOCK; |
183 | 0 | c = redisConnectWithOptions(&myOptions); |
184 | 0 | if (c == NULL) { |
185 | 0 | return NULL; |
186 | 0 | } |
187 | | |
188 | 0 | ac = redisAsyncInitialize(c); |
189 | 0 | if (ac == NULL) { |
190 | 0 | redisFree(c); |
191 | 0 | return NULL; |
192 | 0 | } |
193 | | |
194 | | /* Set any configured async push handler */ |
195 | 0 | redisAsyncSetPushCallback(ac, myOptions.async_push_cb); |
196 | |
|
197 | 0 | __redisAsyncCopyError(ac); |
198 | 0 | return ac; |
199 | 0 | } |
200 | | |
201 | 0 | redisAsyncContext *redisAsyncConnect(const char *ip, int port) { |
202 | 0 | redisOptions options = {0}; |
203 | 0 | REDIS_OPTIONS_SET_TCP(&options, ip, port); |
204 | 0 | return redisAsyncConnectWithOptions(&options); |
205 | 0 | } |
206 | | |
207 | | redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, |
208 | 0 | const char *source_addr) { |
209 | 0 | redisOptions options = {0}; |
210 | 0 | REDIS_OPTIONS_SET_TCP(&options, ip, port); |
211 | 0 | options.endpoint.tcp.source_addr = source_addr; |
212 | 0 | return redisAsyncConnectWithOptions(&options); |
213 | 0 | } |
214 | | |
215 | | redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, |
216 | 0 | const char *source_addr) { |
217 | 0 | redisOptions options = {0}; |
218 | 0 | REDIS_OPTIONS_SET_TCP(&options, ip, port); |
219 | 0 | options.options |= REDIS_OPT_REUSEADDR; |
220 | 0 | options.endpoint.tcp.source_addr = source_addr; |
221 | 0 | return redisAsyncConnectWithOptions(&options); |
222 | 0 | } |
223 | | |
224 | 0 | redisAsyncContext *redisAsyncConnectUnix(const char *path) { |
225 | 0 | redisOptions options = {0}; |
226 | 0 | REDIS_OPTIONS_SET_UNIX(&options, path); |
227 | 0 | return redisAsyncConnectWithOptions(&options); |
228 | 0 | } |
229 | | |
230 | | static int |
231 | | redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn, |
232 | | redisConnectCallbackNC *fn_nc) |
233 | 0 | { |
234 | | /* If either are already set, this is an error */ |
235 | 0 | if (ac->onConnect || ac->onConnectNC) |
236 | 0 | return REDIS_ERR; |
237 | | |
238 | 0 | if (fn) { |
239 | 0 | ac->onConnect = fn; |
240 | 0 | } else if (fn_nc) { |
241 | 0 | ac->onConnectNC = fn_nc; |
242 | 0 | } |
243 | | |
244 | | /* The common way to detect an established connection is to wait for |
245 | | * the first write event to be fired. This assumes the related event |
246 | | * library functions are already set. */ |
247 | 0 | _EL_ADD_WRITE(ac); |
248 | |
|
249 | 0 | return REDIS_OK; |
250 | 0 | } |
251 | | |
252 | 0 | int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { |
253 | 0 | return redisAsyncSetConnectCallbackImpl(ac, fn, NULL); |
254 | 0 | } |
255 | | |
256 | 0 | int redisAsyncSetConnectCallbackNC(redisAsyncContext *ac, redisConnectCallbackNC *fn) { |
257 | 0 | return redisAsyncSetConnectCallbackImpl(ac, NULL, fn); |
258 | 0 | } |
259 | | |
260 | 0 | int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { |
261 | 0 | if (ac->onDisconnect == NULL) { |
262 | 0 | ac->onDisconnect = fn; |
263 | 0 | return REDIS_OK; |
264 | 0 | } |
265 | 0 | return REDIS_ERR; |
266 | 0 | } |
267 | | |
268 | | /* Helper functions to push/shift callbacks */ |
269 | 0 | static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { |
270 | 0 | redisCallback *cb; |
271 | | |
272 | | /* Copy callback from stack to heap */ |
273 | 0 | cb = hi_malloc(sizeof(*cb)); |
274 | 0 | if (cb == NULL) |
275 | 0 | return REDIS_ERR_OOM; |
276 | | |
277 | 0 | if (source != NULL) { |
278 | 0 | memcpy(cb,source,sizeof(*cb)); |
279 | 0 | cb->next = NULL; |
280 | 0 | } |
281 | | |
282 | | /* Store callback in list */ |
283 | 0 | if (list->head == NULL) |
284 | 0 | list->head = cb; |
285 | 0 | if (list->tail != NULL) |
286 | 0 | list->tail->next = cb; |
287 | 0 | list->tail = cb; |
288 | 0 | return REDIS_OK; |
289 | 0 | } |
290 | | |
291 | 0 | static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) { |
292 | 0 | redisCallback *cb = list->head; |
293 | 0 | if (cb != NULL) { |
294 | 0 | list->head = cb->next; |
295 | 0 | if (cb == list->tail) |
296 | 0 | list->tail = NULL; |
297 | | |
298 | | /* Copy callback from heap to stack */ |
299 | 0 | if (target != NULL) |
300 | 0 | memcpy(target,cb,sizeof(*cb)); |
301 | 0 | hi_free(cb); |
302 | 0 | return REDIS_OK; |
303 | 0 | } |
304 | 0 | return REDIS_ERR; |
305 | 0 | } |
306 | | |
307 | 0 | static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) { |
308 | 0 | redisContext *c = &(ac->c); |
309 | 0 | if (cb->fn != NULL) { |
310 | 0 | c->flags |= REDIS_IN_CALLBACK; |
311 | 0 | cb->fn(ac,reply,cb->privdata); |
312 | 0 | c->flags &= ~REDIS_IN_CALLBACK; |
313 | 0 | } |
314 | 0 | } |
315 | | |
316 | 0 | static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { |
317 | 0 | if (ac->push_cb != NULL) { |
318 | 0 | ac->c.flags |= REDIS_IN_CALLBACK; |
319 | 0 | ac->push_cb(ac, reply); |
320 | 0 | ac->c.flags &= ~REDIS_IN_CALLBACK; |
321 | 0 | } |
322 | 0 | } |
323 | | |
324 | | static void __redisRunConnectCallback(redisAsyncContext *ac, int status) |
325 | 0 | { |
326 | 0 | if (ac->onConnect == NULL && ac->onConnectNC == NULL) |
327 | 0 | return; |
328 | | |
329 | 0 | if (!(ac->c.flags & REDIS_IN_CALLBACK)) { |
330 | 0 | ac->c.flags |= REDIS_IN_CALLBACK; |
331 | 0 | if (ac->onConnect) { |
332 | 0 | ac->onConnect(ac, status); |
333 | 0 | } else { |
334 | 0 | ac->onConnectNC(ac, status); |
335 | 0 | } |
336 | 0 | ac->c.flags &= ~REDIS_IN_CALLBACK; |
337 | 0 | } else { |
338 | | /* already in callback */ |
339 | 0 | if (ac->onConnect) { |
340 | 0 | ac->onConnect(ac, status); |
341 | 0 | } else { |
342 | 0 | ac->onConnectNC(ac, status); |
343 | 0 | } |
344 | 0 | } |
345 | 0 | } |
346 | | |
347 | | static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status) |
348 | 0 | { |
349 | 0 | if (ac->onDisconnect) { |
350 | 0 | if (!(ac->c.flags & REDIS_IN_CALLBACK)) { |
351 | 0 | ac->c.flags |= REDIS_IN_CALLBACK; |
352 | 0 | ac->onDisconnect(ac, status); |
353 | 0 | ac->c.flags &= ~REDIS_IN_CALLBACK; |
354 | 0 | } else { |
355 | | /* already in callback */ |
356 | 0 | ac->onDisconnect(ac, status); |
357 | 0 | } |
358 | 0 | } |
359 | 0 | } |
360 | | |
361 | | /* Helper function to free the context. */ |
362 | 0 | static void __redisAsyncFree(redisAsyncContext *ac) { |
363 | 0 | redisContext *c = &(ac->c); |
364 | 0 | redisCallback cb; |
365 | 0 | dictIterator it; |
366 | 0 | dictEntry *de; |
367 | | |
368 | | /* Execute pending callbacks with NULL reply. */ |
369 | 0 | while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) |
370 | 0 | __redisRunCallback(ac,&cb,NULL); |
371 | 0 | while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) |
372 | 0 | __redisRunCallback(ac,&cb,NULL); |
373 | | |
374 | | /* Run subscription callbacks with NULL reply */ |
375 | 0 | if (ac->sub.channels) { |
376 | 0 | dictInitIterator(&it,ac->sub.channels); |
377 | 0 | while ((de = dictNext(&it)) != NULL) |
378 | 0 | __redisRunCallback(ac,dictGetEntryVal(de),NULL); |
379 | |
|
380 | 0 | dictRelease(ac->sub.channels); |
381 | 0 | } |
382 | |
|
383 | 0 | if (ac->sub.patterns) { |
384 | 0 | dictInitIterator(&it,ac->sub.patterns); |
385 | 0 | while ((de = dictNext(&it)) != NULL) |
386 | 0 | __redisRunCallback(ac,dictGetEntryVal(de),NULL); |
387 | |
|
388 | 0 | dictRelease(ac->sub.patterns); |
389 | 0 | } |
390 | | |
391 | | /* Signal event lib to clean up */ |
392 | 0 | _EL_CLEANUP(ac); |
393 | | |
394 | | /* Execute disconnect callback. When redisAsyncFree() initiated destroying |
395 | | * this context, the status will always be REDIS_OK. */ |
396 | 0 | if (c->flags & REDIS_CONNECTED) { |
397 | 0 | int status = ac->err == 0 ? REDIS_OK : REDIS_ERR; |
398 | 0 | if (c->flags & REDIS_FREEING) |
399 | 0 | status = REDIS_OK; |
400 | 0 | __redisRunDisconnectCallback(ac, status); |
401 | 0 | } |
402 | |
|
403 | 0 | if (ac->dataCleanup) { |
404 | 0 | ac->dataCleanup(ac->data); |
405 | 0 | } |
406 | | |
407 | | /* Cleanup self */ |
408 | 0 | redisFree(c); |
409 | 0 | } |
410 | | |
411 | | /* Free the async context. When this function is called from a callback, |
412 | | * control needs to be returned to redisProcessCallbacks() before actual |
413 | | * free'ing. To do so, a flag is set on the context which is picked up by |
414 | | * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */ |
415 | 0 | void redisAsyncFree(redisAsyncContext *ac) { |
416 | 0 | if (ac == NULL) |
417 | 0 | return; |
418 | | |
419 | 0 | redisContext *c = &(ac->c); |
420 | |
|
421 | 0 | c->flags |= REDIS_FREEING; |
422 | 0 | if (!(c->flags & REDIS_IN_CALLBACK)) |
423 | 0 | __redisAsyncFree(ac); |
424 | 0 | } |
425 | | |
426 | | /* Helper function to make the disconnect happen and clean up. */ |
427 | 0 | void __redisAsyncDisconnect(redisAsyncContext *ac) { |
428 | 0 | redisContext *c = &(ac->c); |
429 | | |
430 | | /* Make sure error is accessible if there is any */ |
431 | 0 | __redisAsyncCopyError(ac); |
432 | |
|
433 | 0 | if (ac->err == 0) { |
434 | | /* For clean disconnects, there should be no pending callbacks. */ |
435 | 0 | int ret = __redisShiftCallback(&ac->replies,NULL); |
436 | 0 | assert(ret == REDIS_ERR); |
437 | 0 | } else { |
438 | | /* Disconnection is caused by an error, make sure that pending |
439 | | * callbacks cannot call new commands. */ |
440 | 0 | c->flags |= REDIS_DISCONNECTING; |
441 | 0 | } |
442 | | |
443 | | /* cleanup event library on disconnect. |
444 | | * this is safe to call multiple times */ |
445 | 0 | _EL_CLEANUP(ac); |
446 | | |
447 | | /* For non-clean disconnects, __redisAsyncFree() will execute pending |
448 | | * callbacks with a NULL-reply. */ |
449 | 0 | if (!(c->flags & REDIS_NO_AUTO_FREE)) { |
450 | 0 | __redisAsyncFree(ac); |
451 | 0 | } |
452 | 0 | } |
453 | | |
454 | | /* Tries to do a clean disconnect from Redis, meaning it stops new commands |
455 | | * from being issued, but tries to flush the output buffer and execute |
456 | | * callbacks for all remaining replies. When this function is called from a |
457 | | * callback, there might be more replies and we can safely defer disconnecting |
458 | | * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately |
459 | | * when there are no pending callbacks. */ |
460 | 0 | void redisAsyncDisconnect(redisAsyncContext *ac) { |
461 | 0 | redisContext *c = &(ac->c); |
462 | 0 | c->flags |= REDIS_DISCONNECTING; |
463 | | |
464 | | /** unset the auto-free flag here, because disconnect undoes this */ |
465 | 0 | c->flags &= ~REDIS_NO_AUTO_FREE; |
466 | 0 | if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL) |
467 | 0 | __redisAsyncDisconnect(ac); |
468 | 0 | } |
469 | | |
470 | 0 | static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { |
471 | 0 | redisContext *c = &(ac->c); |
472 | 0 | dict *callbacks; |
473 | 0 | redisCallback *cb = NULL; |
474 | 0 | dictEntry *de; |
475 | 0 | int pvariant; |
476 | 0 | char *stype; |
477 | 0 | sds sname = NULL; |
478 | | |
479 | | /* Match reply with the expected format of a pushed message. |
480 | | * The type and number of elements (3 to 4) are specified at: |
481 | | * https://redis.io/topics/pubsub#format-of-pushed-messages */ |
482 | 0 | if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) || |
483 | 0 | reply->type == REDIS_REPLY_PUSH) { |
484 | 0 | assert(reply->element[0]->type == REDIS_REPLY_STRING); |
485 | 0 | stype = reply->element[0]->str; |
486 | 0 | pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; |
487 | |
|
488 | 0 | if (pvariant) |
489 | 0 | callbacks = ac->sub.patterns; |
490 | 0 | else |
491 | 0 | callbacks = ac->sub.channels; |
492 | | |
493 | | /* Locate the right callback */ |
494 | 0 | if (reply->element[1]->type == REDIS_REPLY_STRING) { |
495 | 0 | sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); |
496 | 0 | if (sname == NULL) goto oom; |
497 | | |
498 | 0 | if ((de = dictFind(callbacks,sname)) != NULL) { |
499 | 0 | cb = dictGetEntryVal(de); |
500 | 0 | memcpy(dstcb,cb,sizeof(*dstcb)); |
501 | 0 | } |
502 | 0 | } |
503 | | |
504 | | /* If this is an subscribe reply decrease pending counter. */ |
505 | 0 | if (strcasecmp(stype+pvariant,"subscribe") == 0) { |
506 | 0 | assert(cb != NULL); |
507 | 0 | cb->pending_subs -= 1; |
508 | |
|
509 | 0 | } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { |
510 | 0 | if (cb == NULL) |
511 | 0 | ac->sub.pending_unsubs -= 1; |
512 | 0 | else if (cb->pending_subs == 0) |
513 | 0 | dictDelete(callbacks,sname); |
514 | | |
515 | | /* If this was the last unsubscribe message, revert to |
516 | | * non-subscribe mode. */ |
517 | 0 | assert(reply->element[2]->type == REDIS_REPLY_INTEGER); |
518 | | |
519 | | /* Unset subscribed flag only when no pipelined pending subscribe |
520 | | * or pending unsubscribe replies. */ |
521 | 0 | if (reply->element[2]->integer == 0 |
522 | 0 | && dictSize(ac->sub.channels) == 0 |
523 | 0 | && dictSize(ac->sub.patterns) == 0 |
524 | 0 | && ac->sub.pending_unsubs == 0) { |
525 | 0 | c->flags &= ~REDIS_SUBSCRIBED; |
526 | | |
527 | | /* Move ongoing regular command callbacks. */ |
528 | 0 | redisCallback cb; |
529 | 0 | while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { |
530 | 0 | __redisPushCallback(&ac->replies,&cb); |
531 | 0 | } |
532 | 0 | } |
533 | 0 | } |
534 | 0 | sdsfree(sname); |
535 | 0 | } else { |
536 | | /* Shift callback for pending command in subscribed context. */ |
537 | 0 | __redisShiftCallback(&ac->sub.replies,dstcb); |
538 | 0 | } |
539 | 0 | return REDIS_OK; |
540 | 0 | oom: |
541 | 0 | __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); |
542 | 0 | __redisAsyncCopyError(ac); |
543 | 0 | return REDIS_ERR; |
544 | 0 | } |
545 | | |
/* Non-zero when r is a PUSH reply that redisIsSubscribeReply() does not
 * recognise as pub/sub traffic. */
#define redisIsSpontaneousPushReply(r) \
    (redisIsPushReply(r) && !redisIsSubscribeReply(r))
548 | | |
549 | 0 | static int redisIsSubscribeReply(redisReply *reply) { |
550 | 0 | char *str; |
551 | 0 | size_t len, off; |
552 | | |
553 | | /* We will always have at least one string with the subscribe/message type */ |
554 | 0 | if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING || |
555 | 0 | reply->element[0]->len < sizeof("message") - 1) |
556 | 0 | { |
557 | 0 | return 0; |
558 | 0 | } |
559 | | |
560 | | /* Get the string/len moving past 'p' if needed */ |
561 | 0 | off = tolower(reply->element[0]->str[0]) == 'p'; |
562 | 0 | str = reply->element[0]->str + off; |
563 | 0 | len = reply->element[0]->len - off; |
564 | |
|
565 | 0 | return !strncasecmp(str, "subscribe", len) || |
566 | 0 | !strncasecmp(str, "message", len) || |
567 | 0 | !strncasecmp(str, "unsubscribe", len); |
568 | 0 | } |
569 | | |
570 | 0 | void redisProcessCallbacks(redisAsyncContext *ac) { |
571 | 0 | redisContext *c = &(ac->c); |
572 | 0 | void *reply = NULL; |
573 | 0 | int status; |
574 | |
|
575 | 0 | while((status = redisGetReply(c,&reply)) == REDIS_OK) { |
576 | 0 | if (reply == NULL) { |
577 | | /* When the connection is being disconnected and there are |
578 | | * no more replies, this is the cue to really disconnect. */ |
579 | 0 | if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0 |
580 | 0 | && ac->replies.head == NULL) { |
581 | 0 | __redisAsyncDisconnect(ac); |
582 | 0 | return; |
583 | 0 | } |
584 | | /* When the connection is not being disconnected, simply stop |
585 | | * trying to get replies and wait for the next loop tick. */ |
586 | 0 | break; |
587 | 0 | } |
588 | | |
589 | | /* Keep track of push message support for subscribe handling */ |
590 | 0 | if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH; |
591 | | |
592 | | /* Send any non-subscribe related PUSH messages to our PUSH handler |
593 | | * while allowing subscribe related PUSH messages to pass through. |
594 | | * This allows existing code to be backward compatible and work in |
595 | | * either RESP2 or RESP3 mode. */ |
596 | 0 | if (redisIsSpontaneousPushReply(reply)) { |
597 | 0 | __redisRunPushCallback(ac, reply); |
598 | 0 | c->reader->fn->freeObject(reply); |
599 | 0 | continue; |
600 | 0 | } |
601 | | |
602 | | /* Even if the context is subscribed, pending regular |
603 | | * callbacks will get a reply before pub/sub messages arrive. */ |
604 | 0 | redisCallback cb = {NULL, NULL, 0, 0, NULL}; |
605 | 0 | if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { |
606 | | /* |
607 | | * A spontaneous reply in a not-subscribed context can be the error |
608 | | * reply that is sent when a new connection exceeds the maximum |
609 | | * number of allowed connections on the server side. |
610 | | * |
611 | | * This is seen as an error instead of a regular reply because the |
612 | | * server closes the connection after sending it. |
613 | | * |
614 | | * To prevent the error from being overwritten by an EOF error the |
615 | | * connection is closed here. See issue #43. |
616 | | * |
617 | | * Another possibility is that the server is loading its dataset. |
618 | | * In this case we also want to close the connection, and have the |
619 | | * user wait until the server is ready to take our request. |
620 | | */ |
621 | 0 | if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) { |
622 | 0 | c->err = REDIS_ERR_OTHER; |
623 | 0 | snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str); |
624 | 0 | c->reader->fn->freeObject(reply); |
625 | 0 | __redisAsyncDisconnect(ac); |
626 | 0 | return; |
627 | 0 | } |
628 | | /* No more regular callbacks and no errors, the context *must* be subscribed. */ |
629 | 0 | assert(c->flags & REDIS_SUBSCRIBED); |
630 | 0 | if (c->flags & REDIS_SUBSCRIBED) |
631 | 0 | __redisGetSubscribeCallback(ac,reply,&cb); |
632 | 0 | } |
633 | | |
634 | 0 | if (cb.fn != NULL) { |
635 | 0 | __redisRunCallback(ac,&cb,reply); |
636 | 0 | if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){ |
637 | 0 | c->reader->fn->freeObject(reply); |
638 | 0 | } |
639 | | |
640 | | /* Proceed with free'ing when redisAsyncFree() was called. */ |
641 | 0 | if (c->flags & REDIS_FREEING) { |
642 | 0 | __redisAsyncFree(ac); |
643 | 0 | return; |
644 | 0 | } |
645 | 0 | } else { |
646 | | /* No callback for this reply. This can either be a NULL callback, |
647 | | * or there were no callbacks to begin with. Either way, don't |
648 | | * abort with an error, but simply ignore it because the client |
649 | | * doesn't know what the server will spit out over the wire. */ |
650 | 0 | c->reader->fn->freeObject(reply); |
651 | 0 | } |
652 | | |
653 | | /* If in monitor mode, repush the callback */ |
654 | 0 | if (c->flags & REDIS_MONITORING) { |
655 | 0 | __redisPushCallback(&ac->replies,&cb); |
656 | 0 | } |
657 | 0 | } |
658 | | |
659 | | /* Disconnect when there was an error reading the reply */ |
660 | 0 | if (status != REDIS_OK) |
661 | 0 | __redisAsyncDisconnect(ac); |
662 | 0 | } |
663 | | |
664 | 0 | static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) { |
665 | 0 | __redisRunConnectCallback(ac, REDIS_ERR); |
666 | 0 | __redisAsyncDisconnect(ac); |
667 | 0 | } |
668 | | |
669 | | /* Internal helper function to detect socket status the first time a read or |
670 | | * write event fires. When connecting was not successful, the connect callback |
671 | | * is called with a REDIS_ERR status and the context is free'd. */ |
672 | 0 | static int __redisAsyncHandleConnect(redisAsyncContext *ac) { |
673 | 0 | int completed = 0; |
674 | 0 | redisContext *c = &(ac->c); |
675 | |
|
676 | 0 | if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { |
677 | | /* Error! */ |
678 | 0 | if (redisCheckSocketError(c) == REDIS_ERR) |
679 | 0 | __redisAsyncCopyError(ac); |
680 | 0 | __redisAsyncHandleConnectFailure(ac); |
681 | 0 | return REDIS_ERR; |
682 | 0 | } else if (completed == 1) { |
683 | | /* connected! */ |
684 | 0 | if (c->connection_type == REDIS_CONN_TCP && |
685 | 0 | redisSetTcpNoDelay(c) == REDIS_ERR) { |
686 | 0 | __redisAsyncHandleConnectFailure(ac); |
687 | 0 | return REDIS_ERR; |
688 | 0 | } |
689 | | |
690 | | /* flag us as fully connect, but allow the callback |
691 | | * to disconnect. For that reason, permit the function |
692 | | * to delete the context here after callback return. |
693 | | */ |
694 | 0 | c->flags |= REDIS_CONNECTED; |
695 | 0 | __redisRunConnectCallback(ac, REDIS_OK); |
696 | 0 | if ((ac->c.flags & REDIS_DISCONNECTING)) { |
697 | 0 | redisAsyncDisconnect(ac); |
698 | 0 | return REDIS_ERR; |
699 | 0 | } else if ((ac->c.flags & REDIS_FREEING)) { |
700 | 0 | redisAsyncFree(ac); |
701 | 0 | return REDIS_ERR; |
702 | 0 | } |
703 | 0 | return REDIS_OK; |
704 | 0 | } else { |
705 | 0 | return REDIS_OK; |
706 | 0 | } |
707 | 0 | } |
708 | | |
709 | 0 | void redisAsyncRead(redisAsyncContext *ac) { |
710 | 0 | redisContext *c = &(ac->c); |
711 | |
|
712 | 0 | if (redisBufferRead(c) == REDIS_ERR) { |
713 | 0 | __redisAsyncDisconnect(ac); |
714 | 0 | } else { |
715 | | /* Always re-schedule reads */ |
716 | 0 | _EL_ADD_READ(ac); |
717 | 0 | redisProcessCallbacks(ac); |
718 | 0 | } |
719 | 0 | } |
720 | | |
721 | | /* This function should be called when the socket is readable. |
722 | | * It processes all replies that can be read and executes their callbacks. |
723 | | */ |
724 | 0 | void redisAsyncHandleRead(redisAsyncContext *ac) { |
725 | 0 | redisContext *c = &(ac->c); |
726 | | /* must not be called from a callback */ |
727 | 0 | assert(!(c->flags & REDIS_IN_CALLBACK)); |
728 | | |
729 | 0 | if (!(c->flags & REDIS_CONNECTED)) { |
730 | | /* Abort connect was not successful. */ |
731 | 0 | if (__redisAsyncHandleConnect(ac) != REDIS_OK) |
732 | 0 | return; |
733 | | /* Try again later when the context is still not connected. */ |
734 | 0 | if (!(c->flags & REDIS_CONNECTED)) |
735 | 0 | return; |
736 | 0 | } |
737 | | |
738 | 0 | c->funcs->async_read(ac); |
739 | 0 | } |
740 | | |
741 | 0 | void redisAsyncWrite(redisAsyncContext *ac) { |
742 | 0 | redisContext *c = &(ac->c); |
743 | 0 | int done = 0; |
744 | |
|
745 | 0 | if (redisBufferWrite(c,&done) == REDIS_ERR) { |
746 | 0 | __redisAsyncDisconnect(ac); |
747 | 0 | } else { |
748 | | /* Continue writing when not done, stop writing otherwise */ |
749 | 0 | if (!done) |
750 | 0 | _EL_ADD_WRITE(ac); |
751 | 0 | else |
752 | 0 | _EL_DEL_WRITE(ac); |
753 | | |
754 | | /* Always schedule reads after writes */ |
755 | 0 | _EL_ADD_READ(ac); |
756 | 0 | } |
757 | 0 | } |
758 | | |
759 | 0 | void redisAsyncHandleWrite(redisAsyncContext *ac) { |
760 | 0 | redisContext *c = &(ac->c); |
761 | | /* must not be called from a callback */ |
762 | 0 | assert(!(c->flags & REDIS_IN_CALLBACK)); |
763 | | |
764 | 0 | if (!(c->flags & REDIS_CONNECTED)) { |
765 | | /* Abort connect was not successful. */ |
766 | 0 | if (__redisAsyncHandleConnect(ac) != REDIS_OK) |
767 | 0 | return; |
768 | | /* Try again later when the context is still not connected. */ |
769 | 0 | if (!(c->flags & REDIS_CONNECTED)) |
770 | 0 | return; |
771 | 0 | } |
772 | | |
773 | 0 | c->funcs->async_write(ac); |
774 | 0 | } |
775 | | |
776 | 0 | void redisAsyncHandleTimeout(redisAsyncContext *ac) { |
777 | 0 | redisContext *c = &(ac->c); |
778 | 0 | redisCallback cb; |
779 | | /* must not be called from a callback */ |
780 | 0 | assert(!(c->flags & REDIS_IN_CALLBACK)); |
781 | | |
782 | 0 | if ((c->flags & REDIS_CONNECTED)) { |
783 | 0 | if (ac->replies.head == NULL && ac->sub.replies.head == NULL) { |
784 | | /* Nothing to do - just an idle timeout */ |
785 | 0 | return; |
786 | 0 | } |
787 | | |
788 | 0 | if (!ac->c.command_timeout || |
789 | 0 | (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) { |
790 | | /* A belated connect timeout arriving, ignore */ |
791 | 0 | return; |
792 | 0 | } |
793 | 0 | } |
794 | | |
795 | 0 | if (!c->err) { |
796 | 0 | __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout"); |
797 | 0 | __redisAsyncCopyError(ac); |
798 | 0 | } |
799 | |
|
800 | 0 | if (!(c->flags & REDIS_CONNECTED)) { |
801 | 0 | __redisRunConnectCallback(ac, REDIS_ERR); |
802 | 0 | } |
803 | |
|
804 | 0 | while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { |
805 | 0 | __redisRunCallback(ac, &cb, NULL); |
806 | 0 | } |
807 | | |
808 | | /** |
809 | | * TODO: Don't automatically sever the connection, |
810 | | * rather, allow to ignore <x> responses before the queue is clear |
811 | | */ |
812 | 0 | __redisAsyncDisconnect(ac); |
813 | 0 | } |
814 | | |
/* Locates the next bulk argument ("$<len>\r\n<data>\r\n") at or after start.
 *
 * On success stores a pointer to the argument's data in *str and its length
 * in *len, and returns a pointer to the position just past the argument's
 * trailing "\r\n" (i.e. where the following argument would begin).
 *
 * Returns NULL when no '$' header is found, or when the header is malformed
 * (negative length, or no '\r' terminating the length); *str and *len are
 * left untouched in that case. The data itself is not validated against
 * <len>, so the input must be a well-formed, NUL-terminated command. */
static const char *nextArgument(const char *start, const char **str, size_t *len) {
    const char *p = start;
    long n;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    /* A negative length would wrap to a huge size_t; reject it instead. */
    n = strtol(p+1,NULL,10);
    if (n < 0) return NULL;

    /* Do not rely on assert() here: it may be compiled out, and a missing
     * '\r' must never lead to arithmetic on a NULL pointer. */
    p = strchr(p,'\r');
    if (p == NULL) return NULL;

    *len = (size_t)n;
    *str = p+2;
    return p+2+(*len)+2;
}
830 | | |
831 | | /* Helper function for the redisAsyncCommand* family of functions. Writes a |
832 | | * formatted command to the output buffer and registers the provided callback |
833 | | * function with the context. */ |
834 | 0 | static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { |
835 | 0 | redisContext *c = &(ac->c); |
836 | 0 | redisCallback cb; |
837 | 0 | struct dict *cbdict; |
838 | 0 | dictIterator it; |
839 | 0 | dictEntry *de; |
840 | 0 | redisCallback *existcb; |
841 | 0 | int pvariant, hasnext; |
842 | 0 | const char *cstr, *astr; |
843 | 0 | size_t clen, alen; |
844 | 0 | const char *p; |
845 | 0 | sds sname; |
846 | 0 | int ret; |
847 | | |
848 | | /* Don't accept new commands when the connection is about to be closed. */ |
849 | 0 | if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; |
850 | | |
851 | | /* Setup callback */ |
852 | 0 | cb.fn = fn; |
853 | 0 | cb.privdata = privdata; |
854 | 0 | cb.pending_subs = 1; |
855 | 0 | cb.unsubscribe_sent = 0; |
856 | | |
857 | | /* Find out which command will be appended. */ |
858 | 0 | p = nextArgument(cmd,&cstr,&clen); |
859 | 0 | assert(p != NULL); |
860 | 0 | hasnext = (p[0] == '$'); |
861 | 0 | pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0; |
862 | 0 | cstr += pvariant; |
863 | 0 | clen -= pvariant; |
864 | |
|
865 | 0 | if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) { |
866 | 0 | c->flags |= REDIS_SUBSCRIBED; |
867 | | |
868 | | /* Add every channel/pattern to the list of subscription callbacks. */ |
869 | 0 | while ((p = nextArgument(p,&astr,&alen)) != NULL) { |
870 | 0 | sname = sdsnewlen(astr,alen); |
871 | 0 | if (sname == NULL) |
872 | 0 | goto oom; |
873 | | |
874 | 0 | if (pvariant) |
875 | 0 | cbdict = ac->sub.patterns; |
876 | 0 | else |
877 | 0 | cbdict = ac->sub.channels; |
878 | |
|
879 | 0 | de = dictFind(cbdict,sname); |
880 | |
|
881 | 0 | if (de != NULL) { |
882 | 0 | existcb = dictGetEntryVal(de); |
883 | 0 | cb.pending_subs = existcb->pending_subs + 1; |
884 | 0 | } |
885 | |
|
886 | 0 | ret = dictReplace(cbdict,sname,&cb); |
887 | |
|
888 | 0 | if (ret == 0) sdsfree(sname); |
889 | 0 | } |
890 | 0 | } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) { |
891 | | /* It is only useful to call (P)UNSUBSCRIBE when the context is |
892 | | * subscribed to one or more channels or patterns. */ |
893 | 0 | if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; |
894 | | |
895 | 0 | if (pvariant) |
896 | 0 | cbdict = ac->sub.patterns; |
897 | 0 | else |
898 | 0 | cbdict = ac->sub.channels; |
899 | |
|
900 | 0 | if (hasnext) { |
901 | | /* Send an unsubscribe with specific channels/patterns. |
902 | | * Bookkeeping the number of expected replies */ |
903 | 0 | while ((p = nextArgument(p,&astr,&alen)) != NULL) { |
904 | 0 | sname = sdsnewlen(astr,alen); |
905 | 0 | if (sname == NULL) |
906 | 0 | goto oom; |
907 | | |
908 | 0 | de = dictFind(cbdict,sname); |
909 | 0 | if (de != NULL) { |
910 | 0 | existcb = dictGetEntryVal(de); |
911 | 0 | if (existcb->unsubscribe_sent == 0) |
912 | 0 | existcb->unsubscribe_sent = 1; |
913 | 0 | else |
914 | | /* Already sent, reply to be ignored */ |
915 | 0 | ac->sub.pending_unsubs += 1; |
916 | 0 | } else { |
917 | | /* Not subscribed to, reply to be ignored */ |
918 | 0 | ac->sub.pending_unsubs += 1; |
919 | 0 | } |
920 | 0 | sdsfree(sname); |
921 | 0 | } |
922 | 0 | } else { |
923 | | /* Send an unsubscribe without specific channels/patterns. |
924 | | * Bookkeeping the number of expected replies */ |
925 | 0 | int no_subs = 1; |
926 | 0 | dictInitIterator(&it,cbdict); |
927 | 0 | while ((de = dictNext(&it)) != NULL) { |
928 | 0 | existcb = dictGetEntryVal(de); |
929 | 0 | if (existcb->unsubscribe_sent == 0) { |
930 | 0 | existcb->unsubscribe_sent = 1; |
931 | 0 | no_subs = 0; |
932 | 0 | } |
933 | 0 | } |
934 | | /* Unsubscribing to all channels/patterns, where none is |
935 | | * subscribed to, results in a single reply to be ignored. */ |
936 | 0 | if (no_subs == 1) |
937 | 0 | ac->sub.pending_unsubs += 1; |
938 | 0 | } |
939 | | |
940 | | /* (P)UNSUBSCRIBE does not have its own response: every channel or |
941 | | * pattern that is unsubscribed will receive a message. This means we |
942 | | * should not append a callback function for this command. */ |
943 | 0 | } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { |
944 | | /* Set monitor flag and push callback */ |
945 | 0 | c->flags |= REDIS_MONITORING; |
946 | 0 | if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) |
947 | 0 | goto oom; |
948 | 0 | } else { |
949 | 0 | if (c->flags & REDIS_SUBSCRIBED) { |
950 | 0 | if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) |
951 | 0 | goto oom; |
952 | 0 | } else { |
953 | 0 | if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) |
954 | 0 | goto oom; |
955 | 0 | } |
956 | 0 | } |
957 | | |
958 | 0 | __redisAppendCommand(c,cmd,len); |
959 | | |
960 | | /* Always schedule a write when the write buffer is non-empty */ |
961 | 0 | _EL_ADD_WRITE(ac); |
962 | |
|
963 | 0 | return REDIS_OK; |
964 | 0 | oom: |
965 | 0 | __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); |
966 | 0 | __redisAsyncCopyError(ac); |
967 | 0 | return REDIS_ERR; |
968 | 0 | } |
969 | | |
970 | 0 | int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { |
971 | 0 | char *cmd; |
972 | 0 | int len; |
973 | 0 | int status; |
974 | 0 | len = redisvFormatCommand(&cmd,format,ap); |
975 | | |
976 | | /* We don't want to pass -1 or -2 to future functions as a length. */ |
977 | 0 | if (len < 0) |
978 | 0 | return REDIS_ERR; |
979 | | |
980 | 0 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); |
981 | 0 | hi_free(cmd); |
982 | 0 | return status; |
983 | 0 | } |
984 | | |
985 | 0 | int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) { |
986 | 0 | va_list ap; |
987 | 0 | int status; |
988 | 0 | va_start(ap,format); |
989 | 0 | status = redisvAsyncCommand(ac,fn,privdata,format,ap); |
990 | 0 | va_end(ap); |
991 | 0 | return status; |
992 | 0 | } |
993 | | |
994 | 0 | int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { |
995 | 0 | sds cmd; |
996 | 0 | long long len; |
997 | 0 | int status; |
998 | 0 | len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); |
999 | 0 | if (len < 0) |
1000 | 0 | return REDIS_ERR; |
1001 | 0 | status = __redisAsyncCommand(ac,fn,privdata,cmd,len); |
1002 | 0 | sdsfree(cmd); |
1003 | 0 | return status; |
1004 | 0 | } |
1005 | | |
1006 | 0 | int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) { |
1007 | 0 | int status = __redisAsyncCommand(ac,fn,privdata,cmd,len); |
1008 | 0 | return status; |
1009 | 0 | } |
1010 | | |
1011 | 0 | redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) { |
1012 | 0 | redisAsyncPushFn *old = ac->push_cb; |
1013 | 0 | ac->push_cb = fn; |
1014 | 0 | return old; |
1015 | 0 | } |
1016 | | |
1017 | 0 | int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { |
1018 | 0 | if (!ac->c.command_timeout) { |
1019 | 0 | ac->c.command_timeout = hi_calloc(1, sizeof(tv)); |
1020 | 0 | if (ac->c.command_timeout == NULL) { |
1021 | 0 | __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory"); |
1022 | 0 | __redisAsyncCopyError(ac); |
1023 | 0 | return REDIS_ERR; |
1024 | 0 | } |
1025 | 0 | } |
1026 | | |
1027 | 0 | if (tv.tv_sec != ac->c.command_timeout->tv_sec || |
1028 | 0 | tv.tv_usec != ac->c.command_timeout->tv_usec) |
1029 | 0 | { |
1030 | 0 | *ac->c.command_timeout = tv; |
1031 | 0 | } |
1032 | |
|
1033 | 0 | return REDIS_OK; |
1034 | 0 | } |