InboundSseEventImpl.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.jaxrs.sse.client;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;
import java.util.logging.Logger;
import jakarta.ws.rs.core.GenericType;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.MultivaluedHashMap;
import jakarta.ws.rs.core.MultivaluedMap;
import jakarta.ws.rs.ext.MessageBodyReader;
import jakarta.ws.rs.sse.InboundSseEvent;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.jaxrs.client.ClientProviderFactory;
import org.apache.cxf.message.Message;
/**
 * Client-side {@link InboundSseEvent} implementation. It holds the fields of a
 * single server-sent event (id, name, comment, reconnect delay and raw data)
 * and converts the raw data to other types using a {@link MessageBodyReader}
 * looked up through the supplied {@link ClientProviderFactory}.
 */
public final class InboundSseEventImpl implements InboundSseEvent {
    private final String id;
    private final String name;
    private final String comment;
    private final long reconnectDelay;
    private final boolean reconnectDelaySet;
    private final String data;
    private final ClientProviderFactory factory;
    private final Message message;

    /**
     * Mutable accumulator for the fields of one event; call
     * {@link #build(ClientProviderFactory, Message)} to obtain the event.
     */
    static class Builder {
        private static final Logger LOG = LogUtils.getL7dLogger(Builder.class);

        private String name; /* the default event type would be "message" */
        private String id;
        private String comment;
        private OptionalLong reconnectDelay = OptionalLong.empty();
        private String data;

        Builder() {
        }

        /** Sets the event id. */
        Builder id(String i) {
            this.id = i;
            return this;
        }

        /** Sets the event name. */
        Builder name(String n) {
            this.name = n;
            return this;
        }

        /** Sets the event comment. */
        Builder comment(String cmt) {
            this.comment = cmt;
            return this;
        }

        /**
         * Parses and sets the reconnect delay. A value that is not a valid
         * {@code long} is logged as a warning and otherwise ignored, leaving any
         * previously parsed delay untouched.
         */
        Builder reconnectDelay(String rd) {
            try {
                this.reconnectDelay = OptionalLong.of(Long.parseLong(rd));
            } catch (final NumberFormatException ex) {
                LOG.warning("Unable to parse reconnectDelay, long number expected: " + ex.getMessage());
            }
            return this;
        }

        /**
         * Appends one more chunk of data. The first chunk is stored as is; every
         * following chunk is joined to the existing data with a single
         * {@code '\n'} separator.
         */
        Builder appendData(String d) {
            this.data = this.data == null ? d : this.data + '\n' + d;
            return this;
        }

        /**
         * Creates the event from the accumulated fields. When no reconnect delay
         * was successfully parsed, {@code RECONNECT_NOT_SET} is used as the delay
         * and the event reports the delay as not set.
         *
         * @param factory provider factory used later to look up message body readers
         * @param message message passed to the provider factory on reader lookup
         * @return the new event
         */
        InboundSseEvent build(ClientProviderFactory factory, Message message) {
            return new InboundSseEventImpl(id, name, comment, reconnectDelay.orElse(RECONNECT_NOT_SET),
                reconnectDelay.isPresent(), data, factory, message);
        }
    }

    //CHECKSTYLE:OFF
    /**
     * Creates an event from already-resolved field values.
     */
    InboundSseEventImpl(String id, String name, String comment, long reconnectDelay, boolean reconnectDelaySet,
            String data, ClientProviderFactory factory, Message message) {
        //CHECKSTYLE:ON
        this.id = id;
        this.name = name;
        this.comment = comment;
        this.reconnectDelay = reconnectDelay;
        this.reconnectDelaySet = reconnectDelaySet;
        this.data = data;
        this.factory = factory;
        this.message = message;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public String getComment() {
        return comment;
    }

    @Override
    public long getReconnectDelay() {
        return reconnectDelay;
    }

    @Override
    public boolean isReconnectDelaySet() {
        return reconnectDelaySet;
    }

    /**
     * Tells whether this event carries no data.
     *
     * @return {@code true} when the data is {@code null} or an empty string,
     *         {@code false} otherwise
     */
    @Override
    public boolean isEmpty() {
        // data stays null when the builder never received any data chunk
        return data == null || data.isEmpty();
    }

    /**
     * Returns the raw event data as a string.
     *
     * @return the data, or {@code null} when the event has none
     */
    @Override
    public String readData() {
        return data;
    }

    /**
     * Reads the data as the given type, treating it as {@code text/plain}.
     */
    @Override
    public <T> T readData(Class<T> type) {
        return read(type, type, MediaType.TEXT_PLAIN_TYPE);
    }

    /**
     * Reads the data as the given generic type, treating it as {@code text/plain}.
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T readData(GenericType<T> type) {
        return read((Class<T>)type.getRawType(), type.getType(), MediaType.TEXT_PLAIN_TYPE);
    }

    /**
     * Reads the data as the given type using the given media type.
     */
    @Override
    public <T> T readData(Class<T> messageType, MediaType mediaType) {
        return read(messageType, messageType, mediaType);
    }

    /**
     * Reads the data as the given generic type using the given media type.
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T readData(GenericType<T> type, MediaType mediaType) {
        return read((Class<T>)type.getRawType(), type.getType(), mediaType);
    }

    /**
     * Converts the event data to the requested type with a message body reader
     * obtained from the provider factory. The data is handed to the reader as a
     * stream of its UTF-8 bytes, with empty annotations and empty headers.
     *
     * @param messageType raw class to produce
     * @param type generic type to produce
     * @param mediaType media type used to select the reader
     * @return the converted data, or {@code null} when the event has no data
     * @throws RuntimeException when no reader is found for the type, or when the
     *         reader fails with an {@link IOException}
     */
    private <T> T read(Class<T> messageType, Type type, MediaType mediaType) {
        if (data == null) {
            return null;
        }

        final Annotation[] annotations = new Annotation[0];
        final MultivaluedMap<String, String> headers = new MultivaluedHashMap<>(0);

        final MessageBodyReader<T> reader = factory.createMessageBodyReader(messageType, type,
            annotations, mediaType, message);

        if (reader == null) {
            throw new RuntimeException("No suitable message body reader for class: " + messageType.getName());
        }

        try (ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) {
            return reader.readFrom(messageType, type, annotations, mediaType, headers, is);
        } catch (final IOException ex) {
            throw new RuntimeException("Unable to read data of type " + messageType.getName(), ex);
        }
    }
}