TaskProcessor.java

/*
 * Copyright (c) 2013, 2017 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.tyrus.container.grizzly.client;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A class responsible for processing {@link Task}. It ensures that only one task will be processed at a time, because
 * Grizzly Worker-thread IOStrategy does not wait until one message is processed before dispatching another one.
 *
 * @author Pavel Bucek (pavel.bucek at oracle.com)
 * @author Petr Janouch
 */
public class TaskProcessor {

    private final Queue<Task> taskQueue = new ConcurrentLinkedQueue<Task>();
    private final Condition condition;
    /**
     * A lock that indicates that a thread is processing a task.
     */
    private final Lock taskLock = new ReentrantLock();

    /**
     * Constructor.
     *
     * @param condition if present, it will be called before processing each {@link Task}. When {@link
     *                  org.glassfish.tyrus.container.grizzly.client.TaskProcessor.Condition#isValid()}
     *                  returns {@code false}, processing will be terminated. If {@code null},
     *                  all tasks from the queue will be processed.
     */
    public TaskProcessor(Condition condition) {
        this.condition = condition;
    }

    /**
     * Constructor.
     * <p>
     * There is no condition that has to be checked before processing each task.
     */
    public TaskProcessor() {
        this.condition = null;
    }

    /**
     * Add a task to the task queue and process as much tasks from the task queue as possible.
     *
     * @param task {@link Task} that should be processed.
     */
    public void processTask(Task task) {
        taskQueue.offer(task);
        processTask();
    }

    /**
     * Process as much tasks from task queue as possible.
     */
    public void processTask() {
        if (!taskLock.tryLock()) {
            // there is another thread processing a task it will take care of this task too
            return;
        }

        try {
            while (!taskQueue.isEmpty()) {
                if (condition != null && !condition.isValid()) {
                    return;
                }

                final Task first = taskQueue.poll();
                if (first == null) {
                    continue;
                }

                first.execute();
            }
        } finally {
            taskLock.unlock();
        }

        /*
         * There is a small chance that another thread will manage to add a task to the queue in the moment when the
         * thread processing tasks has left the while cycle, but has not released the lock yet. In that case a task
         * might be added to the queue and stay there indefinitely. It is quite improbable, but the thread that has
         * finished processing tasks should try to process more tasks after releasing the lock.
         */
        if (!taskQueue.isEmpty()) {
            processTask();
        }
    }

    /**
     * Generic task representation.
     */
    public abstract static class Task {
        /**
         * To be overridden.
         */
        public abstract void execute();
    }

    /**
     * Condition used in {@link #processTask(org.glassfish.tyrus.container.grizzly.client.TaskProcessor.Task)}.
     */
    public interface Condition {

        /**
         * Check the condition.
         *
         * @return {@code true} when condition is valid, {@code false otherwise}.
         */
        public boolean isValid();
    }
}