ThrottledInputStream.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.commons.io.input;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* Provides bandwidth throttling on an InputStream as a filter input stream. The throttling examines the number of bytes read from the underlying InputStream,
* and sleeps for a time interval if the byte-transfer is found to exceed the specified maximum rate. Thus, while the read-rate might exceed the maximum for a
* short interval, the average tends towards the specified maximum, overall.
* <p>
* To build an instance, call {@link #builder()}.
* </p>
* <p>
* Inspired by Apache HBase's class of the same name.
* </p>
*
* @see Builder
* @since 2.16.0
*/
public final class ThrottledInputStream extends CountingInputStream {
// @formatter:off
/**
* Builds a new {@link ThrottledInputStream}.
*
* <h2>Using NIO</h2>
* <pre>{@code
* ThrottledInputStream in = ThrottledInputStream.builder()
* .setPath(Paths.get("MyFile.xml"))
* .setMaxBytes(100_000, ChronoUnit.SECONDS)
* .get();
* }
* </pre>
* <h2>Using IO</h2>
* <pre>{@code
* ThrottledInputStream in = ThrottledInputStream.builder()
* .setFile(new File("MyFile.xml"))
* .setMaxBytes(100_000, ChronoUnit.SECONDS)
* .get();
* }
* </pre>
* <pre>{@code
* ThrottledInputStream in = ThrottledInputStream.builder()
* .setInputStream(inputStream)
* .setMaxBytes(100_000, ChronoUnit.SECONDS)
* .get();
* }
* </pre>
*
* @see #get()
*/
// @formatter:on
public static class Builder extends AbstractBuilder<ThrottledInputStream, Builder> {
/**
* Effectively not throttled.
*/
private double maxBytesPerSecond = Double.MAX_VALUE;
/**
* Constructs a new builder of {@link ThrottledInputStream}.
*/
public Builder() {
// empty
}
/**
* Builds a new {@link ThrottledInputStream}.
* <p>
* You must set an aspect that supports {@link #getInputStream()}, otherwise, this method throws an exception.
* </p>
* <p>
* This builder uses the following aspects:
* </p>
* <ul>
* <li>{@link #getInputStream()} gets the target aspect.</li>
* <li>maxBytesPerSecond</li>
* </ul>
*
* @return a new instance.
* @throws IllegalStateException if the {@code origin} is {@code null}.
* @throws UnsupportedOperationException if the origin cannot be converted to an {@link InputStream}.
* @throws IOException if an I/O error occurs converting to an {@link InputStream} using {@link #getInputStream()}.
* @see #getInputStream()
* @see #getUnchecked()
*/
@Override
public ThrottledInputStream get() throws IOException {
return new ThrottledInputStream(this);
}
// package private for testing.
double getMaxBytesPerSecond() {
return maxBytesPerSecond;
}
/**
* Sets the maximum bytes per time period unit.
* <p>
* For example, to throttle reading to 100K per second, use:
* </p>
* <pre>
* builder.setMaxBytes(100_000, ChronoUnit.SECONDS)
* </pre>
* <p>
* To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
* </p>
*
* @param value the maximum bytes
* @param chronoUnit a duration scale goal.
* @return this instance.
* @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
* @since 2.19.0
*/
public Builder setMaxBytes(final long value, final ChronoUnit chronoUnit) {
setMaxBytes(value, chronoUnit.getDuration());
return asThis();
}
/**
* Sets the maximum bytes per duration.
* <p>
* For example, to throttle reading to 100K per second, use:
* </p>
* <pre>
* builder.setMaxBytes(100_000, Duration.ofSeconds(1))
* </pre>
* <p>
* To test idle timeouts for example, use 1 byte per minute, 1 byte per 30 seconds, and so on.
* </p>
*
* @param value the maximum bytes
* @param duration a duration goal.
* @return this instance.
* @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
*/
// Consider making public in the future
Builder setMaxBytes(final long value, final Duration duration) {
setMaxBytesPerSecond((double) Objects.requireNonNull(duration, "duration").toMillis() / 1_000 * value);
return asThis();
}
/**
* Sets the maximum bytes per second.
*
* @param maxBytesPerSecond the maximum bytes per second.
* @return this instance.
* @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
*/
private Builder setMaxBytesPerSecond(final double maxBytesPerSecond) {
if (maxBytesPerSecond <= 0) {
throw new IllegalArgumentException("Bandwidth " + maxBytesPerSecond + " must be > 0.");
}
this.maxBytesPerSecond = maxBytesPerSecond;
return asThis();
}
/**
* Sets the maximum bytes per second.
*
* @param maxBytesPerSecond the maximum bytes per second.
* @throws IllegalArgumentException Thrown if maxBytesPerSecond <= 0.
*/
public void setMaxBytesPerSecond(final long maxBytesPerSecond) {
setMaxBytesPerSecond((double) maxBytesPerSecond);
// TODO 3.0
// return asThis();
}
}
/**
* Constructs a new {@link Builder}.
*
* @return a new {@link Builder}.
*/
public static Builder builder() {
return new Builder();
}
// package private for testing
static long toSleepMillis(final long bytesRead, final long elapsedMillis, final double maxBytesPerSec) {
if (bytesRead <= 0 || maxBytesPerSec <= 0 || elapsedMillis == 0) {
return 0;
}
// We use this class to load the single source file, so the bytesRead
// and maxBytesPerSec aren't greater than Double.MAX_VALUE.
// We can get the precise sleep time by using the double value.
final long millis = (long) (bytesRead / maxBytesPerSec * 1000 - elapsedMillis);
if (millis <= 0) {
return 0;
}
return millis;
}
private final double maxBytesPerSecond;
private final long startTime = System.currentTimeMillis();
private Duration totalSleepDuration = Duration.ZERO;
private ThrottledInputStream(final Builder builder) throws IOException {
super(builder);
if (builder.maxBytesPerSecond <= 0) {
throw new IllegalArgumentException("Bandwidth " + builder.maxBytesPerSecond + " is invalid.");
}
this.maxBytesPerSecond = builder.maxBytesPerSecond;
}
@Override
protected void beforeRead(final int n) throws IOException {
throttle();
}
/**
* Gets the read-rate from this stream, since creation. Calculated as bytesRead/elapsedTimeSinceStart.
*
* @return Read rate, in bytes/sec.
*/
private long getBytesPerSecond() {
final long elapsedSeconds = (System.currentTimeMillis() - startTime) / 1000;
if (elapsedSeconds == 0) {
return getByteCount();
}
return getByteCount() / elapsedSeconds;
}
// package private for testing.
double getMaxBytesPerSecond() {
return maxBytesPerSecond;
}
private long getSleepMillis() {
return toSleepMillis(getByteCount(), System.currentTimeMillis() - startTime, maxBytesPerSecond);
}
/**
* Gets the total duration spent in sleep.
*
* @return Duration spent in sleep.
*/
// package private for testing
Duration getTotalSleepDuration() {
return totalSleepDuration;
}
private void throttle() throws InterruptedIOException {
final long sleepMillis = getSleepMillis();
if (sleepMillis > 0) {
totalSleepDuration = totalSleepDuration.plus(sleepMillis, ChronoUnit.MILLIS);
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (final InterruptedException e) {
throw new InterruptedIOException("Thread aborted");
}
}
}
/** {@inheritDoc} */
@Override
public String toString() {
return "ThrottledInputStream[bytesRead=" + getByteCount() + ", maxBytesPerSec=" + maxBytesPerSecond + ", bytesPerSec=" + getBytesPerSecond()
+ ", totalSleepDuration=" + totalSleepDuration + ']';
}
}