Source: google/bigquery_utils.js

/**
 * @module google/bigquery_utils
 */

var google = require('googleapis');
var util = require('util');
var auth = require('./auth');
var transports = require('../transports');
var fs = require('fs');

//TODO: Refactor using the gcloud API rather than the generic googleapis REST wrapper.
//TODO: At the very least, we wouldn't have to reauthenticate on every call, which might cut down on overhead.

exports.createBigQueryTableFromConfig = createBigQueryTableFromConfig;
exports.loadFullBigQueryConfig = loadFullBigQueryConfig;

var BIGQUERY_SCOPE = exports.BIGQUERY_SCOPE = 'https://www.googleapis.com/auth/bigquery';

/**
 * Helper function to create new BigQuery tables from BigQuery logging config files.
 *
 * @param {Object} authClient fully authorized authentication client (JWT, OAuth2, or Compute)
 * @param {Object} config JSON BigQuery config object following appropriate specs
 * @param {String} event_type specific event type to create table for
 * @param {Function} callback callback
 */
function createBigQueryTableFromConfig(authClient, config, event_type, callback){
    var bq = google.bigquery({auth: authClient, version: 'v2'});
    var params = {
        projectId: config.projectId,
        datasetId: config.event_tables[event_type].datasetId,
        resource: {
            kind: "bigquery#table",
            tableReference: {
                projectId: config.projectId,
                datasetId: config.event_tables[event_type].datasetId,
                tableId: config.event_tables[event_type].tableId
            },
            schema: config.event_tables[event_type].schema
        }
    };
    bq.tables.insert(params, function(err, response){
        if (err) return callback(err);
        return callback(null, response);
    });
}
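
// Usage sketch (hypothetical values): `authClient` is assumed to be an
// authorized client obtained from ./auth, and 'SOME_EVENT' must exist in
// the config's event_tables.
//
//   var config = loadFullBigQueryConfig('./bq_config.json');
//   createBigQueryTableFromConfig(authClient, config, 'SOME_EVENT', function(err, table){
//       if (err) return console.error('failed to create table:', err);
//       console.log('created table:', config.event_tables['SOME_EVENT'].tableId);
//   });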

/**
 * Helper to get full BigQuery config for logging transport
 *
 * Automatically joins in the default HTTP logging config so there's
 * no need to define it redundantly in every repo.
 *
 * Simply copies the child config's `event_tables` entries and `projectId` into
 * the base config and returns the joined object; no fancy logic.
 *
 * As such, any keys specified in the child config WILL OVERRIDE base config values.
 *
 * @param {String} [path='./bq_config.json'] - path to the child config file;
 *      if null, defaults to './bq_config.json'
 * @param {String} [baseConfigPath='/google/bq_config.json'] - path to the base config file,
 *      relative to the config repo (NODE_CONFIG_DIR); if null, defaults to '/google/bq_config.json'
 */
function loadFullBigQueryConfig(path, baseConfigPath){
    path = path || './bq_config.json';
    baseConfigPath = baseConfigPath || '/google/bq_config.json';
    var base_config_dir = process.env["NODE_CONFIG_DIR"] || './config';
    var base_config = JSON.parse(fs.readFileSync(base_config_dir + baseConfigPath, 'utf8'));
    var child_config = JSON.parse(fs.readFileSync(path, 'utf8'));
    var child_event_tables = child_config.event_tables;
    if (child_event_tables){
        for (var event in child_event_tables){
            if (child_event_tables.hasOwnProperty(event)){
                base_config.event_tables[event] = child_event_tables[event];
            }
        }
    }
    if (child_config.projectId){
        base_config.projectId = child_config.projectId;
    }
    return base_config;
}
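
// Merge-behavior sketch (hypothetical configs): given a base config of
//
//   {"projectId": "base-project", "event_tables": {"HTTP": {...}}}
//
// and a child config of
//
//   {"projectId": "project-id-781", "event_tables": {"SOME_EVENT": {...}}}
//
// the returned config keeps the base "HTTP" table, gains the child's
// "SOME_EVENT" table, and takes the child's projectId ("project-id-781").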

/**
 * BigQuery implementation of EventStreamer.
 *
 * Streams log lines into BigQuery tables using the insertAll() API command,
 * caching lines locally in Redis to reduce the number of calls required.
 *
 * ## USAGE
 * This implementation requires an external configuration file defining the field schema
 * for each "type" of event being logged, as BigQuery tables expect a fixed field
 * schema with rigid typing.  It is recommended that you write custom logging
 * methods to enforce each event-type schema for desired events.
 *
 * ## Example
 * Given a config file containing:
 *
 * ```
 * {"projectId": "project-id-781",
 *  "event_tables": {
 *   "SOME_EVENT": {
 *     "tableId": "test_events",
 *     "datasetId": "test_events_dataset",
 *     "schema": {
 *       "fields": [
 *         {"name": "level","type": "STRING"},
 *         {"name": "msg","type": "STRING"},
 *         {"name": "tstamp","type": "TIMESTAMP"},
 *         {"name": "val1","type": "STRING"},
 *         {"name": "val2","type": "STRING"}
 *       ]
 *     }
 *   }
 *  }
 * }
 * ```
 *
 * You would log SOME_EVENT type events to the table "test_events" using the following
 * call to the `log` method:
 *
 * ```
 * logger.log("info", msg, {type: "SOME_EVENT", val1: "a val", val2: "another val"})
 * ```
 *
 * IMPORTANT: If the `meta` key `type` is not found, or the given `type` is not configured in the
 * provided config file, the event will be ignored in this transport.
 *
 * @todo: Add support for OAuth2.0, Compute authorization
 *
 * @class
 * @param  {Object} config - parsed BigQuery event schema config object.  See the
 *         specifications for this file above.
 * @param  {String} jwtSecretsFile - path to JWT secrets JSON file.
 * @param  {Number} [chunkSize=500] - number of rows to cache per event-type before sending
 *         to BigQuery.  Defaults to 500 (maxRequestSize).
 */
var BigQueryEventStreamer = exports.BigQueryEventStreamer = function(config, jwtSecretsFile, chunkSize){

    // pull parameters from the parsed config object
    transports.EventStreamer.call(this, {});
    this.projectId      =   config.projectId;
    this.event_tables   =   config.event_tables;
    this.jwtSecretsFile =   jwtSecretsFile;
    this.google_api_version = 'v2';

    // this is a hard limit on length of BigQuery insertAll row count,
    // API will throw an error if more rows are sent in a request
    this.maxRequestSize =   500;
    // no magic to this number, but chunkSize shouldn't be much less in
    // order to avoid bombarding the Google API with requests
    this.minChunkSize   =   1;
    this.chunkSize      =   chunkSize || this.maxRequestSize;
    if (this.chunkSize < this.minChunkSize || this.chunkSize > this.maxRequestSize){
        var msg = "chunkSize must be between "+this.minChunkSize+" and "+this.maxRequestSize;
        throw new Error(msg);
    }
};
util.inherits(BigQueryEventStreamer, transports.EventStreamer);
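
// Construction sketch (hypothetical paths): the config object comes from
// loadFullBigQueryConfig, and './jwt_secrets.json' stands in for a real
// JWT secrets file.
//
//   var config = loadFullBigQueryConfig('./bq_config.json');
//   var streamer = new BigQueryEventStreamer(config, './jwt_secrets.json', 250);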

/**
 * Executes API call to `BigQuery.insertAll()`
 *
 * Generates a single HTTP POST request to Google BigQuery REST endpoint
 * containing all rows in cache, writing to table & dataset specified in config
 * per event_type
 *
 * @param {String} event_type event type indicated by `meta['type']` val passed to `log` method
 * @param {Array} cached_rows array of row objects
 * @param {Function} callback
 */
BigQueryEventStreamer.prototype.streamEvents = function(event_type, cached_rows, callback){
    var self = this;
    auth.getJWTAuthClient(self.jwtSecretsFile, BIGQUERY_SCOPE, function(err, authClient){
        if (err) return callback(err);
        var bq = google.bigquery({auth: authClient, version: self.google_api_version});
        var event_table = self.event_tables[event_type];
        var params = {
            projectId: self.projectId,
            datasetId: event_table.datasetId,
            tableId: event_table.tableId,
            resource: {kind: "bigquery#tableDataInsertAllRequest", rows: cached_rows}
        };
        bq.tabledata.insertAll(params, function(err, response){
            if (err) return callback(err);
            if (!response.insertErrors){
                return callback(null, response);
            } else {
                return callback('API errors pushing log cache: ' +
                    JSON.stringify(response.insertErrors));
            }
        });
    });
};
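
// Note: each element of `cached_rows` is passed straight through as a row of
// the insertAll request, so rows are expected to follow BigQuery's insertAll
// row format, e.g. (hypothetical values):
//
//   {insertId: "some-unique-id",
//    json: {level: "info", msg: "hi", tstamp: "2015-01-01 00:00:00", val1: "a", val2: "b"}}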

/**
 * Hook to parse config file into eventSchema format accepted by transport
 * class.
 */
BigQueryEventStreamer.prototype.getEventSchema = function(){
    var self = this;
    var event_schema = {};
    for (var etype in self.event_tables){
        var fields = [];
        if (self.event_tables.hasOwnProperty(etype)){
            self.event_tables[etype].schema.fields.forEach(function(item){
                fields.push(item.name);
            });
            event_schema[etype] = fields;
        }
    }
    return event_schema;
};
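
// Sketch of the output: for the example config shown in the class docs above,
// getEventSchema() would return:
//
//   {SOME_EVENT: ["level", "msg", "tstamp", "val1", "val2"]}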