AuditLogHiveTableParser.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;

import java.io.IOException;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;

/**
 * This {@link AuditCommandParser} is used to read commands assuming that the
 * input was generated by a Hive query storing uncompressed output files, in
 * which fields are separated by the start-of-heading (U+0001) character.
 * The fields available should be, in order:
 *
 * <pre>
 *   relativeTimestampMs,ugi,command,src,dest,sourceIP
 * </pre>
 *
 * (The commas above are for readability only; the actual separator between
 * fields is U+0001.)
 *
 * Where relativeTimestampMs represents the time elapsed, in milliseconds,
 * between the start of the audit log and the occurrence of the audit event.
 * Assuming your audit logs are available in Hive, this can be generated with a
 * query looking like:
 *
 * <pre>
 *   INSERT OVERWRITE DIRECTORY '${outputPath}'
 *   SELECT (timestamp - ${startTime}) AS relTime, ugi, cmd, src, dst, ip
 *   FROM ${auditLogTable}
 *   WHERE
 *     timestamp {@literal >=} ${startTime}
 *     AND timestamp {@literal <} ${endTime}
 *   DISTRIBUTE BY src
 *   SORT BY relTime ASC;
 * </pre>
 *
 * Note that the sorting step is important; events in each distinct file must be
 * in time-ascending order.
 */
public class AuditLogHiveTableParser implements AuditCommandParser {

  private static final String FIELD_SEPARATOR = "\u0001";

  /** Number of fields every input line must provide (see class Javadoc). */
  private static final int EXPECTED_FIELD_COUNT = 6;

  @Override
  public void initialize(Configuration conf) throws IOException {
    // Nothing to be done
  }

  /**
   * Parses a single line of Hive output into an {@link AuditReplayCommand}.
   *
   * @param sequence sequence number passed through unchanged to the command
   * @param inputLine one line of Hive output whose fields are separated by
   *                  U+0001, in the order documented on this class
   * @param relativeToAbsolute converts the relative timestamp found in the
   *                           first field into an absolute timestamp
   * @return the command described by {@code inputLine}
   * @throws IOException if the line contains fewer than six fields, or if the
   *                     first field is not a valid {@code long}
   */
  @Override
  public AuditReplayCommand parse(Long sequence, Text inputLine,
      Function<Long, Long> relativeToAbsolute) throws IOException {
    String line = inputLine.toString();
    // A negative limit keeps trailing empty fields (e.g. an empty dest or
    // sourceIP); the single-argument split() would silently drop them and
    // shrink the array, causing an ArrayIndexOutOfBoundsException below.
    String[] fields = line.split(FIELD_SEPARATOR, -1);
    if (fields.length < EXPECTED_FIELD_COUNT) {
      throw new IOException("Expected at least " + EXPECTED_FIELD_COUNT
          + " fields separated by U+0001 but found " + fields.length
          + " in audit line: `" + line + "`");
    }
    long relativeTimestamp;
    try {
      relativeTimestamp = Long.parseLong(fields[0]);
    } catch (NumberFormatException e) {
      throw new IOException("Unable to parse relative timestamp `" + fields[0]
          + "` from audit line: `" + line + "`", e);
    }
    long absoluteTimestamp = relativeToAbsolute.apply(relativeTimestamp);
    return new AuditReplayCommand(sequence, absoluteTimestamp, fields[1],
        fields[2], fields[3], fields[4], fields[5]);
  }

}