本文以tokio為例簡單介紹Rust異步編程相關的一些知識。
首先讓我們看看為什麼使用rust來進行異步編程。這裡tokio官方給出了一個性能測試的對比,可以看到tokio是性能最好,實際上運行這個基準測試的時候,tokio性能更好的2.0版本尚未發布,否則估計性能還有很大提升。因此,我們可以認為需要非常極致性能的時候,我們可以選擇rust+tokio來實現。
Rust實際上並不跟一定的網絡編程模型強綁定,實際rust可以實現阻塞IO+多線程,非阻塞IO+回調,用戶態線程等多種模型。這裡著重介紹Rust實現的用戶態線程。
首先,Rust的用戶態線程是一種基於Future的用戶態線程,關於Future本身,本文後續部分有詳細論述。
其次,由於是Rust實現,因此可以做到零成本抽象,並且更容易做到安全。
最後,由於沒有運行時大量內存分配,沒有動態邏輯分派,也沒有GC開銷,所以該實現的效率非常高。
Rust異步編程是構建在作業系統相關API上,MIO庫類似Java的Nio庫,針對多種作業系統的不同API做了統一封裝。Future庫類似Java的Future庫,提供了相關接口和常用的組合能力。Tokio構建於兩者之上,在MIO和future的基礎上實現了用戶態線程。使用Tokio進行異步編程的技術棧如下,需要注意的是,應用程式會同時接觸到Tokio和future的API。
future是rust異步編程的核心。首先我們介紹什麼是future。future是一段異步計算程序,可以在將來獲取產生的數據。舉例來說,獲取資料庫查詢結果,RPC調用這些實際上都可以使用future來實現。通常實現future有兩種模式,一種基於推模式,也被稱為基於完成的模式,一種基於拉模式,也被稱為基於就緒的模式。Rust的future庫實現了基於拉模式的future。
rust的future選擇拉模式來實現。接口定義如下:
pub trait Future {
type Item;
type Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error>;}假設一個future要做這樣的功能,從TCP數據流讀取數據並計算自己讀了多少個字節並進行回調。那用代碼表示:
struct MyTcpStream { socket: TcpStream, nread: u64,}
impl Future for MyTcpStream { type Item =u64; type Error = io::Error;
fn poll(&mut self) -> Poll<Item, io::Error> { let mut buf = [0;10]; loop { match self.socket.read(&mut buf) { Async::Ready(0) => return Async::Ready(self.nread), Async::Ready(n) => self.nread += n, Async::NotReady => return Async::NotReady, } } }
}
每次調用poll方法,MyTcpStream都會調用socket的read方法(這裡的TcpStream本身也是一個future,read內部也是調用poll方法),當read返回為Async::NotReady的時候,調度器會將當前的Task休眠,如果返回Async::Read(n)表示讀到了數據,則給計數器加對應的數,如果返回Async::Ready(0),則表示TcpStream裡有的數據已經讀完,就將計數器返回。
為了方便大家使用,future庫包提供了很多組合子,以AndThen組合子為例:enum AndThen<A,F> { First(A, F),}
fn poll(&mut self) -> Async<Item> { match fut_a.poll() { Async::Ready(v) => Async::Ready(f(v)), Async::NotReady => Async::NotReady, }}
這裡AndThen枚舉,First有兩個值,其中A是一個future,F是一個閉包,AndThen實現的poll方法,就是假如調用future_a的poll方法有返回值,那麼就調用閉包,並將其返回值包裝為Async::Ready返回,如果poll的返回值是Async::NotReady則同樣返回Async::NotReady。有了這個AndThen方法,通過組合子函數(比如and_then實際上是將上一個future和閉包傳入生成一個AndThen future),我們就可以實現一些複雜邏輯:
let f=MyTcpStream::connect(&remote_addr) .and_then(|num| {println!("already read %d",num); return num;}).and_then(|num| { process(num) }); tokio::spawn(f);
上面的代碼就是建立Tcp連接,然後每次讀數據,都通過第一個and_then列印日誌,然後再通過第二個and_then做其他處理,tokio::spawn用於執行最終的future,用圖形來表示:
如果沒有數據:
如果有數據:
如果將MyTcpStream的poll實現改為:
fn poll(&mut self) -> Poll<Item, io::Error> { let mut buf = [0;1024]; let mut bytes = bytesMut::new(); loop { match self.socket.read(&mut buf) { Async::Ready(0) => return Async::Ready(bytes.to_vec()), Async::Ready(n) => bytes.put(buf[0..n]), Async::NotReady => return Async::NotReady, } } }
這段代碼主要是將socket中數據讀出,然後包裝為Async::Ready或者Async::NotReady供下一個future使用,我們就可以實現更複雜的邏輯,比如:
MyTcpStream::connect(&remote_addr) .and_then(|sock| io::write(sock, handshake)) .and_then(|sock| io::read_exact(sock, 10)) .and_then(|(sock, handshake)| { validate(handshake); io::write(sock, request) }) .and_then(|sock| io::read_exact(sock, 10)) .and_then(|(sock, response)| { process(response) })
我們上面解釋了future和組合子,漏掉一個重要的API,就是:
Tokio
當我們使用spawn方法的時候,tokio會將傳入的future生成一個task,由於future內部包含了另外的future,所以就組成了如下所示結構,其中task就是輕量級線程。
上面我們介紹了future相關的內容,接下來我們先看看tokio如何使用,我們這裡先用taokio啟動一個伺服器,代碼如下:
let listener = TcpListener::bind(&addr).unwrap();
let server = listener.incoming().for_each(move |socket| { tokio::spawn(process(socket)); Ok(())}).map_err(|err| { println!("accept error = {:?}", err);});
tokio::run(server);
上面的代碼首先生成一個TcpListener,listener的incomming和foreach會將連進來的tcp連接生成TcpStream(即代碼中的socket),針對每一個連接啟動一個用戶態線程處理。
Async/await
Tokio本身是基於Mio和future庫來實現的,其主要包含兩個主要的大功能部分(本文不是對源碼進行分析,Tokio不同版本之間的差異也較大,只是進行原理說明),reactor和scheduler。
scheduler負責對task進行調度,上文所展示的task調度部分功能就是由scheduler負責,reactor部分主要是負責事件觸發,比如網絡事件,文件系統事件,定時器等等。用圖展示如下:當有事件觸發的時候,reactor會通過task的api通知scheduler運行該任務。
對於Reactor來說,其中最重要的結構是Poll和io_dispatch,在linux上Poll是對Epoll實例的封裝(在其他作業系統上也類似),io_dispatch其中記錄了調度相關的信息,具體來說主要是記錄了task的id和fd的對應關係。當通過Poll獲取到FD事件的時候,通過io_dispatch找到task,然後再通知調度器。
TcpListner實際並非rust std庫中的TcpListner,tokio對其進行了包裝,每次有新連接到來的時候都會生成一個新的TcpStream。
TcpStream也是tokio包裝後的TcpStream,可以看到其中包含一個PollEvented,而PollEvented內部包含實際的TcpSteam。PollEvented構造之後,會調用io_dispatch中的註冊接口,然後在第一次調用poll的時候,將fd和task關聯。
通過上面的文章可以看到,直接使用tokio相關API還是有些難度的,然而在rust 1.39.0之後的版本,我們可以使用async/awai特性來簡化代碼,使得代碼更容易理解。使用async/await後,上面的代碼可以簡化為:
#[tokio::main]pub async fn main() -> Result<(), Box<dyn Error>> { let mut stream = TcpStream::connect("127.0.0.1:6142").await?; println!("created stream"); let result = stream.write(b"hello world\n").await; println!("wrote to stream; success={:?}", result.is_ok()); Ok(())}
要點在於對於需要異步的函數使用async修飾,在調用async函數的時候使用await獲取返回結果。實際上async函數是由編輯器生成的future,await也是由編譯器生成代碼調用future的poll方法。因此真正用好async/await也需要對上面的內容了解清楚。
Tips
最後,使用tokio有一些需要注意地方:
生命周期的問題。聲明周期的問題是一直貫穿rust的,具體到tokio使用上來說,最主要的是self的生命周期問題,主要是因為runtime要求借用是靜態的,這個跟對象本身的聲明周期是有矛盾的。我們推薦的主要做法是使用actor模型,這樣可以消除掉對於靜態生命周期的要求。
注意兼容問題,最主要是需要注意future01和future03的兼容性問題,future官方提供了兼容包,來做版本之間的兼容,如果要使用async/await,推薦儘量使用future03庫。
runtime,tokio的一個runtime對應一個線程池,因此推薦對不同業務使用不同線程池,減少業務之間相互影響。
使用TaskExecutor/Handle來spawm一個task。上面代碼裡經常使用的tokio::spawn是針對默認runtime的,如果使用了不同的runtime,那麼就不能使用tokio::spawn。另外,TaskExecutor/Handle支持clone,可以解決一些生命周期帶來的問題。
可以在代碼中通過api通知task運行。
近期文章推薦
技術原創及架構實踐文章,歡迎通過公眾號菜單「聯繫我們」進行投稿。
高可用架構
改變網際網路的構建方式
長按二維碼 關注「高可用架構」公眾號