Python極其簡單的分布式異步作業管理系統RQ入門
1. 什麼是Job?
Job直譯過來就是工作,可以是任意的Python函數,你可以把你想要異步執行的任務都寫成Job函數。簡而言之,Job就是你想執行的操作。例如,我想統計任意網頁的字符數量,可以寫一個這樣的Job函數:
import requestsdef count_words(url): return len(requests.get(url).text.split())
這樣一個函數就可以稱之為Job。
2. 什麼是Queue?
當我有很多Job時,假如我現在有3個Job,分別是j1、j2、j3,那麼當計算機要執行這些任務的時候,會按照j1、j2、j3加入的順序來執行這些Job,這樣的一個可以忘裡面添加Job,並且能夠順序執行隊列稱之為Queue。
例如,我們可以這樣來構建一個Queue:
import redisfrom rq import Queueredis_conn = redis.Redis()q = Queue('default', connection=redis_conn) # 第一個參數是Queue的名稱,可以不傳,默認為default
3. 怎麼把Job放到隊列裡面去?
j = q.enqueue(count_words, args=('https://www.baidu.com',))
enqueue第一參數是Job函數,args是Job函數的參數,關鍵字參數可以通過kwargs傳入。
4. 什麼是Worker?
Worker是Job的消費者,簡單來說,你把很多Job加入到了Queue,誰來運行這些Job呢?當然就是Worker啦,你也可以看出Worker必須是獨立的進程,這個進程從Redis裡面獲取Job的信息(包括函數、參數等等),然後運行這個Job。
啟動Worker進程也很簡單:
$ rq worker low high default16:56:02 RQ worker 'rq:worker:s2.6443' started, version 0.8.1 16:56:02 Cleaning registries for queue: low 16:56:02 Cleaning registries for queue: high 16:56:02 Cleaning registries for queue: default 16:56:02 16:56:02 *** Listening on low, high, default...
後面的三個參數low、high、default,就是這個Worker將要運行哪些Queue裡面的Job,這個順序很重要,排在前面的Queue裡面的Job將優先被運行。
5. 一個完整的例子
jobs.py
import requestsimport redisfrom rq import Queuedef count_words(url): return len(requests.get(url).text.split())def get_q(): redis_conn = redis.Redis() return Queue(connection=redis_conn)
app.py
from jobs import get_q, count_wordsdef run(): q = get_q() j = e.enqueue(count_words, 'https://www.baidu.com') print(j.result)if __name__ == '__main__': run()
啟動Worker:
$ rq worker
運行:
$ python app.py