ProtocolHandler.java

/*
 * Copyright (c) 2012, 2025 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.tyrus.core;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.websocket.CloseReason;
import javax.websocket.Extension;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.server.HandshakeRequest;

import org.glassfish.tyrus.core.extension.ExtendedExtension;
import org.glassfish.tyrus.core.frame.BinaryFrame;
import org.glassfish.tyrus.core.frame.CloseFrame;
import org.glassfish.tyrus.core.frame.Frame;
import org.glassfish.tyrus.core.frame.TextFrame;
import org.glassfish.tyrus.core.frame.TyrusFrame;
import org.glassfish.tyrus.core.l10n.LocalizationMessages;
import org.glassfish.tyrus.core.monitoring.MessageEventListener;
import org.glassfish.tyrus.spi.CompletionHandler;
import org.glassfish.tyrus.spi.UpgradeRequest;
import org.glassfish.tyrus.spi.UpgradeResponse;
import org.glassfish.tyrus.spi.Writer;
import org.glassfish.tyrus.spi.WriterInfo;

/**
 * Tyrus protocol handler.
 * <p>
 * Responsible for framing and unframing raw websocket frames. Tyrus creates exactly one instance per Session.
 */
public final class ProtocolHandler {

    /**
     * Size of the masking key in bytes (RFC 6455).
     */
    public static final int MASK_SIZE = 4;

    private static final Logger LOGGER = Logger.getLogger(ProtocolHandler.class.getName());
    // Upper bound, in milliseconds, for waiting until a partial message in progress is finished.
    private static final int SEND_TIMEOUT = 3000; // millis.

    // true when this handler serves the client side of the connection; outgoing frames are masked only then.
    private final boolean client;
    // Source of masking keys; null on the server side.
    private final MaskingKeyGenerator maskingKeyGenerator;
    // State of the incremental incoming-frame parser, kept between unframe(...) calls.
    private final ParsingState parsingState = new ParsingState();

    private volatile TyrusWebSocket webSocket;
    // Opcode of the outgoing fragmented message currently being framed; 0 when there is none.
    private volatile byte outFragmentedType;
    private volatile Writer writer;
    // Opcode of the first frame of the incoming fragmented message in progress; 0 when there is none.
    private volatile byte inFragmentedType;
    // true while non-final frames of an incoming fragmented message are being processed.
    private volatile boolean processingFragment;
    private volatile String subProtocol = null;
    private volatile List<Extension> extensions;
    private volatile ExtendedExtension.ExtensionContext extensionContext;
    // Remainder reported by the last processed TextFrame; handed to TyrusFrame.wrap(...) for the next frame.
    private volatile ByteBuffer remainder = null;
    private volatile boolean hasExtensions = false;
    private volatile MessageEventListener messageEventListener = MessageEventListener.NO_OP;
    // Partial-message sending state; read and written by the send*/stream methods while holding "lock".
    private volatile SendingFragmentState sendingFragment = SendingFragmentState.IDLE;

    // Writer metadata passed along with outgoing close frames.
    private static final WriterInfo CLOSE = new WriterInfo(WriterInfo.MessageType.CLOSE, WriterInfo.RemoteEndpointType.SUPER);
    // Writer metadata used by the deprecated overloads that do not take a WriterInfo.
    private static final WriterInfo NULL_INFO = new WriterInfo(null, null);

    /**
     * Synchronizes all public send* (including stream variants) methods.
     * <p>
     * The reason for this lock is that we need to have a consistent value in the {@link #sendingFragment} field to
     * be able to determine the sending state of this particular instance/session.
     */
    private final Lock lock = new ReentrantLock();

    /**
     * If a partial message is being sent and we want to send a partial message of a different type or another whole
     * message, we need to wait until "idleCondition" is signalled.
     */
    private final Condition idleCondition = lock.newCondition();

    /**
     * Sending state.
     */
    private static enum SendingFragmentState {

        /**
         * Session is idle - no partial message in progress.
         */
        IDLE,

        /**
         * Sending partial text message - final frame was not yet sent.
         */
        SENDING_TEXT,

        /**
         * Sending partial binary message - final frame was not yet sent.
         */
        SENDING_BINARY
    }

    /**
     * Create a protocol handler for one side of a WebSocket connection.
     *
     * @param client              {@code true} for the client side, {@code false} for the server side.
     * @param maskingKeyGenerator source of masking keys. It is only kept on the client side; on the server side the
     *                            argument is ignored and no generator is stored. When {@code null} on the client
     *                            side, a generator backed by {@link java.security.SecureRandom} is created.
     */
    ProtocolHandler(boolean client, MaskingKeyGenerator maskingKeyGenerator) {
        this.client = client;

        if (!client) {
            // outgoing frames are masked only on the client side, so the server keeps no key source
            this.maskingKeyGenerator = null;
        } else if (maskingKeyGenerator == null) {
            this.maskingKeyGenerator = new MaskingKeyGenerator() {

                private final SecureRandom random = new SecureRandom();

                @Override
                public int nextInt() {
                    return random.nextInt();
                }
            };
        } else {
            this.maskingKeyGenerator = maskingKeyGenerator;
        }
    }

    /**
     * Set {@link Writer} instance.
     * <p>
     * The set instance is used for "sending" all outgoing WebSocket frames and is closed by {@code doClose()}.
     * While no writer is set, sending a frame fails with {@link IllegalStateException}.
     *
     * @param writer {@link Writer} to be set.
     */
    public void setWriter(Writer writer) {
        this.writer = writer;
    }

    /**
     * Returns true when current connection has some negotiated extension.
     * <p>
     * The value is recomputed whenever the extension list is stored (server handshake or
     * {@link #setExtensions(List)}).
     *
     * @return {@code true} if there is at least one negotiated extension associated to this connection, {@code false}
     * otherwise.
     */
    public boolean hasExtensions() {
        return hasExtensions;
    }

    /**
     * Perform the server side of the opening handshake.
     * <p>
     * Besides producing the response, this stores the negotiated extensions, the sub-protocol taken from the
     * {@code Sec-WebSocket-Protocol} response header and the given extension context in this handler.
     *
     * @param endpointWrapper  endpoint related to the handshake (path is already matched).
     * @param request          handshake request.
     * @param response         handshake response.
     * @param extensionContext extension context.
     * @return server handshake object.
     * @throws HandshakeException when there is problem with received {@link UpgradeRequest}.
     */
    public Handshake handshake(TyrusEndpointWrapper endpointWrapper, UpgradeRequest request,
                               UpgradeResponse response, ExtendedExtension.ExtensionContext extensionContext)
            throws HandshakeException {
        final Handshake serverHandshake = Handshake.createServerHandshake(request, extensionContext);
        final List<Extension> negotiated = serverHandshake.respond(request, response, endpointWrapper);

        this.extensions = negotiated;
        this.subProtocol = response.getFirstHeaderValue(HandshakeRequest.SEC_WEBSOCKET_PROTOCOL);
        this.extensionContext = extensionContext;
        this.hasExtensions = !(negotiated == null || negotiated.isEmpty());

        return serverHandshake;
    }

    /**
     * Get the extensions stored by the server handshake or by {@link #setExtensions(List)}.
     *
     * @return stored list of extensions; {@code null} when none was stored yet or {@code null} was set.
     */
    /* package */ List<Extension> getExtensions() {
        return extensions;
    }

    /**
     * Client side. Store the extensions negotiated for this WebSocket session/connection.
     * <p>
     * Also updates the value reported by {@link #hasExtensions()}.
     *
     * @param extensions list of negotiated extensions. Can be {@code null}.
     */
    public void setExtensions(List<Extension> extensions) {
        this.extensions = extensions;
        this.hasExtensions = !(extensions == null || extensions.isEmpty());
    }

    /**
     * Get the sub-protocol recorded during the server handshake.
     *
     * @return value of the first {@code Sec-WebSocket-Protocol} response header seen by
     * {@code handshake(...)}; {@code null} when no handshake has stored one.
     */
    /* package */ String getSubProtocol() {
        return subProtocol;
    }

    /**
     * Client side. Set WebSocket.
     * <p>
     * The stored instance is notified via {@code onClose} when {@link #close(int, String)} is called.
     *
     * @param webSocket client WebSocket connection.
     */
    public void setWebSocket(TyrusWebSocket webSocket) {
        this.webSocket = webSocket;
    }

    /**
     * Client side. Set extension context.
     * <p>
     * The context is passed to every {@link ExtendedExtension} when outgoing frames are processed.
     *
     * @param extensionContext extension context.
     */
    public void setExtensionContext(ExtendedExtension.ExtensionContext extensionContext) {
        this.extensionContext = extensionContext;
    }

    /**
     * Set message event listener.
     * <p>
     * The listener is notified about every {@link TyrusFrame} handed to the writer. The default is
     * {@link MessageEventListener#NO_OP}; the value is dereferenced without a null check, so {@code null} must not
     * be passed.
     *
     * @param messageEventListener message event listener.
     */
    public void setMessageEventListener(MessageEventListener messageEventListener) {
        this.messageEventListener = messageEventListener;
    }

    /**
     * Frame and write the given frame without a completion handler.
     * <p>
     * This method does not acquire the send lock and does not check the partial-message state; callers within this
     * class that need that do it before delegating here.
     *
     * @param frame      frame to be sent.
     * @param writerInfo metadata handed to the writer together with the serialized frame.
     * @return future completed when the writer reports the result of the write.
     */
    /* package */
    final Future<Frame> send(TyrusFrame frame, WriterInfo writerInfo) {
        return write(frame, null, writerInfo, true);
    }

    /**
     * Frame and write the given frame.
     *
     * @param frame             frame to be sent.
     * @param completionHandler handler notified about the write result; can be {@code null}.
     * @param writerInfo        metadata handed to the writer together with the serialized frame.
     * @param useTimeout        forwarded to {@code write(...)}. A primitive is used so that a {@code null} cannot
     *                          be passed and fail on unboxing.
     * @return future completed when the writer reports the result of the write.
     */
    private Future<Frame> send(TyrusFrame frame, CompletionHandler<Frame> completionHandler, WriterInfo writerInfo,
                               boolean useTimeout) {
        return write(frame, completionHandler, writerInfo, useTimeout);
    }

    /**
     * Write an already serialized frame.
     *
     * @param frame             serialized frame to be written as is.
     * @param completionHandler handler notified about the write result; can be {@code null}.
     * @param writerInfo        metadata handed to the writer together with the data.
     * @param useTimeout        forwarded to {@code write(...)}. A primitive is used so that a {@code null} cannot
     *                          be passed and fail on unboxing.
     * @return future completed when the writer reports the result of the write.
     */
    private Future<Frame> send(ByteBuffer frame, CompletionHandler<Frame> completionHandler, WriterInfo writerInfo,
                               boolean useTimeout) {
        return write(frame, completionHandler, writerInfo, useTimeout);
    }

    /**
     * Send a whole binary message, passing a {@link WriterInfo} created with {@code null} values to the writer.
     *
     * @param data message payload.
     * @return future of the send operation.
     * @deprecated use {@link #send(byte[], WriterInfo)}.
     */
    @Deprecated
    public Future<Frame> send(byte[] data) {
        return send(data, NULL_INFO);
    }

    /**
     * Send a whole (non-partial) binary message.
     * <p>
     * If a partial message is in progress, waits until it is finished before sending.
     *
     * @param data       message payload.
     * @param writerInfo metadata handed to the writer together with the serialized frame.
     * @return future of the send operation.
     * @throws IllegalStateException when the partial message in progress is not finished in time or no writer is
     *                               set.
     */
    public Future<Frame> send(byte[] data, WriterInfo writerInfo) {
        lock.lock();
        try {
            checkSendingFragment();

            final BinaryFrame wholeMessage = new BinaryFrame(data, false, true);
            return send(wholeMessage, null, writerInfo, true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Send a whole binary message and report the result to the given handler, passing a {@link WriterInfo} created
     * with {@code null} values to the writer.
     *
     * @param data    message payload.
     * @param handler handler notified about the result of the send operation.
     * @deprecated use {@link #send(byte[], SendHandler, WriterInfo)}.
     */
    @Deprecated
    public void send(final byte[] data, final SendHandler handler) {
        send(data, handler, NULL_INFO);
    }

    /**
     * Send a whole (non-partial) binary message and report the result to the given handler.
     * <p>
     * If a partial message is in progress, waits until it is finished before sending.
     *
     * @param data       message payload.
     * @param handler    handler notified with a {@link SendResult} once the write completes or fails.
     * @param writerInfo metadata handed to the writer together with the serialized frame.
     * @throws IllegalStateException when the partial message in progress is not finished in time or no writer is
     *                               set.
     */
    public void send(final byte[] data, final SendHandler handler, WriterInfo writerInfo) {
        lock.lock();
        try {
            checkSendingFragment();

            // adapts the frame-level completion callbacks to the SendHandler contract
            final CompletionHandler<Frame> resultNotifier = new CompletionHandler<Frame>() {
                @Override
                public void completed(Frame result) {
                    handler.onResult(new SendResult());
                }

                @Override
                public void failed(Throwable throwable) {
                    handler.onResult(new SendResult(throwable));
                }
            };

            send(new BinaryFrame(data, false, true), resultNotifier, writerInfo, true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Send a whole text message, passing a {@link WriterInfo} created with {@code null} values to the writer.
     *
     * @param data message text.
     * @return future of the send operation.
     * @deprecated use {@link #send(String, WriterInfo)}.
     */
    @Deprecated
    public Future<Frame> send(String data) {
        return send(data, NULL_INFO);
    }

    /**
     * Send a whole (non-partial) text message.
     * <p>
     * If a partial message is in progress, waits until it is finished before sending.
     *
     * @param data       message text.
     * @param writerInfo metadata handed to the writer together with the serialized frame.
     * @return future of the send operation.
     * @throws IllegalStateException when the partial message in progress is not finished in time or no writer is
     *                               set.
     */
    public Future<Frame> send(String data, WriterInfo writerInfo) {
        lock.lock();
        try {
            checkSendingFragment();

            final TextFrame wholeMessage = new TextFrame(data, false, true);
            return send(wholeMessage, writerInfo);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Send a whole text message and report the result to the given handler, passing a {@link WriterInfo} created
     * with {@code null} values to the writer.
     *
     * @param data    message text.
     * @param handler handler notified about the result of the send operation.
     * @deprecated use {@link #send(String, SendHandler, WriterInfo)}.
     */
    @Deprecated
    public void send(final String data, final SendHandler handler) {
        send(data, handler, NULL_INFO);
    }

    /**
     * Send a whole (non-partial) text message and report the result to the given handler.
     * <p>
     * If a partial message is in progress, waits until it is finished before sending.
     *
     * @param data       message text.
     * @param handler    handler notified with a {@link SendResult} once the write completes or fails.
     * @param writerInfo metadata handed to the writer together with the serialized frame.
     * @throws IllegalStateException when the partial message in progress is not finished in time or no writer is
     *                               set.
     */
    public void send(final String data, final SendHandler handler, WriterInfo writerInfo) {
        lock.lock();
        try {
            checkSendingFragment();

            // adapts the frame-level completion callbacks to the SendHandler contract
            final CompletionHandler<Frame> resultNotifier = new CompletionHandler<Frame>() {
                @Override
                public void completed(Frame result) {
                    handler.onResult(new SendResult());
                }

                @Override
                public void failed(Throwable throwable) {
                    handler.onResult(new SendResult(throwable));
                }
            };

            send(new TextFrame(data, false, true), resultNotifier, writerInfo, true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Write an already serialized frame as is. A raw frame is always whole (not partial).
     * <p>
     * If a partial message is in progress, waits until it is finished before writing. The data is handed to the
     * writer with {@code BINARY} / {@code BROADCAST} writer metadata.
     *
     * @param data serialized frame.
     * @return send future.
     */
    public Future<Frame> sendRawFrame(ByteBuffer data) {
        lock.lock();
        try {
            checkSendingFragment();

            final WriterInfo broadcastInfo =
                    new WriterInfo(WriterInfo.MessageType.BINARY, WriterInfo.RemoteEndpointType.BROADCAST);
            return send(data, null, broadcastInfo, true);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Check whether current {@link ProtocolHandler} is sending a partial message.
     * <p>
     * If yes, wait for at most {@value ProtocolHandler#SEND_TIMEOUT} milliseconds in total and if the message still
     * cannot be sent, throw {@link IllegalStateException}. Must be called with {@link #lock} held, because it waits
     * on {@link #idleCondition}.
     * <p>
     * The remaining waiting time is tracked with {@link Condition#awaitNanos(long)}, so changes of the system
     * (wall-clock) time cannot shorten or prolong the timeout.
     *
     * @throws IllegalStateException when the partial message is not finished in time or when the waiting thread is
     *                               interrupted (the interrupt flag is restored in that case).
     */
    private void checkSendingFragment() {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(SEND_TIMEOUT);

        // idleCondition can be signalled but other thread could be scheduled before this one; if that thread starts
        // sending another partial message, we should wait again for the condition to be signalled.
        while (sendingFragment != SendingFragmentState.IDLE) {
            // timeout already reached.
            if (remainingNanos <= 0L) {
                throw new IllegalStateException();
            }

            try {
                // returns an estimate of the time still left, zero or less when the wait timed out; the state is
                // re-checked by the loop condition before giving up.
                remainingNanos = idleCondition.awaitNanos(remainingNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /**
     * Send a part of a binary message, passing a {@link WriterInfo} created with {@code null} values to the writer.
     *
     * @param last  {@code true} when this is the last part of the message.
     * @param bytes array containing the part to be sent.
     * @param off   index of the first byte to be sent.
     * @param len   number of bytes to be sent.
     * @return future of the send operation.
     * @deprecated use {@link #stream(boolean, byte[], int, int, WriterInfo)}.
     */
    @Deprecated
    public Future<Frame> stream(boolean last, byte[] bytes, int off, int len) {
        return stream(last, bytes, off, len, NULL_INFO);
    }

    /**
     * Send a part of a binary message.
     * <p>
     * When a partial binary message is already in progress, the part is sent as its continuation. When a partial
     * text message is in progress, waits until it is finished and then starts a new binary message. Otherwise a
     * new binary message is started.
     *
     * @param last       {@code true} when this is the last part of the message.
     * @param bytes      array containing the part to be sent.
     * @param off        index of the first byte to be sent.
     * @param len        number of bytes to be sent.
     * @param writerInfo metadata handed to the writer together with the serialized frame.
     * @return future of the send operation.
     * @throws IllegalStateException when a partial text message in progress is not finished in time or no writer
     *                               is set.
     */
    public Future<Frame> stream(boolean last, byte[] bytes, int off, int len, WriterInfo writerInfo) {
        // Copy (and thereby validate) the requested range before the sending state is touched. An invalid range or
        // a null array must fail without leaving this handler marked as "sending a partial binary message", which
        // would block every following whole-message send until it times out.
        final byte[] payload = Arrays.copyOfRange(bytes, off, off + len);

        lock.lock();
        try {
            final SendingFragmentState state = sendingFragment;

            if (state == SendingFragmentState.SENDING_BINARY) {
                // continuation of the binary message in progress
                final Future<Frame> frameFuture = send(new BinaryFrame(payload, true, last), writerInfo);
                if (last) {
                    sendingFragment = SendingFragmentState.IDLE;
                    idleCondition.signalAll();
                }
                return frameFuture;
            }

            if (state == SendingFragmentState.SENDING_TEXT) {
                // a partial text message has to be finished first
                checkSendingFragment();
            }

            // first (and possibly also last) part of a new binary message
            sendingFragment = (last ? SendingFragmentState.IDLE : SendingFragmentState.SENDING_BINARY);
            return send(new BinaryFrame(payload, false, last), writerInfo);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Send a part of a text message, passing a {@link WriterInfo} created with {@code null} values to the writer.
     *
     * @param last     {@code true} when this is the last part of the message.
     * @param fragment part of the text message to be sent.
     * @return future of the send operation.
     * @deprecated use {@link #stream(boolean, String, WriterInfo)}.
     */
    @Deprecated
    public Future<Frame> stream(boolean last, String fragment) {
        return stream(last, fragment, NULL_INFO);
    }

    /**
     * Send a part of a text message.
     * <p>
     * When a partial text message is already in progress, the part is sent as its continuation. When a partial
     * binary message is in progress, waits until it is finished and then starts a new text message. Otherwise a
     * new text message is started.
     *
     * @param last       {@code true} when this is the last part of the message.
     * @param fragment   part of the text message to be sent.
     * @param writerInfo metadata handed to the writer together with the serialized frame.
     * @return future of the send operation.
     * @throws IllegalStateException when a partial binary message in progress is not finished in time or no writer
     *                               is set.
     */
    public Future<Frame> stream(boolean last, String fragment, WriterInfo writerInfo) {
        lock.lock();
        try {
            final SendingFragmentState state = sendingFragment;

            if (state == SendingFragmentState.SENDING_TEXT) {
                // continuation of the text message in progress
                final Future<Frame> continuationFuture = send(new TextFrame(fragment, true, last), writerInfo);
                if (last) {
                    sendingFragment = SendingFragmentState.IDLE;
                    idleCondition.signalAll();
                }
                return continuationFuture;
            }

            if (state == SendingFragmentState.SENDING_BINARY) {
                // a partial binary message has to be finished first
                checkSendingFragment();
            }

            // first (and possibly also last) part of a new text message
            sendingFragment = (last ? SendingFragmentState.IDLE : SendingFragmentState.SENDING_TEXT);
            return send(new TextFrame(fragment, false, last), writerInfo);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Send a close frame and notify the associated web socket about the close.
     * <p>
     * Codes which are not sent to the peer by this method ({@code NO_STATUS_CODE}, {@code CLOSED_ABNORMALLY},
     * {@code TLS_HANDSHAKE_FAILURE} and, on the client side, {@code SERVICE_RESTART} and
     * {@code TRY_AGAIN_LATER}) are replaced with {@code NORMAL_CLOSURE} in the outgoing frame. The web socket is
     * always notified with the original code and reason.
     * <p>
     * A failure to send the close frame is not propagated as an exception; it is logged and reported through the
     * returned future.
     *
     * @param code   close code.
     * @param reason close reason phrase.
     * @return future of the close frame send operation; already failed when sending threw an exception.
     */
    public Future<Frame> close(final int code, final String reason) {
        final CloseFrame outgoingCloseFrame;
        final CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(code), reason);

        if (code == CloseReason.CloseCodes.NO_STATUS_CODE.getCode()
                || code == CloseReason.CloseCodes.CLOSED_ABNORMALLY.getCode()
                || code == CloseReason.CloseCodes.TLS_HANDSHAKE_FAILURE.getCode()
                // client side cannot send SERVICE_RESTART or TRY_AGAIN_LATER
                // will be replaced with NORMAL_CLOSURE
                || (client && (code == CloseReason.CloseCodes.SERVICE_RESTART.getCode()
                || code == CloseReason.CloseCodes.TRY_AGAIN_LATER.getCode()))) {

            outgoingCloseFrame = new CloseFrame(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, reason));
        } else {
            outgoingCloseFrame = new CloseFrame(closeReason);
        }

        Future<Frame> sendFuture;
        lock.lock();
        try {
            sendFuture = send(outgoingCloseFrame, null, CLOSE, false);
        } catch (Exception e) {
            // typed local instead of a raw-type cast of the returned future
            final TyrusFuture<Frame> failedSend = new TyrusFuture<>();
            failedSend.setFailure(e);
            sendFuture = failedSend;
            // keep the cause in the log record, not just its message
            LOGGER.log(Level.WARNING, LocalizationMessages.EXCEPTION_CLOSE(e.getMessage()), e);
        } finally {
            lock.unlock();
        }

        webSocket.onClose(new CloseFrame(closeReason));

        return sendFuture;
    }

    /**
     * Serialize the given frame, hand it to the current writer and notify the message event listener.
     *
     * @param frame             frame to be written.
     * @param completionHandler handler notified about the write result; can be {@code null}.
     * @param data              metadata handed to the writer together with the serialized frame.
     * @param useTimeout        not used by this method.
     * @return future completed when the writer reports the result of the write.
     * @throws IllegalStateException when no writer is set.
     */
    private Future<Frame> write(final TyrusFrame frame, final CompletionHandler<Frame> completionHandler,
                                WriterInfo data, boolean useTimeout) {
        // single read of the volatile field, so the null check and the write use the same instance
        final Writer target = writer;
        final TyrusFuture<Frame> result = new TyrusFuture<Frame>();

        if (target == null) {
            throw new IllegalStateException(LocalizationMessages.CONNECTION_NULL());
        }

        final ByteBuffer serialized = frame(frame);
        final CompletionHandlerWrapper writeCallback = new CompletionHandlerWrapper(completionHandler, result, frame);
        target.write(serialized, writeCallback, data);

        messageEventListener.onFrameSent(frame.getFrameType(), frame.getPayloadLength());
        return result;
    }

    /**
     * Hand an already serialized frame to the current writer.
     * <p>
     * No {@link Frame} instance is available here, so the returned future and the completion handler are completed
     * with {@code null}. The message event listener is not notified.
     *
     * @param frame             serialized frame to be written as is.
     * @param completionHandler handler notified about the write result; can be {@code null}.
     * @param data              metadata handed to the writer together with the serialized frame.
     * @param useTimeout        not used by this method.
     * @return future completed when the writer reports the result of the write.
     * @throws IllegalStateException when no writer is set.
     */
    private Future<Frame> write(final ByteBuffer frame, final CompletionHandler<Frame> completionHandler,
                                WriterInfo data, boolean useTimeout) {
        // single read of the volatile field, so the null check and the write use the same instance
        final Writer target = writer;
        final TyrusFuture<Frame> result = new TyrusFuture<Frame>();

        if (target == null) {
            throw new IllegalStateException(LocalizationMessages.CONNECTION_NULL());
        }

        final CompletionHandlerWrapper writeCallback = new CompletionHandlerWrapper(completionHandler, result, null);
        target.write(frame, writeCallback, data);
        return result;
    }

    /**
     * Convert a byte[] to a long. Used for rebuilding payload length.
     * <p>
     * The whole array is passed to {@code Utils.toLong}.
     *
     * @param bytes byte array to be converted.
     * @return the value decoded from the given bytes.
     */
    private long decodeLength(byte[] bytes) {
        return Utils.toLong(bytes, 0, bytes.length);
    }

    /**
     * Build the length part of a frame header for the given payload length:
     * <ol>
     * <li>up to 125: a single byte holding the payload length,</li>
     * <li>up to 0xFFFF: 3 bytes, the value 126 followed by the last two bytes of the converted length (16 bit
     * unsigned integer),</li>
     * <li>otherwise: 9 bytes, the value 127 followed by all eight bytes of the converted length (64-bit unsigned
     * integer, the high bit must be 0).</li>
     * </ol>
     *
     * @param length the payload size
     * @return the array
     */
    private byte[] encodeLength(final long length) {
        if (length <= 125) {
            return new byte[]{(byte) length};
        }

        final byte[] converted = Utils.toArray(length);
        final boolean fitsInTwoBytes = length <= 0xFFFF;
        final byte[] header = new byte[fitsInTwoBytes ? 3 : 9];

        if (fitsInTwoBytes) {
            header[0] = 126;
            System.arraycopy(converted, 6, header, 1, 2);
        } else {
            header[0] = 127;
            System.arraycopy(converted, 0, header, 1, 8);
        }
        return header;
    }

    /**
     * Verify that a frame with the given opcode may be sent while a fragmented message is in progress.
     * <p>
     * Accepted are continuation frames (opcode 0), frames of the same type as the fragmented message and control
     * frames.
     *
     * @param fragmentType opcode of the fragmented message in progress.
     * @param opcode       opcode of the frame about to be sent.
     * @throws ProtocolException when the opcode is none of the accepted ones.
     */
    private void validate(final byte fragmentType, byte opcode) {
        final boolean continuation = (opcode == 0);
        final boolean sameType = (opcode == fragmentType);

        if (continuation || sameType || isControlFrame(opcode)) {
            return;
        }
        throw new ProtocolException(LocalizationMessages.SEND_MESSAGE_INFRAGMENT());
    }

    /**
     * Compute the FIN bit and opcode of the first header byte for an outgoing frame and keep track of the outgoing
     * fragmented message ({@code outFragmentedType}).
     * <ul>
     * <li>Control frame: its opcode with the FIN bit set; fragmentation state is untouched.</li>
     * <li>Non-final frame: the frame opcode when it starts a fragmented message (the opcode is remembered),
     * continuation opcode 0 when a fragmented message is already in progress.</li>
     * <li>Final frame: continuation opcode with the FIN bit when it ends a fragmented message (the remembered
     * opcode is cleared), otherwise the frame opcode with the FIN bit.</li>
     * </ul>
     *
     * @param frame frame about to be serialized.
     * @return first header byte without the reserved bits.
     */
    private byte checkForLastFrame(Frame frame) {
        final byte frameOpcode = frame.getOpcode();

        if (frame.isControlFrame()) {
            return (byte) (frameOpcode | 0x80);
        }

        if (frame.isFin()) {
            if (outFragmentedType == 0) {
                // whole message in a single frame
                return (byte) (frameOpcode | 0x80);
            }
            // last frame of the fragmented message in progress
            outFragmentedType = 0;
            return (byte) 0x80;
        }

        final byte headerOpcode;
        if (outFragmentedType != 0) {
            headerOpcode = 0x00;
        } else {
            outFragmentedType = frameOpcode;
            headerOpcode = (byte) (frameOpcode & 0x7F);
        }
        validate(outFragmentedType, headerOpcode);
        return headerOpcode;
    }

    /**
     * Close the current writer.
     *
     * @throws IllegalStateException when no writer is set or when closing the writer fails with
     *                               {@link IOException} (kept as the cause).
     */
    /* package */ void doClose() {
        final Writer target = writer;

        if (target != null) {
            try {
                target.close();
            } catch (IOException e) {
                throw new IllegalStateException(LocalizationMessages.IOEXCEPTION_CLOSE(), e);
            }
            return;
        }

        throw new IllegalStateException(LocalizationMessages.CONNECTION_NULL());
    }

    /**
     * Serialize the given frame into its wire representation.
     * <p>
     * On the client side a masking key is generated and the payload is masked. Every negotiated
     * {@link ExtendedExtension} gets a chance to process the outgoing frame; an exception thrown by an extension is
     * only logged and the frame as it was before that extension is used. The resulting packet consists of the first
     * header byte (FIN, RSV1-3, opcode), the encoded payload length (with the mask bit set on the client side), the
     * masking key (client side only) and the payload.
     * <p>
     * As a side effect, the outgoing fragmentation state is updated (see {@code checkForLastFrame}).
     *
     * @param frame frame to be serialized.
     * @return buffer wrapping the serialized frame.
     * @throws ProtocolException when the frame has no masking key on the client side or when its opcode is not
     *                           allowed while a fragmented message is in progress.
     */
    /* package */ ByteBuffer frame(Frame frame) {

        if (client) {
            // a fresh masking key is generated for every frame sent by the client
            frame = Frame.builder(frame).maskingKey(maskingKeyGenerator.nextInt()).mask(true).build();
        }

        if (extensions != null && extensions.size() > 0) {
            for (Extension extension : extensions) {
                if (extension instanceof ExtendedExtension) {
                    try {
                        frame = ((ExtendedExtension) extension).processOutgoing(extensionContext, frame);
                    } catch (Throwable t) {
                        // TODO: define ExtendedExtension exception handling.
                        LOGGER.log(Level.FINE, LocalizationMessages.EXTENSION_EXCEPTION(extension.getName(), t
                                .getMessage()), t);
                    }
                }
            }
        }

        // FIN bit + opcode, then the reserved bits as set on the (possibly extension-processed) frame
        byte opcode = checkForLastFrame(frame);
        if (frame.isRsv1()) {
            opcode |= 0x40;
        }
        if (frame.isRsv2()) {
            opcode |= 0x20;
        }
        if (frame.isRsv3()) {
            opcode |= 0x10;
        }

        final byte[] bytes = frame.getPayloadData();
        final byte[] lengthBytes = encodeLength(frame.getPayloadLength());

        // TODO - length limited to int, it should be long (see RFC 6455, chapter 5.2)
        // TODO - in that case, we will need to NOT store dataframe inmemory - introduce maskingByteStream or
        // TODO   maskingByteBuffer
        final int payloadLength = (int) frame.getPayloadLength();
        // packet layout: [first header byte][length bytes][masking key - client only][payload]
        int length = 1 + lengthBytes.length + payloadLength + (client ? MASK_SIZE : 0);
        int payloadStart = 1 + lengthBytes.length + (client ? MASK_SIZE : 0);
        final byte[] packet = new byte[length];
        packet[0] = opcode;
        System.arraycopy(lengthBytes, 0, packet, 1, lengthBytes.length);
        // if client, then we need to mask data.
        if (client) {
            Integer maskingKey = frame.getMaskingKey();
            if (maskingKey == null) {
                // TODO: improve validation/exception handling
                // TODO: related to ExtendedExtension
                throw new ProtocolException("Masking key cannot be null when sending message from client to server.");
            }
            Masker masker = new Masker(maskingKey);
            // mask bit is the highest bit of the first length byte
            packet[1] |= 0x80;
            masker.mask(packet, payloadStart, bytes, payloadLength);
            // the masking key is stored right in front of the payload
            System.arraycopy(masker.getMask(), 0, packet, payloadStart - MASK_SIZE, MASK_SIZE);
        } else {
            System.arraycopy(bytes, 0, packet, payloadStart, payloadLength);
        }
        return ByteBuffer.wrap(packet);
    }

    /**
     * Parse one frame from the given buffer.
     * <p>
     * Parsing is resumable: it proceeds in four stages (header bytes, extended length, masking key, payload) and
     * the progress is kept in {@code parsingState}. When the buffer does not contain enough bytes to finish the
     * current stage, {@code null} is returned; bytes consumed by already finished stages stay consumed and the next
     * call continues with the unfinished stage. When a whole frame has been read, the parsing state is reset.
     * <p>
     * Any exception thrown while parsing resets the parsing state before it is rethrown.
     *
     * @param buffer buffer with received data, read from its current position.
     * @return parsed frame, or {@code null} when more data is needed.
     * @throws ProtocolException when a control frame is fragmented, when a control frame uses an extended length
     *                           or when the payload read does not have the announced length.
     */
    public Frame unframe(ByteBuffer buffer) {

        try {
            // this do { .. } while cycle was forced by findbugs check - complained about missing break statements.
            do {
                switch (parsingState.state.get()) {
                    case 0:
                        // stage 0: first two header bytes - FIN/RSV/opcode and mask bit/length code
                        if (buffer.remaining() < 2) {
                            // Don't have enough bytes to read opcode and lengthCode
                            return null;
                        }

                        byte opcode = buffer.get();


                        parsingState.finalFragment = isBitSet(opcode, 7);
                        parsingState.controlFrame = isControlFrame(opcode);
                        // FIN bit is cleared, RSV bits are kept and extracted once the whole frame is read
                        parsingState.opcode = (byte) (opcode & 0x7f);
                        if (!parsingState.finalFragment && parsingState.controlFrame) {
                            throw new ProtocolException(LocalizationMessages.CONTROL_FRAME_FRAGMENTED());
                        }

                        byte lengthCode = buffer.get();

                        parsingState.masked = (lengthCode & 0x80) == 0x80;
                        parsingState.masker = new Masker(buffer);
                        if (parsingState.masked) {
                            // strip the mask bit, leaving the 7-bit length code
                            lengthCode ^= 0x80;
                        }
                        parsingState.lengthCode = lengthCode;

                        parsingState.state.incrementAndGet();
                        break;
                    case 1:
                        // stage 1: payload length - either the length code itself or 2/8 extended length bytes
                        if (parsingState.lengthCode <= 125) {
                            parsingState.length = parsingState.lengthCode;
                        } else {
                            if (parsingState.controlFrame) {
                                throw new ProtocolException(LocalizationMessages.CONTROL_FRAME_LENGTH());
                            }

                            final int lengthBytes = parsingState.lengthCode == 126 ? 2 : 8;
                            if (buffer.remaining() < lengthBytes) {
                                // Don't have enough bytes to read length
                                return null;
                            }
                            // the buffer passed to this call can differ from the one seen in stage 0
                            parsingState.masker.setBuffer(buffer);
                            parsingState.length = decodeLength(parsingState.masker.unmask(lengthBytes));
                        }
                        parsingState.state.incrementAndGet();
                        break;
                    case 2:
                        // stage 2: masking key, present only when the mask bit was set
                        if (parsingState.masked) {
                            if (buffer.remaining() < MASK_SIZE) {
                                // Don't have enough bytes to read mask
                                return null;
                            }
                            parsingState.masker.setBuffer(buffer);
                            parsingState.masker.readMask();
                        }
                        parsingState.state.incrementAndGet();
                        break;
                    case 3:
                        // stage 3: payload - wait until all of it is available, then build the frame
                        if (buffer.remaining() < parsingState.length) {
                            return null;
                        }

                        parsingState.masker.setBuffer(buffer);
                        final byte[] data = parsingState.masker.unmask((int) parsingState.length);
                        if (data.length != parsingState.length) {
                            throw new ProtocolException(
                                    LocalizationMessages.DATA_UNEXPECTED_LENGTH(data.length, parsingState.length));
                        }

                        final Frame frame = Frame.builder().fin(parsingState.finalFragment)
                                                 .rsv1(isBitSet(parsingState.opcode, 6))
                                                 .rsv2(isBitSet(parsingState.opcode, 5))
                                                 .rsv3(isBitSet(parsingState.opcode, 4))
                                                 .opcode((byte) (parsingState.opcode & 0xf))
                                                 .payloadLength(parsingState.length)
                                                 .payloadData(data)
                                                 .build();

                        // ready for the next frame
                        parsingState.recycle();

                        return frame;
                    default:
                        // Should never get here
                        throw new IllegalStateException(LocalizationMessages.UNEXPECTED_STATE(parsingState.state));
                }
            } while (true);
        } catch (Exception e) {
            // do not leave a half-parsed frame behind; no checked exception is thrown in the try block, so the
            // cast below is expected to always succeed
            parsingState.recycle();
            throw (RuntimeException) e;
        }
    }

    /**
     * Validate an incoming frame and process its content. Called after the extensions were executed.
     * <p>
     * The frame is checked against the incoming fragmentation state (which is updated accordingly), wrapped into
     * the corresponding {@link TyrusFrame} and that frame is asked to respond on the given socket.
     *
     * @param frame  incoming frame.
     * @param socket web socket the wrapped frame responds on.
     * @throws ProtocolException when a reserved bit is set, when a continuation frame arrives while no fragmented
     *                           message is in progress, when a non-continuation data frame arrives while a
     *                           fragmented message is in progress, or when the server receives a close frame with
     *                           {@code SERVICE_RESTART} or {@code TRY_AGAIN_LATER} close code.
     */
    public void process(Frame frame, TyrusWebSocket socket) {
        final boolean reservedBitSet = frame.isRsv1() || frame.isRsv2() || frame.isRsv3();
        if (reservedBitSet) {
            throw new ProtocolException(LocalizationMessages.RSV_INCORRECTLY_SET());
        }

        final byte opcode = frame.getOpcode();
        final boolean fin = frame.isFin();

        if (!frame.isControlFrame()) {
            final boolean continuationFrame = (opcode == 0);

            if (continuationFrame && !processingFragment) {
                throw new ProtocolException(LocalizationMessages.UNEXPECTED_END_FRAGMENT());
            }
            if (processingFragment && !continuationFrame) {
                throw new ProtocolException(LocalizationMessages.FRAGMENT_INVALID_OPCODE());
            }

            if (!fin) {
                if (!continuationFrame) {
                    // first frame of a fragmented message
                    processingFragment = true;
                }
                if (inFragmentedType == 0) {
                    inFragmentedType = opcode;
                }
            }
        }

        final TyrusFrame tyrusFrame = TyrusFrame.wrap(frame, inFragmentedType, remainder);

        // TODO - utf8 decoder needs this state to be shared among decoded frames.
        // TODO - investigate whether it can be removed; (this effectively denies lazy decoding)
        if (tyrusFrame instanceof TextFrame) {
            remainder = ((TextFrame) tyrusFrame).getRemainder();
        }

        // server should not allow receiving 1012 or 1013 from the client
        // (SERVICE_RESTART and TRY_AGAIN_LATER do not make sense from the client side).
        if (!client && tyrusFrame.isControlFrame() && tyrusFrame instanceof CloseFrame) {
            final CloseReason.CloseCode closeCode = ((CloseFrame) tyrusFrame).getCloseReason().getCloseCode();
            final boolean serverOnlyCode = closeCode.equals(CloseReason.CloseCodes.SERVICE_RESTART)
                    || closeCode.equals(CloseReason.CloseCodes.TRY_AGAIN_LATER);
            if (serverOnlyCode) {
                throw new ProtocolException("Illegal close code: " + closeCode);
            }
        }

        tyrusFrame.respond(socket);

        // final data frame ends the fragmented message (if any)
        if (!tyrusFrame.isControlFrame() && fin) {
            inFragmentedType = 0;
            processingFragment = false;
        }
    }

    /**
     * Tell whether the given opcode denotes a control frame, i.e. whether its bit 3 (0x08) is set.
     *
     * @param opcode opcode, possibly still combined with the FIN and RSV bits.
     * @return {@code true} when the control frame bit is set.
     */
    private boolean isControlFrame(byte opcode) {
        return (opcode & 0x08) != 0;
    }

    /**
     * Tell whether the bit at the given position is set.
     *
     * @param b   value to be tested.
     * @param bit position of the tested bit, 0 being the least significant one.
     * @return {@code true} when the bit is set.
     */
    private boolean isBitSet(final byte b, int bit) {
        return (b & (1 << bit)) != 0;
    }

    /**
     * Handler passed to the {@link org.glassfish.tyrus.spi.Writer}.
     * <p>
     * Translates the buffer-level callbacks of the writer into frame-level callbacks: the optional delegate handler
     * is notified first, then the optional future is completed. Both are given the frame passed to the constructor
     * (which can be {@code null}) instead of the written buffer.
     */
    private static class CompletionHandlerWrapper extends CompletionHandler<ByteBuffer> {

        private final CompletionHandler<Frame> delegate;
        private final TyrusFuture<Frame> future;
        private final Frame frame;

        private CompletionHandlerWrapper(CompletionHandler<Frame> frameCompletionHandler, TyrusFuture<Frame> future,
                                         Frame frame) {
            this.delegate = frameCompletionHandler;
            this.future = future;
            this.frame = frame;
        }

        @Override
        public void cancelled() {
            if (delegate != null) {
                delegate.cancelled();
            }

            // a cancelled write is reported to the future as a failure
            if (future != null) {
                future.setFailure(new RuntimeException(LocalizationMessages.FRAME_WRITE_CANCELLED()));
            }
        }

        @Override
        public void failed(Throwable throwable) {
            if (delegate != null) {
                delegate.failed(throwable);
            }
            failFuture(throwable);
        }

        @Override
        public void completed(ByteBuffer result) {
            if (delegate != null) {
                delegate.completed(frame);
            }

            if (future != null) {
                future.setResult(frame);
            }
        }

        @Override
        public void updated(ByteBuffer result) {
            // progress is reported to the delegate only; the future is not affected
            if (delegate != null) {
                delegate.updated(frame);
            }
        }

        /**
         * Fail the future, when there is one, with the given cause.
         */
        private void failFuture(Throwable cause) {
            if (future != null) {
                future.setFailure(cause);
            }
        }
    }

    /**
     * Mutable state of the resumable incoming-frame parser ({@code unframe(ByteBuffer)}), kept between calls so
     * that a frame can be parsed from data arriving in several chunks.
     */
    private static class ParsingState {
        // index of the parsing stage to be executed next (0 - header, 1 - length, 2 - mask, 3 - payload)
        final AtomicInteger state = new AtomicInteger(0);
        // first header byte with the FIN bit cleared (RSV bits and opcode); -1 until read
        volatile byte opcode = (byte) -1;
        // payload length; -1 until decoded
        volatile long length = -1;
        // mask bit of the second header byte
        volatile boolean masked;
        // reads (and unmasks) the data of the frame being parsed; created in stage 0
        volatile Masker masker;
        // FIN bit of the first header byte
        volatile boolean finalFragment;
        // whether the first header byte has the control frame bit set
        volatile boolean controlFrame;

        // second header byte with the mask bit cleared; -1 until read
        private volatile byte lengthCode = -1;

        /**
         * Reset everything to the initial values, making the state ready for parsing of the next frame.
         */
        void recycle() {
            state.set(0);
            opcode = (byte) -1;
            length = -1;
            lengthCode = -1;
            masked = false;
            masker = null;
            finalFragment = false;
            controlFrame = false;
        }
    }
}