「日誌架構」ELK Stack + Kafka 端到端練習

2020-08-28 首席架構師智庫

在前一章中,我們已經學習了如何從頭到尾地配置ELK堆棧。這樣的配置能夠支持大多數用例。然而,對於一個無限擴展的生產環境,瓶頸仍然存在:

  • Logstash需要使用管道和過濾器處理日誌,這需要花費大量的時間,如果日誌爆發,可能會成為瓶頸;
  • 彈性搜索需要對日誌進行索引,這也消耗了時間,當日誌爆發時,它就成為了一個瓶頸。

上面提到的瓶頸可以通過添加更多的Logstash部署和縮放Elasticsearch集群來平滑,當然,也可以通過在中間引入緩存層來平滑,就像所有其他的IT解決方案一樣(比如在資料庫訪問路徑的中間引入Redis)。利用緩存層最流行的解決方案之一是將Kafka集成到ELK堆棧中。我們將在本章討論如何建立這樣的環境。

架構

當Kafka被用作ELK棧中的緩存層時,將使用如下架構:


這方面的細節可以從部署和擴展Logstash中找到

演示環境

基於以上介紹的知識,我們的演示環境將構建如下:

The detailed enviroment is as below:

  • logstash69167/69168 (hostnames: e2e-l4-0690-167/168): receive logs from syslog, filebeat, etc. and forward/produce logs to Kafka topics;
  • kafka69155/156/157 (hostnames: e2e-l4-0690-155/156/157): kafka cluster
    • zookeeper will also be installed on these 3 x nodes;
    • kafka manager will be installed on kafka69155;
  • logstash69158/69159 (hostnames: e2e-l4-0690-158/159): consume logs from kafka topics, process logs with pipelines, and send logs to Elasticsearch;
  • elasticsearch69152/69153/69154 (hostnames: e2e-l4-0690-152/153/154): Elasticsearch cluster
    • Kibana will be installed on elasticsearch69152
  • Data sources such as syslog, filebeat, etc. follow the same configuration as when Kafka is not used, hence we ignore their configuration in this chapter.

部署

Elasticsearch部署

安裝過程已經由本文檔記錄,請參閱前面的章節。在本節中,我們將只列出配置和命令。

  1. Install Elasticsearch on elasticsearch69152/69153/69154;
  2. Configs on each node (/etc/elasticsearch/elasticsearch.yml):
  3. elasticsearch69152cluster.name: edc-elasticsearch
    node.name: e2e-l4-0690-152
    path.data: /var/lib/elasticsearch
    path.logs: /var/log/elasticsearch
    network.host: 0.0.0.0
    discovery.seed_hosts: [&34;, &34;, &34;]
    cluster.initial_master_nodes: [&34;, &34;, &34;]
  4. elasticsearch69153cluster.name: edc-elasticsearch
    node.name: e2e-l4-0690-153
    path.data: /var/lib/elasticsearch
    path.logs: /var/log/elasticsearch
    network.host: 0.0.0.0
    discovery.seed_hosts: [&34;, &34;, &34;]
    cluster.initial_master_nodes: [&34;, &34;, &34;]
  5. elasticsearch69154cluster.name: edc-elasticsearch
    node.name: e2e-l4-0690-154
    path.data: /var/lib/elasticsearch
    path.logs: /var/log/elasticsearch
    network.host: 0.0.0.0
    discovery.seed_hosts: [&34;, &34;, &34;]
    cluster.initial_master_nodes: [&34;, &34;, &34;]
  6. Start Elasticsearch service on each node:systemctl disable firewalld
    systemctl enable elasticsearch
    systemctl start elasticsearch
  7. Verify (on any node): 3 x alive nodes should exist and one master node is elected successfully[root@e2e-l4-0690-152]39;http://localhost:9200/_cluster/state?pretty&34;0.0.0.0&34;e2e-l4-0690-152&34;http://e2e-l4-0690-152:9200&34;http://e2e-l4-0690-153:9200&34;http://e2e-l4-0690-154:9200& kafka69155
    echo 2 > /var/lib/zookeeper/myid kafka69157
  8. Start Zookeeper on all nodes:./bin/zkServer.sh start
    ./bin/zkServer.sh status
  9. Connect to Zooper for verification:./bin/zkCli.sh -server 10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181

Kafka 部署

A Kafka cluster will be deployed on kafka69155/69156/69157.

  1. Kafka does not need any installation, downloading and decompressing a tarball is enough. Please refer to Kafka Quickstart for reference;
  2. The Kafka cluster will run on kafka69155/156/157 where a Zookeeper cluster is already running. To enable the Kafka cluster, configure each node as below(config/server.properties):
  3. kafka69155:broker.id=0
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://10.226.69.155:9092
    zookeeper.connect=10.226.69.155:2181,10.226.69.156:2181:10.226.69.157:2181
  4. kafka69156:broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://10.226.69.156:9092
    zookeeper.connect=10.226.69.155:2181,10.226.69.156:2181:10.226.69.157:2181
  5. kafka69157:broker.id=1
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://10.226.69.157:9092
    zookeeper.connect=10.226.69.155:2181,10.226.69.156:2181:10.226.69.157:2181
  6. Start Kafka on all nodes:./bin/kafka-server-start.sh -daemon config/server.properties

Once the Kafka cluster is running, we can go ahead configuring Logstash. When it is required to make changes to the Kafka cluster, we should shut down the cluster gracefully as below, then make changes and start the cluster again:

./bin/kafka-server-stop.sh

Kafka Manager 部署

可以使用CLI命令管理Kafka集群。然而,它並不是非常方便。Kafka Manager是一個基於web的工具,它使基本的Kafka管理任務變得簡單明了。該工具目前由雅虎維護,並已被重新命名為CMAK (Apache Kafka的集群管理)。無論如何,我們更喜歡稱之為Kafka經理。

The Kafka manager will be deployed on kafka69155.

  1. Download the application from its github repo;
  2. After decompressing the package, change the zookeeper option as below in conf/application.conf:kafka-manager.zkhosts=&34;
  3. Create the app deployment(a zip file will be created):./sbt clean dist
  4. Unzip the newly created zip file (kafka-manager-2.0.0.2.zip in this demo) and start the service:unzip kafka-manager-2.0.0.2.zip
    cd kafka-manager-2.0.0.2
    bin/kafka-manager
  5. The Kafka manager can be accessed from http://10.226.69.155:9000/ after a while;
  6. Click Cluster->Add Cluster and enter below information to manage our Kafka cluster:
  7. Cluster Name: assign a meaningful name for this cluster
  8. Cluster Zookeeper Hosts: 10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181
  9. Enable JMX Polling: yes
  10. Done.

Logstash部署

基於我們對演示環境的介紹,我們有兩套Logstash部署:

  • Log Producers: logstash69167/69168Collect logs from data sources (such as syslog, filebeat, etc.) and forward log entries to corresponding Kafka topics. The num. of such Logstash instances can be determined based on the amount of data generated by data sources.Actually, such Logstash instances are separated from each other. In other words, they work as standalone instances and have no knowledge on others.
  • Log Consumers: logstash69158/69159Consume logs from Kafka topics, modify logs based on pipeline definitions and ship modified logs to Elasticsearch.Such Logstash instances have the identical pipeline configurations (except for client_id) and belong to the same Kafka consumer group which load balance each other.

The installation of Logstash has been covered in previous chapters, we won’t cover them again in this chapter, instead, we will focus our effort on the clarification of pipeline definitions when Kafka is leveraged in the middle.

Logstash產生日誌到Kafka

每個Logstash實例負責合併某些指定數據源的日誌。

  • logstash69167: consolidate logs for storage arrays and application solutions based on Linux;
  • logstash69168: consolidate logs for ethernet switches and application solutions based on Windows.
  1. Define pipelines(/etc/logstash/conf.d)
  2. logstash6916734;server&34;filebeat&34;ps&34;rhel&34;host&34;server&34;ps-rhel&34;ps-rhel&34;json&34;10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092& /etc/logstash/conf.d/sc_sles.conf
    input {
    beats {
    port => 5044
    tags => [&34;, &34;, &34;, &34;]
    }
    }
    filter {
    mutate {
    rename => [&34;, &34;]
    }
    }
    output {
    kafka {
    id => &34;
    topic_id => &34;
    codec => &34;
    bootstrap_servers => &34;
    }
    }
    34;array&34;syslog&34;sc&34;ps&34;pssc&34;pssc&34;json&34;10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092& /etc/logstash/conf.d/unity.conf
    input {
    udp {
    port => 5000
    tags => [&34;, &34;, &34;]
    }
    }
    output {
    kafka {
    id => &34;
    topic_id => &34;
    codec => &34;
    bootstrap_servers => &34;
    }
    }
    34;array&34;syslog&34;xio&34;xio&34;xio&34;json&34;10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092& /etc/logstash/conf.d/ethernet_switch.conf
    input {
    udp {
    port => 514
    tags => [&34;, &34;, &34;, &34;]
    }
    }
    output {
    kafka {
    id => &34;
    topic_id => &34;
    codec => &34;
    bootstrap_servers => &34;
    }
    }
    34;server&34;winlogbeat&34;vnx&34;windows&34;exchange&34;host&34;server&34;vnx-exchange&34;vnx-exchange&34;json&34;10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092& /etc/logstash/conf.d/vnx_mssql.conf
    input {
    beats {
    port => 5045
    tags => [&34;, &34;, &34;, &34;, &34;]
    }
    }
    filter {
    mutate {
    rename => [&34;, &34;]
    }
    }
    output {
    kafka {
    id => &34;
    topic_id => &34;
    codec => &34;
    bootstrap_servers => &34;
    }
    }
  3. Enable pipelines (/etc/logstash/pipelines.yml):
  4. logstash69167:- pipeline.id: ps_rhel
    path.config: &34;
    - pipeline.id: sc_sles
    path.config: &34;
    - pipeline.id: pssc
    path.config: &34;
    - pipeline.id: unity
    path.config: &34;
    - pipeline.id: xio
    path.config: &34;
  5. logstash69168:- pipeline.id: ethernet_switch
    path.config: &34;
    - pipeline.id: vnx_exchange
    path.config: &34;
    - pipeline.id: vnx_mssql
    path.config: &34;
  6. Start Logstash servers on all nodes:systemctl start logstash
  7. Verify topics are successfully created on Kafka:ssh root@kafka69155/156/157
    ./bin/kafka-topics.sh -bootstrap-server &34; --list
  8. Verify logs are sent to Kafka successfully:ssh root@kafka69155/156/157
    ./bin/kafka-console-consumer.sh -bootstrap-server &34; --topic <topic name>

現在,我們已經將Logstash實例配置為Kafka producer。在繼續之前,有必要介紹一些關於使用Kafka作為輸出插件時的管道配置的技巧。

不要為這類Logstash實例的管道定義複雜的過濾器,因為它們可能增加延遲;

  • 在輸入部分添加標籤,以簡化Kibana的日誌搜索/分類工作;
  • 為不同的管道指定不同的id和有意義的名稱;
  • 如果syslog也是設置中的數據源,則將主機欄位重命名為其他有意義的名稱。關於這個問題的解釋,請參考tips章節。

Logstash,它消耗來自Kafka的日誌

我們將為logstash69158/69159配置管道。這兩個Logstash實例具有相同的管道定義(除了client_id之外),並通過利用Kafka的消費者組特性均勻地使用來自Kafka主題的消息。

由於日誌被安全地緩存在Kafka中,所以在將日誌實體發送到Elasticsearch之前,使用管道定義複雜的過濾器來修改日誌實體是正確的。這不會導致瓶頸,因為Kafka中已經有日誌了,唯一的影響是您可能需要等待一段時間才能看到Elasticsearch/Kibana中的日誌。如果查看來自Elasticsearch/Kibana的日誌對時間很敏感,那麼可以添加屬於同一使用者組的更多Logstash實例來平衡處理的負載。

  1. Define pipelines(/etc/logstash/conf.d): client_id should always be set with different values34;logstash69158-array& client_id => &34;
    group_id => &34;
    topics => [&34;, &34;, &34;, &34;, &34;]
    codec => &34;
    bootstrap_servers => &34;
    }
    }
    output {
    elasticsearch {
    hosts => [&34;, &34;, &34;]
    index => &34;
    }
    }
    34;logstash69158-server& client_id => &34;
    group_id => &34;
    topics => [&34;, &34;, &34;, &34;]
    codec => &34;
    bootstrap_servers => &34;
    }
    }
    output {
    elasticsearch {
    hosts => [&34;, &34;, &34;]
    index => &34;
    }
    }
    34;logstash69158-switch& client_id => &34;
    group_id => &34;
    topics => [&34;]
    codec => &34;
    bootstrap_servers => &34;
    }
    }
    output {
    elasticsearch {
    hosts => [&34;, &34;, &34;]
    index => &34;
    }
    }
  2. Enable pipelines on all nodes(/etc/logstash/pipelines.yml):- pipeline.id: kafka_array
    path.config: &34;
    - pipeline.id: kafka_server
    path.config: &34;
    - pipeline.id: kafka_switch
    path.config: &34;
  3. Start logstash on all nodes:systemctl start logstash

配置並啟動Logstash之後,日誌應該能夠發送到Elasticsearch,並可以從Kibana檢查。

現在,我們已經將Logstash實例配置為Kafka使用者。在繼續之前,有必要介紹一些在使用Kafka作為輸入插件時的管道配置技巧。

  • 對於不同Logstash實例上的每個管道,應該始終使用不同的值設置client_id。該欄位用於識別Kafka上的消費者;
  • 對於不同Logstsh實例上的相同管道,group_id應該設置恆等值。這個欄位用於標識Kafka上的消費者組,如果值不同,負載平衡就無法工作。

數據源配置

數據源是伺服器、交換機、陣列等,它們通過beat、syslog等將日誌發送到Logstash。配置它們的步驟與沒有Kafka集成時相同,請參照前一章。

結論

我們已經配置了一個集成了Kafka和ELK堆棧的演示環境。通過集成Kafka,可以提高日誌處理性能(添加緩存層),還可以集成更多潛在的應用程式(使用來自Kafka的日誌消息並執行一些特殊操作,如ML)。

(此處已添加圈子卡片,請到今日頭條客戶端查看)

相關焦點

  • ELK+kafka日誌收集分析系統
    上echo 1 > /data/elk/zk/data/myid# 192.168.0.42上echo 2 > /data/elk/zk/data/myid# 192.168.0.133上echo 3 > /data/elk/zk/data/myid其他兩臺的配置一樣,除了myid不同。
  • ELK + Filebeat + Kafka 分布式日誌管理平臺搭建
    1.3 架構演進ELK缺點:ELK架構,並且Spring Boot應用使用 logstash-logback-encoder 直接發送給 Logstash,缺點就是Logstash是重量級日誌收集server,佔用cpu資源高且內存佔用比較高
  • Kubernetes ELK 日誌收集
    Kubernetes EFK日誌收集Kubernetes日誌收集架構Kubernetes集群本身不提供收集日誌的解決方案,目前基於ELK日誌收集的方案主要有三種在節點運行一個agent收集日誌在Pod中包含一個sidecar容器來收集日誌直接通過應用程式將日誌信息推送到採集後端 (例kafka,es等)節點級別的日誌記錄節點日誌採集 通過在每個節點上運行一個日誌收集的Agent來採集數據,日誌採集agent是一種專用工具,用於將日誌數據推送到統一的後端
  • 詳解Kafka端到端的延遲
    下圖顯示了一條記錄在系統中的路徑,從Kafka生產者到Kafka的Broker節點,副本的複製,以及消費者最終在其主體分區日誌中獲取到具體的消息。無論是clients端還是broker端,SSL加密也是有開銷的。同時由於SSL無法利用Zero Copy特性進行數據傳輸,因為consumer獲取消息時也會增加額外的開銷。雖然這些因素都會影響延遲,但是通常情況下企業內部可能還是需要這種架構上的考慮,因此採用該部署結構進行測試。
  • 微服務架構開發實戰:ElasticStack實現日誌集中化
    日誌託運有一些日誌託運組件可以與其他工具結合起來建立一個端到端的日誌管理解決方案。不同日誌託運工具的功能不同。· Logstash:是一個功能強大的數據管道工具,可用於收集和發送日誌文件。它充當經紀人,提供了一種機制來接受來自不同來源的流數據,並將其匯集到不同的目的地。
  • 集群日誌收集架構ELK
    是一款輕量級的、開源的日誌收集處理框架,它可以方便的把分散的、多樣化的日誌收集起來,並進行自定義的過濾分析處理,然後輸出到指定的位置(如:es)。其實在我們應用伺服器端只需要採集日誌功能就行了,沒有必要logstash其他的功能;所以Filebeat等beat組件就出現了,它們比較小巧,而且不耗資源,也完全夠用。Filebeat是一個輕量級的託運人,用於轉發和集中日誌數據。Filebeat作為代理安裝在伺服器上,監視您指定的日誌文件或位置,收集日誌事件,並將它們轉發到Elasticsearch或 Logstash進行索引。
  • 集群日誌收集架構ELK
    ,它可以方便的把分散的、多樣化的日誌收集起來,並進行自定義的過濾分析處理,然後輸出到指定的位置(如:es)。其實在我們應用伺服器端只需要採集日誌功能就行了,沒有必要logstash其他的功能;所以Filebeat等beat組件就出現了,它們比較小巧,而且不耗資源,也完全夠用。Filebeat是一個輕量級的託運人,用於轉發和集中日誌數據。
  • ELK難?一招教你輕鬆搞定Filebeat快速入門及使用
    Logstash 主要是用來日誌的搜集、分析、過濾日誌的工具,支持大量的數據獲取方式。一般工作方式為c/s架構,client端安裝在需要收集日誌的主機上,server端負責將收到的各節點日誌進行過濾、修改等操作在一併發往elasticsearch上去。
  • 如何在CentOS 7 / Fedora 31/30/29上安裝ELK Stack
    原文地址:https://computingforgeeks.com/how-to-install-elk-stack-on-centos-fedora/作者: Josphat Mutai[1]翻譯:高行行
  • Spring Cloud微服務系統添加ELK+logback可視化日誌分析管理工具
    為Spring Cloud微服務系統添加ELK+logback可視化日誌分析管理工具一、問題起源 微服務架構中,除了Spring Cloud所需的組件,如網關、Eureka註冊中心、配置中心等,還有大量經過業務拆分生成的微服務節點。如何有效地收集匯總各個微服務節點的日誌,對於應對微服務架構的複雜性有很大的幫助。
  • 「事件驅動架構」GoldenGate創建從Oracle到Kafka的CDC事件流(2)
    「事件驅動架構」GoldenGate創建從Oracle到Kafka的CDC事件流(1)數據泵是一個提取過程,它監視一個跟蹤日誌,並(實時地)將任何更改推到另一個由不同的(通常是遠程的)GoldenGate實例管理的跟蹤日誌。對於這個PoC,由GoldenGate (classic)管理的trail log aa將被泵送至GoldenGate管理的trail log bb進行大數據處理。
  • Flutter+Serverless端到端研發架構實踐
    移動端離業務越來越遠,服務端沒有時間做底層領域沉澱。研發架構的演進接下來我們帶著這裡兩個問題回顧前後端研發架構演進的歷史。PC網際網路早期沒有還沒有前後端的概念,此階段單個業務需求通常一個開發人員可以完成研發,前端網頁與後端邏輯都寫在一個工程中。
  • 搭建ELK日誌分析系統
    Logstash 是一個具有實時渠道能力的數據收集引擎,主要用於日誌的收集與解析,並將其存入 ElasticSearch中。架構架構圖>使用說明1.logback整合elk,pom文件引入依賴 <dependency> <groupId>net.logstash.logback</groupId> <artifactId>logstash-logback-encoder
  • 日誌分析平臺ELK之日誌收集器logstash
    前文我們聊解了什麼是elk,elk中的elasticsearch集群相關組件和集群搭建以及es集群常用接口的說明和使用,回顧請查看考https://www.cnblogs.com/qiuhom-1874/p/13758006.html;今天我們來了解下ELK中的日誌收集器logstash;logstash的工作原理類似Linux
  • 「Elastic Stack」 日誌收集部署
    Syslog 日誌收集[success]該模塊使用Ubuntu 12.04、Centos 7和macOS Sierra等作業系統的日誌進行了測試。 此模塊不適用於Windows。1.Nginx模塊使用版本1.10中的日誌進行了測試。
  • 虛擬教育走進英語學習場景,「MageVR」打造「VR端+手機端」學習...
    36氪近期接觸的項目「MageVR」,是北京等賢科技開發的一款虛擬實境學習產品,面向B端和C端提供近千節精品課程和優質的VR互動體驗。 在「MageVR」聯合創始人關天智看來,VR軟體市場目前還沒有一個統一標準,比較混亂。
  • 學界| 谷歌新論文提出預測器架構:端到端的學習與規劃
    選自arxiv.org機器之心編譯參與:Jane W論文:預測器:端到端的學習和規劃(The Predictron: End-To-End Learning and Planning)摘要人工智慧的主要挑戰之一是在有規劃的情況下有效地學習模型。
  • Kafka 架構及原理分析
    設計架構使用場景大數據領域網站行為分析日誌聚合應用監控流式數據處理在線和離線數據分析數據集成消息導入 MaxCompute、OOS、RDS、Hadoop、HBase 等離線數據倉庫流計算集成StreamComputeE-MapReduceSparkStorm集成流計算引擎架構依賴 Zookeeper 實現配置和節點管理
  • Kafka 知識腦圖 - 分布式日誌收集系統
    PartitionHW ( High Watermak ) 高水位 : 所有 ISR 已同步的偏移量,消費者最多只能拉取到此偏移量之前的消息LEO ( Log End Offset ) 日誌結尾 : 下條待寫入消息的偏移量2.
  • 基於OGG 實現Oracle到Kafka增量數據實時同步
    需要實時從OLTP系統中獲取數據變更,實時同步到下遊業務系統。本文基於Oracle OGG,介紹一種將Oracle資料庫的數據實時同步到Kafka消息隊列的方法。數據抽取主要分如下幾種類型:本地抽取從本地資料庫捕獲增量變更數據,寫入到本地Trail文件數據推送(Data Pump)從本地Trail文件讀取數據,推送到目標端。