使用Storm實現實時大數據分析!

2020-12-16 CSDN技術社區

簡單和明了,Storm讓大數據分析變得輕鬆加愉快。

當今世界,公司的日常運營經常會生成TB級別的數據。數據來源囊括了網際網路裝置可以捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中創建的數據。考慮到數據的生成量,實時處理成為了許多機構需要面對的首要挑戰。我們經常用的一個非常有效的開源實時計算工具就是Storm —— Twitter開發,通常被比作「實時的Hadoop」。然而Storm遠比Hadoop來的簡單,因為用它處理大數據不會帶來新老技術的交替。

Shruthi Kumar、Siddharth Patankar共同效力於Infosys,分別從事技術分析和研發工作。本文詳述了Storm的使用方法,例子中的項目名稱為「超速報警系統(Speeding Alert System)」。我們想實現的功能是:實時分析過往車輛的數據,一旦車輛數據超過預設的臨界值 —— 便觸發一個trigger並把相關的數據存入資料庫。

Storm

對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統。同Hadoop一樣Storm也可以處理大批量的數據,然而Storm在保證高可靠性的前提下還可以讓處理進行的更加實時;也就是說,所有的信息都會被處理。Storm同樣還具備容錯和分布計算這些特性,這就讓Storm可以擴展到不同的機器上進行大批量的數據處理。他同樣還有以下的這些特性:

  • 易於擴展。對於擴展,你只需要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop Zookeeper進行集群協調,這樣可以充分的保證大型集群的良好運行。
  • 每條信息的處理都可以得到保證。
  • Storm集群管理簡易。
  • Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執行中出現錯誤時,也會由Storm重新分配任務。
  • 儘管通常使用Java,Storm中的topology可以用任何語言設計。

當然為了更好的理解文章,你首先需要安裝和設置Storm。需要通過以下幾個簡單的步驟:

  • 從Storm官方下載Storm安裝文件
  • 將bin/directory解壓到你的PATH上,並保證bin/storm腳本是可執行的。

Storm組件

Storm集群主要由一個主節點和一群工作節點(worker node)組成,通過 Zookeeper進行協調。

主節點:

主節點通常運行一個後臺程序 —— Nimbus,用於響應分布在集群中的節點,分配任務和監測故障。這個很類似於Hadoop中的Job Tracker。

工作節點:

工作節點同樣會運行一個後臺程序 —— Supervisor,用於收聽工作指派並基於要求運行工作進程。每個工作節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則通過Zookeeper系統或者集群。

Zookeeper

Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程式實現實時的邏輯則被封裝進Storm中的「topology」。topology則是一組由Spouts(數據源)和Bolts(數據操作)通過Stream Groupings進行連接的圖。下面對出現的術語進行更深刻的解析。

Spout:

簡而言之,Spout從來源處讀取數據並放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,如果沒有新tuple發射則會簡單的返回。

Bolt:

Topology中所有的處理都由Bolt完成。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/資料庫、等等。Bolt從Spout中接收數據並進行處理,如果遇到複雜流的處理也可能將tuple發送給另一個Bolt進行處理。而Bolt中最重要的方法是execute(),以新的tuple作為參數接收。不管是Spout還是Bolt,如果將tuple發射成多個流,這些流都可以通過declareStream()來聲明。

Stream Groupings:

Stream Grouping定義了一個流在Bolt任務間該如何被切分。這裡有Storm提供的6個Stream Grouping類型:

1. 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。

2. 欄位分組(Fields grouping):根據指定欄位分割數據流,並分組。例如,根據「user-id」欄位,相同「user-id」的元組總是分發到同一個任務,不同「user-id」的元組可能分發到不同的任務。

3. 全部分組(All grouping):tuple被複製到bolt的所有任務。這種類型需要謹慎使用。

4. 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。

5. 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(如果可能)。

6. 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。

當然還可以實現CustomStreamGroupimg接口來定製自己需要的分組。

項目實施

當下情況我們需要給Spout和Bolt設計一種能夠處理大量數據(日誌文件)的topology,當一個特定數據值超過預設的臨界值時促發警報。使用Storm的topology,逐行讀入日誌文件並且監視輸入數據。在Storm組件方面,Spout負責讀入輸入數據。它不僅從現有的文件中讀入數據,同時還監視著新文件。文件一旦被修改Spout會讀入新的版本並且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發射給Bolt進行臨界分析,這樣就可以發現所有可能超臨界的記錄。

下一節將對用例進行詳細介紹。

臨界分析

這一節,將主要聚焦於臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。

  • 瞬間臨界值監測:一個欄位的值在那個瞬間超過了預設的臨界值,如果條件符合的話則觸發一個trigger。舉個例子當車輛超越80公裡每小時,則觸發trigger。
  • 時間序列臨界監測:欄位的值在一個給定的時間段內超過了預設的臨界值,如果條件符合則觸發一個觸發器。比如:在5分鐘類,時速超過80KM兩次及以上的車輛。

Listing One顯示了我們將使用的一個類型日誌,其中包含的車輛數據信息有:車牌號、車輛行駛的速度以及數據獲取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East  city
EF 123 90 South city
GH 123 50 West  city

這裡將創建一個對應的XML文件,這將包含引入數據的模式。這個XML將用於日誌文件的解析。XML的設計模式和對應的說明請見下表。

XML文件和日誌文件都存放在Spout可以隨時監測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。

Figure 1:Storm中建立的topology,用以實現數據實時處理

如圖所示:FilelistenerSpout接收輸入日誌並進行逐行的讀入,接著將數據發射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數據將發送給DBWriterBolt,然後由DBWriterBolt存入給資料庫。下面將對這個過程的實現進行詳細的解析。

Spout的實現

Spout以日誌文件和XML描述文件作為接收對象。XML文件包含了與日誌一致的設計模式。不妨設想一下一個示例日誌文件,包含了車輛的車牌號、行駛速度、以及數據的捕獲位置。(看下圖)

Figure2:數據從日誌文件到Spout的流程圖

Listing Two顯示了tuple對應的XML,其中指定了欄位、將日誌文件切割成欄位的定界符以及欄位的類型。XML文件以及數據都被保存到Spout指定的路徑。

Listing Two:用以描述日誌文件的XML文件。

  1. <TUPLEINFO> 
  2. <FIELDLIST> 
  3. <FIELD> 
  4. <COLUMNNAME>vehicle_number</COLUMNNAME> 
  5. <COLUMNTYPE>string</COLUMNTYPE> 
  6. </FIELD> 
  7.  
  8. <FIELD>
  9. <COLUMNNAME>speed</COLUMNNAME> 
  10. <COLUMNTYPE>int</COLUMNTYPE> 
  11. </FIELD> 
  12.  
  13. <FIELD> 
  14. <COLUMNNAME>location</COLUMNNAME> 
  15. <COLUMNTYPE>string</COLUMNTYPE> 
  16. </FIELD> 
  17. </FIELDLIST> 
  18. <DELIMITER>,</DELIMITER> 
  19. </TUPLEINFO>   

通過構造函數及它的參數Directory、PathSpout和TupleInfo對象創建Spout對象。TupleInfo儲存了日誌文件的欄位、定界符、欄位的類型這些很必要的信息。這個對象通過XSTream序列化XML時建立。

Spout的實現步驟:

  • 對文件的改變進行分開的監聽,並監視目錄下有無新日誌文件添加。
  • 在數據得到了欄位的說明後,將其轉換成tuple。
  • 聲明Spout和Bolt之間的分組,並決定tuple發送給Bolt的途徑。

Spout的具體編碼在Listing Three中顯示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。

  1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )   
  2. {   
  3.            _collector = collector;   
  4.          try   
  5.          {   
  6.          fileReader  =  new BufferedReader(new FileReader(new File(file)));  
  7.          }  
  8.          catch (FileNotFoundException e)  
  9.          {  
  10.          System.exit(1);   
  11.          }  
  12. }                                                          
  13.  
  14. public void nextTuple()  
  15. {  
  16.          protected void ListenFile(File file)  
  17.          {  
  18.          Utils.sleep(2000);  
  19.          RandomAccessFile access = null;  
  20.          String line = null;   
  21.             try   
  22.             {  
  23.                 while ((line = access.readLine()) != null)  
  24.                 {  
  25.                     if (line !=null)  
  26.                     {   
  27.                          String[] fields=null;  
  28.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());   
  29.                           else   
  30.                           fields = line.split  (tupleInfo.getDelimiter());   
  31.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));  
  32.                     }  
  33.                }  
  34.             }  
  35.             catch (IOException ex){ }  
  36.             }  
  37. }  
  38.  
  39. public void declareOutputFields(OutputFieldsDeclarer declarer)  
  40. {  
  41.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()];  
  42.       for(int i=0; i<tupleInfo.getFieldList().size(); i++)  
  43.       {  
  44.               fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();  
  45.       }  
  46. declarer.declare(new Fields(fieldsArr));  
  47. }      

declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就可以用類似的方法將tuple解碼。Spout持續對日誌文件的數據的變更進行監聽,一旦有添加Spout就會進行讀入並且發送給Bolt進行處理。

Bolt的實現

Spout的輸出結果將給予Bolt進行更深一步的處理。經過對用例的思考,我們的topology中需要如Figure 3中的兩個Bolt。

Figure 3:Spout到Bolt的數據流程。

ThresholdCalculatorBolt

Spout將tuple發出,由ThresholdCalculatorBolt接收並進行臨界值處理。在這裡,它將接收好幾項輸入進行檢查;分別是:

臨界值檢查

  • 臨界值欄數檢查(拆分成欄位的數目)
  • 臨界值數據類型(拆分後欄位的類型)
  • 臨界值出現的頻數
  • 臨界值時間段檢查

Listing Four中的類,定義用來保存這些值。

Listing Four:ThresholdInfo類

  1. public class ThresholdInfo implementsSerializable  
  2.  
  3. {    
  4.         private String action;   
  5.         private String rule;   
  6.         private Object thresholdValue;  
  7.         private int thresholdColNumber;   
  8.         private Integer timeWindow;   
  9.         private int frequencyOfOccurence;   
  10. }   

基於欄位中提供的值,臨界值檢查將被Listing Five中的execute()方法執行。代碼大部分的功能是解析和接收值的檢測。

Listing Five:臨界值檢測代碼段

  1. public void execute(Tuple tuple, BasicOutputCollector collector)   
  2. {  
  3.     if(tuple!=null)   
  4.     {  
  5.         List<Object> inputTupleList = (List<Object>) tuple.getValues();  
  6.         int thresholdColNum = thresholdInfo.getThresholdColNumber();   
  7.         Object thresholdValue = thresholdInfo.getThresholdValue();   
  8.         String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   
  9.         Integer timeWindow = thresholdInfo.getTimeWindow();  
  10.          int frequency = thresholdInfo.getFrequencyOfOccurence();  
  11.          if(thresholdDataType.equalsIgnoreCase("string"))  
  12.          {  
  13.              String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  
  14.              String frequencyChkOp = thresholdInfo.getAction();  
  15.              if(timeWindow!=null)  
  16.              {  
  17.                  long curTime = System.currentTimeMillis();  
  18.                  long diffInMinutes = (curTime-startTime)/(1000);  
  19.                  if(diffInMinutes>=timeWindow)  
  20.                  {  
  21.                      if(frequencyChkOp.equals("=="))  
  22.                      {  
  23.                           if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  24.                           {  
  25.                               count.incrementAndGet();  
  26.                               if(count.get() > frequency)  
  27.                                   splitAndEmit(inputTupleList,collector);  
  28.                           }  
  29.                      }  
  30.                      else if(frequencyChkOp.equals("!="))  
  31.                      {  
  32.                          if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  33.                          {  
  34.                               count.incrementAndGet();  
  35.                               if(count.get() > frequency)  
  36.                                   splitAndEmit(inputTupleList,collector);  
  37.                           }  
  38.                       }  
  39.                       else                         System.out.println("Operator not supported");   
  40.                   }  
  41.               }  
  42.               else 
  43.               {  
  44.                   if(frequencyChkOp.equals("=="))  
  45.                   {  
  46.                       if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  47.                       {  
  48.                           count.incrementAndGet();  
  49.                           if(count.get() > frequency)  
  50.                               splitAndEmit(inputTupleList,collector);  
  51.                           }  
  52.                   }  
  53.                   else if(frequencyChkOp.equals("!="))  
  54.                   {  
  55.                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
  56.                        {  
  57.                            count.incrementAndGet();  
  58.                            if(count.get() > frequency)  
  59.                                splitAndEmit(inputTupleList,collector);  
  60.                           }  
  61.                    }  
  62.                }  
  63.             }  
  64.             else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))  
  65.             {  
  66.                 String frequencyChkOp = thresholdInfo.getAction();  
  67.                 if(timeWindow!=null)  
  68.                 {  
  69.                      long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());  
  70.                      long curTime = System.currentTimeMillis();  
  71.                      long diffInMinutes = (curTime-startTime)/(1000);  
  72.                      System.out.println("Difference in minutes="+diffInMinutes);  
  73.                      if(diffInMinutes>=timeWindow)  
  74.                      {  
  75.                           if(frequencyChkOp.equals("<"))  
  76.                           {  
  77.                               if(valueToCheck < Double.parseDouble(thresholdValue.toString()))  
  78.                               {  
  79.                                    count.incrementAndGet();  
  80.                                    if(count.get() > frequency)  
  81.                                        splitAndEmit(inputTupleList,collector);  
  82.                               }  
  83.                           }  
  84.                           else if(frequencyChkOp.equals(">"))  
  85.                           {  
  86.                                if(valueToCheck > Double.parseDouble(thresholdValue.toString()))  
  87.                                 {  
  88.                                    count.incrementAndGet();  
  89.                                    if(count.get() > frequency)  
  90.                                        splitAndEmit(inputTupleList,collector);  
  91.                                }  
  92.                            }  
  93.                            else if(frequencyChkOp.equals("=="))  
  94.                            {  
  95.                               if(valueToCheck == Double.parseDouble(thresholdValue.toString()))  
  96.                               {  
  97.                                   count.incrementAndGet();  
  98.                                   if(count.get() > frequency)  
  99.                                       splitAndEmit(inputTupleList,collector);  
  100.                                }  
  101.                            }  
  102.                            else if(frequencyChkOp.equals("!="))  
  103.                            {  
  104.     . . .  
  105.                             }  
  106.                        }  
  107.              }  
  108.       else 
  109.           splitAndEmit(null,collector);  
  110.       }  
  111.       else 
  112.      {  
  113.            System.err.println("Emitting null in bolt");  
  114.            splitAndEmit(null,collector);  
  115.     }  

經由Bolt發送的的tuple將會傳遞到下一個對應的Bolt,在我們的用例中是DBWriterBolt。

DBWriterBolt

經過處理的tuple必須被持久化以便於觸發tigger或者更深層次的使用。DBWiterBolt做了這個持久化的工作並把tuple存入了資料庫。表的建立由prepare()函數完成,這也將是topology調用的第一個方法。方法的編碼如Listing Six所示。

Listing Six:建表編碼。

  1. public void prepare( Map StormConf, TopologyContext context )   
  2. {         
  3.     try   
  4.     {  
  5.         Class.forName(dbClass);  
  6.     }   
  7.     catch (ClassNotFoundException e)   
  8.     {  
  9.         System.out.println("Driver not found");  
  10.         e.printStackTrace();  
  11.     }  
  12.    
  13.     try   
  14.     {  
  15.        connection driverManager.getConnection(   
  16.            "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  
  17.        connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  
  18.    
  19.        StringBuilder createQuery = new StringBuilder(  
  20.            "CREATE TABLE IF NOT EXISTS "+tableName+"(");  
  21.        for(Field fields : tupleInfo.getFieldList())  
  22.        {  
  23.            if(fields.getColumnType().equalsIgnoreCase("String"))  
  24.                createQuery.append(fields.getColumnName()+" VARCHAR(500),");  
  25.            else 
  26.                createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");  
  27.        }  
  28.        createQuery.append("thresholdTimeStamp timestamp)");  
  29.        connection.prepareStatement(createQuery.toString()).execute();  
  30.    
  31.         
  32.        StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");  
  33.        String tempCreateQuery = new String();  
  34.        for(Field fields : tupleInfo.getFieldList())  
  35.        {  
  36.             insertQuery.append(fields.getColumnName()+",");  
  37.        }  
  38.        insertQuery.append("thresholdTimeStamp").append(") values (");  
  39.        for(Field fields : tupleInfo.getFieldList())  
  40.        {  
  41.            insertQuery.append("?,");  
  42.        }  
  43.    
  44.        insertQuery.append("?)");  
  45.        prepStatement = connection.prepareStatement(insertQuery.toString());  
  46.     }  
  47.     catch (SQLException e)   
  48.     {         
  49.         e.printStackTrace();  
  50.     }         
  51. }  

數據分批次的插入資料庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現可能存在不同類型輸入的解析。

Listing Seven:數據插入的代碼部分。

  1. public void execute(Tuple tuple, BasicOutputCollector collector)   
  2. {  
  3.     batchExecuted=false;  
  4.     if(tuple!=null)  
  5.     {  
  6.        List&#60;Object&#62; inputTupleList = (List&#60;Object&#62;) tuple.getValues();  
  7.        int dbIndex=0;  
  8.        for(int i=0;i&#60;tupleInfo.getFieldList().size();i++)  
  9.        {  
  10.            Field field = tupleInfo.getFieldList().get(i);  
  11.            try {  
  12.                dbIndex = i+1;  
  13.                if(field.getColumnType().equalsIgnoreCase("String"))               
  14.                    prepStatement.setString(dbIndex, inputTupleList.get(i).toString());  
  15.                else if(field.getColumnType().equalsIgnoreCase("int"))  
  16.                    prepStatement.setInt(dbIndex,  
  17.                        Integer.parseInt(inputTupleList.get(i).toString()));  
  18.                else if(field.getColumnType().equalsIgnoreCase("long"))  
  19.                    prepStatement.setLong(dbIndex,   
  20.                        Long.parseLong(inputTupleList.get(i).toString()));  
  21.                else if(field.getColumnType().equalsIgnoreCase("float"))  
  22.                    prepStatement.setFloat(dbIndex,   
  23.                        Float.parseFloat(inputTupleList.get(i).toString()));  
  24.                else if(field.getColumnType().equalsIgnoreCase("double"))  
  25.                    prepStatement.setDouble(dbIndex,   
  26.                        Double.parseDouble(inputTupleList.get(i).toString()));  
  27.                else if(field.getColumnType().equalsIgnoreCase("short"))  
  28.                    prepStatement.setShort(dbIndex,   
  29.                        Short.parseShort(inputTupleList.get(i).toString()));  
  30.                else if(field.getColumnType().equalsIgnoreCase("boolean"))  
  31.                    prepStatement.setBoolean(dbIndex,   
  32.                        Boolean.parseBoolean(inputTupleList.get(i).toString()));  
  33.                else if(field.getColumnType().equalsIgnoreCase("byte"))  
  34.                    prepStatement.setByte(dbIndex,   
  35.                        Byte.parseByte(inputTupleList.get(i).toString()));  
  36.                else if(field.getColumnType().equalsIgnoreCase("Date"))  
  37.                {  
  38.                   Date dateToAdd=null;  
  39.                   if (!(inputTupleList.get(i) instanceof Date))    
  40.                   {    
  41.                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");  
  42.                        try   
  43.                        {  
  44.                            dateToAdd = df.parse(inputTupleList.get(i).toString());  
  45.                        }  
  46.                        catch (ParseException e)   
  47.                        {  
  48.                            System.err.println("Data type not valid");  
  49.                        }  
  50.                    }    
  51.                    else 
  52.                    {  
  53.             dateToAdd = (Date)inputTupleList.get(i);  
  54.             java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());  
  55.             prepStatement.setDate(dbIndex, sqlDate);  
  56.             }     
  57.             }   
  58.         catch (SQLException e)   
  59.         {  
  60.              e.printStackTrace();  
  61.         }  
  62.     }  
  63.     Date now = new Date();            
  64.     try 
  65.     {  
  66.         prepStatement.setTimestamp(dbIndex+1new java.sql.Timestamp(now.getTime()));  
  67.         prepStatement.addBatch();  
  68.         counter.incrementAndGet();  
  69.         if (counter.get()== batchSize)   
  70.         executeBatch();  
  71.     }   
  72.     catch (SQLException e1)   
  73.     {  
  74.         e1.printStackTrace();  
  75.     }             
  76.    }  
  77.    else 
  78.    {  
  79.         long curTime = System.currentTimeMillis();  
  80.        long diffInSeconds = (curTime-startTime)/(60*1000);  
  81.        if(counter.get()&#60;batchSize && diffInSeconds&#62;batchTimeWindowInSeconds)  
  82.        {  
  83.             try {  
  84.                 executeBatch();  
  85.                 startTime = System.currentTimeMillis();  
  86.             }  
  87.             catch (SQLException e) {  
  88.                  e.printStackTrace();  
  89.             }  
  90.        }  
  91.    }  
  92. }  
  93.    
  94. public void executeBatch() throws SQLException  
  95. {  
  96.     batchExecuted=true;  
  97.     prepStatement.executeBatch();  
  98.     counter = new AtomicInteger(0);  

一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會建立topology並準備執行。下面就來看一下執行步驟。

在本地集群上運行和測試topology

  • 通過TopologyBuilder建立topology。
  • 使用Storm Submitter,將topology遞交給集群。以topology的名字、配置和topology的對象作為參數。
  • 提交topology。

Listing Eight:建立和執行topology。

  1. public class StormMain  
  2. {  
  3.      public static void main(String[] args) throws AlreadyAliveException,   
  4.                                                    InvalidTopologyException,   
  5.                                                    InterruptedException   
  6.      {  
  7.           ParallelFileSpout parallelFileSpout = new ParallelFileSpout();  
  8.           ThresholdBolt thresholdBolt = new ThresholdBolt();  
  9.           DBWriterBolt dbWriterBolt = new DBWriterBolt();  
  10.           TopologyBuilder builder = new TopologyBuilder();  
  11.           builder.setSpout("spout", parallelFileSpout, 1);  
  12.           builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");  
  13.           builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");  
  14.           if(this.argsMain!=null && this.argsMain.length &#620)   
  15.           {  
  16.               conf.setNumWorkers(1);  
  17.               StormSubmitter.submitTopology(   
  18.                    this.argsMain[0], conf, builder.createTopology());  
  19.           }  
  20.           else 
  21.           {      
  22.               Config conf = new Config();  
  23.               conf.setDebug(true);  
  24.               conf.setMaxTaskParallelism(3);  
  25.               LocalCluster cluster = new LocalCluster();  
  26.               cluster.submitTopology(  
  27.               "Threshold_Test", conf, builder.createTopology());  
  28.           }  
  29.      }  

topology被建立後將被提交到本地集群。一旦topology被提交,除非被取締或者集群關閉,它將一直保持運行不需要做任何的修改。這也是Storm的另一大特色之一。

這個簡單的例子體現了當你掌握了topology、spout和bolt的概念,將可以輕鬆的使用Storm進行實時處理。如果你既想處理大數據又不想遍歷Hadoop的話,不難發現使用Storm將是個很好的選擇。

原文連結:Easy, Real-Time Big Data Analysis Using Storm (編譯/仲浩 王旭東/審校)

歡迎關注@CSDN雲計算微博,了解更多雲信息。

本文為CSDN編譯整理,未經允許不得轉載。如需轉載請聯繫market@csdn.net

相關焦點

  • Flume+Kafka+Storm+Redis構建大數據實時處理系統
    在下面給出的完整案例中,我們將會完成下面的幾項工作:如何一步步構建我們的實時處理系統(Flume+Kafka+Storm+Redis)實時處理網站的用戶訪問日誌,並統計出該網站的PV、UV將實時分析出的PV、UV動態地展示在我們的前面頁面上如果你對上面提及的大數據組件已經有所認識,或者對如何構建大數據實時處理系統感興趣,那麼就可以盡情閱讀下面的內容了
  • 利用flume+kafka+storm+mysql構建大數據實時系統
    【數盟致力於成為最卓越的數據科學社區,聚焦於大數據、分析挖掘、數據可視化領域,業務範圍:線下活動、在線課程、獵頭服務】
  • 實時海量日誌分析系統的架構設計、實現以及思考
    由於需要對日誌進行實時分析,所以Storm是我們想到的首個框架。Storm是一個分布式實時計算系統,它可以很好的處理流式數據。利用storm我們幾乎可以直接實現一個日誌分析系統,但是將日誌分析系統進行模塊化設計可以收到更好的效果。模塊化的設計至少有兩方面的優點:模塊化設計可以使功能更加清晰。
  • 阿里雲實時大數據解決方案,助力企業實時分析與決策
    在全增量實時同步解決方案系統中,可以從MySql、Oracle、IBMDB2、SQLserver、POLARDB等關係型資料庫中全量離線同步到MaxCompute、Hologres、Elasticsearch、Kafka、DataHub等大數據產品中,再實現實時抽取關係型資料庫的變更信息,同步到大數據產品中。
  • 大數據實時分析平臺應用在哪些場景
    大數據平臺主要是解決對海量多樣化的數據源進行數據採集、數據存儲,數據分析和數據處理,並提供滿足日漸增長的擴展性要求。而且爬蟲爬過來的數據是輿情,通過大數據技術進行分詞之後得到的可能是大段的網友評論,客戶往往要求對輿情進行查詢,做全文本搜索,並要求響應時間控制在秒級。爬蟲將數據爬到大數據平臺的Kafka裡,在裡面做流處理,去重去噪做語音分析,寫到ElasticSearch裡。大數據的一個特點是多數據源,大數據平臺能根據不同的場景選擇不同的數據源。
  • Python大數據分析疫情:如何實現實時數據爬取及Matplotlib可視化
    作者 | 楊秀璋來源 | CSDN博客專家Eastmount責編 | 夕顏思來想去,雖然很忙,但還是擠時間針對這次肺炎疫情寫個Python大數據分析系列博客,包括網絡爬蟲、可視化分析、GIS地圖顯示、情感分析、輿情分析
  • Flink 如何實時分析 Iceberg 數據湖的 CDC 數據
    我們通常想到的第一個方案,就是把 CDC upsert 的數據通過 Flink 進行一些處理之後,實時的寫到 HBase 當中。HBase 是一個在線的、能提供在線點查能力的一種資料庫,具有非常高的實時性,對寫入操作是非常友好的,也可以支持一些小範圍的查詢,而且集群可擴展。這種方案其實跟普通的點查實時鏈路是同一套,那麼用 HBase 來做大數據的  OLAP 的查詢分析有什麼問題呢?
  • Apache Doris 在 WeLab實時大數據平臺的應用實踐
    我們以定製化服務的方式為合作夥伴提供金融智能解決方案,幫助合作夥伴實現金融科技創新。WeLab擁有獨創的風險管理技術,可以高效地整合和分析移動端大數據,並對用戶的風險進行定級,高效地輸出決策。為了實現秒級決策,我們對數據處理的實時性,準確性和安全性都有很高的要求。Apache Doris資料庫就是在這樣的背景下被引入到我們大數據平臺中來的,並最終成為了我們大數據平臺的重要基石之一。
  • 20個最好的網站數據實時分析工具
    WoopraWoopra將實時統計帶到了另一個層次,它能實時直播網站的訪問數據,你甚至可以使用Woopra Chat部件與用戶聊天。它還擁有先進的通知功能,可讓你建立各類通知,如電子郵件、聲音、彈出框等。
  • Apache Eagle:分布式實時 Hadoop 數據安全方案
    Apache Eagle提供一套高效分布式的流式策略引擎,具有高實時、可伸縮、易擴展、交互友好等特點,同時集成機器學習對用戶行為建立Profile以實現實時智能實時地保護Hadoop生態系統中大數據的安全。背景隨著大數據的發展,越來越多的成功企業或者組織開始採取數據驅動商業的運作模式。
  • 數據戰爭——直面海量處理+實時分析的雙重挑戰
    日前,在北京舉辦的大數據世界論壇上,內存計算、實時查詢、有效的存儲管理、智能挖掘分析,成為了眾多IT人士關注的焦點。用戶需求——海量+實時分析來自IDC全球存儲及大數據研究項目副總裁Benjamin Woo表示,到2020年,全球數據使用量預計暴增44倍,達到35.2ZB。35ZB是什麼概念呢?
  • FFT實時譜分析系統的FPGA設計和實現
    摘要: 採用按時間抽選的基4原位算法和坐標旋轉數字式計算機(CORDIC)算法實現了一個FFT實時譜分析系統。整個設計採用流水線工作方式,保證了系統的速度,避免了瓶頸的出現;整個系統採用FPGA實現,實驗表明,該系統既有DSP器件實現的靈活性又有專用 FFT晶片實現的高速數據吞吐能力,可以廣泛地應用於數位訊號處理的各個領域。
  • 基於MATLAB的實時數據採集與分析研究
    根據各種非電或電信號的特徵,利用相應的歸一化技術,將其轉換為可真實反映事物特徵的電信號後,經A/D轉換器轉換為計算機可識別的有限長二進位數字編碼,以此作為研究自然科學和實現工業實時控制的重要依據,實現對宏觀和微觀自然科學的量化認識,典型的數據採集系統組成如圖1所示。
  • 專訪騰訊蔣傑:深度揭秘騰訊大數據平臺
    大數據,這個詞越來越熱,很多人都在談大數據,其實很多張口閉口大數據的人,或許都不知道數據是如何產生、傳遞、存儲、運算到應用的。其實我一直感覺大數據這個東西有時候真的不是一般企業可以玩的溜的,特別是隨著傳統業務增長放緩,以及移動網際網路時代的精細化運營,對於大數據分析和挖掘的重視程度高於以往任何時候,如何從大數據中獲取高價值,已經成為大家關心的焦點問題。
  • Apache Eagle——eBay開源分布式實時Hadoop數據安全方案
    日前,eBay公司隆重宣布正式向開源業界推出分布式實時安全監控引方案—— Apache Eagle,該項目已正式加入Apache 稱為孵化器項目。Apache Eagle提供一套高效分布式的流式策略引擎,具有高實時、可伸縮、易擴展、交互友好等特點,同時集成機器學習對用戶行為建立Profile以實現實時智能實時地保護Hadoop生態系統中大數據的安全。
  • 大數據學習資料下載,新手攻略,數據分析工具、軟體使用教程
    大數據被認為是「未來的新石油」,在社會生產、流通、分配、消費活動以及經濟運行機制等方面發揮著重要的作用。作為 IT 類職業中的「大熊貓」,大數據工程師的收入待遇可以說達到了同類的頂級。國內 IT、通訊、行業招聘中,有 10% 都是和大數據相關的,且比例還在上升。
  • 萬億數據下的多維實時分析系統,如何做到亞秒級響應
    導語當業務發展到一定規模,實時數據倉庫是一個必要的基礎服務。從數據驅動方面考慮,多維實時數據分析系統的重要性也不言而喻。但是當數據量巨大的情況下,拿騰訊看點來說,一天上報的數據量達到萬億級的規模,要實現極低延遲的實時計算和亞秒級的多維實時查詢是有技術挑戰的。
  • RichData大數據智能分析平臺
    RichData是一個跟具體行業業務無關的、通用性的大數據平臺工具產品,通過這個產品所具備的的高性能的實時和非實時大數據計算能力、豐富的統計、分析、挖掘模型,為行業全流程、全周期的生產運營活動提供商業智能支持。
  • 帆軟|探索性分析/數據挖掘/大數據大並發FineBI5.0更新
    2、分析用戶  針對深度分析用戶,他們往往就有數據分析基礎和理解模型的能力,對數據有再處理加工和深度分析挖掘的需求。此時BI更多充當快速可視化分析的工具,通過可視化輔助數據分析時的思考。  針對初級分析用戶,他們的需求往往是Dashboard即席分析,結合可視化有主題的展示業務數據,實時監控,預警分析。此時的BI最好能有較低的上手門檻,能快速取數。
  • 一份大數據學習秘籍,值得收藏
    存儲方面一般有物理存儲、分布式對象存儲、大資料庫存儲等模式。物理存儲定義是數據存儲在磁碟上。存儲類型方面支持文件存儲、塊存儲、對象存儲。分布式存儲主要是提供多存儲節點來實現海量數據的存儲和方面,支持高可用、高性能、高伸縮性。