問題導讀
1.Storm拓撲包含哪些基本元素?
2.如何描述單詞計數拓撲數據流?
3.典型的Bolt執行哪些功能?
4.什麼是Storm流分組?
本章,主要介紹使用storm開發分布式流處理應用的基本概念。我們將構建一個統計持續流動的句子中單詞個數的簡單應用。通過本章的學習,你將了解到設計一個複雜流計算系統所學需要的多種結構,技術和模式。
我們將首先介紹Storm的數據結構,接下來實現一個完全成熟的Storm應用的各個組件。本章結束,你將基本了解Storm計算結構,搭建開發環境,掌握開發和調試storm應用程式的基本技術。
本章包括以下主題:
·Storm的基本結構——topologies, streams, spouts, and bolts
·建立Storm的開發環境
·實現一個基本的單詞計數應用程式
·並行化和容錯
·並行計算任務擴展
在Storm中,分布式計算的結構被稱為一個拓撲,它由流數據,Spouts(流生產者),以及Bolt(操作)組成。Storm拓撲大致類似於批處理作業,例如Hadoop處理系統等。然而,批作業都清楚定義了任務開始和結束點,Strom拓撲確一直運行下去,直到顯式地kill或解除部署。
StreamsStorm的核心數據結構是元組。元組是一個簡單的命名值列表(鍵-值對),流是一個無界元組序列。如果你熟悉複雜事件處理(CEP),你可以把Storm元組看作是事件。
SpoutsSpout是storm拓撲的主要數據入口點。Spout像適配器一樣連接到一個源的數據,將數據轉換為元組,發然後發射出一連串的元組。
正如您了解的,Storm提供了一個簡單的API實現Spout。開發一個Spout主要是編寫代碼從原始源或API消費數據。主要的數據來源包括:
·web網站或行動應用程式的點擊流
·Twitter或其他社交網絡輸入
·傳感器輸出
·應用程式日誌事件
因為Spout通常不實現任何特定的業務邏輯,他們常常可以被多個拓撲重用。
BoltsBolts可以被認為是運算操作或函數。它可以任意數量的流作為輸入,處理數據,並可選地發出一個或多個流。Bolt可以從Spout或其他bolt訂閱流,使它可以形成一個複雜的網絡流的轉換。
像Spout API一樣,Bolts可以執行任何形式的處理,而且bolt的接口簡單直接。典型的Bolt執行的功能包括:
·過濾元組
·連接和聚合
·計算
·資料庫讀/寫
我們的單詞計數拓撲(下圖中所示)將由一個Spout接著三個bolt組成。
Sentence spoutSentenceSpout類只會發出一連串的單值元組,名字為「sentence」和一個字符串值(一個句子),像下面的代碼:
為簡單起見,我們的數據的來源將是一個不變的句子列表,我們遍歷這些句子,發射出每個句子的元組。在真實的應用程式中,一個Spout通常連接到一個動態數據源,如從Twitter API查詢得到的微博。
分詞bolt分割句子bolt將訂閱句子spout的元組流。對收到的每個元組,它將查找「句子」對象的值,然後分割成單詞,每個單詞發射出一個元組:
單詞統計Spout訂閱SplitSentenceBolt類的輸出,持續對它收到的特定詞記數。每當它收到元組,它將增加與單詞相關聯計數器,並發出當前這個詞和當前記數:
該報告bolt訂閱WordCountBolt類的輸出並維護一個表包含所有單詞和相應的數量,就像WordCountBolt一樣。當它收到一個元組,它更新表並將內容列印到控制臺。
實現單詞統計拓撲前面我們已經介紹了基本的Storm概念,接下來我們將開發一個簡單的應用程式。現在,我們在本地模式下開發和運行Storm拓撲。Storm的本地模式是在一個JVM實例中模擬Storm集群,便於在本地開發環境或IDE中開發和調試Storm拓撲。在後面的章節中,我們將向您展示如何在本地模式下開發Storm拓撲並部署到完全分布式集群環境中。
建立開發環境創建一個新的Storm項目只是把Storm庫和其依賴添加到Java類路徑中。然而,當您將學完第二章--storm集群配置,你可將Strom拓撲和你編譯環境需要特殊的包部署到集群中。因此,強烈建議您使用一個構建管理工具,比如Apache Maven,Gradle或Leinengen。在分布式單詞記數的例子中,我們將使用Maven。
首先我們創建一個maven項目:
接下來, 編輯pom.xml文件並添加Storm依賴:
然後,使用以下命令通過構建項目測試Maven配置:
Maven將下載Storm及其所有依賴項。項目已經建立,我們現在就開始寫我們的Storm應用程式。
實現sentence spoutTo keep things simple, our SentenceSpout implementation will simulate a data source by creating a static list of sentences that gets iterated. Each sentence is emitted as a single field tuple. The complete spout implementation is listed in Example 1.1.
為簡單起見,我們的SentenceSpout實現模擬數據源創建一個靜態的句子迭代列表。每一個發出句子作為一個元組。例子1.1給出完整的Spout實現。
Example 1.1: SentenceSpout.javaBaseRichSpout類是一個方便的類,它實現了ISpout和IComponent接口並提供默認的在本例中我們不需要的方法。使用這個類,我們需只專注於我們所需要的方法。
declareOutputFields()方法是Storm IComponent接口中定義的接口,所有的Storm組件(包括Spout和bolt)必須實現該方法,它用於告訴Storm流組件將會發出的每個流的元組將包含的欄位。在這種情況下,我們定義的spout將發射一個包含一個欄位(「sentence」)的單一(默認)的元組流。
open()方法中是ISpout中定義的接口,在Spout組件初始化時被調用。open()方法接受三個參數:一個包含Storm配置的Map,一個TopologyContext對象,它提供了關於組件在一個拓撲中的上下文信息,和SpoutOutputCollector對象提供發射元組的方法。在這個例子中,我們不需要執行初始化,因此,open()實現簡單的存儲在一個實例變量the SpoutOutputCollector對象的引用。
nextTuple()方法是任何Spout實現的核心。Storm調用這個方法來請求Spout OutputCollector來發出輸出元組。在這裡,我們只是發出句子的當前索引並增加該索引。
實現split sentence boltThe SplitSentenceBolt 的實現見Example 1.2.
Example 1.2 – SplitSentenceBolt.javaBaseRichBolt類是另一個便利類,它實現IComponent和IBolt接口。擴展這個類使我們不必實現我們不關心的方法,讓我們專注於我們所需要的功能。
IBolt接口中的prepare()方法類似於ISpout 的open()方法。這裡一般完成在blot的初始化時的資源初始化,比如資料庫連接。像SentenceSpout類一樣,SplitSentenceBolt類不需要太多的初始化,所以prepare()方法只保存OutputCollector對象的引用。
在declareOutputFields()方法中,SplitSentenceBolt類定義一個元組流,每個包含一個欄位(「word」)。
SplitSentenceBolt核心功能是在類IBolt定義execute()方法。調用此方法每次Bolt從流接收一個訂閱的元組。在這種情況下,它在收到的元組中查找「sentence」的值,並將該值拆分成單個的詞,然後按單詞發出新的tuple。
實現word count boltWordCountBolt類(Example 1.3)是拓撲組件,實際上是維護了單詞數。在bolt的prepare()方法中,我們實例化一個實例HashMap,將存儲所有單詞和相應的數量。最常見的做法在prepare()方法中來實例化實例變量的。這種模式背後的原因在於部署拓撲時,其組件spout和bolt是在網絡上發送的序列化的實例變量。如果spout或bolt有任何non-serializable實例變量在序列化之前被實例化(例如,在構造函數中創建)將拋出NotSerializableException並且拓撲將無法發布。在這種情況下,因為HashMap 是可序列化的,我們可以安全地在構造函數中實例化它。然而,一般來說,最好是限制構造函數參數為原始和可序列化的對象,如果是non-serializable對象,則應在prepare()方法中實例化。
在declareOutputFields()方法,WordCountBolt類聲明一個元組的流,將包含收到這個詞和相應的計數。在execute()方法中,我們查找的收到的單詞的計數(如果不存在,初始化為0),然後增加計數並存儲,發出一個新的詞和當前計數組成的二元組。發射計數作為流允許拓撲的其他bolt訂閱和執行額外的處理。
Example 1.3 –WordCountBolt.javaReportBolt類的目的是產生每個單詞的報告。像WordCountBolt類一樣,它使用一個HashMap對象來記錄數量,但在這種情況下,它只是存儲收到counter bolt的數字。
到目前為止,report bolt與其他bolt之間的一個區別它是一個終止bolt,它只接收元組。因為它不會發出任何流,所以declareOutputFields()方法是空的。
report bolt也介實現了了IBolt中定義的接口cleanup()方法。Storm在bolt即將關閉時調用這個方法。我們利用cleanup()方法以一個方便的方式在拓撲關閉時輸出最後計數。但通常情況下,cleanup()方法用於釋放資源的bolt,如打開的文件或資料庫連接。
一個重要的事情一定要記住關於IBolt.cleanup()方法是沒有保證的,當Storm拓撲在當一個集群上運行。在下一行我們談論Storm的容錯機制我們將討論背後的原因。但是對於本例,我們在開發模式下運行cleanup()方法是保證運行的。
ReportBolt類的完整原始碼見Example 1.4.
Example 1.4 – ReportBolt.java既然我們已經定義了Spout和botl完成我們的計算,我們準備集成在一起成形成一個可運行拓撲(參考Example 1.5)。
Example 1.5 –WordCountTopology.javaStorm拓撲通常在Java main()方法定義和運行(或提交如果拓撲被部署到集群)。在這個例子中,我們首先定義字符串常量,這將作為我們的唯一標識Storm組件。在main()方法開始實例化我們的spout和bolts並創建了一個TopologyBuilder實例。TopologyBuilder類提供了流-style API定義組件之間的數據流的拓撲。我們註冊這個sentence spout並給它分配一個惟一的ID:
下一步是註冊SplitSentenceBolt並建立一個訂閱SentenceSpout發出的流類:
setBolt()方法會註冊一個bolt給TopologyBuilder類並返回一個實例BoltDeclarer,它為bolt暴露了定義輸入源方法。這裡我們通過定義的shuffleGrouping()方法為SentenceSpout和惟一的ID對象建立關係。shuffleGrouping()方法告訴Storm 混排SentenceSpout類發出的元組和均勻分發它們給SplitSentenceBolt對象的之一實例。我們稍後將詳細解釋流分組並討論Storm的並行性。
下一行建立SplitSentenceBolt類和theWordCountBolt類之間的連接:
您將了解,有些時候包含某些數據的元組必須路由到一個特定的實例。在這裡,我們使用BoltDeclarer類的fieldsGrouping ()方法,以確保所有元組包含相同的「單詞」值路由到同一個WordCountBolt實例。
最後一步,我們把WordCountBolt實例定義的數據流的元組發到ReportBolt類實的例。在這種情況下,我們希望WordCountBolt發出的所有元組路由到一個ReportBolt的任務。這種行為由globalGrouping()方法完成,如下:
與我們的數據流定義一樣,運行我們的單詞計算的最後一步是建立拓撲,並提交到集群中:
這裡,我們在本地模式下運行Storm,在我們本地開發環境使用Storm LocalCluster類來模擬一個完整的Storm集群。storm本地模式是一種方便的方式來開發和測試應用程式,在部署到分布式集群前。本地模式還允許您在IDE內運行Storm拓撲,設置斷點,暫停執行,檢查變量和分析應用程式找出性能瓶頸,這些是Storm集群說做不到的。
在本例中,我們創建一個LocalCluster實例並調用具有拓撲名稱的submitTopology()方法,它是backtype.storm.Config實例。TopologyBuilder類的createTopology()方法返回的Topology對象。在下一章,您將看到submitTopology()方法用於在本地部署拓撲模式相同的籤名方法也可在部署拓撲到遠程(分布式)模式。
Storm的Config類僅僅是HashMap的之列,它定義了一系列配置Storm拓撲的運行時行為具體常量和方便的方法。當提交一個拓撲時,Storm將合併其預定義的默認配置值和Congif實例的內容傳遞給submitTopology()方法,並將結果分別傳遞給拓撲的spout的open()和bolt的prepare()方法。在這個意義上,配置參數的配置對象表示一組全局拓撲中的所有組件。
我們現在將好運行WordCountTopology類。main()方法將提交拓撲,等待它運行十秒後,殺死(取消)拓撲,最後關閉本地集群。當程序運行完成後,您應該在控制臺看到輸出類似如下信息:
前面介紹到,Storm允許計算水平擴展到多臺機器,將計算劃分為多個獨立的任務在集群上並行執行。在storm中,任務只是在集群中運行的一個Spout的bolt實例。
理解並行性是如何工作的,我們必須首先解釋一個Stormn集群拓撲參與執行的四個主要組件: ·Nodes(機器):這些只是配置為Storm集群參與執行拓撲的部分的機器。Storm集群包含一個或多個節點來完成工作。 ·Workers(JVM):這些是在一個節點上運行獨立的JVM進程。每個節點配置一個或更多運行的worker。一個拓撲可以請求一個或更多的worker分配給它。 ·Executors(線程):這些是worker運行在JVM進程一個Java線程。多個任務可以分配給一個Executor。除非顯式重寫,Storm將分配一個任務給一個Executor。 ·Tasks(Spout/Bolt實例):任務是Spout和bolt的實例,在executor線程中運行nextTuple()和executre()方法。
WordCountTopology並行性到目前為止,在我們的單詞計數的例子中,我們沒有顯式地使用任何Storm的並行api;相反,我們允許Storm使用其默認設置。在大多數情況下,除非覆蓋,Storm將默認使用最大並行性設置。
改變拓撲結構的並行設置之前,讓我們考慮拓撲在默認設置下是如何將執行的。假設我們有一臺機器(節點),指定一個worker的拓撲,並允許Storm每一個任務一個executor執行,執行我們的拓撲,將會如下:
正如您可以看到的,並行性只有線程級別。每個任務運行在一個JVM的一個單獨的線程內。我們怎樣才能利用我們手頭的硬體更有效地提高並行性?讓我們開始通過增加worker和executors的數量來運行我們的拓撲。
在拓撲中增加worker分配額外的worker是一個增加拓撲計算能力的一種簡單方法,Storm提供了通過其API或純粹配置來更改這兩種方式。無論我們選擇哪一種方法,我們的組件上Spout的和bolt沒有改變,並且可以重複使用。
以前版本的字數統計拓撲中,我們介紹了配置對象,在部署時傳遞到submitTopology()方法,但它基本上未使用。增加分配給一個拓撲中worker的數量,我們只是調用Config對象的setNumWorkers()方法:
這個分配兩個Worker的拓撲結構而不是默認的。這將計算資源添加到我們的拓撲中,為了有效地利用這些資源,我們也會想調整executors的數量和我們的拓撲每個executor的task數量。
配置executor數和task數正如我們所看到的,默認情況下,在一個拓撲定義時Storm為每個組件創建一個單一的任務,為每個任務分配一個executor。Storm的並行API提供了修改這種行為的方式,允許您設置的每個組件的executor數和每個exexutor的task數量。
executor的數量分配到一個給定的組件是通過修改配置當定義一個流分組並行性時。為了說明這個特性,讓我們 修改我們的拓撲SentenceSpout並行度分配兩個任務,每個任務分配自己的executor線程:
如果我們使用一個worker,執行我們的拓撲現在看起來像下面的樣子:
接下來,我們將設置分割句子bolt為兩個有四個task的executor執行。每個executor線程將被指派兩個任務執行(4 / 2 = 2)。我們還將配置字數統計bolt運行四個任務,每個都有自己的執行線程:
有兩個worker,拓撲的執行將看起來像下面的圖:
拓撲結構的並行性增加,運行更新的WordCountTopology類為每個單詞產生了更高的總數量:
因為Spout無限發出數據,直到topology被kill,實際的數量將取決於您的計算機的速度和其他什麼進程運行它,但是你應該看到一個總體增加的發射和處理數量。
重要的是要指出,增加woker的數量並不會影響一個拓撲在本地模式下運行。一個拓撲在本地模式下運行總是運行在一個單獨的JVM進程,所以只有任務和executro並行設置才會有影響。Storm的本地模式提供一個近似的集群行為在你測試在一個真正的應用程式集群生產環境之前對開發是很有用的。
Storm流分組基於前面的例子,你可能想知道為什麼我們不增加ReportBolt的並行性?答案是,它沒有任何意義。要理解為什麼,你需要理解Storm流分組的概念。
流分組定義了在一個拓撲中一個流的元組是如何分布在bolt的任務的。例如,在並行版本的字數統計拓撲,在拓撲的SplitSentenceBolt類被分配了四個任務。流分組決定哪一個任務將獲得哪一個給定的元組。
Storm定義了7個內置流分組: ·隨機分組:隨機分配整個目標bolt的任務,這樣每個元組bolt接收同等數量的元組。 ·欄位分組:該元組基於分組欄位中指定的值路由bolt任務。例如,如果一個流組合「word」欄位,「word」相同的元組值欄位將總是被路由到相同的bolt的任務。 ·All分組:這複製bolt任務所有的元組,每個任務將獲得元組的一個副本。 ·Global分組:這個把所有元組路由到一個任務中,選擇最低的任務任務ID值。注意,設置bolt的並行性或任務的數量在使用全球分組是沒有意義的,因為所有元組將被路由到相同的bolt的任務。Global分組應謹慎使用,因為它將所有元組路由到一個JVM實例,可能造成在特定JVM/機器在一個集群中形成擁塞。 · None分組:None分組的功能相當於隨機分組。它被保留以供將來使用。 ·直接分組:直接分組,源決定哪個組件將接收一個給定的元組通過調用emitDirect()方法。它只能用於定義直接流。 · Local or shuffle grouping: 本地或隨機分組類似於隨機分組,但把元組shuffle到bolt任務運行在相同的工作進程中,如果可以。否則,它將退回到隨機分組的行為。根據拓撲結構的並行性,本地或隨機分組可以通過限制網絡提高拓撲傳輸性能。
除了預定義的分組,您可以定義您自己的流分組通過實現CustomStreamGrouping接口:
prepare()方法在運行時被調用,它初始化分組信息分組的實現可以使用它來決定元組元組怎樣被任務說接受。WorkerTopologyContext對象提供關於拓撲的上下文信息,和GlobalStreamId對象提供元數據流 分組。最有用的參數是targetTasks,它是分組需要考慮所有任務標識符列表。你通常會將targetTasksparameter作為一個實例變量引用存儲在chooseTasks()方法的實現中。
chooseTasks()方法返回一個應發送的元組任務標識符列表的。它的參數是發出元組組件的任務標識符和元組的值。
為了說明流分組的重要性,讓我們引入一個錯誤拓撲。先修改SentenceSpout 的nextTuple()方法,它只發出每個句子一次:
現在運行拓撲得到以下輸出:
現在修改CountBolt的field分組為參數隨機分組並重新運行拓撲:
輸出應該類似於下面:
我們計算不正確了,因為CountBolt參數是有狀態:它保留一個計數為每個收到的單詞的。在這種情況下,我們計算的準確性取決於當組件被並行化基於元組的內容分組的能力。引入的錯誤我們將只顯示如果CountBolt參數大於1的並行性。這強調了測試拓撲與各種並行配置的重要性。
Tip一般來說,你應該避免將狀態信息存儲在一個bolt因為任何時間worker或有其任務重新分配失敗,該信息將丟失。一個解決方案是定期快照的持久性存儲狀態信息,比如資料庫,所以它可以恢復是否重新分配一個任務。
消息處理保證Storm提供一個API,允許您保證Spout發出的一個元組被完全處理。到目前為止,在我們的示例中,我們不擔心失敗。我們已經看到,Spout流可以分裂和可以生成任意數量的流拓撲結構,根據下bolt的行為。在發生故障時,會發生什麼呢?作為一個例子,考慮一個bolt持久化元組數據信息基於資料庫。我們該如何處理資料庫更新失敗的情況呢?
Spout的可靠性在Storm中,保證消息處理從Spout開始。Spout支持保證處理需要一種方法來跟蹤發出的元組,如果下遊處理完元組則回發一個元組,或任何元組失敗。子元組可以被認為是任何來自Spout元組的結果元組。看的另一種方法是考慮Spout流(作為一個元組樹的樹幹(下圖所示):
在前面的圖中,實線代表原樹幹spoout發出的元組,虛線代表來自最初的元組的元組。結果圖代表元組樹。保證處理,樹中的每個bolt可以確認(ack)或fail一個元組。如果所有bolt在樹上ack元組來自主幹tuple,spout的ack方法將調用表明消息處理完成。如果任何bolt在樹上明確fail一個元組,或如果處理元組樹超過了超時時間,spout的fail方法將被調用。
Storm的ISpout接口定義了涉及可靠性的三個方法API:nextTuple,ack,fail。
正如我們所見過的,當Storm要求spout發出一個元組,它調用nextTuple()方法。實現保證處理的第一步是保證元組分配一個惟一的ID並將該值傳遞給SpoutOutputCollector的emit()方法:
分配tuple消息ID告訴Storm,Spout想接收通知或元組樹完成時如果不能在任何時候如果處理成功,Spout的ack()方法將被調用的消息ID分配給元組。如果處理失敗或超時,Spout的失敗方法將被調用。
bolt可靠性實現一個bolt,保證消息處理涉及兩個步驟: 1。錨定發射傳入的元組當發射新的元組時。 2。確認或失敗
錨定一個元組意味著我們創造一個傳入的元組和派生的元組之間的聯繫,這樣任何下遊bolt預計參與的元組樹確認tuple,或讓元組失敗,或讓它超時。
你可以錨定一個元組(或元組的列表)通過調用OutputCollector重載方法之一emit:
在這裡,我們錨定傳入的元組並發射一個新的元組,下遊bolt應該確認或失敗。另一種形式的emit方法將發出所屬的元組:
未錨定的元組不參與一個流的可靠性保證。如果一個非錨點元組下遊失敗,它不會導致原始根元組的重發。
成功處理元組後選擇發射新的或派生的元組,一個bolt處理可靠流應該確認輸入的元組:
如果元組處理失敗,這種情況下,spout必須重發(再)元組,bolt應明確失敗的元組:
如果元組由於超時或通過一個顯式的調用處理OutputCollector.fail()方法失敗,Spout,最初的元組,發出通知,讓它重發tuple,您在稍後就會看到。
可靠的word count為了進一步說明可靠性,我們首先加強SentenceSpout類支持保證可靠。它將需要跟蹤所有發出的元組並分配每一個惟一的ID,我們將使用一個HashMap < UUID、Values>對象來存儲未處理完的元組。對於我們發出的每個元組,我們會分配一個唯一的標識符,並將其存儲在我們的未處理完map。當我們收到一個消息的確認,我們將從我們的等待名單刪除元組。失敗,我們將重複該元組:
修改bolt提供保證處理簡單涉及錨定出站元組的元組,然後確認收到的元組:
在這一章,我們已經構建了一個簡單的分布式計算應用程式使用Stomr的核心API和並覆蓋很大一部分Storm的特性集,即使沒有安裝Storm和部署一個集群。Storm的本地模式是強大的生產力並易於開發,但要想看到Storm的真正的強大和水平可伸縮性,你需要將應用程式部署到一個真正的集群中。