DoublesSketch.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.quantiles;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.System.arraycopy;
import static org.apache.datasketches.common.Util.ceilingPowerOf2;
import static org.apache.datasketches.quantiles.ClassicUtil.MAX_PRELONGS;
import static org.apache.datasketches.quantiles.ClassicUtil.MIN_K;
import static org.apache.datasketches.quantiles.ClassicUtil.checkIsCompactMemory;
import static org.apache.datasketches.quantiles.ClassicUtil.checkK;
import static org.apache.datasketches.quantiles.ClassicUtil.computeNumLevelsNeeded;
import static org.apache.datasketches.quantiles.ClassicUtil.computeRetainedItems;
import static org.apache.datasketches.quantiles.DoublesSketchAccessor.BB_LVL_IDX;
import java.util.Arrays;
import java.util.Random;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.SketchesStateException;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
import org.apache.datasketches.quantilescommon.QuantilesAPI;
import org.apache.datasketches.quantilescommon.QuantilesDoublesAPI;
import org.apache.datasketches.quantilescommon.QuantilesDoublesSketchIterator;
/**
* This is an implementation of the Low Discrepancy Mergeable Quantiles Sketch, using doubles,
* described in section 3.2 of the journal version of the paper "Mergeable Summaries"
* by Agarwal, Cormode, Huang, Phillips, Wei, and Yi:
*
* <p>Reference: <a href="http://dblp.org/rec/html/journals/tods/AgarwalCHPWY13"></a></p>
*
* <p>A <i>k</i> of 128 produces a normalized, rank error of about 1.7%.
* For example, the median returned from getQuantile(0.5) will be between the actual quantiles
* from the hypothetically sorted array of input quantiles at normalized ranks of 0.483 and 0.517, with
* a confidence of about 99%.</p>
*
* <pre>
Table Guide for DoublesSketch Size in Bytes and Approximate Error:
K => | 16 32 64 128 256 512 1,024
~ Error => | 12.145% 6.359% 3.317% 1.725% 0.894% 0.463% 0.239%
N | Size in Bytes ->
------------------------------------------------------------------------
0 | 8 8 8 8 8 8 8
1 | 72 72 72 72 72 72 72
3 | 72 72 72 72 72 72 72
7 | 104 104 104 104 104 104 104
15 | 168 168 168 168 168 168 168
31 | 296 296 296 296 296 296 296
63 | 424 552 552 552 552 552 552
127 | 552 808 1,064 1,064 1,064 1,064 1,064
255 | 680 1,064 1,576 2,088 2,088 2,088 2,088
511 | 808 1,320 2,088 3,112 4,136 4,136 4,136
1,023 | 936 1,576 2,600 4,136 6,184 8,232 8,232
2,047 | 1,064 1,832 3,112 5,160 8,232 12,328 16,424
4,095 | 1,192 2,088 3,624 6,184 10,280 16,424 24,616
8,191 | 1,320 2,344 4,136 7,208 12,328 20,520 32,808
16,383 | 1,448 2,600 4,648 8,232 14,376 24,616 41,000
32,767 | 1,576 2,856 5,160 9,256 16,424 28,712 49,192
65,535 | 1,704 3,112 5,672 10,280 18,472 32,808 57,384
131,071 | 1,832 3,368 6,184 11,304 20,520 36,904 65,576
262,143 | 1,960 3,624 6,696 12,328 22,568 41,000 73,768
524,287 | 2,088 3,880 7,208 13,352 24,616 45,096 81,960
1,048,575 | 2,216 4,136 7,720 14,376 26,664 49,192 90,152
2,097,151 | 2,344 4,392 8,232 15,400 28,712 53,288 98,344
4,194,303 | 2,472 4,648 8,744 16,424 30,760 57,384 106,536
8,388,607 | 2,600 4,904 9,256 17,448 32,808 61,480 114,728
16,777,215 | 2,728 5,160 9,768 18,472 34,856 65,576 122,920
33,554,431 | 2,856 5,416 10,280 19,496 36,904 69,672 131,112
67,108,863 | 2,984 5,672 10,792 20,520 38,952 73,768 139,304
134,217,727 | 3,112 5,928 11,304 21,544 41,000 77,864 147,496
268,435,455 | 3,240 6,184 11,816 22,568 43,048 81,960 155,688
536,870,911 | 3,368 6,440 12,328 23,592 45,096 86,056 163,880
1,073,741,823 | 3,496 6,696 12,840 24,616 47,144 90,152 172,072
2,147,483,647 | 3,624 6,952 13,352 25,640 49,192 94,248 180,264
4,294,967,295 | 3,752 7,208 13,864 26,664 51,240 98,344 188,456
* </pre>
*
* @see QuantilesAPI
*/
public abstract class DoublesSketch implements QuantilesDoublesAPI {
/**
* Setting the seed makes the results of the sketch deterministic if the input quantiles are
* received in exactly the same order. This is only useful when performing test comparisons,
* otherwise is not recommended.
*/
static Random rand = new Random();
/**
* Parameter that controls space usage of sketch and accuracy of estimates.
*/
final int k_;
DoublesSketchSortedView doublesSV = null;
DoublesSketch(final int k) {
checkK(k);
k_ = k;
}
synchronized static void setRandom(final long seed) {
DoublesSketch.rand = new Random(seed);
}
/**
* Returns a new builder
* @return a new builder
*/
public static final DoublesSketchBuilder builder() {
return new DoublesSketchBuilder();
}
/**
* Heapify takes the sketch image in Memory and instantiates an on-heap Sketch.
* The resulting sketch will not retain any link to the source Memory.
* @param srcMem a Memory image of a Sketch.
* <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
* @return a heap-based Sketch based on the given Memory
*/
public static DoublesSketch heapify(final Memory srcMem) {
if (checkIsCompactMemory(srcMem)) {
return CompactDoublesSketch.heapify(srcMem);
}
return UpdateDoublesSketch.heapify(srcMem);
}
/**
* Wrap this sketch around the given Memory image of a DoublesSketch, compact or updatable.
* A DirectUpdateDoublesSketch can only wrap an updatable array, and a
* DirectCompactDoublesSketch can only wrap a compact array.
*
* @param srcMem the given Memory image of a DoublesSketch that may have data,
* @return a sketch that wraps the given srcMem
*/
public static DoublesSketch wrap(final Memory srcMem) {
if (checkIsCompactMemory(srcMem)) {
return DirectCompactDoublesSketch.wrapInstance(srcMem);
}
return DirectUpdateDoublesSketchR.wrapInstance(srcMem);
}
@Override
public double[] getCDF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return doublesSV.getCDF(splitPoints, searchCrit);
}
@Override
public abstract double getMaxItem();
@Override
public abstract double getMinItem();
@Override
public double[] getPMF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return doublesSV.getPMF(splitPoints, searchCrit);
}
@Override
public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return doublesSV.getQuantile(rank, searchCrit);
}
@Override
public double[] getQuantiles(final double[] ranks, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
final int len = ranks.length;
final double[] quantiles = new double[len];
for (int i = 0; i < len; i++) {
quantiles[i] = doublesSV.getQuantile(ranks[i], searchCrit);
}
return quantiles;
}
/**
* {@inheritDoc}
* The approximate probability that the true quantile is within the confidence interval
* specified by the upper and lower quantile bounds for this sketch is 0.99.
*/
@Override
public double getQuantileLowerBound(final double rank) {
return getQuantile(max(0, rank - getNormalizedRankError(k_, false)));
}
/**
* {@inheritDoc}
* The approximate probability that the true quantile is within the confidence interval
* specified by the upper and lower quantile bounds for this sketch is 0.99.
*/
@Override
public double getQuantileUpperBound(final double rank) {
return getQuantile(min(1.0, rank + getNormalizedRankError(k_, false)));
}
@Override
public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
return doublesSV.getRank(quantile, searchCrit);
}
/**
* {@inheritDoc}
* The approximate probability that the true rank is within the confidence interval
* specified by the upper and lower rank bounds for this sketch is 0.99.
*/
@Override
public double getRankLowerBound(final double rank) {
return max(0.0, rank - getNormalizedRankError(k_, false));
}
/**
* {@inheritDoc}
* The approximate probability that the true rank is within the confidence interval
* specified by the upper and lower rank bounds for this sketch is 0.99.
*/
@Override
public double getRankUpperBound(final double rank) {
return min(1.0, rank + getNormalizedRankError(k_, false));
}
@Override
public double[] getRanks(final double[] quantiles, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
final int len = quantiles.length;
final double[] ranks = new double[len];
for (int i = 0; i < len; i++) {
ranks[i] = doublesSV.getRank(quantiles[i], searchCrit);
}
return ranks;
}
@Override
public int getK() {
return k_;
}
@Override
public abstract long getN();
@Override
public double getNormalizedRankError(final boolean pmf) {
return getNormalizedRankError(k_, pmf);
}
/**
* Gets the normalized rank error given k and pmf.
* Static method version of the <i>getNormalizedRankError(boolean)</i>.
* The epsilon returned is a best fit to 99 percent confidence empirically measured max error
* in thousands of trials.
* @param k the configuration parameter
* @param pmf if true, returns the "double-sided" normalized rank error for the getPMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
* @return if pmf is true, the normalized rank error for the getPMF() function.
* Otherwise, it is the "single-sided" normalized rank error for all the other queries.
*/
public static double getNormalizedRankError(final int k, final boolean pmf) {
return ClassicUtil.getNormalizedRankError(k, pmf);
}
/**
* Gets the approximate <em>k</em> to use given epsilon, the normalized rank error.
* @param epsilon the normalized rank error between zero and one.
* @param pmf if true, this function returns <em>k</em> assuming the input epsilon
* is the desired "double-sided" epsilon for the getPMF() function. Otherwise, this function
* returns <em>k</em> assuming the input epsilon is the desired "single-sided"
* epsilon for all the other queries.
* @return <i>k</i> given epsilon.
*/
public static int getKFromEpsilon(final double epsilon, final boolean pmf) {
return ClassicUtil.getKFromEpsilon(epsilon, pmf);
}
@Override
public abstract boolean hasMemory();
@Override
public abstract boolean isDirect();
@Override
public boolean isEmpty() {
return getN() == 0;
}
@Override
public boolean isEstimationMode() {
return getN() >= 2L * k_;
}
@Override
public abstract boolean isReadOnly();
/**
* Returns true if the backing resource of <i>this</i> is identical with the backing resource
* of <i>that</i>. The capacities must be the same. If <i>this</i> is a region,
* the region offset must also be the same.
* @param that A different non-null object
* @return true if the backing resource of <i>this</i> is the same as the backing resource
* of <i>that</i>.
*/
public boolean isSameResource(final Memory that) { //Overridden by direct sketches
return false;
}
@Override
public byte[] toByteArray() {
if (isCompact()) {
return toByteArray(true);
}
return toByteArray(false);
}
/**
* Serialize this sketch in a byte array form.
* @param compact if true the sketch will be serialized in compact form.
* DirectCompactDoublesSketch can wrap() only a compact byte array;
* DirectUpdateDoublesSketch can wrap() only a updatable byte array.
* @return this sketch in a byte array form.
*/
public byte[] toByteArray(final boolean compact) {
return DoublesByteArrayImpl.toByteArray(this, compact, compact);
}
/**
* Returns human readable summary information about this sketch.
* Used for debugging.
*/
@Override
public String toString() {
return toString(true, false);
}
/**
* Returns human readable summary information about this sketch.
* Used for debugging.
* @param withLevels if true includes sketch levels array summary information
* @param withLevelsAndItems if true include detail of levels array and items array together
* @return human readable summary information about this sketch.
*/
public String toString(final boolean withLevels, final boolean withLevelsAndItems) {
return DoublesUtil.toString(withLevels, withLevelsAndItems, this);
}
/**
* Returns a human readable string of the preamble of a byte array image of a DoublesSketch.
* Used for debugging.
* @param byteArr the given byte array
* @return a human readable string of the preamble of a byte array image of a DoublesSketch.
*/
public static String toString(final byte[] byteArr) {
return PreambleUtil.toString(byteArr, true);
}
/**
* Returns a human readable string of the preamble of a Memory image of a DoublesSketch.
* Used for debugging.
* @param mem the given Memory
* @return a human readable string of the preamble of a Memory image of a DoublesSketch.
*/
public static String toString(final Memory mem) {
return PreambleUtil.toString(mem, true);
}
/**
* From an source sketch, create a new sketch that must have a smaller K.
* The original sketch is not modified.
*
* @param srcSketch the sourcing sketch
* @param smallerK the new sketch's K that must be smaller than this K.
* It is required that this.getK() = smallerK * 2^(nonnegative integer).
* @param dstMem the destination Memory. It must not overlap the Memory of this sketch.
* If null, a heap sketch will be returned, otherwise it will be off-heap.
*
* @return the new sketch.
*/
public DoublesSketch downSample(final DoublesSketch srcSketch, final int smallerK,
final WritableMemory dstMem) {
return downSampleInternal(srcSketch, smallerK, dstMem);
}
@Override
public int getNumRetained() {
return computeRetainedItems(getK(), getN());
}
/**
* Returns the current number of bytes this sketch would require to store in the compact Memory Format.
* @return the current number of bytes this sketch would require to store in the compact Memory Format.
*/
public int getCurrentCompactSerializedSizeBytes() {
return getCompactSerialiedSizeBytes(getK(), getN());
}
/**
* Returns the number of bytes a DoublesSketch would require to store in compact form
* given <i>k</i> and <i>n</i>. The compact form is not updatable.
* @param k the size configuration parameter for the sketch
* @param n the number of quantiles input into the sketch
* @return the number of bytes required to store this sketch in compact form.
*/
public static int getCompactSerialiedSizeBytes(final int k, final long n) {
if (n == 0) { return 8; }
final int metaPreLongs = MAX_PRELONGS + 2; //plus min, max
return metaPreLongs + computeRetainedItems(k, n) << 3;
}
@Override
public int getSerializedSizeBytes() {
if (isCompact()) { return getCurrentCompactSerializedSizeBytes(); }
return getCurrentUpdatableSerializedSizeBytes();
}
/**
* Returns the current number of bytes this sketch would require to store in the updatable Memory Format.
* @return the current number of bytes this sketch would require to store in the updatable Memory Format.
*/
public int getCurrentUpdatableSerializedSizeBytes() {
return getUpdatableStorageBytes(getK(), getN());
}
/**
* Returns the number of bytes a sketch would require to store in updatable form.
* This uses roughly 2X the storage of the compact form
* given <i>k</i> and <i>n</i>.
* @param k the size configuration parameter for the sketch
* @param n the number of quantiles input into the sketch
* @return the number of bytes this sketch would require to store in updatable form.
*/
public static int getUpdatableStorageBytes(final int k, final long n) {
if (n == 0) { return 8; }
final int metaPre = MAX_PRELONGS + 2; //plus min, max
final int totLevels = computeNumLevelsNeeded(k, n);
if (n <= k) {
final int ceil = Math.max(ceilingPowerOf2((int)n), MIN_K * 2);
return metaPre + ceil << 3;
}
return metaPre + (2 + totLevels) * k << 3;
}
/**
* Puts the current sketch into the given Memory in compact form if there is sufficient space,
* otherwise, it throws an error.
*
* @param dstMem the given memory.
*/
public void putMemory(final WritableMemory dstMem) {
putMemory(dstMem, true);
}
/**
* Puts the current sketch into the given Memory if there is sufficient space, otherwise,
* throws an error.
*
* @param dstMem the given memory.
* @param compact if true, compacts and sorts the base buffer, which optimizes merge
* performance at the cost of slightly increased serialization time.
*/
public void putMemory(final WritableMemory dstMem, final boolean compact) {
if (hasMemory() && isCompact() == compact) {
final Memory srcMem = getMemory();
srcMem.copyTo(0, dstMem, 0, getSerializedSizeBytes());
} else {
final byte[] byteArr = toByteArray(compact);
final int arrLen = byteArr.length;
final long memCap = dstMem.getCapacity();
if (memCap < arrLen) {
throw new SketchesArgumentException(
"Destination Memory not large enough: " + memCap + " < " + arrLen);
}
dstMem.putByteArray(0, byteArr, 0, arrLen);
}
}
@Override
public QuantilesDoublesSketchIterator iterator() {
return new DoublesSketchIterator(this, getBitPattern());
}
/**
* {@inheritDoc}
* <p>The parameter <i>k</i> will not change.</p>
*/
@Override
public abstract void reset();
//Restricted
/*
* DoublesMergeImpl.downSamplingMergeInto requires the target sketch to implement update(), so
* we ensure that the target is an UpdateSketch. The public API, on the other hand, just
* specifies a DoublesSketch. This lets us be more specific about the type without changing the
* public API.
*/
UpdateDoublesSketch downSampleInternal(final DoublesSketch srcSketch, final int smallerK,
final WritableMemory dstMem) {
final UpdateDoublesSketch newSketch = dstMem == null
? HeapUpdateDoublesSketch.newInstance(smallerK)
: DirectUpdateDoublesSketch.newInstance(smallerK, dstMem);
if (srcSketch.isEmpty()) { return newSketch; }
DoublesMergeImpl.downSamplingMergeInto(srcSketch, newSketch);
return newSketch;
}
//Restricted abstract
/**
* Returns true if this sketch is compact
* @return true if this sketch is compact
*/
abstract boolean isCompact();
/**
* Returns the base buffer count
* @return the base buffer count
*/
abstract int getBaseBufferCount();
/**
* Returns the bit pattern for valid log levels
* @return the bit pattern for valid log levels
*/
abstract long getBitPattern();
/**
* Returns the capacity for the combined base buffer
* @return the capacity for the combined base buffer
*/
abstract int getCombinedBufferItemCapacity();
/**
* Returns the combined buffer, in non-compact form.
* @return the combined buffer, in non-compact form.
*/
abstract double[] getCombinedBuffer();
/**
* Gets the Memory if it exists, otherwise returns null.
* @return the Memory if it exists, otherwise returns null.
*/
abstract WritableMemory getMemory();
//************SORTED VIEW****************************
@Override
public final DoublesSketchSortedView getSortedView() {
return refreshSortedView();
}
private final DoublesSketchSortedView refreshSortedView() {
return (doublesSV == null) ? (doublesSV = getSV()) : doublesSV;
}
private DoublesSketchSortedView getSV() {
final long totalN = getN();
if (isEmpty() || (totalN == 0)) { throw new SketchesArgumentException(EMPTY_MSG); }
final int numQuantiles = getNumRetained();
final double[] svQuantiles = new double[numQuantiles];
final long[] svCumWeights = new long[numQuantiles];
final DoublesSketchAccessor sketchAccessor = DoublesSketchAccessor.wrap(this);
// Populate from DoublesSketch:
// copy over the "levels" and then the base buffer, all with appropriate weights
populateFromDoublesSketch(getK(), totalN, getBitPattern(), sketchAccessor, svQuantiles, svCumWeights);
// Sort the first "numSamples" slots of the two arrays in tandem,
// taking advantage of the already sorted blocks of length k
blockyTandemMergeSort(svQuantiles, svCumWeights, numQuantiles, getK());
if (convertToCumulative(svCumWeights) != totalN) {
throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
}
return new DoublesSketchSortedView(svQuantiles, svCumWeights, this);
}
private final static void populateFromDoublesSketch(
final int k, final long totalN, final long bitPattern,
final DoublesSketchAccessor sketchAccessor,
final double[] svQuantiles, final long[] svCumWeights) {
long weight = 1;
int index = 0;
long bits = bitPattern;
assert bits == (totalN / (2L * k)); // internal consistency check
for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
weight <<= 1; // X2
if ((bits & 1L) > 0L) {
sketchAccessor.setLevel(lvl);
for (int i = 0; i < sketchAccessor.numItems(); i++) {
svQuantiles[index] = sketchAccessor.get(i);
svCumWeights[index] = weight;
index++;
}
}
}
weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
final int startOfBaseBufferBlock = index;
// Copy BaseBuffer over, along with weight = 1
sketchAccessor.setLevel(BB_LVL_IDX);
for (int i = 0; i < sketchAccessor.numItems(); i++) {
svQuantiles[index] = sketchAccessor.get(i);
svCumWeights[index] = weight;
index++;
}
assert index == svQuantiles.length;
// Must sort the items that came from the base buffer.
// Don't need to sort the corresponding weights because they are all the same.
final int numSamples = index;
Arrays.sort(svQuantiles, startOfBaseBufferBlock, numSamples);
}
/**
* blockyTandemMergeSort() is an implementation of top-down merge sort specialized
* for the case where the input contains successive equal-length blocks
* that have already been sorted, so that only the top part of the
* merge tree remains to be executed. Also, two arrays are sorted in tandem,
* as discussed below.
* @param svQuantiles array of quantiles for sorted view
* @param svCumWts array for the cumulative weights (but not yet cumulative) for sorted view
* @param arrLen length of quantiles array and cumWts array
* @param blkSize size of internal sorted blocks, equal to k
*/
//used by this and UtilTest
static void blockyTandemMergeSort(final double[] svQuantiles, final long[] svCumWts, final int arrLen,
final int blkSize) {
assert blkSize >= 1;
if (arrLen <= blkSize) { return; }
int numblks = arrLen / blkSize;
if ((numblks * blkSize) < arrLen) { numblks += 1; }
assert ((numblks * blkSize) >= arrLen);
// duplication of the input arrays is preparation for the "ping-pong" copy reduction strategy.
final double[] qSrc = Arrays.copyOf(svQuantiles, arrLen);
final long[] cwSrc = Arrays.copyOf(svCumWts, arrLen);
blockyTandemMergeSortRecursion(qSrc, cwSrc,
svQuantiles, svCumWts,
0, numblks,
blkSize, arrLen);
}
/**
* blockyTandemMergeSortRecursion() is called by blockyTandemMergeSort().
* In addition to performing the algorithm's top down recursion,
* it manages the buffer swapping that eliminates most copying.
* It also maps the input's pre-sorted blocks into the subarrays
* that are processed by tandemMerge().
* @param qSrc source array of quantiles
* @param cwSrc source weights array
* @param qDst destination quantiles array
* @param cwDst destination weights array
* @param grpStart group start, refers to pre-sorted blocks such as block 0, block 1, etc.
* @param grpLen group length, refers to pre-sorted blocks such as block 0, block 1, etc.
* @param blkSize block size
* @param arrLim array limit
*/
private static void blockyTandemMergeSortRecursion(final double[] qSrc, final long[] cwSrc,
final double[] qDst, final long[] cwDst, final int grpStart, final int grpLen,
/* indices of blocks */ final int blkSize, final int arrLim) {
// Important note: grpStart and grpLen do NOT refer to positions in the underlying array.
// Instead, they refer to the pre-sorted blocks, such as block 0, block 1, etc.
assert (grpLen > 0);
if (grpLen == 1) { return; }
final int grpLen1 = grpLen / 2;
final int grpLen2 = grpLen - grpLen1;
assert (grpLen1 >= 1);
assert (grpLen2 >= grpLen1);
final int grpStart1 = grpStart;
final int grpStart2 = grpStart + grpLen1;
//swap roles of src and dst
blockyTandemMergeSortRecursion(qDst, cwDst,
qSrc, cwSrc,
grpStart1, grpLen1, blkSize, arrLim);
//swap roles of src and dst
blockyTandemMergeSortRecursion(qDst, cwDst,
qSrc, cwSrc,
grpStart2, grpLen2, blkSize, arrLim);
// here we convert indices of blocks into positions in the underlying array.
final int arrStart1 = grpStart1 * blkSize;
final int arrStart2 = grpStart2 * blkSize;
final int arrLen1 = grpLen1 * blkSize;
int arrLen2 = grpLen2 * blkSize;
// special case for the final block which might be shorter than blkSize.
if ((arrStart2 + arrLen2) > arrLim) { arrLen2 = arrLim - arrStart2; }
tandemMerge(qSrc, cwSrc,
arrStart1, arrLen1,
arrStart2, arrLen2,
qDst, cwDst,
arrStart1); // which will be arrStart3
}
/**
* Performs two merges in tandem. One of them provides the sort keys
* while the other one passively undergoes the same data motion.
* @param qSrc quantiles source
* @param cwSrc cumulative weights source
* @param arrStart1 Array 1 start offset
* @param arrLen1 Array 1 length
* @param arrStart2 Array 2 start offset
* @param arrLen2 Array 2 length
* @param qDst quantiles destination
* @param cwDst cumulative weights destination
* @param arrStart3 Array 3 start offset
*/
private static void tandemMerge(final double[] qSrc, final long[] cwSrc,
final int arrStart1, final int arrLen1,
final int arrStart2, final int arrLen2,
final double[] qDst, final long[] cwDst,
final int arrStart3) {
final int arrStop1 = arrStart1 + arrLen1;
final int arrStop2 = arrStart2 + arrLen2;
int i1 = arrStart1;
int i2 = arrStart2;
int i3 = arrStart3;
while ((i1 < arrStop1) && (i2 < arrStop2)) {
if (qSrc[i2] < qSrc[i1]) {
qDst[i3] = qSrc[i2];
cwDst[i3] = cwSrc[i2];
i2++;
} else {
qDst[i3] = qSrc[i1];
cwDst[i3] = cwSrc[i1];
i1++;
}
i3++;
}
if (i1 < arrStop1) {
arraycopy(qSrc, i1, qDst, i3, arrStop1 - i1);
arraycopy(cwSrc, i1, cwDst, i3, arrStop1 - i1);
} else {
assert i2 < arrStop2;
arraycopy(qSrc, i2, qDst, i3, arrStop2 - i2);
arraycopy(cwSrc, i2, cwDst, i3, arrStop2 - i2);
}
}
/**
* Convert the individual weights into cumulative weights.
* An array of {1,1,1,1} becomes {1,2,3,4}
* @param array of actual weights from the sketch, none of the weights may be zero
* @return total weight
*/
private static long convertToCumulative(final long[] array) {
long subtotal = 0;
for (int i = 0; i < array.length; i++) {
final long newSubtotal = subtotal + array[i];
subtotal = array[i] = newSubtotal;
}
return subtotal;
}
}