SseExecutorBuilder.java
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.sse;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.sse.impl.SseParser;
import org.apache.hc.core5.util.Args;
/**
* Builder for {@link SseExecutor}.
*
* <p>Use this builder when you want to provide defaults (headers, reconnect policy,
* parser strategy, custom executors, etc.) for all {@link EventSource}s opened
* through the resulting {@link SseExecutor}.</p>
*
* <p>If no {@link CloseableHttpAsyncClient} is supplied, the process-wide shared client
* from {@link SseExecutor#getSharedClient()} is used and {@link SseExecutor#close()} becomes
* a no-op.</p>
*
* <h3>Example</h3>
* <pre>{@code
* SseExecutor exec = SseExecutor.custom()
* .setEventSourceConfig(
* EventSourceConfig.builder()
* .backoff(new ExponentialJitterBackoff(1000, 30000, 2.0, 250))
* .maxReconnects(-1)
* .build())
* .addDefaultHeader("User-Agent", "my-sse-client/1.0")
* .setParserStrategy(SseParser.BYTE)
* .build();
* }</pre>
*
* @since 5.7
*/
public final class SseExecutorBuilder {
private CloseableHttpAsyncClient client;
private ScheduledExecutorService scheduler; // optional
private Executor callbackExecutor; // optional
private EventSourceConfig config = EventSourceConfig.DEFAULT;
private final LinkedHashMap<String, String> defaultHeaders = new LinkedHashMap<>();
private SseParser parserStrategy = SseParser.CHAR;
SseExecutorBuilder() {
}
/**
* Supplies a custom async HTTP client. The caller owns its lifecycle and
* {@link SseExecutor#close()} will close it.
* @param client the client to use
* @return this builder
*/
public SseExecutorBuilder setHttpClient(final CloseableHttpAsyncClient client) {
this.client = Args.notNull(client, "HTTP Async Client");
return this;
}
/**
* Sets the scheduler to use for reconnect delays. If not provided, the internal shared
* scheduler is used.
* @param scheduler the scheduler to use
* @return this builder
*/
public SseExecutorBuilder setScheduler(final ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
return this;
}
/**
* Sets the executor used to dispatch {@link EventSourceListener} callbacks.
* If not provided, callbacks run inline on the I/O thread.
* @param callbackExecutor the executor to use
* @return this builder
*/
public SseExecutorBuilder setCallbackExecutor(final Executor callbackExecutor) {
this.callbackExecutor = callbackExecutor;
return this;
}
/**
* Sets the default reconnect/backoff configuration applied to opened streams.
* @param cfg the reconnect configuration
* @return this builder
*/
public SseExecutorBuilder setEventSourceConfig(final EventSourceConfig cfg) {
this.config = Args.notNull(cfg, "EventSourceConfig");
return this;
}
/**
* Replaces the default headers (sent on every opened stream).
* @param headers the headers to use
* @return this builder
*/
public SseExecutorBuilder setDefaultHeaders(final Map<String, String> headers) {
this.defaultHeaders.clear();
if (headers != null && !headers.isEmpty()) {
this.defaultHeaders.putAll(headers);
}
return this;
}
/**
* Adds or replaces a single default header.
* @param name the header name
* @param value the header value
* @return this builder
*/
public SseExecutorBuilder addDefaultHeader(final String name, final String value) {
this.defaultHeaders.put(Args.notNull(name, "name"), value);
return this;
}
/**
* Chooses the parser strategy: {@link SseParser#CHAR} (spec-level, default)
* or {@link SseParser#BYTE} (byte-level framing with minimal decoding).
* @param parser the parser strategy to use
* @return this builder
*/
public SseExecutorBuilder setParserStrategy(final SseParser parser) {
this.parserStrategy = parser != null ? parser : SseParser.CHAR;
return this;
}
/**
* Builds the {@link SseExecutor}.
* @return a new {@link SseExecutor}
*/
public SseExecutor build() {
final CloseableHttpAsyncClient c = (client != null) ? client : SseExecutor.getSharedClient();
final boolean isShared = c == SseExecutor.SHARED_CLIENT;
final Map<String, String> dh = defaultHeaders.isEmpty()
? Collections.emptyMap()
: new LinkedHashMap<>(defaultHeaders);
return new SseExecutor(c, isShared, scheduler, callbackExecutor, config, dh, parserStrategy);
}
}