Coverage Report

Created: 2024-02-25 06:15

/src/h2o/deps/hiredis/async.c
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3
 * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4
 *
5
 * All rights reserved.
6
 *
7
 * Redistribution and use in source and binary forms, with or without
8
 * modification, are permitted provided that the following conditions are met:
9
 *
10
 *   * Redistributions of source code must retain the above copyright notice,
11
 *     this list of conditions and the following disclaimer.
12
 *   * Redistributions in binary form must reproduce the above copyright
13
 *     notice, this list of conditions and the following disclaimer in the
14
 *     documentation and/or other materials provided with the distribution.
15
 *   * Neither the name of Redis nor the names of its contributors may be used
16
 *     to endorse or promote products derived from this software without
17
 *     specific prior written permission.
18
 *
19
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29
 * POSSIBILITY OF SUCH DAMAGE.
30
 */
31
32
#include "fmacros.h"
33
#include <stdlib.h>
34
#include <string.h>
35
#include <strings.h>
36
#include <assert.h>
37
#include <ctype.h>
38
#include <errno.h>
39
#include "async.h"
40
#include "net.h"
41
#include "dict.c"
42
#include "sds.h"
43
44
0
/* Event-library hooks. Each macro calls the matching function pointer stored
 * in (ctx)->ev with (ctx)->ev.data as its only argument, and does nothing
 * when that pointer is NULL. The ctx argument is evaluated more than once, so
 * it must not have side effects. Every macro expands to a single statement
 * without a trailing semicolon, which keeps it safe inside unbraced if/else. */
#define _EL_ADD_READ(ctx) do { \
        if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_READ(ctx) do { \
        if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
    } while(0)
#define _EL_ADD_WRITE(ctx) do { \
        if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
    } while(0)
#define _EL_DEL_WRITE(ctx) do { \
        if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
    } while(0)
#define _EL_CLEANUP(ctx) do { \
        if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
    } while(0)
59
60
/* Forward declaration of function in hiredis.c */
61
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
62
63
/* Functions managing dictionary of callbacks for pub/sub. */
64
0
static unsigned int callbackHash(const void *key) {
65
0
    return dictGenHashFunction((const unsigned char *)key,
66
0
                               sdslen((const sds)key));
67
0
}
68
69
0
static void *callbackValDup(void *privdata, const void *src) {
70
0
    ((void) privdata);
71
0
    redisCallback *dup = malloc(sizeof(*dup));
72
0
    memcpy(dup,src,sizeof(*dup));
73
0
    return dup;
74
0
}
75
76
0
static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
77
0
    int l1, l2;
78
0
    ((void) privdata);
79
80
0
    l1 = sdslen((const sds)key1);
81
0
    l2 = sdslen((const sds)key2);
82
0
    if (l1 != l2) return 0;
83
0
    return memcmp(key1,key2,l1) == 0;
84
0
}
85
86
0
static void callbackKeyDestructor(void *privdata, void *key) {
87
0
    ((void) privdata);
88
0
    sdsfree((sds)key);
89
0
}
90
91
0
/* Value destructor callback: frees the heap copy of a callback stored as a
 * dictionary value. privdata is unused. */
static void callbackValDestructor(void *privdata, void *val) {
    free(val);
    ((void) privdata);
}
95
96
static dictType callbackDict = {
97
    callbackHash,
98
    NULL,
99
    callbackValDup,
100
    callbackKeyCompare,
101
    callbackKeyDestructor,
102
    callbackValDestructor
103
};
104
105
0
/* Turns a redisContext into a redisAsyncContext by growing its allocation
 * with realloc() and resetting every async-specific field. Returns the async
 * context, or NULL when realloc() fails (c is then left untouched). */
static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
    redisAsyncContext *actx = realloc(c,sizeof(redisAsyncContext));

    if (actx == NULL)
        return NULL;

    /* The regular connect functions always set REDIS_CONNECTED. The async
     * API only wants this flag once the first write event has been received,
     * so clear it here. */
    actx->c.flags &= ~REDIS_CONNECTED;

    /* Error state mirrored from the embedded context. */
    actx->err = 0;
    actx->errstr = NULL;
    actx->data = NULL;

    /* Event-library hooks start out unset. */
    actx->ev.data = NULL;
    actx->ev.addRead = NULL;
    actx->ev.delRead = NULL;
    actx->ev.addWrite = NULL;
    actx->ev.delWrite = NULL;
    actx->ev.cleanup = NULL;

    /* No user callbacks registered yet. */
    actx->onConnect = NULL;
    actx->onDisconnect = NULL;

    /* Empty callback queues and fresh subscription tables. */
    actx->replies.head = NULL;
    actx->replies.tail = NULL;
    actx->sub.invalid.head = NULL;
    actx->sub.invalid.tail = NULL;
    actx->sub.channels = dictCreate(&callbackDict,NULL);
    actx->sub.patterns = dictCreate(&callbackDict,NULL);
    return actx;
}
141
142
/* We want the error field to be accessible directly instead of requiring
143
 * an indirection to the redisContext struct. */
144
0
static void __redisAsyncCopyError(redisAsyncContext *ac) {
145
0
    if (!ac)
146
0
        return;
147
148
0
    redisContext *c = &(ac->c);
149
0
    ac->err = c->err;
150
0
    ac->errstr = c->errstr;
151
0
}
152
153
0
redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
154
0
    redisContext *c;
155
0
    redisAsyncContext *ac;
156
157
0
    c = redisConnectNonBlock(ip,port);
158
0
    if (c == NULL)
159
0
        return NULL;
160
161
0
    ac = redisAsyncInitialize(c);
162
0
    if (ac == NULL) {
163
0
        redisFree(c);
164
0
        return NULL;
165
0
    }
166
167
0
    __redisAsyncCopyError(ac);
168
0
    return ac;
169
0
}
170
171
redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
172
0
                                         const char *source_addr) {
173
0
    redisContext *c = redisConnectBindNonBlock(ip,port,source_addr);
174
0
    redisAsyncContext *ac = redisAsyncInitialize(c);
175
0
    __redisAsyncCopyError(ac);
176
0
    return ac;
177
0
}
178
179
redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
180
0
                                                  const char *source_addr) {
181
0
    redisContext *c = redisConnectBindNonBlockWithReuse(ip,port,source_addr);
182
0
    redisAsyncContext *ac = redisAsyncInitialize(c);
183
0
    __redisAsyncCopyError(ac);
184
0
    return ac;
185
0
}
186
187
0
redisAsyncContext *redisAsyncConnectUnix(const char *path) {
188
0
    redisContext *c;
189
0
    redisAsyncContext *ac;
190
191
0
    c = redisConnectUnixNonBlock(path);
192
0
    if (c == NULL)
193
0
        return NULL;
194
195
0
    ac = redisAsyncInitialize(c);
196
0
    if (ac == NULL) {
197
0
        redisFree(c);
198
0
        return NULL;
199
0
    }
200
201
0
    __redisAsyncCopyError(ac);
202
0
    return ac;
203
0
}
204
205
0
/* Registers the connect callback. It can only be set once: returns REDIS_ERR
 * when a callback is already registered, REDIS_OK otherwise. */
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
    if (ac->onConnect != NULL)
        return REDIS_ERR;

    ac->onConnect = fn;

    /* An established connection is detected by waiting for the first write
     * event, so request one now. This assumes the event library hooks have
     * already been installed. */
    _EL_ADD_WRITE(ac);
    return REDIS_OK;
}
217
218
0
/* Registers the disconnect callback. It can only be set once: returns
 * REDIS_ERR when a callback is already registered, REDIS_OK otherwise. */
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
    if (ac->onDisconnect != NULL)
        return REDIS_ERR;

    ac->onDisconnect = fn;
    return REDIS_OK;
}
225
226
/* Helper functions to push/shift callbacks */
227
0
/* Appends a heap copy of *source to the tail of list. When source is NULL an
 * empty callback (no function, no privdata) is appended instead. Returns
 * REDIS_OK on success or REDIS_ERR_OOM when the node cannot be allocated. */
static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
    redisCallback *cb;

    /* Copy callback from stack to heap */
    cb = malloc(sizeof(*cb));
    if (cb == NULL)
        return REDIS_ERR_OOM;

    if (source != NULL) {
        memcpy(cb,source,sizeof(*cb));
    } else {
        /* Never link a node with indeterminate contents into the list. */
        cb->fn = NULL;
        cb->privdata = NULL;
    }
    /* The new node always becomes the tail, whatever source->next held. */
    cb->next = NULL;

    /* Store callback in list */
    if (list->head == NULL)
        list->head = cb;
    if (list->tail != NULL)
        list->tail->next = cb;
    list->tail = cb;
    return REDIS_OK;
}
248
249
0
/* Removes the callback at the head of list. When target is non-NULL the
 * removed callback is copied into it before the node is freed. Returns
 * REDIS_OK when a callback was removed, REDIS_ERR when the list was empty. */
static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
    redisCallback *first = list->head;

    if (first == NULL)
        return REDIS_ERR;

    /* Unlink the head; an emptied list also loses its tail. */
    list->head = first->next;
    if (list->tail == first)
        list->tail = NULL;

    /* Copy callback from heap to stack */
    if (target != NULL)
        memcpy(target,first,sizeof(*first));
    free(first);
    return REDIS_OK;
}
264
265
0
/* Invokes cb's function (if any) with the reply and the callback's privdata.
 * REDIS_IN_CALLBACK is set on the context for the duration of the call and
 * cleared afterwards. */
static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
    if (cb->fn == NULL)
        return;

    ac->c.flags |= REDIS_IN_CALLBACK;
    cb->fn(ac,reply,cb->privdata);
    ac->c.flags &= ~REDIS_IN_CALLBACK;
}
273
274
/* Helper function to free the context. */
275
0
static void __redisAsyncFree(redisAsyncContext *ac) {
276
0
    redisContext *c = &(ac->c);
277
0
    redisCallback cb;
278
0
    dictIterator *it;
279
0
    dictEntry *de;
280
281
    /* Execute pending callbacks with NULL reply. */
282
0
    while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
283
0
        __redisRunCallback(ac,&cb,NULL);
284
285
    /* Execute callbacks for invalid commands */
286
0
    while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
287
0
        __redisRunCallback(ac,&cb,NULL);
288
289
    /* Run subscription callbacks callbacks with NULL reply */
290
0
    it = dictGetIterator(ac->sub.channels);
291
0
    while ((de = dictNext(it)) != NULL)
292
0
        __redisRunCallback(ac,dictGetEntryVal(de),NULL);
293
0
    dictReleaseIterator(it);
294
0
    dictRelease(ac->sub.channels);
295
296
0
    it = dictGetIterator(ac->sub.patterns);
297
0
    while ((de = dictNext(it)) != NULL)
298
0
        __redisRunCallback(ac,dictGetEntryVal(de),NULL);
299
0
    dictReleaseIterator(it);
300
0
    dictRelease(ac->sub.patterns);
301
302
    /* Signal event lib to clean up */
303
0
    _EL_CLEANUP(ac);
304
305
    /* Execute disconnect callback. When redisAsyncFree() initiated destroying
306
     * this context, the status will always be REDIS_OK. */
307
0
    if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
308
0
        if (c->flags & REDIS_FREEING) {
309
0
            ac->onDisconnect(ac,REDIS_OK);
310
0
        } else {
311
0
            ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
312
0
        }
313
0
    }
314
315
    /* Cleanup self */
316
0
    redisFree(c);
317
0
}
318
319
/* Free the async context. When this function is called from a callback,
320
 * control needs to be returned to redisProcessCallbacks() before actual
321
 * free'ing. To do so, a flag is set on the context which is picked up by
322
 * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
323
0
void redisAsyncFree(redisAsyncContext *ac) {
324
0
    redisContext *c = &(ac->c);
325
0
    c->flags |= REDIS_FREEING;
326
0
    if (!(c->flags & REDIS_IN_CALLBACK))
327
0
        __redisAsyncFree(ac);
328
0
}
329
330
/* Helper function to make the disconnect happen and clean up. */
331
0
static void __redisAsyncDisconnect(redisAsyncContext *ac) {
332
0
    redisContext *c = &(ac->c);
333
334
    /* Make sure error is accessible if there is any */
335
0
    __redisAsyncCopyError(ac);
336
337
0
    if (ac->err == 0) {
338
        /* For clean disconnects, there should be no pending callbacks. */
339
0
        int ret = __redisShiftCallback(&ac->replies,NULL);
340
0
        assert(ret == REDIS_ERR);
341
0
    } else {
342
        /* Disconnection is caused by an error, make sure that pending
343
         * callbacks cannot call new commands. */
344
0
        c->flags |= REDIS_DISCONNECTING;
345
0
    }
346
347
    /* For non-clean disconnects, __redisAsyncFree() will execute pending
348
     * callbacks with a NULL-reply. */
349
0
    __redisAsyncFree(ac);
350
0
}
351
352
/* Tries to do a clean disconnect from Redis, meaning it stops new commands
353
 * from being issued, but tries to flush the output buffer and execute
354
 * callbacks for all remaining replies. When this function is called from a
355
 * callback, there might be more replies and we can safely defer disconnecting
356
 * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
357
 * when there are no pending callbacks. */
358
0
void redisAsyncDisconnect(redisAsyncContext *ac) {
359
0
    redisContext *c = &(ac->c);
360
0
    c->flags |= REDIS_DISCONNECTING;
361
0
    if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
362
0
        __redisAsyncDisconnect(ac);
363
0
}
364
365
0
/* Finds the callback that should receive a reply arriving while the context
 * is subscribed and copies it into dstcb. Array replies are matched by name
 * against the channel or pattern table; any other reply consumes the next
 * callback queued for invalid commands. dstcb is left untouched when nothing
 * matches. Always returns REDIS_OK. */
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
    redisContext *c = &(ac->c);
    dict *table;
    dictEntry *entry;
    int is_pattern;
    char *kind;
    sds name;

    if (reply->type != REDIS_REPLY_ARRAY) {
        /* Shift callback for invalid commands. */
        __redisShiftCallback(&ac->sub.invalid,dstcb);
        return REDIS_OK;
    }

    /* Custom reply functions are not supported for pub/sub. This will fail
     * very hard when they are used... */
    assert(reply->elements >= 2);
    assert(reply->element[0]->type == REDIS_REPLY_STRING);
    kind = reply->element[0]->str;

    /* A leading 'p' (any case) selects the pattern table. */
    is_pattern = (tolower(kind[0]) == 'p');
    table = is_pattern ? ac->sub.patterns : ac->sub.channels;

    /* Locate the right callback */
    assert(reply->element[1]->type == REDIS_REPLY_STRING);
    name = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
    entry = dictFind(table,name);
    if (entry != NULL) {
        memcpy(dstcb,dictGetEntryVal(entry),sizeof(*dstcb));

        /* If this is an unsubscribe message, remove it. */
        if (strcasecmp(kind+is_pattern,"unsubscribe") == 0) {
            dictDelete(table,name);

            /* If this was the last unsubscribe message, revert to
             * non-subscribe mode. */
            assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
            if (reply->element[2]->integer == 0)
                c->flags &= ~REDIS_SUBSCRIBED;
        }
    }
    sdsfree(name);
    return REDIS_OK;
}
411
412
0
/* Consumes every reply currently available on the context and hands each one
 * to its callback. Disconnects the context when reading a reply fails, when
 * a pending clean disconnect can complete, or when an unexpected error reply
 * arrives; frees it when a callback requested redisAsyncFree(). */
void redisProcessCallbacks(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    /* Kept outside the loop on purpose: monitor mode re-pushes the most
     * recently shifted callback once no more replies are available. */
    redisCallback cb = {NULL, NULL, NULL};
    void *reply = NULL;
    int status;

    for (;;) {
        status = redisGetReply(c,&reply);
        if (status != REDIS_OK)
            break;

        if (reply == NULL) {
            /* When the connection is being disconnected and there are
             * no more replies, this is the cue to really disconnect. */
            if ((c->flags & REDIS_DISCONNECTING) && sdslen(c->obuf) == 0
                && ac->replies.head == NULL) {
                __redisAsyncDisconnect(ac);
                return;
            }

            /* If monitor mode, repush callback */
            if (c->flags & REDIS_MONITORING)
                __redisPushCallback(&ac->replies,&cb);

            /* When the connection is not being disconnected, simply stop
             * trying to get replies and wait for the next loop tick. */
            break;
        }

        /* Even if the context is subscribed, pending regular callbacks will
         * get a reply before pub/sub messages arrive. */
        if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
            redisReply *unexpected = reply;

            /*
             * A spontaneous reply in a not-subscribed context can be the error
             * reply that is sent when a new connection exceeds the maximum
             * number of allowed connections on the server side.
             *
             * This is seen as an error instead of a regular reply because the
             * server closes the connection after sending it.
             *
             * To prevent the error from being overwritten by an EOF error the
             * connection is closed here. See issue #43.
             *
             * Another possibility is that the server is loading its dataset.
             * In this case we also want to close the connection, and have the
             * user wait until the server is ready to take our request.
             */
            if (unexpected->type == REDIS_REPLY_ERROR) {
                c->err = REDIS_ERR_OTHER;
                snprintf(c->errstr,sizeof(c->errstr),"%s",unexpected->str);
                c->reader->fn->freeObject(reply);
                __redisAsyncDisconnect(ac);
                return;
            }

            /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
            if (c->flags & REDIS_SUBSCRIBED)
                __redisGetSubscribeCallback(ac,reply,&cb);
        }

        if (cb.fn == NULL) {
            /* No callback for this reply. This can either be a NULL callback,
             * or there were no callbacks to begin with. Either way, don't
             * abort with an error, but simply ignore it because the client
             * doesn't know what the server will spit out over the wire. */
            c->reader->fn->freeObject(reply);
            continue;
        }

        __redisRunCallback(ac,&cb,reply);
        c->reader->fn->freeObject(reply);

        /* Proceed with free'ing when redisAsyncFree() was called. */
        if (c->flags & REDIS_FREEING) {
            __redisAsyncFree(ac);
            return;
        }
    }

    /* Disconnect when there was an error reading the reply */
    if (status != REDIS_OK)
        __redisAsyncDisconnect(ac);
}
491
492
/* Internal helper function to detect socket status the first time a read or
493
 * write event fires. When connecting was not successful, the connect callback
494
 * is called with a REDIS_ERR status and the context is free'd. */
495
0
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
496
0
    redisContext *c = &(ac->c);
497
498
0
    if (redisCheckSocketError(c) == REDIS_ERR) {
499
        /* Try again later when connect(2) is still in progress. */
500
0
        if (errno == EINPROGRESS)
501
0
            return REDIS_OK;
502
503
0
        if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
504
0
        __redisAsyncDisconnect(ac);
505
0
        return REDIS_ERR;
506
0
    }
507
508
    /* Mark context as connected. */
509
0
    c->flags |= REDIS_CONNECTED;
510
0
    if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
511
0
    return REDIS_OK;
512
0
}
513
514
/* This function should be called when the socket is readable.
515
 * It processes all replies that can be read and executes their callbacks.
516
 */
517
0
void redisAsyncHandleRead(redisAsyncContext *ac) {
518
0
    redisContext *c = &(ac->c);
519
520
0
    if (!(c->flags & REDIS_CONNECTED)) {
521
        /* Abort connect was not successful. */
522
0
        if (__redisAsyncHandleConnect(ac) != REDIS_OK)
523
0
            return;
524
        /* Try again later when the context is still not connected. */
525
0
        if (!(c->flags & REDIS_CONNECTED))
526
0
            return;
527
0
    }
528
529
0
    if (redisBufferRead(c) == REDIS_ERR) {
530
0
        __redisAsyncDisconnect(ac);
531
0
    } else {
532
        /* Always re-schedule reads */
533
0
        _EL_ADD_READ(ac);
534
0
        redisProcessCallbacks(ac);
535
0
    }
536
0
}
537
538
0
/* To be called when the socket is writable. It finishes the connection
 * handshake if needed, writes out the output buffer, and keeps the write
 * event scheduled only while data remains to be written. */
void redisAsyncHandleWrite(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    int flushed = 0;

    if (!(c->flags & REDIS_CONNECTED)) {
        /* Stop when connecting failed (the context is gone by then), or try
         * again later when the context is still not connected. The flag is
         * only inspected after a successful handshake check. */
        if (__redisAsyncHandleConnect(ac) != REDIS_OK ||
            !(c->flags & REDIS_CONNECTED))
            return;
    }

    if (redisBufferWrite(c,&flushed) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
        return;
    }

    /* Continue writing when not done, stop writing otherwise */
    if (flushed) {
        _EL_DEL_WRITE(ac);
    } else {
        _EL_ADD_WRITE(ac);
    }

    /* Always schedule reads after writes */
    _EL_ADD_READ(ac);
}
564
565
/* Locates the next bulk argument header ("$<len>\r\n") at or after start.
 * On success *str points at the first byte following the header, *len holds
 * the declared length, and the returned pointer sits *len + 2 bytes past
 * *str, i.e. just beyond the argument data and its two-byte terminator.
 * Returns NULL, leaving *str and *len untouched, when no '$' is found, when
 * the declared length is negative, or when the header has no '\r'. */
static const char *nextArgument(const char *start, const char **str, size_t *len) {
    const char *p = start;
    long declared;

    if (p[0] != '$') {
        p = strchr(p,'$');
        if (p == NULL) return NULL;
    }

    /* A negative length would wrap to a huge size_t; reject it. */
    declared = strtol(p+1,NULL,10);
    if (declared < 0) return NULL;

    /* Report a malformed header instead of relying on assert(), which
     * compiles away under NDEBUG. */
    p = strchr(p,'\r');
    if (p == NULL) return NULL;

    *len = (size_t)declared;
    *str = p+2;
    return p+2+(*len)+2;
}
580
581
/* Helper function for the redisAsyncCommand* family of functions. Writes a
582
 * formatted command to the output buffer and registers the provided callback
583
 * function with the context. */
584
0
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
585
0
    redisContext *c = &(ac->c);
586
0
    redisCallback cb;
587
0
    int pvariant, hasnext;
588
0
    const char *cstr, *astr;
589
0
    size_t clen, alen;
590
0
    const char *p;
591
0
    sds sname;
592
0
    int ret;
593
594
    /* Don't accept new commands when the connection is about to be closed. */
595
0
    if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
596
597
    /* Setup callback */
598
0
    cb.fn = fn;
599
0
    cb.privdata = privdata;
600
601
    /* Find out which command will be appended. */
602
0
    p = nextArgument(cmd,&cstr,&clen);
603
0
    assert(p != NULL);
604
0
    hasnext = (p[0] == '$');
605
0
    pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
606
0
    cstr += pvariant;
607
0
    clen -= pvariant;
608
609
0
    if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
610
0
        c->flags |= REDIS_SUBSCRIBED;
611
612
        /* Add every channel/pattern to the list of subscription callbacks. */
613
0
        while ((p = nextArgument(p,&astr,&alen)) != NULL) {
614
0
            sname = sdsnewlen(astr,alen);
615
0
            if (pvariant)
616
0
                ret = dictReplace(ac->sub.patterns,sname,&cb);
617
0
            else
618
0
                ret = dictReplace(ac->sub.channels,sname,&cb);
619
620
0
            if (ret == 0) sdsfree(sname);
621
0
        }
622
0
    } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
623
        /* It is only useful to call (P)UNSUBSCRIBE when the context is
624
         * subscribed to one or more channels or patterns. */
625
0
        if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
626
627
        /* (P)UNSUBSCRIBE does not have its own response: every channel or
628
         * pattern that is unsubscribed will receive a message. This means we
629
         * should not append a callback function for this command. */
630
0
     } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
631
         /* Set monitor flag and push callback */
632
0
         c->flags |= REDIS_MONITORING;
633
0
         __redisPushCallback(&ac->replies,&cb);
634
0
    } else {
635
0
        if (c->flags & REDIS_SUBSCRIBED)
636
            /* This will likely result in an error reply, but it needs to be
637
             * received and passed to the callback. */
638
0
            __redisPushCallback(&ac->sub.invalid,&cb);
639
0
        else
640
0
            __redisPushCallback(&ac->replies,&cb);
641
0
    }
642
643
0
    __redisAppendCommand(c,cmd,len);
644
645
    /* Always schedule a write when the write buffer is non-empty */
646
0
    _EL_ADD_WRITE(ac);
647
648
0
    return REDIS_OK;
649
0
}
650
651
0
/* va_list flavour of redisAsyncCommand(): formats the command from format
 * and ap, submits it with the given callback and frees the formatted buffer.
 * Returns REDIS_ERR when formatting fails, otherwise the result of
 * submitting the command. */
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
    char *formatted;
    int nbytes = redisvFormatCommand(&formatted,format,ap);
    int result;

    /* We don't want to pass -1 or -2 to future functions as a length. */
    if (nbytes < 0)
        return REDIS_ERR;

    result = __redisAsyncCommand(ac,fn,privdata,formatted,nbytes);
    free(formatted);
    return result;
}
665
666
0
/* Variadic front end: collects the arguments following format into a va_list
 * and forwards everything to redisvAsyncCommand(), returning its status. */
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
    va_list args;
    int result;

    va_start(args,format);
    result = redisvAsyncCommand(ac,fn,privdata,format,args);
    va_end(args);

    return result;
}
674
675
0
/* Builds a command from an argument vector (argc entries in argv, with
 * lengths in argvlen), submits it with the given callback and frees the
 * formatted sds buffer. Returns REDIS_ERR when formatting fails, otherwise
 * the result of submitting the command. */
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
    sds formatted;
    int nbytes = redisFormatSdsCommandArgv(&formatted,argc,argv,argvlen);
    int result;

    if (nbytes < 0)
        return REDIS_ERR;

    result = __redisAsyncCommand(ac,fn,privdata,formatted,nbytes);
    sdsfree(formatted);
    return result;
}
686
687
0
/* Submits a command that the caller has already formatted (cmd, len bytes)
 * with the given callback, returning the submission status. The buffer is
 * not freed here. */
int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
    return __redisAsyncCommand(ac,fn,privdata,cmd,len);
}