// Source: transports.js

/**
 * @module transports
 */

var dates = require('./dates');
var uuid = require('node-uuid');
var redis = require('redis');
var util = require('util'),
    winston = require('winston');
var async = require('async');
var os = require('os');
var retry = require('retry');

//TODO: Fix exit listener
//TODO: purge activeCache if idle

/**
 * Represents the Redis-backed cache for a single event type.
 *
 * Owns all direct communication with the Redis layer and holds the state of
 * one cache: the hash of rows, the sorted-set queue of pending chunks, and
 * the in-memory active chunk.
 *
 * Primarily an internal helper for this module that keeps the EventStreamer
 * and RedisEventCache classes easier to deal with.
 *
 * @param {String} event_type event type this cache is responsible for
 * @param {Number} chunkSize number of rows per published chunk
 * @param {redis.RedisClient} redisClient client used for all reads/writes
 * @constructor
 */
var EventTypeCache = function(event_type, chunkSize, redisClient){
    this.event_type  = event_type;
    this.chunkSize   = chunkSize;
    this.redisClient = redisClient;

    // Random cache_id namespaces this handler's keys so that multiple
    // handlers instantiated for the same event_type (which happens when node
    // runs on multiple CPUs simultaneously) never collide in Redis.
    this.cache_id = uuid.v4();

    // Redis key names & channel, all derived from cache_id + event_type.
    this.cacheKey      = util.format('%s_%s', this.cache_id, this.event_type);
    this.chunkQueueKey = util.format('%s_%s_chunkQueue', this.cache_id, this.event_type);
    this.channel       = util.format('%s_%s', this.cache_id, this.event_type);

    // In-memory buffer of insertIds for the chunk currently being filled.
    // Kept on the object (rather than in Redis) because capping the active
    // chunk under high-load scenarios proved racy when attempted in Redis.
    this._activeChunk = [];
};
/**
 * Serializes an array of insertIds into the string form stored in the chunk
 * queue (and published on the event channel).
 *
 * @param {Array} insertIds insertIds to encode
 * @returns {String} JSON-encoded insertIds
 */
EventTypeCache.prototype._encodeInsertIds = function(insertIds){
    var encoded = JSON.stringify(insertIds);
    return encoded;
};

/**
 * Inverse of _encodeInsertIds: parses an encoded chunk string back into an
 * array of insertIds.
 *
 * NOTE: JSON.parse throws on malformed input (e.g. an empty string), so
 * callers that cannot guarantee a well-formed chunk string must wrap this
 * call in a try/catch (see clearChunkFromCache).
 *
 * @param {String} insertIds_str JSON-encoded insertIds
 * @returns {Array} decoded insertIds
 */
EventTypeCache.prototype._decodeInsertIds = function(insertIds_str){
    return JSON.parse(insertIds_str);
};
/**
 * Persists a completed chunk of insertIds to the chunk queue (a Redis sorted
 * set), using the current timestamp in milliseconds as its score.
 *
 * @param {Array} insertIds insertIds that make up the chunk
 * @param {Function} callback takes (err, encoded_chunk)
 */
EventTypeCache.prototype.writeChunkToCache = function(insertIds, callback){
    var self = this;
    var encoded = self._encodeInsertIds(insertIds);
    var score = Date.now();
    self.redisClient.ZADD(self.chunkQueueKey, score, encoded, function(err, reply){
        if (err) {
            return callback(err);
        }
        callback(null, encoded);
    });
};

/**
 * Adds a row to the cache and performs all of the bookkeeping needed to keep
 * the cache consistent with the configured chunkSize.
 *
 * Most important function in terms of ensuring integrity of the cache, as it
 * is responsible for publishing the message to the redis event-type channel
 * indicating which rows to push to the EventStreamer.
 *
 * Steps:
 * 1. Push insertId onto the in-memory active chunk (synchronously, before
 *    any async work, to avoid a race on the chunk boundary).
 * 2. If the active chunk reached chunkSize, detach it and reset the buffer.
 * 3. HSET the encoded row into the event-type hash.
 * 4. If a full chunk was detached, persist it to the chunk queue and publish
 *    its encoded insertIds on the event channel.
 *
 * @param {String} insertId unique id used as the hash field for this row
 * @param {Object} row_json row payload to cache
 * @param {Function} callback takes (err, success)
 */
EventTypeCache.prototype.addRowToCache = function(insertId, row_json, callback){
    var self = this;
    var encoded_row = JSON.stringify(row_json);
    // Deal with the activeChunk stored in memory first to ensure it gets
    // reset properly; doing this inside the async callback would cause a
    // race condition.
    var chunkToWrite = false;
    var len = self._activeChunk.push(insertId);
    if (len == self.chunkSize){
        chunkToWrite = self._activeChunk;
        self._activeChunk = [];
    }
    // Write the row to the Redis hash for this event type.
    self.redisClient.HSET(self.cacheKey, insertId, encoded_row, function(err, reply){
        if (err) return callback(err);
        if (chunkToWrite){
            // Chunk reached chunkSize: save it to the queue (scored by ms
            // timestamp -- collision chance is infinitesimally small) and
            // notify listeners that it is ready to stream.
            self.writeChunkToCache(chunkToWrite, function(err, encoded_chunk){
                if (err) return callback(err);
                self.redisClient.publish(self.channel, encoded_chunk);
                return callback(null, true);
            });
        } else {
            // BUGFIX: previously the callback was never invoked when the
            // active chunk was not yet full, leaving callers hanging.
            return callback(null, true);
        }
    });
};

/**
 * Retrieves and decodes event rows from cache by insertId.
 *
 * @param {String} insertIds_str encoded string of insertIds, which are keys in redis hash
 * @param {Function} callback takes (err, parsed_rows_array)
 */
EventTypeCache.prototype.getRowsFromCache = function(insertIds_str, callback){
    var self = this;
    var insertIds = self._decodeInsertIds(insertIds_str);
    self.redisClient.HMGET(self.cacheKey, insertIds, function(err, reply){
        if (err) return callback(err);
        var parsed_rows = reply.map(function(raw){
            return JSON.parse(raw);
        });
        callback(null, parsed_rows);
    });
};

/**
 * Handles cleanup of cache elements: removes the chunk entry from the chunk
 * queue, then deletes every row in the chunk from the event-type hash.
 *
 * @param {String} insertIds_str encoded string of insertIds in chunk to remove
 * @param {Function} callback takes (err, num_fields_deleted)
 */
EventTypeCache.prototype.clearChunkFromCache = function(insertIds_str, callback){
    var self = this;
    self.redisClient.ZREM(self.chunkQueueKey, insertIds_str, function(err, reply){
        if (err) return callback(err);
        var insertIds;
        try {
            insertIds = self._decodeInsertIds(insertIds_str);
        } catch (e) {
            // Malformed (e.g. empty) chunk string -- nothing to delete.
            // BUGFIX: pass a proper Error object rather than a bare string.
            return callback(new Error('No insertIds to clear from cache'));
        }
        // Annoying, but HDEL takes hash fields as varargs, so the insertIds
        // array must be expanded into an argument list via apply:
        // [cacheKey, id1, id2, ..., cb].
        var args = [self.cacheKey].concat(insertIds);
        args.push(function(err, reply){
            if (err) return callback(err);
            return callback(null, reply);
        });
        self.redisClient.HDEL.apply(self.redisClient, args);
    });
};

/**
 * Returns every pending chunk string in the chunk queue, oldest first.
 *
 * Almost a trivial wrapper around ZRANGE, but added so users don't have to
 * interact with the cache directly.  This reads only the Redis sorted set;
 * the in-memory activeChunk is not touched by this method.
 *
 * @param {Function} callback takes (err, chunk_strings_array)
 */
EventTypeCache.prototype.getAllChunks = function(callback){
    var self = this;
    self.redisClient.ZRANGE(self.chunkQueueKey, 0, -1, function(err, reply){
        if (err) {
            return callback(err);
        }
        return callback(null, reply);
    });
};

/**
 * Detaches this._activeChunk from memory, resets it to an empty array, and
 * hands back the encoded insertIds.
 *
 * Should be used with caution, and only on events like exit, SIGINT, etc.
 *
 * @param {Function} callback takes (err, chunk_str) -- chunk_str is null
 *        when the active chunk is empty
 * @returns {*}
 */
EventTypeCache.prototype.getAndClearActiveChunk = function(callback){
    var self = this;
    if (self._activeChunk.length === 0){
        return callback(null, null);
    }
    var detached = self._activeChunk;
    self._activeChunk = [];
    return callback(null, self._encodeInsertIds(detached));
};

/* --------------------- BEGIN EVENTSTREAMER ----------------------- */
/**
 * Base class to hold the scaffolding to stream events to desired data store from
 * Redis caching system exposed by RedisEventCache transport.
 *
 * Subclasses should handle all of the option parsing
 *
 * @param {Object} options
 * @param {Number} options.chunkSize size of each chunk to stream. Hard lowerlimit is 20.
 * @param {Object} options.retryOptions options object to pass to retry.operation() call.  See
 *        retry docs for details.
 * @type {Function}
 * @class
 */
var EventStreamer = exports.EventStreamer = function(options){
    options             =   options || {};
    this.chunkSize      =   options.chunkSize || 20;
    this.retry_options  =   options.retry_options || {};
};
/**
 * Stores the map of event_type -> EventTypeCache handlers on this streamer.
 *
 * Called by the RedisEventCache transport once its handlers are built.
 *
 * @param {Object} cache_handlers map of event_type to EventTypeCache
 * @private
 */
EventStreamer.prototype._addCacheHandlers = function(cache_handlers){
    this.cache_handlers = cache_handlers;
};
/**
 * Base method to stream event chunks from cache to data store.
 *
 * All subclasses should override this method and customize to write to
 * individual storage engine.
 *
 * This base implementation simply logs all lines to the console and should
 * not be used in a production environment.
 *
 * @param {String} event_type event_type key passed to meta object in logger.log method
 * @param {Array} cached_rows array of objects retrieved from redis
 * @param {Function} callback takes (err, response)
 */
EventStreamer.prototype.streamEvents = function(event_type, cached_rows, callback){
    try {
        console.log(util.format(
            "==== Begin streaming chunk (length = %s) from cache ====",
            cached_rows.length));
        cached_rows.forEach(function(row){
            console.log(event_type + ": " + JSON.stringify(row));
        });
        console.log("==== End of cached chunk, chunk will now be deleted... ====");
        callback(null, true);
    } catch (e) {
        callback(e);
    }
};

/**
 * Placeholder hook for subclasses to override.
 *
 * Subclasses should return an object mapping event_type keys to arrays of
 * field names (see RedisEventCache options.eventSchema); this base
 * implementation always throws.
 *
 * @throws {Error} always -- no schema is defined on the base class
 */
EventStreamer.prototype.getEventSchema = function(){
    throw new Error('Event Schema is not defined')
};

//
/**
 * Wraps this.streamEvents to handle retrieval of specific cached rows based
 * on insertIds from the cached hash object, and deletion of those rows on a
 * successful stream.
 *
 * Will also auto-retry calls to streamEvents (with backoff, via the retry
 * module) when use_retry is true.
 *
 * @param {String} event_type event_type key
 * @param {String} insertIds_str encoded string of insertIds
 * @param {EventTypeCache} cache_handler instance of EventTypeCache
 * @param {Boolean} use_retry whether or not to use retry handler.  Should only be false if being called
 *        on exit.
 * @param {Function} callback callback
 */
EventStreamer.prototype.getRowsAndStream = function(event_type,insertIds_str,cache_handler,use_retry,callback){
    var self = this;
    // Shared success path: only clear the chunk from the cache AFTER the
    // rows have actually been streamed.
    function clearAndFinish(){
        cache_handler.clearChunkFromCache(insertIds_str, function(err, reply){
            if (err) return callback(err);
            callback(null, reply);
        });
    }
    cache_handler.getRowsFromCache(insertIds_str, function(err, parsed_rows){
        if (err) return callback(err);
        if (use_retry){
            // Wrap in retry handler to automatically retry the call with
            // exponential backoff if it fails.
            var operation = retry.operation(self.retry_options);
            operation.attempt(function(retryAttempt){
                self.streamEvents(event_type, parsed_rows, function(err, response){
                    if (operation.retry(err)){
                        // Another attempt has been scheduled; do not invoke
                        // the callback yet.
                        console.log('Attempt to Stream failed, retrying');
                        return;
                    }
                    // BUGFIX: previously the callback could fire twice (once
                    // via clearChunkFromCache and once unconditionally), and
                    // the chunk was cleared from cache even when all retry
                    // attempts had failed (losing unstreamed rows).
                    if (err) return callback(operation.mainError());
                    clearAndFinish();
                });
            });
        } else {
            self.streamEvents(event_type, parsed_rows, function(err, response){
                if (err) return callback(err);
                clearAndFinish();
            });
        }
    });
};

/**
 * Helper method to stream and clear the in-memory _activeChunk on each
 * cache handler.
 *
 * Useful for calling on process exit or termination to clean up in-memory
 * chunk.
 *
 * @param {Function} callback takes (err, true)
 */
EventStreamer.prototype.streamAndClearActiveChunk = function(callback){
    var self = this;
    var tasks = Object.keys(self.cache_handlers).map(function(event_type){
        var cache = self.cache_handlers[event_type];
        return function(done){
            cache.getAndClearActiveChunk(function(err, chunk_str){
                if (err) return done(err);
                if (!chunk_str) return done();
                // Stream without retries (use_retry=false) since this path
                // is meant for shutdown cleanup.
                self.getRowsAndStream(event_type, chunk_str, cache, false, function(err, reply){
                    if (err) return done(err);
                    done()
                });
            });
        };
    });
    async.parallel(tasks, function(err, result){
        if (err) return callback(err);
        callback(null, true);
    });
};

/**
 * Method to handle event cache using Redis subscriptions to the
 * per-event_type channels.
 *
 * Essentially just wraps this.streamEvents (via getRowsAndStream) and pushes
 * cache chunks to it when they are ready, then deletes them from the cache
 * on successful push.
 *
 * Adds a subscriber to every cache handler's channel which handles all
 * messages indicating that a cache chunk is ready.
 *
 * NOTE: the SIGINT/SIGTERM/exit listeners that would flush the in-memory
 * activeChunk on shutdown are currently disabled -- see the TODO below.
 *
 * @param {redis.RedisClient} defaultRedisClient default read/write client
 * @param {redis.RedisClient} subscriberRedisClient client that subscribes to messages
 */
EventStreamer.prototype.bindToCache = function(defaultRedisClient,subscriberRedisClient){
    var self = this;
    // subscribe to all event channels in config file

    // Create a small router mapping redis channel names to cache_handlers.
    var channel_router = {};
    for (var event_type in self.cache_handlers){
        if (self.cache_handlers.hasOwnProperty(event_type)){
            var ch = self.cache_handlers[event_type];
            channel_router[ch.channel] = ch;
        }
    }
    var channels = Object.keys(channel_router);

    // A separate subscriber client is required here: a redis client in
    // "subscribe" mode cannot get/set values.
    subscriberRedisClient.subscribe.apply(subscriberRedisClient, channels);
    // Listen for messages on event_type channels, which indicate that a cache
    // chunk is ready to be streamed.
    subscriberRedisClient.on("message", function(channel, insertIds_str){
        // The message payload is the encoded string of insertIds (fields in
        // the redis hash object) that make up the ready chunk.
        var cache_handler = channel_router[channel];
        var event_type = cache_handler.event_type;
        self.getRowsAndStream(event_type, insertIds_str, cache_handler, true, function(err, reply){
            if (err) console.log(err);
        });
    });

    //TODO: These shutdown listeners never worked reliably.  They behave when
    //TODO: running this module as a standalone test, but when active in a
    //TODO: real node environment the process simply exits before the
    //TODO: activeChunk can be streamed.  Getting them working would remove
    //TODO: the need for the clearZombieEventCaches hack below.
    // Listener to clear each event_type cache on process exit
    //exitHook(self.streamAndClearActiveCache);
    //process.on('SIGINT', function(){
    //    self.streamAndClearActiveChunk(function(err, success){
    //        if (err) console.log(err);
    //        if (success){
    //            console.log('All RedisEventCache activeChunks streamed and cleared, process will now exit.');
    //        } else {
    //            console.log('Something went wrong, activeChunk was not cleared properly. Process will now exit.')
    //        }
    //        process.exit(0);
    //
    //    })
    //});
    //process.on('SIGTERM', function(){
    //    self.streamAndClearActiveChunk(function(err, success){
    //        if (err) console.log(err);
    //        if (success){
    //            console.log('All RedisEventCache activeChunks streamed and cleared, process will now exit.');
    //        } else {
    //            console.log('Something went wrong, activeChunk was not cleared properly. Process will now exit.')
    //        }
    //        process.exit(0);
    //    })
    //});
    //process.on('exit', function(){
    //    self.streamAndClearActiveChunk(function(err, success){
    //        if (err) console.log(err);
    //        if (success){
    //            console.log('All RedisEventCache activeChunks streamed and cleared, process will now exit.');
    //        } else {
    //            console.log('Something went wrong, activeChunk was not cleared properly. Process will now exit.')
    //        }
    //        process.exit(0);
    //    })
    //});
};

/* -------------------- BEGIN Actual Winston Transport ------------------------ */

/**
 * Custom Redis "Event" logging transport for Winston.
 *
 * Internally, it uses Redis' built-in pub/sub messaging feature to notify
 * its listener method when the cache is ready to be purged.
 *
 * You would log SOME_EVENT type events to the table "test_events" using the following
 * call to the `log` method:
 *
 * ```
 * logger.log("info", msg, {type: "SOME_EVENT, val2: "a val", val2: "another val"})
 * ```
 *
 * IMPORTANT: If the `meta` key `type` is not found, or the given `type` is not configured in the
 * provided config file, the event will be ignored in this transport.
 *
 * @class
 * @todo: Cleanup (a lot)
 * @param  {Object} options options object
 * @param  {String} [options.level="info"] logging level
 * @param  {Boolean} [options.timestamp=true] whether to include UTC timestamp field in log row.
 * @param  {String} [options.redis_host='127.0.0.1'] host for redis client
 * @param  {Number} [options.redis_port=6379] port for redis client.
 * @param  {Object} [options.redis_options={}] options for redis client.
 * @param  {EventStreamer} [options.eventStreamer=EventStreamer()] EventStreamer instance for specific datastore
 * @param  {Object} [options.eventSchema] Object where keys = event_types and
 *         values = Array of fields for each type.  If not provided, will try to get this from
 *         this.evenStreamer.getEventSchema method.  If provided, will override whatever this method
 *         returns. **NOTE:** To return ALL keys from logged `meta` object, use special value `{"event_type": "*"}`
 *
 * @type {Function}
 */
var RedisEventCache = exports.RedisEventCache = winston.transports.RedisEventCache = function(options){
    this.name = "RedisEventCache";

    options = options || {};
    this.level          =   options.level || "info";
    this.timestamp      =   options.timestamp || true;
    this.redis_host     =   options.redis_host || '127.0.0.1';
    this.redis_port     =   options.redis_port || 6379;
    this.redis_options  =   options.redis_options || {};
    this.eventStreamer  =   options.eventStreamer || new EventStreamer();
    //default to 5 if not present on eventStreamer instance
    this.chunkSize      =   this.eventStreamer.chunkSize || 5;

    try {
        this.eventSchema = options.eventSchema || this.eventStreamer.getEventSchema();
    } catch (e){
        throw new Error("Error thrown trying to figure out what eventSchema to use:" + e +
        ". You must specify eventSchema either in options or using " +
        "eventStreamer.getEventSchema hook.")
    }
    // create redis clients
    this.defaultRedisClient = redis.createClient(this.redis_port,
        this.redis_host, this.redis_options);
    this.subscriberRedisClient = redis.createClient(this.redis_port,
        this.redis_host, this.redis_options);

    this.cache_handlers = {};
    var self = this;
    Object.keys(this.eventSchema).forEach(function(event){
        self.cache_handlers[event] = new EventTypeCache(event, self.chunkSize, self.defaultRedisClient)
    });

    // finally, bind eventStreamer to event channel messages to handle
    // event cache
    this.eventStreamer._addCacheHandlers(this.cache_handlers);
    this.eventStreamer.bindToCache(this.defaultRedisClient, this.subscriberRedisClient);
};
util.inherits(RedisEventCache, winston.Transport);

/**
 * Inserts a row into the Redis event-specific cache for the cache listener
 * to deal with.
 *
 * @param {String} event_type
 * @param {String} insertId unique ID used as field in redis cache, and used to push to BigQuery
 * @param {Object} row_json
 * @param {Function} callback callback function, takes (err, success)
 * @private
 */
RedisEventCache.prototype._addRowToCache = function(event_type, insertId, row_json, callback){
    var handler = this.cache_handlers[event_type];
    handler.addRowToCache(insertId, row_json, function(err, reply){
        if (err) {
            return callback(err);
        }
        return callback(null, reply);
    });
};

/**
 * Base logger method.
 *
 * Ignores any messages without a `type` field present in the meta object, or
 * with a `type` not configured in `this.eventSchema` (the callback is still
 * invoked so callers do not hang).
 *
 * Otherwise, parses log message and metadata and sends to this._addRowToCache,
 * where all the caching logic is taken care of.
 *
 * NOTE: also augments all loglines with "hostname" field
 *
 * @param {String} level log level
 * @param {String} msg log message
 * @param {Object} meta metadata; must contain a configured `type` to be cached
 * @param {Function} callback takes (err, success)
 */
RedisEventCache.prototype.log = function(level, msg, meta, callback){
    var self = this;
    // only process if there is a schema associated with this type
    var event_types = Object.keys(self.eventSchema);
    if (event_types.indexOf(meta.type) === -1){
        // BUGFIX: previously the callback was never invoked for ignored
        // events; report success (nothing to do) instead of hanging.
        return callback(null, true);
    }
    // first create row
    var fields = self.eventSchema[meta.type];
    var row_json = {};
    // Check for the '*' special field declaration used to pass all fields
    // in meta (loose == deliberately matches both '*' and ['*']).
    if (fields == '*'){
        row_json = meta;
    } else {
        fields.forEach(function(item){
            //TODO: Should do some basic type checking here
            row_json[item] = meta[item];
        });
    }
    //TODO: Get this from logging formatter instead
    if (self.timestamp){
        row_json.tstamp = dates.isoFormatUTCNow('Z');
    }
    row_json.level = level;
    row_json.msg = msg;
    row_json.hostname = os.hostname();
    // BUGFIX: a single timestamp-based UUID is now used both as the cache
    // hash field and as the row's insertId; previously two different UUIDs
    // were generated, so the dedup insertId never matched the cache key.
    var insertId = uuid.v1(); // unique insert ID prevents duplicate entries
    var row = {
        insertId: insertId,
        json: row_json
    };
    // Now push row to the redis cache
    self._addRowToCache(meta.type, insertId, row, function(err, success){
        if (err) return callback(err);
        return callback(null, success);
    });
};

/**
 * Method to clear out and stream all Redis event caches of matching types.
 *
 * !!!!! WARNING: ONLY USE AFTER SERVER CRASHES, DO NOT USE WHILE SERVER IS RUNNING
 * OR ELSE ACTIVE CACHES WILL BE DESTROYED.  !!!!!
 *
 * NOTE: THIS IS REALLY A HACK BECAUSE EXIT LISTENERS COULD NOT BE MADE TO
 * WORK PROPERLY.  IF YOU CAN GET EXIT LISTENERS TO WORK (I.E. GET CACHE TO
 * CLEAR ON EXIT) THEN YOU SHOULD NEVER HAVE TO USE THIS EVER EVER EVER.
 *
 * !!!! YOU HAVE BEEN WARNED !!!!!
 *
 * @param {Function} callback takes (err)
 * @param {Number} [limit=100000] max number of zombie caches to process per event type
 * @private
 */
RedisEventCache.prototype.clearZombieEventCaches = function(callback, limit){
    var self = this;
    var outer_funcs = [];
    limit = limit || 100000;
    Object.keys(self.eventSchema).forEach(function(event_type){
        if (self.eventSchema.hasOwnProperty(event_type)){
            outer_funcs.push(function(cb){
                // loop over all keys matching event types found in Redis
                self.defaultRedisClient.KEYS('*_' + event_type, function(err, results){
                    if (err) {
                        // BUGFIX: previously this error path never invoked
                        // cb, so async.parallel would hang forever.
                        console.error(err);
                        return cb(err);
                    }
                    if (results.length === 0){
                        console.log('No event caches (cachi?) found, nothing to do here. Exiting...');
                        return cb();
                    }
                    // BUGFIX: slice's end index is exclusive, so slice(0,
                    // limit) -- not limit-1 -- yields up to `limit` results.
                    var these_results = results.slice(0, limit);
                    console.log('Zombie event caches (cachi?) found: ' + these_results.join('\n'));
                    console.log('Will now stream each cache and clear...');
                    // stream + clear each zombie hash in turn
                    async.each(these_results, function(hash_key, each_callback){
                        self._streamAndClearZombieHash(event_type, hash_key, each_callback);
                    }, function(err){
                        if (err) return cb(err);
                        return cb();
                    });
                });
            });
        }
    });
    async.parallel(outer_funcs, function(err){
        if (err) return callback(err);
        return callback();
    })
};

/**
 * Streams the contents of a single zombie cache hash and deletes the rows
 * that were streamed.  Per-hash errors are logged and swallowed so a single
 * bad hash does not abort the whole sweep.
 *
 * @param {String} event_type event type the hash belongs to
 * @param {String} hash_key redis key of the zombie hash
 * @param {Function} each_callback invoked once this hash has been processed
 * @private
 */
RedisEventCache.prototype._streamAndClearZombieHash = function(event_type, hash_key, each_callback){
    var self = this;
    self.defaultRedisClient.HGETALL(hash_key, function(err, hash){
        if (err) {
            console.error(err);
            return each_callback();
        }
        var cached_rows = [];
        var insertIds = [];
        Object.keys(hash).forEach(function(insertId){
            if (hash.hasOwnProperty(insertId)){
                insertIds.push(insertId);
                cached_rows.push(JSON.parse(hash[insertId]));
            }
        });
        self.eventStreamer.streamEvents(event_type, cached_rows, function(err, reply){
            if (err) {
                console.error('Error streaming event cache ' + hash_key + ':');
                console.error(err);
                return each_callback();
            }
            console.log('Success! Streamed rows in event cache ' + hash_key + ': ' + JSON.stringify(reply));
            // Only delete rows which were streamed, just in case; if all
            // rows streamed successfully this is equivalent to deleting the
            // key altogether.  HDEL takes hash fields as varargs, so expand
            // the insertIds array into an argument list via apply.
            insertIds.splice(0, 0, hash_key);
            insertIds.push(function(err, reply){
                if (err) {
                    console.error('Error deleting fields from hash key ' + hash_key + ': ' + err);
                    return each_callback();
                }
                console.log('Successfully deleted ' + reply + ' fields from hash key ' + hash_key);
                return each_callback();
            });
            self.defaultRedisClient.HDEL.apply(self.defaultRedisClient, insertIds);
        });
    });
};

///* TESTING ONLY */
//var test_schema = {
//    "TEST": ['level','msg','tstamp','val1','val2']
//};
//var logger = new winston.Logger({
//    transports: [
//        new winston.transports.RedisEventCache({eventSchema: test_schema})
//    ]
//});
//for (var i=0; i < 119; i++){
//    logger.info(i.toString() + ' ajiodn', {type: "TEST", val1: i.toString(), val2: i.toString()});
//}