這年頭不會點Python都不好意思跟隔壁老王打招呼,數盟線下工作坊:手把手教你玩轉Python,詳情點擊文末閱讀原文!
作者:周明耀
出處:developerWorks 中國
目前的大數據處理可以分為以下三個類型:
複雜的批量數據處理(batch data processing),通常的時間跨度在數十分鐘到數小時之間;
基於歷史數據的交互式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間;
基於實時數據流的數據處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。
大數據處理勢必需要依賴集群環境,而集群環境有三大挑戰,分別是並行化、單點失敗處理、資源共享,分別可以採用以並行化的方式重寫應用程式、對單點失敗的處理方式、動態地進行計算資源的分配等解決方案來面對挑戰。
針對集群環境出現了大量的大數據編程框架,首先是 Google 的 MapReduce,它給我們展示了一個簡單通用和自動容錯的批處理計算模型。但是對於其他類型的計算,比如交互式和流式計算,MapReduce 並不適合。這也導致了大量的不同於 MapReduce 的專有的數據處理模型的出現,比如 Storm、Impala 等等。但是這些專有系統也存在一些不足:
重複工作:許多專有系統在解決同樣的問題,比如分布式作業以及容錯,舉例來說,一個分布式的 SQL 引擎或者一個機器學習系統都需要實現並行聚合,這些問題在每個專有系統中會重複地被解決。
組合問題:在不同的系統之間進行組合計算是一件麻煩的事情。對於特定的大數據應用程式而言,中間數據集是非常大的,而且移動的成本很高。在目前的環境下,我們需要將數據複製到穩定的存儲系統,比如 HDFS,以便在不同的計算引擎中進行分享。然而,這樣的複製可能比真正的計算所花費的代價要大,所以以流水線的形式將多個系統組合起來效率並不高。
適用範圍的局限性:如果一個應用不適合一個專有的計算系統,那麼使用者只能換一個系統,或者重寫一個新的計算系統。
資源分配:在不同的計算引擎之間進行資源的動態共享比較困難,因為大多數的計算引擎都會假設它們在程序運行結束之前擁有相同的機器節點的資源。
管理問題:對於多個專有系統,需要花費更多的精力和時間來管理和部署,尤其是對於終端使用者而言,需要學習多種 API 和系統模型。
Spark 是伯克利大學推出的大數據處理框架,它提出了 RDD 概念 (Resilient Distributed Datasets),即抽象的彈性數據集概念。Spark 是對 MapReduce 模型的一種擴展。要在 MapReduce 上實現其不擅長的計算工作 (比如迭代式、交互式和流式),是比較困難的,因為 MapReduce 缺少在並行計算的各個階段進行有效的數據共享的能力,而這種能力是 RDD 的本質所在。利用這種有效地數據共享和類似 MapReduce 的操作接口,上述的各種專有類型計算都能夠有效地表達,而且能夠獲得與專有系統同等的性能。
MapReduce 和 Spark 介紹MapReduceMapReduce 是為 Apache Hadoop 量身訂做的,它非常適用於 Hadoop 的使用場景,即大規模日誌處理系統、批量數據提取加載工具 (ETL 工具) 等類似操作。但是伴隨著 Hadoop 地盤的不斷擴張,Hadoop 的開發者們發現 MapReduce 在很多場景下並不是最佳選擇,於是 Hadoop 開始把資源管理放入到自己獨立的組件 YARN 裡面。此外,類似於 Impala 這樣的項目也開始逐漸進入到我們的架構中,Impala 提供 SQL 語義,能查詢存儲在 Hadoop 的 HDFS 和 HBase 中的 PB 級大數據。之前也有類似的項目,例如 Hive。Hive 系統雖然也提供了 SQL 語義,但由於 Hive 底層執行使用的是 MapReduce 引擎,仍然是一個批處理過程,難以滿足查詢的交互性。相比之下,Impala 的最大特點也是最大賣點就是它的效率。
第一代 Hadoop MapReduce 是一個在計算機集群上分布式處理海量數據集的軟體框架,包括一個 JobTracker 和一定數量的 TaskTracker。運行流程圖如圖 1 所示。
圖 1. MapReduce 運行流程圖在最上層有 4 個獨立的實體,即客戶端、jobtracker、tasktracker 和分布式文件系統。客戶端提交 MapReduce 作業;jobtracker 協調作業的運行;jobtracker 是一個 Java 應用程式,它的主類是 JobTracker;tasktracker 運行作業劃分後的任務,tasktracker 也是一個 Java 應用程式,它的主類是 TaskTracker。Hadoop 運行 MapReduce 作業的步驟主要包括提交作業、初始化作業、分配任務、執行任務、更新進度和狀態、完成作業等 6 個步驟。
Spark 生態系統的目標就是將批處理、交互式處理、流式處理融合到一個軟體框架內。Spark 是一個基於內存計算的開源的集群計算系統,目的是讓數據分析更加快速。Spark 非常小巧玲瓏,由加州伯克利大學 AMP 實驗室的 Matei 為主的小團隊所開發。使用的語言是 Scala,項目的 core 部分的代碼只有 63 個 Scala 文件,非常短小精悍。Spark 啟用了內存分布數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。Spark 提供了基於內存的計算集群,在分析數據時將數據導入內存以實現快速查詢,速度比基於磁碟的系統,如 Hadoop 快很多。Spark 最初是為了處理迭代算法,如機器學習、圖挖掘算法等,以及交互式數據挖掘算法而開發的。在這兩種場景下,Spark 的運行速度可以達到 Hadoop 的幾百倍。
Spark 允許應用在內存中保存工作集以便高效地重複利用,它支持多種數據處理應用,同時也保持了 MapReduce 的重要特性,如高容錯性、數據本地化、大規模數據處理等。此外,提出了彈性分布式數據集 (Resilient Distributed Datasets) 的概念:
RDD 表現為一個 Scala 對象,可由一個文件創建而來;
分布在一個集群內的,不可變的對象切分集;
通過並行處理(map、filter、groupby、join)固定數據(BaseRDD)創建模型,生成 Transformed RDD;
故障時可使用 RDD 血統信息重建;
可高速緩存,以便再利用。
圖 2 所示是一個日誌挖掘的示例代碼,首先將日誌數據中的 error 信息導入內存,然後進行交互搜索。
圖 2. RDD 代碼示例在導入數據時,模型以 block 形式存在於 worker 上,由 driver 向 worker 分發任務,處理完後 work 向 driver 反饋結果。也可在 work 上對數據模型建立高速緩存 cache,對 cache 的處理過程與 block 類似,也是一個分發、反饋的過程。
Spark 的 RDD 概念能夠取得和專有系統同樣的性能,還能提供包括容錯處理、滯後節點處理等這些專有系統缺乏的特性。
迭代算法:這是目前專有系統實現的非常普遍的一種應用場景,比如迭代計算可以用於圖處理和機器學習。RDD 能夠很好地實現這些模型,包括 Pregel、HaLoop 和 GraphLab 等模型。
關係型查詢:對於 MapReduce 來說非常重要的需求就是運行 SQL 查詢,包括長期運行、數小時的批處理作業和交互式的查詢。然而對於 MapReduce 而言,對比並行資料庫進行交互式查詢,有其內在的缺點,比如由於其容錯的模型而導致速度很慢。利用 RDD 模型,可以通過實現許多通用的資料庫引擎特性,從而獲得很好的性能。
MapReduce 批處理:RDD 提供的接口是 MapReduce 的超集,所以 RDD 能夠有效地運行利用 MapReduce 實現的應用程式,另外 RDD 還適合更加抽象的基於 DAG 的應用程式。
流式處理:目前的流式系統也只提供了有限的容錯處理,需要消耗系統非常大的拷貝代碼或者非常長的容錯時間。特別是在目前的系統中,基本都是基於連續計算的模型,常住的有狀態的操作會處理到達的每一條記錄。為了恢復失敗的節點,它們需要為每一個操作複製兩份操作,或者將上遊的數據進行代價較大的操作重放,利用 RDD 實現離散數據流,可以克服上述問題。離散數據流將流式計算當作一系列的短小而確定的批處理操作,而不是常駐的有狀態的操作,將兩個離散流之間的狀態保存在 RDD 中。離散流模型能夠允許通過 RDD 的繼承關係圖進行並行性的恢復而不需要進行數據拷貝。
Spark 內部術語解釋Application:基於 Spark 的用戶程序,包含了 driver 程序和集群上的 executor;
Driver Program:運行 main 函數並且新建 SparkContext 的程序;
Cluster Manager:在集群上獲取資源的外部服務 (例如:standalone,Mesos,Yarn);
Worker Node:集群中任何可以運行應用代碼的節點;
Executor:是在一個 worker node 上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或者磁碟上。每個應用都有各自獨立的 executors;
Task:被送到某個 executor 上的工作單元;
Job:包含很多任務的並行計算,可以與 Spark 的 action 對應;
Stage:一個 Job 會被拆分很多組任務,每組任務被稱為 Stage(就像 Mapreduce 分 map 任務和 reduce 任務一樣)。
SparkDemo 程序運行Spark 原始碼可以在 http://spark-project.org/download 處下載,也可以到 github 直接複製 Spark 項目。Spark 提供基本源碼壓縮包,同時也提供已經編譯好的壓縮包。Spark 是通過 Scala Shell 來與外界進行交互的。
開始安裝,推薦的方法是首先在第一個節點上部署和啟動 master,獲取 master spark url,然後在部署到其他節點之前修改 conf/spark-env.sh 中的內容。
開始單機 master 服務:./bin/start-master.sh
下載了 spark-0.9.1-bin-cdh4 後上傳到/home/zhoumingyao 目錄 (可以自定義目錄,本例使用的是 CentosV6.5 作業系統) 下,具體子目錄如清單 1 所示。
清單 1. 目錄列表-rw-r--r-- 1 root root 3899 3 月 27 13:36 README.md-rw-r--r-- 1 root root 25379 3 月 27 13:36 pom.xml-rw-r--r-- 1 root root 162 3 月 27 13:36 NOTICE-rw-r--r-- 1 root root 4719 3 月 27 13:36 make-distribution.sh-rw-r--r-- 1 root root 21118 3 月 27 13:36 LICENSE-rw-r--r-- 1 root root 127117 3 月 27 13:36 CHANGES.txtdrwxr-xr-x 4 root root 4096 5 月 6 13:35 assemblydrwxr-xr-x 4 root root 4096 5 月 6 13:36 bageldrwxr-xr-x 2 root root 4096 5 月 6 13:36 bindrwxr-xr-x 2 root root 4096 5 月 6 13:36 confdrwxr-xr-x 4 root root 4096 5 月 6 13:37 coredrwxr-xr-x 2 root root 4096 5 月 6 13:37 datadrwxr-xr-x 4 root root 4096 5 月 6 13:37 devdrwxr-xr-x 3 root root 4096 5 月 6 13:37 dockerdrwxr-xr-x 7 root root 4096 5 月 6 13:37 docsdrwxr-xr-x 4 root root 4096 5 月 6 13:37 ec2drwxr-xr-x 4 root root 4096 5 月 6 13:37 examplesdrwxr-xr-x 7 root root 4096 5 月 6 13:38 externaldrwxr-xr-x 3 root root 4096 5 月 6 13:38 extrasdrwxr-xr-x 5 root root 4096 5 月 6 13:38 graphxdrwxr-xr-x 5 root root 4096 5 月 6 13:38 mllibdrwxr-xr-x 3 root root 4096 5 月 6 13:38 projectdrwxr-xr-x 6 root root 4096 5 月 6 13:38 pythondrwxr-xr-x 4 root root 4096 5 月 6 13:38 repldrwxr-xr-x 2 root root 4096 5 月 6 13:38 sbindrwxr-xr-x 2 root root 4096 5 月 6 13:38 sbtdrwxr-xr-x 4 root root 4096 5 月 6 13:39 streamingdrwxr-xr-x 3 root root 4096 5 月 6 13:39 targetdrwxr-xr-x 4 root root 4096 5 月 6 13:39 toolsdrwxr-xr-x 5 root root 4096 5 月 6 13:39 yarn
進入 bin 目錄,執行 spark-shell.sh,進入 scala shell 狀態,如清單 2 所示。
清單 2. 運行命令scala> val data = Array(1, 2, 3, 4, 5) //產生 datadata: Array[Int] = Array(1, 2, 3, 4, 5)
下面開始將 data 處理成 RDD,如清單 3 所示。
清單 3. 處理成 RDDscala> val distData = sc.parallelize(data) //將 data 處理成 RDDdistData: spark.RDD[Int] = spark.ParallelCollection@7a0ec850(顯示出的類型為 RDD)
清單 4. 在 RDD 上運算scala> distData.reduce(_+_) //在 RDD 上進行運算,對 data 裡面元素進行加和
清單 5. 啟動 Spark[root@facenode1 sbin]# ./start-all.shstarting org.apache.spark.deploy.master.Master, logging to /home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../logs/ spark-root-org.apache.spark.deploy.master.Master-1-facenode1.outlocalhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts.localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../ logs/spark-root-org.apache.spark.deploy.worker.Worker-1-facenode1.out
清單 6. 查看服務[root@facenode1 sbin]# ps -ef | grep sparkroot 29295 1 11 16:45 pts/1 00:00:03 /usr/java/jdk1.6.0_31/bin/java -cp :/home/zhoumingyao/spark-0.9.1-bin-cdh4/conf:/home/ zhoumingyao/spark-0.9.1-bin-cdh4/assembly/target/scala-2.10/ spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar:/etc/alternatives/ hadoopconf -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip facenode1 --port 7077 --webui-port 8080root 29440 1 12 16:45 ? 00:00:03 java -cp :/home/zhoumingyao/ spark-0.9.1-bin-cdh4/conf:/home/zhoumingyao/spark-0.9.1-bin-cdh4/ assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://facenode1:7077
可以啟動多個工作站,通過以下命令連接到 master 伺服器,如清單 7 所示。
清單 7. 連接 Master 伺服器./spark-class org.apache.spark.deploy.worker.Worker spark://facenode1:7077輸出如下:14/05/06 16:49:06 INFO ui.WorkerWebUI: Started Worker web UI at http://facenode1:808214/05/06 16:49:06 INFO worker.Worker: Connecting to master spark://facenode1:7077...14/05/06 16:49:06 INFO worker.Worker: Successfully registered with master spark://facenode1:7077
進入 master server 的 Web UI 可以看到主節點、從節點的工作情況,如清單 8 所示。
清單 8. 訪問 Web 客戶端http://10.10.19.171:8080/
注意,如果是集群方式,請在 conf 文件夾下面的 slaves 文件裡面逐行加入需要加入集群的 master、works 伺服器的 ip 地址或者 hostname。
MapReduce 轉換到 SparkSpark 是類似於 MapReduce 的計算引擎,它提出的內存方式解決了 MapReduce 存在的讀取磁碟速度較慢的困難,此外,它基於 Scala 的函數式編程風格和 API,進行並行計算時效率很高。
由於 Spark 採用的是 RDD(彈性分布式結果集) 方式對數據進行計算,這種方式與 MapReduce 的 Map()、Reduce() 方式差距較大,所以很難直接使用 Mapper、Reducer 的 API,這也是阻礙 MapReduce 轉為 Spark 的絆腳石。
Scala 或者 Spark 裡面的 map() 和 reduce() 方法與 Hadoop MapReduce 裡面的 map()、reduce() 方法相比,Hadoop MapReduce 的 API 更加靈活和複雜,下面列出了 Hadoop MapReduce 的一些特性:
Mappers 和 Reducers 通常使用 key-value 鍵值對作為輸入和輸出;
一個 key 對應一個 Reducer 的 reduce;
每一個 Mapper 或者 Reducer 可能發出類似於 0,1 這樣的鍵值對作為每一次輸出;
Mappers 和 Reducers 可能發出任意的 key 或者 value,而不是標準數據集方式;
Mapper 和 Reducer 對象對每一次 map() 和 reduce() 的調用都存在生命周期。它們支持一個 setup() 方法和 cleanup() 方法,這些方法可以被用來在處理批量數據之前的操作。
試想這麼一個場景,我們需要計算一個文本文件裡每一行的字符數量。在 Hadoop MapReduce 裡,我們需要為 Mapper 方法準備一個鍵值對,key 用作行的行數,value 的值是這一行的字符數量。
清單 9. MapReduce 方式 Map 函數public class LineLengthCountMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { context.write(new IntWritable(line.getLength()), new IntWritable(1)); }}
清單 9 所示代碼,由於 Mappers 和 Reducers 只處理鍵值對,所以對於類 LineLengthCountMapper 而言,輸入是 TextInputFormat 對象,它的 key 由行數提供,value 就是該行所有字符。換成 Spark 之後的代碼如清單 10 所示。
清單 10. Spark 方式 Map 函數lines.map(line => (line.length, 1))
在 Spark 裡,輸入是彈性分布式數據集 (Resilient Distributed Dataset),Spark 不需要 key-value 鍵值對,代之的是 Scala 元祖 (tuple),它是通過 (line.length, 1) 這樣的 (a,b) 語法創建的。以上代碼中 map() 操作是一個 RDD,(line.length, 1) 元祖。當一個 RDD 包含元祖時,它依賴於其他方法,例如 reduceByKey(),該方法對於重新生成 MapReduce 特性具有重要意義。
清單 11 所示代碼是 Hadoop MapReduce 統計每一行的字符數,然後以 Reduce 方式輸出。
清單 11. MapReduce 方式 Reduce 函數public class LineLengthReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> { @Override protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context.write(length, new IntWritable(sum)); }}
Spark 裡面的對應代碼如清單 12 所示。
清單 12. Spark 方式 Reduce 函數val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)
Spark 的 RDD API 有一個 reduce() 方法,它會 reduce 所有的 key-value 鍵值對到一個獨立的 value。
我們現在需要統計大寫字母開頭的單詞數量,對於文本的每一行而言,一個 Mapper 可能需要統計很多個鍵值對,代碼如清單 13 所示。
清單 13. MapReduce 方式計算字符數量public class CountUppercaseMapper extends Mapper<LongWritable,Text,Text,IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { for (String word : line.toString().split(" ")) { if (Character.isUpperCase(word.charAt(0))) { context.write(new Text(word), new IntWritable(1)); } } }}
在 Spark 裡面,對應的代碼如清單 14 所示。
清單 14. Spark 方式計算字符數量lines.flatMap(_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1)))
MapReduce 依賴的 Map 方法這裡並不適用,因為每一個輸入必須對應一個輸出,這樣的話,每一行可能佔用到很多的輸出。相反的,Spark 裡面的 Map 方法比較簡單。Spark 裡面的方法是,首先對每一行數據進行匯總後存入一個輸出結果物數組,這個數組可能是空的,也可能包含了很多的值,最終這個數組會作為一個 RDD 作為輸出物。這就是 flatMap() 方法的功能,它對每一行文本裡的單詞轉換成函數內部的元組後進行了過濾。
在 Spark 裡面,reduceByKey() 方法可以被用來統計每篇文章裡面出現的字母數量。如果我們想統計每一篇文章裡面出現的大寫字母數量,在 MapReduce 裡程序可以如清單 15 所示。
清單 15. MapReduce 方式public class CountUppercaseReducer extends Reducer<Text,IntWritable,Text,IntWritable> { @Override protected void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum)); }}
在 Spark 裡,代碼如清單 16 所示。
清單 16. Spark 方式groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }
groupByKey() 方法負責收集一個 key 的所有值,不應用於一個 reduce 方法。本例中,key 被轉換成大寫字母,值被直接相加算出總和。但這裡需要注意,如果一個 key 與很多個 value 相關聯,可能會出現 Out Of Memory 錯誤。
Spark 提供了一個簡單的方法可以轉換 key 對應的值,這個方法把 reduce 方法過程移交給了 Spark,可以避免出現 OOM 異常。
reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total) }
setup() 方法在 MapReduce 裡面主要的作用是在 map 方法開始前對輸入進行處理,常用的場景是連接資料庫,可以在 cleanup() 方法中釋放在 setup() 方法裡面佔用的資源。
清單 17. MapReduce 方式public class SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> { private Connection dbConnection; @Override protected void setup(Context context) { dbConnection = ...; } ... @Override protected void cleanup(Context context) { dbConnection.close(); }}
在 Spark 裡面沒有這樣的方法。
結束語通過本文的學習,讀者了解了 MapReduce 和 Spark 之間的差異及切換成本。本文針對的是對 Spark 完全沒有了解的用戶,後續文章會從實際應用出發,從安裝、應用程式的角度給出更加實際的教程。
參考資料學習參考 Spark 文檔首頁,了解 IBM 開發者論壇已經收錄的 Spark 文章。
參考 Cloudera Spark 文檔首頁,了解 Spark 原理。
>參考書籍《Spark 大數據處理技術》,作者作為 Spark 社區的主要推動者,對於 Spark 技術有深入的介紹。
developerWorks 開源技術主題:查找豐富的操作信息、工具和項目更新,幫助您掌握開源技術並將其用於 IBM 產品。
點擊[閱讀原文] 報名數盟線下工作坊