/**
* @module google/bigquery_utils
*/
var google = require('googleapis');
var util = require('util');
var auth = require('./auth');
var transports = require('../transports');
var fs = require('fs');
//TODO: Refactor using the gcloud API rather than the generic googleapis REST wrapper.
//TODO: At the very least, we wouldn't have to reauthenticate on every call, which might cut down on overhead.
exports.createBigQueryTableFromConfig = createBigQueryTableFromConfig;
exports.loadFullBigQueryConfig = loadFullBigQueryConfig;
var BIGQUERY_SCOPE = exports.BIGQUERY_SCOPE = 'https://www.googleapis.com/auth/bigquery';
/**
* Helper function to create new BigQuery tables from BigQuery logging config files.
*
 * @param {Object} authClient fully authorized authentication client (JWT, OAuth2, or Compute)
 * @param {Object} config JSON BigQuery config object following the appropriate specs
 * @param {String} event_type specific event type to create a table for
 * @param {Function} callback callback invoked with (err, apiResponse)
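 *
 * @example
 * // Hypothetical usage sketch; the file paths and the SOME_EVENT type are
 * // illustrative. getJWTAuthClient is the same helper used by the streamer below.
 * auth.getJWTAuthClient('./jwt_secrets.json', BIGQUERY_SCOPE, function(err, authClient){
 *   if (err) throw err;
 *   var config = loadFullBigQueryConfig('./bq_config.json');
 *   createBigQueryTableFromConfig(authClient, config, 'SOME_EVENT', function(err, response){
 *     if (err) return console.error(err);
 *     console.log('table created');
 *   });
 * });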
*/
function createBigQueryTableFromConfig(authClient, config, event_type, callback){
var bq = google.bigquery({auth: authClient, version: 'v2'});
// build the tables.insert request: target dataset plus the new table resource body
var params = {
projectId: config.projectId,
datasetId: config.event_tables[event_type].datasetId,
resource: {
kind: "bigquery#table",
tableReference: {
projectId: config.projectId,
datasetId: config.event_tables[event_type].datasetId,
tableId: config.event_tables[event_type].tableId
},
schema: config.event_tables[event_type].schema
}
};
bq.tables.insert(params, function(err, response){
if (err) return callback(err);
return callback(null, response);
});
}
/**
 * Helper to get the full BigQuery config for the logging transport.
 *
 * Automatically joins in the default HTTP logging config so there's
 * no need to define it redundantly in every repo.
 *
 * Simply adds any keys found in the child config to the base config and
 * returns the joined object; no fancy logic.
 *
 * As such, any keys specified in the child config WILL OVERRIDE base config values.
 *
 * @param {String} [path='./bq_config.json'] - path to the child config file
 * @param {String} [baseConfigPath='/google/bq_config.json'] - path to the base config
 *     file, relative to the config directory (NODE_CONFIG_DIR, or './config' if unset)
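 *
 * @example
 * // Sketch, assuming NODE_CONFIG_DIR points at a checkout of the config repo
 * // containing google/bq_config.json with the shared HTTP logging tables:
 * var config = loadFullBigQueryConfig('./bq_config.json');
 * // config.event_tables now holds the base HTTP event tables plus any tables
 * // defined in ./bq_config.json; the child projectId, if set, wins.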
*/
function loadFullBigQueryConfig(path, baseConfigPath){
path = path || './bq_config.json';
baseConfigPath = baseConfigPath || '/google/bq_config.json';
var base_config_dir = process.env["NODE_CONFIG_DIR"] || './config';
var base_config = JSON.parse(fs.readFileSync(base_config_dir + baseConfigPath, 'utf8'));
var child_config = JSON.parse(fs.readFileSync(path, 'utf8'));
var child_event_tables = child_config.event_tables;
if (child_event_tables){
for (var event in child_event_tables){
if (child_event_tables.hasOwnProperty(event)){
base_config.event_tables[event] = child_event_tables[event];
}
}
}
if (child_config.projectId){
base_config.projectId = child_config.projectId;
}
return base_config;
}
/**
* BigQuery implementation of EventStreamer.
*
 * Streams log lines into BigQuery tables via the insertAll() API,
 * caching lines locally in Redis to reduce the number of calls required.
*
 * ## USAGE
 * This implementation requires an external configuration file defining the field
 * schema for each "type" of event being logged, as BigQuery tables expect a fixed
 * field schema with rigid typing. It is recommended that you write custom logging
 * methods to enforce each event-type schema for desired events (see the sketch
 * following the example below).
*
* ## Ex:
* Given a config file containing:
*
* ```
* {"projectId": "project-id-781",
* "event_tables": {
* "SOME_EVENT": {
* "tableId": "test_events",
* "datasetId": "test_events_dataset",
* "schema": {
* "fields": [
* {"name": "level","type": "STRING"},
* {"name": "msg","type": "STRING"},
* {"name": "tstamp","type": "TIMESTAMP"},
* {"name": "val1","type": "STRING"},
* {"name": "val2","type": "STRING"}
* ]
* }
* }
* }
* ```
*
 * You would log SOME_EVENT type events to the table "test_events" with the
 * following call to the `log` method:
 *
 * ```
 * logger.log("info", msg, {type: "SOME_EVENT", val1: "a val", val2: "another val"});
 * ```
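 *
 * A hypothetical wrapper enforcing the SOME_EVENT schema (assuming a
 * winston-style `logger` as above):
 *
 * ```
 * function logSomeEvent(msg, val1, val2){
 *   // pin the event type and field names so rows always match the BQ schema
 *   logger.log("info", msg, {type: "SOME_EVENT", val1: val1, val2: val2});
 * }
 * ```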
*
 * IMPORTANT: If the `meta` key `type` is missing, or the given `type` is not configured in the
 * provided config file, the event will be ignored by this transport.
*
* @todo: Add support for OAuth2.0, Compute authorization
*
* @class
 * @param {Object} config - parsed BigQuery event schema config object; see the
 *     file specifications above.
 * @param {String} jwtSecretsFile - path to the JWT secrets JSON file.
 * @param {Number} [chunkSize=500] - number of rows to cache per event type before
 *     sending to BigQuery; must be between 1 and 500 (maxRequestSize).
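 *
 * @example
 * // Hypothetical setup; file paths are illustrative.
 * var config = loadFullBigQueryConfig('./bq_config.json');
 * var streamer = new BigQueryEventStreamer(config, './jwt_secrets.json', 100);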
*/
var BigQueryEventStreamer = exports.BigQueryEventStreamer = function(config, jwtSecretsFile, chunkSize){
transports.EventStreamer.call(this, {});
// config is expected to be an already-parsed object (see loadFullBigQueryConfig)
this.projectId = config.projectId;
this.event_tables = config.event_tables;
this.jwtSecretsFile = jwtSecretsFile;
this.google_api_version = 'v2';
// this is a hard limit on length of BigQuery insertAll row count,
// API will throw an error if more rows are sent in a request
this.maxRequestSize = 500;
// lower bound on chunkSize; in practice the chunk size shouldn't be much
// smaller than the maximum, to avoid bombarding the Google API with requests
this.minChunkSize = 1;
this.chunkSize = chunkSize || this.maxRequestSize;
if (this.chunkSize < this.minChunkSize || this.chunkSize > this.maxRequestSize){
var msg = "chunkSize must be between "+this.minChunkSize+" and "+this.maxRequestSize;
throw new Error(msg);
}
};
util.inherits(BigQueryEventStreamer, transports.EventStreamer);
/**
 * Executes the BigQuery `tabledata.insertAll()` API call.
 *
 * Generates a single HTTP POST request to the Google BigQuery REST endpoint
 * containing all rows in the cache, writing to the table and dataset specified
 * in the config for the given event_type.
*
* @param {String} event_type event type indicated by `meta['type']` val passed to `log` method
 * @param {Array} cached_rows array of row objects to insert
 * @param {Function} callback callback invoked with (err, apiResponse)
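 *
 * @example
 * // Sketch; `streamer` is the instance from the constructor example above.
 * // Rows are passed straight through as the insertAll `rows` payload, so each
 * // entry is assumed to follow the tableDataInsertAllRequest row shape.
 * streamer.streamEvents('SOME_EVENT', [
 *   {json: {level: 'info', msg: 'hello', tstamp: Date.now() / 1000, val1: 'a', val2: 'b'}}
 * ], function(err, response){
 *   if (err) console.error(err);
 * });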
*/
BigQueryEventStreamer.prototype.streamEvents = function(event_type, cached_rows, callback){
var self = this;
auth.getJWTAuthClient(self.jwtSecretsFile, BIGQUERY_SCOPE, function(err, authClient){
if (err) return callback(err);
var bq = google.bigquery({auth: authClient, version: self.google_api_version});
var event_table = self.event_tables[event_type];
var params = {
projectId: self.projectId,
datasetId: event_table.datasetId,
tableId: event_table.tableId,
resource: {kind: "bigquery#tableDataInsertAllRequest", rows: cached_rows}
};
bq.tabledata.insertAll(params, function(err, response){
if (err) return callback(err);
if (!response.insertErrors){
  return callback(null, response);
} else {
  return callback(new Error('API errors pushing log cache: ' +
    JSON.stringify(response.insertErrors)));
}
});
});
};
/**
 * Hook to parse the config into the eventSchema format accepted by the
 * transport base class.
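 *
 * @example
 * // For the SOME_EVENT config shown above, this returns:
 * // {SOME_EVENT: ['level', 'msg', 'tstamp', 'val1', 'val2']}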
*/
BigQueryEventStreamer.prototype.getEventSchema = function(){
var self = this;
var event_schema = {};
for (var etype in self.event_tables){
var fields = [];
if (self.event_tables.hasOwnProperty(etype)){
self.event_tables[etype].schema.fields.forEach(function(item){
fields.push(item.name);
});
event_schema[etype] = fields;
}
}
return event_schema;
};