JacksonCoreProcessor.java
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.jackson.core.parser;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.core.io.JsonEOFException;
import com.fasterxml.jackson.core.json.async.NonBlockingJsonParser;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.async.processor.SingleThreadedBufferingProcessor;
import io.micronaut.jackson.core.tree.JsonNodeTreeCodec;
import io.micronaut.jackson.core.tree.JsonStreamTransfer;
import io.micronaut.jackson.core.tree.TreeGenerator;
import io.micronaut.json.JsonStreamConfig;
import io.micronaut.json.tree.JsonNode;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
/**
* A Reactive streams publisher that publishes a {@link JsonNode} once the JSON has been fully consumed.
* Uses {@link NonBlockingJsonParser} internally allowing the parsing of
* JSON from an incoming stream of bytes in a non-blocking manner
*
* @author Graeme Rocher
* @since 1.0
*/
@Internal
public class JacksonCoreProcessor extends SingleThreadedBufferingProcessor<byte[], JsonNode> {
private static final Logger LOG = LoggerFactory.getLogger(JacksonCoreProcessor.class);
private NonBlockingJsonParser currentNonBlockingJsonParser;
private TreeGenerator currentGenerator = null;
private final JsonFactory jsonFactory;
private final JsonStreamConfig deserializationConfig;
private final JsonNodeTreeCodec treeCodec;
private final boolean streamArray;
private boolean started;
private boolean rootIsArray;
private boolean jsonStream;
/**
* Creates a new JacksonProcessor.
*
* @param streamArray Whether arrays should be streamed
* @param jsonFactory Factory to use for creating the parser
* @param deserializationConfig The deserialization configuration (in particular bignum handling)
*/
public JacksonCoreProcessor(boolean streamArray, JsonFactory jsonFactory, @NonNull JsonStreamConfig deserializationConfig) {
this.jsonFactory = jsonFactory;
this.streamArray = streamArray;
this.treeCodec = JsonNodeTreeCodec.getInstance().withConfig(deserializationConfig);
this.jsonStream = true;
this.deserializationConfig = deserializationConfig;
try {
this.currentNonBlockingJsonParser = (NonBlockingJsonParser) jsonFactory.createNonBlockingByteArrayParser();
} catch (IOException e) {
throw new IllegalStateException("Failed to create non-blocking JSON parser: " + e.getMessage(), e);
}
}
/**
* @return Whether more input is needed
*/
public boolean needMoreInput() {
return currentNonBlockingJsonParser.getNonBlockingInputFeeder().needMoreInput();
}
@Override
protected void doOnComplete() {
if (jsonStream && currentGenerator == null) {
super.doOnComplete();
} else if (needMoreInput()) {
doOnError(new JsonEOFException(currentNonBlockingJsonParser, JsonToken.NOT_AVAILABLE, "Unexpected end-of-input"));
} else {
super.doOnComplete();
}
}
@Override
protected void onUpstreamMessage(byte[] message) {
if (LOG.isTraceEnabled()) {
LOG.trace("Received upstream bytes of length: {}", message.length);
}
try {
if (message.length == 0) {
if (needMoreInput()) {
requestMoreInput();
}
return;
}
final ByteArrayFeeder byteFeeder = byteFeeder(message);
JsonToken event;
while ((event = currentNonBlockingJsonParser.nextToken()) != JsonToken.NOT_AVAILABLE) {
if (!started) {
started = true;
if (streamArray && event == JsonToken.START_ARRAY) {
rootIsArray = true;
jsonStream = false;
continue;
}
}
if (currentGenerator == null) {
if (event == JsonToken.END_ARRAY && rootIsArray) {
byteFeeder.endOfInput();
break;
}
currentGenerator = treeCodec.createTreeGenerator();
}
JsonStreamTransfer.transferCurrentToken(currentNonBlockingJsonParser, currentGenerator, deserializationConfig);
if (currentGenerator.isComplete()) {
publishNode(currentGenerator.getCompletedValue());
currentGenerator = null;
}
}
if (jsonStream) {
if (currentGenerator == null) {
byteFeeder.endOfInput();
}
requestMoreInput();
} else {
if (needMoreInput()) {
requestMoreInput();
}
}
} catch (IOException e) {
onError(e);
}
}
private void publishNode(final JsonNode root) {
final Optional<Subscriber<? super JsonNode>> opt = currentDownstreamSubscriber();
if (opt.isPresent()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Materialized new JsonNode call onNext...");
}
opt.get().onNext(root);
}
}
private void requestMoreInput() {
if (LOG.isTraceEnabled()) {
LOG.trace("More input required to parse JSON. Demanding more.");
}
upstreamSubscription.request(1);
upstreamDemand++;
}
private ByteArrayFeeder byteFeeder(byte[] message) throws IOException {
ByteArrayFeeder byteFeeder = currentNonBlockingJsonParser.getNonBlockingInputFeeder();
final boolean needMoreInput = byteFeeder.needMoreInput();
if (!needMoreInput) {
currentNonBlockingJsonParser = (NonBlockingJsonParser) jsonFactory.createNonBlockingByteArrayParser();
byteFeeder = currentNonBlockingJsonParser.getNonBlockingInputFeeder();
}
byteFeeder.feedInput(message, 0, message.length);
return byteFeeder;
}
}