DestinationSequenceTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.cxf.ws.rm;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Timer;

import org.apache.cxf.Bus;
import org.apache.cxf.endpoint.Endpoint;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.interceptor.InterceptorProvider;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.service.model.BindingInfo;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.service.model.InterfaceInfo;
import org.apache.cxf.service.model.OperationInfo;
import org.apache.cxf.service.model.ServiceInfo;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.Names;
import org.apache.cxf.ws.addressing.WSAddressingFeature;
import org.apache.cxf.ws.addressing.WSAddressingFeature.WSAddressingFeatureApplier;
import org.apache.cxf.ws.rm.RMConfiguration.DeliveryAssurance;
import org.apache.cxf.ws.rm.manager.AcksPolicyType;
import org.apache.cxf.ws.rm.manager.DestinationPolicyType;
import org.apache.cxf.ws.rm.persistence.RMStore;
import org.apache.cxf.ws.rm.v200702.Identifier;
import org.apache.cxf.ws.rm.v200702.ObjectFactory;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement.AcknowledgementRange;
import org.apache.cxf.ws.rm.v200702.SequenceType;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DestinationSequenceTest {

    private ObjectFactory factory;
    private Identifier id;
    private EndpointReferenceType ref;
    private Destination destination;
    private RMManager manager;
    private RMEndpoint endpoint;
    private RMConfiguration config;
    private AcksPolicyType ap;
    private DestinationPolicyType dp;

    @Before
    public void setUp() {
        factory = new ObjectFactory();
        ref = mock(EndpointReferenceType.class);
        id = factory.createIdentifier();
        id.setValue("seq");
    }

    @After
    public void tearDown() {
        ref = null;
        destination = null;
        manager = null;
        config = null;
        dp = null;
        ap = null;

    }

    @Test
    public void testConstructors() {

        Identifier otherId = factory.createIdentifier();
        otherId.setValue("otherSeq");

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        assertEquals(id, seq.getIdentifier());
        assertEquals(0L, seq.getLastMessageNumber());
        assertSame(ref, seq.getAcksTo());
        assertNotNull(seq.getAcknowledgment());
        assertNotNull(seq.getMonitor());

        SequenceAcknowledgement ack = new SequenceAcknowledgement();
        seq = new DestinationSequence(id, ref, 10L, ack, ProtocolVariation.RM10WSA200408);
        assertEquals(id, seq.getIdentifier());
        assertEquals(10L, seq.getLastMessageNumber());
        assertSame(ref, seq.getAcksTo());
        assertSame(ack, seq.getAcknowledgment());
        assertNotNull(seq.getMonitor());

    }

    @Test
    public void testEqualsAndHashCode() {

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        DestinationSequence otherSeq = null;
        assertNotEquals(seq, otherSeq);
        otherSeq = new DestinationSequence(id, ref, destination, ProtocolVariation.RM10WSA200408);
        assertEquals(seq, otherSeq);
        assertEquals(seq.hashCode(), otherSeq.hashCode());
        Identifier otherId = factory.createIdentifier();
        otherId.setValue("otherSeq");
        otherSeq = new DestinationSequence(otherId, ref, destination, ProtocolVariation.RM10WSA200408);
        assertNotEquals(seq, otherSeq);
        assertNotEquals(seq.hashCode(), otherSeq.hashCode());
    }

    @Test
    public void testGetSetDestination() {
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        seq.setDestination(destination);
        assertSame(destination, seq.getDestination());
    }

    @Test
    public void testGetEndpointIdentifier() {
        setUpDestination();
        String name = "abc";
        when(destination.getName()).thenReturn(name);

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        assertEquals("Unexpected endpoint identifier", name, seq.getEndpointIdentifier());
    }

    @Test
    public void testAcknowledgeBasic() throws SequenceFault {
        Timer timer = mock(Timer.class);
        setUpDestination(timer, null);
        Message message1 = setUpMessage("1");
        Message message2 = setUpMessage("2");

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
        assertEquals(0, ranges.size());

        seq.acknowledge(message1);
        assertEquals(1, ranges.size());
        AcknowledgementRange r1 = ranges.get(0);
        assertEquals(1, r1.getLower().intValue());
        assertEquals(1, r1.getUpper().intValue());

        seq.acknowledge(message2);
        assertEquals(1, ranges.size());
        r1 = ranges.get(0);
        assertEquals(1, r1.getLower().intValue());
        assertEquals(2, r1.getUpper().intValue());
    }

/*    @Test
    public void testAcknowledgeLastMessageNumberExceeded() throws SequenceFault {
        Timer timer = mock(Timer.class);
        RMEndpoint rme = mock(RMEndpoint.class);
        when(rme.getEncoderDecoder()).thenReturn(EncoderDecoder10Impl.INSTANCE);
        setUpDestination(timer, rme);
        Message message1 = setUpMessage("1");
        Message message2 = setUpMessage("2", true);

        DestinationSequence seq = new DestinationSequence(id, ref, destination);

        seq.acknowledge(message1);
        seq.setLastMessageNumber(1);
        try {
            seq.acknowledge(message2);
            fail("Expected SequenceFault not thrown.");
        } catch (SequenceFault sf) {
            assertEquals("SequenceTerminated", sf.getSequenceFault().getFaultCode().getLocalPart());
        }
    }   */

    @Test
    public void testAcknowledgeAppendRange() throws SequenceFault {
        Timer timer = mock(Timer.class);
        setUpDestination(timer, null);
        Message[] messages = new Message [] {
            setUpMessage("1"),
            setUpMessage("2"),
            setUpMessage("5"),
            setUpMessage("4"),
            setUpMessage("6")
        };

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
        for (int i = 0; i < messages.length; i++) {
            seq.acknowledge(messages[i]);
        }
        assertEquals(2, ranges.size());
        AcknowledgementRange r = ranges.get(0);
        assertEquals(1, r.getLower().intValue());
        assertEquals(2, r.getUpper().intValue());
        r = ranges.get(1);
        assertEquals(4, r.getLower().intValue());
        assertEquals(6, r.getUpper().intValue());
    }

    @Test
    public void testAcknowledgeInsertRange() throws SequenceFault {
        Timer timer = mock(Timer.class);
        setUpDestination(timer, null);

        Bus bus = mock(Bus.class);
        when(manager.getBus()).thenReturn(bus);

        Message[] messages = new Message [] {
            setUpMessage("1"),
            setUpMessage("2"),
            setUpMessage("9"),
            setUpMessage("10"),
            setUpMessage("4"),
            setUpMessage("9"),
            setUpMessage("2")
        };

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
        for (int i = 0; i < messages.length; i++) {
            seq.acknowledge(messages[i]);
        }

        assertEquals(3, ranges.size());
        AcknowledgementRange r = ranges.get(0);
        assertEquals(1, r.getLower().intValue());
        assertEquals(2, r.getUpper().intValue());
        r = ranges.get(1);
        assertEquals(4, r.getLower().intValue());
        assertEquals(4, r.getUpper().intValue());
        r = ranges.get(2);
        assertEquals(9, r.getLower().intValue());
        assertEquals(10, r.getUpper().intValue());
    }

    @Test
    public void testAcknowledgePrependRange() throws SequenceFault {
        Timer timer = mock(Timer.class);
        setUpDestination(timer, null);
        Message[] messages = new Message [] {
            setUpMessage("4"),
            setUpMessage("5"),
            setUpMessage("6"),
            setUpMessage("4"),
            setUpMessage("2"),
            setUpMessage("2")
        };

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
        for (int i = 0; i < messages.length; i++) {
            seq.acknowledge(messages[i]);
        }
        assertEquals(2, ranges.size());
        AcknowledgementRange r = ranges.get(0);
        assertEquals(2, r.getLower().intValue());
        assertEquals(2, r.getUpper().intValue());
        r = ranges.get(1);
        assertEquals(4, r.getLower().intValue());
        assertEquals(6, r.getUpper().intValue());
    }

    @Test
    public void testMerge() {
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        List<AcknowledgementRange> ranges = seq.getAcknowledgment().getAcknowledgementRange();
        AcknowledgementRange r;
        for (int i = 0; i < 5; i++) {
            r = new AcknowledgementRange();
            r.setLower(Long.valueOf(3 * i + 1));
            r.setUpper(Long.valueOf(3 * i + 3));
            ranges.add(r);
        }
        seq.mergeRanges();
        assertEquals(1, ranges.size());
        r = ranges.get(0);
        assertEquals(Long.valueOf(1), r.getLower());
        assertEquals(Long.valueOf(15), r.getUpper());
        ranges.clear();
        for (int i = 0; i < 5; i++) {
            r = new AcknowledgementRange();
            r.setLower(Long.valueOf(3 * i + 1));
            r.setUpper(Long.valueOf(3 * i + 2));
            ranges.add(r);
        }
        seq.mergeRanges();
        assertEquals(5, ranges.size());
        ranges.clear();
        for (int i = 0; i < 5; i++) {
            if (i != 2) {
                r = new AcknowledgementRange();
                r.setLower(Long.valueOf(3 * i + 1));
                r.setUpper(Long.valueOf(3 * i + 3));
                ranges.add(r);
            }
        }
        seq.mergeRanges();
        assertEquals(2, ranges.size());
        r = ranges.get(0);
        assertEquals(Long.valueOf(1), r.getLower());
        assertEquals(Long.valueOf(6), r.getUpper());
        r = ranges.get(1);
        assertEquals(Long.valueOf(10), r.getLower());
        assertEquals(Long.valueOf(15), r.getUpper());
    }

    @Test
    public void testMonitor() throws SequenceFault, InterruptedException {
        Timer timer = mock(Timer.class);
        setUpDestination(timer, null);
        Message[] messages = new Message[15];
        for (int i = 0; i < messages.length; i++) {
            messages[i] = setUpMessage(Integer.toString(i + 1));
        }

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        SequenceMonitor monitor = seq.getMonitor();
        assertNotNull(monitor);
        monitor.setMonitorInterval(500L);

        assertEquals(0, monitor.getMPM());

        for (int i = 0; i < 10; i++) {
            seq.acknowledge(messages[i]);
            Thread.sleep(55L);
        }
        int mpm1 = monitor.getMPM();
        assertTrue("unexpected MPM: " + mpm1, mpm1 > 0);

        for (int i = 10; i < messages.length; i++) {
            seq.acknowledge(messages[i]);
            Thread.sleep(110L);
        }
        int mpm2 = monitor.getMPM();
        assertTrue(mpm2 > 0);
        assertTrue(mpm1 > mpm2);
    }

    @Test
    public void testAcknowledgeImmediate() throws SequenceFault {
        Timer timer = mock(Timer.class);
        setUpDestination(timer, null);
        Message message = setUpMessage("1");

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        assertFalse(seq.sendAcknowledgement());

        seq.acknowledge(message);

        assertTrue(seq.sendAcknowledgement());
        seq.acknowledgmentSent();
        assertFalse(seq.sendAcknowledgement());
    }

    @Test
    public void testAcknowledgeDeferred() throws SequenceFault, RMException, InterruptedException {
        Timer timer = new Timer();
        RMEndpoint rme = mock(RMEndpoint.class);
        setUpDestination(timer, rme);

        Bus bus = mock(Bus.class);
        when(manager.getBus()).thenReturn(bus);

        when(bus.getExtension(WSAddressingFeatureApplier.class))
        .thenReturn(new WSAddressingFeatureApplier() {
            @Override
            public void initializeProvider(WSAddressingFeature feature, InterceptorProvider provider, Bus bus) {
            }
        });

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        
        AttributedURIType uri = mock(AttributedURIType.class);
        when(ref.getAddress()).thenReturn(uri);

        Proxy proxy = spy(new Proxy(rme));
        when(rme.getProxy()).thenReturn(proxy);
        doCallRealMethod().when(proxy).acknowledge(seq);
        when(destination.getReliableEndpoint()).thenReturn(endpoint);

        InterfaceInfo ii = mock(InterfaceInfo.class);
        when(ii.getOperation(ProtocolVariation.RM10WSA200408.getConstants().getSequenceAckOperationName()))
            .thenReturn(new OperationInfo());

        ServiceInfo si = new ServiceInfo();
        si.setInterface(ii);

        BindingInfo bi = new BindingInfo(si, null);
        Endpoint ae = mock(Endpoint.class);
        when(ae.getEndpointInfo()).thenReturn(new EndpointInfo(si, RM10Constants.NAMESPACE_URI));
        when(endpoint.getBindingInfo(ProtocolVariation.RM10WSA200408)).thenReturn(bi);
        when(endpoint.getEndpoint(ProtocolVariation.RM10WSA200408)).thenReturn(ae);
        when(endpoint.getManager()).thenReturn(manager);

        Message[] messages = new Message[] {
            setUpMessage("1"),
            setUpMessage("2"),
            setUpMessage("3")
        };

        ap.setIntraMessageThreshold(0);
        config.setAcknowledgementInterval(200L);

        assertFalse(seq.sendAcknowledgement());

        for (int i = 0; i < messages.length; i++) {
            seq.acknowledge(messages[i]);
        }

        assertFalse(seq.sendAcknowledgement());

        Thread.sleep(250L);

        assertTrue(seq.sendAcknowledgement());
        seq.acknowledgmentSent();
        assertFalse(seq.sendAcknowledgement());
    }

    @Test
    public void testCorrelationID() {
        setUpDestination();
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        String correlationID = "abdc1234";
        assertNull("unexpected correlation ID", seq.getCorrelationID());
        seq.setCorrelationID(correlationID);
        assertEquals("unexpected correlation ID",
                     correlationID,
                     seq.getCorrelationID());
    }

    @Test
    public void testApplyDeliveryAssuranceAtMostOnce() throws RMException {
        setUpDestination();

        long mn = 10;
        SequenceAcknowledgement ack = mock(SequenceAcknowledgement.class);
        List<AcknowledgementRange> ranges = new ArrayList<>();
        AcknowledgementRange r = mock(AcknowledgementRange.class);
        when(ack.getAcknowledgementRange()).thenReturn(ranges);
        config.setDeliveryAssurance(DeliveryAssurance.AT_MOST_ONCE);

        DestinationSequence ds = new DestinationSequence(id, ref, 0, ack, ProtocolVariation.RM10WSA200408);
        ds.setDestination(destination);
        ds.applyDeliveryAssurance(mn, null);

        ranges.add(r);
        when(destination.getReliableEndpoint()).thenReturn(endpoint);
        when(endpoint.getConfiguration()).thenReturn(config);
        when(ack.getAcknowledgementRange()).thenReturn(ranges);
        when(r.getLower()).thenReturn(Long.valueOf(5));
        when(r.getUpper()).thenReturn(Long.valueOf(15));

        ds.applyDeliveryAssurance(mn, null);
    }

    @Test
    public void testInOrderWait() throws InterruptedException {
        setUpDestination();
        
        Bus bus = mock(Bus.class);
        when(manager.getBus()).thenReturn(bus);

        Message[] messages = new Message[5];
        for (int i = 0; i < messages.length; i++) {
            messages[i] = setUpMessage(Integer.toString(i + 1));
        }

        config.setDeliveryAssurance(DeliveryAssurance.AT_LEAST_ONCE);

        SequenceAcknowledgement ack = factory.createSequenceAcknowledgement();
        List<AcknowledgementRange> ranges = new ArrayList<>();

        final AcknowledgementRange r =
            factory.createSequenceAcknowledgementAcknowledgementRange();
        r.setUpper(Long.valueOf(messages.length));
        ranges.add(r);
        final DestinationSequence ds = new DestinationSequence(id, ref, 0, ack,
            ProtocolVariation.RM10WSA200408);
        ds.setDestination(destination);

        class Acknowledger extends Thread {
            Message message;
            long messageNr;

            Acknowledger(Message m, long mn) {
                message = m;
                messageNr = mn;
            }

            public void run() {
                try {
                    ds.acknowledge(message);
                    ds.applyDeliveryAssurance(messageNr, message);
                } catch (Exception ex) {
                    // ignore
                }
            }
        }

        Thread[] threads = new Thread[messages.length];
        for (int i = messages.length - 1; i >= 0; i--) {
            threads[i] = new Acknowledger(messages[i], i + 1);
            threads[i].start();
            Thread.sleep(100L);
        }

        boolean timedOut = false;
        for (int i = 0; i < messages.length; i++) {
            try {
                threads[i].join(1000L);
            } catch (InterruptedException ex) {
                timedOut = true;
            }
        }
        assertFalse("timed out waiting for messages to be processed in order", timedOut);
    }

    @Test
    public void testScheduleSequenceTermination() throws SequenceFault, InterruptedException {
        Timer timer = new Timer();
        RMEndpoint rme = mock(RMEndpoint.class);
        when(rme.getProxy()).thenReturn(mock(Proxy.class));
        setUpDestination(timer, rme);

        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        doCallRealMethod().when(destination).terminateSequence(seq, true);

        Message message = setUpMessage("1");

        long arrival = System.currentTimeMillis();
        when(rme.getLastApplicationMessage()).thenReturn(arrival);

        config.setInactivityTimeout(200L);

        seq.acknowledge(message);

        Thread.sleep(250L);
    }

    @Test
    public void testSequenceTermination() {
        destination = mock(Destination.class);
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        RMEndpoint rme = mock(RMEndpoint.class);
        when(destination.getReliableEndpoint()).thenReturn(rme);
        manager = mock(RMManager.class);
        RMStore store = mock(RMStore.class);
        when(manager.getStore()).thenReturn(store);
        when(destination.getManager()).thenReturn(manager);
        DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
        st.updateInactivityTimeout(30000L);
        long lastAppMessage = System.currentTimeMillis() - 30000L;
        when(rme.getLastControlMessage()).thenReturn(0L);
        when(rme.getLastApplicationMessage()).thenReturn(lastAppMessage);
        doCallRealMethod().when(destination).terminateSequence(seq, true);

        st.run();
    }

    @Test
    public void testSequenceTerminationNotNecessary() {
        destination = mock(Destination.class);
        manager = mock(RMManager.class);
        when(destination.getManager()).thenReturn(manager);
        Timer t = new Timer();
        when(manager.getTimer()).thenReturn(t);
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        RMEndpoint rme = mock(RMEndpoint.class);
        when(destination.getReliableEndpoint()).thenReturn(rme);
        DestinationSequence.SequenceTermination st = seq.new SequenceTermination();
        st.updateInactivityTimeout(30000L);
        long lastAppMessage = System.currentTimeMillis() - 1000L;
        when(rme.getLastControlMessage()).thenReturn(0L);
        when(rme.getLastApplicationMessage()).thenReturn(lastAppMessage);

        st.run();
    }

    @Test
    public void testCanPiggybackAckOnPartialResponse() {
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        AttributedURIType uri = mock(AttributedURIType.class);
        when(ref.getAddress()).thenReturn(uri);
        String addr = "http://localhost:9999/reponses";
        when(uri.getValue()).thenReturn(addr);
        assertFalse(seq.canPiggybackAckOnPartialResponse());

        when(ref.getAddress()).thenReturn(uri);
        when(uri.getValue()).thenReturn(Names.WSA_ANONYMOUS_ADDRESS);
        assertTrue(seq.canPiggybackAckOnPartialResponse());
    }

    @Test
    public void testPurgeAcknowledged() {
        destination = mock(Destination.class);
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);
        manager = mock(RMManager.class);
        when(destination.getManager()).thenReturn(manager);
        RMStore store = mock(RMStore.class);
        when(manager.getStore()).thenReturn(store);

        seq.purgeAcknowledged(1);
        verify(store).removeMessages(eq(id),
             CastUtils.cast(isA(Collection.class), Long.class), eq(false));
    }

    @Test
    public void testCancelDeferredAcknowledgements() {
        destination = mock(Destination.class);
        manager = mock(RMManager.class);
        when(destination.getManager()).thenReturn(manager);
        Timer t = new Timer();
        when(manager.getTimer()).thenReturn(t);
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);

        seq.scheduleDeferredAcknowledgement(30000L);
        seq.cancelDeferredAcknowledgments();
        seq.cancelDeferredAcknowledgments();
        t.cancel();
    }

    @Test
    public void testCancelTermination() {
        destination = mock(Destination.class);
        manager = mock(RMManager.class);
        when(destination.getManager()).thenReturn(manager);
        Timer t = new Timer();
        when(manager.getTimer()).thenReturn(t);
        DestinationSequence seq = new DestinationSequence(id, ref, destination,
            ProtocolVariation.RM10WSA200408);

        seq.scheduleSequenceTermination(30000L);
        seq.cancelTermination();
        t.cancel();
    }

    private void setUpDestination() {
        setUpDestination(null, null);
    }

    private void setUpDestination(Timer timer, RMEndpoint rme) {

        manager = mock(RMManager.class);

        org.apache.cxf.ws.rm.manager.ObjectFactory cfgFactory =
            new org.apache.cxf.ws.rm.manager.ObjectFactory();
        dp = cfgFactory.createDestinationPolicyType();
        ap = cfgFactory.createAcksPolicyType();
        dp.setAcksPolicy(ap);

        config = new RMConfiguration();
        config.setBaseRetransmissionInterval(3000L);
        when(manager.getConfiguration()).thenReturn(config);
        endpoint = rme;
        if (endpoint == null) {
            endpoint = mock(RMEndpoint.class);
        }
        when(endpoint.getConfiguration()).thenReturn(config);

        when(manager.getDestinationPolicy()).thenReturn(dp);
        when(manager.getStore()).thenReturn(null);

        destination = mock(Destination.class);
        when(destination.getManager()).thenReturn(manager);
        when(destination.getReliableEndpoint()).thenReturn(endpoint);

        if (null != timer) {
            when(manager.getTimer()).thenReturn(timer);
        }

    }

    private Message setUpMessage(String messageNr) {
        return setUpMessage(messageNr, false);
    }

    private Message setUpMessage(String messageNr, boolean useuri) {
        Message message = mock(Message.class);
        Exchange exchange = mock(Exchange.class);
        when(message.getExchange()).thenReturn(exchange);
        when(exchange.getOutMessage()).thenReturn(null);
        when(exchange.getOutFaultMessage()).thenReturn(null);
        RMProperties rmps = mock(RMProperties.class);
        when(message.get(RMMessageConstants.RM_PROPERTIES_INBOUND)).thenReturn(rmps);
        SequenceType st = mock(SequenceType.class);
        when(rmps.getSequence()).thenReturn(st);
        Long val = Long.valueOf(messageNr);
        when(st.getMessageNumber()).thenReturn(val);
        if (useuri) {
            when(rmps.getNamespaceURI()).thenReturn(RM10Constants.NAMESPACE_URI);
        }
        return message;
    }
}