【翻译】200行代码讲透RUST FUTURES (7)
发布日期:2021-05-20 06:56:31 浏览次数:26 分类:精选文章

本文共 4825 字,大约阅读时间需要 16 分钟。

实现Futures——主要例子

我们将通过创建一个伪 Reactor 和一个简单的执行器,来实现我们自己的 Futures 系统。这个系统允许你在浏览器中编辑和运行代码。以下将介绍该示例的主要实现细节。

从引入依赖开始

为了实现 Futures,我们需要引入一些标准库和非标准库。这将帮助我们创建一个功能强大的执行器:

use std::{future::Future, pin::Pin, sync::{mpsc::{channel, Sender}, Arc, Mutex}, 
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread::{self, JoinHandle}, time::{Duration, Instant}};

执行器的设计与实现

执行器的主要职责是获取一个或多个 Futures,并运行它们到完成。为了实现这一点,执行器需要轮询 Futures 的状态,并根据轮询结果进行相应处理。

轮询 Futures 的状态

在轮询 Futures 时,可能会遇到以下几种情况:

  • Ready(就绪):Futures 已经准备好了,可以继续执行。
  • Pending(等待中):Futures尚未被轮询过,因此需要传入一个 Waker 来挂起它。
  • Pending(等待中):Futures已经被轮询过,但仍处于等待状态。
  • Rust 提供了 Waker 来实现 Reactor 和执行器之间的通信。Reactors 存储这个 Waker,并在 Futures 等待的事件完成时调用 Waker: wake(),从而唤醒 Futures。

    伪 Reactor 的实现

    我们的 Reactor 将作为一个伪实现,用于模拟 I/O 交互。这将帮助我们在浏览器中运行代码,而不会产生实际的 I/O 延迟。Reactors 通常用于处理异步 I/O 操作,但在本例中,我们将其简化为处理超时事件。

    Reactors 的主要功能包括:

    • 监听事件(如超时)。
    • 调度 Futures。
    • 唤醒已经完成的 Futures。

    以下是一个伪 Reactor 的结构:

    #[derive(Debug)]
    enum Event {
    Close,
    Timeout(u64, usize),
    }
    struct Reactor {
    dispatcher: mpsc::Sender
    ,
    handle: Option
    ,
    tasks: HashMap< usize, TaskState >,
    }
    impl Reactor {
    fn new() -> Arc
    >> {
    let (tx, rx) = channel();
    Reactor {
    dispatcher: tx,
    handle: None,
    tasks: HashMap::new()
    }
    let reactor = Arc::new(Mutex::new(Box::new(Reactor { dispatcher: tx, handle: None, tasks: HashMap::new() })));
    Reactor::new_clone(&reactor)
    }
    fn new_clone(reactor: &Arc
    >>>) -> Arc
    >> { let clone = reactor.clone(); Clonevided from the original Reactor. return clone; } pub fn close(&mut self) { self.dispatcher.send(Event::Close).unwrap(); } pub fn is_ready(&self, id: usize) -> bool { self.tasks.get(&id).map(|state| { match state { TaskState::Ready => true, _ => false, } }).unwrap_or(false) } pub fn wake(&mut self, id: usize) { if let Some(state) = self.tasks.get_mut(&id) { match mem::replace(&mut state, TaskState::Ready) { TaskState::NotReady(waker) => waker.wake(), _ => (), } } } pub fn register(&mut self, duration: u64, waker: Waker, id: usize) { if self.tasks.insert(id, TaskState::NotReady(waker)).is_NONE() { self.dispatcher.send(Event::Timeout(duration, id)).unwrap(); } } }

    实现自己的 Futures

    为了实现自己的 Futures,我们需要定义一个 Task 结构体,其派生自 Future 特раيت。Task 占用一个 ID 和一个 Reactor 引用,表示该任务感兴趣的 Reactor。这使得 Reactors 能够在任务完成时调用相应的唤醒器(Waker)。

    #[derive(Clone)]
    struct MyWaker {
    parker: Arc
    ,
    }
    impl MyWaker {
    fn wake(s: &MyWaker) {
    let waker_arc = unsafe { Arc::from_raw(s) };
    waker_arc.parker.unpark();
    }
    fn clone(s: &MyWaker) -> RawWaker {
    let raw_waker = RawWaker::new(s as *const MyWaker, &VTABLE);
    Waker::from_raw(raw_waker)
    }
    }
    const VTABLE: RawWakerVTable = unsafe {
    RawWakerVTable::new(
    |s: *const MyWaker| MyWaker::clone(s),
    |s: *const MyWaker| MyWaker::wake(s),
    // 这里可能需要定义更多函数...
    )
    };
    struct Task {
    id: usize,
    reactor: Arc
    >>,
    data: u64,
    }
    impl Task {
    pub fn new(reactor: Arc
    >>, duration: u64, id: usize) -> Self { Task { id, reactor, data: duration } } } impl Future for Task { type Output = usize; fn poll(self: Pin<&mut Task>, cx: &mut Context) -> Poll
    { // 我们需要获取 Reactor 的锁,以确保操作的原子性 let mut reactor = self.reactor.lock().unwrap(); // 检查任务的状态 if reactor.is_ready(self.id) { // 任务已经准备好了 reactor.wake(self.id); return Poll::Ready(self.id); } // 检查任务是否已经在 Reactor 中注册 if reactor.tasks.contains_key(&self.id) { // 不在的话,注册当前任务 reactor.register(self.data, cx.waker().clone(), self.id); return Poll::Pending; } // 如果任务未完成,则继续等待 reactor.register(self.data, cx.waker().clone(), self.id); Poll::Pending } }

    exposing futures

    以下是如何在浏览器中公开和运行这些 Futures 的实例:

    use std::time::{Duration, Instant};
    fn main() {
    let start = Instant::now();
    // 创建 Reactor
    let reactor = Reactor::new();
    // 创建两个 Task Future
    let future1 = Task::new(reactor, 1, 1);
    let future2 = Task::new(reactor, 2, 2);
    // 定义两个 async 块来等待 futures
    let fut1 = async {
    let val = future1.await;
    let dur = (Instant::now() - start).as_secs_f32();
    println!("Future 1 completed at time: {:.2}.", dur)
    };
    let fut2 = async {
    let val = future2(await);
    let dur = (Instant::now() - start).as_secs_f32();
    println!("Future 2 completed at time: {:.2}.", dur)
    };
    // 等待所有 future 完成
    let mainfut = async {
    fut1.await;
    fut2.await;
    };
    // 执行者将阻塞在 mainfut 等待结果
    block_on(mainfut);
    // 关闭 Reactor 线程
    reactor.lock().map(|mut r| r.close()).unwrap();
    }

    总结

    在这个例子中,我们通过定义一个简化的 Reactor 和自己的 Futures 实现,展示了如何在 Rust 中实现异步编程。这个系统允许我们在多个任务之间传递和协调,并在 Reactor 引用完成后自动唤醒相关任务。

    如果你对本例有更多问题或想知道更多细节,可以继续深入研究代码或查看下一章的内容。

    上一篇:基于Rust的FTP客户端开源工具实践
    下一篇:Rust FFI 编程 - Rust导出共享库04

    发表评论

    最新留言

    路过,博主的博客真漂亮。。
    [***.116.15.85]2025年04月15日 02时19分11秒