ReactiveGridFsTemplate.java
/*
* Copyright 2019-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.gridfs;
import static org.springframework.data.mongodb.core.query.Query.*;
import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.nio.ByteBuffer;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.SerializationUtils;
import org.springframework.data.mongodb.util.BsonUtils;
import org.springframework.data.util.Lazy;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.gridfs.model.GridFSUploadOptions;
import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets;
import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
/**
* {@link ReactiveGridFsOperations} implementation to store content into MongoDB GridFS. Uses by default
* {@link DefaultDataBufferFactory} to create {@link DataBuffer buffers}.
*
* @author Mark Paluch
* @author Nick Stolwijk
* @author Denis Zavedeev
* @author Christoph Strobl
* @author Mathieu Ouellet
* @since 2.2
*/
public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations {
private final DataBufferFactory dataBufferFactory;
private final Mono<GridFSBucket> bucketSupplier;
/**
* Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and
* {@link MongoConverter}.
* <p>
* Note that the {@link GridFSBucket} is obtained only once from
* {@link ReactiveMongoDatabaseFactory#getMongoDatabase() MongoDatabase}. Use
* {@link #ReactiveGridFsTemplate(MongoConverter, Mono, DataBufferFactory)} if you want to use different buckets from
* the same Template instance.
*
* @param dbFactory must not be {@literal null}.
* @param converter must not be {@literal null}.
*/
public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter) {
this(dbFactory, converter, null);
}
/**
* Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and
* {@link MongoConverter}.
* <p>
* Note that the {@link GridFSBucket} is obtained only once from
* {@link ReactiveMongoDatabaseFactory#getMongoDatabase() MongoDatabase}. Use
* {@link #ReactiveGridFsTemplate(MongoConverter, Mono, DataBufferFactory)} if you want to use different buckets from
* the same Template instance.
*
* @param dbFactory must not be {@literal null}.
* @param converter must not be {@literal null}.
* @param bucket can be {@literal null}.
*/
public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter,
@Nullable String bucket) {
this(new DefaultDataBufferFactory(), dbFactory, converter, bucket);
}
/**
* Creates a new {@link ReactiveGridFsTemplate} using the given {@link DataBufferFactory},
* {@link ReactiveMongoDatabaseFactory} and {@link MongoConverter}.
* <p>
* Note that the {@link GridFSBucket} is obtained only once from
* {@link ReactiveMongoDatabaseFactory#getMongoDatabase() MongoDatabase}. Use
* {@link #ReactiveGridFsTemplate(MongoConverter, Mono, DataBufferFactory)} if you want to use different buckets from
* the same Template instance.
*
* @param dataBufferFactory must not be {@literal null}.
* @param dbFactory must not be {@literal null}.
* @param converter must not be {@literal null}.
* @param bucket can be {@literal null}.
*/
public ReactiveGridFsTemplate(DataBufferFactory dataBufferFactory, ReactiveMongoDatabaseFactory dbFactory,
MongoConverter converter, @Nullable String bucket) {
this(converter, Mono.defer(Lazy.of(() -> doGetBucket(dbFactory, bucket))), dataBufferFactory);
}
/**
* Creates a new {@link ReactiveGridFsTemplate} using the given {@link MongoConverter}, {@link Mono} emitting a
* {@link ReactiveMongoDatabaseFactory} and {@link DataBufferFactory}.
*
* @param converter must not be {@literal null}.
* @param gridFSBucket must not be {@literal null}.
* @param dataBufferFactory must not be {@literal null}.
* @since 4.2
*/
public ReactiveGridFsTemplate(MongoConverter converter, Mono<GridFSBucket> gridFSBucket,
DataBufferFactory dataBufferFactory) {
super(converter);
Assert.notNull(gridFSBucket, "GridFSBucket Mono must not be null");
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
this.bucketSupplier = gridFSBucket;
this.dataBufferFactory = dataBufferFactory;
}
@Override
public Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable String filename, @Nullable String contentType,
@Nullable Object metadata) {
return store(content, filename, contentType, toDocument(metadata));
}
@Override
@SuppressWarnings("unchecked")
public <T> Mono<T> store(GridFsObject<T, Publisher<DataBuffer>> upload) {
GridFSUploadOptions uploadOptions = computeUploadOptionsFor(upload.getOptions().getContentType(),
upload.getOptions().getMetadata());
if (upload.getOptions().getChunkSize() > 0) {
uploadOptions.chunkSizeBytes(upload.getOptions().getChunkSize());
}
String filename = upload.getFilename();
Flux<ByteBuffer> source = Flux.from(upload.getContent()).map(DataBuffer::toByteBuffer);
T fileId = upload.getFileId();
if (fileId == null) {
return (Mono<T>) createMono(new AutoIdCreatingUploadCallback(filename, source, uploadOptions));
}
UploadCallback callback = new UploadCallback(BsonUtils.simpleToBsonValue(fileId), filename, source, uploadOptions);
return createMono(callback).thenReturn(fileId);
}
@Override
public Flux<GridFSFile> find(Query query) {
Document queryObject = getMappedQuery(query.getQueryObject());
Document sortObject = getMappedQuery(query.getSortObject());
return createFlux(new FindCallback(query, queryObject, sortObject));
}
@Override
public Mono<GridFSFile> findOne(Query query) {
Document queryObject = getMappedQuery(query.getQueryObject());
Document sortObject = getMappedQuery(query.getSortObject());
return createFlux(new FindLimitCallback(query, queryObject, sortObject, 2)) //
.collectList() //
.handle((files, sink) -> {
if (files.size() == 1) {
sink.next(files.get(0));
return;
}
if (files.size() > 1) {
sink.error(new IncorrectResultSizeDataAccessException(
"Query " + SerializationUtils.serializeToJsonSafely(query) + " returned non unique result.", 1));
}
});
}
@Override
public Mono<GridFSFile> findFirst(Query query) {
Document queryObject = getMappedQuery(query.getQueryObject());
Document sortObject = getMappedQuery(query.getSortObject());
return createFlux(new FindLimitCallback(query, queryObject, sortObject, 1)).next();
}
@Override
public Mono<Void> delete(Query query) {
return find(query).flatMap(it -> createMono(new DeleteCallback(it.getId()))).then();
}
@Override
public Mono<ReactiveGridFsResource> getResource(String location) {
Assert.notNull(location, "Filename must not be null");
return findOne(query(whereFilename().is(location))).flatMap(this::getResource)
.defaultIfEmpty(ReactiveGridFsResource.absent(location));
}
@Override
public Mono<ReactiveGridFsResource> getResource(GridFSFile file) {
Assert.notNull(file, "GridFSFile must not be null");
return doGetBucket()
.map(it -> new ReactiveGridFsResource(file, it.downloadToPublisher(file.getId()), dataBufferFactory));
}
@Override
public Flux<ReactiveGridFsResource> getResources(String locationPattern) {
if (!StringUtils.hasText(locationPattern)) {
return Flux.empty();
}
AntPath path = new AntPath(locationPattern);
if (path.isPattern()) {
Flux<GridFSFile> files = find(query(whereFilename().regex(path.toRegex())));
return files.flatMap(this::getResource);
}
return getResource(locationPattern).flux();
}
/**
* Create a reusable Mono for a {@link ReactiveBucketCallback}. It's up to the developer to choose to obtain a new
* {@link Flux} or to reuse the {@link Flux}.
*
* @param callback must not be {@literal null}
* @return a {@link Mono} wrapping the {@link ReactiveBucketCallback}.
*/
public <T> Mono<T> createMono(ReactiveBucketCallback<T> callback) {
Assert.notNull(callback, "ReactiveBucketCallback must not be null");
return doGetBucket().flatMap(bucket -> Mono.from(callback.doInBucket(bucket)));
}
/**
* Create a reusable Flux for a {@link ReactiveBucketCallback}. It's up to the developer to choose to obtain a new
* {@link Flux} or to reuse the {@link Flux}.
*
* @param callback must not be {@literal null}
* @return a {@link Flux} wrapping the {@link ReactiveBucketCallback}.
*/
public <T> Flux<T> createFlux(ReactiveBucketCallback<T> callback) {
Assert.notNull(callback, "ReactiveBucketCallback must not be null");
return doGetBucket().flatMapMany(callback::doInBucket);
}
protected Mono<GridFSBucket> doGetBucket() {
return bucketSupplier;
}
private static Mono<GridFSBucket> doGetBucket(ReactiveMongoDatabaseFactory dbFactory, @Nullable String bucket) {
Assert.notNull(dbFactory, "ReactiveMongoDatabaseFactory must not be null");
return dbFactory.getMongoDatabase()
.map(db -> bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket));
}
/**
* @param <T>
* @author Mathieu Ouellet
* @since 3.0
*/
interface ReactiveBucketCallback<T> {
Publisher<T> doInBucket(GridFSBucket bucket);
}
private static class FindCallback implements ReactiveBucketCallback<GridFSFile> {
private final Query query;
private final Document queryObject;
private final Document sortObject;
public FindCallback(Query query, Document queryObject, Document sortObject) {
this.query = query;
this.queryObject = queryObject;
this.sortObject = sortObject;
}
@Override
public GridFSFindPublisher doInBucket(GridFSBucket bucket) {
GridFSFindPublisher findPublisher = bucket.find(queryObject).sort(sortObject);
if (query.getLimit() > 0) {
findPublisher = findPublisher.limit(query.getLimit());
}
if (query.getSkip() > 0) {
findPublisher = findPublisher.skip(Math.toIntExact(query.getSkip()));
}
Integer cursorBatchSize = query.getMeta().getCursorBatchSize();
if (cursorBatchSize != null) {
findPublisher = findPublisher.batchSize(cursorBatchSize);
}
return findPublisher;
}
}
private static class FindLimitCallback extends FindCallback {
private final int limit;
public FindLimitCallback(Query query, Document queryObject, Document sortObject, int limit) {
super(query, queryObject, sortObject);
this.limit = limit;
}
@Override
public GridFSFindPublisher doInBucket(GridFSBucket bucket) {
return super.doInBucket(bucket).limit(limit);
}
}
private record UploadCallback(BsonValue fileId, String filename, Publisher<ByteBuffer> source,
GridFSUploadOptions uploadOptions) implements ReactiveBucketCallback<Void> {
@Override
public GridFSUploadPublisher<Void> doInBucket(GridFSBucket bucket) {
return bucket.uploadFromPublisher(fileId, filename, source, uploadOptions);
}
}
private record AutoIdCreatingUploadCallback(String filename, Publisher<ByteBuffer> source,
GridFSUploadOptions uploadOptions) implements ReactiveBucketCallback<ObjectId> {
@Override
public GridFSUploadPublisher<ObjectId> doInBucket(GridFSBucket bucket) {
return bucket.uploadFromPublisher(filename, source, uploadOptions);
}
}
private record DeleteCallback(BsonValue id) implements ReactiveBucketCallback<Void> {
@Override
public Publisher<Void> doInBucket(GridFSBucket bucket) {
return bucket.delete(id);
}
}
}