// Union.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.datasketches.tuple;

import static java.lang.Math.min;

import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.thetacommon.QuickSelect;
import org.apache.datasketches.thetacommon.ThetaUtil;

/**
 * Compute the union of two or more generic tuple sketches or generic tuple sketches combined with
 * theta sketches. A new instance represents an empty set.
 *
 * <p>Incoming entries are merged into an internal {@code QuickSelectSketch}. In addition, this
 * operator tracks its own theta (the minimum theta seen across all inputs and the internal sketch)
 * and its own empty flag, and applies both when a result is requested.</p>
 *
 * @param <S> Type of Summary
 */
public class Union<S extends Summary> {
  private final SummarySetOperations<S> summarySetOps_;
  private final QuickSelectSketch<S> qsk_; // assigned once in the constructor, cleared by reset()
  private long unionThetaLong_; // need to maintain outside of the sketch
  private boolean empty_;

  /**
   * Creates new Union instance with instructions on how to process two summaries that
   * overlap. This will have the default nominal entries (K).
   * @param summarySetOps instance of SummarySetOperations
   */
  public Union(final SummarySetOperations<S> summarySetOps) {
    this(ThetaUtil.DEFAULT_NOMINAL_ENTRIES, summarySetOps);
  }

  /**
   * Creates new Union instance.
   * @param nomEntries nominal entries (K). Forced to the nearest power of 2 greater than
   * given value.
   * @param summarySetOps instance of SummarySetOperations
   */
  public Union(final int nomEntries, final SummarySetOperations<S> summarySetOps) {
    summarySetOps_ = summarySetOps;
    qsk_ = new QuickSelectSketch<>(nomEntries, null);
    unionThetaLong_ = qsk_.getThetaLong();
    empty_ = true;
  }

  /**
   * Perform a pair-wise union operation between two tuple sketches.
   *
   * <p>This operator is reset before the two inputs are merged and again after the result has been
   * produced, so any state accumulated by earlier stateful <i>union</i> calls is discarded.
   * If more hashes qualify than this union's nominal entries (K), the result is trimmed to K
   * entries.</p>
   *
   * <p>Nulls and empty sketches are ignored.</p>
   *
   * @param tupleSketchA The first argument
   * @param tupleSketchB The second argument
   * @return the result CompactSketch, as produced by {@link #getResult(boolean) getResult(true)}.
   */
  public CompactSketch<S> union(final Sketch<S> tupleSketchA, final Sketch<S> tupleSketchB) {
    reset();
    union(tupleSketchA);
    union(tupleSketchB);
    return getResult(true);
  }

  /**
   * Perform a pair-wise union operation between a tupleSketch and a thetaSketch.
   *
   * <p>This operator is reset before the two inputs are merged and again after the result has been
   * produced, so any state accumulated by earlier stateful <i>union</i> calls is discarded.
   * If more hashes qualify than this union's nominal entries (K), the result is trimmed to K
   * entries.</p>
   *
   * <p>Nulls and empty sketches are ignored.</p>
   *
   * @param tupleSketch The first argument
   * @param thetaSketch The second argument
   * @param summary the given proxy summary for the theta sketch, which doesn't have one.
   * This may not be null.
   * @return the result CompactSketch, as produced by {@link #getResult(boolean) getResult(true)}.
   * @throws SketchesArgumentException if <i>summary</i> is null. In that case the state of this
   * operator is left untouched.
   */
  public CompactSketch<S> union(final Sketch<S> tupleSketch,
      final org.apache.datasketches.theta.Sketch thetaSketch, final S summary) {
    //Validate before touching any state so that a bad argument cannot leave this operator
    //reset and half-merged.
    checkSummary(summary);
    reset();
    union(tupleSketch);
    union(thetaSketch, summary);
    return getResult(true);
  }

  /**
   * Performs a stateful union of the internal set with the given tupleSketch.
   *
   * <p>Nulls and empty sketches are ignored.</p>
   *
   * @param tupleSketch input tuple sketch to merge with the internal set.
   */
  public void union(final Sketch<S> tupleSketch) {
    if (tupleSketch == null || tupleSketch.isEmpty()) { return; }
    empty_ = false;
    //lower the union theta to the incoming theta first ...
    unionThetaLong_ = min(tupleSketch.thetaLong_, unionThetaLong_);
    final TupleSketchIterator<S> it = tupleSketch.iterator();
    while (it.next()) {
      qsk_.merge(it.getHash(), it.getSummary(), summarySetOps_);
    }
    //... and then to the theta of the internal sketch, which the merges may have changed
    unionThetaLong_ = min(unionThetaLong_, qsk_.thetaLong_);
  }

  /**
   * Performs a stateful union of the internal set with the given thetaSketch by combining entries
   * using the hashes from the theta sketch and summary values from the given summary.
   * @param thetaSketch the given theta sketch input. If null or empty, it is ignored.
   * @param summary the given proxy summary for the theta sketch, which doesn't have one. This may
   * not be null.
   * @throws SketchesArgumentException if <i>summary</i> is null, even when <i>thetaSketch</i> is
   * null or empty.
   */
  public void union(final org.apache.datasketches.theta.Sketch thetaSketch, final S summary) {
    checkSummary(summary);
    if (thetaSketch == null || thetaSketch.isEmpty()) { return; }
    empty_ = false;
    final long thetaIn = thetaSketch.getThetaLong();
    unionThetaLong_ = min(thetaIn, unionThetaLong_);
    final org.apache.datasketches.theta.HashIterator it = thetaSketch.iterator();
    while (it.next()) {
      qsk_.merge(it.get(), summary, summarySetOps_); //copies summary
    }
    unionThetaLong_ = min(unionThetaLong_, qsk_.thetaLong_);
  }

  /**
   * Gets the result of a sequence of stateful <i>union</i> operations as an unordered CompactSketch
   * @return result of the stateful unions so far. The state of this operation is not reset after the
   * result is returned.
   */
  public CompactSketch<S> getResult() {
    return getResult(false);
  }

  /**
   * Gets the result of a sequence of stateful <i>union</i> operations as an unordered CompactSketch.
   * @param reset If <i>true</i>, clears this operator to the empty state after this result is
   * returned. Set this to <i>false</i> if you wish to obtain an intermediate result.
   * @return result of the stateful union
   */
  public CompactSketch<S> getResult(final boolean reset) {
    final CompactSketch<S> result;
    if (empty_ || canCompactDirectly()) {
      result = qsk_.compact();
    } else {
      result = rebuildResult();
    }
    if (reset) { reset(); }
    return result;
  }

  /**
   * Resets the internal set to the initial state, which represents an empty set. This is only useful
   * after sequences of stateful union operations.
   */
  public void reset() {
    qsk_.reset();
    unionThetaLong_ = qsk_.getThetaLong();
    empty_ = true;
  }

  /**
   * Rejects a null proxy summary.
   * @param summary the summary to check
   * @throws SketchesArgumentException if <i>summary</i> is null
   */
  private static void checkSummary(final Summary summary) {
    if (summary == null) {
      throw new SketchesArgumentException("Summary cannot be null.");
    }
  }

  /**
   * Tells whether the internal sketch can be compacted as is.
   * @return true if the union theta does not restrict the internal sketch any further and the
   * internal sketch holds no more than its nominal number of entries.
   */
  private boolean canCompactDirectly() {
    //unionThetaLong_ >= qsk_.thetaLong_ means we can ignore unionThetaLong_. We don't need to rebuild.
    //qsk_.getRetainedEntries() <= qsk_.getNominalEntries() means we don't need to pull back to k.
    return unionThetaLong_ >= qsk_.thetaLong_
        && qsk_.getRetainedEntries() <= qsk_.getNominalEntries();
  }

  /**
   * Counts the hashes of the internal sketch that are strictly below the given theta.
   * @param thetaLong the exclusive upper bound for a hash to be counted
   * @return the number of qualifying hashes
   */
  private int countHashesBelow(final long thetaLong) {
    int count = 0;
    final TupleSketchIterator<S> it = qsk_.iterator();
    while (it.next()) {
      if (it.getHash() < thetaLong) { count++; }
    }
    return count;
  }

  /**
   * Builds the result by filtering the internal sketch against the effective theta and, if more
   * hashes qualify than the nominal entries, trimming to the nominal entries with a new theta.
   * Only called when this operator is not empty.
   * @return the rebuilt CompactSketch
   */
  private CompactSketch<S> rebuildResult() {
    final long tmpThetaLong = min(unionThetaLong_, qsk_.thetaLong_);

    //count the number of valid hashes in because Alpha can have dirty values
    final int numHashesIn = countHashesBelow(tmpThetaLong);

    if (numHashesIn == 0) {
      //numHashes == 0 && empty == false means Theta < 1.0
      //Therefore, this is a degenerate sketch: theta < 1.0, count = 0, empty = false
      return new CompactSketch<>(null, null, tmpThetaLong, empty_);
    }

    //we know: empty == false, count > 0
    final int nomEntries = qsk_.getNominalEntries();
    final int numHashesOut;
    final long thetaLongOut;
    if (numHashesIn > nomEntries) {
      //we need to trim hashes and need a new thetaLong
      final long[] tmpHashArr = new long[numHashesIn]; // temporary, order will be destroyed by quick select
      final TupleSketchIterator<S> hashIt = qsk_.iterator();
      int n = 0;
      while (hashIt.next()) {
        final long hash = hashIt.getHash();
        if (hash < tmpThetaLong) { tmpHashArr[n++] = hash; }
      }
      numHashesOut = nomEntries;
      thetaLongOut = QuickSelect.select(tmpHashArr, 0, numHashesIn - 1, numHashesOut);
    } else {
      numHashesOut = numHashesIn;
      thetaLongOut = tmpThetaLong;
    }

    //now prepare the output arrays
    final long[] hashArr = new long[numHashesOut];
    final S[] summaries = Util.newSummaryArray(qsk_.getSummaryTable(), numHashesOut);
    final TupleSketchIterator<S> it = qsk_.iterator();
    int i = 0;
    while (it.next()) { //select the qualifying hashes from the gadget synchronized with the summaries
      final long hash = it.getHash();
      if (hash < thetaLongOut) {
        @SuppressWarnings("unchecked") //the copied summary is cast back to S
        final S summaryCopy = (S) it.getSummary().copy();
        hashArr[i] = hash;
        summaries[i] = summaryCopy;
        i++;
      }
    }
    return new CompactSketch<>(hashArr, summaries, thetaLongOut, empty_);
  }
}