簡單和明了,Storm讓大數據分析變得輕鬆加愉快。
當今世界,公司的日常運營經常會生成TB級別的數據。數據來源囊括了網際網路裝置可以捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中創建的數據。考慮到數據的生成量,實時處理成為了許多機構需要面對的首要挑戰。我們經常用的一個非常有效的開源實時計算工具就是Storm —— Twitter開發,通常被比作「實時的Hadoop」。然而Storm遠比Hadoop來的簡單,因為用它處理大數據不會帶來新老技術的交替。
Shruthi Kumar、Siddharth Patankar共同效力於Infosys,分別從事技術分析和研發工作。本文詳述了Storm的使用方法,例子中的項目名稱為「超速報警系統(Speeding Alert System)」。我們想實現的功能是:實時分析過往車輛的數據,一旦車輛數據超過預設的臨界值 —— 便觸發一個trigger並把相關的數據存入資料庫。
Storm
對比Hadoop的批處理,Storm是個實時的、分布式以及具備高容錯的計算系統。同Hadoop一樣Storm也可以處理大批量的數據,然而Storm在保證高可靠性的前提下還可以讓處理進行的更加實時;也就是說,所有的信息都會被處理。Storm同樣還具備容錯和分布計算這些特性,這就讓Storm可以擴展到不同的機器上進行大批量的數據處理。他同樣還有以下的這些特性:
- 易於擴展。對於擴展,你只需要添加機器和改變對應的topology(拓撲)設置。Storm使用Hadoop Zookeeper進行集群協調,這樣可以充分的保證大型集群的良好運行。
- 每條信息的處理都可以得到保證。
- Storm集群管理簡易。
- Storm的容錯機能:一旦topology遞交,Storm會一直運行它直到topology被廢除或者被關閉。而在執行中出現錯誤時,也會由Storm重新分配任務。
- 儘管通常使用Java,Storm中的topology可以用任何語言設計。
當然為了更好的理解文章,你首先需要安裝和設置Storm。需要通過以下幾個簡單的步驟:
- 從Storm官方下載Storm安裝文件
- 將bin/directory解壓到你的PATH上,並保證bin/storm腳本是可執行的。
Storm組件
Storm集群主要由一個主節點和一群工作節點(worker node)組成,通過 Zookeeper進行協調。
主節點:
主節點通常運行一個後臺程序 —— Nimbus,用於響應分布在集群中的節點,分配任務和監測故障。這個很類似於Hadoop中的Job Tracker。
工作節點:
工作節點同樣會運行一個後臺程序 —— Supervisor,用於收聽工作指派並基於要求運行工作進程。每個工作節點都是topology中一個子集的實現。而Nimbus和Supervisor之間的協調則通過Zookeeper系統或者集群。
Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協調的服務。而應用程式實現實時的邏輯則被封裝進Storm中的「topology」。topology則是一組由Spouts(數據源)和Bolts(數據操作)通過Stream Groupings進行連接的圖。下面對出現的術語進行更深刻的解析。
Spout:
簡而言之,Spout從來源處讀取數據並放入topology。Spout分成可靠和不可靠兩種;當Storm接收失敗時,可靠的Spout會對tuple(元組,數據項組成的列表)進行重發;而不可靠的Spout不會考慮接收成功與否只發射一次。而Spout中最主要的方法就是nextTuple(),該方法會發射一個新的tuple到topology,如果沒有新tuple發射則會簡單的返回。
Bolt:
Topology中所有的處理都由Bolt完成。Bolt可以完成任何事,比如:連接的過濾、聚合、訪問文件/資料庫、等等。Bolt從Spout中接收數據並進行處理,如果遇到複雜流的處理也可能將tuple發送給另一個Bolt進行處理。而Bolt中最重要的方法是execute(),以新的tuple作為參數接收。不管是Spout還是Bolt,如果將tuple發射成多個流,這些流都可以通過declareStream()來聲明。
Stream Groupings:
Stream Grouping定義了一個流在Bolt任務間該如何被切分。這裡有Storm提供的6個Stream Grouping類型:
1. 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。
2. 欄位分組(Fields grouping):根據指定欄位分割數據流,並分組。例如,根據「user-id」欄位,相同「user-id」的元組總是分發到同一個任務,不同「user-id」的元組可能分發到不同的任務。
3. 全部分組(All grouping):tuple被複製到bolt的所有任務。這種類型需要謹慎使用。
4. 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
5. 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效於隨機分組。但最終,Storm將把無分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執行(如果可能)。
6. 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。
當然還可以實現CustomStreamGroupimg接口來定製自己需要的分組。
項目實施
當下情況我們需要給Spout和Bolt設計一種能夠處理大量數據(日誌文件)的topology,當一個特定數據值超過預設的臨界值時促發警報。使用Storm的topology,逐行讀入日誌文件並且監視輸入數據。在Storm組件方面,Spout負責讀入輸入數據。它不僅從現有的文件中讀入數據,同時還監視著新文件。文件一旦被修改Spout會讀入新的版本並且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發射給Bolt進行臨界分析,這樣就可以發現所有可能超臨界的記錄。
下一節將對用例進行詳細介紹。
臨界分析
這一節,將主要聚焦於臨界值的兩種分析類型:瞬間臨界(instant thershold)和時間序列臨界(time series threshold)。
- 瞬間臨界值監測:一個欄位的值在那個瞬間超過了預設的臨界值,如果條件符合的話則觸發一個trigger。舉個例子當車輛超越80公裡每小時,則觸發trigger。
- 時間序列臨界監測:欄位的值在一個給定的時間段內超過了預設的臨界值,如果條件符合則觸發一個觸發器。比如:在5分鐘類,時速超過80KM兩次及以上的車輛。
Listing One顯示了我們將使用的一個類型日誌,其中包含的車輛數據信息有:車牌號、車輛行駛的速度以及數據獲取的位置。
AB 123 | 60 | North city |
BC 123 | 70 | South city |
CD 234 | 40 | South city |
DE 123 | 40 | East city |
EF 123 | 90 | South city |
GH 123 | 50 | West city |
這裡將創建一個對應的XML文件,這將包含引入數據的模式。這個XML將用於日誌文件的解析。XML的設計模式和對應的說明請見下表。
XML文件和日誌文件都存放在Spout可以隨時監測的目錄下,用以關注文件的實時更新。而這個用例中的topology請見下圖。
Figure 1:Storm中建立的topology,用以實現數據實時處理
如圖所示:FilelistenerSpout接收輸入日誌並進行逐行的讀入,接著將數據發射給ThresoldCalculatorBolt進行更深一步的臨界值處理。一旦處理完成,被計算行的數據將發送給DBWriterBolt,然後由DBWriterBolt存入給資料庫。下面將對這個過程的實現進行詳細的解析。
Spout的實現
Spout以日誌文件和XML描述文件作為接收對象。XML文件包含了與日誌一致的設計模式。不妨設想一下一個示例日誌文件,包含了車輛的車牌號、行駛速度、以及數據的捕獲位置。(看下圖)
Figure2:數據從日誌文件到Spout的流程圖
Listing Two顯示了tuple對應的XML,其中指定了欄位、將日誌文件切割成欄位的定界符以及欄位的類型。XML文件以及數據都被保存到Spout指定的路徑。
Listing Two:用以描述日誌文件的XML文件。
- <TUPLEINFO>
- <FIELDLIST>
- <FIELD>
- <COLUMNNAME>vehicle_number</COLUMNNAME>
- <COLUMNTYPE>string</COLUMNTYPE>
- </FIELD>
-
- <FIELD>
- <COLUMNNAME>speed</COLUMNNAME>
- <COLUMNTYPE>int</COLUMNTYPE>
- </FIELD>
-
- <FIELD>
- <COLUMNNAME>location</COLUMNNAME>
- <COLUMNTYPE>string</COLUMNTYPE>
- </FIELD>
- </FIELDLIST>
- <DELIMITER>,</DELIMITER>
- </TUPLEINFO>
通過構造函數及它的參數Directory、PathSpout和TupleInfo對象創建Spout對象。TupleInfo儲存了日誌文件的欄位、定界符、欄位的類型這些很必要的信息。這個對象通過XSTream序列化XML時建立。
Spout的實現步驟:
- 對文件的改變進行分開的監聽,並監視目錄下有無新日誌文件添加。
- 在數據得到了欄位的說明後,將其轉換成tuple。
- 聲明Spout和Bolt之間的分組,並決定tuple發送給Bolt的途徑。
Spout的具體編碼在Listing Three中顯示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。
- public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )
- {
- _collector = collector;
- try
- {
- fileReader = new BufferedReader(new FileReader(new File(file)));
- }
- catch (FileNotFoundException e)
- {
- System.exit(1);
- }
- }
-
- public void nextTuple()
- {
- protected void ListenFile(File file)
- {
- Utils.sleep(2000);
- RandomAccessFile access = null;
- String line = null;
- try
- {
- while ((line = access.readLine()) != null)
- {
- if (line !=null)
- {
- String[] fields=null;
- if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter());
- else
- fields = line.split (tupleInfo.getDelimiter());
- if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields));
- }
- }
- }
- catch (IOException ex){ }
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer)
- {
- String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
- for(int i=0; i<tupleInfo.getFieldList().size(); i++)
- {
- fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();
- }
- declarer.declare(new Fields(fieldsArr));
- }
declareOutputFileds()決定了tuple發射的格式,這樣的話Bolt就可以用類似的方法將tuple解碼。Spout持續對日誌文件的數據的變更進行監聽,一旦有添加Spout就會進行讀入並且發送給Bolt進行處理。
Bolt的實現
Spout的輸出結果將給予Bolt進行更深一步的處理。經過對用例的思考,我們的topology中需要如Figure 3中的兩個Bolt。
Figure 3:Spout到Bolt的數據流程。
ThresholdCalculatorBolt
Spout將tuple發出,由ThresholdCalculatorBolt接收並進行臨界值處理。在這裡,它將接收好幾項輸入進行檢查;分別是:
臨界值檢查
- 臨界值欄數檢查(拆分成欄位的數目)
- 臨界值數據類型(拆分後欄位的類型)
- 臨界值出現的頻數
- 臨界值時間段檢查
Listing Four中的類,定義用來保存這些值。
Listing Four:ThresholdInfo類
- public class ThresholdInfo implementsSerializable
-
- {
- private String action;
- private String rule;
- private Object thresholdValue;
- private int thresholdColNumber;
- private Integer timeWindow;
- private int frequencyOfOccurence;
- }
基於欄位中提供的值,臨界值檢查將被Listing Five中的execute()方法執行。代碼大部分的功能是解析和接收值的檢測。
Listing Five:臨界值檢測代碼段
- public void execute(Tuple tuple, BasicOutputCollector collector)
- {
- if(tuple!=null)
- {
- List<Object> inputTupleList = (List<Object>) tuple.getValues();
- int thresholdColNum = thresholdInfo.getThresholdColNumber();
- Object thresholdValue = thresholdInfo.getThresholdValue();
- String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();
- Integer timeWindow = thresholdInfo.getTimeWindow();
- int frequency = thresholdInfo.getFrequencyOfOccurence();
- if(thresholdDataType.equalsIgnoreCase("string"))
- {
- String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();
- String frequencyChkOp = thresholdInfo.getAction();
- if(timeWindow!=null)
- {
- long curTime = System.currentTimeMillis();
- long diffInMinutes = (curTime-startTime)/(1000);
- if(diffInMinutes>=timeWindow)
- {
- if(frequencyChkOp.equals("=="))
- {
- if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals("!="))
- {
- if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else System.out.println("Operator not supported");
- }
- }
- else
- {
- if(frequencyChkOp.equals("=="))
- {
- if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals("!="))
- {
- if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- }
- }
- else if(thresholdDataType.equalsIgnoreCase("int") || thresholdDataType.equalsIgnoreCase("double") || thresholdDataType.equalsIgnoreCase("float") || thresholdDataType.equalsIgnoreCase("long") || thresholdDataType.equalsIgnoreCase("short"))
- {
- String frequencyChkOp = thresholdInfo.getAction();
- if(timeWindow!=null)
- {
- long valueToCheck = Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());
- long curTime = System.currentTimeMillis();
- long diffInMinutes = (curTime-startTime)/(1000);
- System.out.println("Difference in minutes="+diffInMinutes);
- if(diffInMinutes>=timeWindow)
- {
- if(frequencyChkOp.equals("<"))
- {
- if(valueToCheck < Double.parseDouble(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals(">"))
- {
- if(valueToCheck > Double.parseDouble(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals("=="))
- {
- if(valueToCheck == Double.parseDouble(thresholdValue.toString()))
- {
- count.incrementAndGet();
- if(count.get() > frequency)
- splitAndEmit(inputTupleList,collector);
- }
- }
- else if(frequencyChkOp.equals("!="))
- {
- . . .
- }
- }
- }
- else
- splitAndEmit(null,collector);
- }
- else
- {
- System.err.println("Emitting null in bolt");
- splitAndEmit(null,collector);
- }
- }
經由Bolt發送的的tuple將會傳遞到下一個對應的Bolt,在我們的用例中是DBWriterBolt。
DBWriterBolt
經過處理的tuple必須被持久化以便於觸發tigger或者更深層次的使用。DBWiterBolt做了這個持久化的工作並把tuple存入了資料庫。表的建立由prepare()函數完成,這也將是topology調用的第一個方法。方法的編碼如Listing Six所示。
Listing Six:建表編碼。
- public void prepare( Map StormConf, TopologyContext context )
- {
- try
- {
- Class.forName(dbClass);
- }
- catch (ClassNotFoundException e)
- {
- System.out.println("Driver not found");
- e.printStackTrace();
- }
-
- try
- {
- connection driverManager.getConnection(
- "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);
- connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();
-
- StringBuilder createQuery = new StringBuilder(
- "CREATE TABLE IF NOT EXISTS "+tableName+"(");
- for(Field fields : tupleInfo.getFieldList())
- {
- if(fields.getColumnType().equalsIgnoreCase("String"))
- createQuery.append(fields.getColumnName()+" VARCHAR(500),");
- else
- createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");
- }
- createQuery.append("thresholdTimeStamp timestamp)");
- connection.prepareStatement(createQuery.toString()).execute();
-
-
- StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");
- String tempCreateQuery = new String();
- for(Field fields : tupleInfo.getFieldList())
- {
- insertQuery.append(fields.getColumnName()+",");
- }
- insertQuery.append("thresholdTimeStamp").append(") values (");
- for(Field fields : tupleInfo.getFieldList())
- {
- insertQuery.append("?,");
- }
-
- insertQuery.append("?)");
- prepStatement = connection.prepareStatement(insertQuery.toString());
- }
- catch (SQLException e)
- {
- e.printStackTrace();
- }
- }
數據分批次的插入資料庫。插入的邏輯由Listting Seven中的execute()方法提供。大部分的編碼都是用來實現可能存在不同類型輸入的解析。
Listing Seven:數據插入的代碼部分。
- public void execute(Tuple tuple, BasicOutputCollector collector)
- {
- batchExecuted=false;
- if(tuple!=null)
- {
- List<Object> inputTupleList = (List<Object>) tuple.getValues();
- int dbIndex=0;
- for(int i=0;i<tupleInfo.getFieldList().size();i++)
- {
- Field field = tupleInfo.getFieldList().get(i);
- try {
- dbIndex = i+1;
- if(field.getColumnType().equalsIgnoreCase("String"))
- prepStatement.setString(dbIndex, inputTupleList.get(i).toString());
- else if(field.getColumnType().equalsIgnoreCase("int"))
- prepStatement.setInt(dbIndex,
- Integer.parseInt(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("long"))
- prepStatement.setLong(dbIndex,
- Long.parseLong(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("float"))
- prepStatement.setFloat(dbIndex,
- Float.parseFloat(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("double"))
- prepStatement.setDouble(dbIndex,
- Double.parseDouble(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("short"))
- prepStatement.setShort(dbIndex,
- Short.parseShort(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("boolean"))
- prepStatement.setBoolean(dbIndex,
- Boolean.parseBoolean(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("byte"))
- prepStatement.setByte(dbIndex,
- Byte.parseByte(inputTupleList.get(i).toString()));
- else if(field.getColumnType().equalsIgnoreCase("Date"))
- {
- Date dateToAdd=null;
- if (!(inputTupleList.get(i) instanceof Date))
- {
- DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
- try
- {
- dateToAdd = df.parse(inputTupleList.get(i).toString());
- }
- catch (ParseException e)
- {
- System.err.println("Data type not valid");
- }
- }
- else
- {
- dateToAdd = (Date)inputTupleList.get(i);
- java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());
- prepStatement.setDate(dbIndex, sqlDate);
- }
- }
- catch (SQLException e)
- {
- e.printStackTrace();
- }
- }
- Date now = new Date();
- try
- {
- prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));
- prepStatement.addBatch();
- counter.incrementAndGet();
- if (counter.get()== batchSize)
- executeBatch();
- }
- catch (SQLException e1)
- {
- e1.printStackTrace();
- }
- }
- else
- {
- long curTime = System.currentTimeMillis();
- long diffInSeconds = (curTime-startTime)/(60*1000);
- if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)
- {
- try {
- executeBatch();
- startTime = System.currentTimeMillis();
- }
- catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- public void executeBatch() throws SQLException
- {
- batchExecuted=true;
- prepStatement.executeBatch();
- counter = new AtomicInteger(0);
- }
一旦Spout和Bolt準備就緒(等待被執行),topology生成器將會建立topology並準備執行。下面就來看一下執行步驟。
在本地集群上運行和測試topology
- 通過TopologyBuilder建立topology。
- 使用Storm Submitter,將topology遞交給集群。以topology的名字、配置和topology的對象作為參數。
- 提交topology。
Listing Eight:建立和執行topology。
- public class StormMain
- {
- public static void main(String[] args) throws AlreadyAliveException,
- InvalidTopologyException,
- InterruptedException
- {
- ParallelFileSpout parallelFileSpout = new ParallelFileSpout();
- ThresholdBolt thresholdBolt = new ThresholdBolt();
- DBWriterBolt dbWriterBolt = new DBWriterBolt();
- TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout("spout", parallelFileSpout, 1);
- builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");
- builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");
- if(this.argsMain!=null && this.argsMain.length > 0)
- {
- conf.setNumWorkers(1);
- StormSubmitter.submitTopology(
- this.argsMain[0], conf, builder.createTopology());
- }
- else
- {
- Config conf = new Config();
- conf.setDebug(true);
- conf.setMaxTaskParallelism(3);
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(
- "Threshold_Test", conf, builder.createTopology());
- }
- }
- }
topology被建立後將被提交到本地集群。一旦topology被提交,除非被取締或者集群關閉,它將一直保持運行不需要做任何的修改。這也是Storm的另一大特色之一。
這個簡單的例子體現了當你掌握了topology、spout和bolt的概念,將可以輕鬆的使用Storm進行實時處理。如果你既想處理大數據又不想遍歷Hadoop的話,不難發現使用Storm將是個很好的選擇。
原文連結:Easy, Real-Time Big Data Analysis Using Storm (編譯/仲浩 王旭東/審校)
歡迎關注@CSDN雲計算微博,了解更多雲信息。
本文為CSDN編譯整理,未經允許不得轉載。如需轉載請聯繫market@csdn.net