H2Stream.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.core5.http2.impl.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
class H2Stream {
enum State { RESERVED, OPEN, CLOSED }
private static final long LINGER_TIME = 1000; // 1 second
private final H2StreamChannel channel;
private final H2StreamHandler handler;
private final Consumer<State> stateChangeCallback;
private final AtomicReference<State> transitionRef;
private final AtomicBoolean released;
private final AtomicBoolean cancelled;
private volatile boolean reserved;
private volatile boolean remoteClosed;
H2Stream(final H2StreamChannel channel, final H2StreamHandler handler, final Consumer<State> stateChangeCallback) {
this.channel = channel;
this.handler = handler;
this.stateChangeCallback = stateChangeCallback;
this.reserved = true;
this.transitionRef = new AtomicReference<>(State.RESERVED);
this.released = new AtomicBoolean();
this.cancelled = new AtomicBoolean();
}
int getId() {
return channel.getId();
}
boolean isReserved() {
return reserved;
}
private void triggerOpen() {
if (transitionRef.compareAndSet(State.RESERVED, State.OPEN) && stateChangeCallback != null) {
stateChangeCallback.accept(State.OPEN);
}
}
private void triggerClosed() {
if (transitionRef.compareAndSet(State.OPEN, State.CLOSED) && stateChangeCallback != null) {
stateChangeCallback.accept(State.CLOSED);
}
}
void activate() {
reserved = false;
triggerOpen();
}
AtomicInteger getOutputWindow() {
return channel.getOutputWindow();
}
AtomicInteger getInputWindow() {
return channel.getInputWindow();
}
private boolean isPastLingerDeadline() {
final long localResetTime = channel.getLocalResetTime();
return localResetTime > 0 && localResetTime + LINGER_TIME < System.currentTimeMillis();
}
boolean isClosedPastLingerDeadline() {
return channel.isLocalClosed() && (remoteClosed || isPastLingerDeadline());
}
boolean isClosed() {
return channel.isLocalClosed() && (remoteClosed || channel.isLocalReset());
}
boolean isActive() {
return !reserved && !isClosed();
}
boolean isRemoteClosed() {
return remoteClosed;
}
void markRemoteClosed() {
remoteClosed = true;
}
boolean isLocalClosed() {
return channel.isLocalClosed();
}
void consumePromise(final List<Header> headers) throws HttpException, IOException {
try {
if (channel.isLocalReset()) {
return;
}
if (cancelled.get()) {
localResetCancelled();
return;
}
handler.consumePromise(headers);
channel.markLocalClosed();
} catch (final ProtocolException ex) {
localReset(ex, H2Error.PROTOCOL_ERROR);
}
}
void consumeHeader(final List<Header> headers, final boolean endOfStream) throws HttpException, IOException {
try {
if (endOfStream) {
remoteClosed = true;
}
if (channel.isLocalReset()) {
return;
}
if (cancelled.get()) {
localResetCancelled();
return;
}
handler.consumeHeader(headers, remoteClosed);
} catch (final ProtocolException ex) {
localReset(ex, H2Error.PROTOCOL_ERROR);
}
}
void consumeData(final ByteBuffer src, final boolean endOfStream) throws HttpException, IOException {
try {
if (endOfStream) {
remoteClosed = true;
}
if (channel.isLocalReset()) {
return;
}
if (cancelled.get()) {
localResetCancelled();
return;
}
handler.consumeData(src, remoteClosed);
} catch (final CharacterCodingException ex) {
localReset(ex, H2Error.INTERNAL_ERROR);
} catch (final ProtocolException ex) {
localReset(ex, H2Error.PROTOCOL_ERROR);
}
}
boolean isOutputReady() {
return !reserved && !channel.isLocalClosed() && handler.isOutputReady();
}
void produceOutput() throws HttpException, IOException {
try {
handler.produceOutput();
} catch (final ProtocolException ex) {
localReset(ex, H2Error.PROTOCOL_ERROR);
}
}
void produceInputCapacityUpdate() throws IOException {
handler.updateInputCapacity();
}
void fail(final Exception cause) {
remoteClosed = true;
channel.markLocalClosed();
if (released.compareAndSet(false, true)) {
try {
handler.failed(cause);
handler.releaseResources();
} finally {
triggerClosed();
}
}
}
void localReset(final Exception cause, final int code) throws IOException {
channel.localReset(code);
if (released.compareAndSet(false, true)) {
try {
handler.failed(cause);
handler.releaseResources();
} finally {
triggerClosed();
}
}
}
void localReset(final Exception cause, final H2Error error) throws IOException {
localReset(cause, error != null ? error.getCode() : H2Error.INTERNAL_ERROR.getCode());
}
void localReset(final H2StreamResetException ex) throws IOException {
localReset(ex, ex.getCode());
}
void localResetCancelled() throws IOException {
localReset(new H2StreamResetException(H2Error.CANCEL, "Cancelled"));
}
void handle(final HttpException ex) throws IOException, HttpException {
handler.handle(ex, remoteClosed);
}
HandlerFactory<AsyncPushConsumer> getPushHandlerFactory() {
return handler.getPushHandlerFactory();
}
boolean abort() {
if (cancelled.compareAndSet(false, true)) {
channel.requestOutput();
return true;
} else {
return false;
}
}
boolean abortGracefully() throws IOException {
if (!isLocalClosed() && isRemoteClosed()) {
channel.endStream();
releaseResources();
return true;
} else {
return abort();
}
}
void releaseResources() {
if (released.compareAndSet(false, true)) {
try {
handler.releaseResources();
} finally {
triggerClosed();
}
}
}
@Override
public String toString() {
final StringBuilder buf = new StringBuilder();
buf.append("[")
.append("id=").append(channel.getId())
.append(", reserved=").append(reserved)
.append(", removeClosed=").append(remoteClosed)
.append(", localClosed=").append(channel.isLocalClosed())
.append(", localReset=").append(channel.isLocalReset())
.append("]");
return buf.toString();
}
}