NettyEntityWriter.java
/*
* Copyright (c) 2023, 2025 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.netty.connector.internal;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.handler.stream.ChunkedInput;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.RequestEntityProcessing;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* The Entity Writer is used to write entity in Netty. One implementation is delayed,
* so that the complete message length can be set to Content-Length header.
*/
public interface NettyEntityWriter {
/**
* Type of the entity writer. {@code CHUNKED} is used for chunked data. {@code PRESET} is for buffered data, but the
* content length was pre-set by the customer. {@code DELAYED} is for buffered data where the content-length is unknown.
* The headers must not be written before the entity is provided by MessageBodyWriter to know the exact length.
*/
enum Type {
CHUNKED,
PRESET,
DELAYED
}
/**
* Writes the Object to the channel
* @param object object to be written
*/
void write(Object object);
/**
* Writes the Object to the channel and flush.
* @param object object to be written
*/
void writeAndFlush(Object object);
/**
* Flushes the writen objects. Can throw IOException.
* @throws IOException exception.
*/
void flush() throws IOException;
/**
* Get the netty Chunked Input to be written.
* @return The Chunked input instance
*/
ChunkedInput<ByteBuf> getChunkedInput();
/**
* Get the {@link OutputStream} used to write an entity
* @return the OutputStream to write an entity
*/
OutputStream getOutputStream();
/**
* Get the length of the entity written to the {@link OutputStream}
* @return length of the entity.
*/
long getLength();
/**
* Return Type of the {@link NettyEntityWriter}.
* @return type of the writer.
*/
Type getType();
static NettyEntityWriter getInstance(
ClientRequest clientRequest, Channel channel, Supplier<RequestEntityProcessing> requestEntityProcessingSupplier) {
final long lengthLong = clientRequest.getLengthLong();
final RequestEntityProcessing entityProcessing = requestEntityProcessingSupplier.get();
if ((entityProcessing == null && lengthLong == -1) || entityProcessing == RequestEntityProcessing.CHUNKED) {
return new DirectEntityWriter(channel, Type.CHUNKED);
} else if (lengthLong != -1) {
return new DirectEntityWriter(channel, Type.PRESET);
} else {
return new DelayedEntityWriter(channel, Type.DELAYED);
}
}
class DirectEntityWriter implements NettyEntityWriter {
private final Channel channel;
private final JerseyChunkedInput stream;
private final Type type;
public DirectEntityWriter(Channel channel, Type type) {
this.channel = channel;
stream = new JerseyChunkedInput(channel);
this.type = type;
}
@Override
public void write(Object object) {
channel.write(object);
}
@Override
public void writeAndFlush(Object object) {
channel.writeAndFlush(object);
}
@Override
public void flush() {
channel.flush();
}
@Override
public ChunkedInput<ByteBuf> getChunkedInput() {
return stream;
}
@Override
public OutputStream getOutputStream() {
return stream;
}
@Override
public long getLength() {
return stream.progress();
}
@Override
public Type getType() {
return type;
}
}
class DelayedEntityWriter implements NettyEntityWriter {
private final List<Runnable> delayedOps;
private final DirectEntityWriter writer;
private final DelayedOutputStream outputStream;
private boolean flushed = false;
private boolean closed = false;
public DelayedEntityWriter(Channel channel, Type type) {
this.writer = new DirectEntityWriter(channel, type);
this.delayedOps = new LinkedList<>();
this.outputStream = new DelayedOutputStream();
}
@Override
public void write(Object object) {
if (!flushed) {
delayedOps.add(() -> writer.write(object));
} else {
writer.write(object);
}
}
@Override
public void writeAndFlush(Object object) {
if (!flushed) {
delayedOps.add(() -> writer.writeAndFlush(object));
} else {
writer.writeAndFlush(object);
}
}
@Override
public void flush() throws IOException {
_flush();
if (!closed) {
closed = true;
writer.getOutputStream().close(); // Jersey automatically closes DelayedOutputStream not this one!
}
writer.flush();
}
private void _flush() throws IOException {
if (!flushed) {
flushed = true;
for (Runnable runnable : delayedOps) {
runnable.run();
}
outputStream._flush();
}
}
@Override
public ChunkedInput<ByteBuf> getChunkedInput() {
return writer.getChunkedInput();
}
@Override
public OutputStream getOutputStream() {
return outputStream;
}
@Override
public long getLength() {
return outputStream.writeLen;
}
@Override
public Type getType() {
return writer.getType();
}
private class DelayedOutputStream extends OutputStream {
private final List<WriteAction> actions = new ArrayList<>();
private int writeLen = 0;
private final AtomicBoolean streamFlushed = new AtomicBoolean(false);
@Override
public void write(int b) throws IOException {
write(new byte[]{(byte) (b & 0xFF)}, 0, 1);
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!flushed) {
actions.add(new WriteAction(b, off, len));
writeLen += len;
} else {
_flush();
writer.getOutputStream().write(b, off, len);
writer.getOutputStream().flush();
}
}
public void _flush() throws IOException {
if (streamFlushed.compareAndSet(false, true)) {
DelayedEntityWriter.this._flush();
for (WriteAction action : actions) {
action.run();
}
actions.clear();
}
}
}
private class WriteAction {
private final byte[] b;
private WriteAction(byte[] b, int off, int len) {
this.b = new byte[len]; // b passed in can be reused
System.arraycopy(b, off, this.b, 0, len);
}
public void run() throws IOException {
writer.getOutputStream().write(b, 0, b.length);
writer.getOutputStream().flush();
}
}
}
}