Tradition and Modernity in Async Rust

🌐 - 简体中文

Prev knowledges: async-book and tokio basics。

Back to 2019,after Rust 1.39 was released, the async ecos has shifted heavily to the async / .await. However, limited by Rust's strict and complex type system, many functions still require traditional methods today, which makes us sometimes need to shuttle between tradition and modernity. After my archaeology exploration, I got some tips & experience, and hope it can be helpful for you.

Brief of Tradition and Modernity

As we all know, the primitive async is callback, Future encapsulates the callback, and async is a sugar for Future1.

Now let's implement a File to read file on disk asynchronously.

Traditional methods:

struct File;
impl File {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Vec<u8>> { todo!() }
}
fn run(file: Pin<&mut File>, cx: &mut Context) -> Poll<()> {
    let v = match file.poll_read(cx) {
        Poll::Ready(v) => v,
        Poll::Pending => return Poll::Pending,
    };
    println!("read finish");
    Poll::Ready(())
}

This reveals the essence of async fn in Rust: returning task status, binding the Waker in Context to notify the executor.

Modern methods:

struct File;
impl File {
    async fn read(&self) -> Vec<u8> { todo!() }
}
async fn run(file: File) {
    let v = file.read().await;
    println!("read finish");
}

Very concise. Compared with the traditions, the details are hidden without increasing the overhead.

But where is the cx: &mut Context ? We'll get to that in a moment.

Tradition in Modernity

Sometimes a function in a third-party crate uses traditional writings, but we want to use it in async {}. It's a common requirement, we could use poll_fn:

// use futures::future::poll_fn;
use std::future::poll_fn; // stabilized in 1.64 (#99306)
let v = poll_fn(|cx| Pin::new(&mut file).poll_read(cx)).await;

View poll_fn's define, it returns PollFn。This object can be await because it implements the Future trait and provided fn poll(). So if you want to use fn poll_read() in async {}, just wrap it, returns a object which implements Future

// the detail of above
struct ReadFuture<'a>(&'a mut File);
impl<'a> Future for ReadFuture<'a> {
    type Output = Vec<u8>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // call `poll_read` in `poll`
        Pin::new(&mut *self.get_mut().0).poll_read(cx)
    }
}
let v = ReadFuture(&mut file).await; // called `poll`

As you may have guessed, cx is implicitly in async {}. When .await called, cx is passed to ReadFuture::poll(self, cx), then passed to File::poll_read(self, cx).

Limitation of Modernity

Modern method is concise and powerful, but it's limited in many case. The most common are:

trait AsyncRead {
    async fn read(&self) -> Vec<u8>;
}
impl AsyncRead for File {
    async fn read(&self) -> Vec<u8> { todo!() }
}

Try to compile, we got E0706, and adviced to use async-trait crate。

As to why it is difficult to declare async fn in trait currently, this article provided details. If you wouldn't want to know much more, read explanation in async-trait docs at least, to understand the result of marco expand.

Generally, async fn is zero overhead, but async-trait isn't, it will cause unnecessary heap allocations. That's why many crate still use the trational methods, provide fn poll_sth which returns Poll. Such as tokio::net::TcpStream::poll_read

Modernity in Tradition

Now suppose you already know what async-trait is after expanding.

Then we may hit some trouble, such as trying to implement a trait that includes poll_sth for our struct. For example, the futures::stream::Stream.

What we want is:

struct File;
impl File {
    async fn read(&self) -> Option<Vec<u8>> {
        tokio::time::sleep(Duration::from_millis(500)).await;
        Some(vec![1, 2, 3])
    }
}
struct FileStream;
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // call `File::read` here
    }
}

File::read simulates reading a file asynchronously, read some content each time, returns None if reached the end. Here to simplify the code, it never ended.

Let's create FileStream and implements the trait. poll_next should call async fn File::read. It will not be difficult, because async is just a syntax sugar, right? So we wrote about this:

struct FileStream {
    file: File,
    fut: Pin<Box<dyn Future<Output = Option<Vec<u8>>> + Send>>,
}
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let v = ready!(Pin::new(&mut self.fut).poll(cx));
        self.fut = Box::pin(self.file.read()); // error!
        // cannot borrow `self` as mutable because it is also borrowed as immutable
        Poll::Ready(v)
    }
}

After Ready, we must update self.fut. So we smoothly get the classic E0502.

The solution is simple, just put File into Future::Output. Each time we got a self.fut which is Ready, wrap File into next Future:

type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
// in `poll_next`:
let (v, file) = ready!(self.0.as_mut().poll(cx));
self.0 = Box::pin(async { (file.read().await, file) });

As you can see, we now place the return value of File::read with File itself, to avoid holding both mutable and immutable borrows.

Complete runnable code (click to expand)
#![feature(future_poll_fn)] // stabilized in 1.64 (#99306)
use futures_core::{ready, Stream};
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct File;
impl File {
    async fn read(&self) -> Option<Vec<u8>> {
        tokio::time::sleep(Duration::from_millis(500)).await;
        Some(vec![1, 2, 3])
    }
}
type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
impl FileStream {
    fn make_future(file: File) -> FileStreamFuture {
        Box::pin(async { (file.read().await, file) })
    }
    fn new(file: File) -> Self {
        Self(Self::make_future(file))
    }
}
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let (v, file) = ready!(self.0.as_mut().poll(cx));
        self.0 = Self::make_future(file);
        Poll::Ready(v)
    }
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut file_stream = FileStream::new(File);
    while let Some(v) = poll_fn(|cx| Pin::new(&mut file_stream).poll_next(cx)).await {
        println!("{:?}", v);
    }
}

So complex. Or, change File::read to poll_read too? Ouch, Everything has regressed to 3 years ago, which is too uncomfortable, especially if the async fn to be called calls another async. It may not only be one function, but all functions called indirectly, even in third-party crates. This is a manifestation of the famous Colored Function problem.

The async fn returns opaque type. For convenience, we used trait object here. This can be avoided, but is beyond the topic of this article. You can take a look at a common practice.

Common Mistakes

Code in the previous section seems complex. If you are "smart" enough, you may think of the following "simplified method":

Follow the "good ideas" of compiler

Write file.read().poll(cx) into poll_next directly, the compiler will hint you step by step. Use Box to store unsized object, haven't fn poll() but exists in Pin<&mut> ... In the end you will write this:

struct FileStream<'a>(Pin<&'a mut File>);
impl<'a> Stream for FileStream<'a> {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        ready!(Box::pin(self.0.as_mut().read()).as_mut().poll(cx))
    }
}

Try it out. Well, after outputting once, it's stuck. Why?

Certainly, Context contains Waker was put to the File::read's Future, then tokio::time::sleep's Future. After timer expired, executor should be waken up.

But we can see that in this way, the returned Future value of File::read was dropped at the end of poll_next, together with the timer's Future and Waker. Now that Waker is dropped, the executor can no longer be actively woken up. This spelling is wrong。Future must be keep held before getting the output

Use unsafe confidently

While we were trying to update self.fut but hit E0502 error, it doesn't seem to be any problem, the compiler is so stupid. There are two different fields in struct, which has no-overlapped memory area. So we directly resort to violence, using unsafe

let file = unsafe { std::ptr::addr_of_mut!(self.file).as_mut().unwrap() };
self.fut = Box::pin(file.read());

(Confidently write comments on the side:// safety: two fields was not overlapped

Compiled, it runs. But if you apply this on the program which deal with real files or network, and run it for a while, oh, core dumped!

That's because: If FileStream was dropped, the File will be dropped together. Then if the executor is woken up unexpectedly after this, the program will try to read the File that has been dropped, resulting in use-after-free.

References

  1. https://users.rust-lang.org/t/desugaring-async-fn/63698

  2. https://aturon.github.io/blog/2016/08/11/futures/

  3. https://docs.rs/tokio-stream/0.1.9/tokio_stream/wrappers/struct.BroadcastStream.html

Translation

简体中文

前置知识:async-booktokio 基础。

时间回溯到 2019 年,在 Rust 1.39 发布后,异步生态大量地转向了使用 async / .await 的现代写法。但受限于 Rust 严格而复杂的类型系统,许多功能时至今日依然需要传统写法才能完成,这使得我们有时候需要在传统与现代写法之间穿梭。本人经过考古探索,得到了一些提示与经验,希望能对读者有帮助。

传统与现代写法简述

我们都知道,最原始的异步是回调的形式,Future 是对回调的封装,而 asyncFuture 的语法糖1

现在假设我们要实现一个 File,用于异步读取硬盘上的文件。

传统写法:

struct File;
impl File {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Vec<u8>> { todo!() }
}
fn run(file: Pin<&mut File>, cx: &mut Context) -> Poll<()> {
    let v = match file.poll_read(cx) {
        Poll::Ready(v) => v,
        Poll::Pending => return Poll::Pending,
    };
    println!("read finish");
    Poll::Ready(())
}

传统写法揭示了 Rust 中异步函数的本质:返回任务状态,绑定 Context 中的 Waker 以及时通知调度器。

现代写法:

struct File;
impl File {
    async fn read(&self) -> Vec<u8> { todo!() }
}
async fn run(file: File) {
    let v = file.read().await;
    println!("read finish");
}

现代写法非常简洁。相比于传统写法,在不增加开销的前提下,隐藏了细节。

你可能会问,cx: &mut Context 去哪儿了?这个问题的答案,我们马上会提到。

现代中的传统

有时候我们会发现,库中的某个函数使用了传统写法,而我们需要在 async {} 中调用它。这种情况很常见。只需使用 poll_fn

// use futures::future::poll_fn;
use std::future::poll_fn; // stabilized in 1.64 (#99306)
let v = poll_fn(|cx| Pin::new(&mut file).poll_read(cx)).await;

查看 poll_fn 的实现,我们发现它返回了一个 PollFn。这个对象之所以可以被 await,是因为它实现了 Future trait,提供了 fn poll()。若要在 async {} 中使用 fn poll_read(),就需要将它包裹一层,以返回一个实现了 Future 的对象:

struct ReadFuture<'a>(&'a mut File);
impl<'a> Future for ReadFuture<'a> {
    type Output = Vec<u8>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // 在 `poll` 中调用 `poll_read`
        Pin::new(&mut *self.get_mut().0).poll_read(cx)
    }
}
let v = ReadFuture(&mut file).await; // 调用了 `poll`

也许你已经猜到了,cxasync {} 中是隐式存在的。当 .await 时,cx 会被传递给 ReadFuture::poll(self, cx),又传递给 File::poll_read(self, cx)

现代写法的局限性

现代写法简洁有力,却在很多情况下无法使用。最常见的便是:

trait AsyncRead {
    async fn read(&self) -> Vec<u8>;
}
impl AsyncRead for File {
    async fn read(&self) -> Vec<u8> { todo!() }
}

尝试编译,会得到错误 E0706,并建议使用 async-trait 这个 crate。

至于为什么目前难以在 trait 中声明 async fn这篇文章 中给出了详细解释。如果你不想学那么多鬼东西,也至少需要阅读 async-trait 文档中的解释,了解一下宏展开后是什么样的。

一般来说,async fn 是零开销的,使用 async-trait 却并不是零开销的,它会导致不必要的堆分配。这也是为什么目前许多库中依然存在传统的,返回 Pollfn poll_sth 写法,例如 tokio::net::TcpStream::poll_read

传统中的现代

现在假设你已经知道了 async-trait 展开后的样子。

于是我们可能会碰到一些问题,比如想给自己的 struct 实现某个带有 poll_sth 函数的 trait。这里以 futures::stream::Stream 为例。

我们会需要实现:

struct File;
impl File {
    async fn read(&self) -> Option<Vec<u8>> {
        tokio::time::sleep(Duration::from_millis(500)).await;
        Some(vec![1, 2, 3])
    }
}
struct FileStream;
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // 在这里调用 File::read
    }
}

File::read 模拟异步读取文件,每次读取一点内容,文件结束时返回 None。这里为了简化代码,它是永远读不完的。

现在让我们创建 FileStream 并实现 trait。poll_next 需要不断地调用 async 函数 File::read 来读取内容。这应该不太难,对么?毕竟前边说过, async 只是语法糖。所以我们这么写:

struct FileStream {
    file: File,
    fut: Pin<Box<dyn Future<Output = Option<Vec<u8>>> + Send>>,
}
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let v = ready!(Pin::new(&mut self.fut).poll(cx));
        self.fut = Box::pin(self.file.read()); // error!
        // cannot borrow `self` as mutable because it is also borrowed as immutable
        Poll::Ready(v)
    }
}

获取到 Ready 后,我们肯定要更新 self.fut。于是我们顺理成章地得到了很经典的 E0502

解决的思路很简单,File 也放进 Future::Output 里边就可以了。我们每次得到 Readyself.fut 之后,将其中的 File 包进下一个 Future 中。

type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
// 在 poll_next 中:
let (v, file) = ready!(self.0.as_mut().poll(cx));
self.0 = Box::pin(async { (file.read().await, file) });

可以看到,我们现在将 File::read 的返回值与 File 本身放在了一起,这就避免了同时持有可变和不可变借用。

完整的可运行的代码(点击展开)
#![feature(future_poll_fn)] // stabilized in 1.64 (#99306)
use futures_core::{ready, Stream};
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct File;
impl File {
    async fn read(&self) -> Option<Vec<u8>> {
        tokio::time::sleep(Duration::from_millis(500)).await;
        Some(vec![1, 2, 3])
    }
}
type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
impl FileStream {
    fn make_future(file: File) -> FileStreamFuture {
        Box::pin(async { (file.read().await, file) })
    }
    fn new(file: File) -> Self {
        Self(Self::make_future(file))
    }
}
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let (v, file) = ready!(self.0.as_mut().poll(cx));
        self.0 = Self::make_future(file);
        Poll::Ready(v)
    }
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut file_stream = FileStream::new(File);
    while let Some(v) = poll_fn(|cx| Pin::new(&mut file_stream).poll_next(cx)).await {
        println!("{:?}", v);
    }
}

真麻烦。要不,把 File::read 也改成 poll_read 得了?哦,一切都退化到了三年前,这写起来太让人难受了,特别是如果要调用的 async fn 里又调用了别的 async fn,这时候需要修改的可能不仅仅是一个函数,而是所有被直接和间接调用的,甚至第三方 crate 里的函数,这几乎是不可能的。这就是著名的 Colored Function 问题的一个体现。

async fn 返回的是 opaque type,这里为了方便,使用了 tarit object。这是可以避免的,但已经超出了本文的主题。如果你感兴趣,可以看看 一种常见的做法

常见的错误写法

上一节中的写法看起来似乎有点繁琐,如果你足够“聪明”,可能会想到如下的所谓“简化写法”:

听从编译器的馊主意

直接在 poll_next 中写下 file.read().poll(cx),编译器会一步一步提示,需要套上 Box 以存储体积未知的对象,没有找到 fn poll() 但在 Pin<&mut> 中找到了…最后你会写出这个东西:

struct FileStream<'a>(Pin<&'a mut File>);
impl<'a> Stream for FileStream<'a> {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        ready!(Box::pin(self.0.as_mut().read()).as_mut().poll(cx))
    }
}

运行试试。欸,在输出了一次之后,它不动了。为什么?

显然,Context 中的 Waker 被传递给 File::read 返回的 Future,又被传递给 tokio::time::sleepFuture。当定时器到期后,它理应唤醒调度器。

但我们可以看到,在这种写法下,调用 File::read 后返回的 Future,在 poll_next 函数结束后,连带着里面的定时器 Future 和它所持有的 Waker 一同被 Drop 了。既然 Waker 被 Drop 了,调度器就不再能被主动唤醒。这种写法是错误的。在获取到值之前,必须在一直持有 Future

自信地使用 unsafe

当我们尝试更新 self.fut 而碰上了 E0502 错误时,看起来,这似乎没什么问题,是编译器太傻了。这是 struct 里的两个字段,内存也不重叠。于是我们直接诉诸暴力,使用 unsafe

let file = unsafe { std::ptr::addr_of_mut!(self.file).as_mut().unwrap() };
self.fut = Box::pin(file.read());

(顺便在旁边自信地写上注释:// safety: two fields was not overlapped

通过编译了,运行,看起来不错。但如果将此程序改写,真的应用到了真实的文件或网络请求上后,运行一阵子,会发现,哦,core dumped

这是因为:如果 FileStream 被 Drop 了,那么 File 也会被 Drop,在这之后如果调度器被意外唤醒,程序就会尝试读取已经被 Drop 掉的 File,导致 use-after-free。

参考文献

  1. https://users.rust-lang.org/t/desugaring-async-fn/63698

  2. https://aturon.github.io/blog/2016/08/11/futures/

  3. https://docs.rs/tokio-stream/0.1.9/tokio_stream/wrappers/struct.BroadcastStream.html