Rust从入门到精通之精通篇:24.高级异步编程

时间:2025-03-28 07:40:58

高级异步编程

在 Rust 精通篇中,我们将深入探索 Rust 的高级异步编程技术。Rust 的异步编程模型基于 Future 特征和异步运行时,提供了高效的非阻塞 I/O 和并发处理能力。在本章中,我们将超越基础知识,探索如何构建高性能异步系统和自定义执行器。

异步编程回顾

在深入高级主题之前,让我们简要回顾 Rust 的异步编程模型:

use std::time::Duration;
use tokio::time;

#[tokio::main]
async fn main() {
    // 创建两个异步任务
    let task1 = async {
        time::sleep(Duration::from_millis(100)).await;
        println!("任务 1 完成");
        1
    };
    
    let task2 = async {
        time::sleep(Duration::from_millis(50)).await;
        println!("任务 2 完成");
        2
    };
    
    // 并发执行两个任务
    let (result1, result2) = tokio::join!(task1, task2);
    println!("结果: {} + {} = {}", result1, result2, result1 + result2);
}

Rust 的异步编程基于以下核心概念:

  1. Future 特征:表示可能尚未完成的计算
  2. async/await 语法:简化异步代码的编写
  3. 异步运行时:如 Tokio、async-std 等,负责执行和调度异步任务
  4. 任务(Task):可独立调度的异步执行单元

Future 深入理解

Future 特征的内部机制

Future 特征是 Rust 异步编程的核心:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

poll 方法是 Future 的核心,它尝试将异步计算推进到完成状态:

  • 如果 Future 已完成,返回 Poll::Ready(result)
  • 如果 Future 尚未完成,返回 Poll::Pending 并安排在事件发生时重新调用 poll

手动实现 Future

下面是一个简单的 Future 实现示例:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = ();
    
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if Instant::now() >= self.when {
            println!("Future 已完成");
            Poll::Ready(())
        } else {
            // 安排在未来某个时刻重新调用 poll
            let waker = cx.waker().clone();
            let when = self.when;
            
            std::thread::spawn(move || {
                let now = Instant::now();
                if now < when {
                    std::thread::sleep(when - now);
                }
                waker.wake();
            });
            
            println!("Future 尚未完成");
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let future = Delay {
        when: Instant::now() + Duration::from_secs(1),
    };
    
    println!("等待 Future 完成...");
    future.await;
    println!("主函数结束");
}

Pin 和 Unpin

Pin 类型在异步 Rust 中至关重要,它防止自引用结构在内存中被移动:

use std::marker::PhantomPinned;
use std::pin::Pin;

// 自引用结构体
struct SelfReferential {
    data: String,
    // 指向 data 字段的指针
    ptr_to_data: *const String,
    // 标记此类型不能安全地实现 Unpin
    _marker: PhantomPinned,
}

impl SelfReferential {
    fn new(data: String) -> Pin<Box<Self>> {
        let mut boxed = Box::new(SelfReferential {
            data,
            ptr_to_data: std::ptr::null(),
            _marker: PhantomPinned,
        });
        
        let ptr = &boxed.data as *const String;
        boxed.ptr_to_data = ptr;
        
        // 将 Box 转换为 Pin<Box>
        Pin::new(boxed)
    }
    
    fn get_data(self: Pin<&Self>) -> &str {
        // 安全:数据不会被移动,因为它被固定了
        let self_ref = unsafe { self.get_ref() };
        &self_ref.data
    }
    
    fn get_ptr(self: Pin<&Self>) -> *const String {
        self.ptr_to_data
    }
}

fn main() {
    let pinned = SelfReferential::new("hello".to_string());
    
    // 验证指针确实指向数据
    let data_ptr = &pinned.data as *const String;
    let ptr = pinned.as_ref().get_ptr();
    
    println!("数据指针: {:?}", data_ptr);
    println!("存储的指针: {:?}", ptr);
    println!("数据: {}", pinned.as_ref().get_data());
    
    assert_eq!(data_ptr, ptr);
}

异步运行时深入剖析

执行器(Executor)工作原理

异步执行器负责调度和运行 Future,下面是一个简单执行器的实现:

use futures::future::BoxFuture;
use futures::task::{waker_ref, ArcWake};
use futures::Future;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Sender, Receiver};
use std::task::{Context, Poll};

// 任务结构,包含一个 Future
struct Task {
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    sender: Sender<Arc<Task>>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 将自己发送到任务队列,以便重新执行
        let cloned = arc_self.clone();
        arc_self.sender.send(cloned).expect("任务队列已满");
    }
}

// 简单的执行器
struct Executor {
    sender: Sender<Arc<Task>>,
    receiver: Receiver<Arc<Task>>,
}

impl Executor {
    fn new() -> Self {
        let (sender, receiver) = channel();
        Executor { sender, receiver }
    }
    
    // 生成新任务
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(Box::pin(future))),
            sender: self.sender.clone(),
        });
        
        self.sender.send(task).expect("任务队列已满");
    }
    
    // 运行执行器
    fn run(&self) {
        while let Ok(task) = self.receiver.recv() {
            // 创建 waker 和上下文
            let waker = waker_ref(&task);
            let mut context = Context::from_waker(&waker);
            
            // 尝试推进 Future
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                match Future::poll(Pin::new(&mut future), &mut context) {
                    Poll::Pending => {
                        // Future 尚未完成,放回任务中
                        *future_slot = Some(future);
                    }
                    Poll::Ready(()) => {
                        // Future 已完成,丢弃它
                        // 不需要放回 future_slot
                    }
                }
            }
        }
    }
}

fn main() {
    let executor = Executor::new();
    
    // 生成一些任务
    executor.spawn(async {
        println!("任务 1 开始");
        // 模拟异步操作
        futures::future::ready(()).await;
        println!("任务 1 完成");
    });
    
    executor.spawn(async {
        println!("任务 2 开始");
        // 模拟异步操作
        futures::future::ready(()).await;
        println!("任务 2 完成");
    });
    
    // 运行执行器
    executor.run();
}

事件循环与反应器(Reactor)

完整的异步运行时通常包含执行器和反应器两部分:

  • 执行器:负责调度和运行 Future
  • 反应器:负责监听 I/O 事件并唤醒相关任务

下面是一个简化的反应器示例:

use mio::{Events, Interest, Poll, Token};
use mio::net::TcpListener;
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, Mutex};
use std::task::Waker;

// 简化的反应器
struct Reactor {
    poll: Poll,
    wakers: Mutex<HashMap<Token, Waker>>,
}

impl Reactor {
    fn new() -> io::Result<Self> {
        Ok(Reactor {
            poll: Poll::new()?,
            wakers: Mutex::new(HashMap::new()),
        })
    }
    
    // 注册 I/O 资源和唤醒器
    fn register(&self, source: &mut TcpListener, token: Token, waker: Waker) -> io::Result<()> {
        self.poll.registry().register(source, token, Interest::READABLE)?;
        self.wakers.lock().unwrap().insert(token, waker);
        Ok(())
    }
    
    // 运行一次事件循环
    fn run_once(&self) -> io::Result<()> {
        let mut events = Events::with_capacity(1024);
        self.poll.poll(&mut events, None)?;
        
        for event in events.iter() {
            if let Some(waker) = self.wakers.lock().unwrap().get(&event.token()) {
                waker.wake_by_ref();
            }
        }
        
        Ok(())
    }
}

高级异步模式

流(Stream)处理

Stream 特征类似于 Future,但可以产生多个值:

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // 创建一个简单的流
    let mut stream = stream::iter(1..=5);
    
    // 使用 next 方法逐个处理流中的元素
    while let Some(value) = stream.next().await {
        println!("值: {}", value);
    }
    
    // 使用组合子处理流
    let sum = stream::iter(1..=10)
        .map(|x| x * 2)
        .filter(|x| futures::future::ready(*x % 3 == 0))
        .fold(0, |acc, x| async move { acc + x })
        .await;
    
    println!("总和: {}", sum);
}

并发控制模式

信号量(Semaphore)

信号量用于限制并发任务数量:

use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time;

#[tokio::main]
async fn main() {
    // 创建一个容量为 3 的信号量
    let semaphore = Arc::new(Semaphore::new(3));
    let mut handles = vec![];
    
    for id in 0..8 {
        let semaphore = Arc::clone(&semaphore);
        let handle = tokio::spawn(async move {