Stream<E> asyncExpand<E>(Stream<E> convert(T event))

Creates a new stream with the events of a stream per original event.

This acts like expand, except that convert returns a Stream instead of an Iterable. The events of the returned stream becomes the events of the returned stream, in the order they are produced.

If convert returns null, no value is put on the output stream, just as if it returned an empty stream.

The returned stream is a broadcast stream if this stream is.

Source

Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
  StreamController<E> controller;
  StreamSubscription<T> subscription;
  void onListen() {
    assert(controller is _StreamController ||
        controller is _BroadcastStreamController);
    final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
    subscription = this.listen((T event) {
      Stream<E> newStream;
      try {
        newStream = convert(event);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (newStream != null) {
        subscription.pause();
        controller.addStream(newStream).whenComplete(subscription.resume);
      }
    },
        onError: eventSink._addError, // Avoid Zone error replacement.
        onDone: controller.close);
  }

  if (this.isBroadcast) {
    controller = new StreamController<E>.broadcast(
        onListen: onListen,
        onCancel: () {
          subscription.cancel();
        },
        sync: true);
  } else {
    controller = new StreamController<E>(
        onListen: onListen,
        onPause: () {
          subscription.pause();
        },
        onResume: () {
          subscription.resume();
        },
        onCancel: () => subscription.cancel(),
        sync: true);
  }
  return controller.stream;
}