/**
* @module transports
*/
var dates = require('./dates');
var uuid = require('node-uuid');
var redis = require('redis');
var util = require('util'),
winston = require('winston');
var async = require('async');
var os = require('os');
var retry = require('retry');
//TODO: Fix exit listener
//TODO: purge activeCache if idle
/**
* Representation of specific event-type cache in Redis.
*
* Handles all communication directly with Redis layer and contains the
* state of an individual cache.
*
* Mostly used for internal purposes in this module to make EventStreamer
* & RedisEventCache classes easier to deal with.
*
* @param {String} event_type
* @param {Number} chunkSize
* @param {redis.RedisClient} redisClient
* @constructor
*/
var EventTypeCache = function(event_type, chunkSize, redisClient){
    this.event_type = event_type;
    this.chunkSize = chunkSize;
    this.redisClient = redisClient;
    // cache_id distinguishes this handler's keys from those of any other
    // handler for the same event_type (e.g. when node runs on multiple
    // CPUs simultaneously and several handlers are instantiated).
    this.cache_id = uuid.v4();
    // All Redis key names share the "<cache_id>_<event_type>" prefix.
    var keyPrefix = this.cache_id + '_' + this.event_type;
    this.cacheKey = keyPrefix;                         // hash: insertId -> encoded row
    this.chunkQueueKey = keyPrefix + '_chunkQueue';    // sorted set of encoded chunks
    this.channel = keyPrefix;                          // pub/sub channel for "chunk ready"
    // In-memory buffer of insertIds for the chunk currently being filled.
    // Kept on the object (rather than in Redis) because capping the active
    // chunk purely in Redis raced under high load; see original TODO notes.
    this._activeChunk = [];
};
// Serialize an array of insertIds into the string form stored as a member
// of the chunkQueue sorted set (and published on the channel).
EventTypeCache.prototype._encodeInsertIds = function(insertIds){
    var encoded = JSON.stringify(insertIds);
    return encoded;
};
/**
 * Inverse of _encodeInsertIds: parse an encoded chunk string back into an
 * array of insertIds.
 *
 * NOTE(review): an older comment here described CSV splitting and
 * empty-string handling, but the implementation is plain JSON.
 * JSON.parse('') throws, which clearChunkFromCache relies on (it wraps
 * this call in try/catch).
 */
EventTypeCache.prototype._decodeInsertIds = function(insertIds_str){
    return JSON.parse(insertIds_str);
};
/**
 * Persist an encoded chunk of insertIds to the chunkQueue sorted set,
 * scored with the current timestamp in ms (collision chance is negligible).
 *
 * @param {Array} insertIds insertIds making up the chunk
 * @param {Function} callback takes (err, encoded_chunk)
 */
EventTypeCache.prototype.writeChunkToCache = function(insertIds, callback){
    var encoded_chunk = this._encodeInsertIds(insertIds);
    this.redisClient.ZADD(this.chunkQueueKey, Date.now(), encoded_chunk, function(err){
        if (err) return callback(err);
        return callback(null, encoded_chunk);
    });
};
/**
 * Adds row to cache and performs all of the necessary operations to ensure
 * integrity of the cache according to chunkSize parameters.
 *
 * Most important function in terms of ensuring integrity of the cache, as it is
 * responsible for publishing message to redis event type channel indicating what
 * rows to push to EventStreamer
 *
 * Implements a relatively simple algorithm to ensure that:
 * - No rows are removed from the cache before they can be streamed to datastore
 * - Messages are published only when equally-sized chunk of rows has been cached
 * - Caching process does not block the event loop
 *
 * Full steps, each executed within callback:
 *  1. Push insertId onto the in-memory _activeChunk SYNCHRONOUSLY, before any
 *     async work, so concurrent calls cannot flush the same chunk twice.
 *  2. If _activeChunk has reached chunkSize, detach it and reset the buffer.
 *  3. HSET the encoded row into the event-type hash (cacheKey).
 *  4. If a chunk was detached, ZADD it to the chunkQueue and publish its
 *     encoded insertIds on the event-type channel for the subscriber to stream.
 *
 * @param insertId unique id used as the hash field for this row
 * @param {Object} row_json row payload to JSON-encode and cache
 * @param {Function} callback takes (err, success)
 */
EventTypeCache.prototype.addRowToCache = function(insertId, row_json, callback){
    var self = this;
    var encoded_row = JSON.stringify(row_json);
    // deal with activeChunk stored in memory first to ensure it gets reset properly
    // can't do this inside async callback or you'll cause a race condition
    var chunkToWrite = null;
    if (self._activeChunk.push(insertId) === self.chunkSize){
        chunkToWrite = self._activeChunk;
        self._activeChunk = [];
    }
    //write to redis
    self.redisClient.HSET(self.cacheKey, insertId, encoded_row, function(err, reply){
        if (err) return callback(err);
        if (!chunkToWrite){
            // BUGFIX: previously the callback was never invoked when the active
            // chunk had not yet reached chunkSize, so callers waited forever.
            return callback(null, true);
        }
        // chunk is full: persist it to the queue, then announce it on the channel
        self.writeChunkToCache(chunkToWrite, function(err, encoded_chunk){
            if (err) return callback(err);
            self.redisClient.publish(self.channel, encoded_chunk);
            return callback(null, true);
        });
    });
};
/**
 * Retrieves and decodes event rows from the cache hash by insertId.
 *
 * @param {String} insertIds_str encoded string of insertIds, which are fields in the redis hash
 * @param {Function} callback takes (err, parsed_rows_array)
 */
EventTypeCache.prototype.getRowsFromCache = function(insertIds_str, callback){
    var self = this;
    var ids = self._decodeInsertIds(insertIds_str);
    self.redisClient.HMGET(self.cacheKey, ids, function(err, reply){
        if (err) return callback(err);
        var parsed_rows = reply.map(function(raw){
            return JSON.parse(raw);
        });
        callback(null, parsed_rows);
    });
};
/**
 * Handles cleanup of cache elements -- removes the chunk entry from the
 * chunkQueue sorted set, then deletes the chunk's rows from the cache hash.
 *
 * @param {String} insertIds_str encoded string of insertIds in chunk to remove
 * @param {Function} callback takes (err, numFieldsDeleted)
 */
EventTypeCache.prototype.clearChunkFromCache = function(insertIds_str, callback){
    var self = this;
    self.redisClient.ZREM(self.chunkQueueKey, insertIds_str, function(err, reply){
        if (err) return callback(err);
        var ids;
        try {
            ids = self._decodeInsertIds(insertIds_str);
        } catch (e) {
            // BUGFIX: pass a proper Error object (was a bare string)
            return callback(new Error('No insertIds to clear from cache'));
        }
        if (!Array.isArray(ids) || ids.length === 0){
            // BUGFIX: HDEL with no field arguments is a redis error; guard
            // against an empty decoded chunk instead of issuing the command.
            return callback(new Error('No insertIds to clear from cache'));
        }
        // expand insertIds array into HDEL args: (cacheKey, id1, id2, ..., cb)
        // annoying but redis clients of this vintage don't accept a field array
        var args = [self.cacheKey].concat(ids);
        args.push(function(err, reply){
            if (err) return callback(err);
            return callback(null, reply);
        });
        self.redisClient.HDEL.apply(self.redisClient, args);
    });
};
/**
 * Thin wrapper around ZRANGE so callers don't have to touch the chunkQueue
 * directly; returns every queued (encoded) chunk, oldest score first.
 *
 * NOTE(review): an older comment claimed this also clears the in-memory
 * activeChunk; ZRANGE is read-only, so nothing is cleared here.
 *
 * @param {Function} callback takes (err, encoded_chunks_array)
 */
EventTypeCache.prototype.getAllChunks = function(callback){
    this.redisClient.ZRANGE(this.chunkQueueKey, 0, -1, function(err, reply){
        if (err) return callback(err);
        callback(null, reply);
    });
};
/**
 * Detaches this._activeChunk, leaving an empty buffer behind, and returns
 * the detached ids in encoded form (or null if the buffer was empty).
 *
 * Should be used with caution, and only on events like exit, SIGINT, etc.
 *
 * @param {Function} callback takes (err, encoded_chunk_or_null)
 * @returns {*}
 */
EventTypeCache.prototype.getAndClearActiveChunk = function(callback){
    if (this._activeChunk.length === 0){
        return callback(null, null);
    }
    var detached = this._activeChunk;
    this._activeChunk = [];
    return callback(null, this._encodeInsertIds(detached));
};
/* --------------------- BEGIN EVENTSTREAMER ----------------------- */
/**
 * Base class to hold the scaffolding to stream events to desired data store from
 * Redis caching system exposed by RedisEventCache transport.
 *
 * Subclasses should handle all of the option parsing
 *
 * @param {Object} options
 * @param {Number} [options.chunkSize=20] size of each chunk to stream
 * @param {Object} [options.retry_options={}] options object to pass to retry.operation()
 *     call; see retry docs for details. `options.retryOptions` is accepted as an
 *     alias for backward compatibility with the previously documented name.
 * @type {Function}
 * @class
 */
var EventStreamer = exports.EventStreamer = function(options){
    options = options || {};
    this.chunkSize = options.chunkSize || 20;
    // BUGFIX: the JSDoc advertised `retryOptions` while the code only read
    // `retry_options`; read both spellings so documented callers work.
    this.retry_options = options.retry_options || options.retryOptions || {};
};
// Attach the map of event_type -> EventTypeCache built by the transport.
EventStreamer.prototype._addCacheHandlers = function(cache_handlers){
    this.cache_handlers = cache_handlers;
};
/**
 * Base method to stream event chunks from cache to data store.
 *
 * All subclasses should override this method and customize to write to
 * individual storage engine. This base implementation simply logs every
 * row to the console and should not be used in production.
 *
 * @param {String} event_type event_type key passed to meta object in logger.log method
 * @param {Array} cached_rows array of objects retrieved from redis
 * @param {Function} callback takes (err, response)
 */
EventStreamer.prototype.streamEvents = function(event_type, cached_rows, callback){
    try {
        console.log(util.format(
            "==== Begin streaming chunk (length = %s) from cache ====",
            cached_rows.length));
        cached_rows.forEach(function(row){
            console.log(event_type + ": " + JSON.stringify(row));
        });
        console.log("==== End of cached chunk, chunk will now be deleted... ====");
        callback(null, true);
    } catch (e) {
        callback(e);
    }
};
/**
 * Placeholder hook for subclasses to override with their datastore's schema.
 * @throws {Error} always, on the base class
 */
EventStreamer.prototype.getEventSchema = function(){
    throw new Error('Event Schema is not defined');
};
/**
 * Wraps this.streamEvents to handle retrieval of specific cached rows based on insertIds
 * from cached hash object, and deletion of rows on successful stream.
 *
 * Will also auto-retry calls to streamEvents (with the configured retry options)
 * when use_retry is true.
 *
 * @param {String} event_type event_type key
 * @param {String} insertIds_str encoded string of insertIds
 * @param {EventTypeCache} cache_handler instance of EventTypeCache
 * @param {Boolean} use_retry whether or not to use retry handler. Should only be false if being called
 *     on exit.
 * @param {Function} callback takes (err, reply); invoked exactly once
 */
EventStreamer.prototype.getRowsAndStream = function(event_type, insertIds_str, cache_handler, use_retry, callback){
    var self = this;
    // Only remove the chunk from Redis after a confirmed successful stream,
    // so rows are never lost to a failed push.
    var clearChunk = function(){
        cache_handler.clearChunkFromCache(insertIds_str, function(err, reply){
            if (err) return callback(err);
            callback(null, reply);
        });
    };
    cache_handler.getRowsFromCache(insertIds_str, function(err, parsed_rows){
        if (err) return callback(err);
        if (!use_retry){
            return self.streamEvents(event_type, parsed_rows, function(err){
                if (err) return callback(err);
                clearChunk();
            });
        }
        // wrap in retry handler to automatically retry call w/ exponential
        // backoff if it fails
        var operation = retry.operation(self.retry_options);
        operation.attempt(function(){
            self.streamEvents(event_type, parsed_rows, function(err){
                if (operation.retry(err)){
                    // a retry has been scheduled; don't touch cache or callback yet
                    console.log('Attempt to Stream failed, retrying');
                    return;
                }
                // BUGFIX: the old code invoked `callback` twice per attempt (once
                // via clearChunkFromCache and once unconditionally afterwards) and
                // cleared the chunk even when all retries were exhausted with an
                // error, deleting rows that were never streamed. Now the callback
                // fires exactly once and the chunk is cleared only on success.
                if (err) return callback(operation.mainError());
                clearChunk();
            });
        });
    });
};
/**
 * Helper method to stream and clear each handler's in-memory _activeChunk.
 *
 * Useful for calling on process exit or termination to clean up the
 * partially-filled chunk.
 *
 * @param {Function} callback takes (err, true)
 */
EventStreamer.prototype.streamAndClearActiveChunk = function(callback){
    var self = this;
    var tasks = Object.keys(self.cache_handlers).map(function(event_type){
        var handler = self.cache_handlers[event_type];
        return function(done){
            handler.getAndClearActiveChunk(function(err, chunk_str){
                if (err) return done(err);
                if (!chunk_str) return done();
                // use_retry=false: this path runs during shutdown
                self.getRowsAndStream(event_type, chunk_str, handler, false, function(err){
                    if (err) return done(err);
                    done();
                });
            });
        };
    });
    async.parallel(tasks, function(err){
        if (err) return callback(err);
        callback(null, true);
    });
};
/**
 * Method to handle event cache using Redis subscription to event_type channels.
 *
 * Subscribes to every handler's channel; each incoming message carries the
 * encoded insertIds of a cache chunk that is ready, which is then streamed
 * via getRowsAndStream and deleted on success.
 *
 * @param {redis.RedisClient} defaultRedisClient default read/write client
 *     (currently unused here; kept for interface compatibility)
 * @param {redis.RedisClient} subscriberRedisClient client placed in subscribe
 *     mode -- a separate client is required because a client in "subscribe"
 *     mode cannot get/set values
 */
EventStreamer.prototype.bindToCache = function(defaultRedisClient, subscriberRedisClient){
    var self = this;
    // Build a small router mapping redis channel names back to cache handlers.
    var channel_router = {};
    Object.keys(self.cache_handlers).forEach(function(event_type){
        var handler = self.cache_handlers[event_type];
        channel_router[handler.channel] = handler;
    });
    var channels = Object.keys(channel_router);
    subscriberRedisClient.subscribe.apply(subscriberRedisClient, channels);
    // Listen for "chunk ready" messages; payload is the encoded insertIds
    // (fields of the redis hash) for the chunk to stream.
    subscriberRedisClient.on("message", function(channel, insertIds_str){
        var cache_handler = channel_router[channel];
        self.getRowsAndStream(cache_handler.event_type, insertIds_str, cache_handler, true, function(err){
            if (err) console.log(err);
        });
    });
    // TODO(review): process 'exit'/'SIGINT'/'SIGTERM' listeners that flushed the
    // in-memory activeChunk via streamAndClearActiveChunk were removed here after
    // repeated failed attempts -- in a real node environment the process exited
    // before the chunk could be streamed (worked only in standalone tests).
    // clearZombieEventCaches is the manual recovery path in the meantime.
};
/* -------------------- BEGIN Actual Winston Transport ------------------------ */
/**
 * Custom Redis "Event" logging transport for Winston.
 *
 * Internally, it uses Redis' built-in pub/sub messaging feature to notify
 * its listener method when the cache is ready to be purged.
 *
 * You would log SOME_EVENT type events to the table "test_events" using the following
 * call to the `log` method:
 *
 * ```
 * logger.log("info", msg, {type: "SOME_EVENT", val1: "a val", val2: "another val"})
 * ```
 *
 * IMPORTANT: If the `meta` key `type` is not found, or the given `type` is not configured in the
 * provided config file, the event will be ignored in this transport.
 *
 * @class
 * @todo: Cleanup (a lot)
 * @param {Object} options options object
 * @param {String} [options.level="info"] logging level
 * @param {Boolean} [options.timestamp=true] whether to include UTC timestamp field in log row.
 * @param {String} [options.redis_host='127.0.0.1'] host for redis client
 * @param {Number} [options.redis_port=6379] port for redis client.
 * @param {Object} [options.redis_options={}] options for redis client.
 * @param {EventStreamer} [options.eventStreamer=EventStreamer()] EventStreamer instance for specific datastore
 * @param {Object} [options.eventSchema] Object where keys = event_types and
 *     values = Array of fields for each type. If not provided, will try to get this from
 *     this.eventStreamer.getEventSchema method. If provided, will override whatever this method
 *     returns. **NOTE:** To return ALL keys from logged `meta` object, use special value `{"event_type": "*"}`
 *
 * @type {Function}
 */
var RedisEventCache = exports.RedisEventCache = winston.transports.RedisEventCache = function(options){
    this.name = "RedisEventCache";
    options = options || {};
    this.level = options.level || "info";
    // BUGFIX: `options.timestamp || true` always evaluated to true, so
    // `timestamp: false` could never disable timestamps. Only an explicit
    // false disables them; anything else keeps the documented default.
    this.timestamp = options.timestamp !== false;
    this.redis_host = options.redis_host || '127.0.0.1';
    this.redis_port = options.redis_port || 6379;
    this.redis_options = options.redis_options || {};
    this.eventStreamer = options.eventStreamer || new EventStreamer();
    //default to 5 if not present on eventStreamer instance
    this.chunkSize = this.eventStreamer.chunkSize || 5;
    try {
        this.eventSchema = options.eventSchema || this.eventStreamer.getEventSchema();
    } catch (e){
        throw new Error("Error thrown trying to figure out what eventSchema to use:" + e +
            ". You must specify eventSchema either in options or using " +
            "eventStreamer.getEventSchema hook.")
    }
    // create redis clients (separate subscriber client: a client in
    // subscribe mode cannot issue regular commands)
    this.defaultRedisClient = redis.createClient(this.redis_port,
        this.redis_host, this.redis_options);
    this.subscriberRedisClient = redis.createClient(this.redis_port,
        this.redis_host, this.redis_options);
    this.cache_handlers = {};
    var self = this;
    Object.keys(this.eventSchema).forEach(function(event){
        self.cache_handlers[event] = new EventTypeCache(event, self.chunkSize, self.defaultRedisClient)
    });
    // finally, bind eventStreamer to event channel messages to handle
    // event cache
    this.eventStreamer._addCacheHandlers(this.cache_handlers);
    this.eventStreamer.bindToCache(this.defaultRedisClient, this.subscriberRedisClient);
};
util.inherits(RedisEventCache, winston.Transport);
/**
 * Inserts row into Redis event-specific cache (via the matching
 * EventTypeCache handler) for the cache listener to deal with.
 *
 * @param {String} event_type
 * @param {String} insertId unique ID used as field in redis cache, and used to push to BigQuery
 * @param {Object} row_json
 * @param {Function} callback callback function, takes (err, success)
 * @private
 */
RedisEventCache.prototype._addRowToCache = function(event_type, insertId, row_json, callback){
    var handler = this.cache_handlers[event_type];
    handler.addRowToCache(insertId, row_json, function(err, reply){
        return err ? callback(err) : callback(null, reply);
    });
};
/**
 * Base logger method.
 *
 * Ignores any messages without `type` field present in meta object, or with
 * a `type` not configured in `this.eventSchema` (the callback still fires
 * so winston's bookkeeping is not stalled).
 *
 * Otherwise, parses log message and metadata and sends to this._addRowToCache,
 * where all the caching logic is taken care of.
 *
 * NOTE: also augments all loglines with "hostname" field
 *
 * @param {String} level logging level of this line
 * @param {String} msg log message
 * @param {Object} meta metadata object; `meta.type` selects the event schema
 * @param {Function} callback takes (err, success)
 */
RedisEventCache.prototype.log = function(level, msg, meta, callback){
    var self = this;
    // only process if there is a schema associated with this type
    var event_types = Object.keys(self.eventSchema);
    if (event_types.indexOf(meta.type) === -1){
        // BUGFIX: previously the callback was never invoked for ignored
        // events, leaving winston waiting; report a no-op success instead.
        return callback(null, true);
    }
    var fields = self.eventSchema[meta.type];
    var row_json = {};
    // check for '*' special field declaration used to pass all fields in meta
    // (loose == kept deliberately: it also matches a single-element ['*'])
    if (fields == '*'){
        row_json = meta;
    } else {
        fields.forEach(function(item){
            //TODO: Should do some basic type checking here
            row_json[item] = meta[item];
        });
    }
    //TODO: Get this from logging formatter instead
    if (self.timestamp){
        row_json.tstamp = dates.isoFormatUTCNow('Z');
    }
    row_json.level = level;
    row_json.msg = msg;
    row_json.hostname = os.hostname();
    var insertId = uuid.v1(); // timestamp-based UUID.
    var row = {
        // BUGFIX: reuse the cache-field insertId. A second uuid.v1() call used
        // to be made here, so the embedded id (used downstream for dedup) never
        // matched the hash field it was cached under.
        insertId: insertId,
        json: row_json
    };
    // Now push row to the redis cache
    self._addRowToCache(meta.type, insertId, row, function(err, success){
        if (err) return callback(err);
        return callback(null, success);
    });
};
/**
 * Method to clear out and stream all Redis event caches of matching types
 *
 * !!!!! WARNING: ONLY USE AFTER SERVER CRASHES, DO NOT USE WHILE SERVER IS RUNNING
 * OR ELSE ACTIVE CACHES WILL BE DESTROYED. !!!!!
 *
 * NOTE: THIS IS REALLY A HACK BECAUSE I CAN'T GET EXIT LISTENERS TO WORK
 * PROPERLY. IF YOU CAN GET EXIT LISTENERS TO WORK (I.E. GET CACHE TO CLEAR
 * ON EXIT) THEN YOU SHOULD NEVER HAVE TO USE THIS EVER EVER EVER
 *
 * !!!! YOU HAVE BEEN WARNED !!!!!
 *
 * @param {Function} callback takes (err)
 * @param {Number} [limit=100000] max number of zombie caches to process per event type
 * @private
 */
RedisEventCache.prototype.clearZombieEventCaches = function(callback, limit){
    var self = this;
    var outer_funcs = [];
    limit = limit || 100000;
    Object.keys(self.eventSchema).forEach(function(event_type){
        if (self.eventSchema.hasOwnProperty(event_type)){
            outer_funcs.push(function(cb){
                // loop over all keys matching event types found in Redis
                self.defaultRedisClient.KEYS('*_' + event_type, function(err, results){
                    if (err) {
                        // BUGFIX: previously this only console.error'd and never
                        // called cb, so async.parallel hung forever on a KEYS error.
                        console.error(err);
                        return cb(err);
                    }
                    if (results.length === 0){
                        console.log('No event caches (cachi?) found, nothing to do here. Exiting...');
                        return cb();
                    }
                    // BUGFIX: slice(0, limit-1) processed one fewer cache than `limit`
                    var these_results = results.slice(0, limit);
                    console.log('Zombie event caches (cachi?) found: ' + these_results.join('\n'));
                    console.log('Will now stream each cache and clear...');
                    // stream + delete each zombie hash; per-hash failures are
                    // logged and skipped (best effort) rather than aborting the run
                    async.each(these_results, function(hash_key, each_callback){
                        self.defaultRedisClient.HGETALL(hash_key, function(err, hash){
                            if (err) {
                                console.error(err);
                                return each_callback();
                            }
                            var cached_rows = [];
                            var insertIds = [];
                            Object.keys(hash).forEach(function(insertId){
                                if (hash.hasOwnProperty(insertId)){
                                    insertIds.push(insertId);
                                    cached_rows.push(JSON.parse(hash[insertId]));
                                }
                            });
                            self.eventStreamer.streamEvents(event_type, cached_rows, function(err, reply){
                                if (err) {
                                    console.error('Error streaming event cache ' + hash_key + ':');
                                    console.error(err);
                                    return each_callback();
                                }
                                console.log('Success! Streamed rows in event cache ' + hash_key + ': ' + JSON.stringify(reply));
                                // only delete rows which were streamed, just in case;
                                // if all rows streamed successfully this equals deleting the key.
                                // use insertIds array (prefixed with the key) as args to apply
                                // to HDEL, since hash fields can't be passed as a single array
                                insertIds.splice(0, 0, hash_key);
                                insertIds.push(function(err, reply){
                                    if (err) {
                                        console.error('Error deleting fields from hash key ' + hash_key + ': ' + err);
                                        return each_callback();
                                    }
                                    console.log('Successfully deleted ' + reply + ' fields from hash key ' + hash_key);
                                    return each_callback();
                                });
                                self.defaultRedisClient.HDEL.apply(self.defaultRedisClient, insertIds);
                            });
                        });
                    }, function(err){
                        if (err) return cb(err);
                        return cb();
                    });
                });
            });
        }
    });
    async.parallel(outer_funcs, function(err){
        if (err) return callback(err);
        return callback();
    })
};
///* TESTING ONLY */
//var test_schema = {
// "TEST": ['level','msg','tstamp','val1','val2']
//};
//var logger = new winston.Logger({
// transports: [
// new winston.transports.RedisEventCache({eventSchema: test_schema})
// ]
//});
//for (var i=0; i < 119; i++){
// logger.info(i.toString() + ' ajiodn', {type: "TEST", val1: i.toString(), val2: i.toString()});
//}