深入淺出Rust異步編程之Tokio

2021-03-03 高可用架構

本文以tokio為例簡單介紹Rust異步編程相關的一些知識。

首先讓我們看看為什麼使用rust來進行異步編程。這裡tokio官方給出了一個性能測試的對比,可以看到tokio是性能最好,實際上運行這個基準測試的時候,tokio性能更好的2.0版本尚未發布,否則估計性能還有很大提升。因此,我們可以認為需要非常極致性能的時候,我們可以選擇rust+tokio來實現。


Rust網絡編程

Rust實際上並不跟一定的網絡編程模型強綁定,實際rust可以實現阻塞IO+多線程,非阻塞IO+回調,用戶態線程等多種模型。這裡著重介紹Rust實現的用戶態線程。

首先,Rust的用戶態線程是一種基於Future的用戶態線程,關於Future本身,本文後續部分有詳細論述。

其次,由於是Rust實現,因此可以做到零成本抽象,並且更容易做到安全。

最後,由於沒有運行時大量內存分配,沒有動態邏輯分派,也沒有GC開銷,所以該實現的效率非常高。

Rust異步編程是構建在作業系統相關API上,MIO庫類似Java的Nio庫,針對多種作業系統的不同API做了統一封裝。Future庫類似Java的Future庫,提供了相關接口和常用的組合能力。Tokio構建於兩者之上,在MIO和future的基礎上實現了用戶態線程。使用Tokio進行異步編程的技術棧如下,需要注意的是,應用程式會同時接觸到Tokio和future的API。

Futures


future是rust異步編程的核心。首先我們介紹什麼是future。future是一段異步計算程序,可以在將來獲取產生的數據。舉例來說,獲取資料庫查詢結果,RPC調用這些實際上都可以使用future來實現。通常實現future有兩種模式,一種基於推模式,也被稱為基於完成的模式,一種基於拉模式,也被稱為基於就緒的模式。Rust的future庫實現了基於拉模式的future。

rust的future選擇拉模式來實現。接口定義如下:

pub trait Future {
type Item;
type Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error>;}

假設一個future要做這樣的功能,從TCP數據流讀取數據並計算自己讀了多少個字節並進行回調。那用代碼表示:

struct MyTcpStream {    socket: TcpStream,    nread: u64,}
impl Future for MyTcpStream { type Item =u64; type Error = io::Error;
fn poll(&mut self) -> Poll<Item, io::Error> { let mut buf = [0;10]; loop { match self.socket.read(&mut buf) { Async::Ready(0) => return Async::Ready(self.nread), Async::Ready(n) => self.nread += n, Async::NotReady => return Async::NotReady, } } }
}


每次調用poll方法,MyTcpStream都會調用socket的read方法(這裡的TcpStream本身也是一個future,read內部也是調用poll方法),當read返回為Async::NotReady的時候,調度器會將當前的Task休眠,如果返回Async::Read(n)表示讀到了數據,則給計數器加對應的數,如果返回Async::Ready(0),則表示TcpStream裡有的數據已經讀完,就將計數器返回。

為了方便大家使用,future庫包提供了很多組合子,以AndThen組合子為例:

enum AndThen<A,F> {    First(A, F),}
fn poll(&mut self) -> Async<Item> { match fut_a.poll() { Async::Ready(v) => Async::Ready(f(v)), Async::NotReady => Async::NotReady, }}


這裡AndThen枚舉,First有兩個值,其中A是一個future,F是一個閉包,AndThen實現的poll方法,就是假如調用future_a的poll方法有返回值,那麼就調用閉包,並將其返回值包裝為Async::Ready返回,如果poll的返回值是Async::NotReady則同樣返回Async::NotReady。有了這個AndThen方法,通過組合子函數(比如and_then實際上是將上一個future和閉包傳入生成一個AndThen future),我們就可以實現一些複雜邏輯:

let f=MyTcpStream::connect(&remote_addr)  .and_then(|num| {println!("already read %d",num);  return num;}).and_then(|num| {    process(num)  });  tokio::spawn(f);


上面的代碼就是建立Tcp連接,然後每次讀數據,都通過第一個and_then列印日誌,然後再通過第二個and_then做其他處理,tokio::spawn用於執行最終的future,用圖形來表示:
 如果沒有數據:
 如果有數據:
 如果將MyTcpStream的poll實現改為:

fn poll(&mut self) -> Poll<Item, io::Error> {        let mut buf = [0;1024];        let mut bytes = bytesMut::new();        loop {            match self.socket.read(&mut buf) {                Async::Ready(0) => return Async::Ready(bytes.to_vec()),                Async::Ready(n) => bytes.put(buf[0..n]),                Async::NotReady => return Async::NotReady,            }        }    }


這段代碼主要是將socket中數據讀出,然後包裝為Async::Ready或者Async::NotReady供下一個future使用,我們就可以實現更複雜的邏輯,比如:

MyTcpStream::connect(&remote_addr)  .and_then(|sock| io::write(sock, handshake))   .and_then(|sock| io::read_exact(sock, 10))   .and_then(|(sock, handshake)| {      validate(handshake);    io::write(sock, request)  })  .and_then(|sock| io::read_exact(sock, 10))  .and_then(|(sock, response)| {     process(response)  })


我們上面解釋了future和組合子,漏掉一個重要的API,就是:


當我們使用spawn方法的時候,tokio會將傳入的future生成一個task,由於future內部包含了另外的future,所以就組成了如下所示結構,其中task就是輕量級線程。

Tokio


上面我們介紹了future相關的內容,接下來我們先看看tokio如何使用,我們這裡先用taokio啟動一個伺服器,代碼如下:

let listener = TcpListener::bind(&addr).unwrap();
let server = listener.incoming().for_each(move |socket| { tokio::spawn(process(socket)); Ok(())}).map_err(|err| { println!("accept error = {:?}", err);});
tokio::run(server);


上面的代碼首先生成一個TcpListener,listener的incomming和foreach會將連進來的tcp連接生成TcpStream(即代碼中的socket),針對每一個連接啟動一個用戶態線程處理。

Tokio本身是基於Mio和future庫來實現的,其主要包含兩個主要的大功能部分(本文不是對源碼進行分析,Tokio不同版本之間的差異也較大,只是進行原理說明),reactor和scheduler。

scheduler負責對task進行調度,上文所展示的task調度部分功能就是由scheduler負責,reactor部分主要是負責事件觸發,比如網絡事件,文件系統事件,定時器等等。用圖展示如下:
 當有事件觸發的時候,reactor會通過task的api通知scheduler運行該任務。
 對於Reactor來說,其中最重要的結構是Poll和io_dispatch,在linux上Poll是對Epoll實例的封裝(在其他作業系統上也類似),io_dispatch其中記錄了調度相關的信息,具體來說主要是記錄了task的id和fd的對應關係。當通過Poll獲取到FD事件的時候,通過io_dispatch找到task,然後再通知調度器。
 TcpListner實際並非rust std庫中的TcpListner,tokio對其進行了包裝,每次有新連接到來的時候都會生成一個新的TcpStream。

TcpStream也是tokio包裝後的TcpStream,可以看到其中包含一個PollEvented,而PollEvented內部包含實際的TcpSteam。PollEvented構造之後,會調用io_dispatch中的註冊接口,然後在第一次調用poll的時候,將fd和task關聯。

Async/await


通過上面的文章可以看到,直接使用tokio相關API還是有些難度的,然而在rust 1.39.0之後的版本,我們可以使用async/awai特性來簡化代碼,使得代碼更容易理解。使用async/await後,上面的代碼可以簡化為:

#[tokio::main]pub async fn main() -> Result<(), Box<dyn Error>> {    let mut stream = TcpStream::connect("127.0.0.1:6142").await?;    println!("created stream");    let result = stream.write(b"hello world\n").await;    println!("wrote to stream; success={:?}", result.is_ok());    Ok(())}


要點在於對於需要異步的函數使用async修飾,在調用async函數的時候使用await獲取返回結果。實際上async函數是由編輯器生成的future,await也是由編譯器生成代碼調用future的poll方法。因此真正用好async/await也需要對上面的內容了解清楚。

Tips


最後,使用tokio有一些需要注意地方:

生命周期的問題。聲明周期的問題是一直貫穿rust的,具體到tokio使用上來說,最主要的是self的生命周期問題,主要是因為runtime要求借用是靜態的,這個跟對象本身的聲明周期是有矛盾的。我們推薦的主要做法是使用actor模型,這樣可以消除掉對於靜態生命周期的要求。

注意兼容問題,最主要是需要注意future01和future03的兼容性問題,future官方提供了兼容包,來做版本之間的兼容,如果要使用async/await,推薦儘量使用future03庫。

runtime,tokio的一個runtime對應一個線程池,因此推薦對不同業務使用不同線程池,減少業務之間相互影響。

使用TaskExecutor/Handle來spawm一個task。上面代碼裡經常使用的tokio::spawn是針對默認runtime的,如果使用了不同的runtime,那麼就不能使用tokio::spawn。另外,TaskExecutor/Handle支持clone,可以解決一些生命周期帶來的問題。

可以在代碼中通過api通知task運行。

近期文章推薦

技術原創及架構實踐文章,歡迎通過公眾號菜單「聯繫我們」進行投稿。

高可用架構

改變網際網路的構建方式


長按二維碼 關注「高可用架構」公眾號

相關焦點

  • 【Rust日報】2020-11-07 Rust 異步架構圖
    網站連結,https://rustfest.global/播放連結,https://watch.rustfest.global/Rust 異步架構圖關於Tokio,async-std,smol的異步架構圖片連結,https://i.redd.it/6kxvfm94kox51.png(搬運到Github連結,https://raw.githubusercontent.com
  • 【Rust日報】2020-12-20 tokio 1.0 即將發布!
    tokio 1.0 即將發布!tokio 1.0 將於今年年底發布,算起來也沒幾天了,小夥伴敬請期待吧。
  • 【Rust日報】2020-07-16 - tokio 0.3發布
    連結:https://www.reddit.com/r/rust/comments/jbzorm/valves_proton_513_now_uses_rust/rust 樹莓派交叉編譯,並部署web服務連結:https://tiziano88.medium.com/compiling-rust-for-raspberry-pi-arm-922b55dbb050連結
  • 真實世界中的 Rust 異步編程
    在上一篇 解密 Rust 異步編程 中我們已經知道了 Funture 工作的原理,讓我們看看真實世界裡面是如何工作的。
  • Rust async 威力之圖片下載器
    Rust 的異步還是很強大的,下面我們來寫一個圖片下載器。
  • 【Rust日報】 2019-08-12:Tokio alpha 版發布,新版本支持async/await
    Tokio alpha 版發布#tokio新版本支持async/awaittokio = "=0.2.0-alpha
  • 【譯】理解Rust中的Futures(一)
    這也是通往async/await[2]的基石,async/await 能夠讓用戶像寫同步代碼一樣來寫異步代碼。Async/await 在 Rust 初期還沒有準備好,但是這並不意味著你不應該在你的 Rust 項目中開始使用 futures。tokio[3] crate 穩定、易用且快速。
  • Rust 中調用 GitHub Web API - Rust Cookbook 中文版
    tokio::main 用於設置異步執行器,該進程異步等待 reqwest::get 完成,然後將響應信息反序列化到用戶實例中。use serde::Deserialize;use reqwest::Error;#[derive(Deserialize, Debug)]struct User { login: String, id: u32,}#[tokio::main]async fn main() -> Result
  • 【Rust每周一庫】smol - 異步rumtime
    簡介smol是一個輕量而高效的異步runtime。
  • 【Rust日報】 2019-07-16:「新手向」Rust vs C++ : 實現神經網絡
    Read More在Rust中使用異步網絡收集廣播UDP數據包#async #UDP本文是async和tokio的一次嘗試,作者寫了一個通過UDP廣播發現本地網絡上的設備的應用,並且介紹了他實現過程中的一些心得。
  • 【Rust日報】2021-01-26 太素桌面系統:基於RISC-V架構的Rust系統內核
    作者@大樹之下製作項目的初衷是出於對編程和創造的熱情。作業系統博大精深,它知識豐富的內涵吸引了作者,大樹之下一做就是一年。在系列文章中,作者肯定了RISC-V在架構設計上的先進性。作者編寫系列文章的初衷,是節省內核寫手們查找資料的時間和精力,出於開源和共享精神,將自己整理的知識以教程的形式饋贈給網友。
  • 頂級程式語言之對比:Rust VS Go
    現如今新程式語言層出不窮,從如此多的程式語言中選擇一款最適合的變得相當困難。因此,我們在本文中將討論兩種最受開發者歡迎的程式語言:Rust和Go語言。除了介紹這語言外,還會比較兩種語言之優缺點。 Rust語言
  • 【Rust投稿】從零實現消息中間件(4)-SERVER.CLIENT
    rust #[derive(Debug)] pub struct Client<T: SubListTrait> { pub srv: Arc<Mutex<ServerState<T>>>, pub cid: u64, pub msg_sender: Arc<Mutex<ClientMessageSender>>, }
  • 回溯 Rust 2020:正在成為最受歡迎的程式語言
    總體來說,成立該基金會的目的是為了 Rust 維護人員可以快樂地把工作做到最好,並且基金會的主要目標是啟動 Rust 貢獻者(特別是那些自願提供且不受僱主支持的貢獻者)。同時,在 Redmonk 的程式語言排名中,Rust 躋身前 20 名,鑑於 Java,C,JavaScript 等語言的地位,這個成就很了不起。2019 年初,已經有數百家公司在軟體生產中使用 Rust,例如 Dropbox、Yelp 和 Cloudflare 等,今年這個數字更大。Rust 也同樣被很多科技巨頭青睞,比如 Google、微軟、蘋果、Facebook 和 Mozilla 等。
  • Rust 不適合開發 Web API
    但與其它程式語言相比,用它構建網站會很慢。它比編譯型程式語言 Go 慢得多,也比解釋型程式語言 JavaScript、Ruby 和 Python 等慢得多。一旦代碼被編譯,一切就變得非常棒了!但在我的情況下,甚至基本 API 功能都不完整,一個不複雜的系統——居然花了 10 多分鐘來編譯。Google 代碼構建的硬體配置很差,每次都會超時,我啥都編譯不了。
  • 【Rust日報】 2019-08-21:「官方」async_await將在Rust 1.39穩定版中發布
    Read More: https://github.com/rust-lang/rust/pull/63209#issuecomment-523113079宣告:async-std 異步標準庫的測試版#async_std並打算在2019年9月26日前發布1.0版。該庫附帶了一本書和完善的應用編程接口文檔,並將很快提供一個穩定的接口來支持異步庫和應用程式。
  • 【每周一庫】- Tonic 基於Rust的gRPC實現
    TonicgRPC的rust實現,高性能,開源,為行動裝置與HTTP/2準備的通用RPC框架tonic是基於HTTP
  • 【Rust每周一庫】once_cell - 最多初始化一次的cell
    , GLOBAL_DATA.lock().unwrap());}其中的好處就是不需要神奇的宏魔法了,這也是近期tokio使用它取代了之前的lazy_static的原因:https://github.com/tokio-rs/tokio/pull/3187use once_cell::unsync::Lazy;fn main() {
  • 【Rust日報】2021-01-30 postage Rust 異步通道庫,Rustc Dev Guide 中文翻譯志願者招募
    postage,Rust 異步通道庫postage-rs,一個Rust 異步通道庫,可以更輕鬆地在 Rust 異步中開發基於消息的應用