實驗目的
利用搭建好的大數據平臺 Hadoop,對 HDFS 中的文本文件進行處理,採用 Hadoop Steaming 方式,使用 Python 語言實現英文單詞的統計功能,並輸出單詞統計結果。
實驗內容
將附件"COPYING_LGPL.txt"上傳 Hadoop 集群的 HDFS 中,採用 Hadoop Steaming方式,使用 Python語言實現字詞統計功能,輸出字詞統計結果,即實現文本單詞的詞頻統計功能。要求將實驗原理,過程,代碼分析,結果分析記錄在實驗報告中。
實驗步驟
實驗原理簡述 MapReduce 的 Data Flow 如下圖所示,原始數據經過 mapper 處理,再進行 partition 和 sort,到達 reducer,輸出最後結果。Hadoop 的MapReduce處理框架,一般的編程模型如下圖所示, 將一個業務拆分為 Mapper 和 Reducer 兩個階段。使用 Python 語言背後的「技巧」是我們將使用 Hadoop Streaming API 來幫助我們通過 STDIN(標準輸入)和 STDOUT(標準輸出)在 Map 和 Reduce 代碼之間傳遞數據。我們將簡單地使用 Python 的 sys.stdin 來讀取輸入數據並將我們自己的輸出列印到 sys.stdout。這就是我們需要做的全部,因為 Hadoop Streaming 會幫助我們處理其他所有事情!
使用 Python 來調用 Hadoop Streaming API,其基本流程如下圖。用 Python 寫MapReduce 還需要了解 HadoopStreaming ,在 Apache 的 Hadoop 官網可以查看HadoopStreaming 的運行機制,簡單來說就是 HadoopStreaming 是可運行特殊腳本的MapReduce 作業的工具 ,使用格式如下:
hadoop jar /home/hadoop/app/hadoop-2.7.7/share/hadoop/tools/lib/hadoopstreaming-2.7.7.jar -files /home/hadoop/mapper.py -mapper /home/hadoop/mapper.py -files /home/hadoop/reducer.py -reducer /home/hadoop/reducer.py -input /wordcount/COPYING_LGPL.txt -output /wordcount/output
實驗過程將本地物理機的測試文本文件 COPYING_LGPL.txt 上傳到虛擬主機 Master 上,在從 Master 上傳到 Hadoop 集群的 HDFS 文件系統上/wordcount/COPYING_LGPL.txt。
使用 Python 編寫 MapReduce 程序,分別根據實現原理編寫 Mapper 程序和Reducer 程序,使用 Vim 編寫 Mapper 和 Reducer 腳本,並使兩個腳本具有可執行權限,及使用命令: chmod +x mapper.py reducer.py。
使用 HadoopStreaming 命令來運行自己編寫的程序,其命令如下:
hadoop jar /home/hadoop/app/hadoop-2.7.7/share/hadoop/tools/lib/hadoopstreaming-2.7.7.jar -files /usr/bin/mapper.py -files /usr/bin/reducer.py -mapper "python /usr/bin/mapper.py" -reducer "python /usr/bin/reducer.py" -input /wordcount/input/COPYING_LGPL.txt -output /wordcount/output
可以編寫一個 shell 腳本命令,來運行 HadoopStreaming 命令,這樣在 shell 腳本中首先使用刪掉輸出目錄文件的命令(hdfs dfs -rm -r -f /wordcount/output),防止多次測試出錯, 同時每次測試只需要運行 shell 腳本即可,這樣在做實驗的時候更加方便操作,而不用每次都敲命令。
對HadoopStreaming 命令進行解釋:
hadoop jar #指調用hadoop jar包的命令
/home/hadoop/app/hadoop-2.7.7/share/hadoop/tools/lib/hadoopstreaming-2.7.7.jar #調用HadoopStreaming 命令的jar包
-files /usr/bin/mapper.py #提交的作業的路徑
-files /usr/bin/reducer.py #提交的作業的路徑
-mapper "python /usr/bin/mapper.py" #mapper程序的解釋器python以及程序路徑
-reducer "python /usr/bin/reducer.py" #reducer程序的解釋器python以及程序路徑
-input /wordcount/input/COPYING_LGPL.txt #HDFS上的輸入文件的路徑
-output /wordcount/output #HDFS上的輸出文件的路徑
HadoopStreaming API 的調用接口說明: 調用 python 中的標準輸入流 sys.stdin ,MAP 具體過程是, HadoopStream 每次從 input 文件讀取一行數據,然後傳到 sys.stdin中,運行 payhon 的 map 函數腳本,然後用 print 輸出回 HadoopStreeam。 REDUCE 過程一樣。所以 M 和 R 函數的輸入格式為 for line in sys.stdin:line=line.strip。
Mapper 過程如下: 第一步,在每個節點上運行我們編寫的 map 程序 ,即就是 調用標準輸入流 , 讀取文本內容,對文本內容分詞,形成一個列表,讀取列表中每一個元素的值 , Map 函數輸出, key 為 word,下一步將進行 shuffle 過程,將按照key 排序,輸出,這兩步為 map 階段工作為,在本地節點進行,第二步, hadoop 框架,把我們運行的結果,進入 shuffle 過程,每個節點對 key 單獨進行排序,然後輸出。Reducer 過程:第一步, merge 過程,把所有節點匯總到一個節點,合併並且按照 key排序。第二步,運行 reducer 函數。
Python原始碼
分析 WordCount 程序實例的實現原理步驟,具體 Python 代碼如下原始碼所示,前面是簡要原理的實現,後面是使用 Python 的迭代器和生成器升級 mapper 程序和 reducer 程序。
MapReduce 的 WordCount 簡要原理 Python 實現原始碼如下,分為Mapper階段和Reducer階段,想要查看Python原始碼的請查看我的博客文章,博客名雲主宰蒼穹。