SourceSequenceTest.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.cxf.ws.rm;

import java.util.Date;

import javax.xml.datatype.Duration;

import org.apache.cxf.jaxb.DatatypeFactory;
import org.apache.cxf.ws.rm.manager.SequenceTerminationPolicyType;
import org.apache.cxf.ws.rm.manager.SourcePolicyType;
import org.apache.cxf.ws.rm.v200702.Expires;
import org.apache.cxf.ws.rm.v200702.Identifier;
import org.apache.cxf.ws.rm.v200702.ObjectFactory;
import org.apache.cxf.ws.rm.v200702.SequenceAcknowledgement;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SourceSequenceTest {

    private ObjectFactory factory;
    private Identifier id;

    private Source source;
    private RMManager manager;
    private SourcePolicyType sp;
    private SequenceTerminationPolicyType stp;
    private RetransmissionQueue rq;

    @Before
    public void setUp() {
        factory = new ObjectFactory();
        id = factory.createIdentifier();
        id.setValue("seq");
    }

    @After
    public void tearDown() {
        source = null;
        manager = null;
        sp = null;
        stp = null;
        rq = null;
    }

    protected void setUpSource() {
        source = mock(Source.class);
        manager = mock(RMManager.class);
        when(source.getManager()).thenReturn(manager);
        rq = mock(RetransmissionQueue.class);
        when(manager.getRetransmissionQueue()).thenReturn(rq);

        // default termination policy

        org.apache.cxf.ws.rm.manager.ObjectFactory cfgFactory =
            new org.apache.cxf.ws.rm.manager.ObjectFactory();
        sp = cfgFactory.createSourcePolicyType();
        stp = cfgFactory
            .createSequenceTerminationPolicyType();
        sp.setSequenceTerminationPolicy(stp);
        when(manager.getSourcePolicy()).thenReturn(sp);
    }

    @Test
    public void testConstructors() {

        Identifier otherId = factory.createIdentifier();
        otherId.setValue("otherSeq");

        SourceSequence seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        assertEquals(id, seq.getIdentifier());
        assertFalse(seq.isLastMessage());
        assertFalse(seq.isExpired());
        assertEquals(0, seq.getCurrentMessageNr());
        assertNotNull(seq.getAcknowledgement());
        assertEquals(0, seq.getAcknowledgement().getAcknowledgementRange().size());
        assertFalse(seq.allAcknowledged());
        assertFalse(seq.offeredBy(otherId));

        Date expiry = new Date(System.currentTimeMillis() + 3600 * 1000);

        seq = new SourceSequence(id, expiry, null, ProtocolVariation.RM10WSA200408);
        assertEquals(id, seq.getIdentifier());
        assertFalse(seq.isLastMessage());
        assertFalse(seq.isExpired());
        assertEquals(0, seq.getCurrentMessageNr());
        assertNotNull(seq.getAcknowledgement());
        assertEquals(0, seq.getAcknowledgement().getAcknowledgementRange().size());
        assertFalse(seq.allAcknowledged());
        assertFalse(seq.offeredBy(otherId));

        seq = new SourceSequence(id, expiry, otherId, ProtocolVariation.RM10WSA200408);
        assertTrue(seq.offeredBy(otherId));
        assertFalse(seq.offeredBy(id));
    }

    @Test
    public void testSetExpires() {
        SourceSequence seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);

        Expires expires = factory.createExpires();
        seq.setExpires(expires);

        assertFalse(seq.isExpired());

        Duration d = DatatypeFactory.PT0S;
        expires.setValue(d);
        seq.setExpires(expires);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            assertFalse(seq.isExpired());
        }

        d = DatatypeFactory.createDuration("PT1S");
        expires.setValue(d);
        seq.setExpires(expires);
        assertFalse(seq.isExpired());

        d = DatatypeFactory.createDuration("-PT1S");
        expires.setValue(d);
        seq.setExpires(expires);
        assertTrue(seq.isExpired());
    }

    @Test
    public void testEqualsAndHashCode() {
        SourceSequence seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        SourceSequence otherSeq = null;
        assertNotEquals(seq, otherSeq);
        otherSeq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        assertEquals(seq, otherSeq);
        assertEquals(seq.hashCode(), otherSeq.hashCode());
        Identifier otherId = factory.createIdentifier();
        otherId.setValue("otherSeq");
        otherSeq = new SourceSequence(otherId, ProtocolVariation.RM10WSA200408);
        assertNotEquals(seq, otherSeq);
        assertTrue(seq.hashCode() != otherSeq.hashCode());
    }

    @Test
    public void testSetAcknowledged() throws RMException {
        SourceSequence seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        setUpSource();
        seq.setSource(source);

        SequenceAcknowledgement ack = factory.createSequenceAcknowledgement();
        SequenceAcknowledgement.AcknowledgementRange r =
            factory.createSequenceAcknowledgementAcknowledgementRange();
        r.setLower(Long.valueOf(1));
        r.setUpper(Long.valueOf(2));
        ack.getAcknowledgementRange().add(r);
        r = factory.createSequenceAcknowledgementAcknowledgementRange();
        r.setLower(Long.valueOf(4));
        r.setUpper(Long.valueOf(6));
        ack.getAcknowledgementRange().add(r);
        r = factory.createSequenceAcknowledgementAcknowledgementRange();
        r.setLower(Long.valueOf(8));
        r.setUpper(Long.valueOf(10));
        ack.getAcknowledgementRange().add(r);
        doNothing().when(rq).purgeAcknowledged(seq);

        seq.setAcknowledged(ack);
        assertSame(ack, seq.getAcknowledgement());
        assertEquals(3, ack.getAcknowledgementRange().size());
        assertFalse(seq.isAcknowledged(3));
        assertTrue(seq.isAcknowledged(5));

        verify(rq, atLeastOnce()).purgeAcknowledged(seq);
    }

    @Test
    public void testAllAcknowledged() throws RMException {

        SourceSequence seq = new SourceSequence(id, null, null, 4, false,
                                                ProtocolVariation.RM10WSA200408);
        setUpSource();
        seq.setSource(source);

        assertFalse(seq.allAcknowledged());
        seq.setLastMessage(true);
        assertFalse(seq.allAcknowledged());
        SequenceAcknowledgement ack = factory.createSequenceAcknowledgement();
        SequenceAcknowledgement.AcknowledgementRange r =
            factory.createSequenceAcknowledgementAcknowledgementRange();
        r.setLower(Long.valueOf(1));
        r.setUpper(Long.valueOf(2));
        ack.getAcknowledgementRange().add(r);
        rq.purgeAcknowledged(seq);

        seq.setAcknowledged(ack);
        assertFalse(seq.allAcknowledged());
        r.setUpper(Long.valueOf(4));
        assertTrue(seq.allAcknowledged());

        verify(rq, atLeastOnce()).purgeAcknowledged(seq);
    }

    @Test
    public void testNextMessageNumber() throws RMException {
        setUpSource();

        // default termination policy

        SourceSequence seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        seq.setSource(source);
        assertFalse(nextMessages(seq, 10));
        
        // termination policy max length = 1

        seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        seq.setSource(source);
        stp.setMaxLength(1);
        assertTrue(nextMessages(seq, 10));
        assertEquals(1, seq.getCurrentMessageNr());

        // termination policy max length = 5
        seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        seq.setSource(source);
        stp.setMaxLength(5);
        assertFalse(nextMessages(seq, 2));

        // termination policy max range exceeded

        seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        seq.setSource(source);
        stp.setMaxLength(0);
        stp.setMaxRanges(3);
        acknowledge(seq, 1, 2, 4, 5, 6, 8, 9, 10);
        assertTrue(nextMessages(seq, 10));
        assertEquals(1, seq.getCurrentMessageNr());
        verify(rq, atLeastOnce()).purgeAcknowledged(seq);

        // termination policy max range not exceeded

        seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        seq.setSource(source);
        stp.setMaxLength(0);
        stp.setMaxRanges(4);
        acknowledge(seq, 1, 2, 4, 5, 6, 8, 9, 10);
        assertFalse(nextMessages(seq, 10));

        // termination policy max unacknowledged
    }

    @Test
    public void testGetEndpointIdentfier() {
        setUpSource();
        String name = "abc";
        when(source.getName()).thenReturn(name);

        SourceSequence seq = new SourceSequence(id, ProtocolVariation.RM10WSA200408);
        seq.setSource(source);
        assertEquals("Unexpected endpoint identifier", name, seq.getEndpointIdentifier());
    }

    @Test
    public void testCheckOfferingSequenceClosed() {
        setUpSource();

        RMEndpoint rme = mock(RMEndpoint.class);
        when(source.getReliableEndpoint()).thenReturn(rme);
        Destination destination = mock(Destination.class);
        when(rme.getDestination()).thenReturn(destination);
        DestinationSequence dseq = mock(DestinationSequence.class);
        Identifier did = mock(Identifier.class);
        when(destination.getSequence(did)).thenReturn(dseq);
        when(dseq.getLastMessageNumber()).thenReturn(Long.valueOf(1));
        when(did.getValue()).thenReturn("dseq");

        SourceSequence seq = new SourceSequence(id, null, did, ProtocolVariation.RM10WSA200408);
        seq.setSource(source);
        seq.nextMessageNumber(did, 1, false);
        assertTrue(seq.isLastMessage());
    }

    private boolean nextMessages(SourceSequence seq,
                                 int n) {
        int i = 0;
        while ((i < n) && !seq.isLastMessage()) {
            seq.nextMessageNumber();
            i++;
        }
        return seq.isLastMessage();
    }

    protected void acknowledge(SourceSequence seq, int... messageNumbers) throws RMException {
        SequenceAcknowledgement ack = factory.createSequenceAcknowledgement();
        int i = 0;
        while (i < messageNumbers.length) {
            SequenceAcknowledgement.AcknowledgementRange r =
                factory.createSequenceAcknowledgementAcknowledgementRange();
            Long l = Long.valueOf(messageNumbers[i]);
            r.setLower(l);
            i++;

            while (i < messageNumbers.length && (messageNumbers[i] - messageNumbers[i - 1]) == 1) {
                i++;
            }
            Long u = Long.valueOf(messageNumbers[i - 1]);
            r.setUpper(u);
            ack.getAcknowledgementRange().add(r);
        }
        seq.setAcknowledged(ack);
    }
}