/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.timeline;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;

import org.apache.commons.collections4.map.LRUMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.WriteBatch;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTClazzNameRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.charset.StandardCharsets.UTF_8;

import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS;

import static org.fusesource.leveldbjni.JniDBFactory.bytes;

/**
 * <p>
 * An implementation of an application timeline store backed by leveldb.
 * </p>
 *
 * <p>
 * There are three sections of the db: the start time section, the entity
 * section, and the indexed entity section.
 * </p>
 *
 * <p>
 * The start time section is used to retrieve the unique start time for a given
 * entity. Its values each contain a start time while its keys are of the form:
 * </p>
 *
 * <pre>
 *   START_TIME_LOOKUP_PREFIX + entity type + entity id
 * </pre>
 *
 * <p>
 * The entity section is ordered by entity type, then entity start time
 * descending, then entity ID. There are four sub-sections of the entity
 * section: events, primary filters, related entities, and other info. The event
 * entries have event info serialized into their values. The other info entries
 * have values corresponding to the values of the other info name/value map for
 * the entry (note the names are contained in the key). All other entries have
 * empty values. The key structure is as follows:
 * </p>
 *
 * <pre>
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     DOMAIN_ID_COLUMN
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     EVENTS_COLUMN + reveventtimestamp + eventtype
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     PRIMARY_FILTERS_COLUMN + name + value
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     OTHER_INFO_COLUMN + name
 *
 *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
 *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
 * </pre>
 *
 * <p>
 * The indexed entity section contains a primary filter name and primary filter
 * value as the prefix. Within a given name/value, entire entity entries are
 * stored in the same format as described in the entity section above (below,
 * "key" represents any one of the possible entity entry keys described above).
 * </p>
 *
 * <pre>
 *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
 *     key
 * </pre>
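 *
 * <p>
 * As an illustrative sketch (not additional store API), an entity entry
 * prefix like the ones above is assembled with the KeyBuilder used
 * throughout this class, with the start time written reverse-ordered:
 * </p>
 *
 * <pre>
 *   byte[] prefix = KeyBuilder.newInstance().add(entityType)
 *       .add(writeReverseOrderedLong(startTime)).add(entityId)
 *       .getBytesForLookup();
 * </pre>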
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RollingLevelDBTimelineStore extends AbstractService implements
    TimelineStore {
  private static final Logger LOG = LoggerFactory
      .getLogger(RollingLevelDBTimelineStore.class);
  private static FSTConfiguration fstConf =
      FSTConfiguration.createDefaultConfiguration();
  // Fall back to 2.24 parsing if 2.50 parsing fails
  private static FSTConfiguration fstConf224 =
      FSTConfiguration.createDefaultConfiguration();
  // Static class code for 2.24
  private static final int LINKED_HASH_MAP_224_CODE = 83;

  static {
    fstConf.setShareReferences(false);
    fstConf224.setShareReferences(false);
    // YARN-6654 unable to find class for code 83 (LinkedHashMap)
    // The linked hash map was changed between 2.24 and 2.50 so that
    // the static code for LinkedHashMap (83) was changed to a dynamic
    // code.
    FSTClazzNameRegistry registry = fstConf224.getClassRegistry();
    registry.registerClass(
        LinkedHashMap.class, LINKED_HASH_MAP_224_CODE, fstConf224);
  }

  @Private
  @VisibleForTesting
  static final String FILENAME = "leveldb-timeline-store";
  static final String DOMAIN = "domain-ldb";
  static final String ENTITY = "entity-ldb";
  static final String INDEX = "indexes-ldb";
  static final String STARTTIME = "starttime-ldb";
  static final String OWNER = "owner-ldb";

  @VisibleForTesting
  // Extension to FILENAME where the backup will be stored in case we need
  // to call leveldb recovery
  static final String BACKUP_EXT = ".backup-";

  private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
  private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8);
  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8);

  private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8);
  private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8);
  private static final byte[] READER_COLUMN = "r".getBytes(UTF_8);
  private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8);
  private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8);

  private static final byte[] EMPTY_BYTES = new byte[0];

  private static final String TIMELINE_STORE_VERSION_KEY =
      "timeline-store-version";

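  // version of the on-disk schema, checked at startup against the version
  // stored in the db (see checkVersion())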
  private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);

  private static long writeBatchSize = 10000;

  @Private
  @VisibleForTesting
  static final FsPermission LEVELDB_DIR_UMASK = FsPermission
      .createImmutable((short) 0700);

  private Map<EntityIdentifier, Long> startTimeWriteCache;
  private Map<EntityIdentifier, Long> startTimeReadCache;

  private DB domaindb;
  private RollingLevelDB entitydb;
  private RollingLevelDB indexdb;
  private DB starttimedb;
  private DB ownerdb;

  private Thread deletionThread;

  public RollingLevelDBTimelineStore() {
    super(RollingLevelDBTimelineStore.class.getName());
  }

  private JniDBFactory factory;
  @VisibleForTesting
  void setFactory(JniDBFactory fact) {
    this.factory = fact;
  }

  @Override
  @SuppressWarnings("unchecked")
  protected void serviceInit(Configuration conf) throws Exception {
    Preconditions
        .checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS,
            DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
            "%s property value should be greater than zero",
            TIMELINE_SERVICE_TTL_MS);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
        "%s property value should be greater than or equal to zero",
        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES);
    Preconditions.checkArgument(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0,
        "%s property value should be greater than zero",
        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE);

    Options options = new Options();
    options.createIfMissing(true);
    options.cacheSize(conf.getLong(
        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
    if (factory == null) {
      factory = new JniDBFactory();
    }
    Path dbPath = new Path(
        conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
    Path domainDBPath = new Path(dbPath, DOMAIN);
    Path starttimeDBPath = new Path(dbPath, STARTTIME);
    Path ownerDBPath = new Path(dbPath, OWNER);
    try (FileSystem localFS = FileSystem.getLocal(conf)) {
      // create the base directory first so that the per-database
      // subdirectories are created beneath it with restricted permissions
      for (Path path : new Path[] {dbPath, domainDBPath, starttimeDBPath,
          ownerDBPath}) {
        if (!localFS.exists(path)) {
          if (!localFS.mkdirs(path)) {
            throw new IOException("Couldn't create directory for leveldb "
                + "timeline store " + path);
          }
          localFS.setPermission(path, LEVELDB_DIR_UMASK);
        }
      }
    }
    options.maxOpenFiles(conf.getInt(
        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
    options.writeBufferSize(conf.getInt(
        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
    LOG.info("Using leveldb path {}", dbPath);
    domaindb = LeveldbUtils.loadOrRepairLevelDb(factory, domainDBPath, options);
    entitydb = new RollingLevelDB(ENTITY);
    entitydb.init(conf);
    indexdb = new RollingLevelDB(INDEX);
    indexdb.init(conf);
    starttimedb =
        LeveldbUtils.loadOrRepairLevelDb(factory, starttimeDBPath, options);
    ownerdb = LeveldbUtils.loadOrRepairLevelDb(factory, ownerDBPath, options);
    checkVersion();
    startTimeWriteCache = Collections.synchronizedMap(
        new LRUMap<EntityIdentifier, Long>(getStartTimeWriteCacheSize(conf)));
    startTimeReadCache = Collections.synchronizedMap(
        new LRUMap<EntityIdentifier, Long>(getStartTimeReadCacheSize(conf)));

    writeBatchSize = conf.getInt(
        TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE,
        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE);

    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
      deletionThread = new EntityDeletionThread(getConfig());
      deletionThread.start();
    }
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    if (deletionThread != null) {
      deletionThread.interrupt();
      LOG.info("Waiting for deletion thread to complete its current action");
      try {
        deletionThread.join();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while waiting for deletion thread to complete,"
            + " closing db now", e);
      }
    }
    IOUtils.cleanupWithLogger(LOG, domaindb);
    IOUtils.cleanupWithLogger(LOG, starttimedb);
    IOUtils.cleanupWithLogger(LOG, ownerdb);
    entitydb.stop();
    indexdb.stop();
    super.serviceStop();
  }

  private class EntityDeletionThread extends Thread {
    private final long ttl;
    private final long ttlInterval;

    EntityDeletionThread(Configuration conf) {
      ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
          DEFAULT_TIMELINE_SERVICE_TTL_MS);
      ttlInterval = conf.getLong(
          TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
          DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
      LOG.info("Starting deletion thread with ttl {} and cycle interval {}",
          ttl, ttlInterval);
    }

    @Override
    public void run() {
      Thread.currentThread().setName("Leveldb Timeline Store Retention");
      while (true) {
        long timestamp = System.currentTimeMillis() - ttl;
        try {
          discardOldEntities(timestamp);
          Thread.sleep(ttlInterval);
        } catch (IOException e) {
          LOG.error("Error discarding old entities", e);
        } catch (InterruptedException e) {
          LOG.info("Deletion thread received interrupt, exiting");
          break;
        }
      }
    }
  }

  @Override
  public TimelineEntity getEntity(String entityId, String entityType,
      EnumSet<Field> fields) throws IOException {
    Long revStartTime = getStartTimeLong(entityId, entityType);
    if (revStartTime == null) {
      LOG.debug("Could not find start time for {} {} ", entityType, entityId);
      return null;
    }
    byte[] prefix = KeyBuilder.newInstance().add(entityType)
        .add(writeReverseOrderedLong(revStartTime)).add(entityId)
        .getBytesForLookup();

    DB db = entitydb.getDBForStartTime(revStartTime);
    if (db == null) {
      LOG.debug("Could not find db for {} {} ", entityType, entityId);
      return null;
    }
    try (DBIterator iterator = db.iterator()) {
      iterator.seek(prefix);

      return getEntity(entityId, entityType, revStartTime, fields, iterator,
          prefix, prefix.length);
    }
  }

  /**
   * Read an entity from the given db iterator, parsing only the information
   * that belongs to the requested fields. The iterator must be positioned at
   * the start of the entity's entries.
   */
  private static TimelineEntity getEntity(String entityId, String entityType,
      Long startTime, EnumSet<Field> fields, DBIterator iterator,
      byte[] prefix, int prefixlen) throws IOException {
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }

    TimelineEntity entity = new TimelineEntity();
    boolean events = false;
    boolean lastEvent = false;
    if (fields.contains(Field.EVENTS)) {
      events = true;
    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
      lastEvent = true;
    } else {
      entity.setEvents(null);
    }
    boolean relatedEntities = false;
    if (fields.contains(Field.RELATED_ENTITIES)) {
      relatedEntities = true;
    } else {
      entity.setRelatedEntities(null);
    }
    boolean primaryFilters = false;
    if (fields.contains(Field.PRIMARY_FILTERS)) {
      primaryFilters = true;
    } else {
      entity.setPrimaryFilters(null);
    }
    boolean otherInfo = false;
    if (fields.contains(Field.OTHER_INFO)) {
      otherInfo = true;
    } else {
      entity.setOtherInfo(null);
    }

    // iterate through the entity's entry, parsing information if it is part
    // of a requested field
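    // every sub-section column prefix is a single byte, so the byte directly
    // after the entity prefix identifies which column an entry belongs to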
    for (; iterator.hasNext(); iterator.next()) {
      byte[] key = iterator.peekNext().getKey();
      if (!prefixMatches(prefix, prefixlen, key)) {
        break;
      }
      if (key.length == prefixlen) {
        continue;
      }
      if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
        if (primaryFilters) {
          addPrimaryFilter(entity, key, prefixlen
              + PRIMARY_FILTERS_COLUMN.length);
        }
      } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
        if (otherInfo) {
          Object o = null;
          String keyStr = parseRemainingKey(key,
              prefixlen + OTHER_INFO_COLUMN.length);
          try {
            o = fstConf.asObject(iterator.peekNext().getValue());
            entity.addOtherInfo(keyStr, o);
          } catch (Exception ignore) {
            try {
              // Fall back to 2.24 parser
              o = fstConf224.asObject(iterator.peekNext().getValue());
              entity.addOtherInfo(keyStr, o);
            } catch (Exception e) {
              LOG.warn("Error while decoding "
                  + entityId + ":otherInfo:" + keyStr, e);
            }
          }
        }
      } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
        if (relatedEntities) {
          addRelatedEntity(entity, key, prefixlen
              + RELATED_ENTITIES_COLUMN.length);
        }
      } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
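        // events are keyed by reverse timestamp, so the first event
        // encountered is the most recent one; that satisfies LAST_EVENT_ONLY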
        if (events || (lastEvent && entity.getEvents().size() == 0)) {
          TimelineEvent event = getEntityEvent(null, key, prefixlen
              + EVENTS_COLUMN.length, iterator.peekNext().getValue());
          if (event != null) {
            entity.addEvent(event);
          }
        }
      } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
        byte[] v = iterator.peekNext().getValue();
        String domainId = new String(v, UTF_8);
        entity.setDomainId(domainId);
      } else {
        LOG.warn(String.format("Found unexpected column for entity %s of "
            + "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
      }
    }

    entity.setEntityId(entityId);
    entity.setEntityType(entityType);
    entity.setStartTime(startTime);

    return entity;
  }

  @Override
  public TimelineEvents getEntityTimelines(String entityType,
      SortedSet<String> entityIds, Long limit, Long windowStart,
      Long windowEnd, Set<String> eventType) throws IOException {
    TimelineEvents events = new TimelineEvents();
    if (entityIds == null || entityIds.isEmpty()) {
      return events;
    }
    // create a lexicographically-ordered map from start time to entities
    Map<byte[], List<EntityIdentifier>> startTimeMap =
        new TreeMap<byte[], List<EntityIdentifier>>(
        new Comparator<byte[]>() {
          @Override
          public int compare(byte[] o1, byte[] o2) {
            return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
                o2.length);
          }
        });

    // look up start times for the specified entities
    // skip entities with no start time
    for (String entityId : entityIds) {
      byte[] startTime = getStartTime(entityId, entityType);
      if (startTime != null) {
        List<EntityIdentifier> entities = startTimeMap.get(startTime);
        if (entities == null) {
          entities = new ArrayList<EntityIdentifier>();
          startTimeMap.put(startTime, entities);
        }
        entities.add(new EntityIdentifier(entityId, entityType));
      }
    }
    for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
          .entrySet()) {
      // look up the events matching the given parameters (limit,
      // start time, end time, event types) for entities whose start times
      // were found and add the entities to the return list
      byte[] revStartTime = entry.getKey();
      for (EntityIdentifier entityIdentifier : entry.getValue()) {
        EventsOfOneEntity entity = new EventsOfOneEntity();
        entity.setEntityId(entityIdentifier.getId());
        entity.setEntityType(entityType);
        events.addEvent(entity);
        KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
            .add(revStartTime).add(entityIdentifier.getId())
            .add(EVENTS_COLUMN);
        byte[] prefix = kb.getBytesForLookup();
        if (windowEnd == null) {
          windowEnd = Long.MAX_VALUE;
        }
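        // event timestamps are stored reverse-ordered, so seeking to the
        // reversed windowEnd positions the iterator at the newest event
        // within the window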
        byte[] revts = writeReverseOrderedLong(windowEnd);
        kb.add(revts);
        byte[] first = kb.getBytesForLookup();
        byte[] last = null;
        if (windowStart != null) {
          last = KeyBuilder.newInstance().add(prefix)
              .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
        }
        if (limit == null) {
          limit = DEFAULT_LIMIT;
        }
        DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
            revStartTime, 0));
        if (db == null) {
          continue;
        }
        try (DBIterator iterator = db.iterator()) {
          for (iterator.seek(first); entity.getEvents().size() < limit
              && iterator.hasNext(); iterator.next()) {
            byte[] key = iterator.peekNext().getKey();
            if (!prefixMatches(prefix, prefix.length, key)
                || (last != null && WritableComparator.compareBytes(key, 0,
                key.length, last, 0, last.length) > 0)) {
              break;
            }
            TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
                iterator.peekNext().getValue());
            if (event != null) {
              entity.addEvent(event);
            }
          }
        }
      }
    }
    return events;
  }

  @Override
  public TimelineEntities getEntities(String entityType, Long limit,
      Long windowStart, Long windowEnd, String fromId, Long fromTs,
      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
      EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
    if (primaryFilter == null) {
      // if no primary filter is specified, prefix the lookup with
      // ENTITY_ENTRY_PREFIX
      return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart,
          windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false);
    } else {
      // if a primary filter is specified, prefix the lookup with
      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
      // ENTITY_ENTRY_PREFIX
      byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName())
          .add(fstConf.asByteArray(primaryFilter.getValue()), true)
          .getBytesForLookup();
      return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
          fromId, fromTs, secondaryFilters, fields, checkAcl, true);
    }
  }

  /**
   * Retrieves a list of entities satisfying given parameters.
   *
   * @param base
   *          A byte array prefix for the lookup
   * @param entityType
   *          The type of the entity
   * @param limit
   *          A limit on the number of entities to return
   * @param starttime
   *          The earliest entity start time to retrieve (exclusive)
   * @param endtime
   *          The latest entity start time to retrieve (inclusive)
   * @param fromId
   *          Retrieve entities starting with this entity
   * @param fromTs
   *          Ignore entities with insert timestamp later than this ts
   * @param secondaryFilters
   *          Filter pairs that the entities should match
   * @param fields
   *          The set of fields to retrieve
   * @param checkAcl
   *          ACL callback used to filter out entities the caller cannot
   *          access, or null to skip ACL checking
   * @param usingPrimaryFilter
   *          true if this query is using a primary filter
   * @return A list of entities
   * @throws IOException if a leveldb operation fails
   */
  private TimelineEntities getEntityByTime(byte[] base, String entityType,
      Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
      CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
    KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
    // only db keys matching the prefix (base + entity type) will be parsed
    byte[] prefix = kb.getBytesForLookup();
    if (endtime == null) {
      // if end time is null, place no restriction on end time
      endtime = Long.MAX_VALUE;
    }

    // Sanitize the fields parameter
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }

    // construct a first key that will be seeked to using end time or fromId
    long firstStartTime = Long.MAX_VALUE;
    byte[] first = null;
    if (fromId != null) {
      Long fromIdStartTime = getStartTimeLong(fromId, entityType);
      if (fromIdStartTime == null) {
        // no start time for provided id, so return empty entities
        return new TimelineEntities();
      }
      if (fromIdStartTime <= endtime) {
        // if provided id's start time falls before the end of the window,
        // use it to construct the seek key
        firstStartTime = fromIdStartTime;
        first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
            .getBytesForLookup();
      }
    }
    // if seek key wasn't constructed using fromId, construct it using end ts
    if (first == null) {
      firstStartTime = endtime;
      first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
    }
    byte[] last = null;
    if (starttime != null) {
      // if start time is not null, set a last key that will not be
      // iterated past
      last = KeyBuilder.newInstance().add(base).add(entityType)
          .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
    }
    if (limit == null) {
      // if limit is not specified, use the default
      limit = DEFAULT_LIMIT;
    }

    TimelineEntities entities = new TimelineEntities();
    RollingLevelDB rollingdb = null;
    if (usingPrimaryFilter) {
      rollingdb = indexdb;
    } else {
      rollingdb = entitydb;
    }

    DB db = rollingdb.getDBForStartTime(firstStartTime);
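    // walk backwards in time through the rolling databases until the limit
    // is reached or there are no older databases left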
    while (entities.getEntities().size() < limit && db != null) {
      try (DBIterator iterator = db.iterator()) {
        iterator.seek(first);

        // iterate until one of the following conditions is met: limit is
        // reached, there are no more keys, the key prefix no longer matches,
        // or a start time has been specified and reached/exceeded
        while (entities.getEntities().size() < limit && iterator.hasNext()) {
          byte[] key = iterator.peekNext().getKey();
          if (!prefixMatches(prefix, prefix.length, key)
              || (last != null && WritableComparator.compareBytes(key, 0,
              key.length, last, 0, last.length) > 0)) {
            break;
          }
          // read the start time and entity id from the current key
          KeyParser kp = new KeyParser(key, prefix.length);
          Long startTime = kp.getNextLong();
          String entityId = kp.getNextString();

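          // entities inserted after fromTs are skipped entirely: advance the
          // iterator past every remaining key of the current entity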
          if (fromTs != null) {
            long insertTime = readReverseOrderedLong(iterator.peekNext()
                .getValue(), 0);
            if (insertTime > fromTs) {
              byte[] firstKey = key;
              while (iterator.hasNext()) {
                key = iterator.peekNext().getKey();
                iterator.next();
                if (!prefixMatches(firstKey, kp.getOffset(), key)) {
                  break;
                }
              }
              continue;
            }
          }
          // Even if other info and primary filter fields are not included, we
          // still need to load them to match secondary filters when they are
          // non-empty
          EnumSet<Field> queryFields = EnumSet.copyOf(fields);
          boolean addPrimaryFilters = false;
          boolean addOtherInfo = false;
          if (secondaryFilters != null && secondaryFilters.size() > 0) {
            if (!queryFields.contains(Field.PRIMARY_FILTERS)) {
              queryFields.add(Field.PRIMARY_FILTERS);
              addPrimaryFilters = true;
            }
            if (!queryFields.contains(Field.OTHER_INFO)) {
              queryFields.add(Field.OTHER_INFO);
              addOtherInfo = true;
            }
          }

          // parse the entity that owns this key, iterating over all keys for
          // the entity
          TimelineEntity entity = null;
          if (usingPrimaryFilter) {
            entity = getEntity(entityId, entityType, queryFields);
            iterator.next();
          } else {
            entity = getEntity(entityId, entityType, startTime, queryFields,
                iterator, key, kp.getOffset());
          }

          if (entity != null) {
            // determine if the retrieved entity matches the provided secondary
            // filters, and if so add it to the list of entities to return
            boolean filterPassed = true;
            if (secondaryFilters != null) {
              for (NameValuePair filter : secondaryFilters) {
                Object v = entity.getOtherInfo().get(filter.getName());
                if (v == null) {
                  Set<Object> vs = entity.getPrimaryFilters()
                          .get(filter.getName());
                  if (vs == null || !vs.contains(filter.getValue())) {
                    filterPassed = false;
                    break;
                  }
                } else if (!v.equals(filter.getValue())) {
                  filterPassed = false;
                  break;
                }
              }
            }
            if (filterPassed) {
              if (entity.getDomainId() == null) {
                entity.setDomainId(DEFAULT_DOMAIN_ID);
              }
              if (checkAcl == null || checkAcl.check(entity)) {
                // Remove primary filter and other info if they are added for
                // matching secondary filters
                if (addPrimaryFilters) {
                  entity.setPrimaryFilters(null);
                }
                if (addOtherInfo) {
                  entity.setOtherInfo(null);
                }
                entities.addEntity(entity);
              }
            }
          }
        }
        db = rollingdb.getPreviousDB(db);
      }
    }
    return entities;
  }

  /**
   * Put a single entity. If there is an error, add a TimelinePutError to the
   * given response.
   *
   * @param entityUpdates
   *          a map containing all the scheduled writes for this put to the
   *          entity db
   * @param indexUpdates
   *          a map containing all the scheduled writes for this put to the
   *          index db
   * @param entity
   *          the entity to put
   * @param response
   *          the response to which any put errors are added
   * @return the number of entries scheduled to be written
   */
  private long putEntities(TreeMap<Long, RollingWriteBatch> entityUpdates,
      TreeMap<Long, RollingWriteBatch> indexUpdates, TimelineEntity entity,
      TimelinePutResponse response) {

    long putCount = 0;
    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
        new ArrayList<EntityIdentifier>();
    byte[] revStartTime = null;
    Map<String, Set<Object>> primaryFilters = null;
    try {
      List<TimelineEvent> events = entity.getEvents();
      // look up the start time for the entity
      Long startTime = getAndSetStartTime(entity.getEntityId(),
          entity.getEntityType(), entity.getStartTime(), events);
      if (startTime == null) {
        // if no start time is found, add an error and return
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.NO_START_TIME);
        response.addError(error);
        return putCount;
      }

      // Must have a domain
      if (StringUtils.isEmpty(entity.getDomainId())) {
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.NO_DOMAIN);
        response.addError(error);
        return putCount;
      }

      revStartTime = writeReverseOrderedLong(startTime);
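      // the start time rounded to the rolling granularity identifies which
      // rolling leveldb receives this entity's writes; all writes for the
      // same window share one write batch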
      long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime);
      RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime);
      if (rollingWriteBatch == null) {
        DB db = entitydb.getDBForStartTime(startTime);
        if (db != null) {
          WriteBatch writeBatch = db.createWriteBatch();
          rollingWriteBatch = new RollingWriteBatch(db, writeBatch);
          entityUpdates.put(roundedStartTime, rollingWriteBatch);
        }
      }
      if (rollingWriteBatch == null) {
        // no rolling db covers this start time, so the entity has aged out
        // of the retention window; report it as expired
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
        response.addError(error);
        return putCount;
      }
      WriteBatch writeBatch = rollingWriteBatch.getWriteBatch();

      // Save off the getBytes conversion to avoid unnecessary cost
      byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8);
      byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8);
      byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8);

      // write entity marker
      byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true)
          .add(revStartTime).add(entityIdBytes, true).getBytesForLookup();
      writeBatch.put(markerKey, EMPTY_BYTES);
      ++putCount;

      // write domain id entry
      byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true)
          .add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN)
          .getBytes();
      writeBatch.put(domainkey, domainIdBytes);
      ++putCount;

      // write event entries
      if (events != null) {
        for (TimelineEvent event : events) {
          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
          byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true)
              .add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN)
              .add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes();
          byte[] value = fstConf.asByteArray(event.getEventInfo());
          writeBatch.put(key, value);
          ++putCount;
        }
      }

      // write primary filter entries
      primaryFilters = entity.getPrimaryFilters();
      if (primaryFilters != null) {
        for (Entry<String, Set<Object>> primaryFilter : primaryFilters
            .entrySet()) {
          for (Object primaryFilterValue : primaryFilter.getValue()) {
            byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true)
                .add(revStartTime).add(entityIdBytes, true)
                .add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey())
                .add(fstConf.asByteArray(primaryFilterValue)).getBytes();
            writeBatch.put(key, EMPTY_BYTES);
            ++putCount;
          }
        }
      }

      // write other info entries
      Map<String, Object> otherInfo = entity.getOtherInfo();
      if (otherInfo != null) {
        for (Entry<String, Object> info : otherInfo.entrySet()) {
          byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true)
              .add(revStartTime).add(entityIdBytes, true)
              .add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes();
          byte[] value = fstConf.asByteArray(info.getValue());
          writeBatch.put(key, value);
          ++putCount;
        }
      }

      // write related entity entries
      Map<String, Set<String>> relatedEntities = entity.getRelatedEntities();
      if (relatedEntities != null) {
        for (Entry<String, Set<String>> relatedEntityList : relatedEntities
            .entrySet()) {
          String relatedEntityType = relatedEntityList.getKey();
          for (String relatedEntityId : relatedEntityList.getValue()) {
            // look up start time of related entity
            Long relatedStartTimeLong = getStartTimeLong(relatedEntityId,
                relatedEntityType);
            // delay writing the related entity if no start time is found
            if (relatedStartTimeLong == null) {
              relatedEntitiesWithoutStartTimes.add(new EntityIdentifier(
                  relatedEntityId, relatedEntityType));
              continue;
            }

            byte[] relatedEntityStartTime =
                writeReverseOrderedLong(relatedStartTimeLong);
            long relatedRoundedStartTime = entitydb
                .computeCurrentCheckMillis(relatedStartTimeLong);
            RollingWriteBatch relatedRollingWriteBatch = entityUpdates
                .get(relatedRoundedStartTime);
            if (relatedRollingWriteBatch == null) {
              DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
              if (db != null) {
                WriteBatch relatedWriteBatch = db.createWriteBatch();
                relatedRollingWriteBatch = new RollingWriteBatch(db,
                    relatedWriteBatch);
                entityUpdates.put(relatedRoundedStartTime,
                    relatedRollingWriteBatch);
              }
            }
            if (relatedRollingWriteBatch == null) {
              // no rolling db covers the related entity's start time; report
              // it as expired and skip this relation
              TimelinePutError error = new TimelinePutError();
              error.setEntityId(entity.getEntityId());
              error.setEntityType(entity.getEntityType());
              error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
              response.addError(error);
              continue;
            }
            // This is the existing entity
            byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get(
                createDomainIdKey(relatedEntityId, relatedEntityType,
                    relatedEntityStartTime));
            // The timeline data created by the server before 2.6 won't have
            // the domain field. We assume this timeline data is in the
            // default timeline domain.
            String domainId = null;
            if (relatedDomainIdBytes == null) {
              domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
            } else {
              domainId = new String(relatedDomainIdBytes, UTF_8);
            }
            if (!domainId.equals(entity.getDomainId())) {
              // in this case the entity will be put, but the relation will be
              // ignored
              TimelinePutError error = new TimelinePutError();
              error.setEntityId(entity.getEntityId());
              error.setEntityType(entity.getEntityType());
              error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
              response.addError(error);
              continue;
            }
            // write "forward" entry (related entity -> entity)
            byte[] key = createRelatedEntityKey(relatedEntityId,
                relatedEntityType, relatedEntityStartTime,
                entity.getEntityId(), entity.getEntityType());
            WriteBatch relatedWriteBatch = relatedRollingWriteBatch
                .getWriteBatch();
            relatedWriteBatch.put(key, EMPTY_BYTES);
            ++putCount;
          }
        }
      }

      // write index entities
      RollingWriteBatch indexRollingWriteBatch = indexUpdates
          .get(roundedStartTime);
      if (indexRollingWriteBatch == null) {
        DB db = indexdb.getDBForStartTime(startTime);
        if (db != null) {
          WriteBatch indexWriteBatch = db.createWriteBatch();
          indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch);
          indexUpdates.put(roundedStartTime, indexRollingWriteBatch);
        }
      }
      if (indexRollingWriteBatch == null) {
        // no rolling index db covers this start time; report the entity as
        // expired
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
        response.addError(error);
        return putCount;
      }
      WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch();
      putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters,
          markerKey, EMPTY_BYTES);
    } catch (IOException e) {
      LOG.error("Error putting entity " + entity.getEntityId() + " of type "
          + entity.getEntityType(), e);
      TimelinePutError error = new TimelinePutError();
      error.setEntityId(entity.getEntityId());
      error.setEntityType(entity.getEntityType());
      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
      response.addError(error);
    }

    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
      try {
        Long relatedEntityStartAndInsertTime = getAndSetStartTime(
            relatedEntity.getId(), relatedEntity.getType(),
            readReverseOrderedLong(revStartTime, 0), null);
        if (relatedEntityStartAndInsertTime == null) {
          throw new IOException("Error setting start time for related entity");
        }
        long relatedStartTimeLong = relatedEntityStartAndInsertTime;
        long relatedRoundedStartTime = entitydb
            .computeCurrentCheckMillis(relatedStartTimeLong);
        RollingWriteBatch relatedRollingWriteBatch = entityUpdates
            .get(relatedRoundedStartTime);
        if (relatedRollingWriteBatch == null) {
          DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
          if (db != null) {
            WriteBatch relatedWriteBatch = db.createWriteBatch();
            relatedRollingWriteBatch = new RollingWriteBatch(db,
                relatedWriteBatch);
            entityUpdates
                .put(relatedRoundedStartTime, relatedRollingWriteBatch);
          }
        }
        if (relatedRollingWriteBatch == null) {
          // no rolling db covers the related entity's start time; report it
          // as expired and skip it
          TimelinePutError error = new TimelinePutError();
          error.setEntityId(entity.getEntityId());
          error.setEntityType(entity.getEntityType());
          error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
          response.addError(error);
          continue;
        }
        WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch();
        byte[] relatedEntityStartTime =
            writeReverseOrderedLong(relatedEntityStartAndInsertTime);
        // This is the new entity, the domain should be the same
        byte[] key = createDomainIdKey(relatedEntity.getId(),
            relatedEntity.getType(), relatedEntityStartTime);
        relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8));
        ++putCount;
        relatedWriteBatch.put(
            createRelatedEntityKey(relatedEntity.getId(),
                relatedEntity.getType(), relatedEntityStartTime,
                entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
        ++putCount;
        relatedWriteBatch.put(
            createEntityMarkerKey(relatedEntity.getId(),
                relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
        ++putCount;
      } catch (IOException e) {
        LOG.error(
            "Error putting related entity " + relatedEntity.getId()
                + " of type " + relatedEntity.getType() + " for entity "
                + entity.getEntityId() + " of type " + entity.getEntityType(),
            e);
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
        response.addError(error);
      }
    }

    return putCount;
  }

  /**
   * For a given key / value pair that has been written to the db, write
   * additional entries to the db for each primary filter.
   */
  private static long writePrimaryFilterEntries(WriteBatch writeBatch,
      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
      throws IOException {
    long putCount = 0;
    if (primaryFilters != null) {
      for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
        for (Object pfval : pf.getValue()) {
          writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value);
          ++putCount;
        }
      }
    }
    return putCount;
  }

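  /**
   * Stores the given entities. Writes are batched per rolling leveldb window
   * and flushed together; per-entity failures are reported through the
   * returned response rather than thrown.
   */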
  @Override
  public TimelinePutResponse put(TimelineEntities entities) {
    LOG.debug("Starting put");
    TimelinePutResponse response = new TimelinePutResponse();
    TreeMap<Long, RollingWriteBatch> entityUpdates =
        new TreeMap<Long, RollingWriteBatch>();
    TreeMap<Long, RollingWriteBatch> indexUpdates =
        new TreeMap<Long, RollingWriteBatch>();
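    // write batches are accumulated per rolling-db window (keyed by rounded
    // start time) and flushed once per window after all entities are staged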

    long putCount = 0;

    try {

      for (TimelineEntity entity : entities.getEntities()) {
        putCount += putEntities(entityUpdates, indexUpdates, entity,
            response);
      }

      for (RollingWriteBatch entityUpdate : entityUpdates.values()) {
        entityUpdate.write();
      }

      for (RollingWriteBatch indexUpdate : indexUpdates.values()) {
        indexUpdate.write();
      }

    } finally {

      for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) {
        entityRollingWriteBatch.close();
      }
      for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) {
        indexRollingWriteBatch.close();
      }
    }
    LOG.debug("Put {} new leveldb entity and index entries from {} timeline"
        + " entities", putCount, entities.getEntities().size());
    return response;
  }

  /**
   * Get the unique start time for a given entity as a byte array that sorts the
   * timestamps in reverse order (see
   * {@link GenericObjectMapper#writeReverseOrderedLong(long)}).
   *
   * @param entityId
   *          The id of the entity
   * @param entityType
   *          The type of the entity
   * @return A byte array, null if not found
   * @throws IOException
   */
  private byte[] getStartTime(String entityId, String entityType)
      throws IOException {
    Long l = getStartTimeLong(entityId, entityType);
    return l == null ? null : writeReverseOrderedLong(l);
  }

  /**
   * Get the unique start time for a given entity as a Long.
   *
   * @param entityId
   *          The id of the entity
   * @param entityType
   *          The type of the entity
   * @return A Long, null if not found
   * @throws IOException
   */
  private Long getStartTimeLong(String entityId, String entityType)
      throws IOException {
    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
    // start time is not provided, so try to look it up
    if (startTimeReadCache.containsKey(entity)) {
      // found the start time in the cache
      return startTimeReadCache.get(entity);
    } else {
      // try to look up the start time in the db
      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
      byte[] v = starttimedb.get(b);
      if (v == null) {
        // did not find the start time in the db
        return null;
      } else {
        // found the start time in the db
        Long l = readReverseOrderedLong(v, 0);
        startTimeReadCache.put(entity, l);
        return l;
      }
    }
  }

  /**
   * Get the unique start time for a given entity as a byte array that sorts the
   * timestamps in reverse order (see
   * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start
   * time doesn't exist, set it based on the information provided.
   *
   * @param entityId
   *          The id of the entity
   * @param entityType
   *          The type of the entity
   * @param startTime
   *          The start time of the entity, or null
   * @param events
   *          A list of events for the entity, or null
   * @return the start time of the entity, or null if it could not be
   *         determined
   * @throws IOException
   */
  private Long getAndSetStartTime(String entityId,
      String entityType, Long startTime, List<TimelineEvent> events)
      throws IOException {
    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
    Long time = startTimeWriteCache.get(entity);
    if (time != null) {
      // return the value in the cache
      return time;
    }
    if (startTime == null && events != null) {
      // calculate best guess start time based on lowest event time
      startTime = Long.MAX_VALUE;
      for (TimelineEvent e : events) {
        if (e.getTimestamp() < startTime) {
          startTime = e.getTimestamp();
        }
      }
    }
    // check the provided start time matches the db
    return checkStartTimeInDb(entity, startTime);
  }

  /**
   * Checks db for start time and returns it if it exists. If it doesn't exist,
   * writes the suggested start time (if it is not null). This is only called
   * when the start time is not found in the cache, so it adds it back into the
   * cache if it is found.
   */
  private Long checkStartTimeInDb(EntityIdentifier entity,
      Long suggestedStartTime) throws IOException {
    Long startAndInsertTime = null;
    // create lookup key for start time
    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
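    // the get-then-put below must be atomic so that concurrent puts agree on
    // a single unique start time for the entity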
    synchronized (this) {
      // retrieve value for key
      byte[] v = starttimedb.get(b);
      if (v == null) {
        // start time doesn't exist in db
        if (suggestedStartTime == null) {
          return null;
        }
        startAndInsertTime = suggestedStartTime;

        // write suggested start time
        starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
      } else {
        // found start time in db, so ignore suggested start time
        startAndInsertTime = readReverseOrderedLong(v, 0);
      }
    }
    startTimeWriteCache.put(entity, startAndInsertTime);
    startTimeReadCache.put(entity, startAndInsertTime);
    return startAndInsertTime;
  }

  /**
   * Creates a key for looking up the start time of a given entity, of the form
   * START_TIME_LOOKUP_PREFIX + entity type + entity id.
   */
  private static byte[] createStartTimeLookupKey(String entityId,
      String entityType) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(entityId).getBytes();
  }

  /**
   * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
   * revstarttime + entity id.
   */
  private static byte[] createEntityMarkerKey(String entityId,
      String entityType, byte[] revStartTime) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
        .add(entityId).getBytesForLookup();
  }

  /**
   * Creates an index entry for the given key of the form INDEXED_ENTRY_PREFIX +
   * primaryfiltername + primaryfiltervalue + key.
   */
  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
      Object primaryFilterValue, byte[] key) throws IOException {
    return KeyBuilder.newInstance().add(primaryFilterName)
        .add(fstConf.asByteArray(primaryFilterValue), true).add(key).getBytes();
  }

  /**
   * Creates an event object from the given key, offset, and value. If the event
   * type is not contained in the specified set of event types, returns null.
   */
  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
      byte[] key, int offset, byte[] value) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    long ts = kp.getNextLong();
    String tstype = kp.getNextString();
    if (eventTypes == null || eventTypes.contains(tstype)) {
      TimelineEvent event = new TimelineEvent();
      event.setTimestamp(ts);
      event.setEventType(tstype);
      Object o = null;
      try {
        o = fstConf.asObject(value);
      } catch (Exception ignore) {
        try {
          // Fall back to 2.24 parser
          o = fstConf224.asObject(value);
        } catch (Exception e) {
          LOG.warn("Error while decoding " + tstype, e);
        }
      }
      if (o == null) {
        event.setEventInfo(null);
      } else if (o instanceof Map) {
        @SuppressWarnings("unchecked")
        Map<String, Object> m = (Map<String, Object>) o;
        event.setEventInfo(m);
      } else {
        throw new IOException("Couldn't deserialize event info map");
      }
      return event;
    }
    return null;
  }

  /**
   * Parses the primary filter from the given key at the given offset and adds
   * it to the given entity.
   */
  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
      int offset) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    String name = kp.getNextString();
    byte[] bytes = kp.getRemainingBytes();
    Object value = null;
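    // primary filter values are FST-serialized; as with event info, fall
    // back to the 2.24-compatible configuration for data written by older
    // releases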
    try {
      value = fstConf.asObject(bytes);
      entity.addPrimaryFilter(name, value);
    } catch (Exception ignore) {
      try {
        // Fall back to 2.24 parser
        value = fstConf224.asObject(bytes);
        entity.addPrimaryFilter(name, value);
      } catch (Exception e) {
        LOG.warn("Error while decoding " + name, e);
      }
    }
  }

  /**
   * Creates a string representation of the byte array from the given offset to
   * the end of the array (for parsing other info keys).
   */
  private static String parseRemainingKey(byte[] b, int offset) {
    return new String(b, offset, b.length - offset, UTF_8);
  }

  /**
   * Creates a related entity key, serializing entity type + revstarttime +
   * entity id + RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity
   * id.
   */
  private static byte[] createRelatedEntityKey(String entityId,
      String entityType, byte[] revStartTime, String relatedEntityId,
      String relatedEntityType) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
        .add(entityId).add(RELATED_ENTITIES_COLUMN).add(relatedEntityType)
        .add(relatedEntityId).getBytes();
  }

  /**
   * Parses the related entity from the given key at the given offset and adds
   * it to the given entity.
   */
  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
      int offset) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    String type = kp.getNextString();
    String id = kp.getNextString();
    entity.addRelatedEntity(type, id);
  }

  /**
   * Creates a domain id key, serializing entity type + revstarttime + entity
   * id + DOMAIN_ID_COLUMN.
   */
  private static byte[] createDomainIdKey(String entityId, String entityType,
      byte[] revStartTime) throws IOException {
    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
        .add(entityId).add(DOMAIN_ID_COLUMN).getBytes();
  }

  /**
   * Clears the cache to test reloading start times from leveldb (only for
   * testing).
   */
  @VisibleForTesting
  void clearStartTimeCache() {
    startTimeWriteCache.clear();
    startTimeReadCache.clear();
  }

  @VisibleForTesting
  static int getStartTimeReadCacheSize(Configuration conf) {
    return conf
        .getInt(
            TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
  }

  @VisibleForTesting
  static int getStartTimeWriteCacheSize(Configuration conf) {
    return conf
        .getInt(
            TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
  }

  @VisibleForTesting
  long evictOldStartTimes(long minStartTime) throws IOException {
    LOG.info("Searching for start times to evict earlier than " + minStartTime);

    long batchSize = 0;
    long totalCount = 0;
    long startTimesCount = 0;

    WriteBatch writeBatch = null;

    ReadOptions readOptions = new ReadOptions();
    readOptions.fillCache(false);
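    // this is a one-off full scan, so don't let it pollute the block cache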
    try (DBIterator iterator = starttimedb.iterator(readOptions)) {

      // seek to the first start time entry
      iterator.seekToFirst();
      writeBatch = starttimedb.createWriteBatch();

      // evaluate each start time entry to see if it needs to be evicted or not
      while (iterator.hasNext()) {
        Map.Entry<byte[], byte[]> current = iterator.next();
        byte[] entityKey = current.getKey();
        byte[] entityValue = current.getValue();
        long startTime = readReverseOrderedLong(entityValue, 0);
        if (startTime < minStartTime) {
          ++batchSize;
          ++startTimesCount;
          writeBatch.delete(entityKey);

          // flush periodically so a large delete doesn't hold the lock for
          // too long
          if (batchSize >= writeBatchSize) {
            LOG.debug("Preparing to delete a batch of {} old start times",
                batchSize);
            starttimedb.write(writeBatch);
            LOG.debug("Deleted batch of {}. Total start times deleted"
                + " so far this cycle: {}", batchSize, startTimesCount);
            IOUtils.cleanupWithLogger(LOG, writeBatch);
            writeBatch = starttimedb.createWriteBatch();
            batchSize = 0;
          }
        }
        ++totalCount;
      }
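      // flush whatever remains in the final (possibly partial) batch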
      LOG.debug("Preparing to delete a batch of {} old start times",
          batchSize);
      starttimedb.write(writeBatch);
      LOG.debug("Deleted batch of {}. Total start times deleted so far"
          + " this cycle: {}", batchSize, startTimesCount);
      LOG.info("Deleted " + startTimesCount + "/" + totalCount
          + " start time entities earlier than " + minStartTime);
    } finally {
      IOUtils.cleanupWithLogger(LOG, writeBatch);
    }
    return startTimesCount;
  }

  /**
   * Discards entities whose start timestamp is earlier than the given
   * timestamp, and evicts rolled-over entity and index databases that have
   * aged out.
   */
  @VisibleForTesting
  void discardOldEntities(long timestamp) throws IOException,
      InterruptedException {
    long totalCount = 0;
    long t1 = System.currentTimeMillis();
    try {
      totalCount += evictOldStartTimes(timestamp);
      indexdb.evictOldDBs();
      entitydb.evictOldDBs();
    } finally {
      long t2 = System.currentTimeMillis();
      LOG.info("Discarded " + totalCount + " entities for timestamp "
          + timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
    }
  }

  Version loadVersion() throws IOException {
    byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY));
    // if no version was stored previously, treat it as 1.0.
    if (data == null || data.length == 0) {
      return Version.newInstance(1, 0);
    }
    Version version = new VersionPBImpl(VersionProto.parseFrom(data));
    return version;
  }

  // Only used for testing
  @VisibleForTesting
  void storeVersion(Version state) throws IOException {
    dbStoreVersion(state);
  }

  private void dbStoreVersion(Version state) throws IOException {
    String key = TIMELINE_STORE_VERSION_KEY;
    byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
    try {
      starttimedb.put(bytes(key), data);
    } catch (DBException e) {
      throw new IOException(e);
    }
  }

  Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }

  /**
   * Timeline store versioning is major.minor, e.g. 1.0, 1.1, ... 1.25, 2.0.
   * 1) Any incompatible change to the store is a major upgrade; any
   *    compatible change is a minor upgrade.
   * 2) Within a minor upgrade, say 1.1 to 1.2: overwrite the version info
   *    and proceed as normal.
   * 3) Within a major upgrade, say 1.2 to 2.0: throw an exception and
   *    instruct the user to use a separate upgrade tool to upgrade the
   *    timeline store or remove the incompatible old state.
   */
  private void checkVersion() throws IOException {
    Version loadedVersion = loadVersion();
    LOG.info("Loaded timeline store version info " + loadedVersion);
    if (loadedVersion.equals(getCurrentVersion())) {
      return;
    }
    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
      LOG.info("Storing timeline store version info " + getCurrentVersion());
      dbStoreVersion(CURRENT_VERSION_INFO);
    } else {
      String incompatibleMessage = "Incompatible version for timeline store: "
          + "expecting version " + getCurrentVersion()
          + ", but loading version " + loadedVersion;
      LOG.error(incompatibleMessage);
      throw new IOException(incompatibleMessage);
    }
  }

  // TODO: make data retention work with the domain data as well
  @Override
  public void put(TimelineDomain domain) throws IOException {
    try (WriteBatch domainWriteBatch = domaindb.createWriteBatch();
         WriteBatch ownerWriteBatch = ownerdb.createWriteBatch()) {

      if (domain.getId() == null || domain.getId().length() == 0) {
        throw new IllegalArgumentException("Domain doesn't have an ID.");
      }
      if (domain.getOwner() == null || domain.getOwner().length() == 0) {
        throw new IllegalArgumentException("Domain doesn't have an owner.");
      }

      // Write description
      byte[] domainEntryKey = createDomainEntryKey(domain.getId(),
          DESCRIPTION_COLUMN);
      byte[] ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), DESCRIPTION_COLUMN);
      if (domain.getDescription() != null) {
        domainWriteBatch.put(domainEntryKey,
            domain.getDescription().getBytes(UTF_8));
        ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription()
            .getBytes(UTF_8));
      } else {
        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
      }

      // Write owner
      domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), OWNER_COLUMN);
      // The owner was already null-checked above
      domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes(UTF_8));
      ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner()
          .getBytes(UTF_8));

      // Write readers
      domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), READER_COLUMN);
      if (domain.getReaders() != null && domain.getReaders().length() > 0) {
        domainWriteBatch.put(domainEntryKey, domain.getReaders()
            .getBytes(UTF_8));
        ownerWriteBatch.put(ownerLookupEntryKey,
            domain.getReaders().getBytes(UTF_8));
      } else {
        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
      }

      // Write writers
      domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), WRITER_COLUMN);
      if (domain.getWriters() != null && domain.getWriters().length() > 0) {
        domainWriteBatch.put(domainEntryKey, domain.getWriters()
            .getBytes(UTF_8));
        ownerWriteBatch.put(ownerLookupEntryKey,
            domain.getWriters().getBytes(UTF_8));
      } else {
        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
      }

      // Write creation time and modification time
      // We put both timestamps together because they are always retrieved
      // together, and store them in the same way as we did for the entity's
      // start time and insert time.
      domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
          domain.getId(), TIMESTAMP_COLUMN);
      long currentTimestamp = System.currentTimeMillis();
      byte[] timestamps = domaindb.get(domainEntryKey);
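      // layout: bytes 0-7 hold the reverse-ordered creation time, bytes 8-15
      // the reverse-ordered modification time; an update rewrites only the
      // modification half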
      if (timestamps == null) {
        timestamps = new byte[16];
        writeReverseOrderedLong(currentTimestamp, timestamps, 0);
        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
      } else {
        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
      }
      domainWriteBatch.put(domainEntryKey, timestamps);
      ownerWriteBatch.put(ownerLookupEntryKey, timestamps);
      domaindb.write(domainWriteBatch);
      ownerdb.write(ownerWriteBatch);
    }
  }

  /**
   * Creates a domain entry key with column name suffix, of the form
   * domain id + column name.
   */
  private static byte[] createDomainEntryKey(String domainId, byte[] columnName)
      throws IOException {
    return KeyBuilder.newInstance().add(domainId).add(columnName).getBytes();
  }

  /**
   * Creates an owner lookup key with column name suffix, of the form
   * owner + domain id + column name.
   */
  private static byte[] createOwnerLookupKey(String owner, String domainId,
      byte[] columnName) throws IOException {
    return KeyBuilder.newInstance().add(owner).add(domainId).add(columnName)
        .getBytes();
  }

  @Override
  public TimelineDomain getDomain(String domainId) throws IOException {
    try (DBIterator iterator = domaindb.iterator()) {
      byte[] prefix = KeyBuilder.newInstance().add(domainId)
          .getBytesForLookup();
      iterator.seek(prefix);
      return getTimelineDomain(iterator, domainId, prefix);
    }
  }

  @Override
  public TimelineDomains getDomains(String owner) throws IOException {
    try (DBIterator iterator = ownerdb.iterator()) {
      byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup();
      iterator.seek(prefix);
      List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
      while (iterator.hasNext()) {
        byte[] key = iterator.peekNext().getKey();
        if (!prefixMatches(prefix, prefix.length, key)) {
          break;
        }
        // Parse the domain id out of the key
        KeyParser kp = new KeyParser(key, prefix.length);
        String domainId = kp.getNextString();
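        // extend the prefix with the domain id so getTimelineDomain consumes
        // exactly this domain's rows before the loop continues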
        byte[] prefixExt = KeyBuilder.newInstance().add(owner).add(domainId)
            .getBytesForLookup();
        TimelineDomain domainToReturn = getTimelineDomain(iterator, domainId,
            prefixExt);
        if (domainToReturn != null) {
          domains.add(domainToReturn);
        }
      }
      // Sort by creation time, newest first; break ties by modification time
      Collections.sort(domains, new Comparator<TimelineDomain>() {
        @Override
        public int compare(TimelineDomain domain1, TimelineDomain domain2) {
          int result = domain2.getCreatedTime().compareTo(
              domain1.getCreatedTime());
          if (result == 0) {
            return domain2.getModifiedTime().compareTo(
                domain1.getModifiedTime());
          } else {
            return result;
          }
        }
      });
      TimelineDomains domainsToReturn = new TimelineDomains();
      domainsToReturn.addDomains(domains);
      return domainsToReturn;
    }
  }

  private static TimelineDomain getTimelineDomain(DBIterator iterator,
      String domainId, byte[] prefix) throws IOException {
    // Iterate over all the rows whose key starts with prefix to retrieve the
    // domain information.
    TimelineDomain domain = new TimelineDomain();
    domain.setId(domainId);
    boolean noRows = true;
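    // each key is prefix + column marker; the comparisons below rely on the
    // column markers starting with distinct bytes, so checking the single
    // byte after the prefix is enough to identify the column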
    for (; iterator.hasNext(); iterator.next()) {
      byte[] key = iterator.peekNext().getKey();
      if (!prefixMatches(prefix, prefix.length, key)) {
        break;
      }
      noRows = false;
      byte[] value = iterator.peekNext().getValue();
      if (value != null && value.length > 0) {
        if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
          domain.setDescription(new String(value, UTF_8));
        } else if (key[prefix.length] == OWNER_COLUMN[0]) {
          domain.setOwner(new String(value, UTF_8));
        } else if (key[prefix.length] == READER_COLUMN[0]) {
          domain.setReaders(new String(value, UTF_8));
        } else if (key[prefix.length] == WRITER_COLUMN[0]) {
          domain.setWriters(new String(value, UTF_8));
        } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
          domain.setCreatedTime(readReverseOrderedLong(value, 0));
          domain.setModifiedTime(readReverseOrderedLong(value, 8));
        } else {
          LOG.error("Unrecognized domain column: " + key[prefix.length]);
        }
      }
    }
    if (noRows) {
      return null;
    } else {
      return domain;
    }
  }
}