ReactiveMongoPersistentEntityIndexCreator.java
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core.index;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.UncategorizedMongoDbException;
import org.springframework.data.mongodb.core.index.MongoPersistentEntityIndexResolver.IndexDefinitionHolder;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.util.MongoDbErrorCodes;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import com.mongodb.MongoException;
/**
* Component that inspects {@link MongoPersistentEntity} instances contained in the given {@link MongoMappingContext}
* for indexing metadata and ensures the indexes to be available using reactive infrastructure.
*
* @author Mark Paluch
* @since 2.1
*/
public class ReactiveMongoPersistentEntityIndexCreator {
private static final Log LOGGER = LogFactory.getLog(ReactiveMongoPersistentEntityIndexCreator.class);
private final Map<Class<?>, Boolean> classesSeen = new ConcurrentHashMap<Class<?>, Boolean>();
private final MongoMappingContext mappingContext;
private final ReactiveIndexOperationsProvider operationsProvider;
private final IndexResolver indexResolver;
/**
* Creates a new {@link ReactiveMongoPersistentEntityIndexCreator} for the given {@link MongoMappingContext},
* {@link ReactiveIndexOperationsProvider}.
*
* @param mappingContext must not be {@literal null}.
* @param operationsProvider must not be {@literal null}.
*/
public ReactiveMongoPersistentEntityIndexCreator(MongoMappingContext mappingContext,
ReactiveIndexOperationsProvider operationsProvider) {
this(mappingContext, operationsProvider, IndexResolver.create(mappingContext));
}
/**
* Creates a new {@link ReactiveMongoPersistentEntityIndexCreator} for the given {@link MongoMappingContext},
* {@link ReactiveIndexOperationsProvider}, and {@link IndexResolver}.
*
* @param mappingContext must not be {@literal null}.
* @param operationsProvider must not be {@literal null}.
* @param indexResolver must not be {@literal null}.
*/
public ReactiveMongoPersistentEntityIndexCreator(MongoMappingContext mappingContext,
ReactiveIndexOperationsProvider operationsProvider, IndexResolver indexResolver) {
Assert.notNull(mappingContext, "MongoMappingContext must not be null");
Assert.notNull(operationsProvider, "ReactiveIndexOperations must not be null");
Assert.notNull(indexResolver, "IndexResolver must not be null");
this.mappingContext = mappingContext;
this.operationsProvider = operationsProvider;
this.indexResolver = indexResolver;
}
/**
* Returns whether the current index creator was registered for the given {@link MappingContext}.
*
* @param context
* @return
*/
public boolean isIndexCreatorFor(MappingContext<?, ?> context) {
return this.mappingContext.equals(context);
}
/**
* Inspect entities for index creation.
*
* @return a {@link Mono} that completes without value after indexes were created.
*/
public Mono<Void> checkForIndexes(MongoPersistentEntity<?> entity) {
Class<?> type = entity.getType();
if (!classesSeen.containsKey(type)) {
if (this.classesSeen.put(type, Boolean.TRUE) == null) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Analyzing class " + type + " for index information");
}
return checkForAndCreateIndexes(entity);
}
}
return Mono.empty();
}
private Mono<Void> checkForAndCreateIndexes(MongoPersistentEntity<?> entity) {
List<Mono<?>> publishers = new ArrayList<>();
if (entity.isAnnotationPresent(Document.class)) {
String collection = entity.getCollection();
for (IndexDefinition indexDefinition : indexResolver.resolveIndexFor(entity.getTypeInformation())) {
IndexDefinitionHolder indexToCreate = indexDefinition instanceof IndexDefinitionHolder definitionHolder
? definitionHolder
: new IndexDefinitionHolder("", indexDefinition, collection);
publishers.add(createIndex(indexToCreate));
}
}
return publishers.isEmpty() ? Mono.empty() : Flux.merge(publishers).then();
}
Mono<String> createIndex(IndexDefinitionHolder indexDefinition) {
return operationsProvider.indexOps(indexDefinition.getCollection()).ensureIndex(indexDefinition) //
.onErrorResume(ReactiveMongoPersistentEntityIndexCreator::isDataIntegrityViolation,
e -> translateException(e, indexDefinition));
}
private Mono<? extends String> translateException(Throwable e, IndexDefinitionHolder indexDefinition) {
Mono<IndexInfo> existingIndex = fetchIndexInformation(indexDefinition);
Mono<String> defaultError = Mono.error(new DataIntegrityViolationException(
String.format("Cannot create index for '%s' in collection '%s' with keys '%s' and options '%s'",
indexDefinition.getPath(), indexDefinition.getCollection(), indexDefinition.getIndexKeys(),
indexDefinition.getIndexOptions()),
e.getCause()));
return existingIndex.flatMap(it -> {
return Mono.<String> error(new DataIntegrityViolationException(
String.format("Index already defined as '%s'", indexDefinition.getPath()), e.getCause()));
}).switchIfEmpty(defaultError);
}
private Mono<IndexInfo> fetchIndexInformation(IndexDefinitionHolder indexDefinition) {
Object indexNameToLookUp = indexDefinition.getIndexOptions().get("name");
Flux<IndexInfo> existingIndexes = operationsProvider.indexOps(indexDefinition.getCollection()).getIndexInfo();
return existingIndexes //
.filter(indexInfo -> ObjectUtils.nullSafeEquals(indexNameToLookUp, indexInfo.getName())) //
.next() //
.doOnError(e -> {
if(LOGGER.isDebugEnabled()) {
LOGGER.debug(
String.format("Failed to load index information for collection '%s'", indexDefinition.getCollection()),
e);
}
});
}
private static boolean isDataIntegrityViolation(Throwable t) {
if (t instanceof UncategorizedMongoDbException) {
return t.getCause() instanceof MongoException mongoException
&& MongoDbErrorCodes.isDataIntegrityViolationCode(mongoException.getCode());
}
return false;
}
}