原文:http://blog.luoyuanhang.com/2017/04/19/mapreduce-notes/
這篇文章是我閱讀 MapReduce 論文:《MapReduce: Simplified Data Processing on Large Clusters》的筆記,這篇筆記概述了 MapReduce 是什麼,它的工作流程,一些細節問題,以及我的個人理解與思考。
《MapReduce: Simplified Data Processing on Large Clusters》:
https://research.google.com/archive/mapreduce-osdi04.pdf
MapReduce 是什麼?
MapReduce 是 Google設計的一種用於大規模數據集的分布式模型,它具有支持並行計算、容錯、易使用等特點。它的設計目標如下:
支持並行
用於分布式
能夠進行錯誤處理(比如機器崩潰)
易於使用(程式設計師友好)
負載均衡
模型流程
MapReduce 模型主要分為 2 個部分:Map 和 Reduce。
在 Map 過程中,Map 函數會獲取輸入的數據,產生一個臨時中間值,它是一個 K/V 對,然後MapReduce Library 會按 Key 值給鍵值對(K/V)分組然後傳遞給 Reduce 函數。而後,Reduce 接收到了這些 K/V 對,會將它們合併。
以論文中的字數統計程序為例:
現在我們來考慮,如果我們有許多文檔,然後我們想要統計在這些文檔中每個字出現的次數,現在用 MapReduce 來解決這個問題。Map 函數所做的工作,就是進行分詞,產生一組形如下表的 K/V 鍵值對:
然後將這組鍵值對傳遞給 Reduce,由 Reduce 進行合併。
具體流程如下:
由用戶程序中調用的 MapReduce Library 將文件分成 M 塊(M 要遠大於 Map Worker 的數量,每塊大小16MB~64MB),此時,進入 MapReduce 過程;
由 Master 給空閒的 Worker 分配任務,共有 M 個 Map 任務,R 個 Reduce 任務;
Map Worker 讀取文件,將文件處理為 K/V 鍵值對,K/V 鍵值對緩存於內存中(此時存在一個問題,如果斷電怎麼辦?往下看後邊有解釋);
將緩存於內存的 K/V 鍵值對寫入磁碟,分成 R 堆(分堆方法有很多種,論文中提到了使用 Hash 散列函數),然後將結果發送給 Master;
Master 將這些 K/V 鍵值對的存儲地址告知 Reduce,Reduce Worker 通過 RPC(遠程過程調用)進行讀取,讀取完畢之後會根據 Key 值進行排序(這樣,相同 Key 值的就會在一起。但是存在一個問題,如果內存不夠大,排序該怎麼進行?可以使用外部排序);
Reduce Worker 將已經排序的結果進行遍歷,將每個 Key 值所對應的一組 Value,所組成的 <key, value[num]>傳遞給用戶所編寫的 reduce 函數進行處理;
所有的 Map,Reduce 任務都完成後,告知用戶程序,MapReduce 已經結束,返回用戶程序。
容錯處理(Fault-Tolerance)
MapReduce 中的容錯處理是非常重要的,因為MapReduce 是運行於分布式環境中的,在分布式環境中經常會有機器出現錯誤,我們不能讓個別機器的錯誤影響到整體。
Worker 崩潰
Master 通過定期給 Worker 發送心跳(heartbeat)來檢測 Worker 是否還在正常工作,如果 Worker 無應答或者是應答有誤,我們認定它已經宕機(fail)。如果正在工作的 Worker 宕機了,那麼運行在它上面的 map 任務會進行初始化(初始狀態為 idle,任務還有其他2種狀態,in-progress處理中,completed 已完成),重新被分配到正常的 Worker 上。
如果說 Map Worker 已經完成了一些工作,我們仍然要對運行在它上面的所有任務重新進行分配,這是為什麼呢?這裡同時可以解決上面的那個問題。因為 Map Worker 處理後的中間結果存在於內存中,或者是 local disk 中,一旦它宕機,這些數據就獲取不到了。
但是對於 Reduce Worker,它完成的任務不用重做,因為它處理後的結果是保存在全局存儲中的。
如果,在 Map Worker A 宕機之後,它所做的任務被重新分配給了 Map Worker B,後邊的 Reduce Worker 會被告知,A 已經宕機,要去 B 去讀取數據。
Master 崩潰
如果說 MapReduce 的 Master 宕機了,又該如何處理呢?
MapReduce 中的 Master 會定期進行 checkpoint 備份,如果 Master 宕機,會根據之前的 checkpoint 進行恢復,但是恢復期間,MapReduce 任務會中斷。
一些細節問題
因為用戶的 reduce 函數是 deterministic 的,所以即使有多個 Reduce Worker 都執行了同一個任務,但是它們執行的結果都是一樣的,並不影響最後的結果。
正是因為 reduce 函數是 non-deterministic 的,本來每次執行的結果也不確定,所以更不會產生影響。
Input 文件保存於 GFS 中,GFS 會將它們分塊保存(每塊16MB~64MB),GFS 會對每個文件有3個備份,備份在不同的機器上。
遵循就『近』原則,將任務分配給離任務所保存的位置最『近』的 Worker,這裡對『近』的定義是網絡層面上的,比如說在同一個交換機下的兩個機器就是距離『近』的。
一開始將文件分塊時,分為 M 塊,遠大於 Map Worker 的數量就有助於負載均衡。同時,這樣做還有一個好處,就是當一個 Worker 宕機的時候,可以將任務迅速分配開來,分到多個 Worker 上去。如果 M 比較小,有可能當一個 Worker 宕機時,它的任務不夠分配到剩下的 Worker 中,會有 Worker 閒置。
MapReduce 有一種機制應對這種情況:MapReduce 會對未完成的任務(in-progress) 定時執行備份執行操作(即,把這些正在某些 Worker 上執行但未完成的任務再次分配給其他 Worker 去執行),不論這個任務被哪個 Worker 完成都會被標記為已完成。
MapReduce 給用戶提供了一個 Combiner 函數,這個函數可以將結果在發送到網絡之前進行合併,例如發送鍵值對<」by」, 3>。