TestRemoteIterators.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.util.functional;

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.util.Preconditions;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.test.AbstractHadoopTestBase;

import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.RemoteIterators.*;
import static org.apache.hadoop.util.functional.RemoteIterators.haltableRemoteIterator;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Test for {@link RemoteIterators}.
 *
 */
public class TestRemoteIterators extends AbstractHadoopTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(
      TestRemoteIterators.class);

  private static final String[] DATA = {"a", "b", "c"};

  /** Counter for lambda-expressions. */
  private int counter;

  @Test
  public void testIterateArray() throws Throwable {
    verifyInvoked(remoteIteratorFromArray(DATA), DATA.length,
        (s) -> LOG.info(s));
  }

  @Test
  public void testIterateArrayMapped() throws Throwable {
    verifyInvoked(
        mappingRemoteIterator(
            remoteIteratorFromArray(DATA),
            (d) -> {
              counter += d.length();
              return d;
            }),
        DATA.length,
        this::log);
    assertCounterValue(3);
  }

  public void log(Object o) {
    LOG.info("{}", o);
  }

  /**
   * Singleton is iterated through once.
   * The toString() call is passed through.
   */
  @Test
  public void testSingleton() throws Throwable {
    StringBuilder result = new StringBuilder();
    String name = "singleton";
    RemoteIterator<String> it = remoteIteratorFromSingleton(name);
    assertStringValueContains(it, "SingletonIterator");
    assertStringValueContains(it, name);
    verifyInvoked(
        it,
        1,
        (s) -> result.append(s));
    assertThat(result.toString())
        .isEqualTo(name);
  }

  @Test
  public void testSingletonNotClosed() throws Throwable {
    CloseCounter closeCounter = new CloseCounter();
    RemoteIterator<CloseCounter> it = remoteIteratorFromSingleton(closeCounter);
    verifyInvoked(it, 1, this::log);
    close(it);
    closeCounter.assertCloseCount(0);
  }

  /**
   * A null singleton is not an error.
   */
  @Test
  public void testNullSingleton() throws Throwable {
    verifyInvoked(remoteIteratorFromSingleton(null), 0, this::log);
  }


  /**
   * If you create a singleton iterator and it is an IOStatisticsSource,
   * then that is the statistics which can be extracted from the
   * iterator.
   */
  @Test
  public void testSingletonStats() throws Throwable {
    IOStatsInstance singleton = new IOStatsInstance();
    RemoteIterator<IOStatsInstance> it
        = remoteIteratorFromSingleton(singleton);
    extractStatistics(it);
  }

  /**
   * The mapping remote iterator passes IOStatistics
   * calls down.
   */
  @Test
  public void testMappedSingletonStats() throws Throwable {
    IOStatsInstance singleton = new IOStatsInstance();
    RemoteIterator<String> it
        = mappingRemoteIterator(remoteIteratorFromSingleton(singleton),
        Object::toString);
    verifyInvoked(it, 1, this::log);
    extractStatistics(it);
  }

  /**
   * Close() calls are passed through.
   */
  @Test
  public void testClosePassthrough() throws Throwable {
    CountdownRemoteIterator countdown = new CountdownRemoteIterator(0);
    RemoteIterator<Integer> it = mappingRemoteIterator(
        countdown,
        i -> i);
    verifyInvoked(it, 0, this::log);
    // the foreach() operation called close()
    countdown.assertCloseCount(1);
    extractStatistics(countdown);
    ((Closeable)it).close();
    countdown.assertCloseCount(1);
  }

  @Test
  public void testMapping() throws Throwable {
    CountdownRemoteIterator countdown = new CountdownRemoteIterator(100);
    RemoteIterator<Integer> it = mappingRemoteIterator(
        countdown,
        i -> i);
    verifyInvoked(it, 100, c -> counter++);
    assertCounterValue(100);
    extractStatistics(it);
    assertStringValueContains(it, "CountdownRemoteIterator");
    close(it);
    countdown.assertCloseCount(1);
  }

  @Test
  public void testFiltering() throws Throwable {
    CountdownRemoteIterator countdown = new CountdownRemoteIterator(100);
    // only even numbers are passed through
    RemoteIterator<Integer> it = filteringRemoteIterator(
        countdown,
        i -> (i % 2) == 0);
    verifyInvoked(it, 50, c -> counter++);
    assertCounterValue(50);
    extractStatistics(it);
    close(it);
    countdown.assertCloseCount(1);
  }

  /**
   * A filter which accepts nothing results in
   * an empty iteration.
   */
  @Test
  public void testFilterNoneAccepted() throws Throwable {
    // nothing gets through
    RemoteIterator<Integer> it = filteringRemoteIterator(
        new CountdownRemoteIterator(100),
        i -> false);
    verifyInvoked(it, 0, c -> counter++);
    assertCounterValue(0);
    extractStatistics(it);
  }

  @Test
  public void testFilterAllAccepted() throws Throwable {
    // nothing gets through
    RemoteIterator<Integer> it = filteringRemoteIterator(
        new CountdownRemoteIterator(100),
        i -> true);
    verifyInvoked(it, 100, c -> counter++);
    assertStringValueContains(it, "CountdownRemoteIterator");
  }

  @Test
  public void testJavaIteratorSupport() throws Throwable {
    CountdownIterator countdownIterator = new CountdownIterator(100);
    RemoteIterator<Integer> it = remoteIteratorFromIterator(
        countdownIterator);
    verifyInvoked(it, 100, c -> counter++);
    assertStringValueContains(it, "CountdownIterator");
    extractStatistics(it);
    close(it);
    countdownIterator.assertCloseCount(1);
  }

  @Test
  public void testJavaIterableSupport() throws Throwable {
    CountdownIterable countdown = new CountdownIterable(100);
    RemoteIterator<Integer> it = remoteIteratorFromIterable(
        countdown);
    verifyInvoked(it, 100, c -> counter++);
    assertStringValueContains(it, "CountdownIterator");
    extractStatistics(it);
    // close the iterator
    close(it);
    countdown.assertCloseCount(0);
    // and a new iterator can be crated
    verifyInvoked(remoteIteratorFromIterable(countdown),
        100, c -> counter++);
  }

  /**
   * If a RemoteIterator is constructed from an iterable
   * and that is to be closed, we close it.
   */
  @Test
  public void testJavaIterableClose() throws Throwable {
    CountdownIterable countdown = new CountdownIterable(100);
    RemoteIterator<Integer> it = closingRemoteIterator(
        remoteIteratorFromIterable(countdown),
        countdown);
    verifyInvoked(it, 100, c -> counter++);
    assertStringValueContains(it, "CountdownIterator");
    extractStatistics(it);

    // verify the iterator was self closed in hasNext()
    countdown.assertCloseCount(1);

    // explicitly close the iterator
    close(it);
    countdown.assertCloseCount(1);
    // and a new iterator cannot be created
    intercept(IllegalStateException.class, () ->
        remoteIteratorFromIterable(countdown));
  }

  /**
   * If a RemoteIterator is constructed from an iterable
   * and that is to be closed, we close it.
   */
  @SuppressWarnings("InfiniteLoopStatement")
  @Test
  public void testJavaIterableCloseInNextLoop() throws Throwable {
    CountdownIterable countdown = new CountdownIterable(100);
    RemoteIterator<Integer> it = closingRemoteIterator(
        remoteIteratorFromIterable(countdown),
        countdown);
    try {
      while(true) {
        it.next();
      }
    } catch (NoSuchElementException expected) {

    }
    // verify the iterator was self closed in next()
    countdown.assertCloseCount(1);

  }

  @Test
  public void testHaltableIterator() throws Throwable {
    final int limit = 4;
    AtomicInteger count = new AtomicInteger(limit);

    // a countdown of 10, but the halting predicate will fail earlier
    // if the value of "count" has dropped to zero
    final RemoteIterator<Long> it =
        haltableRemoteIterator(
            rangeExcludingIterator(0, 10),
            () -> count.get() > 0);

    verifyInvoked(it, limit, (v) -> count.decrementAndGet());
  }

  @Test
  public void testHaltableIteratorNoHalt() throws Throwable {

    // a countdown of 10, but the halting predicate will fail earlier
    // if the value of "count" has dropped to zero
    final int finish = 10;
    final RemoteIterator<Long> it =
        haltableRemoteIterator(
            rangeExcludingIterator(0, finish),
            () -> true);

    verifyInvoked(it, finish);
  }

  @Test
  public void testRangeExcludingIterator() throws Throwable {
    verifyInvoked(rangeExcludingIterator(0, 0), 0);
    verifyInvoked(rangeExcludingIterator(0, -1), 0);
    verifyInvoked(rangeExcludingIterator(0, 100), 100);
    intercept(NoSuchElementException.class, () ->
        rangeExcludingIterator(0, 0).next());
  }

  /**
   * assert that the string value of an object contains the
   * expected text.
   * @param o object
   * @param expected  expected text
   */
  protected void assertStringValueContains(
      final Object o,
      final String expected) {
    assertThat(o.toString())
        .describedAs("Object string value")
        .contains(expected);
  }

  /**
   * Assert that the counter field is at a specific value.
   * @param expected counter
   */
  protected void assertCounterValue(final int expected) {
    assertThat(counter)
        .describedAs("Counter value")
        .isEqualTo(expected);
  }

  /**
   * Verify that the iteration completes with a given size.
   * @param it iterator
   * @param <T> type.
   * @param length expected size
   * @param consumer consumer
   */
  protected <T> void verifyInvoked(final RemoteIterator<T> it,
      int length,
      ConsumerRaisingIOE<T> consumer)
      throws IOException {
    assertThat(foreach(it, consumer))
        .describedAs("Scan through iterator %s", it)
        .isEqualTo(length);
  }

  /**
   * Verify that the iteration completes with a given invocation count.
   * @param it iterator
   * @param <T> type.
   * @param length expected size
   */
  protected <T> void verifyInvoked(
      final RemoteIterator<T> it,
      final int length)
        throws IOException {
    verifyInvoked(it, length, (t) -> { });

  }
  /**
   * Close an iterator if it is iterable.
   * @param it iterator
   * @param <T> type.
   */
  private <T> void close(final RemoteIterator<T> it) throws IOException {
    if (it instanceof Closeable) {
      ((Closeable) it).close();
    }
  }

  /**
   * Class whose close() call increments a counter.
   */
  private static class CloseCounter extends
      IOStatsInstance implements Closeable {

    private int closeCount;

    @Override
    public void close() throws IOException {
      closeCount++;
      LOG.info("close ${}", closeCount);
    }

    public int getCloseCount() {
      return closeCount;
    }

    public void reset() {
      closeCount = 0;
    }

    public void assertCloseCount(int expected) {
      assertThat(closeCount)
          .describedAs("Close count")
          .isEqualTo(expected);
    }

  }

  /**
   * Simple class to implement IOStatistics.
   */
  private static class IOStatsInstance implements IOStatisticsSource {

    private IOStatisticsSnapshot stats = new IOStatisticsSnapshot();

    @Override
    public IOStatistics getIOStatistics() {
      return stats;
    }

  }

  /**
   * Iterator which counts down.
   */
  private static final class CountdownRemoteIterator extends CloseCounter
      implements RemoteIterator<Integer> {

    private int limit;

    private CountdownRemoteIterator(final int limit) {
      this.limit = limit;
    }

    @Override
    public boolean hasNext() throws IOException {
      return limit > 0;
    }

    @Override
    public Integer next() throws IOException {
      return limit--;
    }

    @Override
    public String toString() {
      return "CountdownRemoteIterator{" +
          "limit=" + limit +
          '}';
    }
  }

  /**
   * Iterator which counts down.
   */
  private static final class CountdownIterator extends CloseCounter
      implements Iterator<Integer> {

    private int limit;

    private CountdownIterator(final int limit) {
      this.limit = limit;
    }

    @Override
    public boolean hasNext() {
      return limit > 0;
    }

    @Override
    public Integer next() {
      if (!hasNext()) {
        throw new NoSuchElementException("limit reached");
      }
      return limit--;
    }

    @Override
    public String toString() {
      return "CountdownIterator{" +
          "limit=" + limit +
          '}';
    }
  }

  /**
   * Iterable for countdown iterators.
   * Once closed, calls to iterator() raise an exception.
   */
  private static final class CountdownIterable extends CloseCounter
      implements Iterable<Integer> {

    private int limit;

    private CountdownIterable(final int limit) {
      this.limit = limit;
    }

    @Override
    public Iterator<Integer> iterator() {
      Preconditions.checkState(getCloseCount() == 0);

      return new CountdownIterator(limit);
    }
  }

}