use crate::stream_ext::Fuse;
use crate::Stream;
use tokio::time::{Instant, Sleep};
use core::future::Future;
use core::pin::Pin;
use core::task::{ready, Context, Poll};
use pin_project_lite::pin_project;
use std::fmt;
use std::time::Duration;
pin_project! {
/// Stream returned by the [`timeout`](super::StreamExt::timeout) method.
///
/// Wraps a stream `S` and yields `Result<S::Item, Elapsed>` items: the
/// deadline check in `poll_next` produces the `Err(Elapsed)` case.
#[must_use = "streams do nothing unless polled"]
pub struct Timeout<S> {
// The wrapped stream, held inside a `Fuse` adapter (built in `new`).
stream: Fuse<S>,
// Timer for the current deadline. `new` creates one with `sleep_until`;
// presumably it is stored here — the initializer is not visible in this
// excerpt, TODO confirm.
deadline: Sleep,
// Length of the timeout window; `poll_next` adds it to `Instant::now()`
// after each item to compute the next deadline.
duration: Duration,
// Whether the deadline check in `poll_next` is enabled. Set to `true` by
// `new` and whenever an item is yielded; cleared right before an `Elapsed`
// error is returned.
poll_deadline: bool,
// NOTE(review): the closing braces of the struct and of `pin_project!` are
// not visible in this excerpt, and no `#[pin]` field attributes appear —
// confirm against the full file.
/// Error returned by `Timeout` and `TimeoutRepeating`.
///
/// Carries no data: the single field is a private `()`, so values are only
/// created through the crate-internal `Elapsed::new`. Its `Display` output is
/// the fixed message `"deadline has elapsed"`.
#[derive(Debug, PartialEq, Eq)]
pub struct Elapsed(());
impl<S: Stream> Timeout<S> {
/// Creates a `Timeout` around `stream`, with the first deadline computed as
/// `duration` from the moment of construction.
///
/// Visible only to the parent module (`pub(super)`).
pub(super) fn new(stream: S, duration: Duration) -> Self {
// First deadline: one `duration` from now.
let next = Instant::now() + duration;
// Timer that completes at `next`.
let deadline = tokio::time::sleep_until(next);
Timeout {
// Wrap the inner stream in `Fuse`.
stream: Fuse::new(stream),
// Start with the deadline check in `poll_next` enabled.
poll_deadline: true,
// NOTE(review): the `deadline` and `duration` field initializers and the
// closing braces are not visible in this excerpt — confirm against the full
// file.
/// `Stream` implementation for `Timeout`: items are `Result`s whose error type
/// is `Elapsed`.
impl<S: Stream> Stream for Timeout<S> {
/// Either an item of the inner stream or an `Elapsed` error.
type Item = Result<S::Item, Elapsed>;
/// Polls for the next item.
///
/// A `Poll::Ready` result from the matched poll returns immediately; when it
/// carried `Some`, a new deadline is computed and the deadline check is
/// re-enabled first. On `Poll::Pending`, if the deadline check is enabled it
/// is disabled and `Some(Err(Elapsed))` is returned.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Pin projection: gives access to the individual fields.
let me = self.project();
// NOTE(review): the scrutinee of this `match` is missing in this excerpt;
// presumably it polls `me.stream` with `cx` — confirm against the full file.
match {
Poll::Ready(v) => {
if v.is_some() {
// An item arrived: compute a fresh deadline one `duration` from now and
// re-enable the deadline check.
// NOTE(review): no use of `next` is visible here (e.g. resetting
// `me.deadline`) — confirm.
let next = Instant::now() + *me.duration;
*me.poll_deadline = true;
// NOTE(review): the argument of `Poll::Ready(` is missing in this excerpt;
// presumably `v` mapped into `Ok` — confirm.
return Poll::Ready(;
// Nothing ready from the matched poll: fall through to the deadline check.
Poll::Pending => {}
if *me.poll_deadline {
// NOTE(review): no wait on `me.deadline` is visible before the error is
// returned; `ready` and `Future` are imported at the top of the file but not
// used in the visible lines, so the timer is presumably polled here — confirm.
// Disable the check so that no further error is reported until the next
// item re-enables it.
*me.poll_deadline = false;
return Poll::Ready(Some(Err(Elapsed::new())));
/// Returns `(lower, twice_plus_one(upper))`: the lower bound is passed
/// through unchanged and the upper bound is widened to leave room for
/// interleaved timeout errors.
fn size_hint(&self) -> (usize, Option<usize>) {
// NOTE(review): the right-hand side is missing in this excerpt; presumably
// the inner stream's `size_hint()` — confirm.
let (lower, upper) =;
// The timeout stream may insert an error before and after each message
// from the underlying stream, but no more than one error between each
// message. Hence the upper bound is computed as 2x+1.
// Using a helper function to enable use of question mark operator.
fn twice_plus_one(value: Option<usize>) -> Option<usize> {
// NOTE(review): the helper body is not visible in this excerpt; per the
// comment above it should compute `2 * value + 1` on the `Option` — confirm,
// including how overflow is handled.
(lower, twice_plus_one(upper))
// ===== impl Elapsed =====
impl Elapsed {
/// Crate-internal constructor for the `Elapsed` error value.
// NOTE(review): the function body is not visible in this excerpt; given the
// tuple-struct definition it presumably returns `Elapsed(())` — confirm.
pub(crate) fn new() -> Self {
/// User-facing text for the timeout error.
impl fmt::Display for Elapsed {
/// Writes the fixed message `"deadline has elapsed"` by delegating to the
/// string's own `fmt`, passing `fmt` through unchanged.
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
"deadline has elapsed".fmt(fmt)
// Marks `Elapsed` as a standard error type; the impl body is empty, so only
// the trait's default methods apply.
impl std::error::Error for Elapsed {}
impl From<Elapsed> for std::io::Error {
fn from(_err: Elapsed) -> std::io::Error {