FluxFirstNonEmptyEmittingTests.java

/*
 * Copyright 2013-present the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.commons.publisher;

import java.time.Duration;
import java.util.Arrays;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Operators;
import reactor.test.StepVerifier;

import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;

/**
 * @author Tim Ysewyn
 */
public class FluxFirstNonEmptyEmittingTests {

	@Test
	public void arrayNull() {
		assertThatNullPointerException().isThrownBy(() -> CloudFlux.firstNonEmpty((Publisher<Integer>[]) null));
	}

	@Test
	public void iterableNull() {
		assertThatNullPointerException().isThrownBy(() -> CloudFlux.firstNonEmpty((Iterable<Publisher<Integer>>) null));
	}

	@Test
	public void firstWinner() {
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.range(1, 10), Flux.range(11, 10)))
			.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
			.verifyComplete();
	}

	@Test
	public void firstWinnerSecondEmpty() {
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.range(1, 10), Flux.empty()))
			.expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
			.verifyComplete();
	}

	@Test
	public void firstWinnerBackpressured() {
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.range(1, 10), Flux.range(11, 10)))
			.thenRequest(5)
			.expectNext(1, 2, 3, 4, 5)
			.thenCancel()
			.verifyThenAssertThat()
			.hasNotDiscardedElements()
			.hasNotDroppedElements()
			.hasNotDroppedErrors();
	}

	@Test
	public void secondWinner() {
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.never(), Flux.range(11, 10).log()))
			.expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
			.verifyComplete();
	}

	@Test
	public void secondWinnerFirstEmpty() {
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.empty(), Flux.range(11, 10).log()))
			.expectNext(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
			.verifyComplete();
	}

	@Test
	public void bothEmpty() {
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.empty(), Flux.empty()))
			.expectComplete()
			.verifyThenAssertThat()
			.hasNotDiscardedElements()
			.hasNotDroppedElements()
			.hasNotDroppedErrors();
	}

	@Test
	public void neverAndEmpty() {
		StepVerifier.withVirtualTime(() -> CloudFlux.firstNonEmpty(Flux.never(), Flux.empty()))
			.expectSubscription()
			.expectNoEvent(Duration.ofDays(1))
			.thenCancel()
			.verifyThenAssertThat()
			.hasNotDiscardedElements()
			.hasNotDroppedElements()
			.hasNotDroppedErrors();
	}

	@Test
	public void firstEmitsError() {
		RuntimeException ex = new RuntimeException("forced failure");
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.<Integer>error(ex), Flux.empty()))
			.expectErrorMessage("forced failure")
			.verifyThenAssertThat()
			.hasNotDiscardedElements()
			.hasNotDroppedElements()
			.hasNotDroppedErrors();
	}

	@Test
	public void secondEmitsError() {
		RuntimeException ex = new RuntimeException("forced failure");
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.empty(), Flux.<Integer>error(ex)))
			.expectErrorMessage("forced failure")
			.verifyThenAssertThat()
			.hasNotDiscardedElements()
			.hasNotDroppedElements()
			.hasNotDroppedErrors();
	}

	@Test
	public void neverAndSecondEmitsError() {
		RuntimeException ex = new RuntimeException("forced failure");
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.never(), Flux.<Integer>error(ex)))
			.expectErrorMessage("forced failure")
			.verifyThenAssertThat()
			.hasNotDiscardedElements()
			.hasNotDroppedElements()
			.hasNotDroppedErrors();
	}

	@Test
	public void singleArrayNullSource() {
		StepVerifier.create(CloudFlux.firstNonEmpty((Publisher<Object>) null))
			.expectError(NullPointerException.class)
			.verify();
	}

	@Test
	public void arrayOneIsNullSource() {
		StepVerifier.create(CloudFlux.firstNonEmpty(Flux.never(), null, Flux.never()))
			.expectError(NullPointerException.class)
			.verify();
	}

	@Test
	public void singleIterableNullSource() {
		StepVerifier.create(CloudFlux.firstNonEmpty(singletonList((Publisher<Object>) null)))
			.expectError(NullPointerException.class)
			.verify();
	}

	@Test
	public void iterableOneIsNullSource() {
		StepVerifier
			.create(CloudFlux.firstNonEmpty(Arrays.asList(Flux.never(), (Publisher<Object>) null, Flux.never())))
			.expectError(NullPointerException.class)
			.verify();
	}

	@Test
	public void scanSubscriber() {
		CoreSubscriber<String> actual = new TestSubscriber<>();
		FluxFirstNonEmptyEmitting.RaceCoordinator<String> parent = new FluxFirstNonEmptyEmitting.RaceCoordinator<>(1);
		FluxFirstNonEmptyEmitting.FirstNonEmptyEmittingSubscriber<String> test = new FluxFirstNonEmptyEmitting.FirstNonEmptyEmittingSubscriber<>(
				actual, parent, 1);
		Subscription sub = Operators.emptySubscription();
		test.onSubscribe(sub);

		assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
		assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
		assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse();
		parent.cancelled = true;
		assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue();
	}

	@Test
	public void scanRaceCoordinator() {
		CoreSubscriber<String> actual = new TestSubscriber<>();
		FluxFirstNonEmptyEmitting.RaceCoordinator<String> parent = new FluxFirstNonEmptyEmitting.RaceCoordinator<>(1);
		FluxFirstNonEmptyEmitting.FirstNonEmptyEmittingSubscriber<String> test = new FluxFirstNonEmptyEmitting.FirstNonEmptyEmittingSubscriber<>(
				actual, parent, 1);
		Subscription sub = Operators.emptySubscription();
		test.onSubscribe(sub);

		assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(sub);
		assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual);
		assertThat(parent.scan(Scannable.Attr.CANCELLED)).isFalse();
		parent.cancelled = true;
		assertThat(parent.scan(Scannable.Attr.CANCELLED)).isTrue();
	}

	static class TestSubscriber<T> extends BaseSubscriber<T> implements Scannable {

		@Override
		public Object scanUnsafe(Attr key) {
			return null;
		}

	}

}