BlockingInputTest.java

/*
 * Copyright (c) 2014, 2017 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.tyrus.test.standard_config;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.glassfish.tyrus.client.ClientManager;
import org.glassfish.tyrus.server.Server;
import org.glassfish.tyrus.test.tools.TestContainer;

import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests that threads blocked in {@link java.io.Reader} and {@link java.io.InputStream} are unblocked after the session
 * has been closed. Also tests that attempt to read from {@link java.io.Reader} and {@link java.io.InputStream}
 * will fail when the session has been closed.
 *
 * @author Petr Janouch
 */
public class BlockingInputTest extends TestContainer {

    /**
     * Test that a thread blocked in {@link java.io.Reader} on the client side gets released if the session is closed by
     * the client.
     */
    @Test
    public void testReaderCloseByClient() {
        Server server = null;
        try {
            CountDownLatch threadReleasedLatch = new CountDownLatch(1);
            CountDownLatch messageLatch = new CountDownLatch(1);

            server = startServer(AnnotatedServerTextEndpoint.class);
            ClientManager client = createClient();
            Session session = client.connectToServer(new CloseByClientEndpoint(threadReleasedLatch, messageLatch),
                                                     getURI(AnnotatedServerTextEndpoint.class));

            assertTrue(messageLatch.await(1, TimeUnit.SECONDS));
            // give the client endpoint some time to get blocked
            Thread.sleep(100);
            session.close();
            assertTrue(threadReleasedLatch.await(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
            fail();
        } finally {
            stopServer(server);
        }
    }

    /**
     * Test that a thread blocked in {@link java.io.InputStream} on the client side gets released if the session is
     * closed by the client.
     */
    @Test
    public void testInputStreamCloseByClient() {
        Server server = null;
        try {
            CountDownLatch threadReleasedLatch = new CountDownLatch(1);
            CountDownLatch messageLatch = new CountDownLatch(1);

            server = startServer(AnnotatedServerBinaryEndpoint.class);
            ClientManager client = createClient();
            Session session = client.connectToServer(new CloseByClientEndpoint(threadReleasedLatch, messageLatch),
                                                     getURI(AnnotatedServerBinaryEndpoint.class));

            assertTrue(messageLatch.await(1, TimeUnit.SECONDS));
            // give the client endpoint some time to get blocked
            Thread.sleep(100);
            session.close();
            assertTrue(threadReleasedLatch.await(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
            fail();
        } finally {
            stopServer(server);
        }
    }

    /**
     * Test that a thread blocked in {@link java.io.Reader} on the client side gets released if the session is closed
     * by the server.
     */
    @Test
    public void testReaderCloseByServer() {
        Server server = null;
        try {
            CountDownLatch threadReleasedLatch = new CountDownLatch(1);

            server = startServer(AnnotatedServerTextEndpoint.class);
            ClientManager client = createClient();
            client.connectToServer(new CloseByServerEndpoint(threadReleasedLatch),
                                   getURI(AnnotatedServerTextEndpoint.class));

            assertTrue(threadReleasedLatch.await(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
            fail();
        } finally {
            stopServer(server);
        }
    }

    /**
     * Test that a thread blocked in {@link java.io.InputStream} on the client side gets released if the session is
     * closed by the server.
     */
    @Test
    public void testInputStreamCloseByServer() {
        Server server = null;
        try {
            CountDownLatch threadReleasedLatch = new CountDownLatch(1);

            server = startServer(AnnotatedServerBinaryEndpoint.class);
            ClientManager client = createClient();
            client.connectToServer(new CloseByServerEndpoint(threadReleasedLatch),
                                   getURI(AnnotatedServerBinaryEndpoint.class));

            assertTrue(threadReleasedLatch.await(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
            fail();
        } finally {
            stopServer(server);
        }
    }

    /**
     * Test that an attempt to read from {@link java.io.Reader} will throw {@link java.io.IOException} if the session
     * has been closed.
     */
    @Test
    public void testReaderWithClosedSession() {
        Server server = null;
        try {
            CountDownLatch threadReleasedLatch = new CountDownLatch(1);

            server = startServer(AnnotatedServerTextEndpoint.class);
            ClientManager client = createClient();
            client.connectToServer(new ReadFromClosedSessionEndpoint(threadReleasedLatch),
                                   getURI(AnnotatedServerTextEndpoint.class));

            assertTrue(threadReleasedLatch.await(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
            fail();
        } finally {
            stopServer(server);
        }
    }

    /**
     * Test that an attempt to read from {@link java.io.InputStream} will throw {@link java.io.IOException} if the
     * session has been closed.
     */
    @Test
    public void testInputStreamWithClosedSession() {
        Server server = null;
        try {
            CountDownLatch threadReleasedLatch = new CountDownLatch(1);

            server = startServer(AnnotatedServerBinaryEndpoint.class);
            ClientManager client = createClient();
            client.connectToServer(new ReadFromClosedSessionEndpoint(threadReleasedLatch),
                                   getURI(AnnotatedServerBinaryEndpoint.class));

            assertTrue(threadReleasedLatch.await(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            e.printStackTrace();
            fail();
        } finally {
            stopServer(server);
        }
    }

    @ServerEndpoint("/blockingTextInputEndpoint")
    public static class AnnotatedServerTextEndpoint {

        @OnOpen
        public void onOpen(Session session) throws IOException {
            session.getBasicRemote().sendText("A", false);
        }

        @OnMessage
        public void OnMessage(PongMessage message, Session session) throws IOException, InterruptedException {
            // give the client endpoint some time to get blocked
            Thread.sleep(100);
            session.close();
        }
    }

    @ServerEndpoint("/blockingBinaryInputEndpoint")
    public static class AnnotatedServerBinaryEndpoint {

        @OnOpen
        public void onOpen(Session session) throws IOException {
            session.getBasicRemote().sendBinary(ByteBuffer.wrap("A".getBytes()), false);
        }

        @OnMessage
        public void OnMessage(PongMessage message, Session session) throws IOException, InterruptedException {
            // give the client endpoint some time to get blocked
            Thread.sleep(100);
            session.close();
        }
    }

    @ClientEndpoint
    public static class CloseByServerEndpoint {

        /**
         * Latch waiting for the blocked thread to be released.
         */
        private final CountDownLatch threadReleasedLatch;

        public CloseByServerEndpoint(CountDownLatch threadReleasedLatch) {
            this.threadReleasedLatch = threadReleasedLatch;
        }

        @OnMessage
        public void onMessage(Session session, Reader reader) throws IOException {
            reader.read();
            //server will close the session upon receiving a pong
            session.getAsyncRemote().sendPong(null);
            try {
                reader.read();
            } catch (IOException e) {
                threadReleasedLatch.countDown();
            }
        }

        @OnMessage
        public void onMessage(Session session, InputStream stream) throws IOException {
            stream.read();
            //server will close the session upon receiving a pong
            session.getAsyncRemote().sendPong(null);
            try {
                stream.read();
            } catch (Exception e) {
                threadReleasedLatch.countDown();
            }
        }
    }

    @ClientEndpoint
    public static class CloseByClientEndpoint {

        /**
         * Latch waiting for the blocked thread to be released.
         */
        private final CountDownLatch threadReleasedLatch;
        /**
         * Latch waiting for a message from the server.
         */
        private final CountDownLatch messageLatch;

        public CloseByClientEndpoint(CountDownLatch threadReleasedLatch, CountDownLatch messageLatch) {
            this.threadReleasedLatch = threadReleasedLatch;
            this.messageLatch = messageLatch;
        }

        @OnMessage
        public void onMessage(Session session, Reader reader) throws IOException {
            reader.read();
            messageLatch.countDown();
            try {
                reader.read();
            } catch (IOException e) {
                threadReleasedLatch.countDown();
            }
        }

        @OnMessage
        public void onMessage(Session session, InputStream stream) throws IOException {
            stream.read();
            messageLatch.countDown();
            try {
                stream.read();
            } catch (IOException e) {
                threadReleasedLatch.countDown();
            }
        }
    }

    @ClientEndpoint
    public static class ReadFromClosedSessionEndpoint {

        /**
         * Latch waiting for the blocked thread to be released.
         */
        private final CountDownLatch threadReleasedLatch;

        public ReadFromClosedSessionEndpoint(CountDownLatch threadReleasedLatch) {
            this.threadReleasedLatch = threadReleasedLatch;
        }

        @OnMessage
        public void onMessage(Session session, Reader reader) throws IOException {
            reader.read();
            session.close();
            try {
                reader.read();
            } catch (IOException e) {
                threadReleasedLatch.countDown();
            }
        }

        @OnMessage
        public void onMessage(Session session, InputStream stream) throws IOException {
            stream.read();
            session.close();
            try {
                stream.read();
            } catch (IOException e) {
                threadReleasedLatch.countDown();
            }
        }
    }
}