TyrusWebSocketEngine.java

/*
 * Copyright (c) 2012, 2023 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.tyrus.core;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.websocket.CloseReason;
import javax.websocket.DeploymentException;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Extension;
import javax.websocket.WebSocketContainer;
import javax.websocket.server.ServerEndpointConfig;

import org.glassfish.tyrus.core.cluster.ClusterContext;
import org.glassfish.tyrus.core.extension.ExtendedExtension;
import org.glassfish.tyrus.core.frame.CloseFrame;
import org.glassfish.tyrus.core.frame.Frame;
import org.glassfish.tyrus.core.l10n.LocalizationMessages;
import org.glassfish.tyrus.core.monitoring.ApplicationEventListener;
import org.glassfish.tyrus.core.monitoring.EndpointEventListener;
import org.glassfish.tyrus.core.monitoring.MessageEventListener;
import org.glassfish.tyrus.core.uri.Match;
import org.glassfish.tyrus.core.wsadl.model.Application;
import org.glassfish.tyrus.spi.Connection;
import org.glassfish.tyrus.spi.ReadHandler;
import org.glassfish.tyrus.spi.UpgradeRequest;
import org.glassfish.tyrus.spi.UpgradeResponse;
import org.glassfish.tyrus.spi.WebSocketEngine;
import org.glassfish.tyrus.spi.Writer;

/**
 * {@link WebSocketEngine} implementation, which handles server-side handshake, validation and data processing.
 *
 * @author Alexey Stashok
 * @author Pavel Bucek (pavel.bucek at oracle.com)
 * @see org.glassfish.tyrus.core.TyrusWebSocket
 * @see org.glassfish.tyrus.core.TyrusEndpointWrapper
 */
public class TyrusWebSocketEngine implements WebSocketEngine {

    /**
     * Maximum size of incoming buffer in bytes.
     * <p>
     * The value must be {@link java.lang.Integer} or its primitive alternative.
     * <p>
     * Default value is 4194315, which means that TyrusWebSocketEngine is by default
     * capable of processing messages up to 4 MB.
     */
    public static final String INCOMING_BUFFER_SIZE = "org.glassfish.tyrus.incomingBufferSize";

    /**
     * Maximum number of open sessions per server application.
     * <p>
     * The value must be positive {@link java.lang.Integer} or its primitive alternative. Negative values
     * and zero are ignored.
     * <p>
     * The number of open sessions per application is not limited by default.
     */
    public static final String MAX_SESSIONS_PER_APP = "org.glassfish.tyrus.maxSessionsPerApp";

    /**
     * Maximum number of open sessions per unique remote address.
     * <p>
     * The value must be positive {@link java.lang.Integer} or its primitive alternative. Negative values
     * and zero are ignored.
     * <p>
     * The number of open sessions per remote address is not limited by default.
     */
    public static final String MAX_SESSIONS_PER_REMOTE_ADDR = "org.glassfish.tyrus.maxSessionsPerRemoteAddr";

    /**
     * Property used for configuring the type of tracing supported by the server.
     * <p>
     * The value is expected to be string value of {@link org.glassfish.tyrus.core.DebugContext.TracingType}.
     * <p>
     * The default value is {@link org.glassfish.tyrus.core.DebugContext.TracingType#OFF}.
     */
    public static final String TRACING_TYPE = "org.glassfish.tyrus.server.tracingType";

    /**
     * Property used for configuring tracing threshold.
     * <p>
     * The value is expected to be string value of {@link org.glassfish.tyrus.core.DebugContext.TracingThreshold}.
     * <p>
     * The default value is {@link org.glassfish.tyrus.core.DebugContext.TracingThreshold#SUMMARY}.
     */
    public static final String TRACING_THRESHOLD = "org.glassfish.tyrus.server.tracingThreshold";

    /**
     * Wsadl support.
     * <p>
     * Wsadl is experimental feature which exposes endpoint configuration in form of XML file,
     * similarly as Wadl for REST services. Currently generated Wsadl contains only set of
     * endpoints and their endpoint paths. Wsadl is exposed on URI ending by "application.wsadl".
     * <p>
     * The value must be string, {@code "true"} means that the feature is enable, {@code "false"} that the feature
     * is disabled.
     * <p>
     * Default value is "false";
     */
    @Beta
    public static final String WSADL_SUPPORT = "org.glassfish.tyrus.server.wsadl";

    /**
     * Parallel broadcast support.
     * <p>
     * {@link org.glassfish.tyrus.core.TyrusSession#broadcast(String)} and {@link org.glassfish.tyrus.core
     * .TyrusSession#broadcast(java.nio.ByteBuffer)} operations are by default executed in parallel. The parallel
     * execution of broadcast can be disabled by setting this server property to {@code false}.
     * <p>
     * Expected value is {@code true} or {@code false} and the default value is {@code false}.
     *
     * @see org.glassfish.tyrus.core.TyrusSession#broadcast(String)
     * @see org.glassfish.tyrus.core.TyrusSession#broadcast(java.nio.ByteBuffer)
     */
    public static final String PARALLEL_BROADCAST_ENABLED = "org.glassfish.tyrus.server.parallelBroadcastEnabled";

    private static final int BUFFER_STEP_SIZE = 256;
    private static final Logger LOGGER = Logger.getLogger(TyrusWebSocketEngine.class.getName());

    private static final UpgradeInfo NOT_APPLICABLE_UPGRADE_INFO =
            new NoConnectionUpgradeInfo(UpgradeStatus.NOT_APPLICABLE);
    private static final UpgradeInfo HANDSHAKE_FAILED_UPGRADE_INFO =
            new NoConnectionUpgradeInfo(UpgradeStatus.HANDSHAKE_FAILED);
    private static final TyrusEndpointWrapper.SessionListener NO_OP_SESSION_LISTENER =
            new TyrusEndpointWrapper.SessionListener() {
            };

    private final Set<TyrusEndpointWrapper> endpointWrappers =
            Collections.newSetFromMap(new ConcurrentHashMap<TyrusEndpointWrapper, Boolean>());
    private final ComponentProviderService componentProviderService = ComponentProviderService.create();
    private final WebSocketContainer webSocketContainer;

    private int incomingBufferSize = 4194315; // 4M (payload) + 11 (frame overhead)

    private final ClusterContext clusterContext;
    private final ApplicationEventListener applicationEventListener;
    private final TyrusEndpointWrapper.SessionListener sessionListener;
    private final Boolean parallelBroadcastEnabled;

    private final DebugContext.TracingType tracingType;
    private final DebugContext.TracingThreshold tracingThreshold;

    /**
     * Create {@link org.glassfish.tyrus.core.TyrusWebSocketEngine.TyrusWebSocketEngineBuilder}
     * instance based on passed {@link WebSocketContainer}.
     *
     * @param webSocketContainer {@link WebSocketContainer} instance. Cannot be {@code null}.
     * @return new builder.
     */
    public static TyrusWebSocketEngineBuilder builder(WebSocketContainer webSocketContainer) {
        return new TyrusWebSocketEngineBuilder(webSocketContainer);
    }

    /**
     * Create {@link WebSocketEngine} instance based on passed {@link WebSocketContainer} and with configured maximal
     * incoming buffer size.
     *
     * @param webSocketContainer       used {@link WebSocketContainer} instance.
     * @param incomingBufferSize       maximal incoming buffer size (this engine won't be able to process messages
     *                                 bigger than this number. If null, default value will be used).
     * @param clusterContext           cluster context instance. {@code null} indicates standalone mode.
     * @param applicationEventListener listener used to collect monitored events.
     * @param maxSessionsPerApp        maximal number of open sessions per application. If {@code null}, no limit is
     *                                 applied.
     * @param maxSessionsPerRemoteAddr maximal number of open sessions per remote address. If {@code null}, no limit is
     *                                 applied.
     * @param tracingType              type of tracing.
     * @param tracingThreshold         tracing threshold.
     * @param parallelBroadcastEnabled {@code true} if parallel broadcast should be enabled, {@code true} is default.
     */
    private TyrusWebSocketEngine(WebSocketContainer webSocketContainer, Integer incomingBufferSize,
                                 ClusterContext clusterContext, ApplicationEventListener applicationEventListener,
                                 final Integer maxSessionsPerApp, final Integer maxSessionsPerRemoteAddr,
                                 DebugContext.TracingType tracingType, DebugContext.TracingThreshold tracingThreshold,
                                 Boolean parallelBroadcastEnabled) {
        if (incomingBufferSize != null) {
            this.incomingBufferSize = incomingBufferSize;
        }
        this.webSocketContainer = webSocketContainer;
        this.clusterContext = clusterContext;
        this.parallelBroadcastEnabled = parallelBroadcastEnabled;
        if (applicationEventListener == null) {
            // create dummy instance in order not to have to check null pointer
            this.applicationEventListener = ApplicationEventListener.NO_OP;
        } else {
            LOGGER.config(
                    "Application event listener " + applicationEventListener.getClass().getName() + " registered");
            this.applicationEventListener = applicationEventListener;
        }

        LOGGER.config("Incoming buffer size: " + this.incomingBufferSize);
        LOGGER.config("Max sessions per app: " + maxSessionsPerApp);
        LOGGER.config("Max sessions per remote address: " + maxSessionsPerRemoteAddr);
        // parallel broadcast is enabled by default, so null means true
        LOGGER.config("Parallel broadcast enabled: " + (parallelBroadcastEnabled != null && parallelBroadcastEnabled));

        this.tracingType = tracingType;
        this.tracingThreshold = tracingThreshold;

        this.sessionListener = maxSessionsPerApp == null && maxSessionsPerRemoteAddr == null
                ? NO_OP_SESSION_LISTENER : new TyrusEndpointWrapper.SessionListener() {
            // Implementation of {@link org.glassfish.tyrus.core.TyrusEndpointWrapper.SessionListener} counting
            // sessions.

            // limit per application counter
            private final AtomicInteger counter = new AtomicInteger(0);
            private final Object counterLock = new Object();

            // limit per remote address counter
            private final Map<String, AtomicInteger> remoteAddressCounters = new HashMap<String, AtomicInteger>();

            @Override
            public OnOpenResult onOpen(final TyrusSession session) {
                if (maxSessionsPerApp != null) {
                    synchronized (counterLock) {
                        if (counter.get() >= maxSessionsPerApp) {
                            return OnOpenResult.MAX_SESSIONS_PER_APP_EXCEEDED;
                        } else {
                            counter.incrementAndGet();
                        }
                    }
                }

                if (maxSessionsPerRemoteAddr != null) {
                    synchronized (remoteAddressCounters) {
                        AtomicInteger remoteAddressCounter = remoteAddressCounters.get(session.getRemoteAddr());
                        if (remoteAddressCounter == null) {
                            remoteAddressCounter = new AtomicInteger(1);
                            remoteAddressCounters.put(session.getRemoteAddr(), remoteAddressCounter);
                        } else if (remoteAddressCounter.get() >= maxSessionsPerRemoteAddr) {
                            return OnOpenResult.MAX_SESSIONS_PER_REMOTE_ADDR_EXCEEDED;
                        } else {
                            remoteAddressCounter.incrementAndGet();
                        }
                    }
                }

                return OnOpenResult.SESSION_ALLOWED;
            }

            @Override
            public void onClose(final TyrusSession session, final CloseReason closeReason) {
                if (maxSessionsPerApp != null) {
                    synchronized (counterLock) {
                        counter.decrementAndGet();
                    }
                }
                if (maxSessionsPerRemoteAddr != null) {
                    synchronized (remoteAddressCounters) {
                        int remoteAddressCounter = remoteAddressCounters.get(session.getRemoteAddr()).decrementAndGet();
                        if (remoteAddressCounter == 0) {
                            remoteAddressCounters.remove(session.getRemoteAddr());
                        }
                    }
                }
            }
        };
    }

    private static ProtocolHandler loadHandler(UpgradeRequest request) {
        for (Version version : Version.values()) {
            if (version.validate(request)) {
                return version.createHandler(false, null);
            }
        }
        return null;
    }

    private static void handleUnsupportedVersion(final UpgradeRequest request, UpgradeResponse response) {
        response.setStatus(426);
        response.getHeaders().put(UpgradeRequest.SEC_WEBSOCKET_VERSION,
                                  Arrays.asList(Version.getSupportedWireProtocolVersions()));
    }

    TyrusEndpointWrapper getEndpointWrapper(UpgradeRequest request, DebugContext debugContext) throws
            HandshakeException {
        if (endpointWrappers.isEmpty()) {
            return null;
        }

        final String requestPath = request.getRequestUri();

        for (Match m : Match.getAllMatches(requestPath, endpointWrappers, debugContext)) {
            final TyrusEndpointWrapper endpointWrapper = m.getEndpointWrapper();

            for (Map.Entry<String, String> parameter : m.getParameters().entrySet()) {
                request.getParameterMap().put(parameter.getKey(), Arrays.asList(parameter.getValue()));
            }

            if (endpointWrapper.upgrade(request)) {
                debugContext.appendTraceMessage(LOGGER, Level.FINE, DebugContext.Type.MESSAGE_IN,
                                                "Endpoint selected as a match to the handshake URI: ",
                                                endpointWrapper.getEndpointPath());
                debugContext.appendLogMessage(LOGGER, Level.FINER, DebugContext.Type.MESSAGE_IN, "Target endpoint: ",
                                              endpointWrapper);
                return endpointWrapper;
            }
        }

        return null;
    }

    @Override
    public UpgradeInfo upgrade(final UpgradeRequest request, final UpgradeResponse response) {

        DebugContext debugContext = createDebugContext(request);

        if (LOGGER.isLoggable(Level.FINE)) {
            debugContext.appendLogMessage(LOGGER, Level.FINE, DebugContext.Type.MESSAGE_IN,
                                          "Received handshake request:\n" + Utils.stringifyUpgradeRequest(request));
        }

        final TyrusEndpointWrapper endpointWrapper;
        try {
            endpointWrapper = getEndpointWrapper(request, debugContext);
        } catch (HandshakeException e) {
            return handleHandshakeException(e, response);
        }

        if (endpointWrapper != null) {
            final ProtocolHandler protocolHandler = loadHandler(request);
            if (protocolHandler == null) {
                handleUnsupportedVersion(request, response);
                debugContext.appendTraceMessage(LOGGER, Level.FINE, DebugContext.Type.MESSAGE_IN,
                                                "Upgrade request contains unsupported version of Websocket protocol");

                if (LOGGER.isLoggable(Level.FINE)) {
                    debugContext.appendLogMessage(LOGGER, Level.FINE, DebugContext.Type.MESSAGE_OUT,
                                                  "Sending handshake response:\n"
                                                          + Utils.stringifyUpgradeResponse(response));
                }

                response.getHeaders().putAll(debugContext.getTracingHeaders());
                debugContext.flush();
                return HANDSHAKE_FAILED_UPGRADE_INFO;
            }

            final ExtendedExtension.ExtensionContext extensionContext = new ExtendedExtension.ExtensionContext() {

                private final Map<String, Object> properties = new HashMap<String, Object>();

                @Override
                public Map<String, Object> getProperties() {
                    return properties;
                }
            };

            try {
                protocolHandler.handshake(endpointWrapper, request, response, extensionContext);
            } catch (HandshakeException e) {
                return handleHandshakeException(e, response);
            }

            if (LOGGER.isLoggable(Level.FINE)) {
                logExtensionsAndSubprotocol(protocolHandler, debugContext);
            }

            if (clusterContext != null
                    && request.getHeaders().get(UpgradeRequest.CLUSTER_CONNECTION_ID_HEADER) == null) {
                // TODO: we might need to introduce some property to check whether we should put this header into the
                // response.
                String connectionId = clusterContext.createConnectionId();
                response.getHeaders()
                        .put(UpgradeRequest.CLUSTER_CONNECTION_ID_HEADER, Collections.singletonList(connectionId));

                debugContext
                        .appendLogMessage(LOGGER, Level.FINE, DebugContext.Type.OTHER, "Connection ID: ", connectionId);
            }

            if (LOGGER.isLoggable(Level.FINE)) {
                debugContext.appendLogMessage(LOGGER, Level.FINE, DebugContext.Type.MESSAGE_OUT,
                                              "Sending handshake response:\n"
                                                      + Utils.stringifyUpgradeResponse(response) + "\n");
            }

            response.getHeaders().putAll(debugContext.getTracingHeaders());
            switch (response.getStatus()) {
                case 101:
                case 300:
                case 301:
                case 302:
                case 303:
                case 307:
                case 308:
                case 401:
                case 503:
                    return new SuccessfulUpgradeInfo(endpointWrapper, protocolHandler, incomingBufferSize, request, response,
                            extensionContext, debugContext);
                default:
                    return new NoConnectionUpgradeInfo(UpgradeStatus.HANDSHAKE_FAILED);
            }
        }

        response.setStatus(500);
        response.getHeaders().putAll(debugContext.getTracingHeaders());
        debugContext.flush();
        return NOT_APPLICABLE_UPGRADE_INFO;
    }

    private void logExtensionsAndSubprotocol(ProtocolHandler protocolHandler, DebugContext debugContext) {
        StringBuilder sb = new StringBuilder();
        sb.append("Using negotiated extensions: [");
        boolean isFirst = true;
        for (Extension extension : protocolHandler.getExtensions()) {
            if (isFirst) {
                isFirst = false;
            } else {
                sb.append(", ");
            }
            sb.append(extension.getName());
        }
        sb.append("]");

        debugContext.appendLogMessage(LOGGER, Level.FINE, DebugContext.Type.OTHER, "Using negotiated extensions: ", sb);
        debugContext.appendLogMessage(LOGGER, Level.FINE, DebugContext.Type.OTHER, "Using negotiated subprotocol: ",
                                      protocolHandler.getSubProtocol());
    }

    private DebugContext createDebugContext(UpgradeRequest upgradeRequest) {
        String thresholdHeader = upgradeRequest.getHeader(UpgradeRequest.TRACING_THRESHOLD);

        DebugContext.TracingThreshold threshold = tracingThreshold;

        Exception thresholdHeaderParsingError = null;
        if (thresholdHeader != null) {
            try {
                threshold = DebugContext.TracingThreshold.valueOf(thresholdHeader);
            } catch (Exception e) {
                thresholdHeaderParsingError = e;
            }
        }

        DebugContext debugContext;
        if (tracingType == DebugContext.TracingType.ALL || tracingType == DebugContext.TracingType.ON_DEMAND
                && upgradeRequest.getHeader(UpgradeRequest.ENABLE_TRACING_HEADER) != null) {
            debugContext = new DebugContext(threshold);
        } else {
            debugContext = new DebugContext();
        }

        if (thresholdHeaderParsingError != null) {
            debugContext.appendTraceMessageWithThrowable(LOGGER, Level.WARNING, DebugContext.Type.MESSAGE_IN,
                                                         thresholdHeaderParsingError,
                                                         "An error occurred while parsing ",
                                                         UpgradeRequest.TRACING_THRESHOLD, " header:",
                                                         thresholdHeaderParsingError.getMessage());
        }

        return debugContext;
    }

    private UpgradeInfo handleHandshakeException(HandshakeException handshakeException, UpgradeResponse response) {
        LOGGER.log(Level.CONFIG, handshakeException.getMessage(), handshakeException);
        response.setStatus(handshakeException.getHttpStatusCode());
        return HANDSHAKE_FAILED_UPGRADE_INFO;
    }

    private static class TyrusReadHandler implements ReadHandler {

        private final ProtocolHandler protocolHandler;
        private final TyrusWebSocket socket;
        private final TyrusEndpointWrapper endpointWrapper;
        private final int incomingBufferSize;
        private final ExtendedExtension.ExtensionContext extensionContext;
        private final DebugContext debugContext;

        private volatile ByteBuffer buffer;

        private TyrusReadHandler(ProtocolHandler protocolHandler, TyrusWebSocket socket,
                                 TyrusEndpointWrapper endpointWrapper, int incomingBufferSize,
                                 ExtendedExtension.ExtensionContext extensionContext, DebugContext debugContext) {
            this.extensionContext = extensionContext;
            this.protocolHandler = protocolHandler;
            this.socket = socket;
            this.endpointWrapper = endpointWrapper;
            this.incomingBufferSize = incomingBufferSize;
            this.debugContext = debugContext;
        }

        @Override
        public void handle(ByteBuffer data) {
            try {
                if (data != null && data.hasRemaining()) {

                    if (buffer != null) {
                        data = Utils.appendBuffers(buffer, data, incomingBufferSize, BUFFER_STEP_SIZE);
                    } else {
                        int newSize = data.remaining();
                        if (newSize > incomingBufferSize) {
                            throw new IllegalArgumentException(LocalizationMessages.BUFFER_OVERFLOW());
                        } else {
                            final int roundedSize = (newSize % BUFFER_STEP_SIZE) > 0
                                    ? ((newSize / BUFFER_STEP_SIZE) + 1) * BUFFER_STEP_SIZE : newSize;
                            final ByteBuffer result =
                                    ByteBuffer.allocate(roundedSize > incomingBufferSize ? newSize : roundedSize);
                            result.flip();
                            data = Utils.appendBuffers(result, data, incomingBufferSize, BUFFER_STEP_SIZE);
                        }
                    }

                    do {
                        final Frame incomingFrame = protocolHandler.unframe(data);

                        if (incomingFrame == null) {
                            buffer = data;
                            break;
                        } else {
                            Frame frame = incomingFrame;

                            for (Extension extension : protocolHandler.getExtensions()) {
                                if (extension instanceof ExtendedExtension) {
                                    try {
                                        frame = ((ExtendedExtension) extension)
                                                .processIncoming(extensionContext, frame);
                                    } catch (Throwable t) {
                                        debugContext.appendLogMessageWithThrowable(
                                                LOGGER, Level.FINE, DebugContext.Type.MESSAGE_IN, t, "Extension '",
                                                extension.getName(),
                                                "' threw an exception during processIncoming method invocation: ",
                                                t.getMessage());
                                    }
                                }
                            }

                            protocolHandler.process(frame, socket);
                        }
                    } while (true);
                }
            } catch (WebSocketException e) {
                debugContext.appendLogMessageWithThrowable(LOGGER, Level.FINE, DebugContext.Type.MESSAGE_IN, e,
                                                           e.getMessage());
                socket.onClose(new CloseFrame(e.getCloseReason()));
            } catch (Exception e) {
                String message = e.getMessage();
                debugContext.appendLogMessageWithThrowable(LOGGER, Level.FINE, DebugContext.Type.MESSAGE_IN, e,
                                                           e.getMessage());
                if (endpointWrapper.onError(socket, e)) {
                    socket.onClose(new CloseFrame(CloseReasons.create(CloseReason.CloseCodes.UNEXPECTED_CONDITION, message)));
                }
            }
        }
    }

    /**
     * Set incoming buffer size.
     *
     * @param incomingBufferSize buffer size in bytes.
     * @deprecated Please use {@link org.glassfish.tyrus.core.TyrusWebSocketEngine
     * .TyrusWebSocketEngineBuilder#incomingBufferSize(Integer)}
     * instead.
     */
    public void setIncomingBufferSize(int incomingBufferSize) {
        this.incomingBufferSize = incomingBufferSize;
    }

    /**
     * Registers the specified {@link TyrusEndpointWrapper} with the <code>WebSocketEngine</code>.
     *
     * @param endpointWrapper the {@link TyrusEndpointWrapper} to register.
     * @throws DeploymentException when added endpoint responds to same path as some already registered endpoint.
     */
    private void register(TyrusEndpointWrapper endpointWrapper) throws DeploymentException {
        checkPath(endpointWrapper);
        LOGGER.log(Level.FINER, "Registered endpoint: " + endpointWrapper);
        endpointWrappers.add(endpointWrapper);
    }

    @Override
    public void register(Class<?> endpointClass, String contextPath) throws DeploymentException {

        final ErrorCollector collector = new ErrorCollector();

        EndpointEventListenerWrapper endpointEventListenerWrapper = new EndpointEventListenerWrapper();
        AnnotatedEndpoint endpoint = AnnotatedEndpoint
                .fromClass(endpointClass, componentProviderService, true, incomingBufferSize, collector,
                           endpointEventListenerWrapper, webSocketContainer.getInstalledExtensions());
        EndpointConfig config = endpoint.getEndpointConfig();

        TyrusEndpointWrapper endpointWrapper =
                new TyrusEndpointWrapper(
                        endpoint, config, componentProviderService, webSocketContainer, contextPath,
                        config instanceof ServerEndpointConfig ? ((ServerEndpointConfig) config).getConfigurator()
                                : null, sessionListener, clusterContext, endpointEventListenerWrapper,
                        parallelBroadcastEnabled);

        if (collector.isEmpty()) {
            register(endpointWrapper);
        } else {
            throw collector.composeComprehensiveException();
        }

        String endpointPath = config instanceof ServerEndpointConfig ? ((ServerEndpointConfig) config).getPath() : null;
        EndpointEventListener endpointEventListener =
                applicationEventListener.onEndpointRegistered(endpointPath, endpointClass);
        endpointEventListenerWrapper.setEndpointEventListener(endpointEventListener);
    }

    @Override
    public void register(ServerEndpointConfig serverConfig, String contextPath) throws DeploymentException {

        TyrusEndpointWrapper endpointWrapper;

        Class<?> endpointClass = serverConfig.getEndpointClass();
        Class<?> parent = endpointClass;
        boolean isEndpointClass = false;

        do {
            parent = parent.getSuperclass();
            if (parent.equals(Endpoint.class)) {
                isEndpointClass = true;
            }
        } while (!parent.equals(Object.class));

        EndpointEventListenerWrapper endpointEventListenerWrapper = new EndpointEventListenerWrapper();

        if (isEndpointClass) {
            // we are pretty sure that endpoint class is javax.websocket.Endpoint descendant.
            //noinspection unchecked
            endpointWrapper = new TyrusEndpointWrapper((Class<? extends Endpoint>) endpointClass, serverConfig,
                                                       componentProviderService,
                                                       webSocketContainer, contextPath, serverConfig.getConfigurator(),
                                                       sessionListener, clusterContext, endpointEventListenerWrapper,
                                                       parallelBroadcastEnabled);
        } else {
            final ErrorCollector collector = new ErrorCollector();

            final AnnotatedEndpoint endpoint = AnnotatedEndpoint
                    .fromClass(endpointClass, componentProviderService, true, incomingBufferSize, collector,
                               endpointEventListenerWrapper, webSocketContainer.getInstalledExtensions());
            final EndpointConfig config = endpoint.getEndpointConfig();

            endpointWrapper = new TyrusEndpointWrapper(
                    endpoint, config, componentProviderService, webSocketContainer, contextPath,
                    config instanceof ServerEndpointConfig ? ((ServerEndpointConfig) config).getConfigurator() : null,
                    sessionListener, clusterContext, endpointEventListenerWrapper, parallelBroadcastEnabled);

            if (!collector.isEmpty()) {
                throw collector.composeComprehensiveException();
            }
        }

        register(endpointWrapper);
        EndpointEventListener endpointEventListener =
                applicationEventListener.onEndpointRegistered(serverConfig.getPath(), endpointClass);
        endpointEventListenerWrapper.setEndpointEventListener(endpointEventListener);
    }

    private void checkPath(TyrusEndpointWrapper endpoint) throws DeploymentException {
        for (TyrusEndpointWrapper endpointWrapper : endpointWrappers) {
            if (Match.isEquivalent(endpoint.getEndpointPath(), endpointWrapper.getEndpointPath())) {
                throw new DeploymentException(LocalizationMessages.EQUIVALENT_PATHS(endpoint.getEndpointPath(),
                                                                                    endpointWrapper.getEndpointPath()));
            }
        }
    }

    /**
     * Un-registers the specified {@link TyrusEndpointWrapper} with the <code>WebSocketEngine</code>.
     *
     * @param endpointWrapper the {@link TyrusEndpointWrapper} to un-register.
     */
    public void unregister(TyrusEndpointWrapper endpointWrapper) {
        endpointWrappers.remove(endpointWrapper);
        applicationEventListener.onEndpointUnregistered(endpointWrapper.getEndpointPath());
    }

    private static class NoConnectionUpgradeInfo implements UpgradeInfo {
        private final UpgradeStatus status;

        NoConnectionUpgradeInfo(UpgradeStatus status) {
            this.status = status;
        }

        @Override
        public UpgradeStatus getStatus() {
            return status;
        }

        @Override
        public Connection createConnection(Writer writer, Connection.CloseListener closeListener) {
            return null;
        }
    }

    private static class SuccessfulUpgradeInfo implements UpgradeInfo {

        private final TyrusEndpointWrapper endpointWrapper;
        private final ProtocolHandler protocolHandler;
        private final int incomingBufferSize;
        private final UpgradeRequest upgradeRequest;
        private final UpgradeResponse upgradeResponse;
        private final ExtendedExtension.ExtensionContext extensionContext;
        private final DebugContext debugContext;

        SuccessfulUpgradeInfo(TyrusEndpointWrapper endpointWrapper, ProtocolHandler protocolHandler,
                              int incomingBufferSize,
                              UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
                              ExtendedExtension.ExtensionContext extensionContext, DebugContext debugContext) {
            this.endpointWrapper = endpointWrapper;
            this.protocolHandler = protocolHandler;
            this.incomingBufferSize = incomingBufferSize;
            this.upgradeRequest = upgradeRequest;
            this.upgradeResponse = upgradeResponse;
            this.extensionContext = extensionContext;
            this.debugContext = debugContext;
        }

        @Override
        public UpgradeStatus getStatus() {
            return UpgradeStatus.SUCCESS;
        }

        @Override
        public Connection createConnection(Writer writer, Connection.CloseListener closeListener) {
            TyrusConnection tyrusConnection =
                    new TyrusConnection(endpointWrapper, protocolHandler, incomingBufferSize, writer, closeListener,
                                        upgradeRequest, upgradeResponse, extensionContext, debugContext);
            debugContext.flush();
            return tyrusConnection;
        }
    }

    /**
     * Get {@link org.glassfish.tyrus.core.monitoring.ApplicationEventListener} related to current {@link
     * org.glassfish.tyrus.core.TyrusWebSocketEngine} instance.
     *
     * @return listener instance.
     */
    public ApplicationEventListener getApplicationEventListener() {
        return applicationEventListener;
    }

    /**
     * Get {@link org.glassfish.tyrus.core.wsadl.model.Application} representing current set of deployed endpoints.
     *
     * @return application representing current set of deployed endpoints.
     */
    @Beta
    public Application getWsadlApplication() {
        Application application = new Application();
        for (TyrusEndpointWrapper wrapper : endpointWrappers) {
            org.glassfish.tyrus.core.wsadl.model.Endpoint endpoint =
                    new org.glassfish.tyrus.core.wsadl.model.Endpoint();
            endpoint.setPath(wrapper.getServerEndpointPath());
            application.getEndpoint().add(endpoint);
        }

        return application;
    }

    static class TyrusConnection implements Connection {

        private final ReadHandler readHandler;
        private final Writer writer;
        private final CloseListener closeListener;
        private final TyrusWebSocket socket;
        private final ExtendedExtension.ExtensionContext extensionContext;
        private final List<Extension> extensions;

        TyrusConnection(TyrusEndpointWrapper endpointWrapper, ProtocolHandler protocolHandler, int incomingBufferSize,
                        Writer writer, CloseListener closeListener,
                        UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
                        ExtendedExtension.ExtensionContext extensionContext, DebugContext debugContext) {
            protocolHandler.setWriter(writer);
            extensions = protocolHandler.getExtensions();
            this.socket = endpointWrapper.createSocket(protocolHandler);

            // TODO: we might need to introduce some property to check whether we should put this header into the
            // response.
            final List<String> connectionIdHeader =
                    upgradeRequest.getHeaders().get(UpgradeRequest.CLUSTER_CONNECTION_ID_HEADER);
            String connectionId;
            if (connectionIdHeader != null && connectionIdHeader.size() == 1) {
                connectionId = connectionIdHeader.get(0);
            } else {
                connectionId = upgradeResponse.getFirstHeaderValue(UpgradeRequest.CLUSTER_CONNECTION_ID_HEADER);
            }

            this.socket.onConnect(upgradeRequest, protocolHandler.getSubProtocol(), extensions, connectionId,
                                  debugContext);

            this.readHandler =
                    new TyrusReadHandler(protocolHandler, socket, endpointWrapper, incomingBufferSize, extensionContext,
                                         debugContext);
            this.writer = writer;
            this.closeListener = closeListener;
            this.extensionContext = extensionContext;
        }

        @Override
        public ReadHandler getReadHandler() {
            return readHandler;
        }

        @Override
        public Writer getWriter() {
            return writer;
        }

        @Override
        public CloseListener getCloseListener() {
            return closeListener;
        }

        @Override
        public void close(CloseReason reason) {
            if (!socket.isConnected()) {
                return;
            }

            socket.close(reason.getCloseCode().getCode(), reason.getReasonPhrase());

            for (Extension extension : extensions) {
                if (extension instanceof ExtendedExtension) {
                    try {
                        ((ExtendedExtension) extension).destroy(extensionContext);
                    } catch (Throwable t) {
                        // ignore.
                    }
                }
            }
        }
    }

    /**
     * {@link org.glassfish.tyrus.core.TyrusWebSocketEngine} builder.
     */
    public static class TyrusWebSocketEngineBuilder {

        private final WebSocketContainer webSocketContainer;

        private Integer incomingBufferSize = null;
        private ClusterContext clusterContext = null;
        private ApplicationEventListener applicationEventListener = null;
        private Integer maxSessionsPerApp = null;
        private Integer maxSessionsPerRemoteAddr = null;
        private DebugContext.TracingType tracingType = null;
        private DebugContext.TracingThreshold tracingThreshold = null;
        private Boolean parallelBroadcastEnabled = null;

        /**
         * Create new {@link org.glassfish.tyrus.core.TyrusWebSocketEngine} instance with current set of parameters.
         *
         * @return new {@link org.glassfish.tyrus.core.TyrusWebSocketEngine} instance.
         */
        public TyrusWebSocketEngine build() {
            if (maxSessionsPerApp != null && maxSessionsPerApp <= 0) {
                LOGGER.log(Level.CONFIG,
                           "Invalid configuration value " + MAX_SESSIONS_PER_APP + " (" + maxSessionsPerApp
                                   + "), expected value greater than 0.");
                maxSessionsPerApp = null;
            }

            if (maxSessionsPerRemoteAddr != null && maxSessionsPerRemoteAddr <= 0) {
                LOGGER.log(Level.CONFIG, "Invalid configuration value " + MAX_SESSIONS_PER_REMOTE_ADDR + " ("
                        + maxSessionsPerRemoteAddr + "), expected value greater than 0.");
                maxSessionsPerRemoteAddr = null;
            }

            if (maxSessionsPerApp != null && maxSessionsPerRemoteAddr != null
                    && maxSessionsPerApp < maxSessionsPerRemoteAddr) {
                LOGGER.log(Level.FINE,
                           String.format("Invalid configuration - value %s (%d) cannot be greater then %s (%d).",
                                         MAX_SESSIONS_PER_REMOTE_ADDR, maxSessionsPerRemoteAddr, MAX_SESSIONS_PER_APP,
                                         maxSessionsPerApp));
            }

            return new TyrusWebSocketEngine(webSocketContainer, incomingBufferSize, clusterContext,
                                            applicationEventListener, maxSessionsPerApp, maxSessionsPerRemoteAddr,
                                            tracingType, tracingThreshold, parallelBroadcastEnabled);
        }

        TyrusWebSocketEngineBuilder(WebSocketContainer webSocketContainer) {
            if (webSocketContainer == null) {
                throw new NullPointerException();
            }

            this.webSocketContainer = webSocketContainer;
        }

        /**
         * Set {@link org.glassfish.tyrus.core.monitoring.ApplicationEventListener}.
         * <p>
         * Listener can be used for monitoring various events and properties, such as deployed endpoints, ongoing
         * sessions etc...
         *
         * @param applicationEventListener listener instance used for building {@link org.glassfish.tyrus.core
         *                                 .TyrusWebSocketEngine}. Can be {@code null}.
         * @return updated builder.
         */
        public TyrusWebSocketEngineBuilder applicationEventListener(ApplicationEventListener applicationEventListener) {
            this.applicationEventListener = applicationEventListener;
            return this;
        }

        /**
         * Set incoming buffer size.
         *
         * @param incomingBufferSize maximal incoming buffer size (this engine won't be able to process messages bigger
         *                           than this number. If {@code null}, default value will be used).
         * @return updated builder.
         */
        public TyrusWebSocketEngineBuilder incomingBufferSize(Integer incomingBufferSize) {
            this.incomingBufferSize = incomingBufferSize;
            return this;
        }

        /**
         * Set {@link org.glassfish.tyrus.core.cluster.ClusterContext}.
         * <p>
         * ClusterContext provides clustering functionality.
         *
         * @param clusterContext cluster context instance. {@code null} indicates standalone mode.
         * @return updated builder.
         */
        public TyrusWebSocketEngineBuilder clusterContext(ClusterContext clusterContext) {
            this.clusterContext = clusterContext;
            return this;
        }

        /**
         * Set maximal number of open sessions per server application.
         *
         * @param maxSessionsPerApp maximal number of open sessions. If {@code null}, no limit is applied.
         * @return updated builder.
         */
        public TyrusWebSocketEngineBuilder maxSessionsPerApp(Integer maxSessionsPerApp) {
            this.maxSessionsPerApp = maxSessionsPerApp;
            return this;
        }

        /**
         * Set maximal number of open sessions from remote address.
         *
         * @param maxSessionsPerRemoteAddr maximal number of open sessions from remote address. If {@code null}, no
         *                                 limit is applied.
         * @return updated builder.
         */
        public TyrusWebSocketEngineBuilder maxSessionsPerRemoteAddr(Integer maxSessionsPerRemoteAddr) {
            this.maxSessionsPerRemoteAddr = maxSessionsPerRemoteAddr;
            return this;
        }

        /**
         * Set type of tracing.
         *
         * @param tracingType tracing type.
         * @return updated builder.
         */
        public TyrusWebSocketEngineBuilder tracingType(DebugContext.TracingType tracingType) {
            this.tracingType = tracingType;
            return this;
        }

        /**
         * Set tracing threshold.
         *
         * @param tracingThreshold tracing threshold.
         * @return updated builder.
         */
        public TyrusWebSocketEngineBuilder tracingThreshold(DebugContext.TracingThreshold tracingThreshold) {
            this.tracingThreshold = tracingThreshold;
            return this;
        }

        public TyrusWebSocketEngineBuilder parallelBroadcastEnabled(Boolean parallelBroadcastEnabled) {
            this.parallelBroadcastEnabled = parallelBroadcastEnabled;
            return this;
        }
    }

    /**
     * Endpoint event listener wrapper that allows setting the wrapped endpoint event listener later.
     */
    private static class EndpointEventListenerWrapper implements EndpointEventListener {

        private volatile EndpointEventListener endpointEventListener = EndpointEventListener.NO_OP;

        void setEndpointEventListener(EndpointEventListener endpointEventListener) {
            this.endpointEventListener = endpointEventListener;
        }

        @Override
        public MessageEventListener onSessionOpened(String sessionId) {
            return endpointEventListener.onSessionOpened(sessionId);
        }

        @Override
        public void onSessionClosed(String sessionId) {
            endpointEventListener.onSessionClosed(sessionId);
        }

        @Override
        public void onError(String sessionId, Throwable t) {
            endpointEventListener.onError(sessionId, t);
        }
    }
}