JMSDestinationTest.java
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.transport.jms;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionConsumer;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.ConnectionMetaData;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.InvalidClientIDException;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.Queue;
import jakarta.jms.ServerSessionPool;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.cxf.common.util.ReflectionUtil;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.security.SecurityContext;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.MultiplexDestination;
import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.junit.Ignore;
import org.junit.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class JMSDestinationTest extends AbstractJMSTester {
private static class FaultyConnection implements Connection {
private final Connection delegate;
FaultyConnection(final Connection delegate) {
this.delegate = delegate;
}
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
return delegate.createSession(transacted, acknowledgeMode);
}
@Override
public String getClientID() throws JMSException {
return delegate.getClientID();
}
@Override
public void setClientID(String clientID) throws JMSException {
delegate.setClientID(clientID);
}
@Override
public ConnectionMetaData getMetaData() throws JMSException {
return delegate.getMetaData();
}
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return delegate.getExceptionListener();
}
@Override
public void setExceptionListener(ExceptionListener listener) throws JMSException {
delegate.setExceptionListener(listener);
}
@Override
public void start() throws JMSException {
delegate.start();
}
@Override
public void stop() throws JMSException {
delegate.stop();
}
@Override
public void close() throws JMSException {
delegate.close();
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return delegate.createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages);
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return delegate.createDurableConnectionConsumer(topic, subscriptionName, messageSelector,
sessionPool, maxMessages);
}
@Override
public Session createSession(int sessionMode) throws JMSException {
return delegate.createSession(sessionMode);
}
@Override
public Session createSession() throws JMSException {
return delegate.createSession();
}
@Override
public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return delegate.createSharedConnectionConsumer(topic, subscriptionName,
messageSelector, sessionPool, maxMessages);
}
@Override
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
return delegate.createSharedDurableConnectionConsumer(topic, subscriptionName, messageSelector,
sessionPool, maxMessages);
}
}
private static final class FaultyConnectionFactory implements ConnectionFactory {
private final AtomicInteger latch;
private final ConnectionFactory delegate;
private final Function<Connection, Connection> wrapper;
private final AtomicInteger connectionsCreated = new AtomicInteger(0);
private FaultyConnectionFactory(ConnectionFactory delegate, int faults) {
this(delegate, FaultyConnection::new, faults);
}
private FaultyConnectionFactory(ConnectionFactory delegate,
Function<Connection, Connection> wrapper, int faults) {
this.delegate = delegate;
this.wrapper = wrapper;
this.latch = new AtomicInteger(faults);
}
@Override
public Connection createConnection() throws JMSException {
if (latch.getAndDecrement() <= 0) {
connectionsCreated.incrementAndGet();
return wrapper.apply(delegate.createConnection());
} else {
throw new JMSException("createConnection() failed (simulated)");
}
}
@Override
public Connection createConnection(String userName, String password) throws JMSException {
if (latch.decrementAndGet() <= 0) {
return wrapper.apply(delegate.createConnection(userName, password));
} else {
throw new JMSException("createConnection(userName, password) failed (simulated)");
}
}
@Override
public JMSContext createContext() {
return delegate.createContext();
}
@Override
public JMSContext createContext(String userName, String password) {
return delegate.createContext(userName, password);
}
@Override
public JMSContext createContext(String userName, String password, int sessionMode) {
return delegate.createContext(userName, password, sessionMode);
}
@Override
public JMSContext createContext(int sessionMode) {
return delegate.createContext(sessionMode);
}
}
@Test
public void testGetConfigurationFromWSDL() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
JMSDestination destination = setupJMSDestination(ei);
assertEquals("Can't get the right AddressPolicy's Destination",
"test.jmstransport.binary",
destination.getJmsConfig().getTargetDestination());
destination.shutdown();
}
@Test
public void testDurableSubscriber() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
Message outMessage = createMessage();
JMSDestination destination = setupJMSDestination(ei);
destination.setMessageObserver(createMessageObserver());
// The JMSBroker (ActiveMQ 5.x) need to take some time to setup the DurableSubscriber
Thread.sleep(500L);
sendOneWayMessage(conduit, outMessage);
Message destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReceivedMessage(destMessage);
verifyHeaders(destMessage, outMessage);
conduit.close();
destination.shutdown();
}
@Test(expected = InvalidClientIDException.class)
public void testDurableInvalidClientId() throws Throwable {
Connection con = cf1.createConnection();
JMSDestination destination = null;
try {
con.setClientID("testClient");
con.start();
EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
jmsConfig.setDurableSubscriptionClientId("testClient");
jmsConfig.setDurableSubscriptionName("testsub");
jmsConfig.setConnectionFactory(cf);
destination = new JMSDestination(bus, ei, jmsConfig);
destination.setMessageObserver(createMessageObserver());
} catch (RuntimeException e) {
throw e.getCause();
} finally {
ResourceCloser.close(con);
destination.shutdown();
}
}
@Test
public void testOneWayDestination() throws Exception {
EndpointInfo ei = setupServiceInfo("HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
JMSDestination destination = setupJMSDestination(ei);
destination.setMessageObserver(createMessageObserver());
JMSConduit conduit = setupJMSConduitWithObserver(ei);
Message outMessage = createMessage();
sendOneWayMessage(conduit, outMessage);
// wait for the message to be get from the destination
Message destMessage = waitForReceiveDestMessage();
// just verify the Destination inMessage
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReceivedMessage(destMessage);
verifyHeaders(destMessage, outMessage);
conduit.close();
destination.shutdown();
}
private static void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
JMSMessageHeadersType header = new JMSMessageHeadersType();
header.setJMSCorrelationID(correlationId);
header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
header.setJMSPriority(1);
header.setTimeToLive(1000L);
header.setJMSReplyTo(replyTo);
outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
outMessage.put(Message.ENCODING, "US-ASCII");
}
private void verifyRequestResponseHeaders(Message msgIn, Message msgOut) {
JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
String inEncoding = (String) msgIn.get(Message.ENCODING);
String outEncoding = (String) msgOut.get(Message.ENCODING);
assertEquals("The message encoding should be equal", inEncoding, outEncoding);
JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
verifyJmsHeaderEquality(outHeader, inHeader);
}
@Test
public void testRoundTripDestination() throws Exception {
Message msg = testRoundTripDestination(true);
SecurityContext securityContext = msg.get(SecurityContext.class);
assertNotNull("SecurityContext should be set in message received by JMSDestination", securityContext);
assertEquals("Principal in SecurityContext should be", "testUser",
securityContext.getUserPrincipal().getName());
}
@Test
public void testRoundTripDestinationDoNotCreateSecurityContext() throws Exception {
Message msg = testRoundTripDestination(false);
SecurityContext securityContext = msg.get(SecurityContext.class);
assertNull("SecurityContext should not be set in message received by JMSDestination", securityContext);
}
private Message testRoundTripDestination(boolean createSecurityContext) throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
conduit.getJmsConfig().setCreateSecurityContext(createSecurityContext);
final Message outMessage = createMessage();
final JMSDestination destination = setupJMSDestination(ei);
// set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
public void onMessage(Message m) {
Exchange exchange = new ExchangeImpl();
exchange.setInMessage(m);
m.setExchange(exchange);
verifyReceivedMessage(m);
verifyHeaders(m, outMessage);
// setup the message for
Conduit backConduit;
try {
backConduit = destination.getBackChannel(m);
// wait for the message to be got from the conduit
Message replyMessage = new MessageImpl();
sendOneWayMessage(backConduit, replyMessage);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
destination.setMessageObserver(observer);
sendMessageSync(conduit, outMessage);
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
verifyReceivedMessage(waitForReceiveInMessage());
// wait for a while for the jms session recycling
// Send a second message to check for an issue
// Where the session was closed the second time
sendMessageSync(conduit, outMessage);
Message inMessage = waitForReceiveInMessage();
verifyReceivedMessage(inMessage);
// wait for a while for the jms session recycling
// Thread.sleep(1000L);
conduit.close();
destination.shutdown();
return inMessage;
}
@Test
public void testProperty() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
final String customPropertyName = "THIS_PROPERTY_WILL_NOT_BE_AUTO_COPIED";
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduitWithObserver(ei);
final Message outMessage = createMessage();
JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
headers.putProperty(customPropertyName, customPropertyName);
final JMSDestination destination = setupJMSDestination(ei);
// set up MessageObserver for handling the conduit message
MessageObserver observer = new MessageObserver() {
public void onMessage(Message m) {
Exchange exchange = new ExchangeImpl();
exchange.setInMessage(m);
m.setExchange(exchange);
verifyReceivedMessage(m);
verifyHeaders(m, outMessage);
// setup the message for
Conduit backConduit;
try {
backConduit = destination.getBackChannel(m);
// wait for the message to be got from the conduit
Message replyMessage = new MessageImpl();
// copy the message encoding
replyMessage.put(Message.ENCODING, m.get(Message.ENCODING));
sendOneWayMessage(backConduit, replyMessage);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
destination.setMessageObserver(observer);
sendMessageSync(conduit, outMessage);
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
Message inMessage = waitForReceiveInMessage();
verifyReceivedMessage(inMessage);
verifyRequestResponseHeaders(inMessage, outMessage);
JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
assertNotNull("The inHeader should not be null", inHeader);
// TODO we need to check the SOAP JMS transport properties here
// wait for a while for the jms session recycling
// Thread.sleep(1000L);
conduit.close();
destination.shutdown();
}
@Test
public void testTemporaryQueueDeletionUponReset() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
// Store the connection so we could check temporary queues
final AtomicReference<ActiveMQConnection> connectionHolder = new AtomicReference<>();
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduitWithObserver(ei, c -> new ConnectionFactory() {
@Override
public Connection createConnection() throws JMSException {
final Connection connection = c.createConnection();
connectionHolder.set((ActiveMQConnection)connection);
return connection;
}
@Override
public Connection createConnection(String userName, String password) throws JMSException {
final Connection connection = c.createConnection(userName, password);
connectionHolder.set((ActiveMQConnection)connection);
return connection;
}
@Override
public JMSContext createContext() {
return c.createContext();
}
@Override
public JMSContext createContext(String userName, String password) {
return c.createContext(userName, password);
}
@Override
public JMSContext createContext(String userName, String password, int sessionMode) {
return c.createContext(userName, password, sessionMode);
}
@Override
public JMSContext createContext(int sessionMode) {
return c.createContext(sessionMode);
}
});
assertNull(conduit.getJmsConfig().getReplyDestination());
final Message outMessage = createMessage();
// Capture the DestinationSource instance associated with the connection
final JMSDestination destination = setupJMSDestination(ei);
// set up MessageObserver for handling the conduit message
final MessageObserver observer = new MessageObserver() {
public void onMessage(Message m) {
final Exchange exchange = new ExchangeImpl();
exchange.setInMessage(m);
m.setExchange(exchange);
try {
final Conduit backConduit = destination.getBackChannel(m);
sendOneWayMessage(backConduit, new MessageImpl());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
destination.setMessageObserver(observer);
sendMessageSync(conduit, outMessage);
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
Message inMessage = waitForReceiveInMessage();
verifyReceivedMessage(inMessage);
final ActiveMQConnection connection = connectionHolder.get();
assertThat(ReflectionUtil.accessDeclaredField("tempQueues", ActiveMQConnection.class,
connection, Set.class).size(), equalTo(1));
// Force manual temporary queue deletion by resetting the reply destination
conduit.getJmsConfig().resetCachedReplyDestination();
// The queue deletion events (as well as others) are propagated asynchronously
await()
.atMost(1, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(ReflectionUtil.accessDeclaredField("tempQueues", ActiveMQConnection.class,
connection, Set.class).size(), equalTo(0)));
conduit.close();
destination.shutdown();
}
@Test
public void testIsMultiplexCapable() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
final JMSDestination destination = setupJMSDestination(ei);
destination.setMessageObserver(createMessageObserver());
assertTrue("is multiplex", destination instanceof MultiplexDestination);
destination.shutdown();
}
@Test
public void testSecurityContext() throws Exception {
SecurityContext ctx = testSecurityContext(true);
assertNotNull("SecurityContext should be set in message received by JMSDestination", ctx);
assertEquals("Principal in SecurityContext should be", "testUser",
ctx.getUserPrincipal().getName());
}
@Test
public void testDoNotCreateSecurityContext() throws Exception {
SecurityContext ctx = testSecurityContext(false);
assertNull("SecurityContext should not be set in message received by JMSDestination", ctx);
}
private SecurityContext testSecurityContext(boolean createSecurityContext) throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
final JMSDestination destination = setupJMSDestination(ei);
destination.getJmsConfig().setCreateSecurityContext(createSecurityContext);
destination.setMessageObserver(createMessageObserver());
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduitWithObserver(ei);
final Message outMessage = createMessage();
sendOneWayMessage(conduit, outMessage);
Message destMessage = waitForReceiveDestMessage();
SecurityContext securityContext = destMessage.get(SecurityContext.class);
conduit.close();
destination.shutdown();
return securityContext;
}
@Test
@Ignore
public void testOneWayReplyToSetUnset() throws Exception {
/* 1. Test that replyTo destination set in WSDL is NOT used
* in spec compliant mode */
EndpointInfo ei = setupServiceInfo(
"HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
Message outMessage = createMessage();
JMSDestination destination = setupJMSDestination(ei);
destination.setMessageObserver(createMessageObserver());
sendOneWayMessage(conduit, outMessage);
Message destMessage = waitForReceiveDestMessage();
// just verify the Destination inMessage
assertNotNull("The destination should have got the message ", destMessage);
verifyReplyToNotSet(destMessage);
/* 2. Test that replyTo destination set in WSDL IS used
* in spec non-compliant mode */
sendOneWayMessage(conduit, outMessage);
destMessage = waitForReceiveDestMessage();
assertNotNull("The destination should have got the message ", destMessage);
String exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
verifyReplyToSet(destMessage, Queue.class, exName);
/* 3. Test that replyTo destination provided via invocation context
* overrides the value set in WSDL and IS used in spec non-compliant mode */
String contextReplyTo = conduit.getJmsConfig().getReplyDestination() + ".context";
exName += ".context";
setupMessageHeader(outMessage, "cidValue", contextReplyTo);
sendOneWayMessage(conduit, outMessage);
destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReplyToSet(destMessage, Queue.class, exName);
/* 4. Test that replyTo destination provided via invocation context
* and the value set in WSDL are NOT used in spec non-compliant mode
* when JMSConstants.JMS_SET_REPLY_TO == false */
setupMessageHeader(outMessage, null, null);
outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.FALSE);
sendOneWayMessage(conduit, outMessage);
destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReplyToNotSet(destMessage);
/* 5. Test that replyTo destination set in WSDL IS used in spec non-compliant
* mode when JMSConstants.JMS_SET_REPLY_TO == true */
setupMessageHeader(outMessage, null, null);
outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.TRUE);
sendOneWayMessage(conduit, outMessage);
destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
verifyReplyToSet(destMessage, Queue.class, exName);
conduit.close();
destination.shutdown();
}
@Test
public void testMessageObserverExceptionHandling() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
JMSDestination destination = setupJMSDestination(ei);
destination.setMessageObserver(new MessageObserver() {
@Override
public void onMessage(Message message) {
try {
throw new RuntimeException("Error!");
} finally {
latch.countDown();
}
}
});
final Message outMessage = createMessage();
Thread.sleep(500L);
sendOneWayMessage(conduit, outMessage);
latch.await(5, TimeUnit.SECONDS);
conduit.close();
destination.shutdown();
}
@Test
public void testConnectionFactoryExceptionHandling() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
final Function<ConnectionFactory, ConnectionFactory> wrapper =
new Function<ConnectionFactory, ConnectionFactory>() {
@Override
public ConnectionFactory apply(ConnectionFactory cf) {
return new FaultyConnectionFactory(cf, 3);
}
};
JMSConduit conduit = setupJMSConduitWithObserver(ei);
JMSDestination destination = setupJMSDestination(ei, wrapper);
destination.getJmsConfig().setRetryInterval(1000);
destination.setMessageObserver(createMessageObserver());
final Message outMessage = createMessage();
Thread.sleep(4000L);
sendOneWayMessage(conduit, outMessage);
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
Message inMessage = waitForReceiveDestMessage();
verifyReceivedMessage(inMessage);
conduit.close();
destination.shutdown();
}
@Test
public void testBrokerExceptionHandling() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
JMSDestination destination = setupJMSDestination(ei);
destination.getJmsConfig().setRetryInterval(1000);
destination.setMessageObserver(createMessageObserver());
Thread.sleep(500L);
broker.stop();
broker.start();
Thread.sleep(2000L);
final Message outMessage = createMessage();
sendOneWayMessage(conduit, outMessage);
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
Message inMessage = waitForReceiveDestMessage();
verifyReceivedMessage(inMessage);
conduit.close();
destination.shutdown();
}
@SuppressWarnings("unused")
@Test
public void testSessionsExceptionHandling() throws Exception {
final EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
final AtomicInteger sessionsToFail = new AtomicInteger(5);
final Function<Connection, Connection> connection = c -> new FaultyConnection(c) {
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
// Fail five times, starting with on successful call
final int value = sessionsToFail.getAndDecrement();
if (value >= 0 && value < 5) {
throw new JMSException("createSession() failed (simulated)");
} else {
return super.createSession(transacted, acknowledgeMode);
}
}
};
final FaultyConnectionFactory faultyConnectionFactory = new FaultyConnectionFactory(cf, connection, 0);
final Function<ConnectionFactory, ConnectionFactory> wrapper =
new Function<ConnectionFactory, ConnectionFactory>() {
@Override
public ConnectionFactory apply(ConnectionFactory cf) {
return faultyConnectionFactory;
}
};
JMSConduit conduit = setupJMSConduitWithObserver(ei);
JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
jmsConfig.setConnectionFactory(wrapper.apply(cf));
jmsConfig.setRetryInterval(1000);
jmsConfig.setConcurrentConsumers(10);
JMSDestination destination = new JMSDestination(bus, ei, jmsConfig);
destination.setMessageObserver(createMessageObserver());
final Message outMessage = createMessage();
Thread.sleep(4000L);
sendOneWayMessage(conduit, outMessage);
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
Message inMessage = waitForReceiveDestMessage();
verifyReceivedMessage(inMessage);
conduit.close();
destination.shutdown();
assertEquals("Only two createConnection() calls allowed because restartConnection() should be "
+ "called only once.", 2, faultyConnectionFactory.connectionsCreated.get());
}
private String getQueueName(String exName) {
if (exName == null) {
return null;
}
return (exName.indexOf('/') != -1 && exName.indexOf('/') < exName.length())
? exName.substring(exName.indexOf('/') + 1) : exName;
}
protected void verifyReplyToNotSet(Message cxfMsg) {
jakarta.jms.Message jmsMsg =
jakarta.jms.Message.class.cast(cxfMsg.get(JMSConstants.JMS_REQUEST_MESSAGE));
assertNotNull("JMS Messsage must be null", jmsMsg);
}
private String getDestinationName(Destination dest) throws JMSException {
if (dest instanceof Queue) {
return ((Queue)dest).getQueueName();
}
return ((Topic)dest).getTopicName();
}
protected void verifyReplyToSet(Message cxfMsg,
Class<? extends Destination> type,
String expectedName) throws Exception {
jakarta.jms.Message jmsMsg =
jakarta.jms.Message.class.cast(cxfMsg.get(JMSConstants.JMS_REQUEST_MESSAGE));
assertNotNull("JMS Messsage must not be null", jmsMsg);
assertNotNull("JMS Messsage's replyTo must not be null", jmsMsg.getJMSReplyTo());
assertTrue("JMS Messsage's replyTo type must be of type " + type.getName(),
type.isAssignableFrom(jmsMsg.getJMSReplyTo().getClass()));
String receivedName = getDestinationName(jmsMsg.getJMSReplyTo());
assertTrue("JMS Messsage's replyTo must be named " + expectedName + " but was " + receivedName,
receivedName.equals(expectedName));
}
}