FileBeat + Kafka + ELK: Setup and a Simple Example

2021-02-19 IT大咖說

I recently looked into ELK and set up an environment myself for learning purposes. This post records the whole deployment process.

Filebeat has now completely replaced Logstash-Forwarder as the new generation of log shipper, and more and more people are adopting it, so the Filebeat-based ELK architecture looks like the figure below.

With the diagram out of the way, here is a brief introduction to each ELK component:

Elasticsearch: ES for short, the core of ELK. An open-source search and analytics engine built on Apache Lucene; it searches and analyzes in near real time and performs extremely well.

Logstash: a data collection engine with real-time pipelining, used to collect, parse, and filter logs; it supports many log types.

Kibana: a web platform for analyzing and visualizing Elasticsearch data. It can produce tables and charts across many dimensions, and it is also useful for troubleshooting: it can narrow an anomaly or event window down to seconds, surfacing the key error message out of terabytes of data.

Filebeat: a lightweight log shipper. Early ELK architectures used Logstash to collect and parse logs, but Logstash is relatively heavy on memory, CPU, and I/O; by comparison, Filebeat's CPU and memory footprint is almost negligible.

Kafka: a high-throughput distributed publish-subscribe messaging system. When log volume is huge, Kafka is introduced to smooth out network transfer, reduce congestion, and guard against data loss; it also decouples the systems on either side, giving better flexibility and scalability.

Versions:

Filebeat: 7.8.1
Logstash: 7.8.1
Kibana: 7.8.1
Elasticsearch: 7.8.1
Kafka: 2.2.2 (Scala 2.12 build, i.e. kafka_2.12-2.2.2)
Java: 11

Take care when matching versions: ES 7.8.1 requires Java 11.

For the Filebeat/Kafka pairing, the official docs also have guidance: Kafka versions between 0.11 and 2.2.2 are recommended, hence Kafka 2.2.2 here.
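Before installing, a quick sanity check that the JDK on the machine matches is worthwhile; a minimal sketch:

# Check the JVM on the PATH; ES 7.x also bundles its own JDK,
# which it uses only when JAVA_HOME is not set
java -version
echo $JAVA_HOME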

Filebeat

1. Extract the archive.

2. Filebeat is simple to configure; edit filebeat.yml as follows:

Define the Filebeat input: the target log files.

Define the Filebeat output: Kafka, with the topic specified.

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /yw/log/*.log


# ------------------------------ Kafka Output ------------------------------
output.kafka:
  enabled: true
  hosts: ["localhost:9092"]
  topic: "test"

3. Start Filebeat, pointing it at the configuration file we just edited (filebeat.yml):

nohup ./filebeat -c filebeat.yml &
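Filebeat is a Go binary, so it will not show up in jps; a quick way to confirm it is running (paths assume the tarball layout):

# Check the process and tail Filebeat's own log
ps -ef | grep [f]ilebeat
tail -f logs/filebeat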

Kafka

1. Extract the archive.

2. Edit the configuration file server.properties, changing the following setting:

listeners=PLAINTEXT://localhost:9092

3. If you do not already have a ZooKeeper installation, you can use the one bundled with Kafka; configure it in config/zookeeper.properties:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

4. Start ZooKeeper:

cd bin
nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &

Check that the process started correctly:

[root@localhost bin]# jps
9139 QuorumPeerMain

5. Start Kafka:

nohup ./kafka-server-start.sh ../config/server.properties &

Check that the process started correctly:

[root@localhost bin]# jps
9139 QuorumPeerMain
9683 Kafka

6. Check the topic. We specified a topic when configuring Filebeat; once Kafka is running and Filebeat publishes to it, the topic is created (Kafka auto-creates topics by default):

[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server localhost:9092
test

[root@localhost bin]# ./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
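The topic above was auto-created because Kafka's auto.create.topics.enable defaults to true. If that setting is off in your cluster, a sketch of creating it by hand before starting Filebeat:

# Create the topic explicitly (matches the single-broker setup here)
./kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic test --partitions 1 --replication-factor 1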

Logstash

1. Extract the archive.

2. Edit the configuration file config/logstash.yml:

node.name: localhost

3. When using Logstash, create your own pipeline configuration file to suit your needs.

Define the input: Logstash consumes the data that Filebeat sent to Kafka, so configure the Kafka connection details, specify the topic, and declare the codec as json.

Define the output: the data goes into ES, so specify the ES address and the index to create, test-&lt;year&gt;.&lt;month&gt;. The configuration file is shown below.

Note that the file must be syntactically correct, otherwise Logstash will throw an exception (a validation command is shown after the start step below).

[root@localhost config]# cat test.conf
input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    group_id => "test"
    topics => ["test"]
    consumer_threads => 1
    codec => json {
      charset => "UTF-8"
    }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "test-%{+YYYY.MM}"
  }
}

4. Start Logstash, specifying our custom configuration file:

cd ../bin/
nohup ./logstash -f ../config/test.conf &
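Given the earlier warning about syntax, Logstash can also validate a config file without starting the pipeline, which is handy before the nohup launch:

# Parse and check the config, then exit (no pipeline is started)
./logstash -f ../config/test.conf --config.test_and_exit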

Check that the process started:

[root@localhost bin]# jps
9139 QuorumPeerMain
9683 Kafka
3619 Logstash

Elasticsearch

1. Extract the archive.

2. Create an elasticsearch user (Elasticsearch refuses to run as root):

useradd elasticsearch

3. Change the owner of the files:

chown -R elasticsearch elasticsearch/

Switch to that user:

su - elasticsearch

4. Edit the configuration file config/elasticsearch.yml:

http.port: 9200

5. Edit the JVM configuration file jvm.options. Resources are limited here, so cap the heap size:

-Xms256m
-Xmx512m
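On a fresh machine Elasticsearch may complain that "max virtual memory areas vm.max_map_count [65530] is too low" (with ES bound to localhost this is only a warning, but it becomes fatal once you bind to a real address). A typical fix, run as root, assuming you hit that error:

# Raise the mmap count limit for the current boot...
sysctl -w vm.max_map_count=262144
# ...and persist it across reboots
echo 'vm.max_map_count=262144' >> /etc/sysctl.conf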

6. Start Elasticsearch:

[root@localhost bin]# cd bin
[root@localhost bin]# ./elasticsearch

Check that the process started:

[2020-07-29T21:54:17,410][INFO ][o.e.h.AbstractHttpServerTransport] [localhost.localdomain] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2020-07-29T21:54:17,411][INFO ][o.e.n.Node ] [localhost.localdomain] started

[root@localhost bin]# jps
3619 Logstash
2516 Kafka
2955 QuorumPeerMain
3725 Elasticsearch
3262 Kafka

7. Test that ES is responding:

[root@localhost bin]# curl localhost:9200
{
"name" : "localhost.localdomain",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "2dcAImNFTli_lGTHJYx7_A",
"version" : {
"number" : "7.8.1",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "b5ca9c58fb664ca8bf9e4057fc229b3396bf3a89",
"build_date" : "2020-07-21T16:40:44.668009Z",
"build_snapshot" : false,
"lucene_version" : "8.5.1",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
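Once Logstash starts shipping events, the index defined in test.conf should appear; the _cat API gives a quick view (if it is not there yet, revisit after the verification steps below):

# List indices; expect a test-YYYY.MM entry once data flows
curl 'localhost:9200/_cat/indices?v'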

Kibana

1. Extract the archive.

2. Create a kibana user:

useradd kibana

3. Change the owner of the files:

chown -R kibana kibana/

Switch to that user:

su - kibana

4. Edit the configuration file kibana.yml:

elasticsearch.hosts: ["http://localhost:9200"]
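By default Kibana listens only on localhost. If you want to reach it from another machine, one optional extra setting (an assumption about your setup; skip it for a purely local test):

# Optional: listen on all interfaces instead of just localhost
echo 'server.host: "0.0.0.0"' >> config/kibana.yml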

5. Start Kibana:

[root@localhost kibana]# cd bin/
[root@localhost bin]# ./kibana

Check whether it started successfully; the relevant log lines look like this:

[01:55:38.282] [info][listening] Server running at http://localhost:5601
[01:55:39.117] [info][server][Kibana][http] http server running at http://localhost:5601

With all components up, open localhost:5601 in a browser to view Kibana.

Now for the moment of truth: the page opens successfully.

Then connect it to your ES instance.

After clicking through, find the index we set up in Logstash.

Next, it is time to verify the whole pipeline.

Generate some data in the monitored log file (a quick way to do this by hand is shown after the excerpt below):

[2020-07-29 22:12:49,689] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 37 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-29 22:22:49,645] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-29 22:32:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-29 22:42:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
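If your monitored file is quiet, appending a line yourself forces some traffic through the pipeline (the path matches the filebeat.yml glob):

# Append a synthetic log line; Filebeat should ship it within seconds
echo "[2020-07-29 23:00:00,000] INFO manual pipeline test" >> /yw/log/1.log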

Consume from Kafka to confirm that messages are arriving on the topic:

./kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
{"@timestamp":"2020-07-30T02:43:17.329Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"ecs":{"version":"1.5.0"},"host":{"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"name":"localhost.localdomain","hostname":"localhost.localdomain","architecture":"x86_64","os":{"family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"id":"17946cedccdd442b845d0cfa8693cc71","containerized":false},"agent":{"version":"7.8.1","hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2","id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat"},"message":"[2020-07-29 22:04:48,914] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-98001 for generation 1 (kafka.coordinator.group.GroupCoordinator)","log":{"offset":668,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"log":{"offset":846,"file":{"path":"/yw/log/1.log"}},"message":"[2020-07-29 22:12:49,689] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 37 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","input":{"type":"log"},"host":{"architecture":"x86_64","os":{"version":"7 (Core)","family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos"},"id":"17946cedccdd442b845d0cfa8693cc71","containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"name":"localhost.localdomain","hostname":"localhost.localdomain"},"agent":{"id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1","hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2"},"ecs":{"version":"1.5.0"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"host":{"architecture":"x86_64","name":"localhost.localdomain","os":{"family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"id":"17946cedccdd442b845d0cfa8693cc71","containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"hostname":"localhost.localdomain"},"message":"[2020-07-29 22:22:49,645] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","log":{"offset":1004,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"},"agent":{"id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1","hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2"},"ecs":{"version":"1.5.0"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"host":{"containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"name":"localhost.localdomain","mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"hostname":"localhost.localdomain","architecture":"x86_64","os":{"family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"id":"17946cedccdd442b845d0cfa8693cc71"},"agent":{"ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2","id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1","hostname":"localhost.localdomain"},"message":"[2020-07-29 22:32:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","log":{"offset":1161,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"},"ecs":{"version":"1.5.0"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"host":{"id":"17946cedccdd442b845d0cfa8693cc71","name":"localhost.localdomain","containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"hostname":"localhost.localdomain","architecture":"x86_64","os":{"version":"7 (Core)","family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos"}},"agent":{"hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2","id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1"},"message":"[2020-07-29 22:42:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","log":{"offset":1318,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"},"ecs":{"version":"1.5.0"}}

The messages are indeed being consumed; the format is JSON, with the actual log line in the message field.

Next, check whether the data made it into ES. In Discover, select the index pattern created earlier and search for INFO-level entries in the message field.

To find the specific "37 milliseconds" entry among the INFO logs, just add the corresponding condition, as shown in the figure below.
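The same filter can also be run against ES directly with the query-string search API, a useful cross-check on what Kibana shows (index pattern as defined in test.conf):

# Search the monthly indices for the specific message;
# --data-urlencode handles the quotes and space in the query
curl -G 'localhost:9200/test-*/_search?pretty' \
  --data-urlencode 'q=message:"37 milliseconds"'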

And with that, this simple example is complete.

Source:

https://www.toutiao.com/i6857318767244870157/
