使用concurrent.futures並發執行簡單任務
用Python編寫並發代碼可能很棘手。在您開始之前,您必須考慮很多令人討厭的事情,比如手頭的任務是I/O密集型還是計算密集型、為實現並發性所付出的代價是否會給您帶來所需要的提升。此外,由於全局解釋器鎖的存在,進一步限制了編寫真正並發的代碼。但為了理智起見,你可以這樣簡化並發編程,而不必大錯特錯:
在Python中,如果手頭的任務是I/O密集型,可以使用標準庫的threading 模塊,或者如果任務是計算密集型,那麼multiprocessing模塊很有助益。threading和multiprocessing為您提供了很多控制權和靈活性,但它們的代價是必須編寫相對低級的冗長代碼,在核心邏輯的基礎上增加額外的並具有複雜性的層。有時,當目標任務很複雜時,在添加並發時通常無法避免複雜性。然而,許多簡單的任務可以並發而不增加太多額外的開銷。
Python標準庫還包含一個名為concurrent.futures的模塊。這個模塊是在Python3.2中添加的,為開發人員提供了一個高級接口來啟動異步任務。它是在threading和multiprocessing 之上的一個通用抽象層,用於提供一個接口,以便使用線程池或進程池並發地執行任務。如果您只想同時運行一段合格的代碼,而不需要threading和multiprocessing api所暴露的附加特性,那麼它就是一個完美的工具。map(func, *iterables, timeout=None, chunksize=1)
類似於map(func,*iterables),除了:立即收集iterables 而不是惰性收集;func是異步執行的,對func的幾個調用可以同時進行。如果調用了 __next__(),並且在對Executor.map()的原始調用超時後結果仍不可用,則返回的迭代器將拋出concurrent.futures.TimeoutError。超時時間可以是int或float。如果超時時間未指定或為None,則等待時間沒有限制。如果func調用引發異常,則在從迭代器檢索其值時將引發該異常。當使用ProcessPoolExecutor時,此方法將iterable分為若干塊,並作為單獨的任務提交給池。這些塊的(近似)大小可以通過將chunksize設置為正整數來指定。對於很長的iterable,使用一個較大的chunksize值與默認大小1相比可以顯著提高性能。對於ThreadPoolExecutor,chunksize沒有任何效果。在這裡,get_tasks返回一個iterable,其中包含需要執行特定任務函數的目標任務或參數。任務通常是阻塞的可調用函數,它們一個接一個地運行,一次只運行一個任務。由於其順序執行,邏輯很容易推理。當任務數量較少或單個任務的執行時間要求和複雜性較低時,是很方便的。然而,當任務數量巨大或單個任務耗時很長時,可能會很快失控。一般的經驗法則是在I/O密集型任務中使用ThreadPoolExecutor,比如向多個URL發送多個http請求,將大量文件保存到磁碟等等。在計算密集型任務中應該使用ProcessPoolExecutor,比如在大量的圖像上運行計算量很大的預處理函數,同時操作許多文本文件等。
當您有許多任務時,您可以將它們放到一次運行計劃中,並等待它們全部完成,然後您可以收集結果。在這裡,您首先創建一個Executor,它管理正在運行的所有任務----在單獨的進程或線程中。使用with語句創建一個上下文調度器,它確保在完成後通過隱式調用executor.shutdown()方法清除所有不需要的線程或進程。在實際代碼中,基於callables的性質,您需要用ThreadPoolExecutor或ProcessPoolExecutor替換Executor。然後使用set comprehension開始所有的任務。executor.submit()方法調度每個任務。這將創建一個Future對象,該對象表示要完成的任務。一旦所有的任務都安排好了,就調用concurrent.futures_as_completed()方法,這會在每個任務完成時生成future。executor.result()方法提供perform(task)的返回值,或者在失敗時拋出異常。executor.submit()方法異步調度任務,不保存與原始任務相關的任何上下文。所以如果你想把結果和最初的任務對應起來,你需要自己去追蹤它們。注意變量futures,其中原始任務使用字典映射到對應的futures。
另一種方法是使用execuror.map()方法,按照預定的順序收集結果。注意map函數如何一次獲取整個iterable。它會立即而不是惰性地把結果按預定的順序顯示出來。如果在操作過程中發生任何未處理的異常,它也將立即拋出,並且不會繼續執行。在Python3.5+中,executor.map()接收一個可選參數:chunksize。當使用ProcessPoolExecutor時,對於很長的iterable,使用一個較大的chunksize值與默認大小1相比可以顯著提高性能。對於ThreadPoolExecutor,chunksize沒有效果。
在繼續示例之前,讓我們編寫一個小的decorator,它將有助於度量和比較並發代碼和順序代碼的執行時間。可以這樣使用decorator:首先,讓我們從一堆url下載一些pdf文件並將它們保存到磁碟。這可能是一個I/O密集型的任務,我們將使用ThreadPoolExecutor類來執行該操作。但在此之前,我們先按順序來做。
在上面的代碼片段中,我主要定義了兩個函數。下載功能從給定的URL下載pdf文件並將其保存到磁碟。它檢查URL中的文件是否具有擴展名,如果沒有擴展名,則會引發運行時錯誤。如果在文件名中找到擴展名,它將逐塊下載文件並保存到磁碟。第二個函數download_all只是遍歷一個url序列,並對每個url應用download_one函數。順序執行花費了22.8s。現在讓我們看看相同代碼的多線程版本的表現。代碼的並發版本只需要順序版本用時的1/4左右。注意,在這個並發版本中,download_one函數與之前相同,但是在download_all函數中,ThreadPoolExecutor上下文管理器包含了execute.map()方法。download_one與包含url的iterable一起傳遞到map中。timeout參數確定線程在多久之後放棄管道中的某個任務。max_workers表示要部署多少工作線程來生成和管理線程。一般經驗法則是使用2 * multiprocessing.cpu_count() + 1。我的機器有6個物理內核和12個線程。所以我設置為13。注意:您還可以嘗試通過相同的接口使用ProcessPoolExecutor運行上述函數,並注意到多線程版本的性能由於任務性質合適而表現稍好。使用Multi-processing運行計算密集型子例程
下面的示例顯示了一個計算密集型的哈希函數。主函數將按順序多次運行計算密集型哈希算法。然後另一個函數將再次多次運行加密操作。讓我們先按順序運行函數。
如果您分析hash-one和hash-all函數,您可以看到它們實際上是兩個計算密集型的嵌套for循環。上述代碼在順序模式下運行大約需要18秒。現在讓我們使用ProcessPoolExecutor並行運行它。如果仔細觀察,即使在並發版本中,hash中的for循環也會按順序運行。然而,hash_all函數中的另一個for循環正在通過多個進程執行。在這裡,我用將workers數量設置為10,chunksize設置為2。調整了workers數量和chunksize以獲得最大性能。如您所見,上述計算密集型操作的並發版本比其順序操作的版本快11倍。既然concurrent.futures提供這樣一個簡單的API,您可能會嘗試將並發性應用於手頭的每個簡單任務。不過,這不是個好主意。首先,簡單性有其合理的限制。這樣,您只能將並發性應用於最簡單的任務,通常是將函數映射到iterable或同時運行幾個子例程。如果您手頭的任務需要排隊,從多個進程生成多個線程,那麼您仍然需要使用較低級別的threading和multiprocessing模塊。
使用並發的另一個陷阱是使用ThreadPoolExecutor時可能出現的死鎖情況。當與Future 關聯的可調用函數等待另一個Future的結果時,它們可能永遠不會釋放對線程的控制並導致死鎖。讓我們看看官方文檔中稍微修改過的示例。
在上面的例子中,函數wait_on_b依賴於函數wait_on_a的結果(Future對象的結果),同時後一個函數的結果依賴於前一個函數的結果。因此,上下文管理器中的代碼塊永遠不會執行,因為它具有相互依賴性。這就造成了死鎖。讓我們從官方文檔中解釋另一個死鎖情況。
當子例程生成嵌套的future對象並在單個線程上運行時,通常會發生上述情況。在函數wait_on_future中,executor.submit(pow,5,2)創建另一個future對象。因為我使用一個線程運行整個過程,所以內部的future對象正在阻塞線程,並且上下文管理器中的外部executor.submit()方法不能使用任何線程。使用多個線程可以避免這種情況,但通常,這本身就是一個糟糕的設計。在某些情況下,並發代碼的性能可能比順序代碼的性能低。這可能有多種原因。
線程用於執行計算密集型的任務
多進程用於執行I/O密集型任務
這些任務太瑣碎,無法使用線程或多個進程生成和調度多個線程或進程會帶來額外的開銷。通常線程的生成和調度速度比進程快得多。然而,使用錯誤的並發類型實際上會降低代碼的速度,而不是使其更高效。下面是一個簡單的示例,其中ThreadPoolExecutor和ProcessPoolExecutor的性能都比它們的順序版本差。以上示例驗證列表中的數字是否為素數。我們在1000個數字上運行這個函數來確定它們是不是質數。順序版本大約花了67毫秒。但是,請看下面,同一代碼的多線程版本執行同一任務所需的時間(140ms)竟是兩倍多。同一代碼的多線程版本甚至更慢。這些任務並不能證明開放多進程是正確的。雖然從直觀上看,檢查質數的任務似乎應該是一個計算密集型操作,但確定任務本身的計算量是否足以證明使用多個線程或進程的合理性也很重要。否則,您可能會得到比簡單解決方案性能更差的複雜代碼。博客中的所有代碼都是在運行Ubuntu 18.04的機器上,用python 3.8編寫和測試的。
concurrent.futures-官方文檔(https://docs.python.org/3/library/concurrent.futures.html)Easy Concurrency in Python(http://pljung.de/posts/easy-concurrency-in-python/)Adventures in Python with concurrent.futures(https://alexwlchan.net/2019/10/adventures-with-concurrent-futures/) 英文原文:https://rednafi.github.io/digressions/python/2020/04/21/python-concurrent-futures.html
譯者:阿布銩