DefaultBinding.java
/*
* Copyright 2013-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.stream.messaging.DirectWithAttributesChannel;
import org.springframework.context.Lifecycle;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.core.Pausable;
import org.springframework.integration.support.context.NamedComponent;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* Default implementation for a {@link Binding}.
*
* @param <T> type of binding
* @author Jennifer Hickey
* @author Mark Fisher
* @author Gary Russell
* @author Marius Bogoevici
* @author Oleg Zhurakousky
* @author Myeonghyeon Lee
* @author Soby Chacko
* @author Byungjun You
*/
@JsonPropertyOrder({ "bindingName", "name", "group", "pausable", "state" })
@JsonIgnoreProperties("running")
public class DefaultBinding<T> implements Binding<T> {
protected final String name;
protected final String group;
protected final T target;
protected final Lifecycle lifecycle;
private final Log logger = LogFactory.getLog(this.getClass().getName());
private boolean paused;
private boolean restartable;
private Lifecycle companion;
/**
* Creates an instance that associates a given name, group and binding target with an
* optional {@link Lifecycle} component, which will be stopped during unbinding.
* @param name the name of the binding target
* @param group the group (only for input targets)
* @param target the binding target
* @param lifecycle {@link Lifecycle} that runs while the binding is active and will
* be stopped during unbinding
*/
public DefaultBinding(String name, String group, T target, Lifecycle lifecycle) {
Assert.notNull(target, "target must not be null");
this.name = name;
this.group = group;
this.target = target;
this.lifecycle = lifecycle;
this.restartable = StringUtils.hasText(group);
}
public DefaultBinding(String name, T target, Lifecycle lifecycle) {
this(name, null, target, lifecycle);
this.restartable = true;
}
@Override
public String getName() {
return this.name;
}
@Override
public String getBindingName() {
String resolvedName = (this.target instanceof IntegrationObjectSupport integrationObjectSupportTarget)
? integrationObjectSupportTarget.getComponentName() : getName();
return resolvedName == null ? getName() : resolvedName;
}
public String getGroup() {
return this.group;
}
public String getState() {
String state = "N/A";
if (this.lifecycle != null) {
if (isPausable()) {
state = this.paused ? "paused" : this.getRunningState();
}
else {
state = this.getRunningState();
}
}
return state;
}
@Override
public boolean isRunning() {
return this.lifecycle != null && this.lifecycle.isRunning();
}
public boolean isPausable() {
return this.lifecycle instanceof Pausable;
}
@Override
public boolean isPaused() {
return this.paused;
}
@Override
public synchronized void start() {
if (this.companion != null) {
this.companion.start();
}
if (!this.isRunning()) {
if (this.lifecycle != null && this.restartable) {
this.lifecycle.start();
}
else {
this.logger.warn("Can not re-bind an anonymous binding");
}
}
}
@Override
public synchronized void stop() {
if (this.companion != null) {
this.companion.stop();
}
if (this.isRunning()) {
this.lifecycle.stop();
}
// See https://github.com/spring-cloud/spring-cloud-stream/issues/3083 for more details
if (target instanceof DirectWithAttributesChannel attributeChannel) {
attributeChannel.destroy();
}
}
@Override
public synchronized void pause() {
if (this.lifecycle instanceof Pausable pausableLifecycle) {
pausableLifecycle.pause();
this.paused = true;
}
else {
this.logger
.warn("Attempted to pause a component that does not support Pausable "
+ this.lifecycle);
}
}
@Override
public synchronized void resume() {
if (this.lifecycle instanceof Pausable pausableLifecycle) {
pausableLifecycle.resume();
this.paused = false;
}
else {
this.logger.warn(
"Attempted to resume a component that does not support Pausable "
+ this.lifecycle);
}
}
@Override
public void unbind() {
this.stop();
afterUnbind();
}
Lifecycle getEndpoint() {
return this.lifecycle;
}
@Override
public String toString() {
return " Binding [name=" + this.name + ", target=" + this.target + ", lifecycle="
+ ((this.lifecycle instanceof NamedComponent namedComponentWithLifeCycle)
? namedComponentWithLifeCycle.getComponentName()
: ObjectUtils.nullSafeToString(this.lifecycle))
+ "]";
}
/**
* Listener method that executes after unbinding. Subclasses can implement their own
* behaviour on unbinding by overriding this method.
*/
protected void afterUnbind() {
}
private String getRunningState() {
return isRunning() ? "running" : "stopped";
}
/**
* Sets the companion Lifecycle.
* In most cases, when dealing with message producer (e.g., Supplier), performing
* any lifecycle operation on it does nothing as we may need to also perform the same operation on its companion
* object (e.g., SourcePollingChannelAdapter)
*
* @param companion instance of companion {@link Lifecycle} object
*/
public void setCompanion(Lifecycle companion) {
this.companion = companion;
}
}