KllItemsSketch.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.datasketches.kll;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;
import static org.apache.datasketches.kll.KllSketch.SketchType.ITEMS_SKETCH;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Objects;

import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.Util;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantilescommon.GenericPartitionBoundaries;
import org.apache.datasketches.quantilescommon.ItemsSketchSortedView;
import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
import org.apache.datasketches.quantilescommon.QuantilesGenericAPI;
import org.apache.datasketches.quantilescommon.QuantilesGenericSketchIterator;

/**
 * This variation of the KllSketch implements generic data types. The user must provide
 * a suitable implementation of the <i>java.lang.Comparator</i> as well as an implementation of
 * the serializer / deserializer, <i>org.apache.datasketches.common.ArrayOfItemsSerDe</i>.
 * @param <T> The sketch data type.
 * @see org.apache.datasketches.kll.KllSketch
 */
@SuppressWarnings("unchecked")
public abstract class KllItemsSketch<T> extends KllSketch implements QuantilesGenericAPI<T> {
  private ItemsSketchSortedView<T> itemsSV = null;
  final Comparator<? super T> comparator;
  final ArrayOfItemsSerDe<T> serDe;

  KllItemsSketch( //pass-through constructor
      final SketchStructure skStructure,
      final Comparator<? super T> comparator,
      final ArrayOfItemsSerDe<T> serDe) {
    super(ITEMS_SKETCH, skStructure);
    Objects.requireNonNull(comparator, "Comparator must not be null.");
    Objects.requireNonNull(serDe, "SerDe must not be null.");
    this.comparator = comparator;
    this.serDe = serDe;
  }

  //Factories for new heap instances.

  /**
   * Create a new heap instance of this sketch with the default <em>k = 200</em>.
   * The default <em>k</em> = 200 results in a normalized rank error of about
   * 1.65%. Larger K will have smaller error but the sketch will be larger (and slower).
   * @param comparator to compare items
   * @param serDe Serializer / deserializer for an array of items, <i>T[]</i>.
   * @param <T> The sketch data type.
   * @return new KllItemsSketch on the Java heap.
   */
  public static <T> KllItemsSketch<T> newHeapInstance(
      final Comparator<? super T> comparator,
      final ArrayOfItemsSerDe<T> serDe) {
    final KllItemsSketch<T> itmSk =
        new KllHeapItemsSketch<>(DEFAULT_K, DEFAULT_M, comparator, serDe);
    return itmSk;
  }

  /**
   * Create a new heap instance of this sketch with a given parameter <em>k</em>.
   * <em>k</em> can be between DEFAULT_M and 65535, inclusive.
   * The default <em>k</em> = 200 results in a normalized rank error of about
   * 1.65%. Larger K will have smaller error but the sketch will be larger (and slower).
   * @param k parameter that controls size of the sketch and accuracy of estimates.
   * @param comparator to compare items
   * @param serDe Serializer / deserializer for items of type <i>T</i> and <i>T[]</i>.
   * @param <T> The sketch data type
   * @return new KllItemsSketch on the heap.
   */
  public static <T> KllItemsSketch<T> newHeapInstance(
      final int k,
      final Comparator<? super T> comparator,
      final ArrayOfItemsSerDe<T> serDe) {
    return new KllHeapItemsSketch<>(k, DEFAULT_M, comparator, serDe);
  }

  // Factory to create an heap instance from a Memory image

  /**
   * Factory heapify takes a compact sketch image in Memory and instantiates an on-heap sketch.
   * The resulting sketch will not retain any link to the source Memory.
   * @param srcMem a compact Memory image of a sketch serialized by this sketch and of the same type of T.
   * @param comparator to compare items
   * @param serDe Serializer / deserializer for items of type <i>T</i> and <i>T[]</i>.
   * @param <T> The sketch data type
   * @return a heap-based sketch based on the given Memory.
   */
  public static <T> KllItemsSketch<T> heapify(
      final Memory srcMem,
      final Comparator<? super T> comparator,
      final ArrayOfItemsSerDe<T> serDe) {
    return new KllHeapItemsSketch<>(srcMem, comparator, serDe);
  }

  //Factory to wrap a Read-Only Memory

  /**
   * Constructs a thin wrapper on the heap around a Memory (or WritableMemory) already initialized with a
   * validated sketch image of a type T consistent with the given comparator and serDe.
   * A reference to the Memory is kept in the sketch and must remain in scope consistent
   * with the temporal scope of this sketch. The amount of data kept on the heap is very small.
   * All of the item data originally collected by the given Memory sketch object remains in the
   * Memory object
   * @param srcMem the Memory object that this sketch will wrap.
   * @param comparator to compare items
   * @param serDe Serializer / deserializer for items of type <i>T</i> and <i>T[]</i>.
   * @param <T> The sketch data type
   * @return a heap-base sketch that is a thin wrapper around the given srcMem.
   */
  public static <T> KllItemsSketch<T> wrap(
      final Memory srcMem,
      final Comparator<? super T> comparator,
      final ArrayOfItemsSerDe<T> serDe) {
    final KllMemoryValidate memVal = new KllMemoryValidate(srcMem, SketchType.ITEMS_SKETCH, serDe);
    return new KllDirectCompactItemsSketch<>(memVal, comparator, serDe);
  }

  //END of Constructors

  @Override
  public double[] getCDF(final T[] splitPoints, final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
    refreshSortedView();
    return itemsSV.getCDF(splitPoints, searchCrit);
  }

  @Override
  public Class<T> getClassOfT() { return serDe.getClassOfT(); }

  @Override
  public Comparator<? super T> getComparator() {
    return comparator;
  }

  @Override
  public GenericPartitionBoundaries<T> getPartitionBoundariesFromNumParts(
      final int numEquallySizedParts,
      final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
    refreshSortedView();
    return itemsSV.getPartitionBoundariesFromNumParts(numEquallySizedParts, searchCrit);
  }

  @Override
  public GenericPartitionBoundaries<T> getPartitionBoundariesFromPartSize(
      final long nominalPartSizeItems,
      final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
    refreshSortedView();
    return itemsSV.getPartitionBoundariesFromPartSize(nominalPartSizeItems, searchCrit);
  }

  @Override
  public double[] getPMF(final T[] splitPoints, final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
    refreshSortedView();
    return itemsSV.getPMF(splitPoints, searchCrit);
  }

  @Override
  public T getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
    refreshSortedView();
    return itemsSV.getQuantile(rank, searchCrit);
  }

  @Override
  public T[] getQuantiles(final double[] ranks, final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
    refreshSortedView();
    final int len = ranks.length;
    final T[] quantiles = (T[]) Array.newInstance(getMinItem().getClass(), len);
    for (int i = 0; i < len; i++) {
      quantiles[i] = itemsSV.getQuantile(ranks[i], searchCrit);
    }
    return quantiles;
  }

  @Override
  public T getQuantileLowerBound(final double rank) {
    return getQuantile(max(0, rank - KllHelper.getNormalizedRankError(getMinK(), false)));
  }

  @Override
  public T getQuantileUpperBound(final double rank) {
    return getQuantile(min(1.0, rank + KllHelper.getNormalizedRankError(getMinK(), false)));
  }

  @Override
  public double getRank(final T quantile, final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
    refreshSortedView();
    return itemsSV.getRank(quantile, searchCrit);
  }

  /**
   * {@inheritDoc}
   * The approximate probability that the true rank is within the confidence interval
   * specified by the upper and lower rank bounds for this sketch is 0.99.
   */
  @Override
  public double getRankLowerBound(final double rank) {
    return max(0.0, rank - KllHelper.getNormalizedRankError(getMinK(), false));
  }

  /**
   * {@inheritDoc}
   * The approximate probability that the true rank is within the confidence interval
   * specified by the upper and lower rank bounds for this sketch is 0.99.
   */
  @Override
  public double getRankUpperBound(final double rank) {
    return min(1.0, rank + KllHelper.getNormalizedRankError(getMinK(), false));
  }

  @Override
  public double[] getRanks(final T[] quantiles, final QuantileSearchCriteria searchCrit) {
    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
    refreshSortedView();
    final int len = quantiles.length;
    final double[] ranks = new double[len];
    for (int i = 0; i < len; i++) {
      ranks[i] = itemsSV.getRank(quantiles[i], searchCrit);
    }
    return ranks;
  }

  @Override
  public final ItemsSketchSortedView<T> getSortedView() {
    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
    return refreshSortedView();
  }

  @Override
  public QuantilesGenericSketchIterator<T> iterator() {
    return new KllItemsSketchIterator<>(
        getTotalItemsArray(), getLevelsArray(SketchStructure.UPDATABLE), getNumLevels());
  }

  @Override
  public final void merge(final KllSketch other) {
    if (readOnly || sketchStructure != UPDATABLE) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
    if (this == other) { throw new SketchesArgumentException(SELF_MERGE_MSG); }
    final KllItemsSketch<T> othItmSk = (KllItemsSketch<T>)other;
    if (othItmSk.isEmpty()) { return; }
    KllItemsHelper.mergeItemImpl(this, othItmSk, comparator);
    itemsSV = null;
  }

  @Override
  public void reset() {
    if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
    final int k = getK();
    setN(0);
    setMinK(k);
    setNumLevels(1);
    setLevelZeroSorted(false);
    setLevelsArray(new int[] {k, k});
    setMinItem(null);
    setMaxItem(null);
    setItemsArray(new Object[k]);
    itemsSV = null;
  }

  /**
   * Export the current sketch as a compact byte array.
   * @return the current sketch as a compact byte array.
   */
  public byte[] toByteArray() {
    return KllHelper.toByteArray(this, false);
  }

  @Override
  public String toString(final boolean withLevels, final boolean withLevelsAndItems) {
    KllSketch sketch = this;
    if (hasMemory()) {
      final Memory mem = getWritableMemory();
      assert mem != null;
      sketch = KllItemsSketch.heapify((Memory)getWritableMemory(), comparator, serDe);
    }
    return KllHelper.toStringImpl(sketch, withLevels, withLevelsAndItems, getSerDe());
  }

  @Override
  public void update(final T item) {
    if (item == null) { return; } //ignore
    if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
    KllItemsHelper.updateItem(this, item);
    itemsSV = null;
  }

  /**
   * Weighted update. Updates this sketch with the given item the number of times specified by the given integer weight.
   * @param item the item to be repeated. NaNs are ignored.
   * @param weight the number of times the update of item is to be repeated. It must be &ge; one.
   */
  public void update(final T item, final long weight) {
    if (item == null) { return; } //ignore
    if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
    if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
    if (weight == 1L) { KllItemsHelper.updateItem(this, item); }
    else { KllItemsHelper.updateItem(this, item, weight); }
    itemsSV = null;
  }

  //restricted

  @Override
  MemoryRequestServer getMemoryRequestServer() {
    //this is not used and must return a null
    return null;
  }

  @Override
  abstract byte[] getMinMaxByteArr();

  @Override
  abstract int getMinMaxSizeBytes();

  abstract T[] getRetainedItemsArray();

  @Override
  abstract byte[] getRetainedItemsByteArr();

  @Override
  abstract int getRetainedItemsSizeBytes();

  //abstract Object[] getRetainedItemsArray();

  @Override
  ArrayOfItemsSerDe<T> getSerDe() { return serDe; }

  abstract T getSingleItem();

  @Override
  abstract byte[] getSingleItemByteArr();

  @Override
  abstract int getSingleItemSizeBytes();

  /**
   * @return a full array of items as if the sketch was in COMPACT_FULL or UPDATABLE format.
   * This will include zeros and possibly some free space.
   */
  abstract T[] getTotalItemsArray();

  @Override
  byte[] getTotalItemsByteArr() {
    throw new SketchesArgumentException(UNSUPPORTED_MSG);
  }

  @Override
  int getTotalItemsNumBytes() {
    throw new SketchesArgumentException(UNSUPPORTED_MSG);
  }

  @Override
  void incNumLevels() {
    //this is not used and must be a no-op.
  }

  abstract void setItemsArray(Object[] ItemsArr);

  abstract void setItemsArrayAt(int index, Object item);

  abstract void setMaxItem(Object item);

  abstract void setMinItem(Object item);

  @Override
  void setNumLevels(final int numLevels) {
    // this is not used and must be a no-op.
  }

  @Override
  void setWritableMemory(final WritableMemory wmem) {
    throw new SketchesArgumentException(UNSUPPORTED_MSG + "Sketch not writable.");
  }

  void updateMinMax(final T item) {
    if (isEmpty()) {
      setMinItem(item);
      setMaxItem(item);
    } else {
      setMinItem(Util.minT(getMinItem(), item, comparator));
      setMaxItem(Util.maxT(getMaxItem(), item, comparator));
    }
  }

  private final ItemsSketchSortedView<T> refreshSortedView() {
    if (itemsSV == null) {
      final CreateSortedView csv = new CreateSortedView();
      itemsSV = csv.getSV();
    }
    return itemsSV;
  }

  @SuppressWarnings({"rawtypes"})
  private final class CreateSortedView {
    T[] quantiles;
    long[] cumWeights; //The new cumWeights array

    ItemsSketchSortedView<T> getSV() {
      if (isEmpty() || getN() == 0) { throw new SketchesArgumentException(EMPTY_MSG); }
      final T[] srcQuantiles = getTotalItemsArray();
      final int[] srcLevelsArr = levelsArr;
      final int srcNumLevels = getNumLevels();

      if (!isLevelZeroSorted()) {
        Arrays.sort(srcQuantiles, srcLevelsArr[0], srcLevelsArr[1], comparator);
        if (!hasMemory()) { setLevelZeroSorted(true); }
      }
      final int numQuantiles = getNumRetained();
      quantiles = (T[]) Array.newInstance(serDe.getClassOfT(), numQuantiles);
      cumWeights = new long[numQuantiles];
      populateFromSketch(srcQuantiles, srcLevelsArr, srcNumLevels, numQuantiles);
      final QuantilesGenericAPI<T> sk = KllItemsSketch.this;
      return new ItemsSketchSortedView(quantiles, cumWeights, sk);
    }

    private void populateFromSketch(final Object[] srcQuantiles, final int[] srcLevelsArr,
        final int srcNumLevels, final int numItems) {
        //Remove free space from both itemsArray and levels array
        final int[] myLevelsArr = new int[srcLevelsArr.length];
        final int offset = srcLevelsArr[0];
        System.arraycopy(srcQuantiles, offset, quantiles, 0, numItems); //remove free space from quantiles arr
        //fill the new cumWeights array with the correct weights and adjust the levels array to match.
        int srcLevel = 0;
        int dstLevel = 0;
        long weight = 1;
        while (srcLevel < srcNumLevels) {
          final int fromIndex = srcLevelsArr[srcLevel] - offset;
          final int toIndex = srcLevelsArr[srcLevel + 1] - offset; // exclusive
          if (fromIndex < toIndex) { // if equal, skip empty level
            Arrays.fill(cumWeights, fromIndex, toIndex, weight);
            myLevelsArr[dstLevel] = fromIndex;
            myLevelsArr[dstLevel + 1] = toIndex;
            dstLevel++;
          }
          srcLevel++;
          weight *= 2;
        }
        final int numLevels = dstLevel;
        blockyTandemMergeSort(quantiles, cumWeights, myLevelsArr, numLevels, comparator); //create unit weights
        KllHelper.convertToCumulative(cumWeights);
      }
  } //End of class CreateSortedView

    private static <T> void blockyTandemMergeSort(final Object[] quantiles, final long[] weights,
        final int[] levels, final int numLevels, final Comparator<? super T> comp) {
      if (numLevels == 1) { return; }

      // duplicate the input in preparation for the "ping-pong" copy reduction strategy.
      final Object[] quantilesTmp = Arrays.copyOf(quantiles, quantiles.length);
      final long[] weightsTmp = Arrays.copyOf(weights, quantiles.length); // don't need the extra one here

      blockyTandemMergeSortRecursion(quantilesTmp, weightsTmp, quantiles, weights, levels, 0, numLevels, comp);
    }

    private static <T> void blockyTandemMergeSortRecursion(
        final Object[] quantilesSrc, final long[] weightsSrc,
        final Object[] quantilesDst, final long[] weightsDst,
        final int[] levels, final int startingLevel, final int numLevels, final Comparator<? super T> comp) {
      if (numLevels == 1) { return; }
      final int numLevels1 = numLevels / 2;
      final int numLevels2 = numLevels - numLevels1;
      assert numLevels1 >= 1;
      assert numLevels2 >= numLevels1;
      final int startingLevel1 = startingLevel;
      final int startingLevel2 = startingLevel + numLevels1;
      // swap roles of src and dst
      blockyTandemMergeSortRecursion(
          quantilesDst, weightsDst,
          quantilesSrc, weightsSrc,
          levels, startingLevel1, numLevels1, comp);
      blockyTandemMergeSortRecursion(
          quantilesDst, weightsDst,
          quantilesSrc, weightsSrc,
          levels, startingLevel2, numLevels2, comp);
      tandemMerge(
          quantilesSrc, weightsSrc,
          quantilesDst, weightsDst,
          levels,
          startingLevel1, numLevels1,
          startingLevel2, numLevels2, comp);
    }

    private static <T> void tandemMerge(
        final Object[] quantilesSrc, final long[] weightsSrc,
        final Object[] quantilesDst, final long[] weightsDst,
        final int[] levelStarts,
        final int startingLevel1, final int numLevels1,
        final int startingLevel2, final int numLevels2, final Comparator<? super T> comp) {
      final int fromIndex1 = levelStarts[startingLevel1];
      final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive
      final int fromIndex2 = levelStarts[startingLevel2];
      final int toIndex2 = levelStarts[startingLevel2 + numLevels2]; // exclusive
      int iSrc1 = fromIndex1;
      int iSrc2 = fromIndex2;
      int iDst = fromIndex1;

      while (iSrc1 < toIndex1 && iSrc2 < toIndex2) {
        if (Util.lt((T) quantilesSrc[iSrc1], (T) quantilesSrc[iSrc2], comp)) {
          quantilesDst[iDst] = quantilesSrc[iSrc1];
          weightsDst[iDst] = weightsSrc[iSrc1];
          iSrc1++;
        } else {
          quantilesDst[iDst] = quantilesSrc[iSrc2];
          weightsDst[iDst] = weightsSrc[iSrc2];
          iSrc2++;
        }
        iDst++;
      }
      if (iSrc1 < toIndex1) {
        System.arraycopy(quantilesSrc, iSrc1, quantilesDst, iDst, toIndex1 - iSrc1);
        System.arraycopy(weightsSrc, iSrc1, weightsDst, iDst, toIndex1 - iSrc1);
      } else if (iSrc2 < toIndex2) {
        System.arraycopy(quantilesSrc, iSrc2, quantilesDst, iDst, toIndex2 - iSrc2);
        System.arraycopy(weightsSrc, iSrc2, weightsDst, iDst, toIndex2 - iSrc2);
      }
    }

}