
本文共 4825 字,大约阅读时间需要 16 分钟。
实现Futures——主要例子
我们将通过创建一个伪 Reactor 和一个简单的执行器,来实现我们自己的 Futures 系统。这个系统允许你在浏览器中编辑和运行代码。以下将介绍该示例的主要实现细节。
从引入依赖开始
为了实现 Futures,我们需要引入一些标准库和非标准库。这将帮助我们创建一个功能强大的执行器:
use std::{future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex}, task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, thread::{self, JoinHandle}, time::{Duration, Instant}};
执行器的设计与实现
执行器的主要职责是获取一个或多个 Futures,并运行它们到完成。为了实现这一点,执行器需要轮询 Futures 的状态,并根据轮询结果进行相应处理。
轮询 Futures 的状态
在轮询 Futures 时,可能会遇到以下几种情况:
Rust 提供了 Waker
来实现 Reactor 和执行器之间的通信。Reactors 存储这个 Waker
,并在 Futures 等待的事件完成时调用 Waker: wake()
,从而唤醒 Futures。
伪 Reactor 的实现
我们的 Reactor 将作为一个伪实现,用于模拟 I/O 交互。这将帮助我们在浏览器中运行代码,而不会产生实际的 I/O 延迟。Reactors 通常用于处理异步 I/O 操作,但在本例中,我们将其简化为处理超时事件。
Reactors 的主要功能包括:
- 监听事件(如超时)。
- 调度 Futures。
- 唤醒已经完成的 Futures。
以下是一个伪 Reactor 的结构:
#[derive(Debug)]enum Event { Close, Timeout(u64, usize),}struct Reactor { dispatcher: mpsc::Sender, handle: Option , tasks: HashMap< usize, TaskState >,}impl Reactor { fn new() -> Arc >> { let (tx, rx) = channel(); Reactor { dispatcher: tx, handle: None, tasks: HashMap::new() } let reactor = Arc::new(Mutex::new(Box::new(Reactor { dispatcher: tx, handle: None, tasks: HashMap::new() }))); Reactor::new_clone(&reactor) } fn new_clone(reactor: &Arc >>>) -> Arc >> { let clone = reactor.clone(); Clonevided from the original Reactor. return clone; } pub fn close(&mut self) { self.dispatcher.send(Event::Close).unwrap(); } pub fn is_ready(&self, id: usize) -> bool { self.tasks.get(&id).map(|state| { match state { TaskState::Ready => true, _ => false, } }).unwrap_or(false) } pub fn wake(&mut self, id: usize) { if let Some(state) = self.tasks.get_mut(&id) { match mem::replace(&mut state, TaskState::Ready) { TaskState::NotReady(waker) => waker.wake(), _ => (), } } } pub fn register(&mut self, duration: u64, waker: Waker, id: usize) { if self.tasks.insert(id, TaskState::NotReady(waker)).is_NONE() { self.dispatcher.send(Event::Timeout(duration, id)).unwrap(); } } }
实现自己的 Futures
为了实现自己的 Futures,我们需要定义一个 Task
结构体,其派生自 Future
特раيت。Task 占用一个 ID 和一个 Reactor 引用,表示该任务感兴趣的 Reactor。这使得 Reactors 能够在任务完成时调用相应的唤醒器(Waker)。
#[derive(Clone)]struct MyWaker { parker: Arc,}impl MyWaker { fn wake(s: &MyWaker) { let waker_arc = unsafe { Arc::from_raw(s) }; waker_arc.parker.unpark(); } fn clone(s: &MyWaker) -> RawWaker { let raw_waker = RawWaker::new(s as *const MyWaker, &VTABLE); Waker::from_raw(raw_waker) }}const VTABLE: RawWakerVTable = unsafe { RawWakerVTable::new( |s: *const MyWaker| MyWaker::clone(s), |s: *const MyWaker| MyWaker::wake(s), // 这里可能需要定义更多函数... )};struct Task { id: usize, reactor: Arc >>, data: u64,}impl Task { pub fn new(reactor: Arc >>, duration: u64, id: usize) -> Self { Task { id, reactor, data: duration } } } impl Future for Task { type Output = usize; fn poll(self: Pin<&mut Task>, cx: &mut Context) -> Poll
exposing futures
以下是如何在浏览器中公开和运行这些 Futures 的实例:
use std::time::{Duration, Instant};fn main() { let start = Instant::now(); // 创建 Reactor let reactor = Reactor::new(); // 创建两个 Task Future let future1 = Task::new(reactor, 1, 1); let future2 = Task::new(reactor, 2, 2); // 定义两个 async 块来等待 futures let fut1 = async { let val = future1.await; let dur = (Instant::now() - start).as_secs_f32(); println!("Future 1 completed at time: {:.2}.", dur) }; let fut2 = async { let val = future2(await); let dur = (Instant::now() - start).as_secs_f32(); println!("Future 2 completed at time: {:.2}.", dur) }; // 等待所有 future 完成 let mainfut = async { fut1.await; fut2.await; }; // 执行者将阻塞在 mainfut 等待结果 block_on(mainfut); // 关闭 Reactor 线程 reactor.lock().map(|mut r| r.close()).unwrap();}
总结
在这个例子中,我们通过定义一个简化的 Reactor 和自己的 Futures 实现,展示了如何在 Rust 中实现异步编程。这个系统允许我们在多个任务之间传递和协调,并在 Reactor 引用完成后自动唤醒相关任务。
如果你对本例有更多问题或想知道更多细节,可以继续深入研究代码或查看下一章的内容。
发表评论
最新留言
关于作者
