TOKIO ASYNC&AWAIT 初探

2022-01-17 Rust語言中文社區

tokio async&await 初探

3.1.1 建立Tcp連接

3.1.2 https

3.1.3 獲取網頁

3.1.4 完整的抓網頁

一 想解決的問題

工具的用法

二 tokio 簡介

三 任務分解

3.1 獲取網頁

3.2 解析網頁

3.3 寫配置文件

3.4 合在一起

3.5 main函數

3.6 其他

四 完整的程序

rust的async/await終於在萬眾矚目之下穩定下來了,一起來嘗嘗鮮. 這篇文章主要是介紹基於tokio 0.2做一個服務程式設計師的小工具githubdns.

一 想解決的問題

github是程式設計師日常離不開的工具,但是國內訪問實在是太慢.看到網上介紹的,通過ipaddress.com來查詢github.com地址的方式,驗證了一下,還有一定效果. 但是每次都要手工操作,有點麻煩,就做了這麼一個小工具githubdns.同時其實也是想試試tokio,看看方便不.

工具的用法

sudo githubdns

非常簡單可以不帶任何參數,執行完畢後會在/etc/hosts文件中添加三行對於codeload.github.com,github.com,github.global.ssl.fastly.net的解析. 這就是我想做的全部事情.
如果大家有更好的提速方式,請告訴我,提前感謝!

二 tokio 簡介

tokio現在基本上是Rust上異步編程的標配了, 用官方的話來說,他就是一個Rust的異步程序Runtime.目前的0.2版本已經完全按照async/await重構,用起來非常方便. 另外熱議的Rust的零成本抽象我就不羅嗦了.

三 任務分解3.1 獲取網頁

找到域名對應的ip地址,這部分看起來比較簡單,就是一個https請求. 比如https://github.com.ipaddress.com
看著簡單,我就在這裡卡了一會兒. Rust畢竟是新興語言,就這麼一個小功能,都沒有現成的,如果是go的話,一句話就搞定了.
但是既然想嘗鮮,就從最基礎的Tcp開幹吧.

3.1.1 建立Tcp連接

這個屬於最常用的功能了,非常方便. 一句話

let socket = TcpStream::connect(&addr).await.unwrap();

這裡的await特性就是我們要的了,async wait,連接建立完了再繼續. 不會一直堵塞當前線程.

3.1.2 https

因為是https連接,所以必須轉換成tls連接. 這裡用的是tokio-tls,雖然說不是很完善,但是這種基本的操作,還是足夠了.

let builder = native_tls::TlsConnector::builder();
let connector = builder.build().unwrap();
let connector = tokio_tls::TlsConnector::from(connector);
let mut socket = connector.connect(domain.as_str(), socket).await?;

略顯羅嗦了,四行才行. 前三行可以封裝的更好一點. 第四行的connect傳入了domain參數,就是為了進行tls握手,驗證證書的有效性.

3.1.3 獲取網頁

如果有封裝好的tokio-https庫,這裡的三個步驟應該是一步完成的. 無奈沒有. 本來想自己封裝一個簡單的,但是嫌太羅嗦了,針對這個小工具,也沒有必要,乾脆裸上吧.

socket.write_all(format!("GET {} HTTP/1.0\r\nHost:{}\r\n\r\n", path, domain).as_bytes()).await?;

let mut data = Vec::new();
socket.read_to_end(&mut data).await?;

自己拼一個header發出去,然後直接抓取response. 因為我們只關心body中的html,不關心response中的header,直接扔掉.

let s = String::from_utf8(data)?;
let pos = s.find("\r\n\r\n").unwrap_or(0);
let (_, body) = s.split_at(pos);

3.1.4 完整的抓網頁

//根據url,獲取其地址對應的html內容
async fn get(domain: String) -> Result<String, Box<dyn Error>> {
let (domain, path) = parse_url(domain.as_str());
let ip_port = format!("{}:443", domain.clone());
let addr = ip_port.to_socket_addrs().unwrap().next().unwrap();
let socket = TcpStream::connect(&addr).await.unwrap();
let builder = native_tls::TlsConnector::builder();
let connector = builder.build().unwrap();
let connector = tokio_tls::TlsConnector::from(connector);
let mut socket = connector.connect(domain.as_str(), socket).await?;
socket
.write_all(format!("GET {} HTTP/1.0\r\nHost:{}\r\n\r\n", path, domain).as_bytes())
.await?;

let mut data = Vec::new();
socket.read_to_end(&mut data).await?;
let s = String::from_utf8(data)?;
let pos = s.find("\r\n\r\n").unwrap_or(0);
let (_, body) = s.split_at(pos);
Ok(String::from(body))
}

首先是函數的籤名async fn get(domain: String) -> Result<String, Box<dyn Error>>. 必須是async,否則函數體中是無法使用await的. 感興趣的同學可以看看網上的教程. 簡單的說就是async關鍵字會把我們的返回值轉換為Future.

而裡面的await關鍵字則會自動為我們保存上下文,封裝成一個狀態機.

3.2 解析網頁

這個就簡單多了,我們有現成的crate scraper,拿來用即可.因為重點是說異步,這裡的代碼雖然有點長,就不羅嗦了.關鍵可以用一句jquery來描述

$("ul.comma-separated li")

3.3 寫配置文件

對於文件的異步讀寫,使用tokio-fs,非常方便.

let contents = fs::read(hosts_file_name).await;
//... 修改
fs::write(hosts_file_name, lines.join(enter).as_bytes()).await;

ok,一個小工具就完成了.

3.4 合在一起

工具的目標是抓取多個域名對應的ip地址,然後寫入配置文件. 既然是異步,肯定要同時抓取多個.這裡順便展示一下join_all如何使用了.

let domains_str = m.value_of("domains").unwrap().to_string();
let domains: Vec<&str> = domains_str.split(",").collect();
let domain_ips = Arc::new(RwLock::new(HashMap::new()));

let mut v = Vec::new();
for domain in domains.iter() {
let domain_ips = domain_ips.clone();
let domain = String::from(*domain);
v.push(async move {
let res = get(domain.clone()).await;
if res.is_err() {
println!(
"get domain {},err {}",
domain,
res.unwrap_err().description()
);
return;
}
let res = res.unwrap();
let ip = get_address(&res, domain.clone());
domain_ips.write().unwrap().insert(domain.clone(), ip);
});
}
join_all(v).await;

let (hosts_file, enter) = get_hosts_file();
read_and_modify_hosts(domain_ips.clone(), &hosts_file, &enter).await;
Ok(())

整體看給人感覺還是略顯羅嗦,不過用rust寫代碼,我也明顯感覺到羅嗦,也可能是我功力不夠,不能吐槽rust了.

如果拋開錯誤處理,我們可以很簡潔的.

for domain in domains.iter() {
let domain_ips = domain_ips.clone();
let domain = String::from(*domain);
v.push(async move {
//取網頁內容
let res = get(domain.clone()).await?;
//解析ip地址
let ip = get_address(&res, domain.clone());
//放入map中,好寫入hosts文件
domain_ips.write().unwrap().insert(domain.clone(), ip);
});
}
join_all(v).await;

這樣看來是不是還是比較清晰的. 多個連接同時發出,又不用像goroutine一樣啟動協程,總的來說還是感覺很清爽的.

3.5 main函數

為了更方便的使用tokio,避免手工使用tokio::spawn之類的,tokio提供了async main. 使用起來是真香!

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>>

3.6 其他

這裡故意忽略了一些稍微複雜的內容,比如hosts文件的解析以及不同平臺的差異. 這些是所有代碼都無法繞開的.
不過還有一點是要特別吐槽的,rust的String設計的真是不好用,導致字符串的處理總是顯得比較羅嗦.

整個下來,有230行左右, 不過我想已經把tokio異步編程要點都覆蓋到了.

四 完整的程序

純粹是為了讓文章顯得長一點,哈哈,完全可以忽略.或者直接到我的github上看.

use futures::future::*;
use native_tls;
use scraper::{Html, Selector};
use std::collections::HashMap;
use std::error::Error;

use std::net::{IpAddr, Ipv4Addr, ToSocketAddrs};
use std::sync::{Arc, RwLock};

use clap::{App, Arg};
use std::process::Command;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let m = App::new("githubdns")
.arg(
Arg::with_name("domains")
.long("domains")
.help("set the dns needs to write to hosts")
.default_value("github.com,github.global.ssl.fastly.net,codeload.github.com,assets-cdn.github.com")
.required(false),
)
.get_matches();
let domains_str = m.value_of("domains").unwrap().to_string();
let domains: Vec<&str> = domains_str.split(",").collect();
let domain_ips = Arc::new(RwLock::new(HashMap::new()));

let mut v = Vec::new();
for domain in domains.iter() {
let domain_ips = domain_ips.clone();
let domain = String::from(*domain);
v.push(async move {
let res = get(domain.clone()).await;
if res.is_err() {
println!(
"get domain {},err {}",
domain,
res.unwrap_err().description()
);
return;
}
let res = res.unwrap();
let ip = get_address(&res, domain.clone());
domain_ips.write().unwrap().insert(domain.clone(), ip);
});
}
join_all(v).await;

// println!("domain_ips={:?}", domain_ips.read().unwrap());
let (hosts_file, enter) = get_hosts_file();
read_and_modify_hosts(domain_ips.clone(), &hosts_file, &enter).await;
Ok(())
}
//hosts文件路徑,以及回車換行對應的是\r\n還是\n,\r
//這裡有一個副作用,會講hosts文件的只讀屬性移除,可以考慮寫入後再增加上,
fn get_hosts_file() -> (String, String) {
let info = sys_info::os_type();
if info.is_err() {
panic!("unsupported os");
}
let info = info.unwrap();
// Such as "Linux", "Darwin", "Windows".
match info.as_str() {
"Linux" => ("/etc/hosts".into(), "\n".into()),
"Darwin" => ("/etc/hosts".into(), "\r".into()),
"Windows" => {
let path = r"C:\Windows\System32\drivers\etc\hosts";
//windows下hosts文件默認是只讀的,如果不修改,後續會寫不進去
Command::new("attrib")
.args(&["-R", path])
.output()
.expect("remove readonly failed");

(path.into(), "\r\n".into())
}
_ => panic!("not supported os {}", info),
}
}
//讀取hosts文件,如果其中已經有相關域名的設置,先刪除,再添加
//原有注釋保持不動
async fn read_and_modify_hosts(
m: Arc<RwLock<HashMap<String, String>>>,
hosts_file: &str,
enter: &str,
) {
use tokio::fs;
let flags = "# ----Generated By githubdns ---";
let hosts_file_name = hosts_file;
let mut m = m.write().unwrap();
let contents = fs::read(hosts_file_name).await;
if contents.is_err() {
println!(
"read {} err {}",
hosts_file_name,
contents.err().unwrap().description()
);
return;
}
let contents = contents.unwrap();
let s = String::from_utf8(contents).unwrap();
let mut lines: Vec<&str> = s.split(enter).collect();
let mut i = 0;
while i < lines.len() {
let l = lines.get(i).unwrap().clone();
if l == flags {
lines.remove(i);
continue;
}
if l.trim_start().starts_with("#") {
i += 1;
continue;
}
let _ = m.iter().any(|(domain, ip)| {
let pos = l.find(domain.as_str());
if pos.is_some() {
//如果是這個domain的子域名,也不關心
let pos = pos.unwrap();
if ip.len() > 0
&& pos > 0
&& (l.as_bytes()[pos - 1] == ' ' as u8 || l.as_bytes()[pos - 1] == '\t' as u8)
{
//是我們要找的完整的域名
lines.remove(i);
i -= 1;
return true;
}
}
return false;
});
i += 1;
}

let mut lines: Vec<_> = lines.iter().map(|n| String::from(*n)).collect();

lines.push(flags.into());
for (domain, ip) in m.iter_mut() {
if ip.len() > 0 {
lines.push(format!("{}\t {}", ip, domain));
}
}
lines.push(flags.into());
let r = fs::write(hosts_file_name, lines.join(enter).as_bytes()).await;
if r.is_err() {
panic!("write to {} ,err={}", hosts_file, r.unwrap_err());
}
}
//解析url,返回對應的domain和path
fn parse_url(domain: &str) -> (String, String) {
let ss: Vec<_> = domain.split(".").collect();
let mut path = "/".into();
let mut domain: String = domain.into();
if ss.len() > 2 {
path = format!("/{}", domain.clone());
domain = ss[ss.len() - 2..].join(".");
}
domain = format!("{}.ipaddress.com", domain);
return (domain, path);
}

//根據url,獲取其地址對應的html內容
async fn get(domain: String) -> Result<String, Box<dyn Error>> {
let (domain, path) = parse_url(domain.as_str());
println!("get {},{}", domain, path);
let ip_port = format!("{}:443", domain.clone());
// println!("ip_port={}", ip_port);
let addr = ip_port.to_socket_addrs().unwrap().next().unwrap();
// println!("addr={}", addr);
let socket = TcpStream::connect(&addr).await.unwrap();
// Send off the request by first negotiating an SSL handshake, then writing
// of our request, then flushing, then finally read off the response.
let builder = native_tls::TlsConnector::builder();
let connector = builder.build().unwrap();
let connector = tokio_tls::TlsConnector::from(connector);
let mut socket = connector.connect(domain.as_str(), socket).await?;
socket
.write_all(format!("GET {} HTTP/1.0\r\nHost:{}\r\n\r\n", path, domain).as_bytes())
.await?;

let mut data = Vec::new();
socket.read_to_end(&mut data).await?;
let s = String::from_utf8(data)?;
let pos = s.find("\r\n\r\n").unwrap_or(0);
let (_, body) = s.split_at(pos);
// println!("body={}", body);
Ok(String::from(body))
}
//從html中提取domain對應的第一個ipv4地址
fn get_address(data: &str, domain: String) -> String {
let document = Html::parse_document(data);
let ul_selector = Selector::parse("ul.comma-separated").unwrap();
let li_selector = Selector::parse("li").unwrap();
let ul = document.select(&ul_selector).next();
if ul.is_none() {
println!("{} cannot found ul,data={}", domain, data);
return String::new();
}
let ul = ul.unwrap();
let mut ip_v4 = Ipv4Addr::new(127, 0, 0, 1);
let found = ul.select(&li_selector).any(|n| {
// println!("n={}", n.inner_html().trim());
let ip: Result<IpAddr, _> = n.inner_html().trim().parse();
match ip {
Err(_) => {
return false;
}
Ok(ip) => match ip {
IpAddr::V4(ipv4) => {
ip_v4 = ipv4;
return true;
}
_ => {
return false;
}
},
}
});
if found {
return ip_v4.to_string();
}
return String::new();
}

#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_parse_url() {
let u = parse_url("github.global.ssl.fastly.net");
assert_eq!(
u,
(
String::from("fastly.net.ipaddress.com"),
String::from("/github.global.ssl.fastly.net")
)
);
let u = parse_url("github.com");
assert_eq!(
u,
(String::from("github.com.ipaddress.com"), String::from("/"))
);
}
#[tokio::test]
async fn test_get() {
assert!(true);
}
#[test]
fn test_get_address() {
let data = std::fs::read_to_string("github.com.html").unwrap();
let ip = get_address(data.as_str(), String::from("github.com"));
assert_eq!(ip, String::from("192.30.253.112"))
}
}

本文來自bai的投稿,原文地址:https://stevenbai.top/rust/tokio_async_await-%E5%88%9D%E6%8E%A2/

相關焦點

  • Tokio 1.0 發布,Rust 異步編程框架 - OSCHINA - 中文開源技術交流...
    Tokio 團隊表示,雖然 Tokio 從四年前剛誕生起就一直在不斷發展,不過出現真正的顯著變化是在一年前,因為 Rust 在當時才正式支持 async/await。::{AsyncReadExt, AsyncWriteExt};#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { let mut listener = TcpListener::bind("127.0.0.1:8080").await?
  • 理解 JavaScript 的 async/await
    無論是在 C# 還是 JavaScript 中,async/await 都是非常棒的特性,它們也都是非常甜的語法糖。C# 的 async/await 實現離不開 Task 或 Task<Result> 類,而 JavaScript 的 async/await 實現,也離不開Promise。
  • 理解JavaScript 的 async/await
    1. async 和 await 在幹什麼任意一個名稱都是有意義的,先從字面意思來理解。async 是「異步」的簡寫,而 await 可以認為是 async wait 的簡寫。所以應該很好理解 async 用於申明一個 function 是異步的,而 await 用於等待一個異步方法執行完成。
  • async/await,了解一下?
    因此,在 ES6 中封裝了 Generator 函數的語法糖 async 函數,但是將其定義在了 es7 中。ES7 定義出的 async 函數,終於讓 JavaScript 對於異步操作有了終極解決方案。 Async 函數是 Generator 函數的語法糖。使用 關鍵字 Async 來表示,在函數內部使用 await 來表示異步。
  • Swift 的 Async/Await 簡介
    解決方案:async/await異步函數 - 通常被稱為 async/await - 允許將異步代碼當作線性同步代碼來編寫。這就立馬解決了上述的許多問題,因為它允許程式設計師充分利用與同步代碼相同的語言結構。使用 async/await 還自然地保留了代碼的語義結構,提供了必要的信息讓語言可以做至少三個方向的改進。
  • async/await 使用方式
    timeout(false).catch(err => {    console.log(err)})async 關鍵字差不多了,我們再來考慮await 關鍵字,await是等待的意思,那麼它等待什麼呢,它後面跟著什麼呢?
  • Async/Await有什麼用?
    // 每日前端夜話 第394篇// 正文共:2500 字// 預計閱讀時間:8 分鐘當我第一次看到 async/await 的描述時有點困惑,直到我了解了 async/await 的出處,才領悟到它的價值。
  • 深入async/await知多少
    其實在使用async/await的有多少人真的了解它們呢?接下來詳細地講述它們和在什麼情況下需要注意的細節。為什麼需要它      如果你對async/await的需求不明顯,那只能說明你平時很少寫異步邏輯(當你很少接觸傳統異常邏輯編寫的情況下,最好也找些相關功能用一下先也了解一下傳統異常調整用情況)。
  • 盤點JavaScript中async/await知識
    還有另外一個叫 await 的關鍵詞,它只在 async 函數內工作,也非常酷。三、Await1.如果函數前面沒有 async 關鍵字,就會得到一個語法錯誤。就像前面說的,await 只在 async 函數 中有效。
  • async/await 原理及執行順序分析
    基於這個原因,ES7 引入了 async/await,這是 JavaScript 異步編程的一個重大改進,提供了在不阻塞主線程的情況下使用同步代碼實現異步訪問資源的能力,並且使得代碼邏輯更加清晰,而且還支持 try-catch 來捕獲異常,非常符合人的線性思維。所以,要研究一下如何實現 async/await。
  • JavaScript基礎——深入學習async/await
    本篇文章,小編將和大家一起學習異步編程的未來——async/await,它會打破你對上篇文章Promise的認知,竟然異步代碼還能這麼寫! 但是別太得意,你需要深入理解Promise後,才能更好的的駕馭async/await,因為async/await是基於Promise的,沒有理解Promise,小編強烈建議各位再看看《JavaScript基礎——Promise使用指南》。
  • 如何正確合理使用 JavaScript async/await
    在本文中,將從不同的角度探討 async/await,並演示如何正確有效地使用這對兄弟。async 作用是什麼從 MDN 可以看出:async 函數返回的是一個 Promise 對象。如果它等到的不是一個 Promise 對象,那 await 表達式的運算結果就是它等到的東西。如果它等到的是一個 Promise 對象,await 就忙起來了,它會阻塞後面的代碼,等著 Promise 對象 resolve,然後得到 resolve 的值,作為 await 表達式的運算結果。這就是 await 必須用在 async 函數中的原因。
  • JavaScript中的async/await的用法和理解
    今天就說一說「JavaScript中的async/await的用法和理解」程式語言中任意一個關鍵字都是有意義的,我們先從字面意思來理解。1.async async 是「異步」的簡寫,帶async關鍵字的函數,是聲明異步函數,返回值是promise對象,如果async關鍵字函數返回的不是promise,會自動用Promise.resolve()包裝。
  • Python async/await教程
    async/await更新的和更清潔的語法是使用async/await關鍵字,async在Python 3.5中引入,用於作為一個協同程序聲明一個函數,就像@asyncio.coroutine裝飾器所做的,通過把它放到函數定義前使它應用於函數:
  • 如何在 JS 循環中正確使用 async 與 await
    async 與 await 的使用方式相對簡單。 當你嘗試在循環中使用await時,事情就會變得複雜一些。在本文中,分享一些在如果循環中使用await值得注意的問題。準備一個例子對於這篇文章,假設你想從水果籃中獲取水果的數量。
  • 明明有了 promise ,為啥還需要 async await ?
    為了讓還沒聽說過這個特性的小夥伴們有一個大致了解,以下是一些關於該特性的簡要介紹:async/await是一種編寫異步代碼的新方法。在這之前編寫異步代碼使用的是回調函數和promise。async/await實際是建立在promise之上的。因此你不能把它和回調函數搭配使用。
  • 你必須了解的JavaScript關鍵字async和await
    注意,await只能在用async關鍵字標記的函數中使用。它的工作方式與Generator類似,在Promise完成之前暫停上下文中的執行。如果等待的表達不是Promise,那麼它就變成了Promise。
  • 實現一個 async/await (typescript 版)
    這次我們來實現一個 typescript 版本的 async/await。關於 async/await 的原理的文章,網上也有很多了,但是本文既然是使用 typescript 來寫,我們的 async/await 也是要能夠通過用戶傳入的函數自動推斷出結果,所以如何對其編寫 typescript 定義也是本文的一個重要板塊。
  • 如何用實例掌握Async/Await
    今天讓我們一起來探討如何用實例掌握Async/Await目錄1、簡介(callbacks, promises, async/await)2、實例—貨幣轉換器從2個API’s接收異步數據。Async函數是通過在函數聲明之前加上Async來創建的,如下所示:異步函數可以用await暫停,await是只能在異步函數中使用的關鍵字。await返回異步函數完成後返回的任何內容。
  • 為什麼 redux-saga 不能用 async await 實現
    想必開始接觸 redux-saga 的同學都有這個疑問,為啥為要用 generator 的寫法,用 async await 行不行。import { put, call } from 'redux-saga/effects'import { loginService } from '..