In the previous chapter, we learned how to configure an ELK stack from end to end. Such a setup can support most use cases. However, for a production environment that needs to scale without limits, bottlenecks still exist:
- Logstash needs to process logs with pipelines and filters, which takes a considerable amount of time and may become a bottleneck during log bursts;
- Elasticsearch needs to index logs, which also consumes time and becomes a bottleneck when logs burst.
The bottlenecks mentioned above can be smoothed by adding more Logstash deployments and scaling the Elasticsearch cluster. Of course, they can also be smoothed by introducing a cache layer in the middle, just as most other IT solutions do (for example, introducing Redis in the middle of the database access path). One of the most popular solutions leveraging a cache layer is integrating Kafka into the ELK stack. We will cover how to set up such an environment in this chapter.
Architecture
When Kafka is used as the cache layer in the ELK stack, the architecture below is used:
The details of this can be found in Deploying and Scaling Logstash.
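In short, data flows through the stack as follows (a simplified text view of the architecture described above):

data sources (syslog, filebeat, ...) --> Logstash (producers) --> Kafka cluster --> Logstash (consumers, one consumer group) --> Elasticsearch cluster --> Kibana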
Demo Environment
Based on the knowledge introduced above, our demo environment will be constructed as follows:
The detailed environment is as below:
- logstash69167/69168 (hostnames: e2e-l4-0690-167/168): receive logs from syslog, filebeat, etc. and forward/produce logs to Kafka topics;
- kafka69155/156/157 (hostnames: e2e-l4-0690-155/156/157): Kafka cluster
  - Zookeeper will also be installed on these 3 nodes;
  - Kafka Manager will be installed on kafka69155;
- logstash69158/69159 (hostnames: e2e-l4-0690-158/159): consume logs from kafka topics, process logs with pipelines, and send logs to Elasticsearch;
- elasticsearch69152/69153/69154 (hostnames: e2e-l4-0690-152/153/154): Elasticsearch cluster
  - Kibana will be installed on elasticsearch69152;
- Data sources such as syslog, filebeat, etc. follow the same configuration as when Kafka is not used; hence their configuration is not covered in this chapter.
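For reference, the demo assumes a straightforward hostname-to-IP mapping that follows the node numbering (only the 10.226.69.155-157 addresses appear in the configurations below; the rest are assumptions), e.g. in /etc/hosts:

10.226.69.152   e2e-l4-0690-152   # elasticsearch69152
10.226.69.153   e2e-l4-0690-153   # elasticsearch69153
10.226.69.154   e2e-l4-0690-154   # elasticsearch69154
10.226.69.155   e2e-l4-0690-155   # kafka69155
10.226.69.156   e2e-l4-0690-156   # kafka69156
10.226.69.157   e2e-l4-0690-157   # kafka69157
10.226.69.158   e2e-l4-0690-158   # logstash69158
10.226.69.159   e2e-l4-0690-159   # logstash69159
10.226.69.167   e2e-l4-0690-167   # logstash69167
10.226.69.168   e2e-l4-0690-168   # logstash69168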
Deployment
Elasticsearch Deployment
The installation process has already been covered earlier in this document; please refer to the previous chapters. In this section, we only list the configurations and commands.
- Install Elasticsearch on elasticsearch69152/69153/69154;
- Configs on each node (/etc/elasticsearch/elasticsearch.yml):
- elasticsearch69152:

cluster.name: edc-elasticsearch
node.name: e2e-l4-0690-152
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
discovery.seed_hosts: ["e2e-l4-0690-152", "e2e-l4-0690-153", "e2e-l4-0690-154"]
cluster.initial_master_nodes: ["e2e-l4-0690-152", "e2e-l4-0690-153", "e2e-l4-0690-154"]

- elasticsearch69153:

cluster.name: edc-elasticsearch
node.name: e2e-l4-0690-153
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
discovery.seed_hosts: ["e2e-l4-0690-152", "e2e-l4-0690-153", "e2e-l4-0690-154"]
cluster.initial_master_nodes: ["e2e-l4-0690-152", "e2e-l4-0690-153", "e2e-l4-0690-154"]

- elasticsearch69154:

cluster.name: edc-elasticsearch
node.name: e2e-l4-0690-154
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
discovery.seed_hosts: ["e2e-l4-0690-152", "e2e-l4-0690-153", "e2e-l4-0690-154"]
cluster.initial_master_nodes: ["e2e-l4-0690-152", "e2e-l4-0690-153", "e2e-l4-0690-154"]

- Start the Elasticsearch service on each node:

systemctl disable firewalld
systemctl enable elasticsearch
systemctl start elasticsearch

- Verify (on any node): 3 x alive nodes should exist and one master node should be elected successfully:

[root@e2e-l4-0690-152]# curl 'http://localhost:9200/_cluster/state?pretty'

Kibana Deployment
Kibana will be deployed on elasticsearch69152.
- Install Kibana on elasticsearch69152;
- Configs (/etc/kibana/kibana.yml):

server.host: "0.0.0.0"
server.name: "e2e-l4-0690-152"
elasticsearch.hosts: ["http://e2e-l4-0690-152:9200", "http://e2e-l4-0690-153:9200", "http://e2e-l4-0690-154:9200"]

- Start the Kibana service:

systemctl enable kibana
systemctl start kibana

Zookeeper Deployment
A Zookeeper cluster will be deployed on kafka69155/69156/69157.
- Zookeeper does not need any installation; downloading and decompressing a tarball is enough;
- Configure the ensemble on each node (conf/zoo.cfg) as shown in the sketch below;
- Assign a unique ID to each node:
  - kafka69155: echo 1 > /var/lib/zookeeper/myid
  - kafka69156: echo 2 > /var/lib/zookeeper/myid
  - kafka69157: echo 3 > /var/lib/zookeeper/myid
- Start Zookeeper on all nodes:

./bin/zkServer.sh start
./bin/zkServer.sh status

- Connect to Zookeeper for verification:

./bin/zkCli.sh -server 10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181
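A minimal conf/zoo.cfg for this three-node ensemble, assuming the default client port and the /var/lib/zookeeper data directory used for the myid files above, could look like:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=10.226.69.155:2888:3888
server.2=10.226.69.156:2888:3888
server.3=10.226.69.157:2888:3888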
Kafka Deployment
A Kafka cluster will be deployed on kafka69155/69156/69157.
- Kafka does not need any installation; downloading and decompressing a tarball is enough. Please refer to the Kafka Quickstart for details;
- The Kafka cluster will run on kafka69155/156/157, where a Zookeeper cluster is already running. To enable the Kafka cluster, configure each node as below (config/server.properties):
  - kafka69155:

broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://10.226.69.155:9092
zookeeper.connect=10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181

  - kafka69156:

broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://10.226.69.156:9092
zookeeper.connect=10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181

  - kafka69157:

broker.id=2
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://10.226.69.157:9092
zookeeper.connect=10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181

- Start Kafka on all nodes:

./bin/kafka-server-start.sh -daemon config/server.properties
Once the Kafka cluster is running, we can go ahead and configure Logstash. Whenever changes need to be made to the Kafka cluster, it should be shut down gracefully as below, reconfigured, and then started again:
./bin/kafka-server-stop.sh
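Before wiring up Logstash, the cluster can be smoke-tested from any Kafka node by creating a throwaway topic (the topic name test below is arbitrary; the commands assume Kafka 2.2+, where kafka-topics.sh accepts --bootstrap-server):

# create a test topic replicated across all three brokers
./bin/kafka-topics.sh --create --bootstrap-server 10.226.69.155:9092 --replication-factor 3 --partitions 3 --topic test

# check that partitions and replicas are spread over broker ids 0, 1 and 2
./bin/kafka-topics.sh --describe --bootstrap-server 10.226.69.155:9092 --topic test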
Kafka Manager Deployment
The Kafka cluster can be managed with CLI commands, but that is not very convenient. Kafka Manager is a web-based tool that makes basic Kafka management tasks straightforward. The tool is currently maintained by Yahoo and has been renamed CMAK (Cluster Manager for Apache Kafka); nevertheless, we still prefer calling it Kafka Manager.
Kafka Manager will be deployed on kafka69155.
- Download the application from its GitHub repo;
- After decompressing the package, change the Zookeeper option as below in conf/application.conf:

kafka-manager.zkhosts="10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181"

- Create the app deployment (a zip file will be created):

./sbt clean dist

- Unzip the newly created zip file (kafka-manager-2.0.0.2.zip in this demo) and start the service:

unzip kafka-manager-2.0.0.2.zip
cd kafka-manager-2.0.0.2
bin/kafka-manager

- The Kafka Manager can be accessed from http://10.226.69.155:9000/ after a while;
- Click Cluster->Add Cluster and enter the information below to manage our Kafka cluster:
- Cluster Name: assign a meaningful name for this cluster
- Cluster Zookeeper Hosts: 10.226.69.155:2181,10.226.69.156:2181,10.226.69.157:2181
- Enable JMX Polling: yes
- Done.
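Note that for Enable JMX Polling to actually report broker metrics, JMX must be exposed on each broker; one common way to do so (an assumption on our part, not covered by the setup above) is to export JMX_PORT before starting Kafka:

# expose JMX on port 9999 (any free port works) and start the broker
JMX_PORT=9999 ./bin/kafka-server-start.sh -daemon config/server.properties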
Logstash Deployment
Based on the introduction of our demo environment, we have two sets of Logstash deployments:
- Log Producers: logstash69167/69168. They collect logs from data sources (such as syslog, filebeat, etc.) and forward the log entries to the corresponding Kafka topics. The number of such Logstash instances can be determined based on the amount of data generated by the data sources. These instances are separated from each other; in other words, they work as standalone instances and have no knowledge of one another.
- Log Consumers: logstash69158/69159. They consume logs from Kafka topics, modify the logs based on pipeline definitions, and ship the modified logs to Elasticsearch. These instances have identical pipeline configurations (except for client_id) and belong to the same Kafka consumer group, which load-balances the work among them.
The installation of Logstash has been covered in previous chapters, so we won't cover it again here; instead, we will focus on clarifying the pipeline definitions when Kafka is leveraged in the middle.
Logstash: Produce Logs to Kafka
Each Logstash instance is responsible for consolidating the logs of certain specified data sources:
- logstash69167: consolidate logs for storage arrays and application solutions based on Linux;
- logstash69168: consolidate logs for ethernet switches and application solutions based on Windows.
- Define pipelines (/etc/logstash/conf.d):
- logstash69167:

/etc/logstash/conf.d/ps_rhel.conf:

input {
  beats {
    port => 5045   # assumed value; it only needs to differ from the other beats input on this node
    tags => ["server", "filebeat", "ps", "rhel"]
  }
}
filter {
  mutate {
    rename => ["host", "server"]
  }
}
output {
  kafka {
    id => "ps-rhel"
    topic_id => "ps-rhel"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}

/etc/logstash/conf.d/sc_sles.conf:

input {
  beats {
    port => 5044
    tags => ["server", "filebeat", "sc", "sles"]
  }
}
filter {
  mutate {
    rename => ["host", "server"]
  }
}
output {
  kafka {
    id => "sc-sles"
    topic_id => "sc-sles"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}
/etc/logstash/conf.d/pssc.conf:

input {
  udp {
    port => 5001   # assumed value; it must differ from the other udp inputs on this node
    tags => ["array", "syslog", "sc", "ps"]
  }
}
output {
  kafka {
    id => "pssc"
    topic_id => "pssc"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}

/etc/logstash/conf.d/unity.conf:

input {
  udp {
    port => 5000
    tags => ["array", "syslog", "unity"]
  }
}
output {
  kafka {
    id => "unity"
    topic_id => "unity"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}
/etc/logstash/conf.d/xio.conf:

input {
  udp {
    port => 5002   # assumed value; it must differ from the other udp inputs on this node
    tags => ["array", "syslog", "xio"]
  }
}
output {
  kafka {
    id => "xio"
    topic_id => "xio"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}

- logstash69168:

/etc/logstash/conf.d/ethernet_switch.conf:

input {
  udp {
    port => 514
    tags => ["switch", "syslog", "ethernet", "network"]   # tag values assumed
  }
}
output {
  kafka {
    id => "ethernet-switch"          # id/topic values assumed from the pipeline name
    topic_id => "ethernet-switch"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}
/etc/logstash/conf.d/vnx_exchange.conf:

input {
  beats {
    port => 5044   # assumed value; it only needs to differ from the other beats input on this node
    tags => ["server", "winlogbeat", "vnx", "windows", "exchange"]
  }
}
filter {
  mutate {
    rename => ["host", "server"]
  }
}
output {
  kafka {
    id => "vnx-exchange"
    topic_id => "vnx-exchange"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}

/etc/logstash/conf.d/vnx_mssql.conf:

input {
  beats {
    port => 5045
    tags => ["server", "winlogbeat", "vnx", "windows", "mssql"]
  }
}
filter {
  mutate {
    rename => ["host", "server"]
  }
}
output {
  kafka {
    id => "vnx-mssql"
    topic_id => "vnx-mssql"
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}

- Enable pipelines (/etc/logstash/pipelines.yml):
- logstash69167:

- pipeline.id: ps_rhel
  path.config: "/etc/logstash/conf.d/ps_rhel.conf"
- pipeline.id: sc_sles
  path.config: "/etc/logstash/conf.d/sc_sles.conf"
- pipeline.id: pssc
  path.config: "/etc/logstash/conf.d/pssc.conf"
- pipeline.id: unity
  path.config: "/etc/logstash/conf.d/unity.conf"
- pipeline.id: xio
  path.config: "/etc/logstash/conf.d/xio.conf"

- logstash69168:

- pipeline.id: ethernet_switch
  path.config: "/etc/logstash/conf.d/ethernet_switch.conf"
- pipeline.id: vnx_exchange
  path.config: "/etc/logstash/conf.d/vnx_exchange.conf"
- pipeline.id: vnx_mssql
  path.config: "/etc/logstash/conf.d/vnx_mssql.conf"

- Start the Logstash service on all nodes:

systemctl start logstash

- Verify topics are successfully created on Kafka:

ssh root@kafka69155/156/157
./bin/kafka-topics.sh --bootstrap-server "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092" --list

- Verify logs are sent to Kafka successfully:

ssh root@kafka69155/156/157
./bin/kafka-console-consumer.sh --bootstrap-server "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092" --topic <topic name>
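If consuming from the console is too noisy, message counts per topic can also be checked with the GetOffsetShell tool shipped with Kafka (flags as in Kafka 2.x; the topic name is just an example):

# print the latest offset of every partition of the ps-rhel topic
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.226.69.155:9092 --topic ps-rhel --time -1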
Now, we have configured the Logstash instances as Kafka producers. Before moving on, it is worth introducing some tips on the pipeline configurations when Kafka is used as the output plugin:
- Do not define complicated filters in the pipelines of such Logstash instances, since they may increase latency;
- Add tags in the input sections to ease log searching/classification with Kibana;
- Specify different id values with meaningful names for different pipelines;
- If syslog is also a data source in the setup, rename the host field to some other meaningful name. Please refer to the tips chapter for the explanation of this issue.
Logstash: Consume Logs from Kafka
We are going to configure pipelines for logstash69158/69159. These two Logstash instances have identical pipeline definitions (except for client_id) and consume messages from the Kafka topics evenly by leveraging the consumer group feature of Kafka.
Since logs are cached safely in Kafka, this is the right place to define complicated filters in the pipelines that modify log entries before sending them to Elasticsearch. This will not lead to bottlenecks, since the logs are already in Kafka; the only impact is that you may need to wait a while before the logs show up in Elasticsearch/Kibana. If seeing the logs in Elasticsearch/Kibana is time-sensitive, more Logstash instances belonging to the same consumer group can be added to balance the processing load.
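As an illustration (not part of the actual pipelines in this demo), such a consumer-side pipeline could carry heavier filters such as grok and date parsing:

filter {
  grok {
    # parse a plain syslog line into structured fields
    match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_host} %{GREEDYDATA:syslog_message}" }
  }
  date {
    # use the parsed timestamp as the event timestamp
    match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
  }
}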
- Define pipelines (/etc/logstash/conf.d): client_id should always be set with different values on different Logstash instances (the configs below are for logstash69158; use e.g. logstash69159-array on logstash69159)

/etc/logstash/conf.d/kafka_array.conf:

input {
  kafka {
    client_id => "logstash69158-array"
    group_id => "logstash-array"   # assumed name; must be identical on logstash69158/69159
    topics => ["pssc", "unity", "xio"]   # the array topics produced above
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}
output {
  elasticsearch {
    hosts => ["http://e2e-l4-0690-152:9200", "http://e2e-l4-0690-153:9200", "http://e2e-l4-0690-154:9200"]
    index => "<array index name>"
  }
}
/etc/logstash/conf.d/kafka_server.conf:

input {
  kafka {
    client_id => "logstash69158-server"
    group_id => "logstash-server"   # assumed name; must be identical on logstash69158/69159
    topics => ["ps-rhel", "sc-sles", "vnx-exchange", "vnx-mssql"]   # the server topics produced above
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}
output {
  elasticsearch {
    hosts => ["http://e2e-l4-0690-152:9200", "http://e2e-l4-0690-153:9200", "http://e2e-l4-0690-154:9200"]
    index => "<server index name>"
  }
}
/etc/logstash/conf.d/kafka_switch.conf:

input {
  kafka {
    client_id => "logstash69158-switch"
    group_id => "logstash-switch"   # assumed name; must be identical on logstash69158/69159
    topics => ["ethernet-switch"]   # the switch topic produced above
    codec => "json"
    bootstrap_servers => "10.226.69.155:9092,10.226.69.156:9092,10.226.69.157:9092"
  }
}
output {
  elasticsearch {
    hosts => ["http://e2e-l4-0690-152:9200", "http://e2e-l4-0690-153:9200", "http://e2e-l4-0690-154:9200"]
    index => "<switch index name>"
  }
}

- Enable pipelines on all nodes (/etc/logstash/pipelines.yml):

- pipeline.id: kafka_array
  path.config: "/etc/logstash/conf.d/kafka_array.conf"
- pipeline.id: kafka_server
  path.config: "/etc/logstash/conf.d/kafka_server.conf"
- pipeline.id: kafka_switch
  path.config: "/etc/logstash/conf.d/kafka_switch.conf"

- Start the Logstash service on all nodes:

systemctl start logstash
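To verify that the two consumer instances are really sharing the work, the offsets and lag of a consumer group can be inspected from any Kafka node (the group name must match the group_id assumed above):

./bin/kafka-consumer-groups.sh --bootstrap-server 10.226.69.155:9092 --describe --group logstash-array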
After Logstash is configured and started, logs should be able to reach Elasticsearch and can be checked from Kibana.
Now, we have configured the Logstash instances as Kafka consumers. Before moving on, it is worth introducing some tips on the pipeline configurations when Kafka is used as the input plugin:
- For each pipeline, client_id should always be set to different values on different Logstash instances. This field is used to identify consumers on Kafka;
- For the same pipeline on different Logstash instances, group_id should be set to identical values. This field is used to identify consumer groups on Kafka; load balancing will not work if the values differ (see the example below).
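In other words, for the kafka_array pipeline the two consumers would differ only in client_id (group_id shown with the value assumed earlier):

# on logstash69158
client_id => "logstash69158-array"
group_id  => "logstash-array"

# on logstash69159
client_id => "logstash69159-array"
group_id  => "logstash-array"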
Data Source Configuration
Data sources are the servers, switches, arrays, etc. that send logs to Logstash through beats, syslog, etc. The steps to configure them are the same as when Kafka is not integrated; please refer to the previous chapter.
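For example, a minimal Filebeat configuration shipping logs to one of the producer Logstash instances could look like the sketch below (the log path is illustrative, and the port must match a beats input defined on logstash69167; 5044 is the sc_sles input from this chapter):

filebeat.inputs:
  - type: log
    paths:
      - /var/log/messages
output.logstash:
  hosts: ["e2e-l4-0690-167:5044"]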
Conclusion
We have set up a demo environment with Kafka integrated into the ELK stack. By integrating Kafka, log processing performance can be boosted (thanks to the added cache layer), and more potential applications can be integrated as well (e.g., consuming the log messages from Kafka and performing special operations, such as ML).