利用flume+kafka+storm+mysql構建大數據實時系統

2021-03-02 數盟

【數盟致力於成為最卓越的數據科學社區,聚焦於大數據、分析挖掘、數據可視化領域,業務範圍:線下活動、在線課程、獵頭服務】

這年頭不會點Python都不好意思跟隔壁老王打招呼,數盟線下工作坊:手把手教你玩轉Python,詳情點擊文末閱讀原文!

作者:停不下的腳步

出處:停不下的腳步的博客

架構圖

數據流向圖

實時日誌分析系統架構簡介

系統主要分為四部分:

1).數據採集

負責從各節點上實時採集數據,選用cloudera的flume來實現

2).數據接入

由於採集數據的速度和數據處理的速度不一定同步,因此添加一個消息中間件來作為緩衝,選用apache的kafka

3).流式計算

對採集到的數據進行實時分析,選用apache的storm

4).數據輸出

對分析後的結果持久化,暫定用mysql

詳細介紹各個組件及安裝配置:

作業系統:centos6.4

Flume

Flume是Cloudera提供的一個分布式、可靠、和高可用的海量日誌採集、聚合和傳輸的日誌收集系統,支持在日誌系統中定製各類數據發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接受方(可定製)的能力。

下圖為flume典型的體系結構:

Flume數據源以及輸出方式:

Flume提供了從console(控制臺)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日誌系統,支持TCP和UDP等2種模式),exec(命令執行)等數據源上收集數據的能力,在我們的系統中目前使用exec方式進行日誌採集。

Flume的數據接受方,可以是console(控制臺)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日誌系統)等。在我們系統中由kafka來接收。

Flume版本:1.4.0

Flume下載及文檔:

http://flume.apache.org/

Flume安裝:

$tar zxvf apache-flume-1.4.0-bin.tar.gz /usr/local

Flume啟動命令:

$bin/flume-ng agent –conf conf –conf-file conf/flume-conf.properties –name producer -Dflume.root.logger=INFO,console

注意事項:需要更改conf目錄下的配置文件,並且添加jar包到lib目錄下。

Kafka

Kafka是一個消息中間件,它的特點是:

1、關注大吞吐量,而不是別的特性

2、針對實時性場景

3、關於消息被處理的狀態是在consumer端維護,而不是由kafka server端維護。

4、分布式,producer、broker和consumer都分布於多臺機器上。

下圖為kafka的架構圖:

Kafka版本:0.8.0

Kafka下載及文檔:http://kafka.apache.org/

Kafka安裝:

> tar xzf kafka-<VERSION>.tgz

> cd kafka-<VERSION>

> ./sbt update

> ./sbt package

> ./sbt assembly-package-dependency Kafka

啟動及測試命令:

(1) start server

> bin/zookeeper-server-start.sh config/zookeeper.properties

> bin/kafka-server-start.sh config/server.properties

(2)Create a topic
> bin/kafka-create-topic.sh –zookeeper localhost:2181 –replica 1 –partition 1 –topic test

> bin/kafka-list-topic.sh –zookeeper localhost:2181

(3)Send some messages

> bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test

(4)Start a consumer

> bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning

storm

Storm是一個分布式的、高容錯的實時計算系統。

Storm架構圖:

storm工作任務topology:

Storm 版本:0.9.0

Storm 下載:http://storm-project.net/

Storm安裝:

第一步,安裝Python2.7.2

# wget http://www.python.org/ftp/python/2.7.2/Python-2.7.2.tgz

# tar zxvf Python-2.7.2.tgz

# cd Python-2.7.2

# ./configure

# make

# make install

# vi /etc/ld.so.conf

第二步,安裝zookeeper(kafka自帶zookeeper,如果選用kafka的,該步可省略)

#wget http://ftp.meisei-u.ac.jp/mirror/apache/dist//zookeeper/zookeeper-3.3.3/zoo keeper-3.3.3.tar.gz

# tar zxf zookeeper-3.3.3.tar.gz

# ln -s /usr/local/zookeeper-3.3.3/ /usr/local/zookeeper

# vi ~./bashrc (設置ZOOKEEPER_HOME和ZOOKEEPER_HOME/bin)

第三步,安裝JAVA

$tar zxvf jdk-7u45-linux-x64.tar.gz /usr/local

如果使用storm0.9以下版本需要安裝zeromq及jzmq。

第四步,安裝zeromq以及jzmq

jzmq的安裝貌似是依賴zeromq的,所以應該先裝zeromq,再裝jzmq。

1)安裝zeromq(非必須):

# wget http://download.zeromq.org/historic/zeromq-2.1.7.tar.gz

# tar zxf zeromq-2.1.7.tar.gz

# cd zeromq-2.1.7

# ./configure

# make

# make install

# sudo ldconfig (更新LD_LIBRARY_PATH)

缺少c++環境:yum install gcc-c++

之後遇到的問題是:Error:cannot link with -luuid, install uuid-dev

這是因為沒有安裝uuid相關的package。

解決方法是:# yum install uuid*

# yum install e2fsprogs*

# yum install libuuid*

2)安裝jzmq(非必須)

然後,jzmq就裝好了,這裡有個網站上參考到的問題沒有遇見,遇見的童鞋可以參考下。在./autogen.sh這步如果報錯:autogen.sh:error:could not find libtool is required to run autogen.sh,這是因為缺少了libtool,可以用#yum install libtool*來解決。

如果安裝的是storm0.9及以上版本不需要安裝zeromq和jzmq,但是需要修改storm.yaml來指定消息傳輸為netty:

storm.local.dir: 「/tmp/storm/data」

storm.messaging.transport: "backtype.storm.messaging.netty.Context"

storm.messaging.netty.server_worker_threads: 1

storm.messaging.netty.client_worker_threads: 1

storm.messaging.netty.buffer_size: 5242880

storm.messaging.netty.max_retries: 100

storm.messaging.netty.max_wait_ms: 1000

storm.messaging.netty.min_wait_ms: 100

第五步,安裝storm

$unzip storm-0.9.0-wip16.zip

備註:單機版不需要修改配置文件,分布式在修改配置文件時要注意:冒號後必須加空格。

測試storm是否安裝成功:

1. 下載strom starter的代碼 git clone https://github.com/nathanmarz/storm-starter.git

2. 使用mvn -f m2-pom.xml package 進行編譯

如果沒有安裝過maven,參見如下步驟安裝:
1.從maven的官網下載http://maven.apache.org/

tar zxvf apache-maven-3.1.1-bin.tar.gz /usr/local

配置maven環境變量

export MAVEN_HOME=/usr/local/maven

export PATH=$PATH:$MAVEN_HOME/bin

驗證maven是否安裝成功:mvn -v

修改Storm-Starter的pom文件m2-pom.xml ,修改dependency中twitter4j-core 和 twitter4j-stream兩個包的依賴版本,如下:

org.twitter4j
twitter4j-core
[2.2,)

org.twitter4j
twitter4j-stream
[2.2,)

編譯完後生成target文件夾

啟動zookeeper

zkServer.sh start

啟動nimbus supervisor ui

storm nimbus

storm supervisor

storm ui

jps查看啟動狀態

進入target目錄執行:

storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology wordcountTop

然後查看http://localhost:8080

注釋:單機版 不用修改storm.yaml

kafka和storm整合

1.下載kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus

2.該項目下載下來需要調試下,找到依賴jar包。然後重新打包,作為我們的storm項目的jar包。

3.將該jar包及kafka_2.9.2-0.8.0-beta1.jar metrics-core-2.2.0.jar scala-library-2.9.2.jar (這三個jar包在kafka-storm-0.8-plus項目依賴中能找到)

備註:如果開發的項目需要其他jar,記得也要放進storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下

flume和kafka整合

1.下載flume-kafka-plus: https://github.com/beyondj2ee/flumeng-kafka-plugin

2.提取插件中的flume-conf.properties文件

修改該文件:#source section

producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c

修改所有topic的值改為test

將改後的配置文件放進flume/conf目錄下

在該項目中提取以下jar包放入環境中flume的lib下:

以上為單機版的flume+kafka+storm的配置安裝

flume+storm插件

https://github.com/xiaochawan/edw-Storm-Flume-Connectors

啟動步驟

安裝好storm,flume,kafka之後開始項目部署啟動(在部署啟動之前最好按照安裝文檔進行storm kafka flume各個組件測試)。

第一步
將編寫好的storm項目打成jar包放入伺服器上,假如放在/usr/local/project/storm.xx.jar

注:關於storm項目的編寫見安裝文檔中的 kafka和storm整合 。

第二步

啟動zookeeper(這裡可以啟動kafka自帶的zookeeper或者啟動單獨安裝的kafka,以下以kafka自帶為例)

cd /usr/local/kafka

bin/zookeeper-server-start.sh config/zookeeper.properties

第三步

啟動kafka

cd /usr/local/kafka

> bin/kafka-server-start.sh config/server.properties

創建主題

> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test

注:因為kafka消息的offset是由zookeeper記錄管理的,所以在此需指定zookeeper的ip,replica 表示該主題的消息被複製幾份,partition 表示每份主題被分割成幾部分。test表示主題名稱。

第四步

啟動storm

> storm nimbus

> storm supervisor

> storm ui

cd /usr/local/project/

> storm jar storm.xx.jar storm.testTopology test

註:storm.xx.jar 為我們編寫好的storm項目jar包,第一步完成的工作。 storm.testTopology 為storm項目中main方法所在的類路徑。test為此次topology的名字。

第五步

啟動flume

cd /usr/local/flume

bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer

註:flume.conf.properties為我們自定義的flume配置文件,flume安裝好後是沒有此文件的,需要我們自己編寫,編寫方式見flume安裝的文章。

至此需要啟動的程序已經全部啟動,storm項目已經開始運行,可以打開storm ui 觀察運行是否正常。

http://localhost:8080

注:此處ip為storm nimbus所在機器Ip 埠可在storm配置文件 storm/conf/storm.yaml中修改

點擊[閱讀原文] 報名數盟線下工作坊


相關焦點

  • Flume+Kafka+Storm+Redis構建大數據實時處理系統
    在下面給出的完整案例中,我們將會完成下面的幾項工作:如何一步步構建我們的實時處理系統(Flume+Kafka+Storm+Redis)實時處理網站的用戶訪問日誌,並統計出該網站的PV、UV將實時分析出的PV、UV動態地展示在我們的前面頁面上如果你對上面提及的大數據組件已經有所認識,或者對如何構建大數據實時處理系統感興趣,那麼就可以盡情閱讀下面的內容了
  • 實時海量日誌分析系統的架構設計、實現以及思考
    由於需要對日誌進行實時分析,所以Storm是我們想到的首個框架。Storm是一個分布式實時計算系統,它可以很好的處理流式數據。利用storm我們幾乎可以直接實現一個日誌分析系統,但是將日誌分析系統進行模塊化設計可以收到更好的效果。模塊化的設計至少有兩方面的優點:模塊化設計可以使功能更加清晰。
  • 如何使用Kafka在生產環境構建大規模機器學習
    AI 前線導語:這篇文章將介紹機器學習在任務關鍵型實時系統中的應用,將 Apache Kafka 作為中心化的、可伸縮的任務關鍵型系統,同時還將介紹使用 Kafka Streams API 來構建智能流式應用。
  • BIGO技術:實時計算平臺建設
    BIGO之前的ETL場景數據路徑通常是Kafka->flume->Hive。經過flume入庫的路徑存在著一下幾方面的問題:1.  Flume的容錯能力差,遇到已成可能會導致丟數據或者數據重複。2.  Flume的動態擴展能力差,流量突然到來時候很難立刻擴展。3.  一旦數據欄位或者格式發生變化,flume比較難於靈活調整。
  • kafka使用原理介紹
    KafkaKafka是最初由Linkedin公司開發,是一個分布式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分布式消息系統,它的最大的特性就是可以實時的處理大量數據以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web
  • 百度大數據三面題:shuffle過程+HBase+Spark優化+kmeans算法
    kafka的數據存在內存還是磁碟Hive與關係型資料庫的關係?Hive中存放是什麼?為什麼要用flume導入hdfs,hdfs的構架是怎樣的?為什麼要用flume導入hdfs,hdfs的構架是怎樣的Hbase行健列族的概念,物理模型,表的設計原則?
  • kafka極簡教程
    kafka是用於構建實時數據管道和流應用程式。具有橫向擴展,容錯,wicked fast(變態快)等優點,並已在成千上萬家公司運行。
  • 使用Storm實現實時大數據分析!
    簡單和明了,Storm讓大數據分析變得輕鬆加愉快。當今世界,公司的日常運營經常會生成TB級別的數據。數據來源囊括了網際網路裝置可以捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中創建的數據。考慮到數據的生成量,實時處理成為了許多機構需要面對的首要挑戰。
  • 構建實時數據倉庫,雲原生數據倉庫AnalyticDB for MySQL技術解密
    阿里雲分析型資料庫重磅推出基礎版,極大降低了用戶構建數據倉庫門檻。高度兼容MySQL,極低的使用成本和極高的性能,使中小企業也可以輕鬆的搭建一套實時數據倉庫,實現企業數據價值在線化。存儲計算分離架構、行列混存技術、輕量的索引構建方式和分布式混合計算引擎又保證了基礎版強大的分析性能。年成本不到一萬就可以構建一套實時數據倉庫,無需成立專門的大數據團隊,為企業節省百萬成本。1.基礎版技術架構如下為基礎版架構圖,整體由Coordinator和Worker組成,各自的職責如下介紹。
  • 數據同步中間件服務
    數據交換通過DataX進行中轉,任何數據源只要和DataX連接上即可以和已實現的任意數據源同步。    Databus是一個實時的、可靠的、支持事務的、保持一致性的數據變更抓取系統。2011年在LinkedIn正式進入生產系統,2013年開源。
  • 騰訊大牛教你ClickHouse實時同步MySQL數據
    由於ClickHouse本身無法很好地支持單條大批量的寫入,因此在實時同步數據方面需要藉助其他服務協助。本文給出一種結合Canal+Kafka的方案,並且給出在多個MySQL實例分庫分表的場景下,如何將多張MySQL數據表寫入同一張ClickHouse表的方法,歡迎大家批評指正。  首先來看看我們的需求背景:  1.
  • hadoop與數據挖掘的關係_區別_哪個好
    hadoop與數據挖掘的關係 大數據就是Hadoop嗎?當然不是,但是很多人一提到大數據就會立刻想到Hadoop。現在數據科學家利用海量數據創建數據模型為企業帶來的利益是以前所不可想像的,但是數據的潛力已經被完全挖掘出來了嗎,它滿足了人們的期待了嗎?今天小編就從Hadoop項目開始為你抽絲剝繭了解hadoop。
  • 大數據常見的數據框架你知道的有哪些呢?
    大數據的出現背景最初為谷歌提出的三篇關於大數據的論文,分別是GFS論文,MapReduce論文和BigTable論文。這三篇論文奠定了大數據發展的基礎。我們常見的大數據框架hadoop,flume,hive,kafka,hbase,pig,spark.等等這些大數據框架,所謂大數據生態系統的集群,就是由這些大數據組件組成的大數據生態系統,每一個組件都有在處理各種數據的特點以及它自己獨特的優點,這些組件的出現為大數據的快速發展提供了基礎。
  • 圖解SparkStreaming與Kafka的整合,細節滿滿
    前言老劉是一名即將找工作的研二學生,寫博客一方面是複習總結大數據開發的知識點,一方面是希望幫助更多自學的小夥伴。由於老劉是自學大數據開發,肯定會存在一些不足,還希望大家能夠批評指正,讓我們一起進步!我們要知道Spark作為實時計算框架,它僅僅涉及到計算,並沒有涉及到數據的存儲,所以我們後期需要使用spark對接外部的數據源。
  • 千鋒大數據培訓課程大綱有什麼?
    隨著大數據技術不斷發展,想要分得大數據一杯羹的人總是源源不斷,學習大數據技術已經成為一種潮流。,包括但不局限於:元數據管理、數據開發測試工具與方法、數據質量、主數據管理  6、 掌握實時流計算技術,有storm開發經驗者優先  了解了企業對大數據工程師的應聘要求,我們也就是有了目標,這樣學習起來目的性更強,學到的技術知識也是系統的!
  • 面試大數據分析師,你需要掌握的基礎技術棧.
    在日常的工作中,大數據分析師的核心工作內容便是從大量數據中,通過數據處理、統計分析和算法挖掘等技術手段,將數據轉換成有效的信息,從而進一步轉化為知識並內化為智慧。因此,涉及的技術會非常廣泛,這取決於具體的業務需求和公司技術架構。本文我們著重介紹大部分公司常用的技術,也是面試中最容易被問到的技術。
  • Logstash讀取Kafka數據寫入HDFS詳解
    強大的功能,豐富的插件,讓logstash在數據處理的行列中出類拔萃通常日誌數據除了要入ES提供實時展示和簡單統計外,還需要寫入大數據集群來提供更為深入的邏輯處理,前邊幾篇ELK的文章介紹過利用logstash將kafka的數據寫入到elasticsearch集群,這篇文章將會介紹如何通過logstash將數據寫入HDFS本文所有演示均基於logstash
  • Kafka快速入門秘籍:背景介紹,應用場景分析、核心架構分析
    即實現了系統解耦,又提升了系統響應的速度4)消息中間件介紹:消息中間件(Message Queue Middleware,簡稱MQ)又稱為消息隊列,是指利用高效可靠的消息傳遞機制進行與平臺無關的數據交流
  • 實戰|Kafka + Flink + Redis 的電商大屏實時計算案
    實時大屏(real-time dashboard)正在被越來越多的企業採用,用來及時呈現關鍵的數據指標。並且在實際操作中,肯定也不會僅僅計算一兩個維度。由於Flink的「真·流式計算」這一特點,它比Spark Streaming要更適合大屏應用。本文從筆者的實際工作經驗抽象出簡單的模型,並簡要敘述計算流程(當然大部分都是源碼)。
  • 數據管道 Logstash 入門
    •從 kafka 中消費消息,處理數據,寫入 elasticsearch 。為什麼要用 Logstash ?方便省事。假設你需要從 kafka 中消費數據,然後寫入 elasticsearch ,如果自己編碼,你得去對接 kafka 和 elasticsearch 的 API 吧,如果你用 Logstash ,這部分就不用自己去實現了,因為 Logstash 已經為你封裝了對應的 plugin 插件,你只需要寫一個配置文件形如:input { kafka