來源 | Alice菌
責編 | Carol
封圖 | CSDN 下載於視覺中國
相信很多小夥伴已經接觸過 SparkStreaming 了,理論就不講太多了,今天的內容主要是為大家帶來的是 SparkStreaming 整合 Kafka 的教程。
文中含代碼,感興趣的朋友可以複製動手試試!
Kafka回顧
正式開始之前,先讓我們來對Kafka回顧一波。
核心概念圖解Broker:安裝Kafka服務的機器就是一個broker
Producer:消息的生產者,負責將數據寫入到broker中(push)
Consumer:消息的消費者,負責從kafka中拉取數據(pull),老版本的消費者需要依賴zk,新版本的不需要
Topic: 主題,相當於是數據的一個分類,不同topic存放不同業務的數據 –主題:區分業務
Replication:副本,數據保存多少份(保證數據不丟失) –副本:數據安全
Partition:分區,是一個物理的分區,一個分區就是一個文件,一個Topic可以有1~n個分區,每個分區都有自己的副本 –分區:並發讀寫
Consumer Group:消費者組,一個topic可以有多個消費者/組同時消費,多個消費者如果在一個消費者組中,那麼他們不能重複消費數據 –消費者組:提高消費者消費速度、方便統一管理
注意[1]:一個Topic可以被多個消費者或者組訂閱,一個消費者/組也可以訂閱多個主題
注意[2]:讀數據只能從Leader讀, 寫數據也只能往Leader寫,Follower會從Leader那裡同步數據過來做副本!!!
常用命令啟動kafka
/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
停止kafka
/export/servers/kafka/bin/kafka-server-stop.sh
查看topic信息
/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01:2181
創建topic
/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic test
查看某個topic信息
/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
刪除topic
/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01:2181 --delete --topic test
啟動生產者–控制臺的生產者一般用於測試
/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092 --topic spark_kafka
啟動消費者–控制臺的消費者一般用於測試
/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic spark_kafka--from-beginning
消費者連接到borker的地址
/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic spark_kafka --from-beginning
整合kafka兩種模式說明
這同時也是一個面試題的熱點。
開發中我們經常會利用SparkStreaming實時地讀取kafka中的數據然後進行處理,在spark1.3版本後,kafkaUtils裡面提供了兩種創建DStream的方法:
1、Receiver接收方式:
KafkaUtils.createDstream(開發中不用,了解即可,但是面試可能會問)。Receiver作為常駐的Task運行在Executor等待數據,但是一個Receiver效率低,需要開啟多個,再手動合併數據(union),再進行處理,很麻煩Receiver哪臺機器掛了,可能會丟失數據,所以需要開啟WAL(預寫日誌)保證數據安全,那麼效率又會降低!Receiver方式是通過zookeeper來連接kafka隊列,調用Kafka高階API,offset存儲在zookeeper,由Receiver維護。spark在消費的時候為了保證數據不丟也會在Checkpoint中存一份offset,可能會出現數據不一致所以不管從何種角度來說,Receiver模式都不適合在開發中使用了,已經淘汰了2、Direct直連方式
KafkaUtils.createDirectStream(開發中使用,要求掌握)Direct方式是直接連接kafka分區來獲取數據,從每個分區直接讀取數據大大提高了並行能力Direct方式調用Kafka低階API(底層API),offset自己存儲和維護,默認由Spark維護在checkpoint中,消除了與zk不一致的情況當然也可以自己手動維護,把offset存在mysql、redis中所以基於Direct模式可以在開發中使用,且藉助Direct模式的特點+手動操作可以保證數據的Exactly once 精準一次總結:
Receiver接收方式多個Receiver接受數據效率高,但有丟失數據的風險開啟日誌(WAL)可防止數據丟失,但寫兩遍數據效率低。Zookeeper維護offset有重複消費數據可能。使用高層次的APIDirect直連方式不使用Receiver,直接到kafka分區中讀取數據不使用日誌(WAL)機制Spark自己維護offset使用低層次的API擴展:關於消息語義
注意:
開發中SparkStreaming和kafka集成有兩個版本:0.8及0.10+
0.8版本有Receiver和Direct模式(但是0.8版本生產環境問題較多,在Spark2.3之後不支持0.8版本了)。
0.10以後只保留了direct模式(Reveiver模式不適合生產環境),並且0.10版本API有變化(更加強大)
結論:
我們學習和開發都直接使用0.10版本中的direct模式,但是關於Receiver和Direct的區別面試的時候要能夠答得上來
spark-streaming-kafka-0-8(了解)
1.Receiver
KafkaUtils.createDstream使用了receivers來接收數據,利用的是Kafka高層次的消費者api,偏移量由Receiver維護在zk中,對於所有的receivers接收到的數據將會保存在Spark executors中,然後通過Spark Streaming啟動job來處理這些數據,默認會丟失,可啟用WAL日誌,它同步將接受到數據保存到分布式文件系統上比如HDFS。保證數據在出錯的情況下可以恢復出來。儘管這種方式配合著WAL機制可以保證數據零丟失的高可靠性,但是啟用了WAL效率會較低,且無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
(官方現在已經不推薦這種整合方式。)
準備工作1)啟動zookeeper集群
zkServer.shstart
2)啟動kafka集群
kafka-server-start.sh /export/servers/kafka/config/server.properties
3.創建topic
kafka-topics.sh--create--zookeepernode01:2181--replication-factor 1 --partitions 3 --topicspark_kafka
4.通過shell命令向topic發送消息
kafka-console-producer.sh--broker-listnode01:9092--topicspark_kafka
5.添加kafka的pom依賴
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version></dependency>
API通過receiver接收器獲取kafka中topic數據,可以並行運行更多的接收器讀取kafak topic中的數據,這裡為3個
val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => { val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) stream })
如果啟用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)可以設置存儲級別(默認StorageLevel.MEMORY_AND_DISK_SER_2)
代碼演示
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}import scala.collection.immutableobject SparkKafka { def main(args: Array[String]): Unit = {//1.創建StreamingContextval config: SparkConf = new SparkConf().setAppName("SparkStream").setMaster("local[*]") .set("spark.streaming.receiver.writeAheadLog.enable", "true")//開啟WAL預寫日誌,保證數據源端可靠性val sc = new SparkContext(config) sc.setLogLevel("WARN")val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("./kafka")//==============================================//2.準備配置參數val zkQuorum = "node01:2181,node02:2181,node03:2181"val groupId = "spark"val topics = Map("spark_kafka" -> 2)//2表示每一個topic對應分區都採用2個線程去消費,//ssc的rdd分區和kafka的topic分區不一樣,增加消費線程數,並不增加spark的並行處理數據數量//3.通過receiver接收器獲取kafka中topic數據,可以並行運行更多的接收器讀取kafak topic中的數據,這裡為3個val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics) stream })//4.使用union方法,將所有receiver接受器產生的Dstream進行合併val allDStream: DStream[(String, String)] = ssc.union(receiverDStream)//5.獲取topic的數據(String, String) 第1個String表示topic的名稱,第2個String表示topic的數據valdata: DStream[String] = allDStream.map(_._2)//==============================================//6.WordCountval words: DStream[String] = data.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] = words.map((_, 1))val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _) result.print() ssc.start() ssc.awaitTermination() }}
2.Direct
Direct方式會定期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量範圍在每個batch裡面處理數據,Spark通過調用kafka簡單的消費者API讀取一定範圍的數據。
Direct的缺點是無法使用基於zookeeper的kafka監控工具Direct相比基於Receiver方式有幾個優點:簡化並行不需要創建多個kafka輸入流,然後union它們,sparkStreaming將會創建和kafka分區數一樣的rdd的分區數,而且會從kafka中並行讀取數據,spark中RDD的分區數和kafka中的分區數據是一一對應的關係。高效 Receiver實現數據的零丟失是將數據預先保存在WAL中,會複製一遍數據,會導致數據被拷貝兩次,第一次是被kafka複製,另一次是寫到WAL中。而Direct不使用WAL消除了這個問題。恰好一次語義(Exactly-once-semantics)Receiver讀取kafka數據是通過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法可以通過數據保存在WAL中保證數據不丟失,但是可能會因為sparkStreaming和ZK中保存的偏移量不一致而導致數據被消費了多次。 Direct的Exactly-once-semantics(EOS)通過實現kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。
APIKafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
代碼演示
import kafka.serializer.StringDecoderimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}object SparkKafka2 { def main(args: Array[String]): Unit = {//1.創建StreamingContextval config: SparkConf = new SparkConf().setAppName("SparkStream").setMaster("local[*]")val sc = new SparkContext(config) sc.setLogLevel("WARN")val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("./kafka")//==============================================//2.準備配置參數val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "spark")val topics = Set("spark_kafka")val allDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)//3.獲取topic的數據valdata: DStream[String] = allDStream.map(_._2)//==============================================//WordCountval words: DStream[String] = data.flatMap(_.split(" "))val wordAndOne: DStream[(String, Int)] = words.map((_, 1))val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _) result.print() ssc.start() ssc.awaitTermination() }}
spark-streaming-kafka-0-10
說明spark-streaming-kafka-0-10版本中,API有一定的變化,操作更加靈活,開發中使用
pom.xml<!--<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>${spark.version}</version></dependency>--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency>
API:http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
創建topic/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 3 --partitions 3 --topic spark_kafka
啟動生產者/export/servers/kafka/bin/kafka-console-producer.sh --broker-list node01:9092,node01:9092,node01:9092 --topic spark_kafka
代碼演示import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}object SparkKafkaDemo { def main(args: Array[String]): Unit = {//1.創建StreamingContext//spark.master should be set as local[n], n > 1 val conf = new SparkConf().setAppName("wc").setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("WARN") val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中對數據進行切分形成一個RDD//準備連接Kafka的參數 val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "SparkKafkaDemo",//earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費//latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據//none:topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常//這裡配置latest自動重置偏移量為最新的偏移量,即如果有偏移量從偏移量位置開始消費,沒有偏移量從新來的數據開始消費"auto.offset.reset" -> "latest",//false表示關閉自動提交.由spark幫你提交到Checkpoint或程式設計師手動維護"enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("spark_kafka")//2.使用KafkaUtil連接Kafak獲取數據 val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent,//位置策略,源碼強烈推薦使用該策略,會讓Spark的Executor和Kafka的Broker均勻對應 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))//消費策略,源碼強烈推薦使用該策略//3.獲取VALUE數據 val lineDStream: DStream[String] = recordDStream.map(_.value())//_指的是ConsumerRecord val wrodDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) //_指的是發過來的value,即一行數據 val wordAndOneDStream: DStream[(String, Int)] = wrodDStream.map((_,1)) val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_) result.print() ssc.start()//開啟 ssc.awaitTermination()//等待優雅停止 }}
好了,本篇主要講解的 SparkStreaming 整合 Kafka 的過程,並帶大家複習了一波Kafka的基礎知識,如果對你有用的話,麻煩動手手點個「在看」吧~
本文由作者首發 CSDN 博客,原文連結:
https://blog.csdn.net/weixin_44318830/article/details/105612516