我們知道現在硬體飛速發展,多核CPU 成了標配。為了提高程序的效率,一個方面改變程序的順序執行,用異步方式,防止由於某個耗時步驟,而影響後續程序的執行。另一個方面是採用並發方式執行,重複利用多核CPU優勢加速執行。關於並發編程大家可能比較熟悉的是Golang的協程、通道和Node.js 的async.parallel異步並發編程。就並發編程來說,Python不是一門合適的語言,主要是Python有一個解析器(CPython)內置的全局解釋鎖GIL。 GIL限制Python中一次只能有一個線程訪問Python對象,從而我們無法實現多線程分配到多個CPU執行,這是一個極大限制,限制Python並發編程。當然限制歸限制,Python標準庫中都已經引入了多進程和多線程庫,所以Python並發程序相當簡單。
本文中,蟲蟲給大家實例介紹一下Python的並發編程
並發編程
關於python並發編程,我們推薦優雅地創建並發程序三部曲:
首先,編寫一個按順序執行任務的腳本。
其次,腳本中的執行程序(耗時任務)提取為一個執行函數,並使用map函數調用。
最後,使用並發模塊中的函數替換map即可。
實例腳本
該實例中,我們用到一個小的圖片爬蟲,使用urllib從Picsum下載20張圖片,具體腳本程序如下:
pic_get.py
import urllib.request
import time
url = 'picsum.photos/id/{}/200/300'
args = [(n, url.format(n)) for n in range(20)]
start = time.time()
for pic_id, url in args:
res = urllib.request.urlopen(url)
pic = res.read()
with open(f'./{pic_id}.jpg', 'wb') as f:
f.write(pic)
print(f'圖片 {pic_id} 已經保存!')
end = time.time()
msg ='共耗時 {:.3f} 秒下載完成。'
print(msg.format(end-start))
python pic_get.py 運行該腳本,結果如下:
圖片 0 已經保存!
圖片 1 已經保存!
圖片 2 已經保存!
...
共耗時 26.694 秒下載完成。
下載共耗費不到半分鐘,接著按照我們優雅的三部曲,改造這個腳本。
使用Map改造腳本
下面腳本中,我們將下載圖片的代碼打包到一個執行函數get_img中。
# pic_get1.py
import urllib.request
import time
def get_img(pic_id, url):
res = urllib.request.urlopen(url)
pic = res.read()
with open(f'test/{pic_id}.jpg', 'wb') as f:
f.write(pic)
print(f'圖片 {pic_id} 已經保存!')
def main():
url = 'picsum.photos/id/{}/200/300'
pic_ids = [i for i in range(20)] ;
urls=[(url.format(n)) for n in range(20)]
start = time.time()
for _ in map(get_img, pic_ids, urls):
pass
end = time.time()
msg = '共耗時{:.3f}秒下載完成。'
print(msg.format(end-start))
if __name__ == '__main__':
main()
上述腳本中,用map函數替換先前腳本中的for循環(黑體部分)。map是一個函數式編程語法,該函數會生成一個迭代器,迭代器會執行迭代調用get_img()。關於map()函數熟悉函數式編程人可能會覺得有點奇怪,請自己搜索資料充電,此處,我們用它來充當並發編程網關。
圖片 0 已經保存!
圖片 1 已經保存!
圖片 2 已經保存!
...
圖片 19 已經保存!
共耗時26.023秒下載完成。
用map改造後,運行腳本總耗時大體上和腳本一致。
多線程並發處理
Python標準庫的current.futures模塊包含了大量並發編程的包裝函數,詳細說明,可參見官方文檔,此處我們直接上代碼。
將pic_get1.py中的程序做簡單改進,就能實現多線程腳本:
首先在腳本開頭引入多線程函數:
from concurrent.futures import ThreadPoolExecutor
接著替換
for _ in map(download_img, pic_ids, urls):
pass
為
with ThreadPoolExecutor(max_workers=20) as do:
do.map(get_img, pic_ids, urls)
即可。執行
圖片 0 已經保存!
圖片 2 已經保存!
圖片 5 已經保存!
...
圖片 9 已經保存!
共耗時2.913秒下載完成。
總耗時由26秒,減少到了大約3秒。大概快了8倍。並發執行的效果還是槓槓的。
程序中我們使用with ThreadPoolExecutor語句產生一個執行器do。通過將get_img和相應的參數映射到執行程序,自動生成多線程執行。
大家可能注意到了在多線程腳本執行後,圖片下載時候不是以前的0~19的順序的,而是不同線程並發執行的所以完成提示信息也是亂序的。
多進程處理
多進程的改造也非常簡單,我麼只需把之前多線程腳本中的ThreadPoolExecutor替換為ProcessPoolExecutor即可。
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=20) as do:
do.map(get_img, pic_ids, urls)
執行結果:
圖片 9 已經保存!
圖片 6 已經保存!
...
圖片 11 已經保存!
圖片 15 已經保存!
共耗時4.606秒下載完成
也非常快了,4秒鐘就完成了,但是比多線程的3秒,稍微慢點。為什麼多進程要比多線程慢呢?顧名思義,多進程程序會啟用多個進程,而多線程會使用線程。Python中一個進程可以運行多個線程。每個進程都有其適當的Python解釋器和適當的GIL。相比較而已,啟動一個進程是更加耗時,重的操作,所以需要花費的時間更多。
斐波那契數列計算
為了進一步說明Python中線程和進程之間的區別,我們再來舉一個大量計算的例子,斐波那契數列的計算。
根據斐波那契數列的定義我們用遞歸方法編寫實現其計算:
def fib(n):
if n == 1:
return 0
elif n == 2:
return 1
else:
return fib(n-1) + fib(n-2)
在不使用numpy的情況下用普通Python計算比較慢:
def main():
fib_range = list(range(1, 35))
times = []
for run in range(10):
start = time.time()
for n in fib_range:
fib(n)
end = time.time()
times.append(end-start)
print('波那契數列fib(35)計算平均耗時 {:.3f}。'.format(np.mean(times)))
波那契數列fib(35)計算平均耗時 5.200
下面我們用並發計算來加速計算。
讓我們通過線程加速它!為此,我用受信任的ThreadPoolExecutor替換for循環,如下所示:
with ThreadPoolExecutor() as do:
do.map(fib, fib_range)
執行結果:
波那契數列fib(35)計算平均耗時 5.239。
什麼?加速後,反而慢,好像多線程沒起到作用。這就是GIL的因素導致的,儘管使用了多個線程,生成了一堆線程,但是這些線程都在同一進程中運行並共享一個GIL。所以斐波那契序列儘管是並發計算的,這些線程在只能在一個CPU上循序執行。
進程可以分布在不同的CPU核心,而在同一進程上運行的線程則不能。使CPU消耗最大的操作為CPU綁定操作。為了加快CPU限制的操作,應該啟動多個進程計算。我們用ProcessPoolExecutor替換ThreadPoolExecutor再試試:
波那契數列fib(35)計算平均耗時 3.591
性能提高了一點。
除了並發的方式外,我們可以用算法優化方法來提高性能,在數值計算中,這是一種更有效的方法,比如,我們改造fib函數:
def fib(n):
a, b, i = 0, 1, 1
while i < n:
a, b = b, a + b
i += 1
return b
上述方法中,巧妙用內存存中的變量歷史迭代的前兩次結果都存在內存中,所以該次計算中無需回溯迭代計算,這樣計算效率O(1),基本上可以秒出結果。
使用新算法後的執行結果:
波那契數列fib(35)計算平均耗時 0.000。
總結
本文我們實例介紹了Python中的並發編程,關於並發編程由於標準庫中給我們打包好了方便使用的並發函數使得其使用非常方便。需要注意的是Python中的並發不管是多線程在IO操作中是有效的,而在其他方面,如數值結算時候就受GIL限制無用了。關於並發計算和GIL有心的話,可以參考有關文檔進一步深入學習了解。