ChangeStreamEvent.java
/*
* Copyright 2018-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.core;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.Document;
import org.jspecify.annotations.Nullable;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
/**
* {@link Message} implementation specific to MongoDB <a href="https://docs.mongodb.com/manual/changeStreams/">Change
* Streams</a>.
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Myroslav Kosinskyi
* @since 2.1
*/
public class ChangeStreamEvent<T> {
@SuppressWarnings("rawtypes") //
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_FULL_DOCUMENT_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocument");
@SuppressWarnings("rawtypes") //
private static final AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(ChangeStreamEvent.class, Object.class, "convertedFullDocumentBeforeChange");
private final @Nullable ChangeStreamDocument<Document> raw;
private final Class<T> targetType;
private final MongoConverter converter;
// accessed through CONVERTED_FULL_DOCUMENT_UPDATER.
private volatile @Nullable T convertedFullDocument;
// accessed through CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER.
private volatile @Nullable T convertedFullDocumentBeforeChange;
/**
* @param raw can be {@literal null}.
* @param targetType must not be {@literal null}.
* @param converter must not be {@literal null}.
*/
public ChangeStreamEvent(@Nullable ChangeStreamDocument<Document> raw, Class<T> targetType,
MongoConverter converter) {
this.raw = raw;
this.targetType = targetType;
this.converter = converter;
}
/**
* Get the raw {@link ChangeStreamDocument} as emitted by the driver.
*
* @return can be {@literal null}.
*/
public @Nullable ChangeStreamDocument<Document> getRaw() {
return raw;
}
/**
* Get the {@link ChangeStreamDocument#getClusterTime() cluster time} as {@link Instant} the event was emitted at.
*
* @return can be {@literal null}.
*/
public @Nullable Instant getTimestamp() {
return getBsonTimestamp() != null && raw != null
? converter.getConversionService().convert(raw.getClusterTime(), Instant.class)
: null;
}
/**
* Get the {@link ChangeStreamDocument#getClusterTime() cluster time}.
*
* @return can be {@literal null}.
* @since 2.2
*/
@Nullable
public BsonTimestamp getBsonTimestamp() {
return raw != null ? raw.getClusterTime() : null;
}
/**
* Get the {@link ChangeStreamDocument#getResumeToken() resume token} for this event.
*
* @return can be {@literal null}.
*/
public @Nullable BsonValue getResumeToken() {
return raw != null ? raw.getResumeToken() : null;
}
/**
* Get the {@link ChangeStreamDocument#getOperationType() operation type} for this event.
*
* @return can be {@literal null}.
*/
public @Nullable OperationType getOperationType() {
return raw != null ? raw.getOperationType() : null;
}
/**
* Get the database name the event was originated at.
*
* @return can be {@literal null}.
*/
public @Nullable String getDatabaseName() {
return raw != null ? raw.getNamespace().getDatabaseName() : null;
}
/**
* Get the collection name the event was originated at.
*
* @return can be {@literal null}.
*/
public @Nullable String getCollectionName() {
return raw != null ? raw.getNamespace().getCollectionName() : null;
}
/**
* Get the potentially converted {@link ChangeStreamDocument#getFullDocument()}.
*
* @return {@literal null} when {@link #getRaw()} or {@link ChangeStreamDocument#getFullDocument()} is
* {@literal null}.
*/
public @Nullable T getBody() {
if (raw == null || raw.getFullDocument() == null) {
return null;
}
return getConvertedFullDocument(raw.getFullDocument());
}
/**
* Get the potentially converted {@link ChangeStreamDocument#getFullDocumentBeforeChange() document} before being
* changed.
*
* @return {@literal null} when {@link #getRaw()} or {@link ChangeStreamDocument#getFullDocumentBeforeChange()} is
* {@literal null}.
* @since 4.0
*/
public @Nullable T getBodyBeforeChange() {
if (raw == null || raw.getFullDocumentBeforeChange() == null) {
return null;
}
return getConvertedFullDocumentBeforeChange(raw.getFullDocumentBeforeChange());
}
@SuppressWarnings("unchecked")
private T getConvertedFullDocumentBeforeChange(Document fullDocument) {
return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_BEFORE_CHANGE_UPDATER);
}
@SuppressWarnings("unchecked")
private T getConvertedFullDocument(Document fullDocument) {
return (T) doGetConverted(fullDocument, CONVERTED_FULL_DOCUMENT_UPDATER);
}
@SuppressWarnings("NullAway")
private Object doGetConverted(Document fullDocument, AtomicReferenceFieldUpdater<ChangeStreamEvent, Object> updater) {
Object result = updater.get(this);
if (result != null) {
return result;
}
if (ClassUtils.isAssignable(Document.class, fullDocument.getClass())) {
result = converter.read(targetType, fullDocument);
return updater.compareAndSet(this, null, result) ? result : updater.get(this);
}
if (converter.getConversionService().canConvert(fullDocument.getClass(), targetType)) {
result = converter.getConversionService().convert(fullDocument, targetType);
return updater.compareAndSet(this, null, result) ? result : updater.get(this);
}
throw new IllegalArgumentException(
String.format("No converter found capable of converting %s to %s", fullDocument.getClass(), targetType));
}
@Override
public String toString() {
return "ChangeStreamEvent {" + "raw=" + raw + ", targetType=" + targetType + '}';
}
@Override
public boolean equals(@Nullable Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ChangeStreamEvent<?> that = (ChangeStreamEvent<?>) o;
if (!ObjectUtils.nullSafeEquals(this.raw, that.raw)) {
return false;
}
return ObjectUtils.nullSafeEquals(this.targetType, that.targetType);
}
@Override
public int hashCode() {
int result = raw != null ? raw.hashCode() : 0;
result = 31 * result + ObjectUtils.nullSafeHashCode(targetType);
return result;
}
}