use crate::yielder::Receiver;
use futures_core::{FusedStream, Stream};
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[derive(Debug)]
pub struct AsyncStream<T, U> {
rx: Receiver<T>,
done: bool,
#[pin]
generator: U,
}
}
impl<T, U> AsyncStream<T, U> {
#[doc(hidden)]
pub fn new(rx: Receiver<T>, generator: U) -> AsyncStream<T, U> {
AsyncStream {
rx,
done: false,
generator,
}
}
}
impl<T, U> FusedStream for AsyncStream<T, U>
where
U: Future<Output = ()>,
{
fn is_terminated(&self) -> bool {
self.done
}
}
impl<T, U> Stream for AsyncStream<T, U>
where
U: Future<Output = ()>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.project();
if *me.done {
return Poll::Ready(None);
}
let mut dst = None;
let res = {
let _enter = me.rx.enter(&mut dst);
me.generator.poll(cx)
};
*me.done = res.is_ready();
if dst.is_some() {
return Poll::Ready(dst.take());
}
if *me.done {
Poll::Ready(None)
} else {
Poll::Pending
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
if self.done {
(0, Some(0))
} else {
(0, None)
}
}
}