First we set up the ELK environment, then introduce Kafka as a buffer. The data flow is:
log --> logstash-agent --> kafka --> logstash --> es --> kibana
Let's start with an overview of the file structure:
.
├── docker-compose.yml
├── kafka
│   ├── data
│   │   └── kafka-logs-e07760a64ec8
│   │       ├── cleaner-offset-checkpoint
│   │       ├── log-start-offset-checkpoint
│   │       ├── meta.properties
│   │       ├── recovery-point-offset-checkpoint
│   │       ├── replication-offset-checkpoint
│   │       └── test-0
│   │           ├── 00000000000000000000.index
│   │           ├── 00000000000000000000.log
│   │           ├── 00000000000000000000.timeindex
│   │           └── leader-epoch-checkpoint
│   └── docker.sock
├── kibana
│   └── kibana.yml
├── logstash
│   └── logstash.conf
└── zookeeper
    ├── data
    └── datalog

docker-compose-elk.yml
version: "3"
services:
elasticsearch:
image: elasticsearch:7.7.0
container_name: elasticsearch
hostname: elasticsearch
environment:
- "discovery.type=single-node"
ports:
- 9200:9200
- 9300:9300
networks:
- elknetwork
kibana:
image: kibana:7.7.0
container_name: kibana
hostname: kibana
ports:
- 5601:5601
volumes:
- ./kibana:/usr/share/kibana/config/
links:
- elasticsearch:elasticsearch
depends_on:
- elasticsearch
networks:
- elknetwork
logstash:
image: logstash:7.7.0
container_name: logstash
hostname: logstash
ports:
- 9600:9600
- 8089:8089
volumes:
- ./logstash:/usr/share/logstash/pipeline/
links:
- elasticsearch:elasticsearch
depends_on:
- elasticsearch
networks:
- elknetwork
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
restart: unless-stopped
ports:
- "2181:2181"
volumes:
- ./zookeeper/data:/data
- ./zookeeper/datalog:/datalog
networks:
- elknetwork
kafka:
image: wurstmeister/kafka
container_name: kafka
restart: unless-stopped
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.124.5 # 用ifconfig查詢,或直接填寫kafka
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.124.5:9092 ## 修改:宿主機IP
volumes:
- "./kafka/docker.sock:/var/run/docker.sock"
- "./kafka/data/:/kafka"
depends_on:
- zookeeper
networks:
- elknetwork
networks:
elknetwork:
driver: bridge注意: logstash 沒數據源時會主動停止進程, 勿慌
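With the compose file in place, the whole stack can be started from the project root (assuming the file is saved as docker-compose.yml; pass -f if you kept the docker-compose-elk.yml name):

# start elasticsearch, kibana, logstash, zookeeper and kafka in the background
docker-compose up -d
# check that all five containers came up
docker-compose ps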
kibana.yml
server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
i18n.locale: "zh-CN"

logstash.conf
input {
  tcp {
    port => 8089
  }
}
output {
  elasticsearch { hosts => ["elasticsearch:9200"] }
}
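To verify that the tcp input is listening before wiring up the application, you can push a throwaway line at it from the host (a sketch; depending on your nc variant you may need -q 0 or -N so it exits after sending):

# send one test event to the logstash tcp input; it should show up in the logstash-* index in ES
echo 'hello from nc' | nc localhost 8089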
Default Logstash pipeline configuration inside the container (/usr/share/logstash/config/pipelines.yml):
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"

Default Logstash configuration inside the container (/usr/share/logstash/config/logstash.yml):
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://elasticsearch:9200" ]

To get familiar with the default configuration inside Docker:
# enter the docker container
docker exec -it logstash /bin/bash
# list the files
ls
# view the configuration file inside the container
more config/logstash.yml

Inside the logstash container, /usr/share/logstash/config/pipelines.yml points the pipeline configuration at the /usr/share/logstash/pipeline directory, which is where our real configuration files live. Inside the kibana container, /usr/share/kibana/config/kibana.yml can be switched to Chinese (i18n.locale).

Sending log4j2 logs to Logstash over TCP

pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.2</version>
</dependency>

log4j2-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF" monitorInterval="60">
    <Appenders>
        <!-- Console appender: only emits messages at `level` and above, with per-level colors -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- only accept events at info and above (onMatch); reject everything else (onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}"/>
        </Console>
        <!-- Socket appender: ships log events to Logstash for collection -->
        <Socket name="Socket" host="127.0.0.1" port="8089" protocol="TCP">
            <!-- an appender can only use one layout; JsonLayout is used here, swap in the PatternLayout below if you prefer plain lines -->
            <JsonLayout properties="true" compact="true" eventEol="true" />
            <!-- <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n"/> -->
        </Socket>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <property name="hostName">cuishiying</property>
            <property name="applicationName">elk-demo</property>
            <appender-ref ref="Console"/>
            <appender-ref ref="Socket"/>
        </Root>
    </Loggers>
</Configuration>

Test
import java.util.ArrayList;
import java.util.List;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.slf4j.MDC;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class LogController {

    private Logger logger = LogManager.getLogger(LogController.class);

    // http://127.0.0.1:8080/test/log4j2
    @RequestMapping(value = "/log4j2", method = RequestMethod.GET)
    public String testLog() {
        try {
            MDC.put("traceid", String.valueOf(System.currentTimeMillis()));
            logger.info("Hello, this is an info message.");
            logger.error("Hello, this is an error message.");
            logger.warn("Hello, this is a warn message.");
            logger.debug("Hello, this is a debug message.");
            // deliberately trigger an IndexOutOfBoundsException to produce an error log with a stack trace
            List<String> list = new ArrayList<>();
            System.out.println(list.get(2));
        } catch (Exception e) {
            logger.error("testLog", e);
        } finally {
            MDC.clear();
        }
        return "";
    }
}

After the application starts, open Management in Kibana and you will see that Logstash has automatically created an index in ES using its default rules. We still need to create the Kibana index pattern by hand; once it exists, the logs show up in Discover.
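If you prefer not to click through Management, the same index pattern can be created with Kibana's saved-objects API (a sketch; logstash-* matches the default index name produced by the tcp pipeline above):

# create a Kibana index pattern for the logstash-* indices (Kibana 7.x saved-objects API)
curl -X POST "http://localhost:5601/api/saved_objects/index-pattern" \
  -H "kbn-xsrf: true" -H "Content-Type: application/json" \
  -d '{"attributes":{"title":"logstash-*","timeFieldName":"@timestamp"}}'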
When logs are shipped to Logstash over a plain socket, stopping and restarting Logstash breaks the stream for good: the socket does not reconnect automatically, which is obviously a liability in production. Production setups therefore usually use Logstash-gelf instead (the <Gelf> appender comes from the logstash-gelf library, which has to be added as a dependency; a matching Logstash input sketch follows the appender config below).
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Properties>
        <Property name="LOG_PATTERN">{"logger": "%logger", "level": "%level", "msg": "%message"}%n</Property>
    </Properties>
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT" follow="true">
            <PatternLayout pattern="${LOG_PATTERN}"/>
        </Console>
        <Gelf name="logstash-gelf" host="udp:127.0.0.1" port="4567" version="1.1" ignoreExceptions="true">
            <Field name="timestamp" pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}" />
            <Field name="logger" pattern="%logger" />
            <Field name="level" pattern="%level" />
            <Field name="simpleClassName" pattern="%C{1}" />
            <Field name="className" pattern="%C" />
            <Field name="server" pattern="%host" />
        </Gelf>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="logstash-gelf" />
        </Root>
    </Loggers>
</Configuration>
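For the Gelf appender above to reach Logstash, the pipeline also needs a gelf input listening on the same port. A minimal, untested sketch of that input (4567/udp mirrors the appender config; the rest of the pipeline stays as before):

input {
  gelf {
    port => 4567
  }
}
output {
  elasticsearch { hosts => ["elasticsearch:9200"] }
}

Remember to publish "4567:4567/udp" on the logstash service in docker-compose.yml so the UDP packets can reach the container.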
Sending log4j2 logs to Kafka

log4j2-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="OFF" monitorInterval="60">
    <Properties>
        <property name="log_pattern_console">%highlight{%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %M() @%L - %msg%n}{FATAL=Bright Red, ERROR=Bright Magenta, WARN=Bright Yellow, INFO=Bright Green, DEBUG=Bright Cyan, TRACE=Bright White}</property>
        <!-- default log pattern; %X{traceid}: trace id; %C: class name; %M: method name; %l: line location (hurts performance); %m: message; %n: newline -->
        <property name="log_pattern">%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %l - %m%n</property>
    </Properties>
    <Appenders>
        <!-- Console appender: only emits messages at `level` and above, with per-level colors -->
        <Console name="Console" target="SYSTEM_OUT">
            <!-- only accept events at info and above (onMatch); reject everything else (onMismatch) -->
            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${log_pattern_console}"/>
        </Console>
        <!-- Kafka appender: ships log events to Kafka, from where Logstash collects them -->
        <Kafka name="Kafka" topic="test">
            <PatternLayout pattern="${log_pattern}"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <property name="hostName">cuishiying</property>
            <property name="app_name">elk-demo</property>
            <appender-ref ref="Console"/>
            <!--<appender-ref ref="Socket"/>-->
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>

With this configuration the logs are written straight to Kafka.
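Before touching Logstash, it is worth confirming that the events really land on the topic. One way is the console consumer inside the Kafka container (a sketch; /opt/kafka is where the wurstmeister/kafka image keeps the Kafka installation):

# tail the test topic; every log event produced by the application should appear here
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic test \
  --from-beginning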
Shipping log4j2 logs into ELK through Kafka

Modify logstash.conf so that the input is Kafka and the output is ES, creating the ES index from the Kafka topic name and the date.

logstash.conf
input {
  kafka {
    bootstrap_servers => ["kafka:9092"]
    topics_pattern => "test*"
    group_id => "logstash-group"
    consumer_threads => 5
    codec => "json"
    decorate_events => true   # also attach the current topic, offset, group, partition, etc. to the event
    auto_offset_reset => "latest"
  }
}
filter {
  json {
    source => "message"
  }
  mutate {
    remove_field => "@version"
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
    manage_template => false   # disable logstash's own template management and rely on the ES built-in template
  }
  stdout {
    codec => rubydebug { metadata => true }   # print events together with their @metadata to the logstash console
  }
}

Restart the Logstash container:
docker-compose restart logstash

Open http://localhost:5601 and you can see the logs flowing into ELK as expected, stack traces included.
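You can also confirm from the command line that the topic-plus-date index defined in the output section above was created (the exact index name depends on the current date):

# list the indices in elasticsearch; a test-YYYY.MM.dd index should be present
curl "http://localhost:9200/_cat/indices?v"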
Trace IDs in the logs

The MDC traceid that was set by hand above can instead be populated in one place with a servlet filter.
package com.easyliao.auth.common.filter;

import com.easyliao.auth.common.utils.IdUtils;
import com.easyliao.auth.common.utils.IpUtil;
import com.easyliao.auth.common.utils.RequestUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.MDC;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@Slf4j
@WebFilter(urlPatterns = "/*", filterName = "logFilter")
@Order(value = Ordered.HIGHEST_PRECEDENCE)
@Component
public class LogFilter extends OncePerRequestFilter {

    private final String TRACE_ID = "traceid";
    private final String IP = "ip";
    private final String DEFAULT_TRACE_ID = "0";

    @Override
    protected void doFilterInternal(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, FilterChain filterChain) throws ServletException, IOException {
        try {
            // initialize the traceId
            initTraceId(httpServletRequest);
            // initialize the client ip
            initIp(httpServletRequest);
            // log the incoming request
            log(httpServletRequest, httpServletResponse);
            // run the rest of the filter chain
            filterChain.doFilter(httpServletRequest, httpServletResponse);
        } finally {
            afterLog(httpServletRequest, httpServletResponse);
        }
    }

    private void log(HttpServletRequest request, HttpServletResponse response) {
        if (!log.isInfoEnabled()) {
            return;
        }
        log.info("\nrequest url: [{}] \nrequest params: [{}]",
                request.getRequestURL().toString(),
                RequestUtils.getFormParams(request)
        );
    }

    private void afterLog(HttpServletRequest req, HttpServletResponse response) {
        MDC.remove(TRACE_ID);
        MDC.remove(IP);
    }

    private void initIp(HttpServletRequest servletRequest) {
        MDC.put(IP, IpUtil.getIpAddr(servletRequest));
    }

    private void initTraceId(HttpServletRequest request) {
        // try to read the traceId from the http request
        String traceId = request.getParameter(TRACE_ID);
        // if the traceId is missing or equals the default value, generate a new one
        if (StringUtils.isBlank(traceId) || this.defaultTraceId(traceId)) {
            traceId = this.genTraceId();
        }
        // store the traceId
        this.setTraceId(traceId);
    }

    public Boolean defaultTraceId(String traceId) {
        return DEFAULT_TRACE_ID.equals(traceId);
    }

    public String genTraceId() {
        return IdUtils.uuid();
    }

    public void setTraceId(String traceId) {
        // fall back to the default traceId if the argument is blank
        traceId = StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
        // put the traceId into the MDC
        MDC.put(TRACE_ID, traceId);
    }

    public String getTraceId() {
        // read the traceId from the MDC
        String traceId = MDC.get(TRACE_ID);
        // return the default value if it is blank
        return StringUtils.isBlank(traceId) ? DEFAULT_TRACE_ID : traceId;
    }
}
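To see the filter in action end to end, call the earlier test endpoint with an explicit traceid query parameter; it should show up in the [%X{traceid}] slot of every log line for that request (the parameter name matches the TRACE_ID constant above, the value is arbitrary):

# the filter reads the traceid parameter and puts it into the MDC for the whole request
curl "http://127.0.0.1:8080/test/log4j2?traceid=abc123"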
logback + kafka + elk

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>elk-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>elk-demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.github.danielwegener</groupId>
            <artifactId>logback-kafka-appender</artifactId>
            <version>0.2.0-RC2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- scan: when set to true, the configuration is reloaded if the file changes; default is true.
     scanPeriod: how often to check the file for changes; if no time unit is given the default is milliseconds;
     only takes effect when scan is true; the default interval is one minute.
     debug: when set to true, logback prints its internal status messages so you can watch it at runtime;
     default is false. -->
<!-- <configuration scan="false" scanPeriod="60 seconds" debug="false"> -->
<configuration>
    <!-- default log pattern; %X{traceid}: trace id; %C: class name; %M: method name; %L: line number (hurts performance); %m: message; %n: newline -->
    <property name="log_pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%X{traceid}] [%-5level] %L - %m%n"/>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>
    </appender>
    <!-- This is the kafkaAppender -->
    <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder>
            <pattern>${log_pattern}</pattern>
        </encoder>
        <topic>test</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy" />
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy" />
        <!-- Optional parameter to use a fixed partition -->
        <!-- <partition>0</partition> -->
        <!-- Optional parameter to include log timestamps into the kafka message -->
        <!-- <appendTimestamp>true</appendTimestamp> -->
        <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
        <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
        <!-- bootstrap.servers is the only mandatory producerConfig -->
        <producerConfig>bootstrap.servers=localhost:9092</producerConfig>
        <!-- this is the fallback appender if kafka is not available. -->
        <appender-ref ref="STDOUT" />
    </appender>
    <root level="info">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="kafkaAppender" />
    </root>
</configuration>

application.properties
logging.config=classpath:logback-spring.xml
That's all for this post, thanks for reading. If you found it useful, please follow the WeChat public account 【當我遇上你】; your support is what keeps me writing.
References
https://medium.com/@harisshafiq08/elk-stack-deployment-through-docker-compose-98ce40ff2fb6
https://2much2learn.com/centralized-logging-with-kafka-and-elk-stack/
https://yangbingdong.com/2018/spring-boot-docker-elk/
https://github.com/Haris3243/docker-elkstack