TyrusRemoteEndpoint.java

/*
 * Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.tyrus.core;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.websocket.CloseReason;
import javax.websocket.EncodeException;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;

import org.glassfish.tyrus.core.l10n.LocalizationMessages;
import org.glassfish.tyrus.spi.WriterInfo;
import org.glassfish.tyrus.spi.WriterInfo.MessageType;
import org.glassfish.tyrus.spi.WriterInfo.RemoteEndpointType;

import static org.glassfish.tyrus.core.Utils.checkNotNull;

/**
 * Wraps the {@link javax.websocket.RemoteEndpoint} and represents the other side of the websocket connection.
 *
 * @author Danny Coward (danny.coward at oracle.com)
 * @author Martin Matula (martin.matula at oracle.com)
 * @author Stepan Kopriva (stepan.kopriva at oracle.com)
 * @author Pavel Bucek (pavel.bucek at oracle.com)
 */
public abstract class TyrusRemoteEndpoint implements javax.websocket.RemoteEndpoint {

    final TyrusSession session;
    final TyrusWebSocket webSocket;

    private final TyrusEndpointWrapper endpointWrapper;

    private static final Logger LOGGER = Logger.getLogger(TyrusRemoteEndpoint.class.getName());

    private TyrusRemoteEndpoint(TyrusSession session, TyrusWebSocket socket, TyrusEndpointWrapper endpointWrapper) {
        this.webSocket = socket;
        this.endpointWrapper = endpointWrapper;
        this.session = session;
    }

    static class Basic extends TyrusRemoteEndpoint implements javax.websocket.RemoteEndpoint.Basic {

        Basic(TyrusSession session, TyrusWebSocket socket, TyrusEndpointWrapper endpointWrapper) {
            super(session, socket, endpointWrapper);
        }

        @Override
        public void sendText(String text) throws IOException {
            checkNotNull(text, "text");

            session.getDebugContext()
                   .appendLogMessage(LOGGER, Level.FINEST, DebugContext.Type.MESSAGE_OUT, "Sending text message: ",
                                     text);

            final Future<?> future = webSocket.sendText(text, new WriterInfo(MessageType.TEXT, RemoteEndpointType.BASIC));
            try {
                processFuture(future);
            } finally {
                session.restartIdleTimeoutExecutor();
            }
        }

        @Override
        public void sendBinary(ByteBuffer data) throws IOException {
            checkNotNull(data, "data");

            session.getDebugContext()
                   .appendLogMessage(LOGGER, Level.FINEST, DebugContext.Type.MESSAGE_OUT, "Sending binary message");

            final Future<?> future = webSocket.sendBinary(Utils.getRemainingArray(data),
                    new WriterInfo(WriterInfo.MessageType.BINARY, WriterInfo.RemoteEndpointType.BASIC));
            try {
                processFuture(future);
            } finally {
                session.restartIdleTimeoutExecutor();
            }
        }

        @Override
        public void sendText(String partialMessage, boolean isLast) throws IOException {
            checkNotNull(partialMessage, "partialMessage");

            session.getDebugContext().appendLogMessage(LOGGER, Level.FINEST, DebugContext.Type.MESSAGE_OUT,
                                                       "Sending partial text message: ", partialMessage);

            final Future<?> future = webSocket.sendText(partialMessage, isLast,
                    new WriterInfo(isLast ? MessageType.TEXT : MessageType.TEXT_CONTINUATION, RemoteEndpointType.BASIC));
            try {
                processFuture(future);
            } finally {
                session.restartIdleTimeoutExecutor();
            }
        }

        @Override
        public void sendBinary(ByteBuffer partialByte, boolean isLast) throws IOException {
            checkNotNull(partialByte, "partialByte");

            session.getDebugContext().appendLogMessage(LOGGER, Level.FINEST, DebugContext.Type.MESSAGE_OUT,
                                                       "Sending partial binary message");

            final Future<?> future = webSocket.sendBinary(Utils.getRemainingArray(partialByte), isLast,
                    new WriterInfo(isLast ? MessageType.BINARY : MessageType.BINARY_CONTINUATION, RemoteEndpointType.BASIC));
            try {
                processFuture(future);
            } finally {
                session.restartIdleTimeoutExecutor();
            }
        }

        /**
         * Wait for the future to be completed.
         * <p>
         * {@link java.util.concurrent.Future#get()} will be invoked and exception processed (if thrown).
         *
         * @param future to be processed.
         * @throws IOException when {@link java.io.IOException} is the cause of thrown {@link
         *                     java.util.concurrent.ExecutionException} it will be extracted and rethrown. Otherwise
         *                     whole ExecutionException will be rethrown wrapped in {@link java.io.IOException}.
         */
        private void processFuture(Future<?> future) throws IOException {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof IOException) {
                    throw (IOException) e.getCause();
                } else {
                    throw new IOException(e.getCause());
                }
            }
        }

        @Override
        public void sendObject(Object data) throws IOException, EncodeException {
            checkNotNull(data, "data");
            final Future<?> future = sendSyncObject(data, new WriterInfo(MessageType.OBJECT, RemoteEndpointType.BASIC));
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof IOException) {
                    throw (IOException) e.getCause();
                } else if (e.getCause() instanceof EncodeException) {
                    throw (EncodeException) e.getCause();
                } else {
                    throw new IOException(e.getCause());
                }
            }
            session.restartIdleTimeoutExecutor();
        }

        @Override
        public OutputStream getSendStream() throws IOException {
            return new OutputStreamToAsyncBinaryAdapter(webSocket);
        }

        @Override
        public Writer getSendWriter() throws IOException {
            return new WriterToAsyncTextAdapter(webSocket);
        }
    }

    static class Async extends TyrusRemoteEndpoint implements javax.websocket.RemoteEndpoint.Async {
        private long sendTimeout;

        Async(TyrusSession session, TyrusWebSocket socket, TyrusEndpointWrapper endpointWrapper) {
            super(session, socket, endpointWrapper);

            if (session.getContainer() != null) {
                setSendTimeout(session.getContainer().getDefaultAsyncSendTimeout());
            }
        }

        @Override
        public void sendText(String text, SendHandler handler) {
            checkNotNull(text, "text");
            checkNotNull(handler, "handler");
            session.restartIdleTimeoutExecutor();
            sendAsync(text, handler, AsyncMessageType.TEXT);
        }

        @Override
        public Future<Void> sendText(String text) {
            checkNotNull(text, "text");
            session.restartIdleTimeoutExecutor();
            return sendAsync(text, AsyncMessageType.TEXT);
        }

        @Override
        public Future<Void> sendBinary(ByteBuffer data) {
            checkNotNull(data, "data");
            session.restartIdleTimeoutExecutor();
            return sendAsync(data, AsyncMessageType.BINARY);
        }

        @Override
        public void sendBinary(ByteBuffer data, SendHandler handler) {
            checkNotNull(data, "data");
            checkNotNull(handler, "handler");
            session.restartIdleTimeoutExecutor();
            sendAsync(data, handler, AsyncMessageType.BINARY);
        }

        @Override
        public void sendObject(Object data, SendHandler handler) {
            checkNotNull(data, "data");
            checkNotNull(handler, "handler");
            session.restartIdleTimeoutExecutor();
            sendAsync(data, handler, AsyncMessageType.OBJECT);
        }

        @Override
        public Future<Void> sendObject(Object data) {
            checkNotNull(data, "data");
            session.restartIdleTimeoutExecutor();
            return sendAsync(data, AsyncMessageType.OBJECT);
        }

        @Override
        public long getSendTimeout() {
            return sendTimeout;
        }

        @Override
        public void setSendTimeout(long timeoutmillis) {
            sendTimeout = timeoutmillis;
            webSocket.setWriteTimeout(timeoutmillis);
        }

        /**
         * Sends the message asynchronously.
         * <p>
         * IMPORTANT NOTE: There is no need to start new thread here. All writer operations are by default
         * asynchronous,
         * the only difference between sync and async writer are that sync send should wait for future.get().
         *
         * @param message message to be sent
         * @param type    message type
         * @return message sending callback {@link Future}
         */
        private Future<Void> sendAsync(final Object message, final AsyncMessageType type) {
            Future<?> result = null;

            switch (type) {
                case TEXT:
                    session.getDebugContext().appendLogMessage(
                            LOGGER, Level.FINEST, DebugContext.Type.MESSAGE_OUT, "Sending text message: ", message);
                    result = webSocket.sendText((String) message, new WriterInfo(MessageType.TEXT, RemoteEndpointType.ASYNC));
                    break;

                case BINARY:
                    session.getDebugContext().appendLogMessage(
                            LOGGER, Level.FINEST, DebugContext.Type.MESSAGE_OUT, "Sending binary message");
                    result = webSocket.sendBinary(Utils.getRemainingArray((ByteBuffer) message),
                                new WriterInfo(MessageType.BINARY, RemoteEndpointType.ASYNC));
                    break;

                case OBJECT:
                    result = sendSyncObject(message, new WriterInfo(MessageType.OBJECT, RemoteEndpointType.ASYNC));
                    break;
            }

            final Future<?> finalResult = result;

            return new Future<Void>() {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    return finalResult.cancel(mayInterruptIfRunning);
                }

                @Override
                public boolean isCancelled() {
                    return finalResult.isCancelled();
                }

                @Override
                public boolean isDone() {
                    return finalResult.isDone();
                }

                @Override
                public Void get() throws InterruptedException, ExecutionException {
                    finalResult.get();
                    return null;
                }

                @Override
                public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
                        TimeoutException {
                    finalResult.get(timeout, unit);
                    return null;
                }
            };
        }

        /**
         * Sends the message asynchronously.
         * <p>
         * IMPORTANT NOTE: There is no need to start new thread here. All writer are by default asynchronous, only
         * difference between sync and async writer are that sync send should wait for future.get().
         *
         * @param message message to be sent
         * @param handler message sending callback handler
         * @param type    message type
         */
        private void sendAsync(final Object message, final SendHandler handler, final AsyncMessageType type) {
            switch (type) {
                case TEXT:
                    webSocket.sendText((String) message, handler, new WriterInfo(MessageType.TEXT, RemoteEndpointType.ASYNC));
                    break;

                case BINARY:
                    webSocket.sendBinary(Utils.getRemainingArray((ByteBuffer) message), handler,
                            new WriterInfo(MessageType.BINARY, RemoteEndpointType.ASYNC));
                    break;

                case OBJECT:
                    sendSyncObject(message, handler, new WriterInfo(MessageType.OBJECT, RemoteEndpointType.ASYNC));
                    break;
            }
        }

        private static enum AsyncMessageType {
            TEXT, // String
            BINARY,  // ByteBuffer
            OBJECT // OBJECT
        }
    }

    @SuppressWarnings("unchecked")
    Future<?> sendSyncObject(Object o, WriterInfo writerInfo) {
        Object toSend;
        try {
            session.getDebugContext()
                   .appendLogMessage(LOGGER, Level.FINEST, DebugContext.Type.MESSAGE_OUT, "Sending object: ", o);
            toSend = endpointWrapper.doEncode(session, o);
        } catch (final Exception e) {
            return new Future<Object>() {
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    return false;
                }

                @Override
                public boolean isCancelled() {
                    return false;
                }

                @Override
                public boolean isDone() {
                    return true;
                }

                @Override
                public Object get() throws InterruptedException, ExecutionException {
                    throw new ExecutionException(e);
                }

                @Override
                public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
                        TimeoutException {
                    throw new ExecutionException(e);
                }
            };
        }

        if (toSend instanceof String) {
            return webSocket.sendText((String) toSend, writerInfo);
        } else if (toSend instanceof ByteBuffer) {
            return webSocket.sendBinary(Utils.getRemainingArray((ByteBuffer) toSend), writerInfo);
        } else if (toSend instanceof StringWriter) {
            StringWriter writer = (StringWriter) toSend;
            StringBuffer sb = writer.getBuffer();
            return webSocket.sendText(sb.toString(), writerInfo);
        } else if (toSend instanceof ByteArrayOutputStream) {
            ByteArrayOutputStream baos = (ByteArrayOutputStream) toSend;
            return webSocket.sendBinary(baos.toByteArray(), writerInfo);
        }

        return null;
    }

    // TODO: naming
    @SuppressWarnings("unchecked")
    void sendSyncObject(Object o, SendHandler handler, WriterInfo writerInfo) {
        if (o instanceof String) {
            webSocket.sendText((String) o, handler, writerInfo);
        } else {
            Object toSend = null;
            try {
                toSend = endpointWrapper.doEncode(session, o);
            } catch (final Exception e) {
                handler.onResult(new SendResult(e));
            }

            if (toSend instanceof String) {
                webSocket.sendText((String) toSend, handler, writerInfo);
            } else if (toSend instanceof ByteBuffer) {
                webSocket.sendBinary(Utils.getRemainingArray((ByteBuffer) toSend), handler, writerInfo);
            } else if (toSend instanceof StringWriter) {
                StringWriter writer = (StringWriter) toSend;
                StringBuffer sb = writer.getBuffer();
                webSocket.sendText(sb.toString(), handler, writerInfo);
            } else if (toSend instanceof ByteArrayOutputStream) {
                ByteArrayOutputStream baos = (ByteArrayOutputStream) toSend;
                webSocket.sendBinary(baos.toByteArray(), handler, writerInfo);
            }
        }
    }

    @Override
    public void sendPing(ByteBuffer applicationData) throws IOException {
        if (applicationData != null && applicationData.remaining() > 125) {
            throw new IllegalArgumentException(LocalizationMessages.APPLICATION_DATA_TOO_LONG("Ping"));
        }
        session.restartIdleTimeoutExecutor();
        webSocket.sendPing(Utils.getRemainingArray(applicationData));
    }

    @Override
    public void sendPong(ByteBuffer applicationData) throws IOException {
        if (applicationData != null && applicationData.remaining() > 125) {
            throw new IllegalArgumentException(LocalizationMessages.APPLICATION_DATA_TOO_LONG("Pong"));
        }
        session.restartIdleTimeoutExecutor();
        webSocket.sendPong(Utils.getRemainingArray(applicationData));
    }

    @Override
    public String toString() {
        return "Wrapped: " + getClass().getSimpleName();
    }

    @Override
    public void setBatchingAllowed(boolean allowed) {
        // TODO: Implement.
    }

    @Override
    public boolean getBatchingAllowed() {
        return false;  // TODO: Implement.
    }

    @Override
    public void flushBatch() {
        // TODO: Implement.
    }

    public void close(CloseReason cr) {
        LOGGER.fine("Close public void close(CloseReason cr): " + cr);
        webSocket.close(cr);
    }
}