// RMManagerTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.cxf.ws.rm;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.TimerTask;

import javax.xml.namespace.QName;

import org.apache.cxf.Bus;
import org.apache.cxf.binding.Binding;
import org.apache.cxf.binding.soap.SoapBinding;
import org.apache.cxf.bus.spring.SpringBusFactory;
import org.apache.cxf.endpoint.Client;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.Service;
import org.apache.cxf.service.model.BindingInfo;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.service.model.InterfaceInfo;
import org.apache.cxf.service.model.ServiceInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.JAXWSAConstants;
import org.apache.cxf.ws.addressing.RelatesToType;
import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
import org.apache.cxf.ws.rm.persistence.RMMessage;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.v200702.CreateSequenceResponseType;
import org.apache.cxf.ws.rm.v200702.Identifier;

import org.junit.Test;
import org.mockito.ArgumentCaptor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class RMManagerTest {

    /** Content type set on the stored message in the attachment recovery test. */
    private static final String MULTIPART_TYPE =
        "multipart/related; type=\"text/xml\"; "
        + "boundary=\"uuid:74b6a245-2e17-40eb-a86c-308664e18460\"; "
        + "start=\"<root.message@cxf.apache.org>\"; "
        + "start-info=\"application/soap+xml\"";

    /** Manager under test; assigned by the individual test methods and read by the set-up helpers. */
    private RMManager manager;


    /**
     * Checks that a manager created with the no-argument constructor has no bus, store or
     * retransmission queue but does provide a timer, and that values passed to the three
     * setters are returned unchanged by the matching getters.
     */
    @Test
    public void testAccessors() {
        manager = new RMManager();

        // nothing has been injected yet
        assertNull(manager.getBus());
        assertNull(manager.getStore());
        assertNull(manager.getRetransmissionQueue());
        assertNotNull(manager.getTimer());

        final Bus mockBus = mock(Bus.class);
        final RMStore mockStore = mock(RMStore.class);
        final RetransmissionQueue mockQueue = mock(RetransmissionQueue.class);

        manager.setBus(mockBus);
        manager.setStore(mockStore);
        manager.setRetransmissionQueue(mockQueue);

        // the getters hand back the very same instances
        assertSame(mockBus, manager.getBus());
        assertSame(mockStore, manager.getStore());
        assertSame(mockQueue, manager.getRetransmissionQueue());
    }

    /**
     * Verifies the defaults that {@code initialise()} establishes on a manager created with
     * the no-argument constructor: the configuration, the source policy (including its
     * sequence termination policy) and the destination policy.
     */
    @Test
    public void testInitialisation() {
        manager = new RMManager();
        // before initialise() neither policy is available
        assertNull("sourcePolicy is set.", manager.getSourcePolicy());
        assertNull("destinationPolicy is set.", manager.getDestinationPolicy());

        manager.initialise();

        RMConfiguration cfg = manager.getConfiguration();
        assertNotNull("RMConfiguration is not set.", cfg);
        assertNotNull("sourcePolicy is not set.", manager.getSourcePolicy());
        assertNotNull("destinationPolicy is not set.", manager.getDestinationPolicy());
        assertNotNull("deliveryAssurance is not set.", cfg.getDeliveryAssurance());

        assertTrue(cfg.isExponentialBackoff());
        assertEquals(3000L, cfg.getBaseRetransmissionInterval().longValue());
        assertNull(cfg.getAcknowledgementInterval());
        assertNull(cfg.getInactivityTimeout());

        SourcePolicyType sp = manager.getSourcePolicy();
        assertEquals(0L, sp.getSequenceExpiration().getTimeInMillis(new Date()));
        assertEquals(0L, sp.getOfferedSequenceExpiration().getTimeInMillis(new Date()));
        assertNull(sp.getAcksTo());
        assertTrue(sp.isIncludeOffer());
        SequenceTerminationPolicyType stp = sp.getSequenceTerminationPolicy();
        assertEquals(0, stp.getMaxRanges());
        assertEquals(0, stp.getMaxUnacknowledged());
        assertTrue(stp.isTerminateOnShutdown());
        assertEquals(0, stp.getMaxLength());

        DestinationPolicyType dp = manager.getDestinationPolicy();
        assertNotNull(dp.getAcksPolicy());
        // expected value first, so a failure reports expected/actual the right way round
        assertEquals(10, dp.getAcksPolicy().getIntraMessageThreshold());
    }

    /**
     * Loads the manager from the Spring configuration
     * {@code org/apache/cxf/ws/rm/custom-rmmanager.xml} and verifies the resulting
     * configuration, source policy and destination policy values.
     */
    @Test
    public void testCustom() {
        Bus bus = new SpringBusFactory().createBus("org/apache/cxf/ws/rm/custom-rmmanager.xml", false);
        manager = bus.getExtension(RMManager.class);
        // unlike the default-constructed manager, the policies are present before initialise()
        assertNotNull("sourcePolicy is not set.", manager.getSourcePolicy());
        assertNotNull("destinationPolicy is not set.", manager.getDestinationPolicy());

        manager.initialise();

        RMConfiguration cfg = manager.getConfiguration();
        assertNotNull("RMConfiguration is not set.", cfg);
        assertNotNull("deliveryAssurance is not set.", cfg.getDeliveryAssurance());

        assertFalse(cfg.isExponentialBackoff());
        assertEquals(10000L, cfg.getBaseRetransmissionInterval().longValue());
        assertEquals(10000L, cfg.getAcknowledgementIntervalTime());
        assertNull(cfg.getInactivityTimeout());

        SourcePolicyType sp = manager.getSourcePolicy();
        assertEquals(0L, sp.getSequenceExpiration().getTimeInMillis(new Date()));
        assertEquals(0L, sp.getOfferedSequenceExpiration().getTimeInMillis(new Date()));
        assertNull(sp.getAcksTo());
        assertTrue(sp.isIncludeOffer());
        SequenceTerminationPolicyType stp = sp.getSequenceTerminationPolicy();
        assertEquals(0, stp.getMaxRanges());
        assertEquals(0, stp.getMaxUnacknowledged());
        assertFalse(stp.isTerminateOnShutdown());
        assertEquals(0, stp.getMaxLength());

        DestinationPolicyType dp = manager.getDestinationPolicy();
        assertNotNull(dp.getAcksPolicy());
        // expected value first, so a failure reports expected/actual the right way round
        assertEquals(0, dp.getAcksPolicy().getIntraMessageThreshold());
    }

    /**
     * Verifies that {@code startServer} triggers exactly one call to
     * {@code recoverReliableEndpoint} for the server's endpoint, with a null conduit.
     */
    @Test
    public void testStartServer() throws NoSuchMethodException {
        final Endpoint serverEndpoint = mock(Endpoint.class);
        final Server server = mock(Server.class);
        when(server.getEndpoint()).thenReturn(serverEndpoint);
        manager = spy(new RMManager());

        manager.startServer(server);

        verify(manager, times(1)).recoverReliableEndpoint(serverEndpoint, (Conduit)null);
    }

    /**
     * Calls {@code clientCreated} on a manager that has no store configured. The test only
     * requires the call to complete; no interaction is verified.
     */
    @Test
    public void testClientCreated() throws NoSuchMethodException {
        final Client createdClient = mock(Client.class);
        manager = spy(new RMManager());

        // No store is set on the manager, so (per the original author's note) the
        // endpoint/conduit lookup and the endpoint recovery are not expected to be
        // invoked, which is why the client mock carries no stubbing.
        manager.clientCreated(createdClient);
    }

    /** Checks that a binding fault factory is returned for a (mocked) SOAP binding. */
    @Test
    public void testGetBindingFaultFactory() {
        final SoapBinding soapBinding = mock(SoapBinding.class);
        final RMManager rmManager = new RMManager();

        assertNotNull(rmManager.getBindingFaultFactory(soapBinding));
    }

    /**
     * Looks up the reliable endpoint for a message whose exchange has a transport
     * destination, no conduit and a {@link WrappedEndpoint}. Expects the endpoint produced
     * by {@code createReliableEndpoint} for the wrapped endpoint to be returned and
     * initialised once with the reply-to reference, and a second lookup to return the
     * same instance.
     */
    @Test
    public void testGetReliableEndpointServerSideCreate() throws NoSuchMethodException, RMException {
        manager = spy(new RMManager());
        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());

        final Message inbound = mock(Message.class);
        final Exchange exchange = mock(Exchange.class);
        when(inbound.getExchange()).thenReturn(exchange);

        // the exchange exposes a wrapper whose wrapped endpoint is the one to be made reliable
        final WrappedEndpoint wrapper = mock(WrappedEndpoint.class);
        when(exchange.getEndpoint()).thenReturn(wrapper);
        final EndpointInfo endpointInfo = mock(EndpointInfo.class);
        when(wrapper.getEndpointInfo()).thenReturn(endpointInfo);
        final QName portName = RM10Constants.PORT_NAME;
        when(endpointInfo.getName()).thenReturn(portName);
        final Endpoint wrapped = mock(Endpoint.class);
        when(wrapper.getWrappedEndpoint()).thenReturn(wrapped);

        final RMEndpoint reliableEndpoint = mock(RMEndpoint.class);
        doReturn(reliableEndpoint).when(manager).createReliableEndpoint(wrapped);

        // fully qualified: the simple name Destination is used for the RM type in this class
        final org.apache.cxf.transport.Destination transportDestination =
            mock(org.apache.cxf.transport.Destination.class);
        when(exchange.getDestination()).thenReturn(transportDestination);
        final AddressingProperties inboundMaps = mock(AddressingProperties.class);
        when(inbound.get(Message.REQUESTOR_ROLE)).thenReturn(null);
        when(inbound.get(JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND)).thenReturn(inboundMaps);
        final EndpointReferenceType replyTo = RMUtils.createAnonymousReference();
        when(inboundMaps.getReplyTo()).thenReturn(replyTo);
        when(exchange.getConduit(inbound)).thenReturn(null);

        assertSame(reliableEndpoint, manager.getReliableEndpoint(inbound));
        verify(reliableEndpoint, times(1))
            .initialise(manager.getConfiguration(), null, replyTo, null, inbound);

        // same stubbing once more, followed by the second lookup
        when(inbound.getExchange()).thenReturn(exchange);
        when(exchange.getEndpoint()).thenReturn(wrapper);
        when(wrapper.getEndpointInfo()).thenReturn(endpointInfo);
        when(endpointInfo.getName()).thenReturn(portName);
        when(wrapper.getWrappedEndpoint()).thenReturn(wrapped);

        assertSame(reliableEndpoint, manager.getReliableEndpoint(inbound));
    }

    /**
     * Looks up the reliable endpoint for a message whose exchange has a conduit and no
     * transport destination. Expects the endpoint produced by {@code createReliableEndpoint}
     * to be returned and initialised once with that conduit, and a second lookup to return
     * the same instance.
     */
    @Test
    public void testGetReliableEndpointClientSideCreate() throws NoSuchMethodException, RMException {
        manager = spy(new RMManager());
        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
        Message message = mock(Message.class);
        Exchange exchange = mock(Exchange.class);
        when(message.getExchange()).thenReturn(exchange);
        Endpoint endpoint = mock(Endpoint.class);
        when(exchange.getEndpoint()).thenReturn(endpoint);
        EndpointInfo ei = mock(EndpointInfo.class);
        when(endpoint.getEndpointInfo()).thenReturn(ei);
        QName name = new QName("http://x.y.z/a", "GreeterPort");
        when(ei.getName()).thenReturn(name);
        RMEndpoint rme = mock(RMEndpoint.class);
        // doReturn(..).when(spy) stubs without running the real createReliableEndpoint,
        // which when(spy.createReliableEndpoint(..)) would do; same form as the server-side test
        doReturn(rme).when(manager).createReliableEndpoint(endpoint);
        when(exchange.getDestination()).thenReturn(null);
        Conduit conduit = mock(Conduit.class);
        when(exchange.getConduit(message)).thenReturn(conduit);

        assertSame(rme, manager.getReliableEndpoint(message));
        verify(rme, times(1)).initialise(manager.getConfiguration(), conduit, null, null, message);

        when(message.getExchange()).thenReturn(exchange);
        when(exchange.getEndpoint()).thenReturn(endpoint);
        when(endpoint.getEndpointInfo()).thenReturn(ei);
        when(ei.getName()).thenReturn(name);

        // second lookup yields the same reliable endpoint
        assertSame(rme, manager.getReliableEndpoint(message));
    }

    /**
     * Registers a reliable endpoint in the manager's endpoint map up front and checks that
     * the lookup for a message on that endpoint returns the registered instance.
     */
    @Test
    public void testGetReliableEndpointExisting() throws NoSuchMethodException, RMException {
        manager = spy(new RMManager());
        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
        Message message = mock(Message.class);
        Exchange exchange = mock(Exchange.class);
        when(message.getExchange()).thenReturn(exchange);

        RMConfiguration config = new RMConfiguration();
        config.setRMNamespace(RM10Constants.NAMESPACE_URI);
        config.setRM10AddressingNamespace(RM10Constants.NAMESPACE_URI);
        // doReturn(..).when(spy) stubs without first running the real
        // getEffectiveConfiguration against the mocked message
        doReturn(config).when(manager).getEffectiveConfiguration(message);
        Endpoint endpoint = mock(Endpoint.class);
        when(exchange.getEndpoint()).thenReturn(endpoint);
        EndpointInfo ei = mock(EndpointInfo.class);
        when(endpoint.getEndpointInfo()).thenReturn(ei);
        QName name = new QName("http://x.y.z/a", "GreeterPort");
        when(ei.getName()).thenReturn(name);
        RMEndpoint rme = mock(RMEndpoint.class);
        manager.getReliableEndpointsMap().put(endpoint, rme);

        assertSame(rme, manager.getReliableEndpoint(message));
    }

    /**
     * Checks that {@code getDestination} returns the destination of the message's reliable
     * endpoint, and null once no reliable endpoint is available for the message.
     */
    @Test
    public void testGetDestination() throws NoSuchMethodException, RMException {
        manager = spy(new RMManager());
        final Message msg = mock(Message.class);
        final RMEndpoint reliableEndpoint = mock(RMEndpoint.class);
        final Destination rmDestination = mock(Destination.class);
        doReturn(reliableEndpoint).when(manager).getReliableEndpoint(msg);
        when(reliableEndpoint.getDestination()).thenReturn(rmDestination);

        assertSame(rmDestination, manager.getDestination(msg));

        // without a reliable endpoint there is no destination either
        when(manager.getReliableEndpoint(msg)).thenReturn(null);
        assertNull(manager.getDestination(msg));
    }

    /**
     * Checks that {@code getSource} returns the source of the message's reliable endpoint,
     * and null once no reliable endpoint is available for the message.
     */
    @Test
    public void testGetSource() throws NoSuchMethodException, RMException {
        manager = spy(new RMManager());
        final Message msg = mock(Message.class);
        final RMEndpoint reliableEndpoint = mock(RMEndpoint.class);
        final Source rmSource = mock(Source.class);
        doReturn(reliableEndpoint).when(manager).getReliableEndpoint(msg);
        when(reliableEndpoint.getSource()).thenReturn(rmSource);

        assertSame(rmSource, manager.getSource(msg));

        // without a reliable endpoint there is no source either
        when(manager.getReliableEndpoint(msg)).thenReturn(null);
        assertNull(manager.getSource(msg));
    }

    /**
     * Checks that {@code getSequence} returns the sequence the source reports as current
     * for the given inbound sequence identifier.
     */
    @Test
    public void testGetExistingSequence() throws NoSuchMethodException, SequenceFault, RMException {
        manager = spy(new RMManager());
        final Message msg = mock(Message.class);
        final Identifier inboundId = mock(Identifier.class);

        final Source rmSource = mock(Source.class);
        final SourceSequence current = mock(SourceSequence.class);
        doReturn(rmSource).when(manager).getSource(msg);
        when(rmSource.getCurrent(inboundId)).thenReturn(current);

        assertSame(current, manager.getSequence(inboundId, msg, null));
    }

    /**
     * Exercises {@code getSequence} when the source has no current sequence for the inbound
     * identifier. Expects the sequence returned by {@code awaitCurrent} to be handed back,
     * the stubbed create-sequence response to be passed to the servant once, and a target
     * to be set on the sequence once.
     */
    @Test
    public void testGetNewSequence() throws NoSuchMethodException, SequenceFault, RMException {
        manager = spy(new RMManager());

        // the message is the exchange's out message; there is no in or out-fault message
        final Message outbound = mock(Message.class);
        final Exchange exchange = mock(Exchange.class);
        when(outbound.getContextualPropertyKeys()).thenReturn(new HashSet<>());
        when(outbound.getExchange()).thenReturn(exchange);
        when(exchange.getOutMessage()).thenReturn(outbound);
        when(exchange.getInMessage()).thenReturn(null);
        when(exchange.getOutFaultMessage()).thenReturn(null);
        final Conduit conduit = mock(Conduit.class);
        when(exchange.getConduit(outbound)).thenReturn(conduit);

        // the source knows no current sequence for the inbound identifier
        final Identifier inboundId = mock(Identifier.class);
        final AddressingProperties addressing = mock(AddressingProperties.class);
        final Source rmSource = mock(Source.class);
        doReturn(rmSource).when(manager).getSource(outbound);
        when(rmSource.getCurrent(inboundId)).thenReturn(null);

        // addressing properties: a To address and a "none" reply-to reference
        final AttributedURIType toUri = mock(AttributedURIType.class);
        when(addressing.getTo()).thenReturn(toUri);
        when(toUri.getValue()).thenReturn("http://localhost:9001/TestPort");
        final EndpointReferenceType noneReference = RMUtils.createNoneReference();
        when(addressing.getReplyTo()).thenReturn(noneReference);

        // proxy answering the create-sequence request
        final RMEndpoint reliableEndpoint = mock(RMEndpoint.class);
        when(rmSource.getReliableEndpoint()).thenReturn(reliableEndpoint);
        final Proxy rmProxy = mock(Proxy.class);
        when(reliableEndpoint.getProxy()).thenReturn(rmProxy);
        final CreateSequenceResponseType response = mock(CreateSequenceResponseType.class);
        when(rmProxy.createSequence(isA(EndpointReferenceType.class),
                             isNull(RelatesToType.class),
                             eq(false),
                             isA(ProtocolVariation.class),
                             isA(Exchange.class),
                             CastUtils.cast(isA(Map.class), String.class, Object.class)))
            .thenReturn(response);
        final Servant rmServant = mock(Servant.class);
        when(reliableEndpoint.getServant()).thenReturn(rmServant);
        final SourceSequence created = mock(SourceSequence.class);
        when(rmSource.awaitCurrent(inboundId)).thenReturn(created);

        assertSame(created, manager.getSequence(inboundId, outbound, addressing));

        verify(rmSource, times(2)).getReliableEndpoint();
        verify(rmServant, times(1)).createSequenceResponse(response, ProtocolVariation.RM10WSA200408);
        verify(created, times(1)).setTarget(isA(EndpointReferenceType.class));
    }

    /**
     * Shuts down a bus created from {@code org/apache/cxf/ws/rm/rmmanager.xml} and checks
     * that the manager's timer no longer accepts tasks and that a registered reliable
     * endpoint has been shut down exactly once.
     */
    @Test
    public void testShutdown() {
        Bus bus = new SpringBusFactory().createBus("org/apache/cxf/ws/rm/rmmanager.xml", false);
        manager = bus.getExtension(RMManager.class);
        // check before the first dereference; otherwise a missing extension shows up as an NPE
        assertNotNull("RMManager extension is not available on the bus.", manager);
        Endpoint e = mock(Endpoint.class);
        RMEndpoint rme = mock(RMEndpoint.class);
        manager.getReliableEndpointsMap().put(e, rme);
        manager.getTimer(); //start the timer
        class TestTask extends TimerTask {
            @Override
            public void run() {
            }
        }

        bus.shutdown(true);
        try {
            manager.getTimer().schedule(new TestTask(), 5000);
            fail("Timer has not been cancelled.");
        } catch (IllegalStateException ex) {
            // expected: scheduling is rejected with an IllegalStateException
        }
        verify(rme, times(1)).shutdown();
    }

    /**
     * Calls {@code shutdownReliableEndpoint} first for an endpoint that is not registered
     * and then for a registered one; the registered reliable endpoint must be removed from
     * the map and shut down exactly once.
     */
    @Test
    public void testShutdownReliableEndpoint() {
        manager = new RMManager();
        final Endpoint endpoint = mock(Endpoint.class);
        final RMEndpoint reliableEndpoint = mock(RMEndpoint.class);

        // nothing registered for the endpoint yet
        manager.shutdownReliableEndpoint(endpoint);

        manager.getReliableEndpointsMap().put(endpoint, reliableEndpoint);
        manager.shutdownReliableEndpoint(endpoint);

        assertNull(manager.getReliableEndpointsMap().get(endpoint));
        verify(reliableEndpoint, times(1)).shutdown();
    }

    /**
     * Calls {@code recoverReliableEndpoint} once without a store and once with an unstubbed
     * mock store. Both calls are only required to complete; nothing is verified.
     */
    @Test
    public void testRecoverReliableEndpoint() {
        manager = new RMManager();
        final Endpoint mockEndpoint = mock(Endpoint.class);
        final Conduit mockConduit = mock(Conduit.class);

        // first attempt: no store configured
        manager.recoverReliableEndpoint(mockEndpoint, mockConduit);

        // second attempt: a store is present but has no stubbed behaviour
        final RMStore mockStore = mock(RMStore.class);
        manager.setStore(mockStore);
        manager.recoverReliableEndpoint(mockEndpoint, mockConduit);
    }

    /**
     * Runs {@code recoverReliableEndpoint} three times with a conduit: with a store holding
     * no sequences, with one source and one destination sequence but no message, and with
     * an additional stored message. After each run the verifications returned by
     * {@code setUpRecoverReliableEndpoint} are executed.
     */
    @Test
    public void testRecoverReliableClientEndpoint() throws NoSuchMethodException, IOException {
        manager = spy(new RMManager());
        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
        final Endpoint endpoint = mock(Endpoint.class);
        final EndpointInfo endpointInfo = mock(EndpointInfo.class);
        final ServiceInfo serviceInfo = mock(ServiceInfo.class);
        final BindingInfo bindingInfo = mock(BindingInfo.class);
        final InterfaceInfo interfaceInfo = mock(InterfaceInfo.class);
        final Conduit conduit = mock(Conduit.class);

        // run 1: no sequences at all
        setUpEndpointForRecovery(endpoint, endpointInfo, serviceInfo, bindingInfo, interfaceInfo);
        final Runnable emptyStoreChecks = setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null);
        manager.recoverReliableEndpoint(endpoint, conduit);
        emptyStoreChecks.run();

        // run 2: sequences but no stored message
        setUpEndpointForRecovery(endpoint, endpointInfo, serviceInfo, bindingInfo, interfaceInfo);
        final SourceSequence sourceSequence = mock(SourceSequence.class);
        final DestinationSequence destinationSequence = mock(DestinationSequence.class);
        final Runnable sequenceChecks =
            setUpRecoverReliableEndpoint(endpoint, conduit, sourceSequence, destinationSequence, null);
        manager.recoverReliableEndpoint(endpoint, conduit);
        sequenceChecks.run();

        // run 3: sequences plus one stored message; the source sequence mock starts afresh
        reset(sourceSequence);
        setUpEndpointForRecovery(endpoint, endpointInfo, serviceInfo, bindingInfo, interfaceInfo);
        final RMMessage storedMessage = mock(RMMessage.class);
        final Runnable messageChecks = setUpRecoverReliableEndpoint(
            endpoint, conduit, sourceSequence, destinationSequence, storedMessage);
        manager.recoverReliableEndpoint(endpoint, conduit);
        messageChecks.run();
    }

    /**
     * Recovers an endpoint whose store holds one message with multipart content read from
     * {@code persistence/SerializedRMMessage.txt}. The message handed to the retransmission
     * queue must be the out message of its exchange, carry saved content starting with
     * {@code <soap:Envelope} and have exactly one attachment.
     */
    @Test
    public void testRecoverReliableClientEndpointWithAttachment() throws NoSuchMethodException, IOException {
        manager = spy(new RMManager());
        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
        Endpoint endpoint = mock(Endpoint.class);
        EndpointInfo ei = mock(EndpointInfo.class);
        ServiceInfo si = mock(ServiceInfo.class);
        BindingInfo bi = mock(BindingInfo.class);
        InterfaceInfo ii = mock(InterfaceInfo.class);
        setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
        Conduit conduit = mock(Conduit.class);
        SourceSequence ss = mock(SourceSequence.class);
        DestinationSequence ds = mock(DestinationSequence.class);
        RMMessage m1 = new RMMessage();
        InputStream fis = getClass().getResourceAsStream("persistence/SerializedRMMessage.txt");
        // fail with a clear message instead of an NPE when the resource is not on the classpath
        assertNotNull("Test resource persistence/SerializedRMMessage.txt was not found.", fis);
        CachedOutputStream cos = new CachedOutputStream();
        IOUtils.copyAndCloseInput(fis, cos);
        cos.flush();
        m1.setContent(cos);
        m1.setTo("toAddress");
        m1.setMessageNumber(Long.valueOf(10));
        m1.setContentType(MULTIPART_TYPE);
        ArgumentCaptor<Message> mc = ArgumentCaptor.forClass(Message.class);

        var verification = setUpRecoverReliableEndpointWithAttachment(endpoint, conduit, ss, ds, m1, mc);
        manager.recoverReliableEndpoint(endpoint, conduit);
        verification.run();

        Message msg = mc.getValue();
        assertNotNull(msg);
        assertNotNull(msg.getExchange());
        assertSame(msg, msg.getExchange().getOutMessage());

        CachedOutputStream cos1 = (CachedOutputStream) msg.get(RMMessageConstants.SAVED_CONTENT);
        assertNotNull("Recovered message has no saved content.", cos1);
        // the helper's boolean result must be checked as well, otherwise a false return goes unnoticed
        assertTrue(assertStartsWith(cos1.getInputStream(), "<soap:Envelope"));
        assertEquals(1, msg.getAttachments().size());
    }

    /**
     * Prepares the spied manager for a recovery run in which the stored message is a real
     * {@link RMMessage} (as opposed to a mock).
     * <p>
     * Installs a mocked store, retransmission queue and redelivery queue on the manager and
     * stubs the store so that the endpoint id {@code "{S}s.{P}p@cxf"} yields the given
     * sequences and message. Stubbing stops early, and a no-op is returned, as soon as
     * one of {@code ss}, {@code ds} or {@code m} is null.
     *
     * @param endpoint the endpoint to be recovered
     * @param conduit not used by this helper
     * @param ss the source sequence the store should return, or null for none
     * @param ds the destination sequence the store should return, or null for none
     * @param m the stored message, or null for none
     * @param mc captor receiving the message passed to the retransmission queue
     * @return the verifications to run after {@code recoverReliableEndpoint} was called
     * @throws IOException declared for symmetry with {@code setUpRecoverReliableEndpoint}
     */
    Runnable setUpRecoverReliableEndpointWithAttachment(Endpoint endpoint,
                                      Conduit conduit,
                                      SourceSequence ss,
                                      DestinationSequence ds, RMMessage m,
                                      ArgumentCaptor<Message> mc) throws IOException {
        RMStore store = mock(RMStore.class);
        RetransmissionQueue oqueue = mock(RetransmissionQueue.class);
        RedeliveryQueue iqueue = mock(RedeliveryQueue.class);
        manager.setStore(store);
        manager.setRetransmissionQueue(oqueue);
        manager.setRedeliveryQueue(iqueue);

        Collection<SourceSequence> sss = new ArrayList<>();
        if (null != ss) {
            sss.add(ss);
        }
        when(store.getSourceSequences("{S}s.{P}p@cxf"))
            .thenReturn(sss);
        if (null == ss) {
            return () -> { };
        }

        Collection<DestinationSequence> dss = new ArrayList<>();
        if (null != ds) {
            dss.add(ds);
        }
        when(store.getDestinationSequences("{S}s.{P}p@cxf"))
            .thenReturn(dss);
        if (null == ds) {
            return () -> { };
        }

        Collection<RMMessage> ms = new ArrayList<>();
        if (null != m) {
            ms.add(m);
        }
        Identifier id = new Identifier();
        id.setValue("S1");
        when(ss.getIdentifier()).thenReturn(id);
        when(ss.getProtocol()).thenReturn(ProtocolVariation.RM10WSA200408);
        when(store.getMessages(id, true)).thenReturn(ms);

        RMEndpoint rme = mock(RMEndpoint.class);
        // the manager is a spy: doReturn(..).when(..) stubs without running the real method
        doReturn(rme).when(manager).createReliableEndpoint(endpoint);
        Source source = mock(Source.class);
        when(rme.getSource()).thenReturn(source);

        Destination destination = mock(Destination.class);
        when(rme.getDestination()).thenReturn(destination);

        Service service = mock(Service.class);
        when(endpoint.getService()).thenReturn(service);
        Binding binding = mock(Binding.class);
        when(endpoint.getBinding()).thenReturn(binding);

        when(ss.isLastMessage()).thenReturn(true);
        when(ss.getCurrentMessageNr()).thenReturn(Long.valueOf(10));
        if (null == m) {
            return () -> { };
        }

        return () -> {
            // m is known to be non-null here, so the identifier is expected three times
            verify(ss, times(3)).getIdentifier();
            verify(destination, times(1)).addSequence(ds, false);
            verify(oqueue, times(1)).addUnacknowledged(mc.capture());
            verify(oqueue, times(1)).start();
            verify(iqueue, times(1)).start();
        };
    }



    /**
     * Wires the given mocks together: the endpoint returns the endpoint info, which in turn
     * returns the service info and the binding info; the service info returns the interface
     * info. The service is named {@code {S}s} and the endpoint {@code {P}p}.
     *
     * @param endpoint the endpoint mock to stub
     * @param ei the endpoint info mock returned by the endpoint
     * @param si the service info mock returned by the endpoint info
     * @param bi the binding info mock returned by the endpoint info
     * @param ii the interface info mock returned by the service info
     * @return the {@code endpoint} argument
     */
    Endpoint setUpEndpointForRecovery(Endpoint endpoint,
                                      EndpointInfo ei,
                                      ServiceInfo si,
                                      BindingInfo bi,
                                      InterfaceInfo ii) {
        final QName serviceName = new QName("S", "s");
        final QName portName = new QName("P", "p");

        // endpoint -> endpoint info
        when(endpoint.getEndpointInfo()).thenReturn(ei);

        // endpoint info -> service, name, binding
        when(ei.getService()).thenReturn(si);
        when(ei.getName()).thenReturn(portName);
        when(ei.getBinding()).thenReturn(bi);

        // service info -> name, interface
        when(si.getName()).thenReturn(serviceName);
        when(si.getInterface()).thenReturn(ii);

        return endpoint;
    }

    /**
     * Prepares the spied manager for a recovery run in which the stored message is a mock.
     * <p>
     * Installs a mocked store, retransmission queue and redelivery queue on the manager and
     * stubs the store so that the endpoint id {@code "{S}s.{P}p@cxf"} yields the given
     * sequences and message. Stubbing stops early, and a no-op is returned, as soon as
     * one of {@code ss}, {@code ds} or {@code m} is null.
     *
     * @param endpoint the endpoint to be recovered
     * @param conduit the conduit used for recovery; when null the message's
     *                {@code getTo()} is stubbed with {@code "toAddress"}
     * @param ss the source sequence the store should return, or null for none
     * @param ds the destination sequence the store should return, or null for none
     * @param m the mocked stored message, or null for none
     * @return the verifications to run after {@code recoverReliableEndpoint} was called
     * @throws IOException if preparing the (empty) message content fails
     */
    Runnable setUpRecoverReliableEndpoint(Endpoint endpoint,
                                      Conduit conduit,
                                      SourceSequence ss,
                                      DestinationSequence ds, RMMessage m)
                                          throws IOException  {
        RMStore store = mock(RMStore.class);
        RetransmissionQueue oqueue = mock(RetransmissionQueue.class);
        RedeliveryQueue iqueue = mock(RedeliveryQueue.class);
        manager.setStore(store);
        manager.setRetransmissionQueue(oqueue);
        manager.setRedeliveryQueue(iqueue);

        Collection<SourceSequence> sss = new ArrayList<>();
        if (null != ss) {
            sss.add(ss);
        }
        when(store.getSourceSequences("{S}s.{P}p@cxf"))
            .thenReturn(sss);
        if (null == ss) {
            return () -> { };
        }

        Collection<DestinationSequence> dss = new ArrayList<>();
        if (null != ds) {
            dss.add(ds);
        }
        when(store.getDestinationSequences("{S}s.{P}p@cxf"))
            .thenReturn(dss);
        if (null == ds) {
            return () -> { };
        }
        Collection<RMMessage> ms = new ArrayList<>();
        if (null != m) {
            ms.add(m);
        }
        Identifier id = new Identifier();
        id.setValue("S1");
        when(ss.getIdentifier()).thenReturn(id);
        when(ss.getProtocol()).thenReturn(ProtocolVariation.RM10WSA200408);
        when(store.getMessages(id, true)).thenReturn(ms);

        RMEndpoint rme = mock(RMEndpoint.class);
        // the manager is a spy: doReturn(..).when(..) stubs without running the real method
        doReturn(rme).when(manager).createReliableEndpoint(endpoint);
        Source source = mock(Source.class);
        when(rme.getSource()).thenReturn(source);

        Destination destination = mock(Destination.class);
        when(rme.getDestination()).thenReturn(destination);

        Service service = mock(Service.class);
        when(endpoint.getService()).thenReturn(service);
        Binding binding = mock(Binding.class);
        when(endpoint.getBinding()).thenReturn(binding);

        when(ss.isLastMessage()).thenReturn(true);
        when(ss.getCurrentMessageNr()).thenReturn(Long.valueOf(10));
        if (null == m) {
            return () -> { };
        }
        when(m.getMessageNumber()).thenReturn(Long.valueOf(10));
        if (null == conduit) {
            when(m.getTo()).thenReturn("toAddress");
        }
        // the mocked message gets empty content
        CachedOutputStream cos = new CachedOutputStream();
        try (InputStream is = new ByteArrayInputStream(new byte[0])) {
            IOUtils.copy(is, cos);
        }
        cos.flush();
        when(m.getContent()).thenReturn(cos);

        return () -> {
            verify(m, times(2)).getMessageNumber();
            // m is known to be non-null here, so the identifier is expected three times
            verify(ss, times(3)).getIdentifier();
            verify(destination, times(1)).addSequence(ds, false);
            verify(oqueue, times(1)).addUnacknowledged(isA(Message.class));
            verify(oqueue, times(1)).start();
            verify(iqueue, times(1)).start();
        };
    }

    /**
     * Checks that a fresh manager has no identifier generator, that a generator set on it is
     * returned unchanged, and that two identifiers generated in a row are distinct objects
     * with different values.
     */
    @Test
    public void testDefaultSequenceIdentifierGenerator() {
        manager = new RMManager();
        assertNull(manager.getIdGenerator());

        final SequenceIdentifierGenerator idGenerator = manager.new DefaultSequenceIdentifierGenerator();
        manager.setIdGenerator(idGenerator);
        assertSame(idGenerator, manager.getIdGenerator());

        final Identifier first = idGenerator.generateSequenceIdentifier();
        assertNotNull(first);
        assertNotNull(first.getValue());

        final Identifier second = idGenerator.generateSequenceIdentifier();
        assertTrue(first != second);
        assertNotEquals(first.getValue(), second.getValue());
    }

    /**
     * Reads the beginning of the input and asserts that it equals the specified string.
     * The stream is always closed, whether or not the assertion passes.
     *
     * @param in the stream to inspect; must not be null
     * @param starting the text the stream content is expected to start with
     * @return true when the stream starts with {@code starting}; every failure is reported
     *         by an {@link AssertionError} rather than by a false return value
     */
    private static boolean assertStartsWith(InputStream in, String starting) {
        assertNotNull(in);
        // size the buffer by encoded length, which differs from length() for non-ASCII text
        byte[] expected = starting.getBytes(StandardCharsets.UTF_8);
        try (InputStream stream = in) {
            byte[] actual = new byte[expected.length];
            // a single read() may deliver fewer bytes than requested; readNBytes keeps
            // reading until the buffer is full or the stream ends
            int count = stream.readNBytes(actual, 0, actual.length);
            assertEquals(starting, new String(actual, 0, count, StandardCharsets.UTF_8));
            return true;
        } catch (IOException e) {
            // an unreadable stream must fail the test instead of being silently ignored
            throw new AssertionError("Unable to read the beginning of the stream.", e);
        }
    }
}