超詳細!一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐

2020-12-19 CSDN

來源 | Alice菌

責編 | Carol

封圖 | CSDN 下載於視覺中國

相信很多小夥伴已經接觸過 SparkStreaming 了,理論就不講太多了,今天的內容主要是為大家帶來的是 SparkStreaming 整合 Kafka 的教程。

文中含代碼,感興趣的朋友可以複製動手試試!

Kafka回顧

正式開始之前,先讓我們來對Kafka回顧一波。

核心概念圖解Broker:安裝Kafka服務的機器就是一個broker

Producer:消息的生產者,負責將數據寫入到broker中(push)

Consumer:消息的消費者,負責從kafka中拉取數據(pull),老版本的消費者需要依賴zk,新版本的不需要

Topic: 主題,相當於是數據的一個分類,不同topic存放不同業務的數據 –主題:區分業務

Replication:副本,數據保存多少份(保證數據不丟失) –副本:數據安全

Partition:分區,是一個物理的分區,一個分區就是一個文件,一個Topic可以有1~n個分區,每個分區都有自己的副本 –分區:並發讀寫

Consumer Group:消費者組,一個topic可以有多個消費者/組同時消費,多個消費者如果在一個消費者組中,那麼他們不能重複消費數據 –消費者組:提高消費者消費速度、方便統一管理

注意[1]:一個Topic可以被多個消費者或者組訂閱,一個消費者/組也可以訂閱多個主題

注意[2]:讀數據只能從Leader讀, 寫數據也只能往Leader寫,Follower會從Leader那裡同步數據過來做副本!!!

常用命令啟動kafka

/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties

停止kafka

/export/servers/kafka/bin/kafka-server-stop.sh

查看topic信息

/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181

創建topic

/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test

查看某個topic信息

/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test

刪除topic

/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test

啟動生產者–控制臺的生產者一般用於測試

/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka

啟動消費者–控制臺的消費者一般用於測試

/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic spark_kafka--from-beginning

消費者連接到borker的地址

/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic spark_kafka --from-beginning

整合kafka兩種模式說明

這同時也是一個面試題的熱點。

開發中我們經常會利用SparkStreaming實時地讀取kafka中的數據然後進行處理,在spark1.3版本後,kafkaUtils裡面提供了兩種創建DStream的方法:

1、Receiver接收方式:

KafkaUtils.createDstream(開發中不用,了解即可,但是面試可能會問)。Receiver作為常駐的Task運行在Executor等待數據,但是一個Receiver效率低,需要開啟多個,再手動合併數據(union),再進行處理,很麻煩Receiver哪臺機器掛了,可能會丟失數據,所以需要開啟WAL(預寫日誌)保證數據安全,那麼效率又會降低!Receiver方式是通過zookeeper來連接kafka隊列,調用Kafka高階API,offset存儲在zookeeper,由Receiver維護。spark在消費的時候為了保證數據不丟也會在Checkpoint中存一份offset,可能會出現數據不一致所以不管從何種角度來說,Receiver模式都不適合在開發中使用了,已經淘汰了2、Direct直連方式

KafkaUtils.createDirectStream(開發中使用,要求掌握)Direct方式是直接連接kafka分區來獲取數據,從每個分區直接讀取數據大大提高了並行能力Direct方式調用Kafka低階API(底層API),offset自己存儲和維護,默認由Spark維護在checkpoint中,消除了與zk不一致的情況當然也可以自己手動維護,把offset存在mysql、redis中所以基於Direct模式可以在開發中使用,且藉助Direct模式的特點+手動操作可以保證數據的Exactly once 精準一次總結:

Receiver接收方式多個Receiver接受數據效率高,但有丟失數據的風險開啟日誌(WAL)可防止數據丟失,但寫兩遍數據效率低。Zookeeper維護offset有重複消費數據可能。使用高層次的APIDirect直連方式不使用Receiver,直接到kafka分區中讀取數據不使用日誌(WAL)機制Spark自己維護offset使用低層次的API擴展:關於消息語義

注意:

開發中SparkStreaming和kafka集成有兩個版本:0.8及0.10+

0.8版本有Receiver和Direct模式(但是0.8版本生產環境問題較多,在Spark2.3之後不支持0.8版本了)。

0.10以後只保留了direct模式(Reveiver模式不適合生產環境),並且0.10版本API有變化(更加強大)

結論:

我們學習和開發都直接使用0.10版本中的direct模式,但是關於Receiver和Direct的區別面試的時候要能夠答得上來

spark-streaming-kafka-0-8(了解)

1.Receiver

KafkaUtils.createDstream使用了receivers來接收數據,利用的是Kafka高層次的消費者api,偏移量由Receiver維護在zk中,對於所有的receivers接收到的數據將會保存在Spark executors中,然後通過Spark Streaming啟動job來處理這些數據,默認會丟失,可啟用WAL日誌,它同步將接受到數據保存到分布式文件系統上比如HDFS。保證數據在出錯的情況下可以恢復出來。儘管這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是啟用了WAL效率會較低,且無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。

(官方現在已經不推薦這種整合方式。)

準備工作1)啟動zookeeper集群

zkServer.shstart

2)啟動kafka集群

kafka-server-start.sh /export/servers/kafka/config/server.properties

3.創建topic

kafka-topics.sh--create--zookeepernode01:2181--replication-factor 1 --partitions 3 --topicspark_kafka

4.通過shell命令向topic發送消息

kafka-console-producer.sh--broker-listnode01:9092--topicspark_kafka

5.添加kafka的pom依賴

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version></dependency>

API通過receiver接收器獲取kafka中topic數據,可以並行運行更多的接收器讀取kafak topic中的數據,這裡為3個

val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => { val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) stream })

如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設置存儲級別(默認StorageLevel.MEMORY_AND_DISK_SER_2)

代碼演示

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}import scala.collection.immutableobject SparkKafka { def main(args: Array[String]): Unit = {//1.創建StreamingContextval config: SparkConf = new SparkConf().setAppName("SparkStream").setMaster("local[*]") .set("spark.streaming.receiver.writeAheadLog.enable", "true")//開啟WAL預寫日誌,保證數據源端可靠性val sc = new SparkContext(config) sc.setLogLevel("WARN")val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("./kafka")//==============================================//2.準備配置參數val zkQuorum = "node01:2181,node02:2181,node03:2181"val groupId = "spark"val topics = Map("spark_kafka" -> 2)//2表示每一個topic對應分區都採用2個線程去消費,//ssc的rdd分區和kafka的topic分區不一樣,增加消費線程數,並不增加spark的並行處理數據數量//3.通過receiver接收器獲取kafka中topic數據,可以並行運行更多的接收器讀取kafak topic中的數據,這裡為3個val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) stream })//4.使用union方法,將所有receiver接受器產生的Dstream進行合併val allDStream: DStream[(String, String)] = ssc.union(receiverDStream)//5.獲取topic的數據(String, String) 第1個String表示topic的名稱,第2個String表示topic的數據valdata: DStream[String] = allDStream.map(_._2)//==============================================//6.WordCountval words: DStream[String] = data.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] = words.map((_, 1))val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _) result.print() ssc.start() ssc.awaitTermination() }}

2.Direct

Direct方式會定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量範圍在每個batch裡面處理數據,Spark通過調用kafka簡單的消費者API讀取一定範圍的數據。

Direct的缺點是無法使用基於zookeeper的kafka監控工具Direct相比基於Receiver方式有幾個優點:簡化並行不需要創建多個kafka輸入流,然後union它們,sparkStreaming將會創建和kafka分區數一樣的rdd的分區數,而且會從kafka中並行讀取數據,spark中RDD的分區數和kafka中的分區數據是一一對應的關係。高效 Receiver實現數據的零丟失是將數據預先保存在WAL中,會複製一遍數據,會導致數據被拷貝兩次,第一次是被kafka複製,另一次是寫到WAL中。而Direct不使用WAL消除了這個問題。恰好一次語義(Exactly-once-semantics)Receiver讀取kafka數據是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過數據保存在WAL中保證數據不丟失,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導致數據被消費了多次。 Direct的Exactly-once-semantics(EOS)通過實現kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。

APIKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

代碼演示

import kafka.serializer.StringDecoderimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}object SparkKafka2 { def main(args: Array[String]): Unit = {//1.創建StreamingContextval config: SparkConf = new SparkConf().setAppName("SparkStream").setMaster("local[*]")val sc = new SparkContext(config) sc.setLogLevel("WARN")val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("./kafka")//==============================================//2.準備配置參數val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "spark")val topics = Set("spark_kafka")val allDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)//3.獲取topic的數據valdata: DStream[String] = allDStream.map(_._2)//==============================================//WordCountval words: DStream[String] = data.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] = words.map((_, 1))val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _) result.print() ssc.start() ssc.awaitTermination() }}

spark-streaming-kafka-0-10

說明spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發中使用

pom.xml<!--<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>${spark.version}</version></dependency>--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>

API:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

創建topic/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic spark_kafka

啟動生產者/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node01:9092,node01:9092 --topic spark_kafka

代碼演示import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}object SparkKafkaDemo { def main(args: Array[String]): Unit = {//1.創建StreamingContext//spark.master should be set as local[n], n > 1 val conf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中對數據進行切分形成一個RDD//準備連接Kafka的參數 val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "SparkKafkaDemo",//earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費//latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據//none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常//這裡配置latest自動重置偏移量為最新的偏移量,即如果有偏移量從偏移量位置開始消費,沒有偏移量從新來的數據開始消費"auto.offset.reset" -> "latest",//false表示關閉自動提交.由spark幫你提交到Checkpoint或程式設計師手動維護"enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("spark_kafka")//2.使用KafkaUtil連接Kafak獲取數據 val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,//位置策略,源碼強烈推薦使用該策略,會讓Spark的Executor和Kafka的Broker均勻對應 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//消費策略,源碼強烈推薦使用該策略//3.獲取VALUE數據 val lineDStream: DStream[String] = recordDStream.map(_.value())//_指的是ConsumerRecord val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_指的是發過來的value,即一行數據 val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1)) val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_) result.print() ssc.start()//開啟 ssc.awaitTermination()//等待優雅停止 }}

好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過程,並帶大家複習了一波Kafka的基礎知識,如果對你有用的話,麻煩動手手點個「在看」吧~

本文由作者首發 CSDN 博客,原文連結:

https://blog.csdn.net/weixin_44318830/article/details/105612516

相關焦點

  • Spark Streaming 對比 JStorm
    下圖簡單的給出了當前我們使用Spark Streaming與JStorm的對比:002Spark Streaming設計與封裝在接入Spark Streaming的初期,首先需要考慮的是如何基於現有的實時平臺無縫的嵌入
  • 大數據入門:流處理框架Spark Streaming與Storm
    這個拓撲將會被提交給集群,由集群中的主控節點(master node)分發代碼,將任務分配給工作節點(worker node)執行。Storm與Spark streaming的應用場景Storm:Storm需要純實時的環境,不能忍受1秒以上的延遲環境,比如銀行類的金融系統。
  • 數據分析工程師面試集錦5——Spark面試指南
    導語本篇文章為大家帶來spark面試指南,文內會有兩種題型,問答題和代碼題,題目大部分來自於網絡上,有小部分是來自於工作中的總結,每個題目會給出一個參考答案。為什麼考察Spark?Spark作為大數據組件中的執行引擎,具備以下優勢特性。
  • 大牛嘔心力作——Kafka開發實戰,助你徜徉大數據時代
    但隨著數據量越來越大,如何實時準確地收集並分析數據成為擺在所有從業人員面前的難題。而這時,kafka的出現算是解決了這個問題。Kafka的核心功能是什麼?一言以蔽之,高性能的消息發送與高性能的消息消費。接下來咱們就進入kafka世界,深入實戰探討kafka實戰開發。
  • 大白話+13張圖解 Kafka
    一、Kafka基礎消息系統的作用應該大部份小夥伴都清楚,用機油裝箱舉個例子所以消息系統就是如上圖我們所說的倉庫,能在中間過程作為緩存,並且實現解耦合的作用。Consumer Group - 消費者組我們在消費數據時會在代碼裡面指定一個group.id,這個id代表的是消費組的名字,而且這個group.id就算不設置,系統也會默認設置conf.setProperty("group.id","tellYourDream")
  • Kafka這些名詞都說不出所以然,您竟然敢說自己精通kafka
    Kafka具有快速,可擴展,耐用和容錯的發布、訂閱消息傳遞系統,被用於JMS(Java消息隊列服務)。Kafka具有更高的吞吐量,可靠性和複製特性,使其適用於跟蹤服務呼叫或物聯網傳感器數據。kafka與內存微服務一起使用,提供可靠性,可用於向 CEP(複雜事件流系統)和IoT / IFTTT式自動化系統提供事件。Kafka可以與Flume ,Spark Streaming,Storm,HBase,Flink和Spark一起工作,以實時接收,分析和處理流數據。
  • Spark錦標賽第二賽季超燃啟動 再掀主機遊戲新狂歡
    Spark錦標賽第二賽季超燃啟動 再掀主機遊戲新狂歡 來源:www.18183.com作者:阿姆斯特朗雷時間:2020-06-03 分享到: 5月,機核網與華奧電競合作舉辦的主機電競賽事——Spark 錦標賽——第一賽季完美落下帷幕
  • 代碼詳解:一文掌握神經網絡超參數調優
    有些人可能會疑惑——如何在運行模型過程中檢測其性能?這是個好問題,答案就是使用回叫。回叫:訓練模型過程中進行監測通過使用回叫,可在訓練的任何階段監測模型。回叫是指對訓練程序中特定階段使用的一系列功能。使用回叫,可在訓練過程中觀察模型內部狀態及數據。可向順序或模型分類的the .fit()方法傳輸一系列回叫(作為關鍵詞變元回叫)。
  • 大數據入門:Spark RDD、DataFrame、DataSet
    RDD、DataFrame、DataSet三者的共性RDD、DataFrame、Dataset全都是spark平臺下的分布式彈性數據集,為處理超大型數據提供便利。三者都會根據spark的內存情況自動緩存運算,這樣即使數據量很大,也不用擔心會內存溢出。三者都有partition的概念。三者有許多共同的函數,如filter,排序等。
  • ATTENTION | 您有一份光華MBA整合實踐邀請函
    北大光華MBA整合實踐項目自2011年啟動,先後與來自金融、網際網路、製造、能源、醫療、藝術等眾多行業的百餘家優秀企業合作,更有企業多次參與其中,與光華師生共同探討、解決企業經營的真實課題。讓我們一起聽聽往期參加企業對於整合實踐項目的反饋。
  • 《噬血代碼》毒蝶boss怎麼打 狂襲毒蝶打法技巧詳解
    遊戲中有些boss挑戰中需要掌握一些技巧,這裡給大家帶來了噬血代碼狂襲毒蝶打法技巧詳解,詳情一起看下具體的操作吧。 推薦閱讀: 噬血代碼無窮無盡結局怎麼出 噬血代碼捏臉教程視頻分享 狂襲毒蝶打法技巧詳解 噬血代碼狂襲毒蝶solo攻略 毒蝶boss打法技巧 狂襲蝴蝶這個BOSS是一個女人、蝴蝶與蛇的結合體,她會使用毒液強化攻擊。
  • 深度|一文帶你了解「無代碼開發」始末
    「無代碼運動」是幾千年以來驅動技術創新的核心原則的演變:不斷對以前僅一小部分人可用的過程工具或介質進行公民化拓展並通過此倍增人類創造的潛力。上文中我們提到的軟體工程的歷史則是「如何更好地解決次要任務」的歷史。而如今的無代碼產品則是在新的時代背景下對於「次要任務」的新的解答,其核心是解決了兩個任務關鍵節點之間的根本脫節。
  • Hello,Funspark!Hello,World——華奧電競品牌全新升級
    以「極致生活」和「電競精神」為基礎,華奧全新的賽事品牌Funspark 誕生了。和過去相比,如今的電競不僅衍生出了許多曾經想像不到的業態,連賽事本身輸出的內容也在變得極為豐富。不管對一個新產業,還是對於一個賽事,豐富都意味著未來可能出現的多種可能性。Funspark也一樣。
  • 資產安全繫於代碼 DeFi項目如何避免成為HackFi?
    比如知名DeFi項目Yam,Yam於北京時間8月12日啟動後不到24小時,合約質押的資產就已經超過4.6億美元,然而由於一個小小的增發漏洞,項目就宣告失敗,距離啟動也不過36小時,代幣YAM也從109美元跌至0.9美元,跌幅超99%,而這也是DeFi眾多安全事件的一個縮影。
  • 神策數據:《十大數據分析模型詳解》白皮書上線!
    今日,神策數據推出數據分析模型系列白皮書之《十大數據分析模型詳解》,基於多維事件模型,總結歸納十大數據分析模型,內附多種分析模型的實際應用場景案例!  以下內容節選自該白皮書2017年,神策數據曾推出——八大數據分析模型,詳細解釋了各種分析模型的定義、適用範圍、分析思路、使用方法等等,一經推出,好評如潮。2020年,經過更為豐富的理論沉澱以及業務實踐積累,神策數據迭代出《十大數據分析模型詳解》白皮書,內附多種分析模型的實際應用場景案例,內容更易理解,分析角度更加豐富!
  • spark啟動worker失敗 - CSDN
    -1.4.0-bin-hadoop2.6]# vi /etc/profile [root@cdh1 spark-1.4.0-bin-hadoop2.6]# source /etc/profile[root@cdh1 spark-1.4.0-bin-hadoop2.6]# spark -versionbash: spark: command not found[root@cdh1 spark-1.4.0
  • 動物之森幽幽任務超詳細攻略 附動物森友會幽幽出現時間
    動物之森幽幽任務超詳細攻略 附動物森友會幽幽出現時間 動物之森幽幽會交給大家很多任務,不過幽幽也有不同的出現時間,一些小夥伴還不清楚幽幽的任務到底怎麼做,其中的任務怎麼完成,下面就來為大家詳細的介紹一下
  • 如何理解Kafka的消息可靠性策略?
    導語 | Kafka作為一款性能優秀的消息隊列,主要用於異步、削峰、解耦處理,在分布式事務中有著廣泛地應用,但仍有很多開發者在運用過程中存在疑惑。文本將為大家由淺入深剖析Kafka基礎原理以及它的消息可靠性策略,幫助大家理解這一技術知識。文章作者:張璇,騰訊應用開發工程師。
  • 《我的勇者》秒殺代碼如何獲取 秒殺代碼腳本攻略
    導 讀 我的勇者秒殺代碼怎麼獲取 秒殺代碼腳本攻略,秒殺代碼是什麼意思,玩家怎麼快速修改代碼腳本,秒怪的操作怎麼查看,不妨詳細了解下九遊小編帶來的新內容吧。