DefaultWatchThread.java

/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.scheduling.io.watch;

import io.micronaut.context.LifeCycle;
import io.micronaut.context.annotation.Parallel;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.util.StringUtils;
import io.micronaut.scheduling.io.watch.event.FileChangedEvent;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micronaut.core.annotation.NonNull;
import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Simple watch service that simply stops the server if any changes occur. It is up to an external tool to watch the server.
 *
 * <p>For example with Gradle you use <code>./gradlew run --continuous</code></p>
 *
 * @author graemerocher
 * @since 1.1.0
 */
@Requires(property = FileWatchConfiguration.PATHS)
@Requires(property = FileWatchConfiguration.ENABLED, value = StringUtils.TRUE, defaultValue = StringUtils.FALSE)
@Requires(condition = FileWatchCondition.class)
@Requires(notEnv = {Environment.FUNCTION, Environment.ANDROID})
@Requires(beans = WatchService.class)
@Parallel
@Singleton
public class DefaultWatchThread implements LifeCycle<DefaultWatchThread> {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultWatchThread.class);
    private final FileWatchConfiguration configuration;
    private final AtomicBoolean active = new AtomicBoolean(true);
    private final ApplicationEventPublisher eventPublisher;
    private final WatchService watchService;
    private Collection<WatchKey> watchKeys = new ConcurrentLinkedQueue<>();

    /**
     * Default constructor.
     *
     * @param eventPublisher The event publisher
     * @param configuration the configuration
     * @param watchService the watch service
     */
    protected DefaultWatchThread(
            ApplicationEventPublisher eventPublisher,
            FileWatchConfiguration configuration,
            WatchService watchService) {
        this.eventPublisher = eventPublisher;
        this.configuration = configuration;
        this.watchService = watchService;
    }

    @Override
    public boolean isRunning() {
        return active.get();
    }

    @Override
    @PostConstruct
    public DefaultWatchThread start() {
        try {
            final List<Path> paths = configuration.getPaths();
            if (!paths.isEmpty()) {
                for (Path path : paths) {
                    if (path.toFile().exists()) {
                        addWatchDirectory(path);
                    }
                }
            }

            if (!watchKeys.isEmpty()) {
                new Thread(() -> {
                    while (active.get()) {
                        try {
                            WatchKey watchKey = watchService.poll(configuration.getCheckInterval().toMillis(), TimeUnit.MILLISECONDS);
                            if (watchKey != null && watchKeys.contains(watchKey)) {
                                List<WatchEvent<?>> watchEvents = watchKey.pollEvents();
                                for (WatchEvent<?> watchEvent : watchEvents) {
                                    WatchEvent.Kind<?> kind = watchEvent.kind();
                                    if (kind == StandardWatchEventKinds.OVERFLOW) {
                                        if (LOG.isWarnEnabled()) {
                                            LOG.warn("WatchService Overflow occurred");
                                        }
                                    } else {
                                        final Object context = watchEvent.context();
                                        if (context instanceof Path path) {

                                            if (LOG.isDebugEnabled()) {
                                                LOG.debug("File at path {} changed. Firing change event: {}", context, kind);
                                            }
                                            eventPublisher.publishEvent(new FileChangedEvent(
                                                    path,
                                                    kind
                                            ));
                                        }
                                    }
                                }
                                watchKey.reset();
                            }
                        } catch (InterruptedException | ClosedWatchServiceException e) {
                            // ignore
                            Thread.currentThread().interrupt();
                        }
                    }
                }, "micronaut-filewatch-thread").start();
            }
        } catch (IOException e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Error starting file watch service: {}", e.getMessage(), e);
            }
        }
        return this;
    }

    @Override
    public DefaultWatchThread stop() {
        active.set(false);
        closeWatchService();
        return this;
    }

    @Override
    @PreDestroy
    public void close() {
        stop();
    }

    /**
     * @return The watch service used.
     */
    public @NonNull WatchService getWatchService() {
        return watchService;
    }

    /**
     * Closes the watch service.
     */
    protected void closeWatchService() {
        try {
            getWatchService().close();
        } catch (IOException e) {
            if (LOG.isErrorEnabled()) {
                LOG.error("Error stopping file watch service: {}", e.getMessage(), e);
            }
        }
    }

    /**
     * Registers a patch to watch.
     *
     * @param dir The directory to watch
     * @return The watch key
     * @throws IOException if an error occurs.
     */
    protected @NonNull WatchKey registerPath(@NonNull Path dir) throws IOException {
        return dir.register(watchService,
                StandardWatchEventKinds.ENTRY_CREATE,
                StandardWatchEventKinds.ENTRY_DELETE,
                StandardWatchEventKinds.ENTRY_MODIFY
        );
    }

    private boolean isValidDirectoryToMonitor(File file) {
        return file.isDirectory() && !file.isHidden() && !file.getName().startsWith(".");
    }

    private Path addWatchDirectory(Path p) throws IOException {
        return Files.walkFileTree(p, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
                    throws IOException {

                if (!isValidDirectoryToMonitor(dir.toFile())) {
                    return FileVisitResult.SKIP_SUBTREE;
                }
                WatchKey watchKey = registerPath(dir);
                watchKeys.add(watchKey);
                return FileVisitResult.CONTINUE;
            }
        });
    }

}