EbppsItemsSample.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.datasketches.sampling;

import static org.apache.datasketches.common.Util.LS;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.datasketches.common.SketchesArgumentException;

/**
 * Supporting class used to hold the raw data sample of an EBPPS items sketch.
 *
 * <p>The sample consists of a list of full items plus at most one partial item.
 * {@code c_} records the current (possibly fractional) sample size; when a realized
 * sample is drawn via {@link #getSample()}, the partial item is included with
 * probability equal to the fractional part of {@code c_}.</p>
 *
 * <p>{@code data_} may be {@code null} for a temporary sample populated through
 * {@link #replaceContent(Object, double)} with {@code theta < 1}; every method in this
 * class tolerates that state.</p>
 */
final class EbppsItemsSample<T> {

  private double c_;            // current sample size, including fractional part
  private T partialItem_;       // sample item corresponding to the fractional part of c_, or null
  private ArrayList<T> data_;   // full sample items; null only after replaceContent(item, theta < 1)

  // ThreadLocalRandom.current() in general; replaceable for testing via replaceRandom().
  // NOTE(review): ThreadLocalRandom instances are meant to be used only by the thread that
  // obtained them, so this object presumably must not be shared across threads.
  private Random rand_;

  /**
   * Basic constructor creating an empty sample.
   *
   * @param reservedSize initial capacity reserved for the list of full items
   */
  EbppsItemsSample(final int reservedSize) {
    c_ = 0.0;
    data_ = new ArrayList<>(reservedSize);
    rand_ = ThreadLocalRandom.current();
  }

  /**
   * Copy constructor used during merge. The list of full items is copied (the items
   * themselves are not); the random source is shared with {@code other}.
   *
   * @param other the sample to copy
   */
  EbppsItemsSample(final EbppsItemsSample<T> other) {
    c_ = other.c_;
    partialItem_ = other.partialItem_;
    // other.data_ is null for a temporary sample filled via replaceContent() with theta < 1;
    // an empty list is equivalent and keeps later merge/downsample calls safe.
    data_ = other.data_ != null ? new ArrayList<>(other.data_) : new ArrayList<>();
    rand_ = other.rand_;
  }

  /**
   * Constructor used for deserialization and testing.
   *
   * <p>Does NOT copy the incoming ArrayList since this is an internal class's
   * package-private constructor, not something directly taking user data.</p>
   *
   * @param data list of full items, adopted without copying
   * @param partialItem the partial item, or null if none
   * @param c the sample size, including fractional part
   * @throws SketchesArgumentException if {@code c} is negative, NaN, or infinite
   */
  EbppsItemsSample(final ArrayList<T> data, final T partialItem, final double c) {
    if (c < 0.0 || Double.isNaN(c) || Double.isInfinite(c)) {
      throw new SketchesArgumentException("C must be nonnegative and finite. Found: " + c);
    }

    c_ = c;
    partialItem_ = partialItem;
    data_ = data;
    rand_ = ThreadLocalRandom.current();
  }

  /**
   * Used in lieu of a constructor to populate a temporary sample with a single item
   * before immediately merging it. This approach avoids excessive object allocation.
   * {@code rand_} is left untouched since it is not expected to be used from this object.
   *
   * <p>With {@code theta == 1.0} the item becomes the only full item; otherwise it becomes
   * the partial item and {@code data_} is set to null.</p>
   *
   * @param item the item to hold
   * @param theta the item's inclusion weight, in [0.0, 1.0]
   * @throws SketchesArgumentException if {@code theta} is outside [0.0, 1.0] or NaN
   */
  void replaceContent(final T item, final double theta) {
    if (theta < 0.0 || theta > 1.0 || Double.isNaN(theta)) {
      throw new SketchesArgumentException("Theta must be in the range [0.0, 1.0]. Found: " + theta);
    }

    c_ = theta;
    if (theta == 1.0) {
      if (data_ != null && data_.size() == 1) {
        data_.set(0, item);
      } else {
        data_ = new ArrayList<>(1);
        data_.add(item);
      }
      partialItem_ = null;
    } else {
      data_ = null;
      partialItem_ = item;
    }
  }

  /** Empties the sample: sets {@code c} to zero and drops all full and partial items. */
  void reset() {
    c_ = 0.0;
    partialItem_ = null;
    if (data_ != null) {
      data_.clear();
    } else {
      // may be null after replaceContent(item, theta < 1)
      data_ = new ArrayList<>();
    }
  }

  /**
   * Draws a realized sample: all full items, plus the partial item with probability
   * equal to the fractional part of {@code c}.
   *
   * @return a new list containing the realized sample, or null if it would be empty
   */
  ArrayList<T> getSample() {
    final double cFrac = c_ % 1;
    final boolean includePartial = partialItem_ != null && rand_.nextDouble() < cFrac;
    final int resultSize = (data_ != null ? data_.size() : 0) + (includePartial ? 1 : 0);

    if (resultSize == 0) {
      return null;
    }

    final ArrayList<T> result = new ArrayList<>(resultSize);
    if (data_ != null) {
      result.addAll(data_);
    }

    if (includePartial) {
      result.add(partialItem_);
    }

    return result;
  }

  /**
   * Returns every retained item without any random selection: non-null full items
   * followed by the partial item, if present.
   *
   * @param clazz component type of the returned array
   * @return an array of length {@link #getNumRetainedItems()}
   */
  @SuppressWarnings("unchecked")
  T[] getAllSamples(final Class<?> clazz) {
    final T[] itemsArray = (T[]) Array.newInstance(clazz, getNumRetainedItems());
    int i = 0;
    if (data_ != null) {
      for (final T item : data_) {
        if (item != null) {
          itemsArray[i++] = item;
        }
      }
    }
    if (partialItem_ != null) {
      itemsArray[i] = partialItem_; // last slot; no need to increment i again
    }

    return itemsArray;
  }

  // package-private for use in merge and serialization; returns the internal list (may be null)
  ArrayList<T> getFullItems() {
    return data_;
  }

  // package-private for use in merge and serialization
  T getPartialItem() {
    return partialItem_;
  }

  /** @return the current sample size, including fractional part */
  double getC() { return c_; }

  /** @return true if a partial item is currently held */
  boolean hasPartialItem() { return partialItem_ != null; }

  // for testing to allow setting the seed
  void replaceRandom(final Random r) {
    rand_ = r;
  }

  /**
   * Downsamples so that the sample size becomes {@code theta * c}. Does nothing when
   * {@code theta >= 1.0}.
   *
   * @param theta the downsampling factor
   */
  void downsample(final double theta) {
    if (theta >= 1.0) { return; }

    final double newC = theta * c_;
    final double newCInt = Math.floor(newC);
    final double newCFrac = newC % 1;
    final double cInt = Math.floor(c_);
    final double cFrac = c_ % 1;

    if (newCInt == 0.0) {
      // no full items retained
      if (rand_.nextDouble() > (cFrac / c_)) {
        swapWithPartialItem();
      }
      if (data_ != null) { // may be null after replaceContent(item, theta < 1)
        data_.clear();
      }
    } else if (newCInt == cInt) {
      // no items deleted
      if (rand_.nextDouble() > (1 - theta * cFrac) / (1 - newCFrac)) {
        swapWithPartialItem();
      }
    } else {
      if (rand_.nextDouble() < theta * cFrac) {
        // subsample data in random order; last item is partial
        // create sample size newCInt then swapWithPartialItem()
        subsample((int) newCInt);
        swapWithPartialItem();
      } else {
        // create sample size newCInt + 1 then moveOneToPartialItem()
        subsample((int) newCInt + 1);
        moveOneToPartialItem();
      }
    }

    // an integral sample size leaves no room for a partial item
    if (newC == newCInt) {
      partialItem_ = null;
    }

    c_ = newC;
  }

  /**
   * Merges {@code other} into this sample. {@code c} becomes the sum of both sizes, the
   * full items of {@code other} are appended, and the two partial items are resolved
   * randomly. {@code other} is only read, never modified.
   *
   * @param other the sample to merge in
   */
  void merge(final EbppsItemsSample<T> other) {
    final double cFrac = c_ % 1;
    final double otherCFrac = other.c_ % 1;

    // update c_ here but do NOT recompute fractional part yet
    c_ += other.c_;

    if (data_ == null) {
      // this sample may have been populated via replaceContent(item, theta < 1)
      data_ = new ArrayList<>();
    }
    if (other.data_ != null) {
      data_.addAll(other.data_);
    }

    // This modifies the original algorithm slightly due to numeric
    // precision issues. Specifically, the test if cFrac + otherCFrac == 1.0
    // happens before tests for < 1.0 or > 1.0 and can also be triggered
    // if c_ == floor(c_) (the updated value of c_, not the input).
    //
    // We can still run into issues where cFrac + otherCFrac == epsilon
    // and the first case would have ideally triggered. As a result, we must
    // check if the partial item exists before adding to the data_ list.

    if (cFrac == 0.0 && otherCFrac == 0.0) {
      partialItem_ = null;
    } else if (cFrac + otherCFrac == 1.0 || c_ == Math.floor(c_)) {
      // fractions combine into exactly one full item: promote one of the partials
      if (rand_.nextDouble() <= cFrac) {
        if (partialItem_ != null) {
          data_.add(partialItem_);
        }
      } else {
        if (other.partialItem_ != null) {
          data_.add(other.partialItem_);
        }
      }
      partialItem_ = null;
    } else if (cFrac + otherCFrac < 1.0) {
      // still fractional: keep one of the two partials as the new partial
      if (rand_.nextDouble() > cFrac / (cFrac + otherCFrac)) {
        partialItem_ = other.partialItem_;
      }
    } else { // cFrac + otherCFrac > 1
      // one partial is promoted to a full item, the other remains partial
      if (rand_.nextDouble() <= (1 - cFrac) / ((1 - cFrac) + (1 - otherCFrac))) {
        data_.add(other.partialItem_);
      } else {
        data_.add(partialItem_);
        partialItem_ = other.partialItem_;
      }
    }
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder();

    sb.append("  sample:").append(LS);
    if (data_ != null) { // may be null after replaceContent(item, theta < 1)
      int idx = 0;
      for (final T item : data_) {
        // append(Object) prints "null" for a null item instead of throwing
        sb.append("\t").append(idx++).append(":\t").append(item).append(LS);
      }
    }
    sb.append("  partial: ");
    if (partialItem_ != null) {
      sb.append(partialItem_).append(LS);
    } else {
      sb.append("NULL").append(LS);
    }

    return sb.toString();
  }

  /**
   * Randomly retains {@code numSamples} of the full items, leaving them in random order.
   *
   * <p>Performs a Fisher-Yates style shuffle, stopping after numSamples points since
   * subsequent swaps would only be between items after numSamples. This is valid since
   * a point from anywhere in the initial list is eligible to end up in the final
   * subsample. Assumes {@code numSamples <= data_.size()}; otherwise
   * {@link Random#nextInt(int)} receives a non-positive bound and throws.</p>
   *
   * @param numSamples number of full items to retain
   */
  void subsample(final int numSamples) {
    if (numSamples == data_.size()) { return; }

    final int dataLen = data_.size();
    for (int i = 0; i < numSamples; ++i) {
      final int j = i + rand_.nextInt(dataLen - i);
      // swap i and j
      final T tmp = data_.get(i);
      data_.set(i, data_.get(j));
      data_.set(j, tmp);
    }

    // clear anything beyond numSamples
    data_.subList(numSamples, data_.size()).clear();
  }

  /**
   * Swaps the partial item with a uniformly chosen full item. If there is no partial
   * item, a uniformly chosen full item is moved into the partial slot instead.
   * Requires at least one full item.
   */
  void swapWithPartialItem() {
    if (partialItem_ == null) {
      moveOneToPartialItem();
    } else {
      final int idx = rand_.nextInt(data_.size());
      final T tmp = partialItem_;
      partialItem_ = data_.get(idx);
      data_.set(idx, tmp);
    }
  }

  /**
   * Removes a uniformly chosen full item and makes it the partial item, overwriting any
   * existing partial item. Requires at least one full item.
   */
  void moveOneToPartialItem() {
    final int idx = rand_.nextInt(data_.size());
    final int lastIdx = data_.size() - 1;
    partialItem_ = data_.get(idx);
    // move the last item into the vacated slot so removal from the end is O(1);
    // when idx == lastIdx this is a harmless self-assignment
    data_.set(idx, data_.get(lastIdx));
    data_.remove(lastIdx);
  }

  /** @return number of full items plus one if a partial item is held */
  int getNumRetainedItems() {
    return (data_ != null ? data_.size() : 0)
         + (partialItem_ != null ? 1 : 0);
  }
}