本篇是介紹Spark的入門系列文章,希望能幫你初窺Spark的大門。
一 Spark概述
1.1 什麼是Spark?
Spark是一種基於內存的快速,通用,可擴展的大數據計算引擎框架。
由於MapReduce計算框架在Map和Reduce階段存在大量的shuffle操作以及IO操作,導致MapReduce的效率一直被很多同學吐槽,而Spark計算框架一定程度下減了shuffle及寫入磁碟操作,因此速度相比較快,因此今天介紹基於Hadoop的Spark計算框架;
先來看一張Hadoop與Spark框架關係圖,Hadoop與Spark關係一目了然,當然並不是說Spark可以代替MapReduce框架,只能說在某一方面上優勝MapReduce,但仍然無法代替MapReduce計算框架。
MapReduce計算框架會存在一定的局限性
1. 基於HDFS文件系統,經過Map計算及Reduce聚合結果保存到磁碟中,對於數據挖掘及數據分析來說,需要對數據進行多次操作,在一定程度上存在局限性;
2. MapReduce在Map及Reduce階段需要大量IO及工作節點移動文件需要的帶寬資源,因此性能上也存在一定的局限性;
3. MapReduce和Hadoop緊密耦合在一起,無法動態替換。
在大家急需一種新的計算引擎出現的情況下,Spark應運而生,不過Spark真正與Hadoop產生聯繫,還需要Yarn幫忙。
在2013年10月發布Hadoop2.X之前,Hadoop1.x是沒有Yarn存在的,存儲和計算緊密耦合,並且負責調度任務和資源的JobTracker苦不堪言,所有工作都堆在了它身上。在2.X之後,Yarn作為資源管理器閃亮登場,通過ResourceManager和NodeManager將任務調度和資源調度的職責分開,再通過ApplicationMaster和Cotainer,實現ResourceManager和NodeManager的解耦合。
於是,Hadoop1.x中的hdfs+mr,變成了Hadoop2.x中的ResourceManager+ApplicationMaster+NodeManager,實現了存儲和計算的解耦合,並且實現了Container(可以理解為電腦上裝的虛擬機)中計算引擎的可插拔替換(MR,Spark,Tez,Presto等),至此,Spark可以一展身手了。
二、Spark的幾大組件介紹
主要四大組件:
Spark Core:實現了Spark的基本功能,包含任務調度、內存管理、錯誤恢復、與存儲系統交互等模塊。SparkCore中還包含了對彈性分布式數據集(Resilient Distributed DataSet,簡稱RDD)的API定義。
Spark SQL:是Spark用來操作結構化數據的程序包。通過Spark SQL,我們可以使用 SQL或者Apache Hive版本的SQL方言(HQL)來查詢數據。Spark SQL支持多種數據源,比如Hive表、Parquet以及JSON等。
Spark Streaming:是Spark提供的對實時數據進行流式計算的組件。提供了用來操作數據流的API,並且與Spark Core中的 RDD API高度對應。
Spark ML:提供常見的機器學習(ML)功能的程序庫。包括分類、回歸、聚類、協同過濾等,還提供了模型評估、數據 導入等額外的支持功能。
集群管理器:Spark 設計為可以高效地在一個計算節點到數千個計算節點之間伸縮計算。為了實現這樣的要求,同時獲得最大靈活性,Spark支持在各種集群管理器(Cluster Manager)上運行,包括Hadoop YARN、Apache Mesos,甚至可以在Spark自帶的簡易調度器上運行。
三、Spark運行模式
Spark的運行模式模式,一般分為Local模式,Standalone模式及Yarn模式(重點),Mesos模式(此模式國內基本不用)。
Local模式:運行在一臺計算機上的模式,通常用於本機練手和測試;
Standalone模式:構建一個由Master+Slaver構成的Spark集群;
Yarn模式:Spark客戶端直接連接Yarn,不需要額外構建Spark集群。有yarn-client和yarn-cluster兩種模式,主要區別在於:Driver程序的運行節點。
yarn-client:Driver程序運行在客戶端,適用於交互、調試,希望立即看到app的輸出yarn-cluster:Driver程序運行在由RM(ResourceManager)啟動的AP(APPMaster)適用於生產環境。
Mesos模式(了解):Spark客戶端直接連接Mesos;不需要額外構建Spark集群。國內應用比較少,更多的是運用yarn調度。
四 、Spark及相應環境安裝
4.1 Mac:
下載安裝jdk1.8並配置環境變量,下載scala的壓縮包後解壓(我使用的是scala-2.11.12),;以及spark-2.3.1-bin-hadoop2.7壓縮包解壓。
最後記得vim .bash_profile配置三者的環境:
exportSCALA_HOME=/User/xxx/xxx/scala-2.11.12
export PATH=$PATH:$SCALA_HOME/bin
SPARK_LOCAL_IP=127.0.0.1
export JAVA_HOME=」/Library/Java/JavaVirtualMachines/jdk1.8_0_221.jdk/Contents/Home」
配置後更新環境變量source .bash_profile
若能在console輸入java -version返回版本則為成功;
若能在console輸出scala -version返回版本則為成功
操作完之後,在spark文件夾下,輸入./bin/spark-shell若成功列印出:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/
則表示成功。
4.2 Windows:
安裝jdk1.8並配置環境變量,若能在cmd輸入java -version返回版本則為成功;解壓scala-2.11.12並配置環境變量,若能在cmd輸出scala -version返回版本則為成功;解壓spark-2.3.2-bin-hadoop2.7並配置環境變量,若能在cmd輸入spark-shell出現welcome語則為成功;解壓hadoop-2.7.7並配置環境變量(winutils.exe已放置hadoop-2.7.7\bin目錄下)
4.3 用IDEA跑WordCount,此案例等同於其他語言的Hello World。
打開IDEA,新建scala項目,選擇之前安裝的jdk和scala版本,進入項目後,右擊項目,選擇Open Module Settings,選擇Libraries,點擊加號,java,把我們解壓縮的spark-2.3.2-bin-hadoop2.7的 jars目錄下所有jar包選中,點擊ok,給依賴包重取個名字,再次點擊ok,這樣所有依賴的jar包就都有了。
在你的項目下新建input文件夾,裡面可以新建一個或兩個txt文件,裡面寫上hello world,hello spark等文字,再新建一個WordCount的object文件,輸入以下內容(我們的程序會自動讀取input文件夾裡的文件):
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object WordCount {
def main(args: Array[String]): Unit = {
// local模式
// 創建SparkConf對象
// 設定Spark計算框架的運行環境
// app id
val config:SparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
// 創建spark上下文對象
val sc = new SparkContext(config)
// 讀取文件,將文件內容一行行的讀取出來
val unit: RDD[String] = sc.textFile("input")
// 將一行行的數據分解為一個個的單詞
val unit1: RDD[String] = unit.flatMap(_.split(" "))
// 為了統計方便,將單詞數據進行結構的轉換
val unit2: RDD[(String, Int)] = unit1.map((_,1))
// 對轉換結構後的數據進行分組聚合
val unit3: RDD[(String, Int)] = unit2.reduceByKey(_+_)
// 收集結果並循環列印
unit3.collect().foreach(println)
}
}
右擊run,如果能正確地在控制臺列印出結果,那麼恭喜你,你的第一個Spark程序已經運行成功了!
五、Spark中的rdd和算子
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象。代碼中是一個抽象類,它代表一個不可變、可分區、裡面的元素可並行計算的集合。
在Spark中,RDD被表示為對象,主要操作方法有轉換及動作兩大方面。
通過對象上的方法調用來對RDD進行轉換。經過一系列的transformations定義RDD之後,就可以調用actions觸發RDD的計算,action可以是向應用程式返回結果(count, collect等),或者是向存儲系統保存數據(saveAsTextFile等)。在Spark中,只有遇到action,才會執行RDD的計算(即延遲計算),這樣在運行時可以通過管道的方式傳輸多個轉換。
5.1 RDD轉換(transformations)
整體上分為Value類型和Key-Value類型;
a、Value類型主要有:
map(func);
mapPartitions(func);
mapPartitionsWithIndex(func);
flatMap(func);
glom;
groupBy(func);
filter(func);
sample(withReplacement,fraction,seed);
distinct([numTasks]));
coalesce(numPartitions);
sortBy(func,[ascending], [numTasks]);
pipe(command, [envVars])
b、雙Value類型主要有:
union(otherDataset);
subtract (otherDataset);
intersection(otherDataset);
cartesian(otherDataset);
zip(otherDataset)
c、Key-Value類型主要有:
partitionBy;
groupByKey;
reduceByKey(func,[numTasks]);
aggregateByKey;
foldByKey;
combineByKey[C];
sortByKey([ascending], [numTasks]);
mapValues;
join(otherDataset, [numTasks]);
cogroup(otherDataset, [numTasks])
5.2 RDD的執行(actions),主要有:
reduce(func);
collect();
count();
first();
take(n);
takeOrdered(n);
aggregate;
fold(num)(func);
saveAsTextFile(path);
saveAsSequenceFile(path);
saveAsObjectFile(path);
countByKey();
foreach(func)
每個算子都有其不同的用法,由於篇幅關係,關於RDD的內容就先聊到這裡,相關算子介紹請看下期~
相關知識:
centos7 分布式集群hadoop與hive安裝
微信公眾號: