DefaultAccessLogReceiver.java

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2014 Red Hat, Inc., and individual contributors
 * as indicated by the @author tags.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package io.undertow.server.handlers.accesslog;

import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Deque;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import io.undertow.UndertowLogger;
import io.undertow.UndertowMessages;

/**
 * Log Receiver that stores logs in a directory under the specified file name, and rotates them after
 * midnight.
 * <p>
 * Web threads do not touch the log file, but simply queue messages to be written later by a worker thread.
 * A lightweight CAS based locking mechanism is used to ensure than only 1 thread is active writing messages at
 * any given time
 *
 * @author Stuart Douglas
 */
public class DefaultAccessLogReceiver implements AccessLogReceiver, Runnable, Closeable {
    private static final String DEFAULT_LOG_SUFFIX = "log";
    private static final int DEFAULT_RETRY_COUNT = 150;
    private static final int DEFAULT_RETRY_DELAY = 200;

    private final Executor logWriteExecutor;

    private final Deque<String> pendingMessages;

    //0 = not running - access log handler is not running, nor scheduled
    //1 = queued - log handler has been scheduled to run
    //2 = running - log handler is in run() method and performs I/O
    //3 = closing/closed - run() method as triggered by close() to terminate
    @SuppressWarnings("unused")
    private volatile int state = 0;

    private static final AtomicIntegerFieldUpdater<DefaultAccessLogReceiver> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(DefaultAccessLogReceiver.class, "state");

    private long changeOverPoint;
    private String currentDateString;
    private boolean forceLogRotation;

    private final Path outputDirectory;
    private final Path defaultLogFile;

    private final String logBaseName;
    private final String logNameSuffix;

    private BufferedWriter writer = null;

    private volatile boolean closed = false;
    private boolean initialRun = true;
    private final boolean rotate;
    private final LogFileHeaderGenerator fileHeaderGenerator;
    private final int closeRetryCount;
    private final int closeRetryDelay;

    public DefaultAccessLogReceiver(final Executor logWriteExecutor, final File outputDirectory, final String logBaseName) {
        this(logWriteExecutor, outputDirectory.toPath(), logBaseName, null);
    }

    public DefaultAccessLogReceiver(final Executor logWriteExecutor, final File outputDirectory, final String logBaseName, final String logNameSuffix) {
        this(logWriteExecutor, outputDirectory.toPath(), logBaseName, logNameSuffix, true);
    }

    public DefaultAccessLogReceiver(final Executor logWriteExecutor, final File outputDirectory, final String logBaseName, final String logNameSuffix, boolean rotate) {
        this(logWriteExecutor, outputDirectory.toPath(), logBaseName, logNameSuffix, rotate);
    }

    public DefaultAccessLogReceiver(final Executor logWriteExecutor, final Path outputDirectory, final String logBaseName) {
        this(logWriteExecutor, outputDirectory, logBaseName, null);
    }

    public DefaultAccessLogReceiver(final Executor logWriteExecutor, final Path outputDirectory, final String logBaseName, final String logNameSuffix) {
        this(logWriteExecutor, outputDirectory, logBaseName, logNameSuffix, true);
    }

    public DefaultAccessLogReceiver(final Executor logWriteExecutor, final Path outputDirectory, final String logBaseName, final String logNameSuffix, boolean rotate) {
        this(logWriteExecutor, outputDirectory, logBaseName, logNameSuffix, rotate, null);
    }

    private DefaultAccessLogReceiver(final Executor logWriteExecutor, final Path outputDirectory, final String logBaseName, final String logNameSuffix, boolean rotate, LogFileHeaderGenerator fileHeader) {
        this(logWriteExecutor, outputDirectory, logBaseName, logNameSuffix, rotate, null, DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY);
    }

    private DefaultAccessLogReceiver(final Executor logWriteExecutor, final Path outputDirectory, final String logBaseName, final String logNameSuffix, boolean rotate, LogFileHeaderGenerator fileHeader, final int closeRetryCount, final int closeRetryDelay) {
        this.logWriteExecutor = logWriteExecutor;
        this.outputDirectory = outputDirectory;
        this.logBaseName = logBaseName;
        this.rotate = rotate;
        this.fileHeaderGenerator = fileHeader;
        this.logNameSuffix = (logNameSuffix != null) ? logNameSuffix : DEFAULT_LOG_SUFFIX;
        this.pendingMessages = new ConcurrentLinkedDeque<>();
        this.defaultLogFile = outputDirectory.resolve(logBaseName + this.logNameSuffix);
        this.closeRetryCount = closeRetryCount;
        this.closeRetryDelay = closeRetryDelay;
        calculateChangeOverPoint();
    }

    private void calculateChangeOverPoint() {
        Calendar calendar = Calendar.getInstance();
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.add(Calendar.DATE, 1);
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd", Locale.US);
        currentDateString = df.format(new Date());
        // if there is an existing default log file, use the date last modified instead of the current date
        if (Files.exists(defaultLogFile)) {
            try {
                currentDateString = df.format(new Date(Files.getLastModifiedTime(defaultLogFile).toMillis()));
            } catch(IOException e){
                // ignore. use the current date if exception happens.
            }
        }
        changeOverPoint = calendar.getTimeInMillis();
    }

    @Override
    public void logMessage(final String message) {
        if(closed) {
            //Log handler is closing, other resources should as well, there shouldn't
            //be resources served that required this to log stuff into AL file.
            throw UndertowMessages.MESSAGES.failedToLogAccessOnClose();
        }
        this.pendingMessages.add(message);
        if (this.state == 0 && stateUpdater.compareAndSet(this, 0, 1)) {
            logWriteExecutor.execute(this);
        }
    }

    /**
     * processes all queued log messages
     */
    @Override
    public void run() {
        //check if we can transition to 2. If so, perform tasks in small chunks and check this.closed.
        //move into 3 if(this.closed) and terminate run()
        if (!stateUpdater.compareAndSet(this, 1, 2)) {
            return;
        }
        //NOTE: once we are here, run() control state transition, unless it is too slow
        //and close takes over after grace period.

        if (forceLogRotation || System.currentTimeMillis() > changeOverPoint) {
            performFileRotation();
        } else if (initialRun && Files.exists(defaultLogFile)) {
            checkAndRotateOnInitialRun();
        }

        if(closed) {
            //better to check initially, rather than in loop, reach out to RAM all the time
            if (!stateUpdater.compareAndSet(this, 2, 3)) {
                UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnTransition();
            }
            return;
        }
        //only grab at most 1000 messages at a time
        try {
            if(initOutput()) {
                for (int i = 0; i < 1000 && !pendingMessages.isEmpty(); ++i) {
                    final String msg = pendingMessages.peek();
                    if (msg == null) {
                        break;
                    }
                    if (!writeMessage(msg)) {
                        break;
                    }

                    // NOTE:this is very similar to remove(), but without screenNull
                    // at best, it will work like poll/remove, at worst, will do nothing
                    if (!pendingMessages.remove(msg)) {
                        break;
                    }
                }
            }
        }finally {
            // flush what we might have
            try {
                //this can happen when log has been rotated and there were no write
                final BufferedWriter bw = this.writer;
                if(bw != null)
                    bw.flush();
            } catch (IOException e) {
                UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
            }
            if(this.closed) {
                if (!stateUpdater.compareAndSet(this, 2, 3)) {
                    UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnTransition();
                }
                return;
            } else {
                if (!pendingMessages.isEmpty() || forceLogRotation) {
                    if (stateUpdater.compareAndSet(this, 2, 1)) {
                        logWriteExecutor.execute(this);
                    } else {
                        UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnReschedule();
                    }
                } else {
                    if (!stateUpdater.compareAndSet(this, 2, 0)) {
                        UndertowLogger.ROOT_LOGGER.accessLogWorkerFailureOnTransition();
                    }
                }
            }
        }
    }

    /**
     * For tests only. Blocks the current thread until all messages are written
     * Just does a busy wait.
     * <p>
     * DO NOT USE THIS OUTSIDE OF A TEST
     */
    void awaitWrittenForTest() throws InterruptedException {
        while (!pendingMessages.isEmpty() || forceLogRotation) {
            Thread.sleep(10);
        }
        while (state != 0) {
            Thread.sleep(10);
        }
    }

    private boolean writeMessage(final String message) {
        //NOTE: is there a need to rotate on write?
        //if (System.currentTimeMillis() > changeOverPoint) {
        //    performFileRotation();
        //}
        try {
            final BufferedWriter bw = this.writer;
            if (bw != null) {
                bw.write(message);
                bw.newLine();
                return true;
            }
            return false;
        } catch (IOException e) {
            UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
            return false;
        }
    }

    private boolean initOutput() {
        try {
            if (this.writer == null) {
                //TODO: does this ^^ need a isOpen check?
                this.writer = Files.newBufferedWriter(defaultLogFile, StandardCharsets.UTF_8, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
                if (Files.size(defaultLogFile) == 0 && fileHeaderGenerator != null) {
                    String header = fileHeaderGenerator.generateHeader();
                    if (header != null) {
                        this.writer.write(header);
                        this.writer.newLine();
                        this.writer.flush();
                    }
                }
            }
            return true;
        } catch (IOException e) {
            UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
            return false;
        }
    }
    private void checkAndRotateOnInitialRun() {
      //if there is an existing log file check if it should be rotated
        long lm = 0;
        try {
            lm = Files.getLastModifiedTime(defaultLogFile).toMillis();
        } catch (IOException e) {
            UndertowLogger.ROOT_LOGGER.errorRotatingAccessLog(e);
        }
        Calendar c = Calendar.getInstance();
        c.setTimeInMillis(changeOverPoint);
        c.add(Calendar.DATE, -1);
        if (lm <= c.getTimeInMillis()) {
            performFileRotation();
        }
        initialRun = false;
    }

    private void performFileRotation() {
        forceLogRotation = false;
        if (!rotate) {
            return;
        }
        try {
            if (this.writer != null) {
                this.writer.flush();
                this.writer.close();
                this.writer = null;
            }
            if (!Files.exists(defaultLogFile)) {
                return;
            }
            Path newFile = outputDirectory.resolve(logBaseName + currentDateString + "." + logNameSuffix);
            int count = 0;
            while (Files.exists(newFile)) {
                ++count;
                newFile = outputDirectory.resolve(logBaseName + currentDateString + "-" + count + "." + logNameSuffix);
            }
            Files.move(defaultLogFile, newFile);
        } catch (IOException e) {
            UndertowLogger.ROOT_LOGGER.errorRotatingAccessLog(e);
        } finally {
            calculateChangeOverPoint();
        }
    }

    /**
     * forces a log rotation. This rotation is performed in an async manner, you cannot rely on the rotation
     * being performed immediately after this method returns.
     */
    public void rotate() {
        forceLogRotation = true;
        if (stateUpdater.compareAndSet(this, 0, 1)) {
            logWriteExecutor.execute(this);
        }
    }

    @SuppressWarnings("static-access")
    @Override
    public void close() throws IOException {
        synchronized (this) {
            if(this.closed) {
                return;
            }
            this.closed = true;
        }
        if (this.stateUpdater.compareAndSet(this, 0, 3)) {
            flushAndTerminate();
            return;
        } else {
            // state[1,2] - scheduled or running, attempt schedule hijack
            if (this.stateUpdater.compareAndSet(this, 1, 3)) {
                //this means this thread raced against scheduled run(). run() will exit ASAP
                //as 1->2 wont be possible, we are at 3 and this.closed == true
                flushAndTerminate();
                return;
            }
            // either failed race to 1->3 or we were in 2. We have to wait here sometime.
            // wait ~30s(by default), if situation does not clear up, try dumping stuff
            for (int i = 0; i < this.closeRetryCount; i++) {
                try {
                    Thread.currentThread().sleep(this.closeRetryDelay);
                } catch (InterruptedException e) {
                    UndertowLogger.ROOT_LOGGER.closeInterrupted(e);
                    break;
                }
                if (this.stateUpdater.get(this) == 3) {
                    break;
                }
            }
            final int tempEndState = this.stateUpdater.getAndSet(this, 3);
            if (tempEndState == 2) {
                UndertowLogger.ROOT_LOGGER.accessLogWorkerNoTermination();
            }
            flushAndTerminate();
        }
    }

    protected void flushAndTerminate() {
        try {
            while (!this.pendingMessages.isEmpty()) {
                final String msg = this.pendingMessages.poll();
                // TODO: clarify this, how is this possible?
                if (msg == null) {
                    continue;
                }
                writeMessage(msg);
            }

            this.writer.flush();
            this.writer.close();
            this.writer = null;

        } catch (IOException e) {
            UndertowLogger.ROOT_LOGGER.errorWritingAccessLog(e);
        } finally {
            //NOTE: no need, it cant be reused?
            //stateUpdater.set(this, 0);
        }
    }

    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        private Executor logWriteExecutor;
        private Path outputDirectory;
        private String logBaseName;
        private String logNameSuffix;
        private boolean rotate;
        private LogFileHeaderGenerator logFileHeaderGenerator;
        /**
         * Number of attempts for closing thread to take over control, attempts interval is specified by {@link #closeRetryCount}
         */
        private int closeRetryCount = DEFAULT_RETRY_COUNT;
        /**
         * Delay(ms) between retries when closing thread attempts to take over control.
         */
        private int closeRetryDelay = DEFAULT_RETRY_DELAY;

        public Executor getLogWriteExecutor() {
            return logWriteExecutor;
        }

        public Builder setLogWriteExecutor(Executor logWriteExecutor) {
            this.logWriteExecutor = logWriteExecutor;
            return this;
        }

        public Path getOutputDirectory() {
            return outputDirectory;
        }

        public Builder setOutputDirectory(Path outputDirectory) {
            this.outputDirectory = outputDirectory;
            return this;
        }

        public String getLogBaseName() {
            return logBaseName;
        }

        public Builder setLogBaseName(String logBaseName) {
            this.logBaseName = logBaseName;
            return this;
        }

        public String getLogNameSuffix() {
            return logNameSuffix;
        }

        public Builder setLogNameSuffix(String logNameSuffix) {
            this.logNameSuffix = logNameSuffix;
            return this;
        }

        public boolean isRotate() {
            return rotate;
        }

        public Builder setRotate(boolean rotate) {
            this.rotate = rotate;
            return this;
        }

        public LogFileHeaderGenerator getLogFileHeaderGenerator() {
            return logFileHeaderGenerator;
        }

        public Builder setLogFileHeaderGenerator(LogFileHeaderGenerator logFileHeaderGenerator) {
            this.logFileHeaderGenerator = logFileHeaderGenerator;
            return this;
        }

        public int getCloseRetryCount() {
            return closeRetryCount;
        }

        public Builder setCloseRetryCount(int closeRetryCount) {
            this.closeRetryCount = closeRetryCount;
            return this;
        }

        public int getCloseRetryDelay() {
            return closeRetryDelay;
        }

        /**
         * Delay in ms between retrying poll on state to check for proper termination of worker
         * @param closeRetryDelay
         */
        public Builder setCloseRetryDelay(int closeRetryDelay) {
            this.closeRetryDelay = closeRetryDelay;
            return this;
        }

        public DefaultAccessLogReceiver build() {
            return new DefaultAccessLogReceiver(logWriteExecutor, outputDirectory, logBaseName, logNameSuffix, rotate, logFileHeaderGenerator, closeRetryCount, closeRetryDelay);
        }
    }
}