1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use crate::{
    core::{advance, async_advance, Airlock as _, Next},
    ops::{Coroutine, GeneratorState},
    sync::{engine::Airlock, Co},
};
use std::{future::Future, pin::Pin};

/// This is a generator which can be shared between threads.
///
/// [_See the module-level docs for examples._](.)
pub struct Gen<Y, R, F: Future> {
    airlock: Airlock<Y, R>,
    future: Pin<Box<F>>,
}

impl<Y, R, F: Future> Gen<Y, R, F> {
    /// Creates a new generator from a function.
    ///
    /// The function accepts a [`Co`] object, and returns a future. Every time
    /// the generator is resumed, the future is polled. Each time the future is
    /// polled, it should do one of two things:
    ///
    /// - Call `co.yield_()`, and then return `Poll::Pending`.
    /// - Drop the `Co`, and then return `Poll::Ready`.
    ///
    /// Typically this exchange will happen in the context of an `async fn`.
    ///
    /// [_See the module-level docs for examples._](.)
    pub fn new(producer: impl FnOnce(Co<Y, R>) -> F) -> Self {
        let airlock = Airlock::default();
        let future = { Box::pin(producer(Co::new(airlock.clone()))) };
        Self { airlock, future }
    }

    /// Resumes execution of the generator.
    ///
    /// `arg` is the resume argument. If the generator was previously paused by
    /// awaiting a future returned from `co.yield()`, that future will complete,
    /// and return `arg`.
    ///
    /// If the generator yields a value, `Yielded` is returned. Otherwise,
    /// `Completed` is returned.
    ///
    /// [_See the module-level docs for examples._](.)
    pub fn resume_with(&mut self, arg: R) -> GeneratorState<Y, F::Output> {
        self.airlock.replace(Next::Resume(arg));
        advance(self.future.as_mut(), &self.airlock)
    }
}

impl<Y, F: Future> Gen<Y, (), F> {
    /// Resumes execution of the generator.
    ///
    /// If the generator yields a value, `Yielded` is returned. Otherwise,
    /// `Completed` is returned.
    ///
    /// [_See the module-level docs for examples._](.)
    pub fn resume(&mut self) -> GeneratorState<Y, F::Output> {
        self.resume_with(())
    }

    /// Resumes execution of the generator.
    ///
    /// If the generator pauses without yielding, `Poll::Pending` is returned.
    /// If the generator yields a value, `Poll::Ready(Yielded)` is returned.
    /// Otherwise, `Poll::Ready(Completed)` is returned.
    ///
    /// [_See the module-level docs for examples._](.)
    pub fn async_resume(
        &mut self,
    ) -> impl Future<Output = GeneratorState<Y, F::Output>> + '_ {
        self.airlock.replace(Next::Resume(()));
        async_advance(self.future.as_mut(), self.airlock.clone())
    }
}

impl<Y, R, F: Future> Coroutine for Gen<Y, R, F> {
    type Yield = Y;
    type Resume = R;
    type Return = F::Output;

    fn resume_with(
        mut self: Pin<&mut Self>,
        arg: R,
    ) -> GeneratorState<Self::Yield, Self::Return> {
        Self::resume_with(&mut *self, arg)
    }
}