KllHeapItemsSketch.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.kll;
import static org.apache.datasketches.common.ByteArrayUtil.copyBytes;
import static org.apache.datasketches.kll.KllPreambleUtil.DATA_START_ADR;
import static org.apache.datasketches.kll.KllPreambleUtil.N_LONG_ADR;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_EMPTY;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_FULL;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_SINGLE;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;
import java.lang.reflect.Array;
import java.util.Comparator;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
/**
* This class implements an on-heap items KllSketch.
*
* <p>Please refer to the documentation in the package-info:<br>
* {@link org.apache.datasketches.kll}</p>
*
* @author Lee Rhodes, Kevin Lang
*/
@SuppressWarnings("unchecked")
final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private final int k; // configured size of K.
private final int m; // configured size of M.
private long n; // number of items input into this sketch.
private int minK; // dynamic minK for error estimation after merging with different k.
private boolean isLevelZeroSorted;
private T minItem;
private T maxItem;
private Object[] itemsArr;
/**
* New instance heap constructor.
* @param k parameter that controls size of the sketch and accuracy of estimates.
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8 is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
* @param comparator user specified comparator of type T.
* @param serDe serialization / deserialization class
*/
KllHeapItemsSketch(final int k, final int m, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = new int[] {k, k};
this.readOnly = false;
this.k = k;
this.m = m;
this.n = 0;
this.minK = k;
this.isLevelZeroSorted = false;
this.minItem = null;
this.maxItem = null;
this.itemsArr = new Object[k];
}
/**
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapItemsSketch(final int k, final int m, final T item, final long weight, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = KllHelper.createLevelsArray(weight);
this.readOnly = false;
this.k = k;
this.m = m;
this.n = weight;
this.minK = k;
this.isLevelZeroSorted = false;
this.minItem = item;
this.maxItem = item;
this.itemsArr = KllItemsHelper.createItemsArray(serDe.getClassOfT(), item, weight);
}
/**
* The Heapify constructor
* @param srcMem the Source Memory image that contains data.
* @param comparator the comparator for this sketch and given Memory.
* @param serDe the serializer / deserializer for this sketch and the given Memory.
*/
KllHeapItemsSketch(
final Memory srcMem,
final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(SketchStructure.UPDATABLE, comparator, serDe);
final KllMemoryValidate memVal = new KllMemoryValidate(srcMem, SketchType.ITEMS_SKETCH, serDe);
this.k = memVal.k;
this.m = memVal.m;
this.levelsArr = memVal.levelsArr;
this.readOnly = false;
this.n = memVal.n;
this.minK = memVal.minK;
this.isLevelZeroSorted = memVal.level0SortedFlag;
this.itemsArr = new Object[levelsArr[memVal.numLevels]]; //updatable size
final SketchStructure memStruct = memVal.sketchStructure;
if (memStruct == COMPACT_EMPTY) {
this.minItem = null;
this.maxItem = null;
this.itemsArr = new Object[k];
} else if (memStruct == COMPACT_SINGLE) {
final int offset = N_LONG_ADR;
final T item = serDe.deserializeFromMemory(srcMem, offset, 1)[0];
this.minItem = item;
this.maxItem = item;
itemsArr[k - 1] = item;
} else if (memStruct == COMPACT_FULL) {
int offset = DATA_START_ADR + memVal.numLevels * Integer.BYTES;
this.minItem = serDe.deserializeFromMemory(srcMem, offset, 1)[0];
offset += serDe.sizeOf(minItem);
this.maxItem = serDe.deserializeFromMemory(srcMem, offset, 1)[0];
offset += serDe.sizeOf(maxItem);
final int numRetained = levelsArr[memVal.numLevels] - levelsArr[0];
final Object[] retItems = serDe.deserializeFromMemory(srcMem, offset, numRetained);
System.arraycopy(retItems, 0, itemsArr, levelsArr[0], numRetained);
} else { //memStruct == UPDATABLE
throw new SketchesArgumentException(UNSUPPORTED_MSG + "UPDATABLE");
}
}
//End of constructors
@Override
String getItemAsString(final int index) {
if (isEmpty()) { return "Null"; }
return serDe.toString((T)(itemsArr[index]));
}
@Override
public int getK() {
return k;
}
@Override
public T getMaxItem() {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
return maxItem;
}
@Override
String getMaxItemAsString() {
if (isEmpty()) { return "Null"; }
return serDe.toString(maxItem);
}
@Override
public T getMinItem() {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
return minItem;
}
@Override
String getMinItemAsString() {
if (isEmpty()) { return "Null"; }
return serDe.toString(minItem);
}
@Override
public long getN() {
return n;
}
//restricted
@Override
int getM() {
return m;
}
@Override
int getMinK() {
return minK;
}
@Override
byte[] getMinMaxByteArr() {
final byte[] minBytes = serDe.serializeToByteArray(minItem);
final byte[] maxBytes = serDe.serializeToByteArray(maxItem);
final byte[] minMaxBytes = new byte[minBytes.length + maxBytes.length];
copyBytes(minBytes, 0, minMaxBytes, 0, minBytes.length);
copyBytes(maxBytes, 0, minMaxBytes, minBytes.length, maxBytes.length);
return minMaxBytes;
}
@Override
int getMinMaxSizeBytes() {
final int minBytes = serDe.sizeOf(minItem);
final int maxBytes = serDe.sizeOf(maxItem);
return minBytes + maxBytes;
}
@Override
T[] getRetainedItemsArray() {
final int numRet = getNumRetained();
final T[] outArr = (T[]) Array.newInstance(serDe.getClassOfT(), numRet);
System.arraycopy(itemsArr, levelsArr[0], outArr, 0 , numRet);
return outArr;
}
@Override
byte[] getRetainedItemsByteArr() {
final T[] retArr = getRetainedItemsArray();
return serDe.serializeToByteArray(retArr);
}
@Override
int getRetainedItemsSizeBytes() {
return getRetainedItemsByteArr().length;
}
@Override
T getSingleItem() {
if (n != 1L) { throw new SketchesArgumentException(NOT_SINGLE_ITEM_MSG); }
final T item = (T) itemsArr[k - 1];
return item;
}
@Override
byte[] getSingleItemByteArr() {
return serDe.serializeToByteArray(getSingleItem());
}
@Override
int getSingleItemSizeBytes() {
return serDe.sizeOf(getSingleItem());
}
@Override
T[] getTotalItemsArray() {
if (n == 0) { return (T[]) Array.newInstance(serDe.getClassOfT(), k); }
final T[] outArr = (T[]) Array.newInstance(serDe.getClassOfT(), itemsArr.length);
System.arraycopy(itemsArr, 0, outArr, 0, itemsArr.length);
return outArr;
}
@Override
WritableMemory getWritableMemory() {
return null;
}
@Override
void incN(final int increment) { n += increment; }
@Override
boolean isLevelZeroSorted() {
return isLevelZeroSorted;
}
@Override
void setLevelZeroSorted(final boolean sorted) {
isLevelZeroSorted = sorted;
}
@Override
void setMinK(final int minK) {
this.minK = minK;
}
@Override
void setN(final long n) {
this.n = n;
}
@Override
void setItemsArray(final Object[] itemsArr) {
this.itemsArr = itemsArr;
}
@Override
void setItemsArrayAt(final int index, final Object item) {
this.itemsArr[index] = item;
}
@Override
void setMaxItem(final Object item) {
this.maxItem = (T) item;
}
@Override
void setMinItem(final Object item) {
this.minItem = (T) item;
}
}