近期多了幾位朋友關注,非常感謝~以筆者之前的渣排版都能夠看得下去的,相信都是能夠靜下心來希望提升自己的朋友~
學術能力重要,工程實踐能力同樣重要。
筆者近期在工作中使用python的multiprocessing庫中的並發功能的時候發現:
當某個自定義類中的成員變量包含multiprocessing庫中的Pool實例或Manager中的Queue,Lock等實例時,Pool.apply_async並發模塊無法接收以類方法/實例方法類型的函數進行並發。
舉個慄子:
現在我們定製了一個類,並且在構造函數中聲明了一個實例化了一個Pool作為成員變量,定義了一個用於並發的函數async_func_member,以及一個調用並發功能的函數call_async:
import multiprocessing as mpclass test_async: def __init__(self): self.async_pool = mp.Pool(5) def async_func_member(self, func_idx): print("current aysnc func is {}".format(func_idx)) return {"func_idx": func_idx} def call_async(self): return_dicts = [] for call_num in range(5): return_dicts.append(self.async_pool.apply_async(self.async_func_member, args=(call_num,)).get())這裡注意兩個小細節:
第一個是,apply_async函數的傳參數部分args只接收tuple類型,所以即使只有call_num這個單參數傳入,也要寫成args=(call_num,);
第二個是,通過調用並發的.get()方法,會直接阻塞主進程並等到子進程獲取到結果為止,實際上上述的並發是個串行執行的順序,不過這個和本文重點無關。
好了,現在運行吧:
if __name__ == '__main__': test_async_instance = test_async() ret = test_async_instance.call_async()運行上述代碼後,報錯信息摘取如下:
NotImplementedError: pool objects cannot be passed between processes or pickled這個錯誤的意思是:pool對象是無法被序列化並在進程之間傳遞的。
簡單來說,就是我們自定義的test_async類裡面,不能包含有multiprocessing中的Pool實例,否則apply_async將無法把這個類的成員函數(即self.async_func_member)進行序列化並在不同進程中傳遞。
為什麼呢?這個和Python的序列化庫(pickle)有關,當我們在進程之間傳遞變量或函數或某些自定義類型時,它們都需要能夠被pickle序列化,否則將導致進程之間無法傳遞這些信息。(這裡不深究為什麼pickle不支持序列化pool,重點先看看怎麼解決)
問題暴露了,現在看看怎麼解決比較好。
解決方案一:讓apply_async不調用成員函數。(有點飲鴆止渴)
如果非要在自定義類中實例化Pool(),Manager.Queue()以及Manager.Lock()等對象,那麼就把用於並發的函數寫到該類的外面,以避免序列化帶來的問題:
import multiprocessing as mp
def async_outside(func_idx): print("current aysnc func is {}".format(func_idx)) return {"func_idx": func_idx}
class test_async: def __init__(self): self.async_pool = mp.Pool(5) def call_async(self): return_dicts = [] tmp_async_pool = mp.Pool(5) for call_num in range(5): return_dicts.append(self.async_pool.apply_async(async_outside, args=(call_num,)).get())這樣就避開了序列化的問題,運行結果如下:
current aysnc func is 0current aysnc func is 1current aysnc func is 2current aysnc func is 3current aysnc func is 4[{'func_idx': 0}, {'func_idx': 1}, {'func_idx': 2}, {'func_idx': 3}, {'func_idx': 4}]解決方案二:讓類在序列化的時候認為這個pool對象不存在
這裡涉及到實現兩個特殊的方法:__getstate__()以及__setstate__(),這兩個方法是專門針對序列化設計的。
__getstate__()會告訴pickle,這個類在序列化的時候用到了那些成員變量;
__setstate__()會告訴pickle,這個類在反序列化的時候要從它這裡拿到一些什麼成員變量。
貼代碼:
import multiprocessing as mpclass test_async: def __init__(self): self.async_pool = mp.Pool(5)
def async_func_member(self, func_idx): print("current aysnc func is {}".format(func_idx)) return {"func_idx": func_idx}
def __getstate__(self): pickle_dict = self.__dict__.copy() del pickle_dict['async_pool'] return pickle_dict
def __setstate__(self, state): pass def call_async(self): return_dicts = [] tmp_async_pool = mp.Pool(5) for call_num in range(5): return_dicts.append(self.async_pool.apply_async(self.async_func_member, args=(call_num,)).get()) return return_dicts運行也是可以成功的。
解決方案三:不要將Pool實例化為成員變量
這是筆者比較推薦的解決方案。
進程池Pool,進程之間共享的隊列Manager.Queue()以及鎖Manager.Lock()等都不建議實例化為類的成員變量。
因為實例化為類的成員變量有兩個缺點:
1、該類無法被序列化;
2、當該進程池(本例為self.async_pool)被調用過apply_async(),並且被調用過close()以及join()後,就無法被再次用來做並發了,一個成員變量只有一次被調用的生命周期,把它聲明為成員變量就沒有意義了。
因此,筆者的建議是,對於進程池以及其他進程間共享的數據結構等信息,最好都是即開即用,而不是寫到一個公共區域中重複使用。
建議代碼:
import multiprocessing as mpclass test_async: def __init__(self): "do some initialization"
def async_func_member(self, func_idx): print("current aysnc func is {}".format(func_idx)) return {"func_idx": func_idx}
def call_async(self): return_dicts = [] tmp_async_pool = mp.Pool(5) for call_num in range(5): return_dicts.append(tmp_async_pool.apply_async(self.async_func_member, args=(call_num,)).get()) tmp_async_pool.close() tmp_async_pool.join() return return_dicts運行可以成功。以上這種方法清晰簡潔,做到了對進程池的即開即用,也不需要操心序列化反序列化的問題。
總結
1、python不支持序列化進程池,進程間共享的數據結構等對象;
2、__getstate__() 和 __setstate__()能夠自定義序列化和反序列化的工作;
3、建議對multiprocessing庫中的對象採取即開即用的做法,而不是寫為成員變量。