DelayedCachedOutputStreamCleanerTest.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.io;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cxf.Bus;
import org.apache.cxf.bus.extension.ExtensionManagerBus;
import org.junit.After;
import org.junit.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class DelayedCachedOutputStreamCleanerTest {
private Bus bus;
@After
public void tearDown() {
if (bus != null) {
bus.shutdown(true);
bus = null;
}
}
@Test
public void testNoop() {
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 0);
bus = new ExtensionManagerBus(new HashMap<>(), properties);
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
assertNoopCleaner(cleaner);
}
@Test
public void testForceClean() throws InterruptedException {
bus = new ExtensionManagerBus();
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class));
final AtomicBoolean latch = new AtomicBoolean(false);
final Closeable closeable = () -> latch.compareAndSet(false, true);
cleaner.register(closeable);
final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner;
delayedCleaner.forceClean();
// Await for Closeable::close to be called
assertThat(latch.get(), is(true));
}
@Test
public void testClean() throws InterruptedException {
final AtomicInteger latch = new AtomicInteger();
final Closeable closeable1 = () -> latch.incrementAndGet();
final Closeable closeable2 = () -> latch.incrementAndGet();
/* Delay of 2.5 seconds */
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
bus = new ExtensionManagerBus(new HashMap<>(), properties);
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
cleaner.register(closeable1);
cleaner.register(closeable2);
// Await for Closeable::close to be called on schedule
await().atMost(5, TimeUnit.SECONDS).untilAtomic(latch, equalTo(2));
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class));
}
@Test
public void testForceCleanForEmpty() throws InterruptedException {
bus = new ExtensionManagerBus();
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class));
final AtomicBoolean latch = new AtomicBoolean(false);
final Closeable closeable = () -> latch.compareAndSet(false, true);
cleaner.register(closeable);
cleaner.unregister(closeable);
final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner;
delayedCleaner.forceClean();
// Closeable::close should not be called
assertThat(latch.get(), is(false));
}
@Test
public void testForceCleanException() throws InterruptedException {
bus = new ExtensionManagerBus();
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class));
final AtomicInteger latch = new AtomicInteger();
final Closeable closeable2 = () -> latch.incrementAndGet();
final Closeable closeable1 = () -> {
latch.incrementAndGet();
throw new IOException("Simulated");
};
cleaner.register(closeable1);
cleaner.register(closeable2);
final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner;
delayedCleaner.forceClean();
// Try to call force clean one more time
delayedCleaner.forceClean();
// Await for Closeable::close to be called
assertThat(latch.get(), equalTo(2));
}
@Test
public void testCleanOnShutdown() throws InterruptedException {
/* Delay of 5 seconds */
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 5000);
bus = new ExtensionManagerBus(new HashMap<>(), properties);
final AtomicBoolean latch = new AtomicBoolean();
final Closeable closeable = () -> latch.compareAndSet(false, true);
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
cleaner.register(closeable);
// Closes the bus, the cleaner should cancel the internal timer(s)
bus.shutdown(true);
// The Closeable::close should be called on shutdown
assertThat(latch.get(), is(true));
}
@Test
public void testCleanOnShutdownDisabled() throws InterruptedException {
/* Delay of 3 seconds */
final Map<String, Object> properties = new HashMap<>();
properties.put(CachedConstants.CLEANER_DELAY_BUS_PROP, 3000); /* 3 seconds */
properties.put(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP, false);
bus = new ExtensionManagerBus(new HashMap<>(), properties);
final AtomicBoolean latch = new AtomicBoolean();
final Closeable closeable = () -> latch.compareAndSet(false, true);
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
cleaner.register(closeable);
// Closes the bus, the cleaner should cancel the internal timer(s)
bus.shutdown(true);
// The Closeable::close should not be called since timer(s) is cancelled
await().during(4, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).untilAtomic(latch, is(false));
}
@Test
public void testNegativeDelay() throws InterruptedException {
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, -1);
bus = new ExtensionManagerBus(new HashMap<>(), properties);
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
assertNoopCleaner(cleaner);
}
@Test
public void testTooSmallDelay() throws InterruptedException {
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 1500);
bus = new ExtensionManagerBus(new HashMap<>(), properties);
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */
assertNoopCleaner(cleaner);
}
@Test
public void testDelayedClean() throws InterruptedException {
final AtomicInteger latch = new AtomicInteger();
final Closeable closeable1 = () -> latch.incrementAndGet();
final Closeable closeable2 = () -> latch.incrementAndGet();
/* Delay of 5 seconds */
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500);
bus = new ExtensionManagerBus(new HashMap<>(), properties);
final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
cleaner.register(closeable1);
Thread.sleep(2000);
cleaner.register(closeable2);
// Await for Closeable::close to be called on schedule
await().atMost(3, TimeUnit.SECONDS).untilAtomic(latch, equalTo(1));
// Await for Closeable::close to be called on schedule
await().atMost(5, TimeUnit.SECONDS).untilAtomic(latch, equalTo(2));
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class));
}
private void assertNoopCleaner(final CachedOutputStreamCleaner cleaner) {
final AtomicBoolean latch = new AtomicBoolean(false);
final Closeable closeable = () -> latch.compareAndSet(false, true);
cleaner.register(closeable);
final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner;
delayedCleaner.forceClean();
// Noop, Closeable::close should not be called
assertThat(latch.get(), is(false));
}
}