DelayedCachedOutputStreamCleaner.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.cxf.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import jakarta.annotation.Resource;
import org.apache.cxf.Bus;
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
import org.apache.cxf.buslifecycle.BusLifeCycleManager;
import org.apache.cxf.common.logging.LogUtils;

/**
 * {@link CachedOutputStreamCleaner} that closes registered {@link Closeable}s which are still
 * registered once a configurable delay has elapsed (such streams are reported as possibly leaked).
 * <p>
 * The delay is read from the bus property {@link CachedConstants#CLEANER_DELAY_BUS_PROP}
 * (milliseconds): when the property is absent a delay of 30 minutes is used, a value of {@code 0}
 * deactivates the cleaner, and any other value below {@code MIN_DELAY} is rejected. The instance also
 * listens to the bus life cycle in order to stop its background timer on shutdown and, unless
 * {@link CachedConstants#CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP} is {@code false}, to close everything
 * that is still registered at that point.
 */
public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener {
    private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);
    private static final long MIN_DELAY = 2000; /* 2 seconds */
    /** Cleaner used while no delay is configured or the cleaner is deactivated: every operation is a no-op. */
    private static final DelayedCleaner NOOP_CLEANER = new DelayedCleaner() {
        // NOOP
    };

    /** Current strategy; never {@code null}, replaced on every {@link #setBus(Bus)} call. */
    private DelayedCleaner cleaner = NOOP_CLEANER;
    /** Whether still registered closeables are force-closed in {@link #postShutdown()}. */
    private boolean cleanupOnShutdown = true;

    /**
     * Internal cleaner strategy. All methods default to doing nothing so that the no-op cleaner
     * can be an empty anonymous implementation.
     */
    private interface DelayedCleaner extends CachedOutputStreamCleaner, Closeable {
        @Override
        default void register(Closeable closeable) {
        }

        @Override
        default void unregister(Closeable closeable) {
        }

        @Override
        default void close() {
        }

        @Override
        default void clean() {
        }

        @Override
        default int size() {
            return 0;
        }

        /** Closes registered closeables without waiting for their delay to expire. */
        default void forceClean() {
        }
    }

    /**
     * Cleaner backed by a {@link DelayQueue}: each registered closeable becomes available for
     * cleaning once {@code delay} milliseconds have passed since its registration. A daemon
     * {@link Timer} invokes {@link #clean()} periodically.
     */
    private static final class DelayedCleanerImpl implements DelayedCleaner {
        private final long delay; /* default is 30 minutes, in milliseconds */
        private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
        private final Timer timer;

        /**
         * Creates the cleaner and immediately schedules the periodic cleaning task.
         *
         * @param delay time (in milliseconds) a closeable may stay registered before it is closed
         */
        DelayedCleanerImpl(final long delay) {
            this.delay = delay;
            this.timer = new Timer("DelayedCachedOutputStreamCleaner", true);
            // Run at half the delay, but never more often than every MIN_DELAY milliseconds
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    clean();
                }
            }, 0, Math.max(MIN_DELAY, delay >> 1));
        }

        @Override
        public void register(Closeable closeable) {
            queue.put(new DelayedCloseable(closeable, delay));
        }

        @Override
        public void unregister(Closeable closeable) {
            // DelayedCloseable equality only considers the wrapped closeable, so a fresh
            // wrapper is sufficient to locate the queued entry.
            queue.remove(new DelayedCloseable(closeable, delay));
        }

        /** Closes every closeable whose delay has expired; entries not yet expired stay queued. */
        @Override
        public void clean() {
            final Collection<DelayedCloseable> expired = new ArrayList<>();
            queue.drainTo(expired);
            // The drained entries are already out of the queue: just walk the local list
            for (final DelayedCloseable next : expired) {
                closeLeaked(next);
            }
        }

        /** Removes and closes every queued closeable, expired or not. */
        @Override
        public void forceClean() {
            final Iterator<DelayedCloseable> iterator = queue.iterator();
            while (iterator.hasNext()) {
                final DelayedCloseable next = iterator.next();
                iterator.remove();
                closeLeaked(next);
            }
        }

        /** Cancels the timer and forgets all queued closeables without closing them. */
        @Override
        public void close()  {
            timer.cancel();
            queue.clear();
        }

        @Override
        public int size() {
            return queue.size();
        }

        /**
         * Reports the closeable as unclosed and closes it. Failures are logged (with their cause)
         * and never propagated, so one faulty closeable cannot prevent the others from being closed.
         */
        private static void closeLeaked(final DelayedCloseable leaked) {
            try {
                LOG.warning("Unclosed (leaked?) stream detected: " + leaked.closeable.hashCode());
                leaked.closeable.close();
            } catch (final IOException | RuntimeException ex) {
                LOG.log(Level.WARNING, "Unable to close (leaked?) stream: " + ex.getMessage(), ex);
            }
        }
    }

    /**
     * Queue entry pairing a closeable with its expiration deadline. Equality and hash code are
     * derived from the wrapped closeable only, ordering is derived from the deadline only.
     */
    private static final class DelayedCloseable implements Delayed {
        private final Closeable closeable;
        /** Deadline expressed on the {@link System#nanoTime()} time line. */
        private final long expireAt;

        /**
         * @param closeable the closeable to track
         * @param delay time to expiration, in milliseconds, counted from now
         */
        DelayedCloseable(final Closeable closeable, final long delay) {
            this.closeable = closeable;
            this.expireAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delay, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (o == this) {
                return 0;
            }

            if (o instanceof DelayedCloseable) {
                // nanoTime based values may wrap around: compare through the sign of the
                // difference rather than comparing the absolute values.
                return Long.signum(expireAt - ((DelayedCloseable) o).expireAt);
            }

            // Unknown Delayed implementation: fall back to comparing the remaining delays
            return Long.signum(getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAt - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int hashCode() {
            return Objects.hash(closeable);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }

            if (obj == null) {
                return false;
            }

            if (getClass() != obj.getClass()) {
                return false;
            }

            final DelayedCloseable other = (DelayedCloseable) obj;
            return Objects.equals(closeable, other.closeable);
        }
    }

    /**
     * (Re)configures the cleaner from the bus properties and registers this instance as a bus
     * life cycle listener when the bus provides a {@link BusLifeCycleManager}. The cleaner that
     * was active before the call is closed first.
     *
     * @param bus the bus to read the configuration from; when {@code null} the defaults apply
     *        (30 minutes delay, cleanup on shutdown) and no life cycle listener is registered
     * @throws IllegalArgumentException if the configured delay is neither {@code 0} nor at least
     *         {@code MIN_DELAY} milliseconds; the cleaner is left deactivated in this case
     */
    @Resource
    public void setBus(Bus bus) {
        Number delayValue = null;
        BusLifeCycleManager busLifeCycleManager = null;
        Boolean cleanupOnShutdownValue = null;

        if (bus != null) {
            delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
            cleanupOnShutdownValue = (Boolean) bus.getProperty(CachedConstants.CLEANER_CLEAN_ON_SHUTDOWN_BUS_PROP);
            busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
        }

        // Stop the timer of the cleaner being replaced (the field is never null)
        cleaner.close();

        // Cleanup on shutdown is the default when the property is not set
        cleanupOnShutdown = cleanupOnShutdownValue == null || cleanupOnShutdownValue;

        if (delayValue == null) {
            // Default delay is set to 30 mins
            cleaner = new DelayedCleanerImpl(TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES));
        } else {
            final long value = delayValue.longValue();
            if (value >= MIN_DELAY) {
                cleaner = new DelayedCleanerImpl(value); /* already in milliseconds */
            } else {
                cleaner = NOOP_CLEANER;
                // 0 explicitly deactivates the cleaner, anything else below the minimum is an error
                if (value != 0) {
                    throw new IllegalArgumentException("The value of " + CachedConstants.CLEANER_DELAY_BUS_PROP 
                        + " property is invalid: " + value + " (should be >= " + MIN_DELAY + ", 0 to deactivate)");
                }
            }
        }

        if (busLifeCycleManager != null) {
            busLifeCycleManager.registerLifeCycleListener(this);
        }
    }

    @Override
    public void register(Closeable closeable) {
        cleaner.register(closeable);
    }

    @Override
    public void unregister(Closeable closeable) {
        cleaner.unregister(closeable);
    }

    @Override
    public int size() {
        return cleaner.size();
    }

    @Override
    public void clean() {
        cleaner.clean();
    }
    
    @Override
    public void initComplete() {
        // Nothing to do on bus initialization
    }
    
    @Override
    public void postShutdown() {
        // If cleanup on shutdown is asked, force cleaning all cached output streams
        if (cleanupOnShutdown) {
            forceClean();
            cleaner.close();
        }
    }
    
    @Override
    public void preShutdown() {
        // If cleanup on shutdown is asked, defer closing till postShutdown hook
        if (!cleanupOnShutdown) {
            cleaner.close();
        }
    }

    /**
     * Closes every closeable that is currently registered, without waiting for its delay to expire.
     */
    public void forceClean() {
        cleaner.forceClean();
    }
}