ReactiveGridFsResource.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.gridfs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.bson.BsonValue;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.util.Assert;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher;
/**
* Reactive {@link GridFSFile} based {@link Resource} implementation. Note that the {@link #getDownloadStream() content}
* can be consumed only once.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.2
*/
public class ReactiveGridFsResource implements GridFsObject<Object, Publisher<DataBuffer>> {
private final AtomicBoolean consumed = new AtomicBoolean(false);
private final @Nullable Object id;
private final Options options;
private final String filename;
private final @Nullable GridFSDownloadPublisher downloadPublisher;
private final DataBufferFactory dataBufferFactory;
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
*
* @param filename filename of the absent resource.
* @param downloadPublisher
*/
public ReactiveGridFsResource(String filename, @Nullable GridFSDownloadPublisher downloadPublisher) {
this(null, filename, Options.none(), downloadPublisher);
}
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
*
* @param id
* @param filename filename of the absent resource.
* @param options
* @param downloadPublisher
* @since 3.0
*/
public ReactiveGridFsResource(@Nullable Object id, String filename, Options options,
@Nullable GridFSDownloadPublisher downloadPublisher) {
this(id, filename, options, downloadPublisher, new DefaultDataBufferFactory());
}
ReactiveGridFsResource(GridFSFile file, @Nullable GridFSDownloadPublisher downloadPublisher, DataBufferFactory dataBufferFactory) {
this(file.getId(), file.getFilename(), Options.from(file), downloadPublisher, dataBufferFactory);
}
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
*
* @param id
* @param filename filename of the absent resource.
* @param options
* @param downloadPublisher
* @param dataBufferFactory
* @since 3.0
*/
ReactiveGridFsResource(@Nullable Object id, String filename, Options options,
@Nullable GridFSDownloadPublisher downloadPublisher, DataBufferFactory dataBufferFactory) {
this.id = id;
this.filename = filename;
this.options = options;
this.downloadPublisher = downloadPublisher;
this.dataBufferFactory = dataBufferFactory;
}
/**
* Obtain an absent {@link ReactiveGridFsResource}.
*
* @param filename filename of the absent resource, must not be {@literal null}.
* @return never {@literal null}.
* @since 2.1
*/
public static ReactiveGridFsResource absent(String filename) {
Assert.notNull(filename, "Filename must not be null");
return new ReactiveGridFsResource(filename, null);
}
@Override
public @Nullable Object getFileId() {
return id instanceof BsonValue bsonValue ? BsonUtils.toJavaType(bsonValue) : id;
}
/**
* @see org.springframework.core.io.AbstractResource#getFilename()
*/
public String getFilename() throws IllegalStateException {
return this.filename;
}
/**
* @return the underlying {@link GridFSFile}. Can be {@literal null} if absent.
* @since 2.2
*/
public Mono<GridFSFile> getGridFSFile() {
return downloadPublisher != null ? Mono.from(downloadPublisher.getGridFSFile()) : Mono.empty();
}
/**
* Obtain the data as {@link InputStream}. <br />
* <strong>NOTE:</strong> Buffers data in memory. Use {@link #getDownloadStream()} for large files.
*
* @throws IllegalStateException if the underlying {@link Publisher} has already been consumed.
* @see org.springframework.core.io.InputStreamResource#getInputStream()
* @see #getDownloadStream()
* @see DataBufferUtils#join(Publisher)
* @since 3.0
*/
public Mono<InputStream> getInputStream() throws IllegalStateException {
return getDownloadStream() //
.transform(DataBufferUtils::join) //
.as(Mono::from) //
.map(DataBuffer::asInputStream);
}
/**
* Obtain the download stream emitting chunks of data as they come in. <br />
*
* @return {@link Flux#empty()} if the file does not exist.
* @throws IllegalStateException if the underlying {@link Publisher} has already been consumed.
* @see org.springframework.core.io.InputStreamResource#getInputStream()
* @see #getDownloadStream()
* @see DataBufferUtils#join(Publisher)
* @since 3.0
*/
public Flux<DataBuffer> getDownloadStream() {
if (downloadPublisher == null) {
return Flux.empty();
}
return createDownloadStream(downloadPublisher);
}
@Override
public Flux<DataBuffer> getContent() {
return getDownloadStream();
}
@Override
public Options getOptions() {
return options;
}
/**
* Obtain the download stream emitting chunks of data with given {@code chunkSize} as they come in.
*
* @param chunkSize the preferred number of bytes per emitted {@link DataBuffer}.
* @return {@link Flux#empty()} if the file does not exist.
* @throws IllegalStateException if the underlying {@link Publisher} has already been consumed.
* @see org.springframework.core.io.InputStreamResource#getInputStream()
* @see #getDownloadStream()
* @see DataBufferUtils#join(Publisher)
* @since 3.0
*/
public Flux<DataBuffer> getDownloadStream(int chunkSize) {
if (downloadPublisher == null) {
return Flux.empty();
}
return createDownloadStream(downloadPublisher.bufferSizeBytes(chunkSize));
}
private Flux<DataBuffer> createDownloadStream(GridFSDownloadPublisher publisher) {
return Flux.from(publisher) //
.map(dataBufferFactory::wrap) //
.doOnSubscribe(it -> this.verifyStreamStillAvailable());
}
public boolean exists() {
return downloadPublisher != null;
}
private void verifyStreamStillAvailable() {
if (!consumed.compareAndSet(false, true)) {
throw new IllegalStateException("Stream already consumed.");
}
}
}