Class: EventStreamer

transports~EventStreamer

new EventStreamer(options)

Base class to hold the scaffolding to stream events to desired data store from Redis caching system exposed by RedisEventCache transport.

Subclasses should handle all of the option parsing

Parameters:
Name Type Description
options Object
Properties
Name Type Description
chunkSize Number

size of each chunk to stream. Hard lowerlimit is 20.

retryOptions Object

options object to pass to retry.operation() call. See retry docs for details.

Source:

Methods

bindToCache(defaultRedisClient, subscriberRedisClient)

Method to handle event cache using Redis subscription to event_type channels.

Essentially just wraps this.StreamEvents and pushes cache lines to it when they are ready, then deletes them from the cache on successful push.

Adds a subscriber to all event_type channels which handles all messages indicating that cache chunk is ready.

Also adds event listener to process.SIGINT to stream all cached events on Node process interruption.

Parameters:
Name Type Description
defaultRedisClient redis.RedisClient

default read/write client

subscriberRedisClient redis.RedisClient

client that subscribes to messages

Source:

getEventSchema()

Placeholder hook for subclasses to override

Source:

getRowsAndStream(event_type, insertIds_str, cache_handler, use_retry, callback)

Wraps this.streamEvents to handle retrieval of specific cached rows based on insertIds from cached hash object, and deletion of rows on successful stream

Will also auto-retry calls to streamEvents

Parameters:
Name Type Description
event_type String

event_type key

insertIds_str String

encoded string of insertIds

cache_handler EventTypeCache

instance of EventTypeCache

use_retry Boolean

whether or not to use retry handler. Should only be false if being called on exit.

callback function

callback

Source:

streamAndClearActiveChunk(callback)

Helper method to stream and clear _activeCache in-memory cache on each handler.

Useful for calling on process exit or termination to clean up in-memory chunk.

Parameters:
Name Type Description
callback
Source:

streamEvents(event_type, cached_rows, callback)

Base method to stream event chunks from cache to data store.

All subclasses should override this method and customize to write to individual storage engine.

This method on the base class simply logs all lines to console, should not be used in a production environment.

Parameters:
Name Type Description
event_type String

event_type key passed to meta object in logger.log method

cached_rows Array

array of objects retrieved from redis

callback

takes (err, response)

Source: