import { StreamController } from 'tricklejs'
import { type StreamInterface } from 'tricklejs/dist/types'

export function asyncExpand<E, T>(
  stream: StreamInterface<T>,
  convert: (data: T) => StreamInterface<E>
) {
  const controller = new StreamController<E>()
  controller.onListen = () => {
    const subscription = stream.listen(
      (event) => {
        let newStream: StreamInterface<E> | undefined
        try {
          newStream = convert(event)
        } catch (e) {
          if (e instanceof Error) {
            controller.addError(e)
          }
          return
        }
        if (newStream) {
          subscription.pause()
          controller.addStream(newStream).then(() => subscription.resume())
        }
      },
      {
        onError: (...args) => controller.addError(...args),
        onDone: (...args) => controller.close(...args),
      }
    )
    controller.onCancel = (...args) => subscription.cancel(...args)
    if (!stream.isBroadcast) {
      controller.onPause = (...args) => subscription.pause(...args)
      controller.onResume = (...args) => subscription.resume(...args)
    }
  }
  return controller.stream
}
