AbstractH2StreamMultiplexer.java

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.hc.core5.http2.impl.nio;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.CharacterCodingException;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLSession;

import org.apache.hc.core5.concurrent.CancellableDependency;
import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpStreamResetException;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.RequestNotExecutedException;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.impl.BasicEndpointDetails;
import org.apache.hc.core5.http.impl.BasicHttpConnectionMetrics;
import org.apache.hc.core5.http.impl.CharCodingSupport;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncPushProducer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ExecutableCommand;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.H2ConnectionException;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.config.H2Param;
import org.apache.hc.core5.http2.config.H2Setting;
import org.apache.hc.core5.http2.frame.FrameFactory;
import org.apache.hc.core5.http2.frame.FrameFlag;
import org.apache.hc.core5.http2.frame.FrameType;
import org.apache.hc.core5.http2.frame.RawFrame;
import org.apache.hc.core5.http2.frame.StreamIdGenerator;
import org.apache.hc.core5.http2.hpack.HPackDecoder;
import org.apache.hc.core5.http2.hpack.HPackEncoder;
import org.apache.hc.core5.http2.impl.BasicH2TransportMetrics;
import org.apache.hc.core5.http2.nio.AsyncPingHandler;
import org.apache.hc.core5.http2.nio.command.PingCommand;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.ByteArrayBuffer;
import org.apache.hc.core5.util.Identifiable;
import org.apache.hc.core5.util.Timeout;

abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnection {

    private static final long LINGER_TIME = 1000; // 1 second
    private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024; // 10 MiB

    enum ConnectionHandshake { READY, ACTIVE, GRACEFUL_SHUTDOWN, SHUTDOWN}
    enum SettingsHandshake { READY, TRANSMITTED, ACKED }

    private final ProtocolIOSession ioSession;
    private final FrameFactory frameFactory;
    private final StreamIdGenerator idGenerator;
    private final HttpProcessor httpProcessor;
    private final H2Config localConfig;
    private final BasicH2TransportMetrics inputMetrics;
    private final BasicH2TransportMetrics outputMetrics;
    private final BasicHttpConnectionMetrics connMetrics;
    private final FrameInputBuffer inputBuffer;
    private final FrameOutputBuffer outputBuffer;
    private final Deque<RawFrame> outputQueue;
    private final HPackEncoder hPackEncoder;
    private final HPackDecoder hPackDecoder;
    private final Map<Integer, H2Stream> streamMap;
    private final Queue<AsyncPingHandler> pingHandlers;
    private final AtomicInteger connInputWindow;
    private final AtomicInteger connOutputWindow;
    private final AtomicInteger outputRequests;
    private final AtomicInteger lastStreamId;
    private final H2StreamListener streamListener;

    private ConnectionHandshake connState = ConnectionHandshake.READY;
    private SettingsHandshake localSettingState = SettingsHandshake.READY;
    private SettingsHandshake remoteSettingState = SettingsHandshake.READY;

    private int initInputWinSize;
    private int initOutputWinSize;
    private int lowMark;

    private volatile H2Config remoteConfig;

    private Continuation continuation;

    private int processedRemoteStreamId;
    private EndpointDetails endpointDetails;
    private boolean goAwayReceived;

    AbstractH2StreamMultiplexer(
            final ProtocolIOSession ioSession,
            final FrameFactory frameFactory,
            final StreamIdGenerator idGenerator,
            final HttpProcessor httpProcessor,
            final CharCodingConfig charCodingConfig,
            final H2Config h2Config,
            final H2StreamListener streamListener) {
        this.ioSession = Args.notNull(ioSession, "IO session");
        this.frameFactory = Args.notNull(frameFactory, "Frame factory");
        this.idGenerator = Args.notNull(idGenerator, "Stream id generator");
        this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
        this.localConfig = h2Config != null ? h2Config : H2Config.DEFAULT;
        this.inputMetrics = new BasicH2TransportMetrics();
        this.outputMetrics = new BasicH2TransportMetrics();
        this.connMetrics = new BasicHttpConnectionMetrics(this.inputMetrics, this.outputMetrics);
        this.inputBuffer = new FrameInputBuffer(this.inputMetrics, this.localConfig.getMaxFrameSize());
        this.outputBuffer = new FrameOutputBuffer(this.outputMetrics, this.localConfig.getMaxFrameSize());
        this.outputQueue = new ConcurrentLinkedDeque<>();
        this.pingHandlers = new ConcurrentLinkedQueue<>();
        this.outputRequests = new AtomicInteger(0);
        this.lastStreamId = new AtomicInteger(0);
        this.hPackEncoder = new HPackEncoder(CharCodingSupport.createEncoder(charCodingConfig));
        this.hPackDecoder = new HPackDecoder(CharCodingSupport.createDecoder(charCodingConfig));
        this.streamMap = new ConcurrentHashMap<>();
        this.remoteConfig = H2Config.INIT;
        this.connInputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());
        this.connOutputWindow = new AtomicInteger(H2Config.INIT.getInitialWindowSize());

        this.initInputWinSize = H2Config.INIT.getInitialWindowSize();
        this.initOutputWinSize = H2Config.INIT.getInitialWindowSize();

        this.hPackDecoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
        this.hPackEncoder.setMaxTableSize(H2Config.INIT.getHeaderTableSize());
        this.hPackDecoder.setMaxListSize(H2Config.INIT.getMaxHeaderListSize());

        this.lowMark = H2Config.INIT.getInitialWindowSize() / 2;
        this.streamListener = streamListener;
    }

    @Override
    public String getId() {
        return ioSession.getId();
    }

    abstract void acceptHeaderFrame() throws H2ConnectionException;

    abstract void acceptPushRequest() throws H2ConnectionException;

    abstract void acceptPushFrame() throws H2ConnectionException;

    abstract H2StreamHandler createRemotelyInitiatedStream(
            H2StreamChannel channel,
            HttpProcessor httpProcessor,
            BasicHttpConnectionMetrics connMetrics,
            HandlerFactory<AsyncPushConsumer> pushHandlerFactory) throws IOException;

    abstract H2StreamHandler createLocallyInitiatedStream(
            ExecutableCommand command,
            H2StreamChannel channel,
            HttpProcessor httpProcessor,
            BasicHttpConnectionMetrics connMetrics) throws IOException;

    private int updateWindow(final AtomicInteger window, final int delta) throws ArithmeticException {
        for (;;) {
            final int current = window.get();
            long newValue = (long) current + delta;

            //TODO: work-around for what looks like a bug in Ngnix (1.11)
            // Tolerate if the update window exceeded by one
            if (newValue == 0x80000000L) {
                newValue = Integer.MAX_VALUE;
            }
            //TODO: needs to be removed

            if (Math.abs(newValue) > 0x7fffffffL) {
                throw new ArithmeticException("Update causes flow control window to exceed " + Integer.MAX_VALUE);
            }
            if (window.compareAndSet(current, (int) newValue)) {
                return (int) newValue;
            }
        }
    }

    private int updateInputWindow(
            final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
        final int newSize = updateWindow(window, delta);
        if (streamListener != null) {
            streamListener.onInputFlowControl(this, streamId, delta, newSize);
        }
        return newSize;
    }

    private int updateOutputWindow(
            final int streamId, final AtomicInteger window, final int delta) throws ArithmeticException {
        final int newSize = updateWindow(window, delta);
        if (streamListener != null) {
            streamListener.onOutputFlowControl(this, streamId, delta, newSize);
        }
        return newSize;
    }

    private void commitFrameInternal(final RawFrame frame) throws IOException {
        if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
            if (streamListener != null) {
                streamListener.onFrameOutput(this, frame.getStreamId(), frame);
            }
            outputBuffer.write(frame, ioSession);
        } else {
            outputQueue.addLast(frame);
        }
        ioSession.setEvent(SelectionKey.OP_WRITE);
    }

    private void commitFrame(final RawFrame frame) throws IOException {
        Args.notNull(frame, "Frame");
        ioSession.getLock().lock();
        try {
            commitFrameInternal(frame);
        } finally {
            ioSession.getLock().unlock();
        }
    }

    private void commitHeaders(
            final int streamId, final List<? extends Header> headers, final boolean endStream) throws IOException {
        if (streamListener != null) {
            streamListener.onHeaderOutput(this, streamId, headers);
        }
        final ByteArrayBuffer buf = new ByteArrayBuffer(512);
        hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());

        int off = 0;
        int remaining = buf.length();
        boolean continuation = false;

        while (remaining > 0) {
            final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
            final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);

            remaining -= chunk;
            off += chunk;

            final boolean endHeaders = remaining == 0;
            final RawFrame frame;
            if (!continuation) {
                frame = frameFactory.createHeaders(streamId, payload, endHeaders, endStream);
                continuation = true;
            } else {
                frame = frameFactory.createContinuation(streamId, payload, endHeaders);
            }
            commitFrameInternal(frame);
        }
    }

    private void commitPushPromise(
            final int streamId, final int promisedStreamId, final List<Header> headers) throws IOException {
        if (headers == null || headers.isEmpty()) {
            throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
        }
        if (streamListener != null) {
            streamListener.onHeaderOutput(this, streamId, headers);
        }
        final ByteArrayBuffer buf = new ByteArrayBuffer(512);
        buf.append((byte)(promisedStreamId >> 24));
        buf.append((byte)(promisedStreamId >> 16));
        buf.append((byte)(promisedStreamId >> 8));
        buf.append((byte)(promisedStreamId));

        hPackEncoder.encodeHeaders(buf, headers, localConfig.isCompressionEnabled());

        int off = 0;
        int remaining = buf.length();
        boolean continuation = false;

        while (remaining > 0) {
            final int chunk = Math.min(remoteConfig.getMaxFrameSize(), remaining);
            final ByteBuffer payload = ByteBuffer.wrap(buf.array(), off, chunk);

            remaining -= chunk;
            off += chunk;

            final boolean endHeaders = remaining == 0;
            final RawFrame frame;
            if (!continuation) {
                frame = frameFactory.createPushPromise(streamId, payload, endHeaders);
                continuation = true;
            } else {
                frame = frameFactory.createContinuation(streamId, payload, endHeaders);
            }
            commitFrameInternal(frame);
        }
    }

    private void streamDataFrame(
            final int streamId,
            final AtomicInteger streamOutputWindow,
            final ByteBuffer payload,
            final int chunk) throws IOException {
        final RawFrame dataFrame = frameFactory.createData(streamId, payload, false);
        if (streamListener != null) {
            streamListener.onFrameOutput(this, streamId, dataFrame);
        }
        updateOutputWindow(0, connOutputWindow, -chunk);
        updateOutputWindow(streamId, streamOutputWindow, -chunk);
        outputBuffer.write(dataFrame, ioSession);
    }

    private int streamData(
            final int streamId, final AtomicInteger streamOutputWindow, final ByteBuffer payload) throws IOException {
        if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
            final int capacity = Math.min(connOutputWindow.get(), streamOutputWindow.get());
            if (capacity <= 0) {
                return 0;
            }
            final int maxPayloadSize = Math.min(capacity, outputBuffer.getMaxFramePayloadSize());
            final int chunk;
            if (payload.remaining() <= maxPayloadSize) {
                chunk = payload.remaining();
                streamDataFrame(streamId, streamOutputWindow, payload, chunk);
            } else {
                chunk = maxPayloadSize;
                final int originalLimit = payload.limit();
                try {
                    payload.limit(payload.position() + chunk);
                    streamDataFrame(streamId, streamOutputWindow, payload, chunk);
                } finally {
                    payload.limit(originalLimit);
                }
            }
            payload.position(payload.position() + chunk);
            ioSession.setEvent(SelectionKey.OP_WRITE);
            return chunk;
        }
        return 0;
    }

    private void incrementInputCapacity(
            final int streamId, final AtomicInteger inputWindow, final int inputCapacity) throws IOException {
        if (inputCapacity > 0) {
            final int streamWinSize = inputWindow.get();
            final int remainingCapacity = Integer.MAX_VALUE - streamWinSize;
            final int chunk = Math.min(inputCapacity, remainingCapacity);
            if (chunk != 0) {
                final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(streamId, chunk);
                commitFrame(windowUpdateFrame);
                updateInputWindow(streamId, inputWindow, chunk);
            }
        }
    }

    private void requestSessionOutput() {
        outputRequests.incrementAndGet();
        ioSession.setEvent(SelectionKey.OP_WRITE);
    }

    private void updateLastStreamId(final int streamId) {
        final int currentId = lastStreamId.get();
        if (streamId > currentId) {
            lastStreamId.compareAndSet(currentId, streamId);
        }
    }

    private int generateStreamId() {
        for (;;) {
            final int currentId = lastStreamId.get();
            final int newStreamId = idGenerator.generate(currentId);
            if (lastStreamId.compareAndSet(currentId, newStreamId)) {
                return newStreamId;
            }
        }
    }

    public final void onConnect() throws HttpException, IOException {
        connState = ConnectionHandshake.ACTIVE;
        final RawFrame settingsFrame = frameFactory.createSettings(
                new H2Setting(H2Param.HEADER_TABLE_SIZE, localConfig.getHeaderTableSize()),
                new H2Setting(H2Param.ENABLE_PUSH, localConfig.isPushEnabled() ? 1 : 0),
                new H2Setting(H2Param.MAX_CONCURRENT_STREAMS, localConfig.getMaxConcurrentStreams()),
                new H2Setting(H2Param.INITIAL_WINDOW_SIZE, localConfig.getInitialWindowSize()),
                new H2Setting(H2Param.MAX_FRAME_SIZE, localConfig.getMaxFrameSize()),
                new H2Setting(H2Param.MAX_HEADER_LIST_SIZE, localConfig.getMaxHeaderListSize()));

        commitFrame(settingsFrame);
        localSettingState = SettingsHandshake.TRANSMITTED;
        maximizeConnWindow(connInputWindow.get());

        if (streamListener != null) {
            final int initInputWindow = connInputWindow.get();
            streamListener.onInputFlowControl(this, 0, initInputWindow, initInputWindow);
            final int initOutputWindow = connOutputWindow.get();
            streamListener.onOutputFlowControl(this, 0, initOutputWindow, initOutputWindow);
        }
    }

    public final void onInput(final ByteBuffer src) throws HttpException, IOException {
        if (connState == ConnectionHandshake.SHUTDOWN) {
            ioSession.clearEvent(SelectionKey.OP_READ);
        } else {
            for (;;) {
                final RawFrame frame = inputBuffer.read(src, ioSession);
                if (frame == null) {
                    break;
                }
                if (streamListener != null) {
                    streamListener.onFrameInput(this, frame.getStreamId(), frame);
                }
                consumeFrame(frame);
            }
        }
    }

    public final void onOutput() throws HttpException, IOException {
        ioSession.getLock().lock();
        try {
            if (!outputBuffer.isEmpty()) {
                outputBuffer.flush(ioSession);
            }
            while (outputBuffer.isEmpty()) {
                final RawFrame frame = outputQueue.poll();
                if (frame != null) {
                    if (streamListener != null) {
                        streamListener.onFrameOutput(this, frame.getStreamId(), frame);
                    }
                    outputBuffer.write(frame, ioSession);
                } else {
                    break;
                }
            }
        } finally {
            ioSession.getLock().unlock();
        }

        if (connState.compareTo(ConnectionHandshake.SHUTDOWN) < 0) {

            if (connOutputWindow.get() > 0 && remoteSettingState == SettingsHandshake.ACKED) {
                produceOutput();
            }
            final int pendingOutputRequests = outputRequests.get();
            boolean outputPending = false;
            if (!streamMap.isEmpty() && connOutputWindow.get() > 0) {
                for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                    final Map.Entry<Integer, H2Stream> entry = it.next();
                    final H2Stream stream = entry.getValue();
                    if (!stream.isLocalClosed()
                            && stream.getOutputWindow().get() > 0
                            && stream.isOutputReady()) {
                        outputPending = true;
                        break;
                    }
                }
            }
            ioSession.getLock().lock();
            try {
                if (!outputPending && outputBuffer.isEmpty() && outputQueue.isEmpty()
                        && outputRequests.compareAndSet(pendingOutputRequests, 0)) {
                    ioSession.clearEvent(SelectionKey.OP_WRITE);
                } else {
                    outputRequests.addAndGet(-pendingOutputRequests);
                }
            } finally {
                ioSession.getLock().unlock();
            }
        }

        if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0 && remoteSettingState == SettingsHandshake.ACKED) {
            processPendingCommands();
        }
        if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) == 0) {
            int liveStreams = 0;
            for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                final Map.Entry<Integer, H2Stream> entry = it.next();
                final H2Stream stream = entry.getValue();
                if (stream.isLocalClosed() && stream.isRemoteClosed()) {
                    stream.releaseResources();
                    it.remove();
                } else {
                    if (idGenerator.isSameSide(stream.getId()) || stream.getId() <= processedRemoteStreamId) {
                        liveStreams++;
                    }
                }
            }
            if (liveStreams == 0) {
                connState = ConnectionHandshake.SHUTDOWN;
            }
        }
        if (connState.compareTo(ConnectionHandshake.SHUTDOWN) >= 0) {
            if (!streamMap.isEmpty()) {
                for (final H2Stream stream : streamMap.values()) {
                    stream.releaseResources();
                }
                streamMap.clear();
            }
            ioSession.getLock().lock();
            try {
                if (outputBuffer.isEmpty() && outputQueue.isEmpty()) {
                    ioSession.close();
                }
            } finally {
                ioSession.getLock().unlock();
            }
        }
    }

    public final void onTimeout(final Timeout timeout) throws HttpException, IOException {
        connState = ConnectionHandshake.SHUTDOWN;

        final RawFrame goAway;
        if (localSettingState != SettingsHandshake.ACKED) {
            goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.SETTINGS_TIMEOUT,
                            "Setting timeout (" + timeout + ")");
        } else {
            goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR,
                            "Timeout due to inactivity (" + timeout + ")");
        }
        commitFrame(goAway);
        for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
            final Map.Entry<Integer, H2Stream> entry = it.next();
            final H2Stream stream = entry.getValue();
            stream.reset(new H2StreamResetException(H2Error.NO_ERROR, "Timeout due to inactivity (" + timeout + ")"));
        }
        streamMap.clear();
    }

    public final void onDisconnect() {
        for (;;) {
            final AsyncPingHandler pingHandler = pingHandlers.poll();
            if (pingHandler != null) {
                pingHandler.cancel();
            } else {
                break;
            }
        }
        for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
            final Map.Entry<Integer, H2Stream> entry = it.next();
            final H2Stream stream = entry.getValue();
            stream.cancel();
        }
        for (;;) {
            final Command command = ioSession.poll();
            if (command != null) {
                if (command instanceof ExecutableCommand) {
                    ((ExecutableCommand) command).failed(new ConnectionClosedException());
                } else {
                    command.cancel();
                }
            } else {
                break;
            }
        }
    }

    private void processPendingCommands() throws IOException, HttpException {
        while (streamMap.size() < remoteConfig.getMaxConcurrentStreams()) {
            final Command command = ioSession.poll();
            if (command == null) {
                break;
            }
            if (command instanceof ShutdownCommand) {
                final ShutdownCommand shutdownCommand = (ShutdownCommand) command;
                if (shutdownCommand.getType() == CloseMode.IMMEDIATE) {
                    for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                        final Map.Entry<Integer, H2Stream> entry = it.next();
                        final H2Stream stream = entry.getValue();
                        stream.cancel();
                    }
                    streamMap.clear();
                    connState = ConnectionHandshake.SHUTDOWN;
                } else {
                    if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                        final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, H2Error.NO_ERROR, "Graceful shutdown");
                        commitFrame(goAway);
                        connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
                    }
                }
                break;
            } else if (command instanceof PingCommand) {
                final PingCommand pingCommand = (PingCommand) command;
                final AsyncPingHandler handler = pingCommand.getHandler();
                pingHandlers.add(handler);
                final RawFrame ping = frameFactory.createPing(handler.getData());
                commitFrame(ping);
            } else if (command instanceof ExecutableCommand) {
                final int streamId = generateStreamId();
                final H2StreamChannelImpl channel = new H2StreamChannelImpl(
                        streamId, true, initInputWinSize, initOutputWinSize);
                final ExecutableCommand executableCommand = (ExecutableCommand) command;
                final H2StreamHandler streamHandler = createLocallyInitiatedStream(
                        executableCommand, channel, httpProcessor, connMetrics);

                final H2Stream stream = new H2Stream(channel, streamHandler, false);
                streamMap.put(streamId, stream);

                if (streamListener != null) {
                    final int initInputWindow = stream.getInputWindow().get();
                    streamListener.onInputFlowControl(this, streamId, initInputWindow, initInputWindow);
                    final int initOutputWindow = stream.getOutputWindow().get();
                    streamListener.onOutputFlowControl(this, streamId, initOutputWindow, initOutputWindow);
                }

                if (stream.isOutputReady()) {
                    stream.produceOutput();
                }
                final CancellableDependency cancellableDependency = executableCommand.getCancellableDependency();
                if (cancellableDependency != null) {
                    cancellableDependency.setDependency(stream::abort);
                }
                if (!outputQueue.isEmpty()) {
                    return;
                }
            }
        }
    }

    public final void onException(final Exception cause) {
        try {
            for (;;) {
                final AsyncPingHandler pingHandler = pingHandlers.poll();
                if (pingHandler != null) {
                    pingHandler.failed(cause);
                } else {
                    break;
                }
            }
            for (;;) {
                final Command command = ioSession.poll();
                if (command != null) {
                    if (command instanceof ExecutableCommand) {
                        ((ExecutableCommand) command).failed(new ConnectionClosedException());
                    } else {
                        command.cancel();
                    }
                } else {
                    break;
                }
            }
            for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                final Map.Entry<Integer, H2Stream> entry = it.next();
                final H2Stream stream = entry.getValue();
                stream.reset(cause);
            }
            streamMap.clear();
            if (!(cause instanceof ConnectionClosedException)) {
                if (connState.compareTo(ConnectionHandshake.GRACEFUL_SHUTDOWN) <= 0) {
                    final H2Error errorCode;
                    if (cause instanceof H2ConnectionException) {
                        errorCode = H2Error.getByCode(((H2ConnectionException) cause).getCode());
                    } else if (cause instanceof ProtocolException){
                        errorCode = H2Error.PROTOCOL_ERROR;
                    } else {
                        errorCode = H2Error.INTERNAL_ERROR;
                    }
                    final RawFrame goAway = frameFactory.createGoAway(processedRemoteStreamId, errorCode, cause.getMessage());
                    commitFrame(goAway);
                }
            }
        } catch (final IOException ignore) {
        } finally {
            connState = ConnectionHandshake.SHUTDOWN;
            final CloseMode closeMode;
            if (cause instanceof ConnectionClosedException) {
                closeMode = CloseMode.GRACEFUL;
            } else if (cause instanceof SSLHandshakeException) {
                closeMode = CloseMode.GRACEFUL;
            } else if (cause instanceof IOException) {
                closeMode = CloseMode.IMMEDIATE;
            } else {
                closeMode = CloseMode.GRACEFUL;
            }
            ioSession.close(closeMode);
        }
    }

    private H2Stream getValidStream(final int streamId) throws H2ConnectionException {
        if (streamId == 0) {
            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
        }
        final H2Stream stream = streamMap.get(streamId);
        if (stream == null) {
            if (streamId <= lastStreamId.get()) {
                throw new H2ConnectionException(H2Error.STREAM_CLOSED, "Stream closed");
            } else {
                throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
            }
        }
        return stream;
    }

    private void consumeFrame(final RawFrame frame) throws HttpException, IOException {
        final FrameType frameType = FrameType.valueOf(frame.getType());
        final int streamId = frame.getStreamId();
        if (continuation != null && frameType != FrameType.CONTINUATION) {
            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "CONTINUATION frame expected");
        }
        switch (frameType) {
            case DATA: {
                final H2Stream stream = getValidStream(streamId);
                try {
                    consumeDataFrame(frame, stream);
                } catch (final H2StreamResetException ex) {
                    stream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                }

                if (stream.isTerminated()) {
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case HEADERS: {
                if (streamId == 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
                }
                H2Stream stream = streamMap.get(streamId);
                if (stream == null) {
                    acceptHeaderFrame();

                    if (idGenerator.isSameSide(streamId)) {
                        throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
                    }
                    if (goAwayReceived ) {
                        throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
                    }

                    updateLastStreamId(streamId);

                    final H2StreamChannelImpl channel = new H2StreamChannelImpl(
                            streamId, false, initInputWinSize, initOutputWinSize);
                    final H2StreamHandler streamHandler;
                    if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                        streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics, null);
                    } else {
                        streamHandler = NoopH2StreamHandler.INSTANCE;
                        channel.setLocalEndStream();
                    }

                    stream = new H2Stream(channel, streamHandler, true);
                    if (stream.isOutputReady()) {
                        stream.produceOutput();
                    }
                    streamMap.put(streamId, stream);
                }

                try {
                    consumeHeaderFrame(frame, stream);

                    if (stream.isOutputReady()) {
                        stream.produceOutput();
                    }
                } catch (final H2StreamResetException ex) {
                    stream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                } catch (final HttpException ex) {
                    stream.handle(ex);
                }

                if (stream.isTerminated()) {
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case CONTINUATION: {
                if (continuation == null) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION frame");
                }
                if (streamId != continuation.streamId) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected CONTINUATION stream id: " + streamId);
                }

                final H2Stream stream = getValidStream(streamId);
                try {

                    consumeContinuationFrame(frame, stream);
                } catch (final H2StreamResetException ex) {
                    stream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    stream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.CANCEL);
                }

                if (stream.isTerminated()) {
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case WINDOW_UPDATE: {
                final ByteBuffer payload = frame.getPayload();
                if (payload == null || payload.remaining() != 4) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid WINDOW_UPDATE frame payload");
                }
                final int delta = payload.getInt();
                if (delta <= 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Invalid WINDOW_UPDATE delta");
                }
                if (streamId == 0) {
                    try {
                        updateOutputWindow(0, connOutputWindow, delta);
                    } catch (final ArithmeticException ex) {
                        throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
                    }
                } else {
                    final H2Stream stream = streamMap.get(streamId);
                    if (stream != null) {
                        try {
                            updateOutputWindow(streamId, stream.getOutputWindow(), delta);
                        } catch (final ArithmeticException ex) {
                            throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
                        }
                    }
                }
                ioSession.setEvent(SelectionKey.OP_WRITE);
            }
            break;
            case RST_STREAM: {
                if (streamId == 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
                }
                final H2Stream stream = streamMap.get(streamId);
                if (stream == null) {
                    if (streamId > lastStreamId.get()) {
                        throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected stream id: " + streamId);
                    }
                } else {
                    final ByteBuffer payload = frame.getPayload();
                    if (payload == null || payload.remaining() != 4) {
                        throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid RST_STREAM frame payload");
                    }
                    final int errorCode = payload.getInt();
                    stream.reset(new H2StreamResetException(errorCode, "Stream reset (" + errorCode + ")"));
                    streamMap.remove(streamId);
                    stream.releaseResources();
                    requestSessionOutput();
                }
            }
            break;
            case PING: {
                if (streamId != 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
                }
                final ByteBuffer ping = frame.getPayloadContent();
                if (ping == null || ping.remaining() != 8) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PING frame payload");
                }
                if (frame.isFlagSet(FrameFlag.ACK)) {
                    final AsyncPingHandler pingHandler = pingHandlers.poll();
                    if (pingHandler != null) {
                        pingHandler.consumeResponse(ping);
                    }
                } else {
                    final ByteBuffer pong = ByteBuffer.allocate(ping.remaining());
                    pong.put(ping);
                    pong.flip();
                    final RawFrame response = frameFactory.createPingAck(pong);
                    commitFrame(response);
                }
            }
            break;
            case SETTINGS: {
                if (streamId != 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
                }
                if (frame.isFlagSet(FrameFlag.ACK)) {
                    if (localSettingState == SettingsHandshake.TRANSMITTED) {
                        localSettingState = SettingsHandshake.ACKED;
                        ioSession.setEvent(SelectionKey.OP_WRITE);
                        applyLocalSettings();
                    }
                } else {
                    final ByteBuffer payload = frame.getPayload();
                    if (payload != null) {
                        if ((payload.remaining() % 6) != 0) {
                            throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid SETTINGS payload");
                        }
                        consumeSettingsFrame(payload);
                        remoteSettingState = SettingsHandshake.TRANSMITTED;
                    }
                    // Send ACK
                    final RawFrame response = frameFactory.createSettingsAck();
                    commitFrame(response);
                    remoteSettingState = SettingsHandshake.ACKED;
                }
            }
            break;
            case PRIORITY:
                // Stream priority not supported
                break;
            case PUSH_PROMISE: {
                acceptPushFrame();

                if (goAwayReceived ) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "GOAWAY received");
                }

                if (!localConfig.isPushEnabled()) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Push is disabled");
                }

                final H2Stream stream = getValidStream(streamId);
                if (stream.isRemoteClosed()) {
                    stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
                    break;
                }

                final ByteBuffer payload = frame.getPayloadContent();
                if (payload == null || payload.remaining() < 4) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid PUSH_PROMISE payload");
                }
                final int promisedStreamId = payload.getInt();
                if (promisedStreamId == 0 || idGenerator.isSameSide(promisedStreamId)) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal promised stream id: " + promisedStreamId);
                }
                if (streamMap.get(promisedStreamId) != null) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Unexpected promised stream id: " + promisedStreamId);
                }

                updateLastStreamId(promisedStreamId);

                final H2StreamChannelImpl channel = new H2StreamChannelImpl(
                        promisedStreamId, false, initInputWinSize, initOutputWinSize);
                final H2StreamHandler streamHandler;
                if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                    streamHandler = createRemotelyInitiatedStream(channel, httpProcessor, connMetrics,
                            stream.getPushHandlerFactory());
                } else {
                    streamHandler = NoopH2StreamHandler.INSTANCE;
                    channel.setLocalEndStream();
                }

                final H2Stream promisedStream = new H2Stream(channel, streamHandler, true);
                streamMap.put(promisedStreamId, promisedStream);

                try {
                    consumePushPromiseFrame(frame, payload, promisedStream);
                } catch (final H2StreamResetException ex) {
                    promisedStream.localReset(ex);
                } catch (final HttpStreamResetException ex) {
                    promisedStream.localReset(ex, ex.getCause() != null ? H2Error.INTERNAL_ERROR : H2Error.NO_ERROR);
                }
            }
            break;
            case GOAWAY: {
                if (streamId != 0) {
                    throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id");
                }
                final ByteBuffer payload = frame.getPayload();
                if (payload == null || payload.remaining() < 8) {
                    throw new H2ConnectionException(H2Error.FRAME_SIZE_ERROR, "Invalid GOAWAY payload");
                }
                final int processedLocalStreamId = payload.getInt();
                final int errorCode = payload.getInt();
                goAwayReceived = true;
                if (errorCode == H2Error.NO_ERROR.getCode()) {
                    if (connState.compareTo(ConnectionHandshake.ACTIVE) <= 0) {
                        for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                            final Map.Entry<Integer, H2Stream> entry = it.next();
                            final int activeStreamId = entry.getKey();
                            if (!idGenerator.isSameSide(activeStreamId) && activeStreamId > processedLocalStreamId) {
                                final H2Stream stream = entry.getValue();
                                stream.cancel();
                                it.remove();
                            }
                        }
                    }
                    connState = streamMap.isEmpty() ? ConnectionHandshake.SHUTDOWN : ConnectionHandshake.GRACEFUL_SHUTDOWN;
                } else {
                    for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                        final Map.Entry<Integer, H2Stream> entry = it.next();
                        final H2Stream stream = entry.getValue();
                        stream.reset(new H2StreamResetException(errorCode, "Connection terminated by the peer (" + errorCode + ")"));
                    }
                    streamMap.clear();
                    connState = ConnectionHandshake.SHUTDOWN;
                }
            }
            ioSession.setEvent(SelectionKey.OP_WRITE);
            break;
        }
    }

    private void consumeDataFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
        final int streamId = stream.getId();
        final ByteBuffer payload = frame.getPayloadContent();
        if (payload != null) {
            final int frameLength = frame.getLength();
            final int streamWinSize = updateInputWindow(streamId, stream.getInputWindow(), -frameLength);
            if (streamWinSize < lowMark && !stream.isRemoteClosed()) {
                stream.produceInputCapacityUpdate();
            }
            final int connWinSize = updateInputWindow(0, connInputWindow, -frameLength);
            if (connWinSize < CONNECTION_WINDOW_LOW_MARK) {
                maximizeConnWindow(connWinSize);
            }
        }
        if (stream.isRemoteClosed()) {
            throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
        }
        if (frame.isFlagSet(FrameFlag.END_STREAM)) {
            stream.setRemoteEndStream();
        }
        if (stream.isLocalReset()) {
            return;
        }
        stream.consumeData(payload);
    }

    private void maximizeConnWindow(final int connWinSize) throws IOException {
        final int delta = Integer.MAX_VALUE - connWinSize;
        if (delta > 0) {
            final RawFrame windowUpdateFrame = frameFactory.createWindowUpdate(0, delta);
            commitFrame(windowUpdateFrame);
            updateInputWindow(0, connInputWindow, delta);
        }
    }

    private void consumePushPromiseFrame(final RawFrame frame, final ByteBuffer payload, final H2Stream promisedStream) throws HttpException, IOException {
        final int promisedStreamId = promisedStream.getId();
        if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
            continuation = new Continuation(promisedStreamId, frame.getType(), true);
        }
        if (continuation == null) {
            final List<Header> headers = hPackDecoder.decodeHeaders(payload);
            if (promisedStreamId > processedRemoteStreamId) {
                processedRemoteStreamId = promisedStreamId;
            }
            if (streamListener != null) {
                streamListener.onHeaderInput(this, promisedStreamId, headers);
            }
            promisedStream.consumePromise(headers);
        } else {
            continuation.copyPayload(payload);
        }
    }

    List<Header> decodeHeaders(final ByteBuffer payload) throws HttpException {
        return hPackDecoder.decodeHeaders(payload);
    }

    private void consumeHeaderFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
        final int streamId = stream.getId();
        if (!frame.isFlagSet(FrameFlag.END_HEADERS)) {
            continuation = new Continuation(streamId, frame.getType(), frame.isFlagSet(FrameFlag.END_STREAM));
        }
        final ByteBuffer payload = frame.getPayloadContent();
        if (frame.isFlagSet(FrameFlag.PRIORITY)) {
            // Priority not supported
            payload.getInt();
            payload.get();
        }
        if (continuation == null) {
            final List<Header> headers = decodeHeaders(payload);
            if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
                processedRemoteStreamId = streamId;
            }
            if (streamListener != null) {
                streamListener.onHeaderInput(this, streamId, headers);
            }
            if (stream.isRemoteClosed()) {
                throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
            }
            if (stream.isLocalReset()) {
                return;
            }
            if (frame.isFlagSet(FrameFlag.END_STREAM)) {
                stream.setRemoteEndStream();
            }
            stream.consumeHeader(headers);
        } else {
            continuation.copyPayload(payload);
        }
    }

    private void consumeContinuationFrame(final RawFrame frame, final H2Stream stream) throws HttpException, IOException {
        final int streamId = frame.getStreamId();
        final ByteBuffer payload = frame.getPayload();
        continuation.copyPayload(payload);
        if (frame.isFlagSet(FrameFlag.END_HEADERS)) {
            final List<Header> headers = decodeHeaders(continuation.getContent());
            if (stream.isRemoteInitiated() && streamId > processedRemoteStreamId) {
                processedRemoteStreamId = streamId;
            }
            if (streamListener != null) {
                streamListener.onHeaderInput(this, streamId, headers);
            }
            if (stream.isRemoteClosed()) {
                throw new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream already closed");
            }
            if (stream.isLocalReset()) {
                return;
            }
            if (continuation.endStream) {
                stream.setRemoteEndStream();
            }
            if (continuation.type == FrameType.PUSH_PROMISE.getValue()) {
                stream.consumePromise(headers);
            } else {
                stream.consumeHeader(headers);
            }
            continuation = null;
        }
    }

    private void consumeSettingsFrame(final ByteBuffer payload) throws IOException {
        final H2Config.Builder configBuilder = H2Config.initial();
        while (payload.hasRemaining()) {
            final int code = payload.getShort();
            final int value = payload.getInt();
            final H2Param param = H2Param.valueOf(code);
            if (param != null) {
                switch (param) {
                    case HEADER_TABLE_SIZE:
                        try {
                            configBuilder.setHeaderTableSize(value);
                        } catch (final IllegalArgumentException ex) {
                            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
                        }
                        break;
                    case MAX_CONCURRENT_STREAMS:
                        try {
                            configBuilder.setMaxConcurrentStreams(value);
                        } catch (final IllegalArgumentException ex) {
                            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
                        }
                        break;
                    case ENABLE_PUSH:
                        configBuilder.setPushEnabled(value == 1);
                        break;
                    case INITIAL_WINDOW_SIZE:
                        try {
                            configBuilder.setInitialWindowSize(value);
                        } catch (final IllegalArgumentException ex) {
                            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
                        }
                        break;
                    case MAX_FRAME_SIZE:
                        try {
                            configBuilder.setMaxFrameSize(value);
                        } catch (final IllegalArgumentException ex) {
                            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
                        }
                        break;
                    case MAX_HEADER_LIST_SIZE:
                        try {
                            configBuilder.setMaxHeaderListSize(value);
                        } catch (final IllegalArgumentException ex) {
                            throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, ex.getMessage());
                        }
                        break;
                }
            }
        }
        applyRemoteSettings(configBuilder.build());
    }

    private void produceOutput() throws HttpException, IOException {
        for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
            final Map.Entry<Integer, H2Stream> entry = it.next();
            final H2Stream stream = entry.getValue();
            if (!stream.isLocalClosed() && stream.getOutputWindow().get() > 0) {
                stream.produceOutput();
            }
            if (stream.isTerminated()) {
                it.remove();
                stream.releaseResources();
                requestSessionOutput();
            }
            if (!outputQueue.isEmpty()) {
                break;
            }
        }
    }

    private void applyRemoteSettings(final H2Config config) throws H2ConnectionException {
        remoteConfig = config;

        hPackEncoder.setMaxTableSize(remoteConfig.getHeaderTableSize());
        final int delta = remoteConfig.getInitialWindowSize() - initOutputWinSize;
        initOutputWinSize = remoteConfig.getInitialWindowSize();

        final int maxFrameSize = remoteConfig.getMaxFrameSize();
        if (maxFrameSize < outputBuffer.getMaxFramePayloadSize()) {
            try {
                outputBuffer.resize(maxFrameSize);
            } catch (final BufferOverflowException ex) {
                throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Failure resizing the frame output buffer");
            }
        }

        if (delta != 0) {
            if (!streamMap.isEmpty()) {
                for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                    final Map.Entry<Integer, H2Stream> entry = it.next();
                    final H2Stream stream = entry.getValue();
                    try {
                        updateOutputWindow(stream.getId(), stream.getOutputWindow(), delta);
                    } catch (final ArithmeticException ex) {
                        throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
                    }
                }
            }
        }
    }

    private void applyLocalSettings() throws H2ConnectionException {
        hPackDecoder.setMaxTableSize(localConfig.getHeaderTableSize());
        hPackDecoder.setMaxListSize(localConfig.getMaxHeaderListSize());

        final int delta = localConfig.getInitialWindowSize() - initInputWinSize;
        initInputWinSize = localConfig.getInitialWindowSize();

        if (delta != 0 && !streamMap.isEmpty()) {
            for (final Iterator<Map.Entry<Integer, H2Stream>> it = streamMap.entrySet().iterator(); it.hasNext(); ) {
                final Map.Entry<Integer, H2Stream> entry = it.next();
                final H2Stream stream = entry.getValue();
                try {
                    updateInputWindow(stream.getId(), stream.getInputWindow(), delta);
                } catch (final ArithmeticException ex) {
                    throw new H2ConnectionException(H2Error.FLOW_CONTROL_ERROR, ex.getMessage());
                }
            }
        }
        lowMark = initInputWinSize / 2;
    }

    @Override
    public void close() throws IOException {
        ioSession.enqueue(ShutdownCommand.GRACEFUL, Command.Priority.IMMEDIATE);
    }

    @Override
    public void close(final CloseMode closeMode) {
        ioSession.close(closeMode);
    }

    @Override
    public boolean isOpen() {
        return connState == ConnectionHandshake.ACTIVE;
    }

    @Override
    public void setSocketTimeout(final Timeout timeout) {
        ioSession.setSocketTimeout(timeout);
    }

    @Override
    public SSLSession getSSLSession() {
        final TlsDetails tlsDetails = ioSession.getTlsDetails();
        return tlsDetails != null ? tlsDetails.getSSLSession() : null;
    }

    @Override
    public EndpointDetails getEndpointDetails() {
        if (endpointDetails == null) {
            endpointDetails = new BasicEndpointDetails(
                    ioSession.getRemoteAddress(),
                    ioSession.getLocalAddress(),
                    connMetrics,
                    ioSession.getSocketTimeout());
        }
        return endpointDetails;
    }

    @Override
    public Timeout getSocketTimeout() {
        return ioSession.getSocketTimeout();
    }

    @Override
    public ProtocolVersion getProtocolVersion() {
        return HttpVersion.HTTP_2;
    }

    @Override
    public SocketAddress getRemoteAddress() {
        return ioSession.getRemoteAddress();
    }

    @Override
    public SocketAddress getLocalAddress() {
        return ioSession.getLocalAddress();
    }

    void appendState(final StringBuilder buf) {
        buf.append("connState=").append(connState)
                .append(", connInputWindow=").append(connInputWindow)
                .append(", connOutputWindow=").append(connOutputWindow)
                .append(", outputQueue=").append(outputQueue.size())
                .append(", streamMap=").append(streamMap.size())
                .append(", processedRemoteStreamId=").append(processedRemoteStreamId);
    }

    private static class Continuation {

        final int streamId;
        final int type;
        final boolean endStream;
        final ByteArrayBuffer headerBuffer;

        private Continuation(final int streamId, final int type, final boolean endStream) {
            this.streamId = streamId;
            this.type = type;
            this.endStream = endStream;
            this.headerBuffer = new ByteArrayBuffer(1024);
        }

        void copyPayload(final ByteBuffer payload) {
            if (payload == null) {
                return;
            }
            headerBuffer.ensureCapacity(payload.remaining());
            payload.get(headerBuffer.array(), headerBuffer.length(), payload.remaining());
        }

        ByteBuffer getContent() {
            return ByteBuffer.wrap(headerBuffer.array(), 0, headerBuffer.length());
        }

    }

    private class H2StreamChannelImpl implements H2StreamChannel {

        private final int id;
        private final AtomicInteger inputWindow;
        private final AtomicInteger outputWindow;

        private volatile boolean idle;
        private volatile boolean remoteEndStream;
        private volatile boolean localEndStream;

        private volatile long deadline;

        H2StreamChannelImpl(final int id, final boolean idle, final int initialInputWindowSize, final int initialOutputWindowSize) {
            this.id = id;
            this.idle = idle;
            this.inputWindow = new AtomicInteger(initialInputWindowSize);
            this.outputWindow = new AtomicInteger(initialOutputWindowSize);
        }

        int getId() {
            return id;
        }

        AtomicInteger getOutputWindow() {
            return outputWindow;
        }

        AtomicInteger getInputWindow() {
            return inputWindow;
        }

        @Override
        public void submit(final List<Header> headers, final boolean endStream) throws IOException {
            ioSession.getLock().lock();
            try {
                if (headers == null || headers.isEmpty()) {
                    throw new H2ConnectionException(H2Error.INTERNAL_ERROR, "Message headers are missing");
                }
                if (localEndStream) {
                    return;
                }
                idle = false;
                commitHeaders(id, headers, endStream);
                if (endStream) {
                    localEndStream = true;
                }
            } finally {
                ioSession.getLock().unlock();
            }
        }

        @Override
        public void push(final List<Header> headers, final AsyncPushProducer pushProducer) throws HttpException, IOException {
            acceptPushRequest();
            final int promisedStreamId = generateStreamId();
            final H2StreamChannelImpl channel = new H2StreamChannelImpl(
                    promisedStreamId,
                    true,
                    localConfig.getInitialWindowSize(),
                    remoteConfig.getInitialWindowSize());
            final HttpCoreContext context = HttpCoreContext.create();
            context.setSSLSession(getSSLSession());
            context.setEndpointDetails(getEndpointDetails());
            final H2StreamHandler streamHandler = new ServerPushH2StreamHandler(
                    channel, httpProcessor, connMetrics, pushProducer, context);
            final H2Stream stream = new H2Stream(channel, streamHandler, false);
            streamMap.put(promisedStreamId, stream);

            ioSession.getLock().lock();
            try {
                if (localEndStream) {
                    stream.releaseResources();
                    return;
                }
                commitPushPromise(id, promisedStreamId, headers);
                idle = false;
            } finally {
                ioSession.getLock().unlock();
            }
        }

        @Override
        public void update(final int increment) throws IOException {
            if (remoteEndStream) {
                return;
            }
            incrementInputCapacity(0, connInputWindow, increment);
            incrementInputCapacity(id, inputWindow, increment);
        }

        @Override
        public int write(final ByteBuffer payload) throws IOException {
            ioSession.getLock().lock();
            try {
                if (localEndStream) {
                    return 0;
                }
                return streamData(id, outputWindow, payload);
            } finally {
                ioSession.getLock().unlock();
            }
        }

        @Override
        public void endStream(final List<? extends Header> trailers) throws IOException {
            ioSession.getLock().lock();
            try {
                if (localEndStream) {
                    return;
                }
                localEndStream = true;
                if (trailers != null && !trailers.isEmpty()) {
                    commitHeaders(id, trailers, true);
                } else {
                    final RawFrame frame = frameFactory.createData(id, null, true);
                    commitFrameInternal(frame);
                }
            } finally {
                ioSession.getLock().unlock();
            }
        }

        @Override
        public void endStream() throws IOException {
            endStream(null);
        }

        @Override
        public void requestOutput() {
            requestSessionOutput();
        }

        boolean isRemoteClosed() {
            return remoteEndStream;
        }

        void setRemoteEndStream() {
            remoteEndStream = true;
        }

        boolean isLocalClosed() {
            return localEndStream;
        }

        void setLocalEndStream() {
            localEndStream = true;
        }

        boolean isLocalReset() {
            return deadline > 0;
        }

        boolean isResetDeadline() {
            final long l = deadline;
            return l > 0 && l < System.currentTimeMillis();
        }

        boolean localReset(final int code) throws IOException {
            ioSession.getLock().lock();
            try {
                if (localEndStream) {
                    return false;
                }
                localEndStream = true;
                deadline = System.currentTimeMillis() + LINGER_TIME;
                if (!idle) {
                    final RawFrame resetStream = frameFactory.createResetStream(id, code);
                    commitFrameInternal(resetStream);
                    return true;
                }
                return false;
            } finally {
                ioSession.getLock().unlock();
            }
        }

        boolean localReset(final H2Error error) throws IOException {
            return localReset(error!= null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
        }

        @Override
        public boolean cancel() {
            try {
                return localReset(H2Error.CANCEL);
            } catch (final IOException ignore) {
                return false;
            }
        }

        void appendState(final StringBuilder buf) {
            buf.append("id=").append(id)
                    .append(", connState=").append(connState)
                    .append(", inputWindow=").append(inputWindow)
                    .append(", outputWindow=").append(outputWindow)
                    .append(", localEndStream=").append(localEndStream)
                    .append(", idle=").append(idle);
        }

        @Override
        public String toString() {
            final StringBuilder buf = new StringBuilder();
            buf.append("[");
            appendState(buf);
            buf.append("]");
            return buf.toString();
        }

    }

    static class H2Stream {

        private final H2StreamChannelImpl channel;
        private final H2StreamHandler handler;
        private final boolean remoteInitiated;

        private H2Stream(
                final H2StreamChannelImpl channel,
                final H2StreamHandler handler,
                final boolean remoteInitiated) {
            this.channel = channel;
            this.handler = handler;
            this.remoteInitiated = remoteInitiated;
        }

        int getId() {
            return channel.getId();
        }

        boolean isRemoteInitiated() {
            return remoteInitiated;
        }

        AtomicInteger getOutputWindow() {
            return channel.getOutputWindow();
        }

        AtomicInteger getInputWindow() {
            return channel.getInputWindow();
        }

        boolean isTerminated() {
            return channel.isLocalClosed() && (channel.isRemoteClosed() || channel.isResetDeadline());
        }

        boolean isRemoteClosed() {
            return channel.isRemoteClosed();
        }

        boolean isLocalClosed() {
            return channel.isLocalClosed();
        }

        boolean isLocalReset() {
            return channel.isLocalReset();
        }

        void setRemoteEndStream() {
            channel.setRemoteEndStream();
        }

        void consumePromise(final List<Header> headers) throws HttpException, IOException {
            try {
                handler.consumePromise(headers);
                channel.setLocalEndStream();
            } catch (final ProtocolException ex) {
                localReset(ex, H2Error.PROTOCOL_ERROR);
            }
        }

        void consumeHeader(final List<Header> headers) throws HttpException, IOException {
            try {
                handler.consumeHeader(headers, channel.isRemoteClosed());
            } catch (final ProtocolException ex) {
                localReset(ex, H2Error.PROTOCOL_ERROR);
            }
        }

        void consumeData(final ByteBuffer src) throws HttpException, IOException {
            try {
                handler.consumeData(src, channel.isRemoteClosed());
            } catch (final CharacterCodingException ex) {
                localReset(ex, H2Error.INTERNAL_ERROR);
            } catch (final ProtocolException ex) {
                localReset(ex, H2Error.PROTOCOL_ERROR);
            }
        }

        boolean isOutputReady() {
            return handler.isOutputReady();
        }

        void produceOutput() throws HttpException, IOException {
            try {
                handler.produceOutput();
            } catch (final ProtocolException ex) {
                localReset(ex, H2Error.PROTOCOL_ERROR);
            }
        }

        void produceInputCapacityUpdate() throws IOException {
            handler.updateInputCapacity();
        }

        void reset(final Exception cause) {
            channel.setRemoteEndStream();
            channel.setLocalEndStream();
            handler.failed(cause);
        }

        void localReset(final Exception cause, final int code) throws IOException {
            channel.localReset(code);
            handler.failed(cause);
        }

        void localReset(final Exception cause, final H2Error error) throws IOException {
            localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
        }

        void localReset(final H2StreamResetException ex) throws IOException {
            localReset(ex, ex.getCode());
        }

        void handle(final HttpException ex) throws IOException, HttpException {
            handler.handle(ex, channel.isRemoteClosed());
        }

        HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
            return handler.getPushHandlerFactory();
        }

        void cancel() {
            reset(new RequestNotExecutedException());
        }

        boolean abort() {
            final boolean cancelled = channel.cancel();
            handler.failed(new RequestNotExecutedException());
            return cancelled;
        }

        void releaseResources() {
            handler.releaseResources();
        }

        void appendState(final StringBuilder buf) {
            buf.append("channel=[");
            channel.appendState(buf);
            buf.append("]");
        }

        @Override
        public String toString() {
            final StringBuilder buf = new StringBuilder();
            buf.append("[");
            appendState(buf);
            buf.append("]");
            return buf.toString();
        }

    }

}