將物聯網數據和MQTT消息流式傳輸到Apache Kafka

2021-01-20 智能甄選

Apache Kafka是一個實時流媒體平臺,在大型和小型組織中得到廣泛採用。Kafka的分布式微服務架構和發布/訂閱協議使其成為在企業系統和應用程式之間移動實時數據的理想選擇。據一些人稱,超過三分之一的財富500強公司正在使用Kafka。在GitHub上,Kafka是最受歡迎的Apache項目之一,擁有超過11,000名明星和超過500名貢獻者。毫無疑問,Kafka是一個開源項目,正在改變組織在雲和數據中心內部移動數據的方式。

Kafka的體系結構已經過優化,可以以可擴展的方式儘可能快地在系統和應用程式之間傳輸數據。Kafka客戶端/生產者與Kafka集群緊密耦合,要求每個客戶端知道Kafka集群的IP位址並直接訪問所有單個節點。在可信網絡內部,這允許更改代理拓撲,這意味著可以通過直接從Kafka客戶端使用多個節點來擴展主題和分區。在大多數情況下,Kafka主題空間也保持相當平坦,因為通常使用多個分區來縮放單個Kafka主題。在Kafka系統中擁有數百甚至數千個主題通常是不可取的,但對大多數數據流使用一些主題。

對於物聯網使用情況,設備通過公共網際網路連接到數據中心或雲,Kafka架構不適合開箱即用。如果您嘗試通過Internet使用Kafka從數千甚至數百萬設備流式傳輸數據,則Apache Kafka架構不適用。單獨Kafka不適合物聯網使用案例有很多原因:

Kafka經紀人需要由客戶直接解決,這意味著客戶需要直接聯繫Kafka經紀人。專業物聯網部署通常使用負載均衡器作為其雲中的第一道防線,因此設備只需使用負載均衡器的IP位址連接到基礎架構,負載均衡器就可以充當代理。如果您希望您的設備直接連接到Kafka,那麼您的Kafka經紀人必須接觸公共網際網路。Kafka不支持大量主題。通過公共Internet連接數百萬個IoT設備時,通常會使用單個和唯一的主題(通常在主題名稱中包含一些唯一的IoT設備標識符),因此可以根據各個客戶端的權限限制讀寫操作。您不希望智能恆溫器被黑客入侵,並且用於竊聽系統中所有數據流的憑據。與用於物聯網協議的客戶端庫相比,Kafka客戶端相當複雜且資源密集。用於大多數程式語言的Kafka API非常簡單明了,但引擎蓋下卻存在很多複雜性。例如,客戶端將使用和維護與Kafka代理的多個TCP連接。物聯網部署通常具有受限設備,這些設備需要最小的佔用空間,但在設備端不需要非常高的吞吐量。默認情況下,Kafka客戶端針對吞吐量進行了優化Kafka客戶端需要穩定的TCP連接才能獲得最佳結果。許多物聯網用例涉及不可靠的網絡,例如聯網汽車或智能農業,因此物聯網設備需要始終如一地重新建立與卡夫卡的連接。將數萬甚至數百萬個客戶端連接到單個Kafka集群是不常見的(通常甚至根本不可能)。在物聯網用例中,通常有大量設備,它們同時連接到後端並不斷產生數據。Kafka缺少一些關鍵的物聯網功能。Kafka協議缺少諸如保持活力以及遺囑和遺囑等功能。這些功能對於構建具有彈性的物聯網解決方案非常重要,即使設備遇到意外斷開連接並且網絡不可靠也是如此。Kafka仍然為物聯網用例帶來了很多價值。物聯網解決方案創建了大量的實時數據,非常適合Kafka的處理。面臨的挑戰是如何將物聯網數據從設備橋接到Kafka集群?

許多實施物聯網用例的公司正在尋找集成MQTT和Kafka來處理其物聯網數據的選項。MQTT是另一種發布/訂閱協議,已成為連接IoT設備數據的標準。MQTT標準旨在通過不可靠的網絡連接大量物聯網設備,解決了卡夫卡的許多局限性。特別是,MQTT是一種輕量級協議,需要在每個設備上佔用較少的客戶端。它旨在通過不可靠的網絡安全地支持數百萬個連接,並在高延遲和低吞吐量環境中無縫工作。它包括物聯網功能,如保持活動,遺囑和遺囑功能,可靠消息的不同服務質量等級,以及客戶端負載平衡(共享訂閱)和為公共Internet通信設計的其他功能。主題是動態的,這意味著系統中可以有任意數量的MQTT主題,在MQTT伺服器群集中每個部署通常高達數千萬個主題。

雖然Kafka和MQTT有不同的設計目標,但兩者都能很好地協同工作。問題不是Kafka與MQTT,而是如何將兩個世界整合在一起,形成物聯網端到端數據管道。為了將MQTT消息集成到Kafka集群中,您需要某種類型的橋接器將MQTT消息轉發到Kafka。實現此類橋接有四種不同的體系結構方法:

Kafka Connect for MQTT

Kafka有一個名為Kafka Connect的擴展框架,它允許Kafka從其他系統中提取數據。Kafka Connect for MQTT充當MQTT客戶端,訂閱來自MQTT代理的所有消息。

如果您無法控制MQTT代理,那麼Kafka Connect for MQTT是一種很好的方法。這種方法允許Kafka攝取MQTT消息流。

使用Kafka Connect for MQTT存在性能和可伸縮性限制。如前所述,Kafka Connect for MQTT是一個MQTT客戶端,可以訂閱 通過代理的潛在所有MQTT消息。MQTT客戶端庫不用於處理極大量的MQTT消息,因此使用此方法的物聯網系統將存在性能和可伸縮性問題。

這種方法集中了業務和消息轉換邏輯,並創建了緊密耦合,這應該在分布式(微服務)體系結構中避免。業界領先的諮詢公司Thoughtworks 將此稱為反模式,甚至將Kafka納入其先前技術雷達出版物的「保留」類別。

MQTT代理

另一種方法是使用代理應用程式,該代理應用程式接受來自IoT設備的MQTT消息,但不實現發布/訂閱或任何MQTT會話功能,因此不是MQTT代理。IoT設備連接到MQTT代理,然後將MQTT消息推送到Kafka代理。

MQTT代理方法允許在Kafka部署中完成MQTT消息處理,因此管理和操作來自單個控制臺。MQTT代理通常是無狀態的,因此它(理論上)可以通過添加代理的多個實例來獨立於Kafka集群進行擴展。

MQTT代理的局限性在於它不是真正的MQTT實現。MQTT代理不是基於pub / sub,而是在設備和Kafka之間創建緊密耦合的流。MQTT pub / sub的好處是它創建了一個鬆散耦合的端點系統(設備或後端應用程式),可以在每個端點之間進行通信和移動數據。例如,MQTT允許兩個設備之間的通信,例如兩個連接的汽車可以相互通信,但MQTT代理應用程式只允許從汽車到Kafka集群的數據傳輸,而不是與另一輛汽車的數據傳輸。

一些Kafka MQTT代理應用程式支持QoS級別等功能。值得注意的是,只有當MQTT重新連接到同一個MQTT代理實例時,才能在連接丟失後恢復QoS消息流,如果使用的負載均衡器使用最小連接或循環策略進行擴展,則無法實現。因此,在MQTT中使用QoS級別的主要原因(無消息丟失)僅適用於此類場景中的穩定連接,這在大多數物聯網場景中是不切實際的假設。

使用這種方法的主要風險是代理不是一個功能齊全的MQTT代理,因此它不是MQTT規範定義的MQTT實現,因為它只實現了一個很小的子集,所以它不是標準化的解。為了正確使用MQTT和MQTT客戶端,需要一個功能齊全的MQTT代理。

如果消息丟失不是一個重要因素,並且如果不使用為可靠的IoT通信而設計的MQTT功能,則如果您只想通過Internet將數據單向發送到Kafka,則代理方法可能是一種輕量級替代方法。

建立自己的自定義橋梁

一些公司為卡夫卡橋建立了自己的MQTT。典型的方法是使用開源MQTT客戶端庫和開源Kafka客戶端庫創建應用程式。自定義應用程式負責在MQTT代理和Kafka實例之間轉置和路由數據。

這種方法面臨的主要挑戰是自定義應用程式通常不具備容錯能力和彈性。如果物聯網解決方案需要至少一次或完全一次消息傳遞的端到端保證,這就變得很重要。例如,設置為發送到自定義應用程式的服務質量等級1或2的MQTT消息將確認收到消息。但是,如果自定義應用程式在將消息轉發到Kafka之前崩潰,則消息將丟失。同樣,如果Kafka集群不可用,則自定義應用程式將需要緩衝MQTT消息。如果自定義應用程式在Kafka群集可用之前崩潰,則所有緩衝的消息都將丟失。要解決這些問題,

MQTT經紀人擴展

最後一種方法是擴展MQTT代理以創建包含本機Kafka協議的擴展。這允許MQTT代理充當一流的Kafka客戶端,並將IoT設備數據流式傳輸到多個Kafka集群。

要實現此方法,您需要具有對MQTT代理的訪問權限,並且代理需要能夠安裝擴展。

此方法允許IoT解決方案使用本機MQTT實現和本機Kafka實現。物聯網設備使用MQTT客戶端將數據發送到功能齊全的MQTT代理。MQTT代理擴展為包含本機Kafka客戶端,並將MQTT消息轉換為Kafka協議。這允許將物聯網數據路由到多個Kafka群集,同時路由到非Kafka應用程式。使用MQTT代理還可以訪問物聯網設備所需的所有MQTT功能,例如last will和tesament。像HiveMQ這樣的MQTT代理是為高可用性,持久性,性能和彈性而設計的,因此當Kafka不可寫時,消息可以緩存在代理上,因此IoT設備永遠不會丟失重要消息。所以,Kubernetes)。

Kafka的HiveMQ企業擴展

在與HiveMQ客戶的對話中,一些具有數百萬設備和非常高的消息吞吐量的操作集群,我們發現需要為Kafka創建MQTT代理擴展。我們的客戶希望從MQTT和Kafka協議的本機實現中受益,並提供兩種協議的所有交付保證。因此,我們很高興地宣布Kafka的HiveMQ企業擴展。

我們的客戶在聯合MQTT和Kafka解決方案中看到了巨大的價值。他們將Kafka視為在數據中心或雲環境中處理和分發實時數據的絕佳平臺。他們希望使用MQTT和HiveMQ將數據從設備移動到不同的後端系統。後端系統包括Kafka和非Kafka系統。他們還知道,如果他們試圖連接數百萬臺設備,如聯網汽車,他們需要使用原生且經過實戰考驗的MQTT實施,如HiveMQ。

Kafka的HiveMQ Enterprise Extension在HiveMQ代理中實現了本機Kafka協議。這允許MQTT消息與單個Kafka集群或多個Kafka集群同時無縫集成。它支持100%的整個MQTT 3和MQTT 5規範。我們甚至可以將數百萬個MQTT主題映射到有限數量的Kafka主題。最後,我們擴展了HiveMQ控制中心,以便監控寫入Kafka的MQTT消息。

我們很高興能將這款新產品帶給我們的HiveMQ客戶。這是使用Apache Kafka和IoT用例的最佳方法。可以在我們的Marketplace中下載 HiveMQ Enterprise Extension for Apache Kafka的免費試用版。將MQTT和Apache Kafka集成到物聯網用例中從未如此簡單。

相關焦點

  • 真的,關於 Kafka 入門看這一篇就夠了
    包括收集各種分布式應用的數據,生產各種操作的集中反饋,比如報警和報告;日誌記錄:Kafka 的基本概念來源於提交日誌,比如我們可以把資料庫的更新發送到 Kafka 上,用來記錄資料庫的更新時間,通過kafka以統一接口服務的方式開放給各種consumer,例如hadoop、Hbase、Solr等;流式處理:流式處理是有一個能夠提供多種應用程式的領域
  • 淺談大數據處理中的實時流式處理
    大數據的實時流式處理問題的特點數據源源不斷的到來;數據需要儘快的得到處理,不能產生積壓;處理之後的數據量依然巨大,仍然後TB級甚至PB級的數據量;處理的結果能夠儘快的展現;以上四個特點可以總結為數據的收集->數據的傳輸->數據的處理->數據的展現
  • 為什麼說,大數據是從流式計算開始切入的?
    大數據說了很多年,我說雷聲大,雨點小,這您同意嗎?為什麼?關鍵在創造的價值,如果僅僅是輔助決策,效果難以顯現,如何才能夠立竿見影?從技術上,對應的就是流式計算,因為它對應的是業務,能夠帶來收入的應用。什麼是流式大數據?有哪些應用?現在就讓我們一起回顧下流計算平臺的發展歷史,以及如何在企業中運用。
  • 流式數據處理介紹
    相比之下,流式處理能自然優雅地處理無止盡數據流,您可以檢測模式、檢查結果、多級別聚焦觀察、還可以輕鬆地同時觀察來自多個數據流。流處理天然地適合時間序列數據和隨時間變化的模式檢測。例如,如果您試圖檢測無止盡流中Web會話的長度則很難用批處理來檢測,因為某些會話將被分割到兩個不同的批處理中。流處理可以很容易地處理這種問題。你退一步想想,最連續的數據序列就是時間序列數據。
  • 流式處理框架及應用場景
    寫在前面的話:不要被技術嚇到哦 ,本文儘量寫的白話,致力為從事大數據的運營、諮詢規劃、需求以及想學習大數據的入門者提供知識分享@……@導讀:本文闡述實時處理誕生的背景,實時處理意義、應用場景和技術架構實現。
  • 物聯網架構各個層次使用的是什麼通訊協議
    三、NB-IoT,4G對比: NB-IoT低功耗,傳輸小數據,傳輸速度底,晶片模組和套餐便宜。目前NB基站還較少,不過華為在大力推這個通信方式,相信以後會普及。 2G/4G/5G:傳輸速度快和可以傳輸大的數據,但是功耗高,價格貴 WIFI和zigbee對比: wifi 功耗高,傳輸速率大11-54Mbps,功耗高10-50mA,距離短20-50m。一個路由只能加入較少設備。 zigbee功耗低,傳輸數據慢100Kbps,功耗20mA,可以中繼,距離遠幾百到幾千米。
  • 什麼是流式輸出?
    即:將頁面拆分成獨立的幾部分模塊,每個模塊有單獨的數據源和單獨的頁面模板,在server端流式的操作每個模塊進行業務邏輯處理和頁面模板的渲染,然後流式的將渲染出來的HTML輸出到網絡中,接著分塊的HTML數據在網絡中傳輸,接著流式的分塊的HTML在瀏覽器逐個渲染展示。
  • 使用Kafka本機模型伺服器進行流式機器學習
    機器學習(ML)包括有關歷史數據的模型訓練以及用於評分和預測的模型部署。雖然訓練大多是分批進行的,但評分通常需要大規模且可靠的實時功能。Apache Kafka在現代機器學習基礎架構中扮演著關鍵角色。下一代體系結構利用Kafka本機流模型伺服器而不是RPC(HTTP / gRPC)。
  • 一文帶你了解 Kafka 基本原理
    生產者(Producer):是能夠發布消息到話題的任何對象。 服務代理(Broker):已發布的消息保存在一組伺服器中,它們被稱為代理(Broker)或Kafka集群。 消費者(Consumer):可以訂閱一個或多個話題,並從Broker拉數據,從而消費這些已發布的消息。
  • 如何理解Kafka的消息可靠性策略?
    異常情況下,如果當數據發送到 leader後部分副本(f1和f2同步), leader掛了?此時任何 follower 都有可能變成新的 leader, producer 端會得到返回異常,producer 端會重新發送數據,但這樣數據可能會重複(但不會丟失), 暫不考慮數據重複的情況。
  • 大數據分析平臺解析:什麼是Apache Spark?
    典型的例子是,幾乎50行MapReduce代碼在文檔中計算單詞的數量在可以減少到只有幾行代碼(這裡顯示在Scala中):通過提供與Python和R等數據分析流行語言的綁定,以及對企業更友好的Java和Scala的綁定,Apache Spark使應用程式開發人員和數據科學家以可訪問的方式利用其可擴展性和速度。
  • 阿里搶拓新賽道 物聯網跨入新時代
    管理平臺;提供橫跨雲、邊、端多個維度的計算服務和AI能力實現物的實時決策和自主協作,將物聯網真正推向智聯網。  阿里巴巴作為行業聚合者和使能者,通過深入「雲-管-邊-端」側,全面布局物聯網新賽道:支持2/3/4G、LoRa、NB-IoT、eMTC等95%的通信協議;通過物聯網作業系統AliOSThings、IoT邊緣計算產品協助開發者快速接入阿里雲IoT管理平臺;提供橫跨雲、邊、端多個維度的計算服務和AI能力實現物的實時決策和自主協作,將物聯網真正推向智聯網。
  • 物聯網通信協議全解析
    通信對物聯網來說十分常用且關鍵,無論是近距離無線傳輸技術還是移動通信技術,都影響著物聯網的發展。而在通信中,通信協議尤其重要,是雙方實體完成通信或服務所必須遵循的規則和約定。本文介紹了幾個可用的物聯網通信協議,它們具有不同的性能、數據速率、覆蓋範圍、功率和內存,而且每一種協議都有各自的優點和或多或少的缺點。其中一些通信協議只適合小型家用電器,而其他一些通信協議則可以用於大型智慧城市項目。
  • Talking Data 閻志濤:流式大數據和即時交互分析技術
    ,在12月12日全天圍繞存儲新風口–第二存儲;HPC、Big data、AI 三大引擎交相輝映;軟體定義,全棧軟體賦能從核心到邊緣;流式大數據和即時交互式分析技術;區塊鏈技術與應用;快閃記憶體產業國際合作等12個熱點話題展開分享,成為了解最新存儲技術、數據創新知識的最佳場所。
  • 玩轉物聯網之MQTT
    物聯網概述物聯網——即Internet-of-Things,其實這個概念由來已久,簡單來講,物聯網是物與物、人與物之間的信息傳遞與控制簡稱。它和能源、電子信息、醫療、交通、零售、物流、工業製造等行業息息相關。要實現物聯網,首先需要將具備信息感知和通信能力的設備嵌入到我們關心的物品中,使其能連接到網際網路或企業網絡之上,實現互聯互通。
  • JetLinks 物聯網基礎平臺 1.8 RELEASE 發布
    JetLinks 開源物聯網平臺JetLinks 基於Java8,Spring Boot 2.x,WebFlux,Netty,Vert.x,Reactor等開發, 是一個開箱即用,可二次開發的企業級物聯網基礎平臺
  • 物聯網平臺獲得成功的主要戰略指標有哪些?PTC物聯網戰略的優勢是...
    物聯網(IoT)是當前科技和文化環境的自然演變,物聯網的影響將波及全世界幾乎所有行業和國家。物聯網平臺是這一龐大生態系統的核心,在物聯網終端與採集自終端的數據最終所駐留的存儲庫之間發揮著中間件的作用。
  • Apache RocketMQ 4.4.0 發布
    萬眾期待的 4.4.0 版本終於在昨天成功發布,值得關注的新特性包括權限控制(ACL)和消息軌跡(Msg Trace)。下面大家解讀該版本引入的這兩大特性。
  • 深入對比數據科學工具箱: SparkR vs Sparklyr
    SparkR 和 Sparklyr 是兩個基於 Spark 的R語言接口,通過簡單的語法深度集成到R語言生態中。