如何將 MapReduce 轉化為 Spark

2021-02-15 數盟

【數盟致力於成為最卓越的數據科學社區,聚焦於大數據、分析挖掘、數據可視化領域,業務範圍:線下活動、在線課程、獵頭服務】

這年頭不會點Python都不好意思跟隔壁老王打招呼,數盟線下工作坊:手把手教你玩轉Python,詳情點擊文末閱讀原文!

作者:周明耀

出處:developerWorks 中國

MapReduce VS Spark

目前的大數據處理可以分為以下三個類型:

複雜的批量數據處理(batch data processing),通常的時間跨度在數十分鐘到數小時之間;

基於歷史數據的交互式查詢(interactive query),通常的時間跨度在數十秒到數分鐘之間;

基於實時數據流的數據處理(streaming data processing),通常的時間跨度在數百毫秒到數秒之間。

大數據處理勢必需要依賴集群環境,而集群環境有三大挑戰,分別是並行化、單點失敗處理、資源共享,分別可以採用以並行化的方式重寫應用程式、對單點失敗的處理方式、動態地進行計算資源的分配等解決方案來面對挑戰。

針對集群環境出現了大量的大數據編程框架,首先是 Google 的 MapReduce,它給我們展示了一個簡單通用和自動容錯的批處理計算模型。但是對於其他類型的計算,比如交互式和流式計算,MapReduce 並不適合。這也導致了大量的不同於 MapReduce 的專有的數據處理模型的出現,比如 Storm、Impala 等等。但是這些專有系統也存在一些不足:

重複工作:許多專有系統在解決同樣的問題,比如分布式作業以及容錯,舉例來說,一個分布式的 SQL 引擎或者一個機器學習系統都需要實現並行聚合,這些問題在每個專有系統中會重複地被解決。

組合問題:在不同的系統之間進行組合計算是一件麻煩的事情。對於特定的大數據應用程式而言,中間數據集是非常大的,而且移動的成本很高。在目前的環境下,我們需要將數據複製到穩定的存儲系統,比如 HDFS,以便在不同的計算引擎中進行分享。然而,這樣的複製可能比真正的計算所花費的代價要大,所以以流水線的形式將多個系統組合起來效率並不高。

適用範圍的局限性:如果一個應用不適合一個專有的計算系統,那麼使用者只能換一個系統,或者重寫一個新的計算系統。

資源分配:在不同的計算引擎之間進行資源的動態共享比較困難,因為大多數的計算引擎都會假設它們在程序運行結束之前擁有相同的機器節點的資源。

管理問題:對於多個專有系統,需要花費更多的精力和時間來管理和部署,尤其是對於終端使用者而言,需要學習多種 API 和系統模型。

Spark 是伯克利大學推出的大數據處理框架,它提出了 RDD 概念 (Resilient Distributed Datasets),即抽象的彈性數據集概念。Spark 是對 MapReduce 模型的一種擴展。要在 MapReduce 上實現其不擅長的計算工作 (比如迭代式、交互式和流式),是比較困難的,因為 MapReduce 缺少在並行計算的各個階段進行有效的數據共享的能力,而這種能力是 RDD 的本質所在。利用這種有效地數據共享和類似 MapReduce 的操作接口,上述的各種專有類型計算都能夠有效地表達,而且能夠獲得與專有系統同等的性能。

MapReduce 和 Spark 介紹MapReduce

MapReduce 是為 Apache Hadoop 量身訂做的,它非常適用於 Hadoop 的使用場景,即大規模日誌處理系統、批量數據提取加載工具 (ETL 工具) 等類似操作。但是伴隨著 Hadoop 地盤的不斷擴張,Hadoop 的開發者們發現 MapReduce 在很多場景下並不是最佳選擇,於是 Hadoop 開始把資源管理放入到自己獨立的組件 YARN 裡面。此外,類似於 Impala 這樣的項目也開始逐漸進入到我們的架構中,Impala 提供 SQL 語義,能查詢存儲在 Hadoop 的 HDFS 和 HBase 中的 PB 級大數據。之前也有類似的項目,例如 Hive。Hive 系統雖然也提供了 SQL 語義,但由於 Hive 底層執行使用的是 MapReduce 引擎,仍然是一個批處理過程,難以滿足查詢的交互性。相比之下,Impala 的最大特點也是最大賣點就是它的效率。

第一代 Hadoop MapReduce 是一個在計算機集群上分布式處理海量數據集的軟體框架,包括一個 JobTracker 和一定數量的 TaskTracker。運行流程圖如圖 1 所示。

圖 1. MapReduce 運行流程圖

在最上層有 4 個獨立的實體,即客戶端、jobtracker、tasktracker 和分布式文件系統。客戶端提交 MapReduce 作業;jobtracker 協調作業的運行;jobtracker 是一個 Java 應用程式,它的主類是 JobTracker;tasktracker 運行作業劃分後的任務,tasktracker 也是一個 Java 應用程式,它的主類是 TaskTracker。Hadoop 運行 MapReduce 作業的步驟主要包括提交作業、初始化作業、分配任務、執行任務、更新進度和狀態、完成作業等 6 個步驟。

Spark 簡介

Spark 生態系統的目標就是將批處理、交互式處理、流式處理融合到一個軟體框架內。Spark 是一個基於內存計算的開源的集群計算系統,目的是讓數據分析更加快速。Spark 非常小巧玲瓏,由加州伯克利大學 AMP 實驗室的 Matei 為主的小團隊所開發。使用的語言是 Scala,項目的 core 部分的代碼只有 63 個 Scala 文件,非常短小精悍。Spark 啟用了內存分布數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。Spark 提供了基於內存的計算集群,在分析數據時將數據導入內存以實現快速查詢,速度比基於磁碟的系統,如 Hadoop 快很多。Spark 最初是為了處理迭代算法,如機器學習、圖挖掘算法等,以及交互式數據挖掘算法而開發的。在這兩種場景下,Spark 的運行速度可以達到 Hadoop 的幾百倍。

Spark 允許應用在內存中保存工作集以便高效地重複利用,它支持多種數據處理應用,同時也保持了 MapReduce 的重要特性,如高容錯性、數據本地化、大規模數據處理等。此外,提出了彈性分布式數據集 (Resilient Distributed Datasets) 的概念:

RDD 表現為一個 Scala 對象,可由一個文件創建而來;

分布在一個集群內的,不可變的對象切分集;

通過並行處理(map、filter、groupby、join)固定數據(BaseRDD)創建模型,生成 Transformed RDD;

故障時可使用 RDD 血統信息重建;

可高速緩存,以便再利用。

圖 2 所示是一個日誌挖掘的示例代碼,首先將日誌數據中的 error 信息導入內存,然後進行交互搜索。

圖 2. RDD 代碼示例

在導入數據時,模型以 block 形式存在於 worker 上,由 driver 向 worker 分發任務,處理完後 work 向 driver 反饋結果。也可在 work 上對數據模型建立高速緩存 cache,對 cache 的處理過程與 block 類似,也是一個分發、反饋的過程。

Spark 的 RDD 概念能夠取得和專有系統同樣的性能,還能提供包括容錯處理、滯後節點處理等這些專有系統缺乏的特性。

迭代算法:這是目前專有系統實現的非常普遍的一種應用場景,比如迭代計算可以用於圖處理和機器學習。RDD 能夠很好地實現這些模型,包括 Pregel、HaLoop 和 GraphLab 等模型。

關係型查詢:對於 MapReduce 來說非常重要的需求就是運行 SQL 查詢,包括長期運行、數小時的批處理作業和交互式的查詢。然而對於 MapReduce 而言,對比並行資料庫進行交互式查詢,有其內在的缺點,比如由於其容錯的模型而導致速度很慢。利用 RDD 模型,可以通過實現許多通用的資料庫引擎特性,從而獲得很好的性能。

MapReduce 批處理:RDD 提供的接口是 MapReduce 的超集,所以 RDD 能夠有效地運行利用 MapReduce 實現的應用程式,另外 RDD 還適合更加抽象的基於 DAG 的應用程式。

流式處理:目前的流式系統也只提供了有限的容錯處理,需要消耗系統非常大的拷貝代碼或者非常長的容錯時間。特別是在目前的系統中,基本都是基於連續計算的模型,常住的有狀態的操作會處理到達的每一條記錄。為了恢復失敗的節點,它們需要為每一個操作複製兩份操作,或者將上遊的數據進行代價較大的操作重放,利用 RDD 實現離散數據流,可以克服上述問題。離散數據流將流式計算當作一系列的短小而確定的批處理操作,而不是常駐的有狀態的操作,將兩個離散流之間的狀態保存在 RDD 中。離散流模型能夠允許通過 RDD 的繼承關係圖進行並行性的恢復而不需要進行數據拷貝。

Spark 內部術語解釋

Application:基於 Spark 的用戶程序,包含了 driver 程序和集群上的 executor;

Driver Program:運行 main 函數並且新建 SparkContext 的程序;

Cluster Manager:在集群上獲取資源的外部服務 (例如:standalone,Mesos,Yarn);

Worker Node:集群中任何可以運行應用代碼的節點;

Executor:是在一個 worker node 上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或者磁碟上。每個應用都有各自獨立的 executors;

Task:被送到某個 executor 上的工作單元;

Job:包含很多任務的並行計算,可以與 Spark 的 action 對應;

Stage:一個 Job 會被拆分很多組任務,每組任務被稱為 Stage(就像 Mapreduce 分 map 任務和 reduce 任務一樣)。

SparkDemo 程序運行

Spark 原始碼可以在 http://spark-project.org/download 處下載,也可以到 github 直接複製 Spark 項目。Spark 提供基本源碼壓縮包,同時也提供已經編譯好的壓縮包。Spark 是通過 Scala Shell 來與外界進行交互的。

開始安裝,推薦的方法是首先在第一個節點上部署和啟動 master,獲取 master spark url,然後在部署到其他節點之前修改 conf/spark-env.sh 中的內容。

開始單機 master 服務:./bin/start-master.sh

下載了 spark-0.9.1-bin-cdh4 後上傳到/home/zhoumingyao 目錄 (可以自定義目錄,本例使用的是 CentosV6.5 作業系統) 下,具體子目錄如清單 1 所示。

清單 1. 目錄列表

-rw-r--r-- 1 root root 3899 3 月 27 13:36 README.md-rw-r--r-- 1 root root 25379 3 月 27 13:36 pom.xml-rw-r--r-- 1 root root 162 3 月 27 13:36 NOTICE-rw-r--r-- 1 root root 4719 3 月 27 13:36 make-distribution.sh-rw-r--r-- 1 root root 21118 3 月 27 13:36 LICENSE-rw-r--r-- 1 root root 127117 3 月 27 13:36 CHANGES.txtdrwxr-xr-x 4 root root 4096 5 月 6 13:35 assemblydrwxr-xr-x 4 root root 4096 5 月 6 13:36 bageldrwxr-xr-x 2 root root 4096 5 月 6 13:36 bindrwxr-xr-x 2 root root 4096 5 月 6 13:36 confdrwxr-xr-x 4 root root 4096 5 月 6 13:37 coredrwxr-xr-x 2 root root 4096 5 月 6 13:37 datadrwxr-xr-x 4 root root 4096 5 月 6 13:37 devdrwxr-xr-x 3 root root 4096 5 月 6 13:37 dockerdrwxr-xr-x 7 root root 4096 5 月 6 13:37 docsdrwxr-xr-x 4 root root 4096 5 月 6 13:37 ec2drwxr-xr-x 4 root root 4096 5 月 6 13:37 examplesdrwxr-xr-x 7 root root 4096 5 月 6 13:38 externaldrwxr-xr-x 3 root root 4096 5 月 6 13:38 extrasdrwxr-xr-x 5 root root 4096 5 月 6 13:38 graphxdrwxr-xr-x 5 root root 4096 5 月 6 13:38 mllibdrwxr-xr-x 3 root root 4096 5 月 6 13:38 projectdrwxr-xr-x 6 root root 4096 5 月 6 13:38 pythondrwxr-xr-x 4 root root 4096 5 月 6 13:38 repldrwxr-xr-x 2 root root 4096 5 月 6 13:38 sbindrwxr-xr-x 2 root root 4096 5 月 6 13:38 sbtdrwxr-xr-x 4 root root 4096 5 月 6 13:39 streamingdrwxr-xr-x 3 root root 4096 5 月 6 13:39 targetdrwxr-xr-x 4 root root 4096 5 月 6 13:39 toolsdrwxr-xr-x 5 root root 4096 5 月 6 13:39 yarn

進入 bin 目錄,執行 spark-shell.sh,進入 scala shell 狀態,如清單 2 所示。

清單 2. 運行命令

scala> val data = Array(1, 2, 3, 4, 5) //產生 datadata: Array[Int] = Array(1, 2, 3, 4, 5)

下面開始將 data 處理成 RDD,如清單 3 所示。

清單 3. 處理成 RDD

scala> val distData = sc.parallelize(data) //將 data 處理成 RDDdistData: spark.RDD[Int] = spark.ParallelCollection@7a0ec850(顯示出的類型為 RDD)

清單 4. 在 RDD 上運算

scala> distData.reduce(_+_) //在 RDD 上進行運算,對 data 裡面元素進行加和

清單 5. 啟動 Spark

[root@facenode1 sbin]# ./start-all.shstarting org.apache.spark.deploy.master.Master, logging to /home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../logs/ spark-root-org.apache.spark.deploy.master.Master-1-facenode1.outlocalhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts.localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/zhoumingyao/spark-0.9.1-bin-cdh4/sbin/../ logs/spark-root-org.apache.spark.deploy.worker.Worker-1-facenode1.out

清單 6. 查看服務

[root@facenode1 sbin]# ps -ef | grep sparkroot 29295 1 11 16:45 pts/1 00:00:03 /usr/java/jdk1.6.0_31/bin/java -cp :/home/zhoumingyao/spark-0.9.1-bin-cdh4/conf:/home/ zhoumingyao/spark-0.9.1-bin-cdh4/assembly/target/scala-2.10/ spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar:/etc/alternatives/ hadoopconf -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip facenode1 --port 7077 --webui-port 8080root 29440 1 12 16:45 ? 00:00:03 java -cp :/home/zhoumingyao/ spark-0.9.1-bin-cdh4/conf:/home/zhoumingyao/spark-0.9.1-bin-cdh4/ assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.0.0-mr1-cdh4.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://facenode1:7077

可以啟動多個工作站,通過以下命令連接到 master 伺服器,如清單 7 所示。

清單 7. 連接 Master 伺服器

./spark-class org.apache.spark.deploy.worker.Worker spark://facenode1:7077輸出如下:14/05/06 16:49:06 INFO ui.WorkerWebUI: Started Worker web UI at http://facenode1:808214/05/06 16:49:06 INFO worker.Worker: Connecting to master spark://facenode1:7077...14/05/06 16:49:06 INFO worker.Worker: Successfully registered with master spark://facenode1:7077

進入 master server 的 Web UI 可以看到主節點、從節點的工作情況,如清單 8 所示。

清單 8. 訪問 Web 客戶端

http://10.10.19.171:8080/

注意,如果是集群方式,請在 conf 文件夾下面的 slaves 文件裡面逐行加入需要加入集群的 master、works 伺服器的 ip 地址或者 hostname。

MapReduce 轉換到 Spark

Spark 是類似於 MapReduce 的計算引擎,它提出的內存方式解決了 MapReduce 存在的讀取磁碟速度較慢的困難,此外,它基於 Scala 的函數式編程風格和 API,進行並行計算時效率很高。

由於 Spark 採用的是 RDD(彈性分布式結果集) 方式對數據進行計算,這種方式與 MapReduce 的 Map()、Reduce() 方式差距較大,所以很難直接使用 Mapper、Reducer 的 API,這也是阻礙 MapReduce 轉為 Spark 的絆腳石。

Scala 或者 Spark 裡面的 map() 和 reduce() 方法與 Hadoop MapReduce 裡面的 map()、reduce() 方法相比,Hadoop MapReduce 的 API 更加靈活和複雜,下面列出了 Hadoop MapReduce 的一些特性:

Mappers 和 Reducers 通常使用 key-value 鍵值對作為輸入和輸出;

一個 key 對應一個 Reducer 的 reduce;

每一個 Mapper 或者 Reducer 可能發出類似於 0,1 這樣的鍵值對作為每一次輸出;

Mappers 和 Reducers 可能發出任意的 key 或者 value,而不是標準數據集方式;

Mapper 和 Reducer 對象對每一次 map() 和 reduce() 的調用都存在生命周期。它們支持一個 setup() 方法和 cleanup() 方法,這些方法可以被用來在處理批量數據之前的操作。

試想這麼一個場景,我們需要計算一個文本文件裡每一行的字符數量。在 Hadoop MapReduce 裡,我們需要為 Mapper 方法準備一個鍵值對,key 用作行的行數,value 的值是這一行的字符數量。

清單 9. MapReduce 方式 Map 函數

public class LineLengthCountMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { context.write(new IntWritable(line.getLength()), new IntWritable(1)); }}

清單 9 所示代碼,由於 Mappers 和 Reducers 只處理鍵值對,所以對於類 LineLengthCountMapper 而言,輸入是 TextInputFormat 對象,它的 key 由行數提供,value 就是該行所有字符。換成 Spark 之後的代碼如清單 10 所示。

清單 10. Spark 方式 Map 函數

lines.map(line => (line.length, 1))

在 Spark 裡,輸入是彈性分布式數據集 (Resilient Distributed Dataset),Spark 不需要 key-value 鍵值對,代之的是 Scala 元祖 (tuple),它是通過 (line.length, 1) 這樣的 (a,b) 語法創建的。以上代碼中 map() 操作是一個 RDD,(line.length, 1) 元祖。當一個 RDD 包含元祖時,它依賴於其他方法,例如 reduceByKey(),該方法對於重新生成 MapReduce 特性具有重要意義。

清單 11 所示代碼是 Hadoop MapReduce 統計每一行的字符數,然後以 Reduce 方式輸出。

清單 11. MapReduce 方式 Reduce 函數

public class LineLengthReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> { @Override protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context.write(length, new IntWritable(sum)); }}

Spark 裡面的對應代碼如清單 12 所示。

清單 12. Spark 方式 Reduce 函數

val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

Spark 的 RDD API 有一個 reduce() 方法,它會 reduce 所有的 key-value 鍵值對到一個獨立的 value。

我們現在需要統計大寫字母開頭的單詞數量,對於文本的每一行而言,一個 Mapper 可能需要統計很多個鍵值對,代碼如清單 13 所示。

清單 13. MapReduce 方式計算字符數量

public class CountUppercaseMapper extends Mapper<LongWritable,Text,Text,IntWritable> { @Override protected void map(LongWritable lineNumber, Text line, Context context) throws IOException, InterruptedException { for (String word : line.toString().split(" ")) { if (Character.isUpperCase(word.charAt(0))) { context.write(new Text(word), new IntWritable(1)); } } }}

在 Spark 裡面,對應的代碼如清單 14 所示。

清單 14. Spark 方式計算字符數量

lines.flatMap(_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1)))

MapReduce 依賴的 Map 方法這裡並不適用,因為每一個輸入必須對應一個輸出,這樣的話,每一行可能佔用到很多的輸出。相反的,Spark 裡面的 Map 方法比較簡單。Spark 裡面的方法是,首先對每一行數據進行匯總後存入一個輸出結果物數組,這個數組可能是空的,也可能包含了很多的值,最終這個數組會作為一個 RDD 作為輸出物。這就是 flatMap() 方法的功能,它對每一行文本裡的單詞轉換成函數內部的元組後進行了過濾。

在 Spark 裡面,reduceByKey() 方法可以被用來統計每篇文章裡面出現的字母數量。如果我們想統計每一篇文章裡面出現的大寫字母數量,在 MapReduce 裡程序可以如清單 15 所示。

清單 15. MapReduce 方式

public class CountUppercaseReducer extends Reducer<Text,IntWritable,Text,IntWritable> { @Override protected void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : counts) { sum += count.get(); } context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum)); }}

在 Spark 裡,代碼如清單 16 所示。

清單 16. Spark 方式

groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }

groupByKey() 方法負責收集一個 key 的所有值,不應用於一個 reduce 方法。本例中,key 被轉換成大寫字母,值被直接相加算出總和。但這裡需要注意,如果一個 key 與很多個 value 相關聯,可能會出現 Out Of Memory 錯誤。

Spark 提供了一個簡單的方法可以轉換 key 對應的值,這個方法把 reduce 方法過程移交給了 Spark,可以避免出現 OOM 異常。

reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total) }

setup() 方法在 MapReduce 裡面主要的作用是在 map 方法開始前對輸入進行處理,常用的場景是連接資料庫,可以在 cleanup() 方法中釋放在 setup() 方法裡面佔用的資源。

清單 17. MapReduce 方式

public class SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> { private Connection dbConnection; @Override protected void setup(Context context) { dbConnection = ...; } ... @Override protected void cleanup(Context context) { dbConnection.close(); }}

在 Spark 裡面沒有這樣的方法。

結束語

通過本文的學習,讀者了解了 MapReduce 和 Spark 之間的差異及切換成本。本文針對的是對 Spark 完全沒有了解的用戶,後續文章會從實際應用出發,從安裝、應用程式的角度給出更加實際的教程。

參考資料學習

參考 Spark 文檔首頁,了解 IBM 開發者論壇已經收錄的 Spark 文章。

參考 Cloudera Spark 文檔首頁,了解 Spark 原理。

>參考書籍《Spark 大數據處理技術》,作者作為 Spark 社區的主要推動者,對於 Spark 技術有深入的介紹。

developerWorks 開源技術主題:查找豐富的操作信息、工具和項目更新,幫助您掌握開源技術並將其用於 IBM 產品。

點擊[閱讀原文] 報名數盟線下工作坊


相關焦點

  • 一、Spark概述
    官方網址:http://spark.apache.orghttps://databricks.com/spark/aboutSpark 四大特點1、速度快(比mapreduce在內存中快100倍,在磁碟中快10倍)spark中的job中間結果可以不落地
  • Spark-TFRecord: Spark將全面支持TFRecord
    為了模型訓練,我們或者修改代碼使模型訓練能夠讀取avro格式,或者將avro格式的datasets轉化為TFRecord。Spark-TFRecod主要是解決後者,即將不同格式轉化為TFRecord。現有的項目和之前的嘗試在 Spark-TFRecord 項目之前,社區提供 Spark-TensorFlow-Connector , 在 Spark 中讀寫 TFRecord 。
  • 搞懂Hadoop、MapReduce、Hive、HBase、YARN及Spark的區別與聯繫
    hive數據倉庫工具能將結構化的數據文件映射為一張資料庫表,並提供SQL查詢功能,能將SQL語句轉變成MapReduce任務來執行。hive與關係型資料庫的SQL略有不同,但支持了絕大多數的語句如DDL、DML以及常見的聚合函數、連接查詢、條件查詢。
  • 大數據分析工程師入門9-Spark SQL
    大數據處理使用SQL進行大數據處理,使傳統的RDBMS人員也可以進行大數據處理,不需要掌握像mapreduce的編程方法。// 開啟隱式轉換import spark.implicits._ // 讀入文本文件並最終轉化成DataFrameval peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes =&
  • Spark RDD上的map operators是如何pipeline起來的
    當時我很自然地回答說:不需要多次循環,spark會將多個map操作pipeline起來apply到rdd partition的每個data element上。事後仔細想了想這個問題,雖然我確信spark不可能傻到每個map operator都循環遍歷一次數據,但是這些map操作具體是怎麼被pipeline起來apply的呢?這個問題還真不太清楚。
  • HBase與mapreduce集成
    在公司中,大多數都是hbase和mapreduce共同進行業務操作,hbase多數都是讀寫操作,hbase和hadoop在內部已經封裝好了,我們要做的就是調用。常見的模式1、從hbase讀取數據將hbase的數據作為map的輸入2、將數據寫入hbase將hbase作為reduce輸出3、從hbase讀,再寫入hbase
  • Spark機器學習的關鍵技巧
    )基於Spark做智能問答(Spark上的算法支持)如何基於 spark 做機器學習其中這些內容在我之前寫的一篇描述工作經歷的文章:https://github.com/allwefantasy/my-life/blob/master/career.md 都有提及到
  • Spark 數據傾斜及其解決方案
    dataFrame 和 sparkSql 可以設置 spark.sql.shuffle.partitions=[num_tasks] 參數控制 shuffle 的並發度,默認為200。(2)適用場景大量不同的 Key 被分配到了相同的 Task 造成該 Task 數據量過大。(3)解決方案調整並行度。
  • MapReduce 閱讀筆記
    原文:http://blog.luoyuanhang.com/2017/04/19/mapreduce-notes/這篇文章是我閱讀
  • 【Spark研究】Spark編程指南(Python版)
    Spark支持兩種共享變量:廣播變量,用來將一個值緩存到所有節點的內存中;累加器,只能用於累加,比如計數器和求和。這篇指南將展示這些特性在Spark支持的語言中是如何使用的(本文只翻譯了Python部分)。如果你打開了Spark的交互命令行——bin/spark-shell的Scala命令行或bin/pyspark的Python命令行都可以——那麼這篇文章你學習起來將是很容易的。
  • Spark【面試】
    reduce處理,最後將數據保存或者顯示,結束整個job2、hadoop的TextInputFormat作用是什麼,如何自定義實現InputFormat會在map操作之前對數據進行兩方面的預處理 1是getSplits,返回的是InputSplit數組,對數據進行split分片,每片交給map
  • Spark面試高頻考點,必知必會!
    executor-cores —— 每個executor使用的內核數,默認為1,官方建議2-5個,我們企業是4個num-executors —— 啟動executors的數量,默認為2executor-memory —— executor內存大小,默認1Gdriver-cores —— driver使用內核數,默認為
  • 【實驗任務】MapReduce社交好友推薦算法
    實驗目的1.了解笛卡爾積2.學習MapReduce 社交好友推薦算法相關知識如果A和B具有好友關係,B和C具有好友關係,而A和C卻不是好友關係,那麼我們稱A和C這樣的關係為:二度好友關係。在生活中,二度好友推薦的運用非常廣泛,比如某些主流社交產品中都會有「可能認識的人」這樣的功能,一般來說可能認識的人就是通過二度好友關係搜索得到的,在傳統的關係型資料庫中,可以通過圖的廣度優先遍歷算法實現,而且深度限定為2,然而在海量的數據中,這樣的遍歷成本太大,所以有必要利用MapReduce編程模型來並行化。
  • 『 Spark 』2. spark 基本概念解析
    Executor在每個 Worker Node 上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或者磁碟上,每個任務都有各自獨立的 Executor。 Executor 是一個執行 Task 的容器。它的主要職責是:總結:Executor 是一個應用程式運行的監控和執行容器。6.
  • 當MongoDB遇見Spark
    就是數據存儲這部分, 也就是圖中的黑色圈圈HDFS的部分, 如下圖用MongoDB替換HDFS後的Spark生態系統Spark+Mongodb生態系統為什麼要用MongoDB替換HDFS存儲方式上, HDFS以文件為單位,每個文件64MB~128MB不等, 而MongoDB
  • Spark—15分鐘教程
    讓我們看看如何使用Sales表進行基本操作。簡單的Select語句和顯示數據#  以Parquet格式讀取源表sales_table = spark.read.parquet(".在下面的示例中,我們首先將表拆分為兩部分,然後將這些部分合併在一起(完全沒有必要,但它將演示如何使用API):#   以Parquet格式讀取源表sales_table = spark.read.parquet(".
  • Spark 2.0系列之SparkSession詳解
    的各種API,學習Spark的難度也會大大降低。SparkSession的功能首先,我們從一個Spark應用案例入手:SparkSessionZipsExample可以從JSON文件中讀取郵政編碼,通過DataFrame API進行分析,同時還能夠使用Spark SQL語句實施查詢。
  • 用Spark-NLP建立文本分類模型
    1.初始化Spark我們將導入所需的庫並使用不同的配置參數初始化spark會話。配置值取決於我的本地環境。相應地調整參數。2.11:2.4.5")\ .config("spark.kryoserializer.buffer.max", "1000M")\ .config("spark.network.timeout","3600s")\ .getOrCreate()2.加載文本數據我們將使用BBC的數據。
  • 使用Golang實現Spark UDF
    【導讀】如何實現go語言寫spark udf提交到spark任務運行?本文做了詳細介紹Spark非常適合處理大量數據。但是對於我們Go語言開發者來說並不友好,我們公司的人對Golang技術棧更熟悉。本文記錄了如何實現Golang寫spark udf。注意以下操作也可以對UDAF使用。就是聚合的UDF。為何做這個操作你可能會質疑這個方案能否落地。結論是這個方案很好用也很有效。