I want to turn a reader/writer into a record pipe, i.e. a stream/sink of elements. I managed the reader->stream direction using `futures::stream::unfold`, as per this answer. However, I'm having trouble with the sink->writer direction. I'm basically looking for some inverse of `unfold`.
I know there is `AsyncWriteExt::into_sink`, but this only works if I can generate all bytes to write in one batch. I also found this answer suggesting to use `drain()` after `with()`. However, this didn't work due to lifetime issues (the `FnMut` cannot effectively store the reference to the writer, or at least I didn't manage to do so).
So what I'm looking for is some function that I could call like `fold(initial_state, |element| {(writer.write(element).await, new_state)})`. You get the idea (I hope).
I've also seen that there's `async_codec`, but that seems like overkill for me. In the meantime I've resorted to storing all writes as a stream and then using `writer.into_sink().with_flat_map()`. But that's really ugly.
Edit: I'm apparently not the only one wanting this, see the upstream implementation. Users from the future (hehe) will be able to simply use `futures::sink::unfold`.
Okay, I gathered my courage and hacked something together based on `futures::stream::unfold`:
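use std::future::Future;
use std::task::Poll;

// Builds a sink from an initial state and an async step function,
// mirroring `futures::stream::unfold`.
fn fold<T, F, Fut, Item, E>(init: T, f: F) -> FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    FoldSink {
        f,
        state: Some(init),
        fut: None,
    }
}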
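use pin_project::pin_project;

#[pin_project]
struct FoldSink<T, F, Fut> {
    f: F,
    // `None` while a write is in flight, after an error, or after close.
    state: Option<T>,
    // The write currently in flight, if any.
    #[pin]
    fut: Option<Fut>,
}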
impl<T, Item, F, Fut, E> futures::sink::Sink<Item> for FoldSink<T, F, Fut>
where
    F: FnMut(T, Item) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    type Error = E;

    // Ready once the in-flight write (if any) has completed.
    fn poll_ready(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        let mut this = self.project();
        match this.fut.as_mut().as_pin_mut() {
            Some(fut) => match fut.poll(ctx) {
                Poll::Ready(Ok(new_state)) => {
                    this.fut.set(None);
                    *this.state = Some(new_state);
                    Poll::Ready(Ok(()))
                }
                Poll::Ready(Err(e)) => {
                    // The state was consumed by the failed write, so the sink
                    // cannot accept further items after this.
                    this.fut.set(None);
                    Poll::Ready(Err(e))
                }
                Poll::Pending => Poll::Pending,
            },
            None => Poll::Ready(Ok(())),
        }
    }

    fn start_send(self: std::pin::Pin<&mut Self>, item: Item) -> Result<(), E> {
        let mut this = self.project();
        // Panics if called without a successful `poll_ready`, or after an error or close.
        let state = this.state.take().expect("start_send called while the sink was not ready");
        this.fut.set(Some((this.f)(state, item)));
        Ok(())
    }

    fn poll_flush(self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        // Flushing means driving the in-flight write to completion.
        self.poll_ready(ctx)
    }

    fn poll_close(mut self: std::pin::Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Result<(), E>> {
        futures::ready!(self.as_mut().poll_ready(ctx))?;
        // Drop the state (and with it the writer). Not unwrapping here avoids a
        // panic when closing after an error or closing twice.
        let this = self.project();
        *this.state = None;
        Poll::Ready(Ok(()))
    }
}
Comments and improvements welcome!