Warm tip: This article is reproduced from serverfault.com, please click

Rust create a sink from a future

发布于 2020-11-27 23:03:28

I want to transform some reader/writer into a record pipe of elements. I managed to make the reader->stream direction using futures::stream::unfold as per this answer. However, I'm having trouble with the sink->writer. I'm basically looking for some inverse function to unfold.

I know there is AsyncWriterExt::into_sink but this only works if I can generate all bytes to write in one batch. I also found this answer suggesting to use drain() after with(). This didn't work however due to lifetime issues (the FnMut cannot effectively store the reference to the writer, or at least I didn't manage to do this.

So what I'm looking for is some function that I could call like fold(initial_state, |element| {(writer.write(element).await, new_state)}). You get the idea (I hope).

I've also seen that there's async_codec but that seems like overkill for me. In the meantime I've resorted to storing all writes as a stream and then use writer.into_sink().with_flat_map(). But that's really ugly.

Questioner
piegames
Viewed
0
piegames 2020-11-28 17:35:57

Edit: I'm apparently not the only one wanting this, see the upstream implementation. Users from the future (hehe) will be able to simply use futures::sink::unfold.


Okay, I took my courage together and hacked something together based off futures::stream::unfold:

fn fold<T, F, Fut, Item, E>(init: T, f: F) -> FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>
{
    FoldSink {
        f,
        state: Some(init),
        fut: None,
    }
}


use pin_project::pin_project;

#[pin_project]
struct FoldSink<T, F, Fut> {
    f: F,
    state: Option<T>,
    #[pin]
    fut: Option<Fut>,
}

impl <T, Item, F, Fut, E> futures::sink::Sink<Item> for FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>
{
    type Error = E;

    fn poll_ready(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        let mut this = self.project();

        match this.fut.as_mut().as_pin_mut() {
            Some(fut) => {
                match fut.poll(ctx) {
                    Poll::Ready(Ok(new_state)) => {
                        this.fut.set(None);
                        *this.state = Some(new_state);
                        Poll::Ready(Ok(()))
                    },
                    Poll::Ready(Err(e)) => {
                        this.fut.set(None);
                        Poll::Ready(Err(e))
                    },
                    Poll::Pending => Poll::Pending,
                }
            },
            None => {
                Poll::Ready(Ok(()))
            }
        }
    }

    fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), E> {
        let mut this = self.project();
        this.fut.set(Some((this.f)(this.state.take().expect("todo invalid state"), item)));
        Ok(())
    }

    fn poll_flush(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        self.poll_ready(ctx)
    }

    fn poll_close(mut self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        futures::ready!(self.as_mut().poll_ready(ctx))?;
        let this = self.project();
        this.state.take().unwrap();
        Poll::Ready(Ok(()))
    }
}

Comments and improvements welcome!