JobInterruptMonitorPlugin.java

/*
 * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
 * Copyright IBM Corp. 2024, 2025
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy
 * of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.quartz.plugins.interrupt;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.Trigger.CompletedExecutionInstruction;
import org.quartz.listeners.TriggerListenerSupport;
import org.quartz.spi.ClassLoadHelper;
import org.quartz.spi.SchedulerPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This plugin catches the event of job running for a long time (more than the
 * configured max time) and tells the scheduler to "try" interrupting it if
 * enabled.
 * 
 * @see org.quartz.Scheduler#interrupt(JobKey)
 * 
 * @author Rama Chavali
 */

public class JobInterruptMonitorPlugin extends TriggerListenerSupport implements SchedulerPlugin {

    private static final String JOB_INTERRUPT_MONITOR_KEY = "JOB_INTERRUPT_MONITOR_KEY";

    private long DEFAULT_MAX_RUNTIME = 300000;
    
    private String name;

    private ScheduledExecutorService executor;

    @SuppressWarnings("rawtypes")
    private ScheduledFuture future;

    private Scheduler scheduler;

    private final Logger log = LoggerFactory.getLogger(getClass());
    
    //Constants

    public static final String AUTO_INTERRUPTIBLE = "AutoInterruptable";

    public static final String MAX_RUN_TIME = "MaxRunTime";

    public JobInterruptMonitorPlugin() {
    }

    @Override
    public void start() {
    }

    @Override

    public void shutdown() {
        this.executor.shutdown();
    }

    protected Logger getLog() {
        return log;
    }

    @SuppressWarnings("rawtypes")
    public ScheduledFuture scheduleJobInterruptMonitor(JobKey jobkey, long delay) {
        return this.executor.schedule(new InterruptMonitor(jobkey, scheduler), delay, TimeUnit.MILLISECONDS);
    }

    // Bean Property Methods

    public long getDefaultMaxRunTime() {
        return this.DEFAULT_MAX_RUNTIME;
    }

    public void setDefaultMaxRunTime(long defaultMaxRunTime) {
        this.DEFAULT_MAX_RUNTIME = defaultMaxRunTime;
    }

    // Trigger Listener Methods
    public String getName() {
        return name;
    }

    public void triggerFired(Trigger trigger, JobExecutionContext context) {
        // Call the scheduleJobInterruptMonitor and capture the ScheduledFuture in context
        try {
            // Schedule Monitor only if the job wants AutoInterruptable functionality
            if (context.getJobDetail().getJobDataMap().getBoolean(AUTO_INTERRUPTIBLE)) {
                JobInterruptMonitorPlugin monitorPlugin = (JobInterruptMonitorPlugin) context.getScheduler()
                        .getContext().get(JOB_INTERRUPT_MONITOR_KEY);
                // Get the MaxRuntime from Job Data if NOT available use DEFAULT_MAX_RUNTIME from Plugin Configuration
                long jobDataDelay  = DEFAULT_MAX_RUNTIME;

                if (context.getJobDetail().getJobDataMap().get(MAX_RUN_TIME) != null){
                     jobDataDelay = context.getJobDetail().getJobDataMap().getLong(MAX_RUN_TIME);
                }
                future = monitorPlugin.scheduleJobInterruptMonitor(context.getJobDetail().getKey(), jobDataDelay);
                getLog().debug("Job's Interrupt Monitor has been scheduled to interrupt with the delay :{}", DEFAULT_MAX_RUNTIME);
            }
        } catch (SchedulerException e) {
            getLog().info("Error scheduling interrupt monitor {}", e.getMessage(), e);
        }
    }

    public void triggerComplete(Trigger trigger, JobExecutionContext context,
            CompletedExecutionInstruction triggerInstructionCode) {
        // cancel the Future if job is complete
        if (future != null) {
            future.cancel(true);
        }
    }

    @Override
    public void initialize(String name, Scheduler scheduler, ClassLoadHelper helper) throws SchedulerException {

        getLog().info("Registering Job Interrupt Monitor Plugin");
        this.name = name;
        this.executor = Executors.newScheduledThreadPool(1);
        scheduler.getContext().put(JOB_INTERRUPT_MONITOR_KEY, this);
        this.scheduler = scheduler;
        // Set the trigger Listener as this class to the ListenerManager here
        this.scheduler.getListenerManager().addTriggerListener(this);

    }

    static class InterruptMonitor implements Runnable {

        private final JobKey jobKey;
        private final Scheduler scheduler;

        private final Logger log = LoggerFactory.getLogger(getClass());

        InterruptMonitor(JobKey jobKey, Scheduler scheduler) {
            this.jobKey = jobKey;
            this.scheduler = scheduler;
        }

        protected Logger getLog() {
            return log;
        }

        @Override
        public void run() {
            try {

                // Interrupt the job here - using Scheduler API that gets propagated to Job's interrupt
                getLog().info("Interrupting Job as it ran more than the configured max time. Job Details [{}:{}]", jobKey.getName(), jobKey.getGroup());
                scheduler.interrupt(jobKey);
            } catch (SchedulerException x) {
                getLog().info("Error interrupting Job: {}", x.getMessage(), x);
            }
        }
    }
}