// FlowActivityRowKey.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.timelineservice.storage.flow;

import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineSchemaUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

/**
 * Represents a row key for the flow activity table. A key is made up of the
 * cluster id, the day timestamp, the user id and the flow name, and can be
 * rendered either as raw bytes (see {@link #getRowKey()}) or as an escaped
 * string (see {@link #getRowKeyAsString()}).
 * <p>
 * Trailing components may be {@code null}, in which case the encoders emit a
 * row key prefix rather than a complete row key.
 */
public class FlowActivityRowKey {

  /**
   * Converter shared by all row keys. It keeps no per-key state, so a single
   * instance serves every encode and decode call instead of allocating one
   * per row key and one per parse.
   */
  private static final FlowActivityRowKeyConverter ROW_KEY_CONVERTER =
      new FlowActivityRowKeyConverter();

  private final String clusterId;
  private final Long dayTs;
  private final String userId;
  private final String flowName;

  /**
   * Creates a row key whose timestamp is normalized to the top of the day.
   *
   * @param clusterId identifying the cluster
   * @param dayTs to be converted to the top of the day timestamp
   * @param userId identifying user
   * @param flowName identifying the flow
   */
  public FlowActivityRowKey(String clusterId, Long dayTs, String userId,
      String flowName) {
    this(clusterId, dayTs, userId, flowName, true);
  }

  /**
   * Creates a row key, optionally normalizing the timestamp to the top of
   * the day.
   *
   * @param clusterId identifying the cluster
   * @param timestamp when the flow activity happened. May be converted to the
   *          top of the day depending on the convertDayTsToTopOfDay argument.
   * @param userId identifying user
   * @param flowName identifying the flow
   * @param convertDayTsToTopOfDay if true and timestamp isn't null, then
   *          timestamp will be converted to the top-of-the day timestamp
   */
  protected FlowActivityRowKey(String clusterId, Long timestamp, String userId,
      String flowName, boolean convertDayTsToTopOfDay) {
    this.clusterId = clusterId;
    // Deliberately an if/else and not a conditional expression: mixing the
    // primitive result with the boxed timestamp in a ternary would unbox a
    // null timestamp and throw a NullPointerException.
    if (convertDayTsToTopOfDay && (timestamp != null)) {
      this.dayTs = HBaseTimelineSchemaUtils.getTopOfTheDayTimestamp(timestamp);
    } else {
      this.dayTs = timestamp;
    }
    this.userId = userId;
    this.flowName = flowName;
  }

  /**
   * @return the cluster id, possibly {@code null}
   */
  public String getClusterId() {
    return clusterId;
  }

  /**
   * @return the day timestamp stored in this key, possibly {@code null}
   */
  public Long getDayTimestamp() {
    return dayTs;
  }

  /**
   * @return the user id, possibly {@code null}
   */
  public String getUserId() {
    return userId;
  }

  /**
   * @return the flow name, possibly {@code null}
   */
  public String getFlowName() {
    return flowName;
  }

  /**
   * Constructs a row key for the flow activity table as follows:
   * {@code clusterId!dayTimestamp!user!flowName}.
   *
   * @return byte array for the row key
   */
  public byte[] getRowKey() {
    return ROW_KEY_CONVERTER.encode(this);
  }

  /**
   * Given the raw row key as bytes, returns the row key as an object.
   *
   * @param rowKey Byte representation of row key.
   * @return A {@code FlowActivityRowKey} object.
   * @throws IllegalArgumentException if the row key does not split into
   *           exactly four components
   */
  public static FlowActivityRowKey parseRowKey(byte[] rowKey) {
    return ROW_KEY_CONVERTER.decode(rowKey);
  }

  /**
   * Constructs a row key for the flow activity table as follows:
   * {@code clusterId!dayTimestamp!user!flowName}.
   *
   * @return String representation of row key
   */
  public String getRowKeyAsString() {
    return ROW_KEY_CONVERTER.encodeAsString(this);
  }

  /**
   * Given the raw row key as string, returns the row key as an object.
   *
   * @param encodedRowKey String representation of row key.
   * @return A {@code FlowActivityRowKey} object.
   * @throws IllegalArgumentException if the string does not split into
   *           exactly four components
   * @throws NumberFormatException if the timestamp component is not a valid
   *           long
   */
  public static FlowActivityRowKey parseRowKeyFromString(String encodedRowKey) {
    return ROW_KEY_CONVERTER.decodeFromString(encodedRowKey);
  }

  /**
   * Encodes and decodes row key for flow activity table. The row key is of the
   * form : clusterId!dayTimestamp!user!flowName. dayTimestamp(top of the day
   * timestamp) is a long and rest are strings.
   */
  private static final class FlowActivityRowKeyConverter
      implements KeyConverter<FlowActivityRowKey>,
      KeyConverterToString<FlowActivityRowKey> {

    /**
     * The flow activity row key is of the form
     * clusterId!dayTimestamp!user!flowName with each segment separated by !.
     * The sizes below indicate sizes of each one of these segments in
     * sequence. clusterId, user and flowName are strings. Top of the day
     * timestamp is a long hence 8 bytes in size. Strings are variable in size
     * (i.e. they end whenever separator is encountered). This is used while
     * decoding and helps in determining where to split.
     */
    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
        Bytes.SIZEOF_LONG, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE };

    private FlowActivityRowKeyConverter() {
    }

    /**
     * Encodes one string component of the row key, escaping the space, tab
     * and qualifier separators.
     */
    private static byte[] encodeSegment(String segment) {
      return Separator.encode(segment, Separator.SPACE, Separator.TAB,
          Separator.QUALIFIERS);
    }

    /**
     * Decodes one string component of the row key, reversing the escaping
     * applied by {@link #encodeSegment(String)}.
     */
    private static String decodeSegment(byte[] segment) {
      return Separator.decode(Bytes.toString(segment), Separator.QUALIFIERS,
          Separator.TAB, Separator.SPACE);
    }

    /*
     * (non-Javadoc)
     *
     * Encodes FlowActivityRowKey object into a byte array with each
     * component/field in FlowActivityRowKey separated by Separator#QUALIFIERS.
     * This leads to a flow activity table row key of the form
     * clusterId!dayTimestamp!user!flowName. If dayTimestamp in passed
     * FlowActivityRowKey object is null, then this returns a row key prefix
     * of the form clusterId! and if userId in FlowActivityRowKey is null
     * (and dayTimestamp is not null), this returns a row key prefix of the
     * form clusterId!dayTimestamp!. dayTimestamp is inverted while encoding
     * as it helps maintain a descending order for row keys in flow activity
     * table.
     *
     * @see org.apache.hadoop.yarn.server.timelineservice.storage.common
     * .KeyConverter#encode(java.lang.Object)
     */
    @Override
    public byte[] encode(FlowActivityRowKey rowKey) {
      byte[] cluster = encodeSegment(rowKey.getClusterId());
      if (rowKey.getDayTimestamp() == null) {
        // Only the cluster is known: emit the prefix "clusterId!".
        return Separator.QUALIFIERS.join(cluster, Separator.EMPTY_BYTES);
      }
      byte[] invertedDayTs =
          Bytes.toBytes(LongConverter.invertLong(rowKey.getDayTimestamp()));
      if (rowKey.getUserId() == null) {
        // Cluster and day are known: emit the prefix "clusterId!dayTs!".
        return Separator.QUALIFIERS.join(cluster, invertedDayTs,
            Separator.EMPTY_BYTES);
      }
      return Separator.QUALIFIERS.join(cluster, invertedDayTs,
          encodeSegment(rowKey.getUserId()),
          encodeSegment(rowKey.getFlowName()));
    }

    /*
     * (non-Javadoc)
     *
     * Splits the raw row key according to SEGMENT_SIZES and rebuilds the
     * FlowActivityRowKey from the four resulting components. Throws
     * IllegalArgumentException if the split does not yield exactly one
     * component per entry of SEGMENT_SIZES.
     *
     * @see
     * org.apache.hadoop.yarn.server.timelineservice.storage.common
     * .KeyConverter#decode(byte[])
     */
    @Override
    public FlowActivityRowKey decode(byte[] rowKey) {
      byte[][] segments = Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
      if (segments.length != SEGMENT_SIZES.length) {
        throw new IllegalArgumentException("the row key is not valid for "
            + "a flow activity");
      }
      String clusterId = decodeSegment(segments[0]);
      // The timestamp was inverted when encoded; invert it again here.
      Long dayTs = LongConverter.invertLong(Bytes.toLong(segments[1]));
      String userId = decodeSegment(segments[2]);
      String flowName = decodeSegment(segments[3]);
      return new FlowActivityRowKey(clusterId, dayTs, userId, flowName);
    }

    /**
     * Joins the leading non-null components of the key into an escaped
     * string. The first null component among dayTimestamp, user and flowName
     * ends the output, so a partially populated key yields a prefix.
     */
    @Override
    public String encodeAsString(FlowActivityRowKey key) {
      String cluster = key.getClusterId();
      Long dayTimestamp = key.getDayTimestamp();
      if (dayTimestamp == null) {
        return TimelineReaderUtils.joinAndEscapeStrings(
            new String[] {cluster});
      }
      String day = dayTimestamp.toString();
      String user = key.getUserId();
      if (user == null) {
        return TimelineReaderUtils.joinAndEscapeStrings(
            new String[] {cluster, day});
      }
      String flow = key.getFlowName();
      if (flow == null) {
        return TimelineReaderUtils.joinAndEscapeStrings(
            new String[] {cluster, day, user});
      }
      return TimelineReaderUtils.joinAndEscapeStrings(
          new String[] {cluster, day, user, flow});
    }

    /**
     * Rebuilds a row key from its escaped string form. Throws
     * IllegalArgumentException unless the string splits into exactly four
     * components; the second component must parse as a long.
     */
    @Override
    public FlowActivityRowKey decodeFromString(String encodedRowKey) {
      List<String> parts = TimelineReaderUtils.split(encodedRowKey);
      if (parts == null || parts.size() != SEGMENT_SIZES.length) {
        throw new IllegalArgumentException(
            "Invalid row key for flow activity.");
      }
      Long dayTs = Long.valueOf(parts.get(1));
      return new FlowActivityRowKey(parts.get(0), dayTs, parts.get(2),
          parts.get(3));
    }
  }
}