java storm是幹什麼的_storm - CSDN

2020-11-24 CSDN技術社區

問題導讀
1.Storm拓撲包含哪些基本元素?
2.如何描述單詞計數拓撲數據流?
3.典型的Bolt執行哪些功能?
4.什麼是Storm流分組?


本章,主要介紹使用storm開發分布式流處理應用的基本概念。我們將構建一個統計持續流動的句子中單詞個數的簡單應用。通過本章的學習,你將了解到設計一個複雜流計算系統所學需要的多種結構,技術和模式。

我們將首先介紹Storm的數據結構,接下來實現一個完全成熟的Storm應用的各個組件。本章結束,你將基本了解Storm計算結構,搭建開發環境,掌握開發和調試storm應用程式的基本技術。

本章包括以下主題:
·Storm的基本結構——topologies, streams, spouts, and bolts
·建立Storm的開發環境
·實現一個基本的單詞計數應用程式
·並行化和容錯
·並行計算任務擴展

介紹的Storm拓撲的基本元素——streams, spouts, and bolts

在Storm中,分布式計算的結構被稱為一個拓撲,它由流數據,Spouts(流生產者),以及Bolt(操作)組成。Storm拓撲大致類似於批處理作業,例如Hadoop處理系統等。然而,批作業都清楚定義了任務開始和結束點,Strom拓撲確一直運行下去,直到顯式地kill或解除部署。

Streams

Storm的核心數據結構是元組。元組是一個簡單的命名值列表(鍵-值對),流是一個無界元組序列。如果你熟悉複雜事件處理(CEP),你可以把Storm元組看作是事件。

Spouts

Spout是storm拓撲的主要數據入口點。Spout像適配器一樣連接到一個源的數據,將數據轉換為元組,發然後發射出一連串的元組。

正如您了解的,Storm提供了一個簡單的API實現Spout。開發一個Spout主要是編寫代碼從原始源或API消費數據。主要的數據來源包括:
·web網站或行動應用程式的點擊流
·Twitter或其他社交網絡輸入
·傳感器輸出
·應用程式日誌事件

因為Spout通常不實現任何特定的業務邏輯,他們常常可以被多個拓撲重用。

Bolts

Bolts可以被認為是運算操作或函數。它可以任意數量的流作為輸入,處理數據,並可選地發出一個或多個流。Bolt可以從Spout或其他bolt訂閱流,使它可以形成一個複雜的網絡流的轉換。

像Spout API一樣,Bolts可以執行任何形式的處理,而且bolt的接口簡單直接。典型的Bolt執行的功能包括:
·過濾元組
·連接和聚合
·計算
·資料庫讀/寫

介紹的單詞計數拓撲數據流

我們的單詞計數拓撲(下圖中所示)將由一個Spout接著三個bolt組成。

Sentence spout

SentenceSpout類只會發出一連串的單值元組,名字為「sentence」和一個字符串值(一個句子),像下面的代碼:

  1. { "sentence":"my dog has fleas" }
複製代碼

為簡單起見,我們的數據的來源將是一個不變的句子列表,我們遍歷這些句子,發射出每個句子的元組。在真實的應用程式中,一個Spout通常連接到一個動態數據源,如從Twitter API查詢得到的微博。

分詞bolt

分割句子bolt將訂閱句子spout的元組流。對收到的每個元組,它將查找「句子」對象的值,然後分割成單詞,每個單詞發射出一個元組:

  1. { "word" : "my" }
  2. { "word" : "dog" }
  3. { "word" : "has" }
  4. { "word" : "fleas" }
複製代碼

單詞統計bolt

單詞統計Spout訂閱SplitSentenceBolt類的輸出,持續對它收到的特定詞記數。每當它收到元組,它將增加與單詞相關聯計數器,並發出當前這個詞和當前記數:

  1. { "word" : "dog", "count" : 5 }
複製代碼

報告 bolt

該報告bolt訂閱WordCountBolt類的輸出並維護一個表包含所有單詞和相應的數量,就像WordCountBolt一樣。當它收到一個元組,它更新表並將內容列印到控制臺。

實現單詞統計拓撲

前面我們已經介紹了基本的Storm概念,接下來我們將開發一個簡單的應用程式。現在,我們在本地模式下開發和運行Storm拓撲。Storm的本地模式是在一個JVM實例中模擬Storm集群,便於在本地開發環境或IDE中開發和調試Storm拓撲。在後面的章節中,我們將向您展示如何在本地模式下開發Storm拓撲並部署到完全分布式集群環境中。

建立開發環境

創建一個新的Storm項目只是把Storm庫和其依賴添加到Java類路徑中。然而,當您將學完第二章--storm集群配置,你可將Strom拓撲和你編譯環境需要特殊的包部署到集群中。因此,強烈建議您使用一個構建管理工具,比如Apache Maven,Gradle或Leinengen。在分布式單詞記數的例子中,我們將使用Maven。

首先我們創建一個maven項目:

  1. $ mvn archetype:create -DgroupId=storm.blueprints
  2. -DartifactId=Chapter1
  3. -DpackageName=storm.blueprints.chapter1.v1
複製代碼

接下來, 編輯pom.xml文件並添加Storm依賴:

  1. <dependency>
  2. <groupId>org.apache.storm</groupId>
  3. <artifactId>storm-core</artifactId>
  4. <version>0.9.1-incubating</version>
  5. </dependency>
複製代碼

然後,使用以下命令通過構建項目測試Maven配置:



Maven將下載Storm及其所有依賴項。項目已經建立,我們現在就開始寫我們的Storm應用程式。

實現sentence spout

To keep things simple, our SentenceSpout implementation will simulate a data source by creating a static list of sentences that gets iterated. Each sentence is emitted as a single field tuple. The complete spout implementation is listed in Example 1.1.

為簡單起見,我們的SentenceSpout實現模擬數據源創建一個靜態的句子迭代列表。每一個發出句子作為一個元組。例子1.1給出完整的Spout實現。

Example 1.1: SentenceSpout.java
  1. public class SentenceSpout extends BaseRichSpout {
  2.     private SpoutOutputCollector collector;
  3.     private String[] sentences = {
  4.         "my dog has fleas",
  5.         "i like cold beverages",
  6.         "the dog ate my homework",
  7.         "don't have a cow man",
  8.         "i don't think i like fleas"
  9.     };
  10.     private int index = 0;

  11.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  12.         declarer.declare(new Fields("sentence"));
  13.     }

  14.     public void open(Map config, TopologyContext
  15.             context, SpoutOutputCollector collector) {
  16.         this.collector = collector;
  17.     }
  18.     public void nextTuple() {
  19.         this.collector.emit(new Values(sentences[index]));
  20.         index++;
  21.         if (index >= sentences.length) {
  22.             index = 0;
  23.         }
  24.         Utils.sleep(1);
  25.     }
  26. }
複製代碼

BaseRichSpout類是一個方便的類,它實現了ISpout和IComponent接口並提供默認的在本例中我們不需要的方法。使用這個類,我們需只專注於我們所需要的方法。

declareOutputFields()方法是Storm IComponent接口中定義的接口,所有的Storm組件(包括Spout和bolt)必須實現該方法,它用於告訴Storm流組件將會發出的每個流的元組將包含的欄位。在這種情況下,我們定義的spout將發射一個包含一個欄位(「sentence」)的單一(默認)的元組流。

open()方法中是ISpout中定義的接口,在Spout組件初始化時被調用。open()方法接受三個參數:一個包含Storm配置的Map,一個TopologyContext對象,它提供了關於組件在一個拓撲中的上下文信息,和SpoutOutputCollector對象提供發射元組的方法。在這個例子中,我們不需要執行初始化,因此,open()實現簡單的存儲在一個實例變量the SpoutOutputCollector對象的引用。

nextTuple()方法是任何Spout實現的核心。Storm調用這個方法來請求Spout OutputCollector來發出輸出元組。在這裡,我們只是發出句子的當前索引並增加該索引。

實現split sentence bolt

The SplitSentenceBolt 的實現見Example 1.2.

Example 1.2 – SplitSentenceBolt.java
  1. public class SplitSentenceBolt extends BaseRichBolt {
  2.     private OutputCollector collector;
  3.     public void prepare(Map config, TopologyContext
  4.             context, OutputCollector collector) {
  5.         this.collector = collector;
  6.     }

  7.     public void execute(Tuple tuple) {
  8.         String sentence = tuple.getStringByField("sentence");
  9.         String[] words = sentence.split(" ");
  10.         for(String word : words){
  11.             this.collector.emit(new Values(word));
  12.         }
  13.     }
  14.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  15.         declarer.declare(new Fields("word"));
  16.     }
  17. }
複製代碼

BaseRichBolt類是另一個便利類,它實現IComponent和IBolt接口。擴展這個類使我們不必實現我們不關心的方法,讓我們專注於我們所需要的功能。

IBolt接口中的prepare()方法類似於ISpout 的open()方法。這裡一般完成在blot的初始化時的資源初始化,比如資料庫連接。像SentenceSpout類一樣,SplitSentenceBolt類不需要太多的初始化,所以prepare()方法只保存OutputCollector對象的引用。

在declareOutputFields()方法中,SplitSentenceBolt類定義一個元組流,每個包含一個欄位(「word」)。

SplitSentenceBolt核心功能是在類IBolt定義execute()方法。調用此方法每次Bolt從流接收一個訂閱的元組。在這種情況下,它在收到的元組中查找「sentence」的值,並將該值拆分成單個的詞,然後按單詞發出新的tuple。

實現word count bolt

WordCountBolt類(Example 1.3)是拓撲組件,實際上是維護了單詞數。在bolt的prepare()方法中,我們實例化一個實例HashMap,將存儲所有單詞和相應的數量。最常見的做法在prepare()方法中來實例化實例變量的。這種模式背後的原因在於部署拓撲時,其組件spout和bolt是在網絡上發送的序列化的實例變量。如果spout或bolt有任何non-serializable實例變量在序列化之前被實例化(例如,在構造函數中創建)將拋出NotSerializableException並且拓撲將無法發布。在這種情況下,因為HashMap 是可序列化的,我們可以安全地在構造函數中實例化它。然而,一般來說,最好是限制構造函數參數為原始和可序列化的對象,如果是non-serializable對象,則應在prepare()方法中實例化。

在declareOutputFields()方法,WordCountBolt類聲明一個元組的流,將包含收到這個詞和相應的計數。在execute()方法中,我們查找的收到的單詞的計數(如果不存在,初始化為0),然後增加計數並存儲,發出一個新的詞和當前計數組成的二元組。發射計數作為流允許拓撲的其他bolt訂閱和執行額外的處理。

Example 1.3 –WordCountBolt.java
  1. public class WordCountBolt extends BaseRichBolt {
  2.     private OutputCollector collector;
  3.     private HashMap<String, Long> counts = null;
  4.     public void prepare(Map config, TopologyContext
  5.             context, OutputCollector collector) {
  6.         this.collector = collector;
  7.         this.counts = new HashMap<String, Long>();
  8.     }

  9.     public void execute(Tuple tuple) {
  10.         String word = tuple.getStringByField("word");
  11.         Long count = this.counts.get(word);
  12.         if(count == null){
  13.             count = 0L;
  14.         }
  15.         count++;
  16.         this.counts.put(word, count);
  17.         this.collector.emit(new Values(word, count));
  18.     }

  19.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  20.         declarer.declare(new Fields("word", "count"));
  21.     }
  22. }
複製代碼

實現report bolt

ReportBolt類的目的是產生每個單詞的報告。像WordCountBolt類一樣,它使用一個HashMap對象來記錄數量,但在這種情況下,它只是存儲收到counter bolt的數字。

到目前為止,report bolt與其他bolt之間的一個區別它是一個終止bolt,它只接收元組。因為它不會發出任何流,所以declareOutputFields()方法是空的。

report bolt也介實現了了IBolt中定義的接口cleanup()方法。Storm在bolt即將關閉時調用這個方法。我們利用cleanup()方法以一個方便的方式在拓撲關閉時輸出最後計數。但通常情況下,cleanup()方法用於釋放資源的bolt,如打開的文件或資料庫連接。

一個重要的事情一定要記住關於IBolt.cleanup()方法是沒有保證的,當Storm拓撲在當一個集群上運行。在下一行我們談論Storm的容錯機制我們將討論背後的原因。但是對於本例,我們在開發模式下運行cleanup()方法是保證運行的。

ReportBolt類的完整原始碼見Example 1.4.

Example 1.4 – ReportBolt.java
  1. public class ReportBolt extends BaseRichBolt {
  2.     private HashMap<String, Long> counts = null;

  3.     public void prepare(Map config, TopologyContext context, OutputCollector collector) {
  4.         this.counts = new HashMap<String, Long>();
  5.     }
  6.     public void execute(Tuple tuple) {
  7.         String word = tuple.getStringByField("word");
  8.         Long count = tuple.getLongByField("count");
  9.         this.counts.put(word, count);
  10.     }

  11.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  12.        // this bolt does not emit anything
  13.     }
  14. public void cleanup() {
  15.         System.out.println("--- FINAL COUNTS ---");
  16.         List<String> keys = new ArrayList<String>();
  17.         keys.addAll(this.counts.keySet());
  18.         Collections.sort(keys);
  19.         for (String key : keys) {
  20.             System.out.println(key + " : " + this.counts.get(key));
  21.         }
  22.         System.out.println("--------------");
  23.     }

  24. }
複製代碼


    實現word count topology

既然我們已經定義了Spout和botl完成我們的計算,我們準備集成在一起成形成一個可運行拓撲(參考Example 1.5)。

Example 1.5 –WordCountTopology.java
  1. public class WordCountTopology {

  2.     private static final String SENTENCE_SPOUT_ID = "sentence-spout";
  3.     private static final String SPLIT_BOLT_ID = "split-bolt";
  4.     private static final String COUNT_BOLT_ID = "count-bolt";
  5.     private static final String REPORT_BOLT_ID = "report-bolt";
  6.     private static final String TOPOLOGY_NAME = "word-count-topology";

  7.     public static void main(String[] args) throws
  8.             Exception {
  9.         SentenceSpout spout = new SentenceSpout();
  10.         SplitSentenceBolt splitBolt = new
  11.                 SplitSentenceBolt();
  12.         WordCountBolt countBolt = new WordCountBolt();
  13.         ReportBolt reportBolt = new ReportBolt();

  14.         TopologyBuilder builder = new TopologyBuilder();
  15.         builder.setSpout(SENTENCE_SPOUT_ID, spout);
  16.         // SentenceSpout --> SplitSentenceBolt
  17.         builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
  18.         // SplitSentenceBolt --> WordCountBolt
  19.         builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(
  20.                 SPLIT_BOLT_ID, new Fields("word"));
  21.         // WordCountBolt --> ReportBolt
  22.         builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
  23.         Config config = new Config();
  24.         LocalCluster cluster = new LocalCluster();
  25.         cluster.submitTopology(TOPOLOGY_NAME, config,
  26.                 builder.createTopology());
  27.         Utils.sleep(10000);
  28.         cluster.killTopology(TOPOLOGY_NAME);
  29.         cluster.shutdown();
  30.     }
  31. }
複製代碼

Storm拓撲通常在Java main()方法定義和運行(或提交如果拓撲被部署到集群)。在這個例子中,我們首先定義字符串常量,這將作為我們的唯一標識Storm組件。在main()方法開始實例化我們的spout和bolts並創建了一個TopologyBuilder實例。TopologyBuilder類提供了流-style API定義組件之間的數據流的拓撲。我們註冊這個sentence spout並給它分配一個惟一的ID:

  1. builder.setSpout(SENTENCE_SPOUT_ID, spout);
複製代碼

下一步是註冊SplitSentenceBolt並建立一個訂閱SentenceSpout發出的流類:

  1. builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
複製代碼

setBolt()方法會註冊一個bolt給TopologyBuilder類並返回一個實例BoltDeclarer,它為bolt暴露了定義輸入源方法。這裡我們通過定義的shuffleGrouping()方法為SentenceSpout和惟一的ID對象建立關係。shuffleGrouping()方法告訴Storm 混排SentenceSpout類發出的元組和均勻分發它們給SplitSentenceBolt對象的之一實例。我們稍後將詳細解釋流分組並討論Storm的並行性。

下一行建立SplitSentenceBolt類和theWordCountBolt類之間的連接:

  1. builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
複製代碼

您將了解,有些時候包含某些數據的元組必須路由到一個特定的實例。在這裡,我們使用BoltDeclarer類的fieldsGrouping ()方法,以確保所有元組包含相同的「單詞」值路由到同一個WordCountBolt實例。

最後一步,我們把WordCountBolt實例定義的數據流的元組發到ReportBolt類實的例。在這種情況下,我們希望WordCountBolt發出的所有元組路由到一個ReportBolt的任務。這種行為由globalGrouping()方法完成,如下:

  1. builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
複製代碼

與我們的數據流定義一樣,運行我們的單詞計算的最後一步是建立拓撲,並提交到集群中:

  1. Config config = new Config();
  2. LocalCluster cluster = new LocalCluster();
  3. cluster.submitTopology(TOPOLOGY_NAME, config,
  4.         builder.createTopology());
  5. Utils.sleep(10000);
  6. cluster.killTopology(TOPOLOGY_NAME);
  7. cluster.shutdown();
複製代碼

這裡,我們在本地模式下運行Storm,在我們本地開發環境使用Storm LocalCluster類來模擬一個完整的Storm集群。storm本地模式是一種方便的方式來開發和測試應用程式,在部署到分布式集群前。本地模式還允許您在IDE內運行Storm拓撲,設置斷點,暫停執行,檢查變量和分析應用程式找出性能瓶頸,這些是Storm集群說做不到的。

在本例中,我們創建一個LocalCluster實例並調用具有拓撲名稱的submitTopology()方法,它是backtype.storm.Config實例。TopologyBuilder類的createTopology()方法返回的Topology對象。在下一章,您將看到submitTopology()方法用於在本地部署拓撲模式相同的籤名方法也可在部署拓撲到遠程(分布式)模式。

Storm的Config類僅僅是HashMap的之列,它定義了一系列配置Storm拓撲的運行時行為具體常量和方便的方法。當提交一個拓撲時,Storm將合併其預定義的默認配置值和Congif實例的內容傳遞給submitTopology()方法,並將結果分別傳遞給拓撲的spout的open()和bolt的prepare()方法。在這個意義上,配置參數的配置對象表示一組全局拓撲中的所有組件。

我們現在將好運行WordCountTopology類。main()方法將提交拓撲,等待它運行十秒後,殺死(取消)拓撲,最後關閉本地集群。當程序運行完成後,您應該在控制臺看到輸出類似如下信息:

  1. --- FINAL COUNTS ---
  2. a : 1426
  3. ate : 1426
  4. beverages : 1426
  5. cold : 1426
  6. cow : 1426
  7. dog : 2852
  8. don't : 2851
  9. fleas : 2851
  10. has : 1426
  11. have : 1426
  12. homework : 1426
  13. i : 4276
  14. like : 2851
  15. man : 1426
  16. my : 2852
  17. the : 1426
  18. think : 1425
  19. --------------
複製代碼

Storm並行度

前面介紹到,Storm允許計算水平擴展到多臺機器,將計算劃分為多個獨立的任務在集群上並行執行。在storm中,任務只是在集群中運行的一個Spout的bolt實例。

理解並行性是如何工作的,我們必須首先解釋一個Stormn集群拓撲參與執行的四個主要組件: ·Nodes(機器):這些只是配置為Storm集群參與執行拓撲的部分的機器。Storm集群包含一個或多個節點來完成工作。 ·Workers(JVM):這些是在一個節點上運行獨立的JVM進程。每個節點配置一個或更多運行的worker。一個拓撲可以請求一個或更多的worker分配給它。 ·Executors(線程):這些是worker運行在JVM進程一個Java線程。多個任務可以分配給一個Executor。除非顯式重寫,Storm將分配一個任務給一個Executor。 ·Tasks(Spout/Bolt實例):任務是Spout和bolt的實例,在executor線程中運行nextTuple()和executre()方法。

WordCountTopology並行性

到目前為止,在我們的單詞計數的例子中,我們沒有顯式地使用任何Storm的並行api;相反,我們允許Storm使用其默認設置。在大多數情況下,除非覆蓋,Storm將默認使用最大並行性設置。

改變拓撲結構的並行設置之前,讓我們考慮拓撲在默認設置下是如何將執行的。假設我們有一臺機器(節點),指定一個worker的拓撲,並允許Storm每一個任務一個executor執行,執行我們的拓撲,將會如下:

正如您可以看到的,並行性只有線程級別。每個任務運行在一個JVM的一個單獨的線程內。我們怎樣才能利用我們手頭的硬體更有效地提高並行性?讓我們開始通過增加worker和executors的數量來運行我們的拓撲。

在拓撲中增加worker

分配額外的worker是一個增加拓撲計算能力的一種簡單方法,Storm提供了通過其API或純粹配置來更改這兩種方式。無論我們選擇哪一種方法,我們的組件上Spout的和bolt沒有改變,並且可以重複使用。

以前版本的字數統計拓撲中,我們介紹了配置對象,在部署時傳遞到submitTopology()方法,但它基本上未使用。增加分配給一個拓撲中worker的數量,我們只是調用Config對象的setNumWorkers()方法:

  1. Config config = new Config();
  2. config.setNumWorkers(2);
複製代碼

這個分配兩個Worker的拓撲結構而不是默認的。這將計算資源添加到我們的拓撲中,為了有效地利用這些資源,我們也會想調整executors的數量和我們的拓撲每個executor的task數量。

配置executor數和task數

正如我們所看到的,默認情況下,在一個拓撲定義時Storm為每個組件創建一個單一的任務,為每個任務分配一個executor。Storm的並行API提供了修改這種行為的方式,允許您設置的每個組件的executor數和每個exexutor的task數量。

executor的數量分配到一個給定的組件是通過修改配置當定義一個流分組並行性時。為了說明這個特性,讓我們 修改我們的拓撲SentenceSpout並行度分配兩個任務,每個任務分配自己的executor線程:

  1. builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
複製代碼

如果我們使用一個worker,執行我們的拓撲現在看起來像下面的樣子:

接下來,我們將設置分割句子bolt為兩個有四個task的executor執行。每個executor線程將被指派兩個任務執行(4 / 2 = 2)。我們還將配置字數統計bolt運行四個任務,每個都有自己的執行線程:

  1. builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
  2.     .shuffleGrouping(SENTENCE_SPOUT_ID);
  3. builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
  4.     .fieldsGrouping(SPLIT_BOLT_ID, newFields("word"));
複製代碼

有兩個worker,拓撲的執行將看起來像下面的圖:

拓撲結構的並行性增加,運行更新的WordCountTopology類為每個單詞產生了更高的總數量:

  1. --- FINAL COUNTS ---
  2. a : 2726
  3. ate : 2722
  4. beverages : 2723
  5. cold : 2723
  6. cow : 2726
  7. dog : 5445
  8. don't : 5444
  9. fleas : 5451
  10. has : 2723
  11. have : 2722
  12. homework : 2722
  13. i : 8175
  14. like : 5449
  15. man : 2722
  16. my : 5445
  17. the : 2727
  18. think : 2722
  19. --------------
複製代碼

因為Spout無限發出數據,直到topology被kill,實際的數量將取決於您的計算機的速度和其他什麼進程運行它,但是你應該看到一個總體增加的發射和處理數量。

重要的是要指出,增加woker的數量並不會影響一個拓撲在本地模式下運行。一個拓撲在本地模式下運行總是運行在一個單獨的JVM進程,所以只有任務和executro並行設置才會有影響。Storm的本地模式提供一個近似的集群行為在你測試在一個真正的應用程式集群生產環境之前對開發是很有用的。

Storm流分組

基於前面的例子,你可能想知道為什麼我們不增加ReportBolt的並行性?答案是,它沒有任何意義。要理解為什麼,你需要理解Storm流分組的概念。

流分組定義了在一個拓撲中一個流的元組是如何分布在bolt的任務的。例如,在並行版本的字數統計拓撲,在拓撲的SplitSentenceBolt類被分配了四個任務。流分組決定哪一個任務將獲得哪一個給定的元組。

Storm定義了7個內置流分組: ·隨機分組:隨機分配整個目標bolt的任務,這樣每個元組bolt接收同等數量的元組。 ·欄位分組:該元組基於分組欄位中指定的值路由bolt任務。例如,如果一個流組合「word」欄位,「word」相同的元組值欄位將總是被路由到相同的bolt的任務。 ·All分組:這複製bolt任務所有的元組,每個任務將獲得元組的一個副本。 ·Global分組:這個把所有元組路由到一個任務中,選擇最低的任務任務ID值。注意,設置bolt的並行性或任務的數量在使用全球分組是沒有意義的,因為所有元組將被路由到相同的bolt的任務。Global分組應謹慎使用,因為它將所有元組路由到一個JVM實例,可能造成在特定JVM/機器在一個集群中形成擁塞。 · None分組:None分組的功能相當於隨機分組。它被保留以供將來使用。 ·直接分組:直接分組,源決定哪個組件將接收一個給定的元組通過調用emitDirect()方法。它只能用於定義直接流。 · Local or shuffle grouping: 本地或隨機分組類似於隨機分組,但把元組shuffle到bolt任務運行在相同的工作進程中,如果可以。否則,它將退回到隨機分組的行為。根據拓撲結構的並行性,本地或隨機分組可以通過限制網絡提高拓撲傳輸性能。

除了預定義的分組,您可以定義您自己的流分組通過實現CustomStreamGrouping接口:

  1. public interface CustomStreamGrouping extends Serializable {
  2.     void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
  3.     List<Integer> chooseTasks(int taskId, List<Object> values);
  4. }
複製代碼

prepare()方法在運行時被調用,它初始化分組信息分組的實現可以使用它來決定元組元組怎樣被任務說接受。WorkerTopologyContext對象提供關於拓撲的上下文信息,和GlobalStreamId對象提供元數據流 分組。最有用的參數是targetTasks,它是分組需要考慮所有任務標識符列表。你通常會將targetTasksparameter作為一個實例變量引用存儲在chooseTasks()方法的實現中。

chooseTasks()方法返回一個應發送的元組任務標識符列表的。它的參數是發出元組組件的任務標識符和元組的值。

為了說明流分組的重要性,讓我們引入一個錯誤拓撲。先修改SentenceSpout 的nextTuple()方法,它只發出每個句子一次:

  1. public void nextTuple() {
  2.     if(index < sentences.length){
  3.         this.collector.emit(new Values(sentences[index]));
  4.         index++;
  5.     }
  6.     Utils.waitForMillis(1);
  7. }
複製代碼

現在運行拓撲得到以下輸出:

  1. --- FINAL COUNTS ---
  2. a : 2
  3. ate : 2
  4. beverages : 2
  5. cold : 2
  6. cow : 2
  7. dog : 4
  8. don't : 4
  9. fleas : 4
  10. has : 2
  11. have : 2
  12. homework : 2
  13. i : 6
  14. like : 4
  15. man : 2
  16. my : 4
  17. the : 2
  18. think : 2
  19. --------------
複製代碼

現在修改CountBolt的field分組為參數隨機分組並重新運行拓撲:

  1. builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
  2.     .shuffleGrouping(SPLIT_BOLT_ID);
複製代碼

輸出應該類似於下面:

  1. --- FINAL COUNTS ---
  2. a : 1
  3. ate : 2
  4. beverages : 1
  5. cold : 1
  6. cow : 1
  7. dog : 2
  8. don't : 2
  9. fleas : 1
  10. has : 1
  11. have : 1
  12. homework : 1
  13. i : 3
  14. like : 1
  15. man : 1
  16. my : 1
  17. the : 1
  18. think : 1
  19. --------------
複製代碼

我們計算不正確了,因為CountBolt參數是有狀態:它保留一個計數為每個收到的單詞的。在這種情況下,我們計算的準確性取決於當組件被並行化基於元組的內容分組的能力。引入的錯誤我們將只顯示如果CountBolt參數大於1的並行性。這強調了測試拓撲與各種並行配置的重要性。

Tip

一般來說,你應該避免將狀態信息存儲在一個bolt因為任何時間worker或有其任務重新分配失敗,該信息將丟失。一個解決方案是定期快照的持久性存儲狀態信息,比如資料庫,所以它可以恢復是否重新分配一個任務。

消息處理保證

Storm提供一個API,允許您保證Spout發出的一個元組被完全處理。到目前為止,在我們的示例中,我們不擔心失敗。我們已經看到,Spout流可以分裂和可以生成任意數量的流拓撲結構,根據下bolt的行為。在發生故障時,會發生什麼呢?作為一個例子,考慮一個bolt持久化元組數據信息基於資料庫。我們該如何處理資料庫更新失敗的情況呢?

Spout的可靠性

在Storm中,保證消息處理從Spout開始。Spout支持保證處理需要一種方法來跟蹤發出的元組,如果下遊處理完元組則回發一個元組,或任何元組失敗。子元組可以被認為是任何來自Spout元組的結果元組。看的另一種方法是考慮Spout流(作為一個元組樹的樹幹(下圖所示):

在前面的圖中,實線代表原樹幹spoout發出的元組,虛線代表來自最初的元組的元組。結果圖代表元組樹。保證處理,樹中的每個bolt可以確認(ack)或fail一個元組。如果所有bolt在樹上ack元組來自主幹tuple,spout的ack方法將調用表明消息處理完成。如果任何bolt在樹上明確fail一個元組,或如果處理元組樹超過了超時時間,spout的fail方法將被調用。

Storm的ISpout接口定義了涉及可靠性的三個方法API:nextTuple,ack,fail。

  1. public interface ISpout extends Serializable {
  2.     void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

  3.     void close();
  4.     void nextTuple();
  5.     void ack(Object msgId);
  6.     void fail(Object msgId);
  7. }
複製代碼

正如我們所見過的,當Storm要求spout發出一個元組,它調用nextTuple()方法。實現保證處理的第一步是保證元組分配一個惟一的ID並將該值傳遞給SpoutOutputCollector的emit()方法:

  1. collector.emit(new Values("value1", "value2") ,msgId);
複製代碼

分配tuple消息ID告訴Storm,Spout想接收通知或元組樹完成時如果不能在任何時候如果處理成功,Spout的ack()方法將被調用的消息ID分配給元組。如果處理失敗或超時,Spout的失敗方法將被調用。

bolt可靠性

實現一個bolt,保證消息處理涉及兩個步驟: 1。錨定發射傳入的元組當發射新的元組時。 2。確認或失敗

錨定一個元組意味著我們創造一個傳入的元組和派生的元組之間的聯繫,這樣任何下遊bolt預計參與的元組樹確認tuple,或讓元組失敗,或讓它超時。

你可以錨定一個元組(或元組的列表)通過調用OutputCollector重載方法之一emit:

  1. collector.emit(tuple, new Values(word));
複製代碼

在這裡,我們錨定傳入的元組並發射一個新的元組,下遊bolt應該確認或失敗。另一種形式的emit方法將發出所屬的元組:

  1. collector.emit(new Values(word));
複製代碼

未錨定的元組不參與一個流的可靠性保證。如果一個非錨點元組下遊失敗,它不會導致原始根元組的重發。

成功處理元組後選擇發射新的或派生的元組,一個bolt處理可靠流應該確認輸入的元組:

  1. this.collector.ack(tuple);
複製代碼

如果元組處理失敗,這種情況下,spout必須重發(再)元組,bolt應明確失敗的元組:

  1. this.collector.fail(tuple)
複製代碼

如果元組由於超時或通過一個顯式的調用處理OutputCollector.fail()方法失敗,Spout,最初的元組,發出通知,讓它重發tuple,您在稍後就會看到。

可靠的word count

為了進一步說明可靠性,我們首先加強SentenceSpout類支持保證可靠。它將需要跟蹤所有發出的元組並分配每一個惟一的ID,我們將使用一個HashMap < UUID、Values>對象來存儲未處理完的元組。對於我們發出的每個元組,我們會分配一個唯一的標識符,並將其存儲在我們的未處理完map。當我們收到一個消息的確認,我們將從我們的等待名單刪除元組。失敗,我們將重複該元組:

  1. public class SentenceSpout extends BaseRichSpout {

  2.     private ConcurrentHashMap<UUID, Values> pending;

  3.     private SpoutOutputCollector collector;

  4.     private String[] sentences = {
  5.             "my dog has fleas",
  6.             "i like cold beverages",
  7.             "the dog ate my homework",
  8.             "don't have a cow man",
  9.             "i don't think i like fleas"
  10.     };
  11.     private int index = 0;

  12.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  13.         declarer.declare(new Fields("sentence"));
  14.     }
  15.     public void open(Map config, TopologyContext context,
  16.                      SpoutOutputCollector collector) {
  17.         this.collector = collector;
  18.         this.pending = new ConcurrentHashMap<UUID, Values>();
  19.     }

  20.     public void nextTuple() {
  21.         Values values = new Values(sentences[index]);
  22.         UUID msgId = UUID.randomUUID();
  23.         this.pending.put(msgId, values);
  24.         this.collector.emit(values, msgId);
  25.         index++;
  26.         if (index >= sentences.length) {
  27.             index = 0;
  28.         }
  29.         Utils.sleep(1);
  30.     }
  31.     public void ack(Object msgId) {
  32.         this.pending.remove(msgId);
  33.     }

  34.     public void fail(Object msgId) {
  35.         this.collector.emit(this.pending.get(msgId), msgId);
  36.     }
  37. }
複製代碼

修改bolt提供保證處理簡單涉及錨定出站元組的元組,然後確認收到的元組:

  1. public class ReliableSplitSentenceBolt extends BaseRichBolt {
  2.     private OutputCollector collector;
  3.     public void prepare(Map config, TopologyContext
  4.             context, OutputCollector collector) {
  5.         this.collector = collector;
  6.     }

  7.     public void execute(Tuple tuple) {
  8.         String sentence = tuple.getStringByField("sentence");
  9.         String[] words = sentence.split(" ");
  10.         for(String word : words){
  11.             this.collector.emit(tuple, new Values(word));
  12.         }
  13.         this.collector.ack(tuple);
  14.     }
  15.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  16.         declarer.declare(new Fields("word"));
  17.     }
  18. }
複製代碼

總結

在這一章,我們已經構建了一個簡單的分布式計算應用程式使用Stomr的核心API和並覆蓋很大一部分Storm的特性集,即使沒有安裝Storm和部署一個集群。Storm的本地模式是強大的生產力並易於開發,但要想看到Storm的真正的強大和水平可伸縮性,你需要將應用程式部署到一個真正的集群中。

相關焦點

  • 邦元英語:《The big storm》暴風雨
    The big storm暴風雨適合年齡:5-8歲Orson是一隻非常害怕閃電暴雨的狗狗,今天當他面對庭院裡的狂風呼嘯,傾盆大雨並且夾雜著電閃雷鳴,他會怎麼做呢?01A storm was coming.暴風雨要來了。Orson did not like storms.Orson不喜歡暴風雨。
  • Stormtrooper
    那麼這個到底是個什麼基因呢?業餘時間谷歌了一下,資料非常的少。基本上,擁有者吧啦吧啦說了一堆沒用的話,比如這些個照片一定不是PS,照片像素有點低,他以後會改進拍一些更清晰的。圖片資料轉自: http://thereptilereport.com/mr-stormtrooper-update/http://geckoforums.net/f125-morphs-genetics/106399-2.htm
  • weather the storm一種不錯的表達
    weather是天氣,但同時也做動詞用,表示:(因受風吹、日曬、雨淋等,使)褪色,變色,變形;經受住當船隻遭遇風暴,一般要在原地等待風暴過去以求安全地度過惡劣天氣:I'll stay and weather the storm
  • Tropical storm heading away from the city
    A moped makes its way along a waterlogged street after torrential rain in the city's downtown area brought about by tropical storm
  • Typhoon Hagibis:Japan Deploys 110,000 Rescuers after Worst Storm
    Image copyrightGETTY IMAGESImage captionTyphoon Hagibis was the worst storm to hit the country in decadesSource
  • 動畫電影「鳴鳥不飛 The storm breaks」製作決定!
    動畫電影「鳴鳥不飛 The storm breaks」製作決定!178動漫原創 ▪ 2020-02-17 16:23:07 動畫電影「鳴鳥不飛」2月15日在日本上映之後,隨後官方宣布其續作「鳴鳥不飛 The storm
  • Typhoon Meranti Claims 28 As Yet Another Storm Approaches China
    Now, China's Marine forecasting authorities have issued a new orange alert as yet another storm Typhoon Malakas enters the East China Sea.
  • 流式大數據處理的三種框架:Storm,Spark和Samza(1)
    在storm中,每個都是tuple是不可變數組,對應著固定的鍵值對。
  • 園冶獲獎 | 一等獎作品來自西建大和南林的智慧結晶「Fighting against the storm」
    △ 跳轉連結了解課程第十一屆園冶杯大學生國際競賽主題競賽一等獎作品:Fighting against the storm 裹挾巨浪世界海岸線飽受海浪侵蝕組建團隊的話,首先知曉自己擅長什麼,如果你比較擅長思維邏輯,那就要找自己熟悉的且由一定作圖能力的朋友,這樣在溝通上比較容易,且已形成一定的默契,能比較快的進入角色。當然我們在組隊時是比較幸運的,在開營前一天,遇到一群陌生但「臭味」相投的朋友,共同的朋友朝著同一個方向努力,才有今天的結果。找到一個有意思,感興趣,且能進行下去的競賽主題是非常難的,這中間少不了新苗各位老師的幫助。
  • 什麼是「炸彈旋風」? | 慢速英語
    This is What’s Trending Today…這裡是本期今日熱點節目…… When it comes to weather, it is hard to sound more frightening than to call a storm
  • 2018年阿里巴巴關於Java重要開源項目匯總
    JStorm 可以看作是 storm 的 java 增強版本,除了內核用純java實現外,還包括了thrift、python、facet ui。從架構上看,其本質是一個基於 zk 的分布式調度系統。地址:https://github.com/alibaba/simpleimage11. redis 的 java 客戶端 TedisTedis 是另一個 redis 的 java 客戶端。Tedis 的目標是打造一個可在生產環境直接使用的高可用 Redis 解決方案。
  • java數組刪除重複元素專題及常見問題 - CSDN
    package com.akfucc.zhidao;import java.util.ArrayList;import java.util.Collections;import java.util.Iterator;import java.util.List
  • 什麼是一致性哈希算法
    參考這裡:blog.csdn.net/cywosp/article/details/233971793,一致性哈希的原理由於一般的哈希函數返回一個int(32bit)型的hashCode。因此,可以將該哈希函數能夠返回的hashCode表示成一個範圍為0---(2^32)-1 環。
  • android啟動頁設計專題及常見問題 - CSDN
    轉載請註明出處:http://blog.csdn.net/wangjihuanghun/article/details/63255144啟動頁幾乎成為了每個app的標配,有些商家在啟動頁中增加了開屏廣告以此帶來更多的收入。