python並發函數apply_async踩坑記錄

2021-03-02 風清雲的學習之路

近期多了幾位朋友關注,非常感謝~以筆者之前的渣排版都能夠看得下去的,相信都是能夠靜下心來希望提升自己的朋友~

學術能力重要,工程實踐能力同樣重要。

筆者近期在工作中使用python的multiprocessing庫中的並發功能的時候發現:

當某個自定義類中的成員變量包含multiprocessing庫中的Pool實例或Manager中的Queue,Lock等實例時,Pool.apply_async並發模塊無法接收以類方法/實例方法類型的函數進行並發。

舉個慄子:

現在我們定製了一個類,並且在構造函數中聲明了一個實例化了一個Pool作為成員變量,定義了一個用於並發的函數async_func_member,以及一個調用並發功能的函數call_async:

import multiprocessing as mpclass test_async:    def __init__(self):        self.async_pool = mp.Pool(5)            def async_func_member(self, func_idx):        print("current aysnc func is {}".format(func_idx))        return {"func_idx": func_idx}            def call_async(self):        return_dicts = []        for call_num in range(5):            return_dicts.append(self.async_pool.apply_async(self.async_func_member, args=(call_num,)).get())

這裡注意兩個小細節:

第一個是,apply_async函數的傳參數部分args只接收tuple類型,所以即使只有call_num這個單參數傳入,也要寫成args=(call_num,);

第二個是,通過調用並發的.get()方法,會直接阻塞主進程並等到子進程獲取到結果為止,實際上上述的並發是個串行執行的順序,不過這個和本文重點無關。

好了,現在運行吧:

if __name__ == '__main__':    test_async_instance = test_async()    ret = test_async_instance.call_async()

運行上述代碼後,報錯信息摘取如下:

NotImplementedError: pool objects cannot be passed between processes or pickled

這個錯誤的意思是:pool對象是無法被序列化並在進程之間傳遞的。

簡單來說,就是我們自定義的test_async類裡面,不能包含有multiprocessing中的Pool實例,否則apply_async將無法把這個類的成員函數(即self.async_func_member)進行序列化並在不同進程中傳遞。

為什麼呢?這個和Python的序列化庫(pickle)有關,當我們在進程之間傳遞變量或函數或某些自定義類型時,它們都需要能夠被pickle序列化,否則將導致進程之間無法傳遞這些信息。(這裡不深究為什麼pickle不支持序列化pool,重點先看看怎麼解決)

問題暴露了,現在看看怎麼解決比較好。

解決方案一:讓apply_async不調用成員函數。(有點飲鴆止渴)

如果非要在自定義類中實例化Pool(),Manager.Queue()以及Manager.Lock()等對象,那麼就把用於並發的函數寫到該類的外面,以避免序列化帶來的問題:

import multiprocessing as mp


def async_outside(func_idx): print("current aysnc func is {}".format(func_idx)) return {"func_idx": func_idx}
class test_async: def __init__(self): self.async_pool = mp.Pool(5)     def call_async(self): return_dicts = [] tmp_async_pool = mp.Pool(5)        for call_num in range(5): return_dicts.append(self.async_pool.apply_async(async_outside, args=(call_num,)).get())

這樣就避開了序列化的問題,運行結果如下:

current aysnc func is 0current aysnc func is 1current aysnc func is 2current aysnc func is 3current aysnc func is 4[{'func_idx': 0}, {'func_idx': 1}, {'func_idx': 2}, {'func_idx': 3}, {'func_idx': 4}]

解決方案二:讓類在序列化的時候認為這個pool對象不存在

這裡涉及到實現兩個特殊的方法:__getstate__()以及__setstate__(),這兩個方法是專門針對序列化設計的。

__getstate__()會告訴pickle,這個類在序列化的時候用到了那些成員變量;

__setstate__()會告訴pickle,這個類在反序列化的時候要從它這裡拿到一些什麼成員變量。

貼代碼:

import multiprocessing as mpclass test_async:    def __init__(self):        self.async_pool = mp.Pool(5)
def async_func_member(self, func_idx): print("current aysnc func is {}".format(func_idx)) return {"func_idx": func_idx}
def __getstate__(self):                 pickle_dict = self.__dict__.copy() del pickle_dict['async_pool'] return pickle_dict
def __setstate__(self, state):                 pass def call_async(self): return_dicts = [] tmp_async_pool = mp.Pool(5)        for call_num in range(5):            return_dicts.append(self.async_pool.apply_async(self.async_func_member, args=(call_num,)).get())        return return_dicts

運行也是可以成功的。

解決方案三:不要將Pool實例化為成員變量

這是筆者比較推薦的解決方案。

進程池Pool,進程之間共享的隊列Manager.Queue()以及鎖Manager.Lock()等都不建議實例化為類的成員變量。

因為實例化為類的成員變量有兩個缺點:

1、該類無法被序列化;

2、當該進程池(本例為self.async_pool)被調用過apply_async(),並且被調用過close()以及join()後,就無法被再次用來做並發了,一個成員變量只有一次被調用的生命周期,把它聲明為成員變量就沒有意義了。

因此,筆者的建議是,對於進程池以及其他進程間共享的數據結構等信息,最好都是即開即用,而不是寫到一個公共區域中重複使用。

建議代碼:

import multiprocessing as mpclass test_async:    def __init__(self):                "do some initialization"
def async_func_member(self, func_idx): print("current aysnc func is {}".format(func_idx)) return {"func_idx": func_idx}
def call_async(self): return_dicts = []         tmp_async_pool = mp.Pool(5) for call_num in range(5):            return_dicts.append(tmp_async_pool.apply_async(self.async_func_member, args=(call_num,)).get())         tmp_async_pool.close() tmp_async_pool.join()         return return_dicts

運行可以成功。以上這種方法清晰簡潔,做到了對進程池的即開即用,也不需要操心序列化反序列化的問題。

總結

1、python不支持序列化進程池,進程間共享的數據結構等對象;

2、__getstate__() 和 __setstate__()能夠自定義序列化和反序列化的工作;

3、建議對multiprocessing庫中的對象採取即開即用的做法,而不是寫為成員變量。

相關焦點

  • python進程池Pool的apply與apply_async到底怎麼用?
    多進程python中使用multiprocessing模塊實現多進程。Type:      methodPool 中提供了如下幾個方法:apply()apply_async()map()map_async()close()terminal()join()這裡主要說一下apply和apply_async兩個,其他的內容可以進行百度搜索
  • 並發進行時--python編程必知必會(3)
    不一定非得以CPU數量作為進程池的進程數,進程數多於CPU數量也完全沒有問題,就算是單核CPU也能做多進程5. pool.map()的返回值是個列表6.特別注意,以上例子是由python2.7寫的,如果換成python3.x,同步執行的map(do,aimlist)需換成list(map(do,aimlist)),否則map不會被執行,前後時間戳一樣因為在3.x裡,map
  • Python數據分析—apply函數
    而這些操作都可以藉助python中的apply函數進行處理。今天介紹數據分析的第四課,教大家如何在python中用apply函數對數據框進行一些複雜一點的操作。def replace_gender_to_num(val): if val=='男': return 1 else: return 0date_frame.gender.apply(replace_gender_to_num)
  • Python async/await教程
    async/await更新的和更清潔的語法是使用async/await關鍵字,async在Python 3.5中引入,用於作為一個協同程序聲明一個函數,就像@asyncio.coroutine裝飾器所做的,通過把它放到函數定義前使它應用於函數:
  • 揭秘Python並發編程——協程
    Python並發編程一直是進階當中不可跨越的一道坎,其中包括進程、線程、協程,今天我們就來聊一聊協程。那麼具體的協程實現的具體原理是什麼呢,我們來分解一下代碼:先看一下輸出內容:我們這裡使用async來聲明一個異步函數,使用await來執行異步函數,await的執行效果會使程序阻塞在這裡,最後需要使用asyncio.run()來觸發運行,asyncio.run()作為主程序的入口函數,在整個過程中只執行一次當使用Task創建任務時,所有任務都會做好準備
  • Python並發編程初步
    另一個方面是採用並發方式執行,重複利用多核CPU優勢加速執行。關於並發編程大家可能比較熟悉的是Golang的協程、通道和Node.js 的async.parallel異步並發編程。就並發編程來說,Python不是一門合適的語言,主要是Python有一個解析器(CPython)內置的全局解釋鎖GIL。
  • Python 3.8異步並發編程
    有效的提高程序執行效率的兩種方法是異步和並發,Golang,node.js之所以可以有很高執行效率主要是他們的協程和異步並發機制。實際上異步和並發是每一種現代語言都在追求的特性,當然Python也不例外,今天我們就講講Python 3中的異步並發編程。
  • Pandas高級應用(map()與apply()函數)
    一.map()函數import pandas
  • Async Python 竟不比sync Python 快,怎麼回事?
    【CSDN編者按】在實際的基準測試下,async (異步)Python比「sync」(同步) Python要慢。而更讓人擔心的是,async框架在負載下會不穩定。作者 | Cal Paterson譯者 | 香檳超新星,責編 | 夕顏大多數人都認為異步Python的並發程度更高。
  • Pyinstaller 打包Python腳本踩坑之旅
    Pyinstaller 打包Python腳本踩坑之旅 前言:眾所周知,python是一門強大的膠水語言,尤其憑藉其豐富的第三方庫近些年來十分火熱
  • 快速掌握用python寫並行程序,乾貨滿滿
    我們可以稱它為並發(concurrency)程序,這個程序一定意義上提升了單個CPU的使用率,所以效率也相對較高。分布式計算的優勢:可以集成諸多低配的計算機(成千上萬臺)進行高並發的儲存與計算,從而達到與超級計算機媲美的處理能力。三、用python寫並行程序在介紹如何使用python寫並行程序之前,我們需要先補充幾個概念,分別是進程、線程與全局解釋器鎖(Global Interpreter Lock, GIL)。
  • Python在移除列表元素中的坑
    本章主要介紹在編寫Python代碼中,在列表中移除元素時,容易踩到的坑以及如何避免踩坑//前言python中的坑無處不在,防不勝防。前段時間,微信技術交流群中的一位在苦學python的群友,在練習使用remove函數時遇到了問題並發出來和大家討論。
  • 【Python】Pandas中的寶藏函數-apply
    axis :{0 or 'index', 1 or 'columns'}, default 0 函數應用所沿著的軸。0 or index : 在每一列上應用函數。1 or columns : 在每一行上應用函數。
  • Python編程:多進程multiprocessing
    下面介紹一下multiprocessing模塊下的Pool類下的幾個方法:1.apply()函數原型:apply(func[, args=()[, kwds={}]])該函數用於傳遞不定參數,同python中的apply函數一致,主進程會被阻塞直到函數執行結束(不建議使用,並且3.x以後不在出現)。
  • 尹立博:Python 全局解釋器鎖與並發 | AI 研習社第 59 期猿桌會
    分享主題:Python 全局解釋器鎖與並發分享提綱:1、全局解釋器鎖 (GIL)2、多進程 (multiprocessing)3、多線程 (multithreading)4、異步 (async)5、分布式計算(以 Dask 為例)
  • 一文弄懂apply、map和applymap三種函數的區別
    首先有一個表:apply應用到DataFrame中如果我們求一下每一列或者每一列的極差,注意axis參數的設置,一般默認為0,即求每一列的極值apply的參數可以直接接收現成的函數,也可以接收自定義函數,比如自定義的匿名函數:
  • Swift 的 Async/Await 簡介
    函數可以選擇成為 async,允許程式設計師使用正常的控制流機制來編寫涉及異步操作的複雜邏輯。編譯器負責將一個異步函數翻譯成一套適當的閉包和狀態機。這個提案定義了異步函數的語義。然而,它並沒有提供並發性:這在另一個引入結構化並發的提案裡討論,該提案將異步函數與並發執行的任務聯繫起來,並提供用於創建、查詢和取消任務的 API。
  • python 學習之 R and Python: 循環函數
    1引言我們看看在 R 裡和 Python 裡這些 lapply,apply,mapply,tapply,split, 函數的不同實現方式。Python 裡是沒有這類函數的, 除了 apply ,但也可以實現類似的方法。
  • 面試官問 async、await 函數原理是在問什麼?
    2.1 關於 generator說到異步編程,我們很容易想到還有 promise,async 和 await。它們有什麼區別呢?先看看 JS 異步編程進化史:callback -> promise -> generator -> async + awaitJS 異步編程再看看它們語法上的差異:CallbackPromiseGeneratorasync + await + Promiseajax(url, () => {})
  • Python異步IO實現全過程
    Async IO概覽相比於多進程和線程久經考驗,async IO則略遜一籌。本文將帶你總覽什麼是async IO以及它是如何應用於與之相關的環境的。Async IO適用於哪裡?並發與並行是一個寬泛的主題,並不容易實現。