高级异步编程
在 Rust 精通篇中,我们将深入探索 Rust 的高级异步编程技术。Rust 的异步编程模型基于 Future 特征和异步运行时,提供了高效的非阻塞 I/O 和并发处理能力。在本章中,我们将超越基础知识,探索如何构建高性能异步系统和自定义执行器。
异步编程回顾
在深入高级主题之前,让我们简要回顾 Rust 的异步编程模型:
use std::time::Duration;
use tokio::time;
#[tokio::main]
async fn main() {
// 创建两个异步任务
let task1 = async {
time::sleep(Duration::from_millis(100)).await;
println!("任务 1 完成");
1
};
let task2 = async {
time::sleep(Duration::from_millis(50)).await;
println!("任务 2 完成");
2
};
// 并发执行两个任务
let (result1, result2) = tokio::join!(task1, task2);
println!("结果: {} + {} = {}", result1, result2, result1 + result2);
}
Rust 的异步编程基于以下核心概念:
- Future 特征:表示可能尚未完成的计算
- async/await 语法:简化异步代码的编写
- 异步运行时:如 Tokio、async-std 等,负责执行和调度异步任务
- 任务(Task):可独立调度的异步执行单元
Future 深入理解
Future 特征的内部机制
Future
特征是 Rust 异步编程的核心:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
poll
方法是 Future 的核心,它尝试将异步计算推进到完成状态:
- 如果 Future 已完成,返回
Poll::Ready(result)
- 如果 Future 尚未完成,返回
Poll::Pending
并安排在事件发生时重新调用poll
手动实现 Future
下面是一个简单的 Future 实现示例:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.when {
println!("Future 已完成");
Poll::Ready(())
} else {
// 安排在未来某个时刻重新调用 poll
let waker = cx.waker().clone();
let when = self.when;
std::thread::spawn(move || {
let now = Instant::now();
if now < when {
std::thread::sleep(when - now);
}
waker.wake();
});
println!("Future 尚未完成");
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let future = Delay {
when: Instant::now() + Duration::from_secs(1),
};
println!("等待 Future 完成...");
future.await;
println!("主函数结束");
}
Pin 和 Unpin
Pin
类型在异步 Rust 中至关重要,它防止自引用结构在内存中被移动:
use std::marker::PhantomPinned;
use std::pin::Pin;
// 自引用结构体
struct SelfReferential {
data: String,
// 指向 data 字段的指针
ptr_to_data: *const String,
// 标记此类型不能安全地实现 Unpin
_marker: PhantomPinned,
}
impl SelfReferential {
fn new(data: String) -> Pin<Box<Self>> {
let mut boxed = Box::new(SelfReferential {
data,
ptr_to_data: std::ptr::null(),
_marker: PhantomPinned,
});
let ptr = &boxed.data as *const String;
boxed.ptr_to_data = ptr;
// 将 Box 转换为 Pin<Box>
Pin::new(boxed)
}
fn get_data(self: Pin<&Self>) -> &str {
// 安全:数据不会被移动,因为它被固定了
let self_ref = unsafe { self.get_ref() };
&self_ref.data
}
fn get_ptr(self: Pin<&Self>) -> *const String {
self.ptr_to_data
}
}
fn main() {
let pinned = SelfReferential::new("hello".to_string());
// 验证指针确实指向数据
let data_ptr = &pinned.data as *const String;
let ptr = pinned.as_ref().get_ptr();
println!("数据指针: {:?}", data_ptr);
println!("存储的指针: {:?}", ptr);
println!("数据: {}", pinned.as_ref().get_data());
assert_eq!(data_ptr, ptr);
}
异步运行时深入剖析
执行器(Executor)工作原理
异步执行器负责调度和运行 Future,下面是一个简单执行器的实现:
use futures::future::BoxFuture;
use futures::task::{waker_ref, ArcWake};
use futures::Future;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::task::{Context, Poll};
// 任务结构,包含一个 Future
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
sender: Sender<Arc<Task>>,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// 将自己发送到任务队列,以便重新执行
let cloned = arc_self.clone();
arc_self.sender.send(cloned).expect("任务队列已满");
}
}
// 简单的执行器
struct Executor {
sender: Sender<Arc<Task>>,
receiver: Receiver<Arc<Task>>,
}
impl Executor {
fn new() -> Self {
let (sender, receiver) = channel();
Executor { sender, receiver }
}
// 生成新任务
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let task = Arc::new(Task {
future: Mutex::new(Some(Box::pin(future))),
sender: self.sender.clone(),
});
self.sender.send(task).expect("任务队列已满");
}
// 运行执行器
fn run(&self) {
while let Ok(task) = self.receiver.recv() {
// 创建 waker 和上下文
let waker = waker_ref(&task);
let mut context = Context::from_waker(&waker);
// 尝试推进 Future
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
match Future::poll(Pin::new(&mut future), &mut context) {
Poll::Pending => {
// Future 尚未完成,放回任务中
*future_slot = Some(future);
}
Poll::Ready(()) => {
// Future 已完成,丢弃它
// 不需要放回 future_slot
}
}
}
}
}
}
fn main() {
let executor = Executor::new();
// 生成一些任务
executor.spawn(async {
println!("任务 1 开始");
// 模拟异步操作
futures::future::ready(()).await;
println!("任务 1 完成");
});
executor.spawn(async {
println!("任务 2 开始");
// 模拟异步操作
futures::future::ready(()).await;
println!("任务 2 完成");
});
// 运行执行器
executor.run();
}
事件循环与反应器(Reactor)
完整的异步运行时通常包含执行器和反应器两部分:
- 执行器:负责调度和运行 Future
- 反应器:负责监听 I/O 事件并唤醒相关任务
下面是一个简化的反应器示例:
use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use std::task::Waker;
// 简化的反应器
struct Reactor {
poll: Poll,
wakers: Mutex<HashMap<Token, Waker>>,
}
impl Reactor {
fn new() -> io::Result<Self> {
Ok(Reactor {
poll: Poll::new()?,
wakers: Mutex::new(HashMap::new()),
})
}
// 注册 I/O 资源和唤醒器
fn register(&self, source: &mut TcpListener, token: Token, waker: Waker) -> io::Result<()> {
self.poll.registry().register(source, token, Interest::READABLE)?;
self.wakers.lock().unwrap().insert(token, waker);
Ok(())
}
// 运行一次事件循环
fn run_once(&self) -> io::Result<()> {
let mut events = Events::with_capacity(1024);
self.poll.poll(&mut events, None)?;
for event in events.iter() {
if let Some(waker) = self.wakers.lock().unwrap().get(&event.token()) {
waker.wake_by_ref();
}
}
Ok(())
}
}
高级异步模式
流(Stream)处理
Stream
特征类似于 Future
,但可以产生多个值:
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
// 创建一个简单的流
let mut stream = stream::iter(1..=5);
// 使用 next 方法逐个处理流中的元素
while let Some(value) = stream.next().await {
println!("值: {}", value);
}
// 使用组合子处理流
let sum = stream::iter(1..=10)
.map(|x| x * 2)
.filter(|x| futures::future::ready(*x % 3 == 0))
.fold(0, |acc, x| async move { acc + x })
.await;
println!("总和: {}", sum);
}
并发控制模式
信号量(Semaphore)
信号量用于限制并发任务数量:
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time;
#[tokio::main]
async fn main() {
// 创建一个容量为 3 的信号量
let semaphore = Arc::new(Semaphore::new(3));
let mut handles = vec![];
for id in 0..8 {
let semaphore = Arc::clone(&semaphore);
let handle = tokio::spawn(async move {