异步 Rust 中传统与现代的碰撞

I18N: English | 简体中文

前置知识:async-booktokio 基础。

时间回溯到 2019 年,在 Rust 1.39 发布后,异步生态大量地转向了使用 async / .await 的现代写法。但受限于 Rust 严格而复杂的类型系统,许多功能时至今日依然需要传统写法才能完成,这使得我们有时候需要在传统与现代写法之间穿梭。本人经过考古探索,得到了一些提示与经验,希望能对读者有帮助。

传统与现代写法简述

我们都知道,最原始的异步是回调的形式,Future 是对回调的封装,而 asyncFuture 的语法糖1

现在假设我们要实现一个 File,用于异步读取硬盘上的文件。

传统写法:

struct File;
impl File {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Vec<u8>> { todo!() }
}
fn run(file: Pin<&mut File>, cx: &mut Context) -> Poll<()> {
    let v = match file.poll_read(cx) {
        Poll::Ready(v) => v,
        Poll::Pending => return Poll::Pending,
    };
    println!("read finish");
    Poll::Ready(())
}

传统写法揭示了 Rust 中异步函数的本质:返回任务状态,绑定 Context 中的 Waker 以及时通知调度器。

现代写法:

struct File;
impl File {
    async fn read(&self) -> Vec<u8> { todo!() }
}
async fn run(file: File) {
    let v = file.read().await;
    println!("read finish");
}

现代写法非常简洁。相比于传统写法,在不增加开销的前提下,隐藏了细节。

你可能会问,cx: &mut Context 去哪儿了?这个问题的答案,我们马上会提到。

现代中的传统

有时候我们会发现,库中的某个函数使用了传统写法,而我们需要在 async {} 中调用它。这种情况很常见。只需使用 poll_fn

// use futures::future::poll_fn;
use std::future::poll_fn; // stabilized in 1.64 (#99306)
let v = poll_fn(|cx| Pin::new(&mut file).poll_read(cx)).await;

查看 poll_fn 的实现,我们发现它返回了一个 PollFn。这个对象之所以可以被 await,是因为它实现了 Future trait,提供了 fn poll()。若要在 async {} 中使用 fn poll_read(),就需要将它包裹一层,以返回一个实现了 Future 的对象:

struct ReadFuture<'a>(&'a mut File);
impl<'a> Future for ReadFuture<'a> {
    type Output = Vec<u8>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        // 在 `poll` 中调用 `poll_read`
        Pin::new(&mut *self.get_mut().0).poll_read(cx)
    }
}
let v = ReadFuture(&mut file).await; // 调用了 `poll`

也许你已经猜到了,cxasync {} 中是隐式存在的。当 .await 时,cx 会被传递给 ReadFuture::poll(self, cx),又传递给 File::poll_read(self, cx)

现代写法的局限性

现代写法简洁有力,却在很多情况下无法使用。最常见的便是:

trait AsyncRead {
    async fn read(&self) -> Vec<u8>;
}
impl AsyncRead for File {
    async fn read(&self) -> Vec<u8> { todo!() }
}

尝试编译,会得到错误 E0706,并建议使用 async-trait 这个 crate。

至于为什么目前难以在 trait 中声明 async fn这篇文章 中给出了详细解释。如果你不想学那么多鬼东西,也至少需要阅读 async-trait 文档中的解释,了解一下宏展开后是什么样的。

一般来说,async fn 是零开销的,使用 async-trait 却并不是零开销的,它会导致不必要的堆分配。这也是为什么目前许多库中依然存在传统的,返回 Pollfn poll_sth 写法,例如 tokio::net::TcpStream::poll_read

传统中的现代

现在假设你已经知道了 async-trait 展开后的样子。

于是我们可能会碰到一些问题,比如想给自己的 struct 实现某个带有 poll_sth 函数的 trait。这里以 futures::stream::Stream 为例。

我们会需要实现:

struct File;
impl File {
    async fn read(&self) -> Option<Vec<u8>> {
        tokio::time::sleep(Duration::from_millis(500)).await;
        Some(vec![1, 2, 3])
    }
}
struct FileStream;
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // 在这里调用 File::read
    }
}

File::read 模拟异步读取文件,每次读取一点内容,文件结束时返回 None。这里为了简化代码,它是永远读不完的。

现在让我们创建 FileStream 并实现 trait。poll_next 需要不断地调用 async 函数 File::read 来读取内容。这应该不太难,对么?毕竟前边说过, async 只是语法糖。所以我们这么写:

struct FileStream {
    file: File,
    fut: Pin<Box<dyn Future<Output = Option<Vec<u8>>> + Send>>,
}
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let v = ready!(Pin::new(&mut self.fut).poll(cx));
        self.fut = Box::pin(self.file.read()); // error!
        // cannot borrow `self` as mutable because it is also borrowed as immutable
        Poll::Ready(v)
    }
}

获取到 Ready 后,我们肯定要更新 self.fut。于是我们顺理成章地得到了很经典的 E0502

解决的思路很简单,File 也放进 Future::Output 里边就可以了。我们每次得到 Readyself.fut 之后,将其中的 File 包进下一个 Future 中。

type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
// 在 poll_next 中:
let (v, file) = ready!(self.0.as_mut().poll(cx));
self.0 = Box::pin(async { (file.read().await, file) });

可以看到,我们现在将 File::read 的返回值与 File 本身放在了一起,这就避免了同时持有可变和不可变借用。

完整的可运行的代码(点击展开)
#![feature(future_poll_fn)] // stabilized in 1.64 (#99306)
use futures_core::{ready, Stream};
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct File;
impl File {
    async fn read(&self) -> Option<Vec<u8>> {
        tokio::time::sleep(Duration::from_millis(500)).await;
        Some(vec![1, 2, 3])
    }
}
type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
impl FileStream {
    fn make_future(file: File) -> FileStreamFuture {
        Box::pin(async { (file.read().await, file) })
    }
    fn new(file: File) -> Self {
        Self(Self::make_future(file))
    }
}
impl Stream for FileStream {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let (v, file) = ready!(self.0.as_mut().poll(cx));
        self.0 = Self::make_future(file);
        Poll::Ready(v)
    }
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let mut file_stream = FileStream::new(File);
    while let Some(v) = poll_fn(|cx| Pin::new(&mut file_stream).poll_next(cx)).await {
        println!("{:?}", v);
    }
}

真麻烦。要不,把 File::read 也改成 poll_read 得了?哦,一切都退化到了三年前,这写起来太让人难受了,特别是如果要调用的 async fn 里又调用了别的 async fn,这时候需要修改的可能不仅仅是一个函数,而是所有被直接和间接调用的,甚至第三方 crate 里的函数,这几乎是不可能的。这就是著名的 Colored Function 问题的一个体现。

async fn 返回的是 opaque type,这里为了方便,使用了 tarit object。这是可以避免的,但已经超出了本文的主题。如果你感兴趣,可以看看 一种常见的做法

常见的错误写法

上一节中的写法看起来似乎有点繁琐,如果你足够“聪明”,可能会想到如下的所谓“简化写法”:

听从编译器的馊主意

直接在 poll_next 中写下 file.read().poll(cx),编译器会一步一步提示,需要套上 Box 以存储体积未知的对象,没有找到 fn poll() 但在 Pin<&mut> 中找到了…最后你会写出这个东西:

struct FileStream<'a>(Pin<&'a mut File>);
impl<'a> Stream for FileStream<'a> {
    type Item = Vec<u8>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        ready!(Box::pin(self.0.as_mut().read()).as_mut().poll(cx))
    }
}

运行试试。欸,在输出了一次之后,它不动了。为什么?

显然,Context 中的 Waker 被传递给 File::read 返回的 Future,又被传递给 tokio::time::sleepFuture。当定时器到期后,它理应唤醒调度器。

但我们可以看到,在这种写法下,调用 File::read 后返回的 Future,在 poll_next 函数结束后,连带着里面的定时器 Future 和它所持有的 Waker 一同被 Drop 了。既然 Waker 被 Drop 了,调度器就不再能被主动唤醒。这种写法是错误的。在获取到值之前,必须在一直持有 Future

自信地使用 unsafe

当我们尝试更新 self.fut 而碰上了 E0502 错误时,看起来,这似乎没什么问题,是编译器太傻了。这是 struct 里的两个字段,内存也不重叠。于是我们直接诉诸暴力,使用 unsafe

let file = unsafe { std::ptr::addr_of_mut!(self.file).as_mut().unwrap() };
self.fut = Box::pin(file.read());

(顺便在旁边自信地写上注释:// safety: two fields was not overlapped

通过编译了,运行,看起来不错。但如果将此程序改写,真的应用到了真实的文件或网络请求上后,运行一阵子,会发现,哦,core dumped

这是因为:如果 FileStream 被 Drop 了,那么 File 也会被 Drop,在这之后如果调度器被意外唤醒,程序就会尝试读取已经被 Drop 掉的 File,导致 use-after-free。

参考文献

  1. https://users.rust-lang.org/t/desugaring-async-fn/63698

  2. https://aturon.github.io/blog/2016/08/11/futures/

  3. https://docs.rs/tokio-stream/0.1.9/tokio_stream/wrappers/struct.BroadcastStream.html