StreamPumperTest.java
/********************************************************************************
* CruiseControl, a Continuous Integration Toolkit
* Copyright (c) 2003, ThoughtWorks, Inc.
* 651 W Washington Ave. Suite 500
* Chicago, IL 60661 USA
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* + Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* + Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided
* with the distribution.
*
* + Neither the name of ThoughtWorks, Inc., CruiseControl, nor the
* names of its contributors may be used to endorse or promote
* products derived from this software without specific prior
* written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
********************************************************************************/
package org.codehaus.plexus.util.cli;
/*
* Copyright The Codehaus Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* <p>StreamPumperTest class.</p>
*
* @author <a href="mailto:pj@thoughtworks.com">Paul Julius</a>
* @version $Id: $Id
* @since 3.4.0
*/
class StreamPumperTest {
private final String lineSeparator = System.lineSeparator();
/**
* <p>testPumping.</p>
*/
@Test
void pumping() {
String line1 = "line1";
String line2 = "line2";
String lines = line1 + "\n" + line2;
ByteArrayInputStream inputStream = new ByteArrayInputStream(lines.getBytes());
TestConsumer consumer = new TestConsumer();
StreamPumper pumper = new StreamPumper(inputStream, consumer);
new Thread(pumper).run();
// Check the consumer to see if it got both lines.
assertTrue(consumer.wasLineConsumed(line1, 1000));
assertTrue(consumer.wasLineConsumed(line2, 1000));
}
/**
* <p>testPumpingWithPrintWriter.</p>
*/
@Test
void pumpingWithPrintWriter() {
String inputString = "This a test string";
ByteArrayInputStream bais = new ByteArrayInputStream(inputString.getBytes());
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
StreamPumper pumper = new StreamPumper(bais, pw);
pumper.run();
pumper.flush();
System.out.println("aaa" + sw);
assertEquals("This a test string" + lineSeparator, sw.toString());
pumper.close();
}
/**
* <p>testPumperReadsInputStreamUntilEndEvenIfConsumerFails.</p>
*/
@Test
void pumperReadsInputStreamUntilEndEvenIfConsumerFails() {
// the number of bytes generated should surely exceed the read buffer used by the pumper
GeneratorInputStream gis = new GeneratorInputStream(1024 * 1024 * 4);
StreamPumper pumper = new StreamPumper(gis, new FailingConsumer());
pumper.run();
assertEquals(gis.size, gis.read, "input stream was not fully consumed, producer deadlocks");
assertTrue(gis.closed);
assertNotNull(pumper.getException());
}
static class GeneratorInputStream extends InputStream {
final int size;
int read = 0;
boolean closed = false;
public GeneratorInputStream(int size) {
this.size = size;
}
public int read() throws IOException {
if (read < size) {
read++;
return '\n';
} else {
return -1;
}
}
public void close() throws IOException {
closed = true;
}
}
static class FailingConsumer implements StreamConsumer {
public void consumeLine(String line) {
throw new NullPointerException("too bad, the consumer is badly implemented...");
}
}
/**
* Used by the test to track whether a line actually got consumed or not.
*/
static class TestConsumer implements StreamConsumer {
private final List<String> lines = new ArrayList<>();
/**
* Checks to see if this consumer consumed a particular line. This method will wait up to timeout number of
* milliseconds for the line to get consumed.
*
* @param testLine Line to test for.
* @param timeout Number of milliseconds to wait for the line.
* @return true if the line gets consumed, else false.
*/
public boolean wasLineConsumed(String testLine, long timeout) {
long start = System.currentTimeMillis();
long trialTime = 0;
do {
if (lines.contains(testLine)) {
return true;
}
// Sleep a bit.
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// ignoring...
}
// How long have been waiting for the line?
trialTime = System.currentTimeMillis() - start;
} while (trialTime < timeout);
// If we got here, then the line wasn't consumed within the timeout
return false;
}
public void consumeLine(String line) {
lines.add(line);
}
}
/**
* <p>testEnabled.</p>
*/
@Test
void enabled() {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("AB\nCE\nEF".getBytes());
TestConsumer streamConsumer = new TestConsumer();
StreamPumper streamPumper = new StreamPumper(byteArrayInputStream, streamConsumer);
streamPumper.run();
assertEquals(3, streamConsumer.lines.size());
}
/**
* <p>testDisabled.</p>
*/
@Test
void disabled() {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream("AB\nCE\nEF".getBytes());
TestConsumer streamConsumer = new TestConsumer();
StreamPumper streamPumper = new StreamPumper(byteArrayInputStream, streamConsumer);
streamPumper.disable();
streamPumper.run();
assertEquals(0, streamConsumer.lines.size());
}
}