异步 Rust 中传统与现代的碰撞
I18N: English | 简体中文
前置知识:async-book 和 tokio 基础。
时间回溯到 2019 年,在 Rust 1.39 发布后,异步生态大量地转向了使用 async / .await
的现代写法。但受限于 Rust 严格而复杂的类型系统,许多功能时至今日依然需要传统写法才能完成,这使得我们有时候需要在传统与现代写法之间穿梭。本人经过考古探索,得到了一些提示与经验,希望能对读者有帮助。
传统与现代写法简述
我们都知道,最原始的异步是回调的形式,Future
是对回调的封装,而 async
是 Future
的语法糖1。
现在假设我们要实现一个 File
,用于异步读取硬盘上的文件。
传统写法:
struct File;
impl File {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Vec<u8>> { todo!() }
}
fn run(file: Pin<&mut File>, cx: &mut Context) -> Poll<()> {
let v = match file.poll_read(cx) {
Poll::Ready(v) => v,
Poll::Pending => return Poll::Pending,
};
println!("read finish");
Poll::Ready(())
}
传统写法揭示了 Rust 中异步函数的本质:返回任务状态,绑定 Context
中的 Waker
以及时通知调度器。
现代写法:
struct File;
impl File {
async fn read(&self) -> Vec<u8> { todo!() }
}
async fn run(file: File) {
let v = file.read().await;
println!("read finish");
}
现代写法非常简洁。相比于传统写法,在不增加开销的前提下,隐藏了细节。
你可能会问,cx: &mut Context
去哪儿了?这个问题的答案,我们马上会提到。
现代中的传统
有时候我们会发现,库中的某个函数使用了传统写法,而我们需要在 async {}
中调用它。这种情况很常见。只需使用 poll_fn
:
// use futures::future::poll_fn;
use std::future::poll_fn; // stabilized in 1.64 (#99306)
let v = poll_fn(|cx| Pin::new(&mut file).poll_read(cx)).await;
查看 poll_fn
的实现,我们发现它返回了一个 PollFn
。这个对象之所以可以被 await
,是因为它实现了 Future
trait,提供了 fn poll()
。若要在 async {}
中使用 fn poll_read()
,就需要将它包裹一层,以返回一个实现了 Future
的对象:
struct ReadFuture<'a>(&'a mut File);
impl<'a> Future for ReadFuture<'a> {
type Output = Vec<u8>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// 在 `poll` 中调用 `poll_read`
Pin::new(&mut *self.get_mut().0).poll_read(cx)
}
}
let v = ReadFuture(&mut file).await; // 调用了 `poll`
也许你已经猜到了,cx
在 async {}
中是隐式存在的。当 .await
时,cx
会被传递给 ReadFuture::poll(self, cx)
,又传递给 File::poll_read(self, cx)
。
现代写法的局限性
现代写法简洁有力,却在很多情况下无法使用。最常见的便是:
trait AsyncRead {
async fn read(&self) -> Vec<u8>;
}
impl AsyncRead for File {
async fn read(&self) -> Vec<u8> { todo!() }
}
尝试编译,会得到错误 E0706,并建议使用 async-trait 这个 crate。
至于为什么目前难以在 trait
中声明 async fn
,这篇文章 中给出了详细解释。如果你不想学那么多鬼东西,也至少需要阅读 async-trait 文档中的解释,了解一下宏展开后是什么样的。
一般来说,async fn
是零开销的,使用 async-trait
却并不是零开销的,它会导致不必要的堆分配。这也是为什么目前许多库中依然存在传统的,返回 Poll
的 fn poll_sth
写法,例如 tokio::net::TcpStream::poll_read
。
传统中的现代
现在假设你已经知道了 async-trait 展开后的样子。
于是我们可能会碰到一些问题,比如想给自己的 struct 实现某个带有 poll_sth
函数的 trait。这里以 futures::stream::Stream
为例。
我们会需要实现:
struct File;
impl File {
async fn read(&self) -> Option<Vec<u8>> {
tokio::time::sleep(Duration::from_millis(500)).await;
Some(vec![1, 2, 3])
}
}
struct FileStream;
impl Stream for FileStream {
type Item = Vec<u8>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
// 在这里调用 File::read
}
}
File::read
模拟异步读取文件,每次读取一点内容,文件结束时返回 None
。这里为了简化代码,它是永远读不完的。
现在让我们创建 FileStream
并实现 trait。poll_next
需要不断地调用 async 函数 File::read
来读取内容。这应该不太难,对么?毕竟前边说过, async 只是语法糖。所以我们这么写:
struct FileStream {
file: File,
fut: Pin<Box<dyn Future<Output = Option<Vec<u8>>> + Send>>,
}
impl Stream for FileStream {
type Item = Vec<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let v = ready!(Pin::new(&mut self.fut).poll(cx));
self.fut = Box::pin(self.file.read()); // error!
// cannot borrow `self` as mutable because it is also borrowed as immutable
Poll::Ready(v)
}
}
获取到 Ready
后,我们肯定要更新 self.fut
。于是我们顺理成章地得到了很经典的 E0502。
解决的思路很简单,把 File
也放进 Future::Output
里边就可以了。我们每次得到 Ready
的 self.fut
之后,将其中的 File
包进下一个 Future
中。
type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
// 在 poll_next 中:
let (v, file) = ready!(self.0.as_mut().poll(cx));
self.0 = Box::pin(async { (file.read().await, file) });
可以看到,我们现在将 File::read
的返回值与 File
本身放在了一起,这就避免了同时持有可变和不可变借用。
完整的可运行的代码(点击展开)
#![feature(future_poll_fn)] // stabilized in 1.64 (#99306)
use futures_core::{ready, Stream};
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct File;
impl File {
async fn read(&self) -> Option<Vec<u8>> {
tokio::time::sleep(Duration::from_millis(500)).await;
Some(vec![1, 2, 3])
}
}
type FileStreamFuture = Pin<Box<dyn Future<Output = (Option<Vec<u8>>, File)> + Send>>;
struct FileStream(FileStreamFuture);
impl FileStream {
fn make_future(file: File) -> FileStreamFuture {
Box::pin(async { (file.read().await, file) })
}
fn new(file: File) -> Self {
Self(Self::make_future(file))
}
}
impl Stream for FileStream {
type Item = Vec<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let (v, file) = ready!(self.0.as_mut().poll(cx));
self.0 = Self::make_future(file);
Poll::Ready(v)
}
}
#[tokio::main(flavor = "current_thread")]
async fn main() {
let mut file_stream = FileStream::new(File);
while let Some(v) = poll_fn(|cx| Pin::new(&mut file_stream).poll_next(cx)).await {
println!("{:?}", v);
}
}
真麻烦。要不,把 File::read
也改成 poll_read
得了?哦,一切都退化到了三年前,这写起来太让人难受了,特别是如果要调用的 async fn 里又调用了别的 async fn,这时候需要修改的可能不仅仅是一个函数,而是所有被直接和间接调用的,甚至第三方 crate 里的函数,这几乎是不可能的。这就是著名的 Colored Function 问题的一个体现。
async fn 返回的是 opaque type,这里为了方便,使用了 tarit object。这是可以避免的,但已经超出了本文的主题。如果你感兴趣,可以看看 一种常见的做法。
常见的错误写法
上一节中的写法看起来似乎有点繁琐,如果你足够“聪明”,可能会想到如下的所谓“简化写法”:
听从编译器的馊主意
直接在 poll_next
中写下 file.read().poll(cx)
,编译器会一步一步提示,需要套上 Box
以存储体积未知的对象,没有找到 fn poll()
但在 Pin<&mut>
中找到了…最后你会写出这个东西:
struct FileStream<'a>(Pin<&'a mut File>);
impl<'a> Stream for FileStream<'a> {
type Item = Vec<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
ready!(Box::pin(self.0.as_mut().read()).as_mut().poll(cx))
}
}
运行试试。欸,在输出了一次之后,它不动了。为什么?
显然,Context
中的 Waker
被传递给 File::read
返回的 Future
,又被传递给 tokio::time::sleep
的 Future
。当定时器到期后,它理应唤醒调度器。
但我们可以看到,在这种写法下,调用 File::read
后返回的 Future
,在 poll_next
函数结束后,连带着里面的定时器 Future
和它所持有的 Waker
一同被 Drop 了。既然 Waker
被 Drop 了,调度器就不再能被主动唤醒。这种写法是错误的。在获取到值之前,必须在一直持有 Future
。
自信地使用 unsafe
当我们尝试更新 self.fut
而碰上了 E0502 错误时,看起来,这似乎没什么问题,是编译器太傻了。这是 struct 里的两个字段,内存也不重叠。于是我们直接诉诸暴力,使用 unsafe
:
let file = unsafe { std::ptr::addr_of_mut!(self.file).as_mut().unwrap() };
self.fut = Box::pin(file.read());
(顺便在旁边自信地写上注释:// safety: two fields was not overlapped
)
通过编译了,运行,看起来不错。但如果将此程序改写,真的应用到了真实的文件或网络请求上后,运行一阵子,会发现,哦,core dumped
!
这是因为:如果 FileStream
被 Drop 了,那么 File
也会被 Drop,在这之后如果调度器被意外唤醒,程序就会尝试读取已经被 Drop 掉的 File
,导致 use-after-free。