Coverage Report

Created: 2024-09-19 06:25

/src/hiredis/async.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3
 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4
 *
5
 * All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions are met:
9
 *
10
 *   * Redistributions of source code must retain the above copyright notice,
11
 *     this list of conditions and the following disclaimer.
12
 *   * Redistributions in binary form must reproduce the above copyright
13
 *     notice, this list of conditions and the following disclaimer in the
14
 *     documentation and/or other materials provided with the distribution.
15
 *   * Neither the name of Redis nor the names of its contributors may be used
16
 *     to endorse or promote products derived from this software without
17
 *     specific prior written permission.
18
 *
19
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
 * POSSIBILITY OF SUCH DAMAGE.
30
 */
31
32
#include "fmacros.h"
33
#include "alloc.h"
34
#include <stdlib.h>
35
#include <string.h>
36
#ifndef _MSC_VER
37
#include <strings.h>
38
#endif
39
#include <assert.h>
40
#include <ctype.h>
41
#include <errno.h>
42
#include "async.h"
43
#include "net.h"
44
#include "dict.c"
45
#include "sds.h"
46
#include "win32.h"
47
48
#include "async_private.h"
49
50
#ifdef NDEBUG
51
#undef assert
52
#define assert(e) (void)(e)
53
#endif
54
55
/* Forward declarations of hiredis.c functions */
56
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
57
void __redisSetError(redisContext *c, int type, const char *str);
58
59
/* Functions managing dictionary of callbacks for pub/sub. */
60
0
static unsigned int callbackHash(const void *key) {
61
0
    return dictGenHashFunction((const unsigned char *)key,
62
0
                               sdslen((const sds)key));
63
0
}
64
65
0
static void *callbackValDup(void *privdata, const void *src) {
66
0
    ((void) privdata);
67
0
    redisCallback *dup;
68
69
0
    dup = hi_malloc(sizeof(*dup));
70
0
    if (dup == NULL)
71
0
        return NULL;
72
73
0
    memcpy(dup,src,sizeof(*dup));
74
0
    return dup;
75
0
}
76
77
0
static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
78
0
    int l1, l2;
79
0
    ((void) privdata);
80
81
0
    l1 = sdslen((const sds)key1);
82
0
    l2 = sdslen((const sds)key2);
83
0
    if (l1 != l2) return 0;
84
0
    return memcmp(key1,key2,l1) == 0;
85
0
}
86
87
0
static void callbackKeyDestructor(void *privdata, void *key) {
88
0
    ((void) privdata);
89
0
    sdsfree((sds)key);
90
0
}
91
92
0
static void callbackValDestructor(void *privdata, void *val) {
93
0
    ((void) privdata);
94
0
    hi_free(val);
95
0
}
96
97
static dictType callbackDict = {
98
    callbackHash,
99
    NULL,
100
    callbackValDup,
101
    callbackKeyCompare,
102
    callbackKeyDestructor,
103
    callbackValDestructor
104
};
105
106
0
static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
107
0
    redisAsyncContext *ac;
108
0
    dict *channels = NULL, *patterns = NULL;
109
110
0
    channels = dictCreate(&callbackDict,NULL);
111
0
    if (channels == NULL)
112
0
        goto oom;
113
114
0
    patterns = dictCreate(&callbackDict,NULL);
115
0
    if (patterns == NULL)
116
0
        goto oom;
117
118
0
    ac = hi_realloc(c,sizeof(redisAsyncContext));
119
0
    if (ac == NULL)
120
0
        goto oom;
121
122
0
    c = &(ac->c);
123
124
    /* The regular connect functions will always set the flag REDIS_CONNECTED.
125
     * For the async API, we want to wait until the first write event is
126
     * received up before setting this flag, so reset it here. */
127
0
    c->flags &= ~REDIS_CONNECTED;
128
129
0
    ac->err = 0;
130
0
    ac->errstr = NULL;
131
0
    ac->data = NULL;
132
0
    ac->dataCleanup = NULL;
133
134
0
    ac->ev.data = NULL;
135
0
    ac->ev.addRead = NULL;
136
0
    ac->ev.delRead = NULL;
137
0
    ac->ev.addWrite = NULL;
138
0
    ac->ev.delWrite = NULL;
139
0
    ac->ev.cleanup = NULL;
140
0
    ac->ev.scheduleTimer = NULL;
141
142
0
    ac->onConnect = NULL;
143
0
    ac->onConnectNC = NULL;
144
0
    ac->onDisconnect = NULL;
145
146
0
    ac->replies.head = NULL;
147
0
    ac->replies.tail = NULL;
148
0
    ac->sub.replies.head = NULL;
149
0
    ac->sub.replies.tail = NULL;
150
0
    ac->sub.channels = channels;
151
0
    ac->sub.patterns = patterns;
152
0
    ac->sub.pending_unsubs = 0;
153
154
0
    return ac;
155
0
oom:
156
0
    if (channels) dictRelease(channels);
157
0
    if (patterns) dictRelease(patterns);
158
0
    return NULL;
159
0
}
160
161
/* We want the error field to be accessible directly instead of requiring
162
 * an indirection to the redisContext struct. */
163
0
static void __redisAsyncCopyError(redisAsyncContext *ac) {
164
0
    if (!ac)
165
0
        return;
166
167
0
    redisContext *c = &(ac->c);
168
0
    ac->err = c->err;
169
0
    ac->errstr = c->errstr;
170
0
}
171
172
0
redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
173
0
    redisOptions myOptions = *options;
174
0
    redisContext *c;
175
0
    redisAsyncContext *ac;
176
177
    /* Clear any erroneously set sync callback and flag that we don't want to
178
     * use freeReplyObject by default. */
179
0
    myOptions.push_cb = NULL;
180
0
    myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
181
182
0
    myOptions.options |= REDIS_OPT_NONBLOCK;
183
0
    c = redisConnectWithOptions(&myOptions);
184
0
    if (c == NULL) {
185
0
        return NULL;
186
0
    }
187
188
0
    ac = redisAsyncInitialize(c);
189
0
    if (ac == NULL) {
190
0
        redisFree(c);
191
0
        return NULL;
192
0
    }
193
194
    /* Set any configured async push handler */
195
0
    redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
196
197
0
    __redisAsyncCopyError(ac);
198
0
    return ac;
199
0
}
200
201
0
redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
202
0
    redisOptions options = {0};
203
0
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
204
0
    return redisAsyncConnectWithOptions(&options);
205
0
}
206
207
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
208
0
                                         const char *source_addr) {
209
0
    redisOptions options = {0};
210
0
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
211
0
    options.endpoint.tcp.source_addr = source_addr;
212
0
    return redisAsyncConnectWithOptions(&options);
213
0
}
214
215
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
216
0
                                                  const char *source_addr) {
217
0
    redisOptions options = {0};
218
0
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
219
0
    options.options |= REDIS_OPT_REUSEADDR;
220
0
    options.endpoint.tcp.source_addr = source_addr;
221
0
    return redisAsyncConnectWithOptions(&options);
222
0
}
223
224
0
redisAsyncContext *redisAsyncConnectUnix(const char *path) {
225
0
    redisOptions options = {0};
226
0
    REDIS_OPTIONS_SET_UNIX(&options, path);
227
0
    return redisAsyncConnectWithOptions(&options);
228
0
}
229
230
static int
231
redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn,
232
                                 redisConnectCallbackNC *fn_nc)
233
0
{
234
    /* If either are already set, this is an error */
235
0
    if (ac->onConnect || ac->onConnectNC)
236
0
        return REDIS_ERR;
237
238
0
    if (fn) {
239
0
        ac->onConnect = fn;
240
0
    } else if (fn_nc) {
241
0
        ac->onConnectNC = fn_nc;
242
0
    }
243
244
    /* The common way to detect an established connection is to wait for
245
     * the first write event to be fired. This assumes the related event
246
     * library functions are already set. */
247
0
    _EL_ADD_WRITE(ac);
248
249
0
    return REDIS_OK;
250
0
}
251
252
0
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
253
0
    return redisAsyncSetConnectCallbackImpl(ac, fn, NULL);
254
0
}
255
256
0
int redisAsyncSetConnectCallbackNC(redisAsyncContext *ac, redisConnectCallbackNC *fn) {
257
0
    return redisAsyncSetConnectCallbackImpl(ac, NULL, fn);
258
0
}
259
260
0
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
261
0
    if (ac->onDisconnect == NULL) {
262
0
        ac->onDisconnect = fn;
263
0
        return REDIS_OK;
264
0
    }
265
0
    return REDIS_ERR;
266
0
}
267
268
/* Helper functions to push/shift callbacks */
269
0
static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
270
0
    redisCallback *cb;
271
272
    /* Copy callback from stack to heap */
273
0
    cb = hi_malloc(sizeof(*cb));
274
0
    if (cb == NULL)
275
0
        return REDIS_ERR_OOM;
276
277
0
    if (source != NULL) {
278
0
        memcpy(cb,source,sizeof(*cb));
279
0
        cb->next = NULL;
280
0
    }
281
282
    /* Store callback in list */
283
0
    if (list->head == NULL)
284
0
        list->head = cb;
285
0
    if (list->tail != NULL)
286
0
        list->tail->next = cb;
287
0
    list->tail = cb;
288
0
    return REDIS_OK;
289
0
}
290
291
0
static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
292
0
    redisCallback *cb = list->head;
293
0
    if (cb != NULL) {
294
0
        list->head = cb->next;
295
0
        if (cb == list->tail)
296
0
            list->tail = NULL;
297
298
        /* Copy callback from heap to stack */
299
0
        if (target != NULL)
300
0
            memcpy(target,cb,sizeof(*cb));
301
0
        hi_free(cb);
302
0
        return REDIS_OK;
303
0
    }
304
0
    return REDIS_ERR;
305
0
}
306
307
0
static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
308
0
    redisContext *c = &(ac->c);
309
0
    if (cb->fn != NULL) {
310
0
        c->flags |= REDIS_IN_CALLBACK;
311
0
        cb->fn(ac,reply,cb->privdata);
312
0
        c->flags &= ~REDIS_IN_CALLBACK;
313
0
    }
314
0
}
315
316
0
static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
317
0
    if (ac->push_cb != NULL) {
318
0
        ac->c.flags |= REDIS_IN_CALLBACK;
319
0
        ac->push_cb(ac, reply);
320
0
        ac->c.flags &= ~REDIS_IN_CALLBACK;
321
0
    }
322
0
}
323
324
static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
325
0
{
326
0
    if (ac->onConnect == NULL && ac->onConnectNC == NULL)
327
0
        return;
328
329
0
    if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
330
0
        ac->c.flags |= REDIS_IN_CALLBACK;
331
0
        if (ac->onConnect) {
332
0
            ac->onConnect(ac, status);
333
0
        } else {
334
0
            ac->onConnectNC(ac, status);
335
0
        }
336
0
        ac->c.flags &= ~REDIS_IN_CALLBACK;
337
0
    } else {
338
        /* already in callback */
339
0
        if (ac->onConnect) {
340
0
            ac->onConnect(ac, status);
341
0
        } else {
342
0
            ac->onConnectNC(ac, status);
343
0
        }
344
0
    }
345
0
}
346
347
static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
348
0
{
349
0
    if (ac->onDisconnect) {
350
0
        if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
351
0
            ac->c.flags |= REDIS_IN_CALLBACK;
352
0
            ac->onDisconnect(ac, status);
353
0
            ac->c.flags &= ~REDIS_IN_CALLBACK;
354
0
        } else {
355
            /* already in callback */
356
0
            ac->onDisconnect(ac, status);
357
0
        }
358
0
    }
359
0
}
360
361
/* Helper function to free the context. */
362
0
static void __redisAsyncFree(redisAsyncContext *ac) {
363
0
    redisContext *c = &(ac->c);
364
0
    redisCallback cb;
365
0
    dictIterator it;
366
0
    dictEntry *de;
367
368
    /* Execute pending callbacks with NULL reply. */
369
0
    while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
370
0
        __redisRunCallback(ac,&cb,NULL);
371
0
    while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
372
0
        __redisRunCallback(ac,&cb,NULL);
373
374
    /* Run subscription callbacks with NULL reply */
375
0
    if (ac->sub.channels) {
376
0
        dictInitIterator(&it,ac->sub.channels);
377
0
        while ((de = dictNext(&it)) != NULL)
378
0
            __redisRunCallback(ac,dictGetEntryVal(de),NULL);
379
380
0
        dictRelease(ac->sub.channels);
381
0
    }
382
383
0
    if (ac->sub.patterns) {
384
0
        dictInitIterator(&it,ac->sub.patterns);
385
0
        while ((de = dictNext(&it)) != NULL)
386
0
            __redisRunCallback(ac,dictGetEntryVal(de),NULL);
387
388
0
        dictRelease(ac->sub.patterns);
389
0
    }
390
391
    /* Signal event lib to clean up */
392
0
    _EL_CLEANUP(ac);
393
394
    /* Execute disconnect callback. When redisAsyncFree() initiated destroying
395
     * this context, the status will always be REDIS_OK. */
396
0
    if (c->flags & REDIS_CONNECTED) {
397
0
        int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
398
0
        if (c->flags & REDIS_FREEING)
399
0
            status = REDIS_OK;
400
0
        __redisRunDisconnectCallback(ac, status);
401
0
    }
402
403
0
    if (ac->dataCleanup) {
404
0
        ac->dataCleanup(ac->data);
405
0
    }
406
407
    /* Cleanup self */
408
0
    redisFree(c);
409
0
}
410
411
/* Free the async context. When this function is called from a callback,
412
 * control needs to be returned to redisProcessCallbacks() before actual
413
 * free'ing. To do so, a flag is set on the context which is picked up by
414
 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
415
0
void redisAsyncFree(redisAsyncContext *ac) {
416
0
    if (ac == NULL)
417
0
        return;
418
419
0
    redisContext *c = &(ac->c);
420
421
0
    c->flags |= REDIS_FREEING;
422
0
    if (!(c->flags & REDIS_IN_CALLBACK))
423
0
        __redisAsyncFree(ac);
424
0
}
425
426
/* Helper function to make the disconnect happen and clean up. */
427
0
void __redisAsyncDisconnect(redisAsyncContext *ac) {
428
0
    redisContext *c = &(ac->c);
429
430
    /* Make sure error is accessible if there is any */
431
0
    __redisAsyncCopyError(ac);
432
433
0
    if (ac->err == 0) {
434
        /* For clean disconnects, there should be no pending callbacks. */
435
0
        int ret = __redisShiftCallback(&ac->replies,NULL);
436
0
        assert(ret == REDIS_ERR);
437
0
    } else {
438
        /* Disconnection is caused by an error, make sure that pending
439
         * callbacks cannot call new commands. */
440
0
        c->flags |= REDIS_DISCONNECTING;
441
0
    }
442
443
    /* cleanup event library on disconnect.
444
     * this is safe to call multiple times */
445
0
    _EL_CLEANUP(ac);
446
447
    /* For non-clean disconnects, __redisAsyncFree() will execute pending
448
     * callbacks with a NULL-reply. */
449
0
    if (!(c->flags & REDIS_NO_AUTO_FREE)) {
450
0
      __redisAsyncFree(ac);
451
0
    }
452
0
}
453
454
/* Tries to do a clean disconnect from Redis, meaning it stops new commands
455
 * from being issued, but tries to flush the output buffer and execute
456
 * callbacks for all remaining replies. When this function is called from a
457
 * callback, there might be more replies and we can safely defer disconnecting
458
 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
459
 * when there are no pending callbacks. */
460
0
void redisAsyncDisconnect(redisAsyncContext *ac) {
461
0
    redisContext *c = &(ac->c);
462
0
    c->flags |= REDIS_DISCONNECTING;
463
464
    /** unset the auto-free flag here, because disconnect undoes this */
465
0
    c->flags &= ~REDIS_NO_AUTO_FREE;
466
0
    if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
467
0
        __redisAsyncDisconnect(ac);
468
0
}
469
470
0
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
471
0
    redisContext *c = &(ac->c);
472
0
    dict *callbacks;
473
0
    redisCallback *cb = NULL;
474
0
    dictEntry *de;
475
0
    int pvariant;
476
0
    char *stype;
477
0
    sds sname = NULL;
478
479
    /* Match reply with the expected format of a pushed message.
480
     * The type and number of elements (3 to 4) are specified at:
481
     * https://redis.io/topics/pubsub#format-of-pushed-messages */
482
0
    if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
483
0
        reply->type == REDIS_REPLY_PUSH) {
484
0
        assert(reply->element[0]->type == REDIS_REPLY_STRING);
485
0
        stype = reply->element[0]->str;
486
0
        pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
487
488
0
        if (pvariant)
489
0
            callbacks = ac->sub.patterns;
490
0
        else
491
0
            callbacks = ac->sub.channels;
492
493
        /* Locate the right callback */
494
0
        if (reply->element[1]->type == REDIS_REPLY_STRING) {
495
0
            sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
496
0
            if (sname == NULL) goto oom;
497
498
0
            if ((de = dictFind(callbacks,sname)) != NULL) {
499
0
                cb = dictGetEntryVal(de);
500
0
                memcpy(dstcb,cb,sizeof(*dstcb));
501
0
            }
502
0
        }
503
504
        /* If this is an subscribe reply decrease pending counter. */
505
0
        if (strcasecmp(stype+pvariant,"subscribe") == 0) {
506
0
            assert(cb != NULL);
507
0
            cb->pending_subs -= 1;
508
509
0
        } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
510
0
            if (cb == NULL)
511
0
                ac->sub.pending_unsubs -= 1;
512
0
            else if (cb->pending_subs == 0)
513
0
                dictDelete(callbacks,sname);
514
515
            /* If this was the last unsubscribe message, revert to
516
             * non-subscribe mode. */
517
0
            assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
518
519
            /* Unset subscribed flag only when no pipelined pending subscribe
520
             * or pending unsubscribe replies. */
521
0
            if (reply->element[2]->integer == 0
522
0
                && dictSize(ac->sub.channels) == 0
523
0
                && dictSize(ac->sub.patterns) == 0
524
0
                && ac->sub.pending_unsubs == 0) {
525
0
                c->flags &= ~REDIS_SUBSCRIBED;
526
527
                /* Move ongoing regular command callbacks. */
528
0
                redisCallback cb;
529
0
                while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
530
0
                    __redisPushCallback(&ac->replies,&cb);
531
0
                }
532
0
            }
533
0
        }
534
0
        sdsfree(sname);
535
0
    } else {
536
        /* Shift callback for pending command in subscribed context. */
537
0
        __redisShiftCallback(&ac->sub.replies,dstcb);
538
0
    }
539
0
    return REDIS_OK;
540
0
oom:
541
0
    __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
542
0
    __redisAsyncCopyError(ac);
543
0
    return REDIS_ERR;
544
0
}
545
546
#define redisIsSpontaneousPushReply(r) \
547
0
    (redisIsPushReply(r) && !redisIsSubscribeReply(r))
548
549
0
static int redisIsSubscribeReply(redisReply *reply) {
550
0
    char *str;
551
0
    size_t len, off;
552
553
    /* We will always have at least one string with the subscribe/message type */
554
0
    if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
555
0
        reply->element[0]->len < sizeof("message") - 1)
556
0
    {
557
0
        return 0;
558
0
    }
559
560
    /* Get the string/len moving past 'p' if needed */
561
0
    off = tolower(reply->element[0]->str[0]) == 'p';
562
0
    str = reply->element[0]->str + off;
563
0
    len = reply->element[0]->len - off;
564
565
0
    return !strncasecmp(str, "subscribe", len) ||
566
0
           !strncasecmp(str, "message", len) ||
567
0
           !strncasecmp(str, "unsubscribe", len);
568
0
}
569
570
0
void redisProcessCallbacks(redisAsyncContext *ac) {
571
0
    redisContext *c = &(ac->c);
572
0
    void *reply = NULL;
573
0
    int status;
574
575
0
    while((status = redisGetReply(c,&reply)) == REDIS_OK) {
576
0
        if (reply == NULL) {
577
            /* When the connection is being disconnected and there are
578
             * no more replies, this is the cue to really disconnect. */
579
0
            if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
580
0
                && ac->replies.head == NULL) {
581
0
                __redisAsyncDisconnect(ac);
582
0
                return;
583
0
            }
584
            /* When the connection is not being disconnected, simply stop
585
             * trying to get replies and wait for the next loop tick. */
586
0
            break;
587
0
        }
588
589
        /* Keep track of push message support for subscribe handling */
590
0
        if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;
591
592
        /* Send any non-subscribe related PUSH messages to our PUSH handler
593
         * while allowing subscribe related PUSH messages to pass through.
594
         * This allows existing code to be backward compatible and work in
595
         * either RESP2 or RESP3 mode. */
596
0
        if (redisIsSpontaneousPushReply(reply)) {
597
0
            __redisRunPushCallback(ac, reply);
598
0
            c->reader->fn->freeObject(reply);
599
0
            continue;
600
0
        }
601
602
        /* Even if the context is subscribed, pending regular
603
         * callbacks will get a reply before pub/sub messages arrive. */
604
0
        redisCallback cb = {NULL, NULL, 0, 0, NULL};
605
0
        if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
606
            /*
607
             * A spontaneous reply in a not-subscribed context can be the error
608
             * reply that is sent when a new connection exceeds the maximum
609
             * number of allowed connections on the server side.
610
             *
611
             * This is seen as an error instead of a regular reply because the
612
             * server closes the connection after sending it.
613
             *
614
             * To prevent the error from being overwritten by an EOF error the
615
             * connection is closed here. See issue #43.
616
             *
617
             * Another possibility is that the server is loading its dataset.
618
             * In this case we also want to close the connection, and have the
619
             * user wait until the server is ready to take our request.
620
             */
621
0
            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
622
0
                c->err = REDIS_ERR_OTHER;
623
0
                snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
624
0
                c->reader->fn->freeObject(reply);
625
0
                __redisAsyncDisconnect(ac);
626
0
                return;
627
0
            }
628
            /* No more regular callbacks and no errors, the context *must* be subscribed. */
629
0
            assert(c->flags & REDIS_SUBSCRIBED);
630
0
            if (c->flags & REDIS_SUBSCRIBED)
631
0
                __redisGetSubscribeCallback(ac,reply,&cb);
632
0
        }
633
634
0
        if (cb.fn != NULL) {
635
0
            __redisRunCallback(ac,&cb,reply);
636
0
            if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
637
0
                c->reader->fn->freeObject(reply);
638
0
            }
639
640
            /* Proceed with free'ing when redisAsyncFree() was called. */
641
0
            if (c->flags & REDIS_FREEING) {
642
0
                __redisAsyncFree(ac);
643
0
                return;
644
0
            }
645
0
        } else {
646
            /* No callback for this reply. This can either be a NULL callback,
647
             * or there were no callbacks to begin with. Either way, don't
648
             * abort with an error, but simply ignore it because the client
649
             * doesn't know what the server will spit out over the wire. */
650
0
            c->reader->fn->freeObject(reply);
651
0
        }
652
653
        /* If in monitor mode, repush the callback */
654
0
        if (c->flags & REDIS_MONITORING) {
655
0
            __redisPushCallback(&ac->replies,&cb);
656
0
        }
657
0
    }
658
659
    /* Disconnect when there was an error reading the reply */
660
0
    if (status != REDIS_OK)
661
0
        __redisAsyncDisconnect(ac);
662
0
}
663
664
0
static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
665
0
    __redisRunConnectCallback(ac, REDIS_ERR);
666
0
    __redisAsyncDisconnect(ac);
667
0
}
668
669
/* Internal helper function to detect socket status the first time a read or
670
 * write event fires. When connecting was not successful, the connect callback
671
 * is called with a REDIS_ERR status and the context is free'd. */
672
0
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
673
0
    int completed = 0;
674
0
    redisContext *c = &(ac->c);
675
676
0
    if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
677
        /* Error! */
678
0
        if (redisCheckSocketError(c) == REDIS_ERR)
679
0
            __redisAsyncCopyError(ac);
680
0
        __redisAsyncHandleConnectFailure(ac);
681
0
        return REDIS_ERR;
682
0
    } else if (completed == 1) {
683
        /* connected! */
684
0
        if (c->connection_type == REDIS_CONN_TCP &&
685
0
            redisSetTcpNoDelay(c) == REDIS_ERR) {
686
0
            __redisAsyncHandleConnectFailure(ac);
687
0
            return REDIS_ERR;
688
0
        }
689
690
        /* flag us as fully connect, but allow the callback
691
         * to disconnect.  For that reason, permit the function
692
         * to delete the context here after callback return.
693
         */
694
0
        c->flags |= REDIS_CONNECTED;
695
0
        __redisRunConnectCallback(ac, REDIS_OK);
696
0
        if ((ac->c.flags & REDIS_DISCONNECTING)) {
697
0
            redisAsyncDisconnect(ac);
698
0
            return REDIS_ERR;
699
0
        } else if ((ac->c.flags & REDIS_FREEING)) {
700
0
            redisAsyncFree(ac);
701
0
            return REDIS_ERR;
702
0
        }
703
0
        return REDIS_OK;
704
0
    } else {
705
0
        return REDIS_OK;
706
0
    }
707
0
}
708
709
0
void redisAsyncRead(redisAsyncContext *ac) {
710
0
    redisContext *c = &(ac->c);
711
712
0
    if (redisBufferRead(c) == REDIS_ERR) {
713
0
        __redisAsyncDisconnect(ac);
714
0
    } else {
715
        /* Always re-schedule reads */
716
0
        _EL_ADD_READ(ac);
717
0
        redisProcessCallbacks(ac);
718
0
    }
719
0
}
720
721
/* This function should be called when the socket is readable.
722
 * It processes all replies that can be read and executes their callbacks.
723
 */
724
0
void redisAsyncHandleRead(redisAsyncContext *ac) {
725
0
    redisContext *c = &(ac->c);
726
    /* must not be called from a callback */
727
0
    assert(!(c->flags & REDIS_IN_CALLBACK));
728
729
0
    if (!(c->flags & REDIS_CONNECTED)) {
730
        /* Abort connect was not successful. */
731
0
        if (__redisAsyncHandleConnect(ac) != REDIS_OK)
732
0
            return;
733
        /* Try again later when the context is still not connected. */
734
0
        if (!(c->flags & REDIS_CONNECTED))
735
0
            return;
736
0
    }
737
738
0
    c->funcs->async_read(ac);
739
0
}
740
741
0
void redisAsyncWrite(redisAsyncContext *ac) {
742
0
    redisContext *c = &(ac->c);
743
0
    int done = 0;
744
745
0
    if (redisBufferWrite(c,&done) == REDIS_ERR) {
746
0
        __redisAsyncDisconnect(ac);
747
0
    } else {
748
        /* Continue writing when not done, stop writing otherwise */
749
0
        if (!done)
750
0
            _EL_ADD_WRITE(ac);
751
0
        else
752
0
            _EL_DEL_WRITE(ac);
753
754
        /* Always schedule reads after writes */
755
0
        _EL_ADD_READ(ac);
756
0
    }
757
0
}
758
759
0
void redisAsyncHandleWrite(redisAsyncContext *ac) {
760
0
    redisContext *c = &(ac->c);
761
    /* must not be called from a callback */
762
0
    assert(!(c->flags & REDIS_IN_CALLBACK));
763
764
0
    if (!(c->flags & REDIS_CONNECTED)) {
765
        /* Abort connect was not successful. */
766
0
        if (__redisAsyncHandleConnect(ac) != REDIS_OK)
767
0
            return;
768
        /* Try again later when the context is still not connected. */
769
0
        if (!(c->flags & REDIS_CONNECTED))
770
0
            return;
771
0
    }
772
773
0
    c->funcs->async_write(ac);
774
0
}
775
776
0
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
777
0
    redisContext *c = &(ac->c);
778
0
    redisCallback cb;
779
    /* must not be called from a callback */
780
0
    assert(!(c->flags & REDIS_IN_CALLBACK));
781
782
0
    if ((c->flags & REDIS_CONNECTED)) {
783
0
        if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
784
            /* Nothing to do - just an idle timeout */
785
0
            return;
786
0
        }
787
788
0
        if (!ac->c.command_timeout ||
789
0
            (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) {
790
            /* A belated connect timeout arriving, ignore */
791
0
            return;
792
0
        }
793
0
    }
794
795
0
    if (!c->err) {
796
0
        __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
797
0
        __redisAsyncCopyError(ac);
798
0
    }
799
800
0
    if (!(c->flags & REDIS_CONNECTED)) {
801
0
        __redisRunConnectCallback(ac, REDIS_ERR);
802
0
    }
803
804
0
    while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
805
0
        __redisRunCallback(ac, &cb, NULL);
806
0
    }
807
808
    /**
809
     * TODO: Don't automatically sever the connection,
810
     * rather, allow to ignore <x> responses before the queue is clear
811
     */
812
0
    __redisAsyncDisconnect(ac);
813
0
}
814
815
/* Sets a pointer to the first argument and its length starting at p. Returns
816
 * the number of bytes to skip to get to the following argument. */
817
0
static const char *nextArgument(const char *start, const char **str, size_t *len) {
818
0
    const char *p = start;
819
0
    if (p[0] != '$') {
820
0
        p = strchr(p,'$');
821
0
        if (p == NULL) return NULL;
822
0
    }
823
824
0
    *len = (int)strtol(p+1,NULL,10);
825
0
    p = strchr(p,'\r');
826
0
    assert(p);
827
0
    *str = p+2;
828
0
    return p+2+(*len)+2;
829
0
}
830
831
/* Helper function for the redisAsyncCommand* family of functions. Writes a
832
 * formatted command to the output buffer and registers the provided callback
833
 * function with the context. */
834
0
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
835
0
    redisContext *c = &(ac->c);
836
0
    redisCallback cb;
837
0
    struct dict *cbdict;
838
0
    dictIterator it;
839
0
    dictEntry *de;
840
0
    redisCallback *existcb;
841
0
    int pvariant, hasnext;
842
0
    const char *cstr, *astr;
843
0
    size_t clen, alen;
844
0
    const char *p;
845
0
    sds sname;
846
0
    int ret;
847
848
    /* Don't accept new commands when the connection is about to be closed. */
849
0
    if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
850
851
    /* Setup callback */
852
0
    cb.fn = fn;
853
0
    cb.privdata = privdata;
854
0
    cb.pending_subs = 1;
855
0
    cb.unsubscribe_sent = 0;
856
857
    /* Find out which command will be appended. */
858
0
    p = nextArgument(cmd,&cstr,&clen);
859
0
    assert(p != NULL);
860
0
    hasnext = (p[0] == '$');
861
0
    pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
862
0
    cstr += pvariant;
863
0
    clen -= pvariant;
864
865
0
    if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
866
0
        c->flags |= REDIS_SUBSCRIBED;
867
868
        /* Add every channel/pattern to the list of subscription callbacks. */
869
0
        while ((p = nextArgument(p,&astr,&alen)) != NULL) {
870
0
            sname = sdsnewlen(astr,alen);
871
0
            if (sname == NULL)
872
0
                goto oom;
873
874
0
            if (pvariant)
875
0
                cbdict = ac->sub.patterns;
876
0
            else
877
0
                cbdict = ac->sub.channels;
878
879
0
            de = dictFind(cbdict,sname);
880
881
0
            if (de != NULL) {
882
0
                existcb = dictGetEntryVal(de);
883
0
                cb.pending_subs = existcb->pending_subs + 1;
884
0
            }
885
886
0
            ret = dictReplace(cbdict,sname,&cb);
887
888
0
            if (ret == 0) sdsfree(sname);
889
0
        }
890
0
    } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
891
        /* It is only useful to call (P)UNSUBSCRIBE when the context is
892
         * subscribed to one or more channels or patterns. */
893
0
        if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
894
895
0
        if (pvariant)
896
0
            cbdict = ac->sub.patterns;
897
0
        else
898
0
            cbdict = ac->sub.channels;
899
900
0
        if (hasnext) {
901
            /* Send an unsubscribe with specific channels/patterns.
902
             * Bookkeeping the number of expected replies */
903
0
            while ((p = nextArgument(p,&astr,&alen)) != NULL) {
904
0
                sname = sdsnewlen(astr,alen);
905
0
                if (sname == NULL)
906
0
                    goto oom;
907
908
0
                de = dictFind(cbdict,sname);
909
0
                if (de != NULL) {
910
0
                    existcb = dictGetEntryVal(de);
911
0
                    if (existcb->unsubscribe_sent == 0)
912
0
                        existcb->unsubscribe_sent = 1;
913
0
                    else
914
                        /* Already sent, reply to be ignored */
915
0
                        ac->sub.pending_unsubs += 1;
916
0
                } else {
917
                    /* Not subscribed to, reply to be ignored */
918
0
                    ac->sub.pending_unsubs += 1;
919
0
                }
920
0
                sdsfree(sname);
921
0
            }
922
0
        } else {
923
            /* Send an unsubscribe without specific channels/patterns.
924
             * Bookkeeping the number of expected replies */
925
0
            int no_subs = 1;
926
0
            dictInitIterator(&it,cbdict);
927
0
            while ((de = dictNext(&it)) != NULL) {
928
0
                existcb = dictGetEntryVal(de);
929
0
                if (existcb->unsubscribe_sent == 0) {
930
0
                    existcb->unsubscribe_sent = 1;
931
0
                    no_subs = 0;
932
0
                }
933
0
            }
934
            /* Unsubscribing to all channels/patterns, where none is
935
             * subscribed to, results in a single reply to be ignored. */
936
0
            if (no_subs == 1)
937
0
                ac->sub.pending_unsubs += 1;
938
0
        }
939
940
        /* (P)UNSUBSCRIBE does not have its own response: every channel or
941
         * pattern that is unsubscribed will receive a message. This means we
942
         * should not append a callback function for this command. */
943
0
    } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
944
        /* Set monitor flag and push callback */
945
0
        c->flags |= REDIS_MONITORING;
946
0
        if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
947
0
            goto oom;
948
0
    } else {
949
0
        if (c->flags & REDIS_SUBSCRIBED) {
950
0
            if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)
951
0
                goto oom;
952
0
        } else {
953
0
            if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
954
0
                goto oom;
955
0
        }
956
0
    }
957
958
0
    __redisAppendCommand(c,cmd,len);
959
960
    /* Always schedule a write when the write buffer is non-empty */
961
0
    _EL_ADD_WRITE(ac);
962
963
0
    return REDIS_OK;
964
0
oom:
965
0
    __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
966
0
    __redisAsyncCopyError(ac);
967
0
    return REDIS_ERR;
968
0
}
969
970
0
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
971
0
    char *cmd;
972
0
    int len;
973
0
    int status;
974
0
    len = redisvFormatCommand(&cmd,format,ap);
975
976
    /* We don't want to pass -1 or -2 to future functions as a length. */
977
0
    if (len < 0)
978
0
        return REDIS_ERR;
979
980
0
    status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
981
0
    hi_free(cmd);
982
0
    return status;
983
0
}
984
985
0
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
986
0
    va_list ap;
987
0
    int status;
988
0
    va_start(ap,format);
989
0
    status = redisvAsyncCommand(ac,fn,privdata,format,ap);
990
0
    va_end(ap);
991
0
    return status;
992
0
}
993
994
0
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
995
0
    sds cmd;
996
0
    long long len;
997
0
    int status;
998
0
    len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
999
0
    if (len < 0)
1000
0
        return REDIS_ERR;
1001
0
    status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
1002
0
    sdsfree(cmd);
1003
0
    return status;
1004
0
}
1005
1006
0
int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
1007
0
    int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
1008
0
    return status;
1009
0
}
1010
1011
0
redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
1012
0
    redisAsyncPushFn *old = ac->push_cb;
1013
0
    ac->push_cb = fn;
1014
0
    return old;
1015
0
}
1016
1017
0
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
1018
0
    if (!ac->c.command_timeout) {
1019
0
        ac->c.command_timeout = hi_calloc(1, sizeof(tv));
1020
0
        if (ac->c.command_timeout == NULL) {
1021
0
            __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory");
1022
0
            __redisAsyncCopyError(ac);
1023
0
            return REDIS_ERR;
1024
0
        }
1025
0
    }
1026
1027
0
    if (tv.tv_sec != ac->c.command_timeout->tv_sec ||
1028
0
        tv.tv_usec != ac->c.command_timeout->tv_usec)
1029
0
    {
1030
0
        *ac->c.command_timeout = tv;
1031
0
    }
1032
1033
0
    return REDIS_OK;
1034
0
}