Hadoop Streaming

2020-10-20 AI職場技術分享


Hadoop生態有非常多的工具可以用於大數據的管理和數據處理。這裡我們給大家詳細介紹一下,如何使用Hadoop Streaming這個方式,對大數據進行處理。

Hadoop Streaming

Hadoop streaming是Hadoop的一個工具, 它幫助用戶創建和運行一類特殊的map/reduce作業, 這些特殊的map/reduce作業是由一些可執行文件或腳本文件充當mapper或者reducer。例如在hadoop環境下的命令行可以執行:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper /bin/cat \ -reducer /bin/wc

看不懂別著急,咱們馬上來分析一下上述的代碼。

Streaming工作原理

在上面的例子裡,mapper和reducer都是可執行文件,它們從標準輸入讀入數據(一行一行讀), 並把計算結果發給標準輸出。Streaming工具會創建一個Map/Reduce作業, 並把它發送給合適的集群,同時監視這個作業的整個執行過程。

如果一個可執行文件被用於mapper,則在mapper初始化時, 每一個mapper任務會把這個可執行文件作為一個單獨的進程啟動。 <br data-tomark-pass>
mapper任務運行時,它把輸入切分成行並把每一行提供給可執行文件進程的標準輸入。 同時,mapper收集可執行文件進程標準輸出的內容,並把收到的每一行內容轉化成key/value對,作為mapper的輸出。 <br data-tomark-pass>
默認情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。 如果沒有tab,整行作為key值,value值為null。不過,這可以定製,在下文中將會討論如何自定義key和value的切分方式。

如果一個可執行文件被用於reducer,每個reducer任務會把這個可執行文件作為一個單獨的進程啟動。<br data-tomark-pass>
Reducer任務運行時,它把輸入切分成行並把每一行提供給可執行文件進程的標準輸入。 同時,reducer收集可執行文件進程標準輸出的內容,並把每一行內容轉化成key/value對,作為reducer的輸出。 <br data-tomark-pass>
默認情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。在下文中將會討論如何自定義key和value的切分方式。

這是Map/Reduce框架和streaming mapper/reducer之間的基本通信協議。

用戶也可以使用java類作為mapper或者reducer。上面的例子與這裡的代碼等價:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper \ -reducer /bin/wc

用戶可以設定stream.non.zero.exit.is.failure true 或false 來表明streaming task的返回值非零時是 Failure 還是Success。默認情況,streaming task返回非零時表示失敗。

將文件打包到提交的作業中

我們要開始講關鍵點了,並不是每位同學都對java熟悉程度這麼高。沒關係,hadoop允許我們用腳本語言完成處理過程,並把文件打包提交到作業中,完成大數據的處理。

<font color="red">任何可執行文件都可以被指定為mapper/reducer。這些可執行文件不需要事先存放在集群上;如果在集群上還沒有,則需要用-file選項讓framework把可執行文件作為作業的一部分,一起打包提交。</font>例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper myPythonScript.py \ -reducer /bin/wc \ -file myPythonScript.py

<font color="red">上面的例子描述了一個用戶把可執行python文件作為mapper。 其中的選項「-file myPythonScirpt.py」使可執行python文件作為作業提交的一部分被上傳到集群的機器上。</font>

除了可執行文件外,其他mapper或reducer需要用到的輔助文件(比如字典,配置文件等)也可以用這種方式打包上傳。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper myPythonScript.py \ -reducer /bin/wc \ -file myPythonScript.py \ -file myDictionary.txt

Streaming選項與用法

只使用Mapper的作業

有時只需要map函數處理輸入數據。這時只需把mapred.reduce.tasks設置為零,Map/reduce框架就不會創建reducer任務,mapper任務的輸出就是整個作業的最終輸出。

為了做到向下兼容,Hadoop Streaming也支持「-reduce None」選項,它與「-jobconf mapred.reduce.tasks=0」等價。

為作業指定其他插件

和其他普通的Map/Reduce作業一樣,用戶可以為streaming作業指定其他插件:

-inputformat JavaClassName
-outputformat JavaClassName
-partitioner JavaClassName
-combiner JavaClassName
用於處理輸入格式的類要能返回Text類型的key/value對。如果不指定輸入格式,則默認會使用TextInputFormat。 因為TextInputFormat得到的key值是LongWritable類型的(其實key值並不是輸入文件中的內容,而是value偏移量), 所以key會被丟棄,只把value用管道方式發給mapper。

用戶提供的定義輸出格式的類需要能夠處理Text類型的key/value對。如果不指定輸出格式,則默認會使用TextOutputFormat類。

Hadoop Streaming中的大文件和檔案

任務使用-cacheFile和-cacheArchive選項在集群中分發文件和檔案,選項的參數是用戶已上傳至HDFS的文件或檔案的URI。這些文件和檔案在不同的作業間緩存。用戶可以通過fs.default.name.config配置參數的值得到文件所在的host和fs_port。

這個是使用-cacheFile選項的例子:

-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink

在上面的例子裡,url中#後面的部分是建立在任務當前工作目錄下的符號連結的名字。這裡的任務的當前工作目錄下有一個「testlink」符號連結,它指向testfile.txt文件在本地的拷貝。如果有多個文件,選項可以寫成:

-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2

-cacheArchive選項用於把jar文件拷貝到任務當前工作目錄並自動把jar文件解壓縮。例如:

-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3

在上面的例子中,testlink3是當前工作目錄下的符號連結,它指向testfile.jar解壓後的目錄。

下面是使用-cacheArchive選項的另一個例子。其中,input.txt文件有兩行內容,分別是兩個文件的名字:testlink/cache.txt和testlink/cache2.txt。「testlink」是指向檔案目錄(jar文件解壓後的目錄)的符號連結,這個目錄下有「cache.txt」和「cache2.txt」兩個文件。

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input "/user/me/samples/cachefile/input.txt" \ -mapper "xargs cat" \ -reducer "cat" \ -output "/user/me/samples/cachefile/out" \ -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \ -jobconf mapred.map.tasks=1 \ -jobconf mapred.reduce.tasks=1 \ -jobconf mapred.job.name="Experiment"

再來看一樣過程和內容

$ ls test_jar/cache.txt cache2.txt$ jar cvf cachedir.jar -C test_jar/ .added manifestadding: cache.txt(in = 30) (out= 29)(deflated 3%)adding: cache2.txt(in = 37) (out= 35)(deflated 5%)$ hadoop dfs -put cachedir.jar samples/cachefile$ hadoop dfs -cat /user/me/samples/cachefile/input.txttestlink/cache.txttestlink/cache2.txt$ cat test_jar/cache.txt This is just the cache string$ cat test_jar/cache2.txt This is just the second cache string$ hadoop dfs -ls /user/me/samples/cachefile/out Found 1 items/user/me/samples/cachefile/out/part-00000 <r 3> 69$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000This is just the cache string This is just the second cache string

為作業指定附加配置參數

用戶可以使用「-jobconf <n>=<v>」增加一些配置變量。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper\ -reducer /bin/wc \ -jobconf mapred.reduce.tasks=2

上面的例子中,-jobconf mapred.reduce.tasks=2表明用兩個reducer完成作業。<br data-tomark-pass>
-jobconf mapred.map.tasks=2表明用兩個mapper完成作業。

其他選項

Streaming 作業的其他選項如下表:

選項可選/必須描述-cluster name可選在本地Hadoop集群與一個或多個遠程集群間切換-dfs host:port or local可選覆蓋作業的HDFS配置-jt host:port or local可選覆蓋作業的JobTracker配置-additionalconfspec specfile可選用一個類似於hadoop-site.xml的XML文件保存所有配置,從而不需要用多個"-jobconf name=value"類型的選項單獨為每個配置變量賦值-cmdenv name=value可選傳遞環境變量給streaming命令-cacheFile fileNameURI可選指定一個上傳到HDFS的文件-cacheArchive fileNameURI可選指定一個上傳到HDFS的jar文件,這個jar文件會被自動解壓縮到當前工作目錄下-inputreader JavaClassName可選為了向下兼容:指定一個record reader類(而不是input format類)-verbose可選詳細輸出

使用-cluster <name>實現「本地」Hadoop和一個或多個遠程Hadoop集群間切換。默認情況下,使用hadoop-default.xml和hadoop-site.xml;當使用-cluster <name>選項時,會使用$HADOOP_HOME/conf/hadoop-<name>.xml。

下面的選項改變temp目錄:

-jobconf dfs.data.dir=/tmp

下面的選項指定其他本地temp目錄:

-jobconf mapred.local.dir=/tmp/local-jobconf mapred.system.dir=/tmp/system-jobconf mapred.temp.dir=/tmp/temp

更多有關jobconf的細節請參考:http://wiki.apache.org/hadoop/JobConfFile

在streaming命令中設置環境變量:

-cmdenv EXAMPLE_DIR=/home/example/dictionaries/

高級功能與其他例子

使用自定義的方法切分行來形成Key/Value對

之前已經提到,當Map/Reduce框架從mapper的標準輸入讀取一行時,它把這一行切分為key/value對。 在默認情況下,每行第一個tab符之前的部分作為key,之後的部分作為value(不包括tab符)。

但是,用戶可以自定義,可以指定分隔符是其他字符而不是默認的tab符,或者指定在第n(n>=1)個分割符處分割而不是默認的第一個。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper \ -reducer org.apache.hadoop.mapred.lib.IdentityReducer \ -jobconf stream.map.output.field.separator=. \ -jobconf stream.num.map.output.key.fields=4

在上面的例子,「-jobconf stream.map.output.field.separator=.」指定「.」作為map輸出內容的分隔符,並且從在第四個「.」之前的部分作為key,之後的部分作為value(不包括這第四個「.」)。 如果一行中的「.」少於四個,則整行的內容作為key,value設為空的Text對象(就像這樣創建了一個Text:new Text(""))。

同樣,用戶可以使用「-jobconf stream.reduce.output.field.separator=SEP」和「-jobconf stream.num.reduce.output.fields=NUM」來指定reduce輸出的行中,第幾個分隔符處分割key和value。

一個實用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項)

Hadoop有一個工具類org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner, 它在應用程式中很有用。Map/reduce框架用這個類切分map的輸出, 切分是基於key值的前綴,而不是整個key。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper \ -reducer org.apache.hadoop.mapred.lib.IdentityReducer \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -jobconf stream.map.output.field.separator=. \ -jobconf stream.num.map.output.key.fields=4 \ -jobconf map.output.key.field.separator=. \ -jobconf num.key.fields.for.partition=2 \ -jobconf mapred.reduce.tasks=12

其中,-jobconf stream.map.output.field.separator=. 和-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用這兩個變量來得到mapper的key/value對。

上面的Map/Reduce 作業中map輸出的key一般是由「.」分割成的四塊。但是因為使用了 -jobconf num.key.fields.for.partition=2 選項,所以Map/Reduce框架使用key的前兩塊來切分map的輸出。其中,-jobconf map.output.key.field.separator=. 指定了這次切分使用的key的分隔符。這樣可以保證在所有key/value對中, key值前兩個塊值相同的所有key被分到一組,分配給一個reducer。

這種高效的方法等價於指定前兩塊作為主鍵,後兩塊作為副鍵。 主鍵用於切分塊,主鍵和副鍵的組合用於排序。一個簡單的示例如下:

Map的輸出(key)

11.12.1.211.14.2.311.11.4.111.12.1.111.14.2.2

切分給3個reducer(前兩塊的值用於切分)

11.11.4.1-----------11.12.1.211.12.1.1-----------11.14.2.311.14.2.2

在每個切分後的組內排序(四個塊的值都用於排序)

11.11.4.1-----------11.12.1.111.12.1.2-----------11.14.2.211.14.2.3

Hadoop聚合功能包的使用(-reduce aggregate 選項)

Hadoop有一個工具包「Aggregate」( https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate)。 「Aggregate」提供一個特殊的reducer類和一個特殊的combiner類, 並且有一系列的「聚合器」(「aggregator」)(例如「sum」,「max」,「min」等)用於聚合一組value的序列。 用戶可以使用Aggregate定義一個mapper插件類, 這個類用於為mapper輸入的每個key/value對產生「可聚合項」。 combiner/reducer利用適當的聚合器聚合這些可聚合項。

要使用Aggregate,只需指定「-reducer aggregate」:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper myAggregatorForKeyCount.py \ -reducer aggregate \ -file myAggregatorForKeyCount.py \ -jobconf mapred.reduce.tasks=12

python程序myAggregatorForKeyCount.py例子:

#!/usr/bin/pythonimport sys;def generateLongCountToken(id): return "LongValueSum:" + id + "\t" + "1"def main(argv): line = sys.stdin.readline(); try: while line: line = line[:-1]; fields = line.split("\t"); print generateLongCountToken(fields[0]); line = sys.stdin.readline(); except "end of file": return Noneif __name__ == "__main__": main(sys.argv)

欄位的選取(類似於unix中的 'cut' 命令)

Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduce幫助用戶高效處理文本數據, 就像unix中的「cut」工具。工具類中的map函數把輸入的key/value對看作欄位的列表。 用戶可以指定欄位的分隔符(默認是tab), 可以選擇欄位列表中任意一段(由列表中一個或多個欄位組成)作為map輸出的key或者value。 同樣,工具類中的reduce函數也把輸入的key/value對看作欄位的列表,用戶可以選取任意一段作為reduce輸出的key或value。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\ -reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -jobconf map.output.key.field.separa=. \ -jobconf num.key.fields.for.partition=2 \ -jobconf mapred.data.field.separator=. \ -jobconf map.output.key.value.fields.spec=6,5,1-3:0- \ -jobconf reduce.output.key.value.fields.spec=0-2:5- \ -jobconf mapred.reduce.tasks=12

選項「-jobconf map.output.key.value.fields.spec=6,5,1-3:0-」指定了如何為map的輸出選取key和value。Key選取規則和value選取規則由「:」分割。 在這個例子中,map輸出的key由欄位6,5,1,2和3組成。輸出的value由所有欄位組成(「0-」指欄位0以及之後所有欄位)。

選項「-jobconf reduce.output.key.value.fields.spec=0-2:0-」(譯者註:此處應為」0-2:5-「)指定如何為reduce的輸出選取value。 本例中,reduce的輸出的key將包含欄位0,1,2(對應於原始的欄位6,5,1)。 reduce輸出的value將包含起自欄位5的所有欄位(對應於所有的原始欄位)。

相關焦點

  • 利用Hadoop Streaming處理二進位格式文件
    hadoop Streaming是Hadoop提供的多語言編程工具,用戶可以使用自己擅長的程式語言(比如python、php或C#等)編寫Mapper和Reducer處理文本數據。
  • Hadoop經典案例--詞頻統計
    streaming執行map-reduce任務了,命令行執行user@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \-file /home/hduser/mapper.py -mapper /home/hduser/mapper.py
  • Hadoop之wordcount實例-MapReduce程序
    用 Python 寫MapReduce 還需要了解 HadoopStreaming ,在 Apache 的 Hadoop 官網可以查看HadoopStreaming 的運行機制,簡單來說就是 HadoopStreaming 是可運行特殊腳本的MapReduce 作業的工具 ,使用格式如下:hadoop jar /home/hadoop/app/hadoop-2.7.7/share/hadoop
  • 好程式設計師大數據培訓之Hadoop常見問題
    大數據培訓  1、100個以上hadoop節點,一般怎麼開發,運維?   a.首先大數據的應用開發和hadoop集群的規模是沒有關係,你指的是集群的搭建和運維嗎,對於商用的hadoop系統來說涉及到很多東西。   b.任務的分配是有hadoop的調度器的調度策略決定的,默認為FIFO調度,商業集群一般使用多隊列多用戶調度器。
  • Hadoop
    #顯示/下的所有文件夾信息hadoop fs -ls /#遞歸顯示所有文件夾和子文件(夾)hadoop fs -lsr#創建/user/hadoop目錄hadoop fs -mkdir /user/hadoop#把a.txt放到集群/user/hadoop/文件夾下hadoop fs -put a.txt /user/hadoop/#把集群上的/user/
  • 在Ubuntu上裝Hadoop
    /core/releases.html 下載最近發布的版本 最好為hadoop創建一個用戶: 比如創建一個group為hadoop user為hadoop的用戶以及組  $ sudo addgroup hadoop $ sudo adduser --ingroup
  • SparkStreaming+Flume集成例子
    import org.apache.spark.SparkConfimport org.apache.spark.streaming./start-all.sh7.2 提交Spark任務 spark-submit \ --class org.apache.spark.examples.streaming.FlumePushWordCount \ --packages org.apache.spark:spark-streaming-flume
  • Hadoop知識整理
    >(d)運行MapReduce程序bin/hadoop jarshare/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar wordcount /user/hadoop/input/ /user/hadoop/output(e)查看輸出結果bin/hdfs
  • Hadoop安裝部署
    上傳hadoop-2.6.0-cdh5.tar.gz到/home/hadoop/software/tar xvzf hadoop-2.6.0-cdh5.tar.gzmv hadoop-2.6.0-cdh5 hadoop2cd /home/hadoop
  • hadoop 高可用方案
    zookeeper-3.4.5/ hadoop06:/hadoop/ scp -r /hadoop/zookeeper-3.4.5/ hadoop07:/hadoop/ 注意:修改hadoop06、hadoop07對應/hadoop/zookeeper-3.4.5/tmp/myid內容 hadoop06: echo 2 >
  • hadoop服務快速部署
    這篇文章記錄下針對不同的hadoop版本進行服務部署的過程,希望可以幫到你們安裝docker hadoop2.7.0一鍵部署docker hadoop3.0.0集群(一個master 三個slave)安裝docker hadoop 3.2.0 a、docker啟動 b、docker compose方式啟動
  • hadoop HA集群的安裝
    ,hadoop2,hadoop3,hadoop44臺主機的hadoop用戶的ssh-key需要放置於每一臺主機hadoop用戶之下。4.4.1 啟動journalnode[hadoop@10-110-92-161 ~]$ cd /usr/local/hadoop/[hadoop@10-110-92-161 hadoop]$ sbin/hadoop-daemon.sh start journalnode[hadoop@10-110-92-161 hadoop]$ jps1557 JournalNode22439
  • Hadoop大數據面試題全版本
    答:第一題:1使用root帳戶登錄2 修改IP3 修改host主機名4 配置SSH免密碼登錄5 關閉防火牆6 安裝JDK6 解壓hadoop安裝包7 配置hadoop的核心文件 hadoop-env.sh,core-site.xml , mapred-site.xml , hdfs-site.xml
  • Hadoop 多節點集群
    # mkdir /opt/hadoop# cd /opt/hadoop/# wget http://apache.mesi.com.ar/hadoop/common/hadoop-1.2.1/hadoop-1.2.0.tar.gz# tar -xzf hadoop-1.2.0.tar.gz# mv hadoop-1.2.0 hadoop
  • hadoop-常用的操作命令
    創建目錄hadoop dfs -mkdir /home上傳文件或目錄到hdfshadoop dfs -put hello /hadoop dfs -put hellodir/ /查看目錄hadoop dfs -ls /創建一個空文件hadoop dfs -touchz /wahaha刪除一個文件hadoop dfs -rm /wahaha刪除一個目錄
  • Hadoop分布式環境搭建
    scp –r /home/hadoop/app hadoop@slaver1:/home/hadoopscp –r /home/hadoop/app hadoop@slaver2:/home/hadoop分發 master 主機上的環境變量配置文件。
  • Hadoop HDFS詳細操作
    3. hadoop fs -moveFromLocal 、hadoop fs -put、hadoop fs -copyFromLocal上傳本地文件:將本地文件上傳到hdfs中。查看文件內容hadoop fs -cat <文件路徑>hadoop fs -text <文件路徑># 顯示HDFS上某文件最後1kb內容hadoop fs -tail <HDFS文件路徑># 顯示HDFS上某文件最後1kb內容, 如果有心數據進來會實時顯示hadoop fs -tail -f <HDFS文件路徑>
  • hadoop3 啟動 - CSDN
    1 啟動start-all.sh  在子工程hadoop-common-project下的模塊hadoop-common中的bin目錄裡,使用start-all.sh啟動工程.