logstash_output_kafka:Mysql同步Kafka深入詳解

2021-12-30 銘毅天下Elasticsearch

收錄於話題 #死磕Elasticsearch 163個

0、題記

實際業務場景中,會遇到基礎數據存在Mysql中,實時寫入數據量比較大的情景。遷移至kafka是一種比較好的業務選型方案。

而mysql寫入kafka的選型方案有:

方案一:logstash_output_kafka 插件。
方案二:kafka_connector。
方案三:debezium 插件。
方案四:flume。
方案五:其他類似方案。

其中:debezium和flume是基於mysql binlog實現的。

如果需要同步歷史全量數據+實時更新數據,建議使用logstash。

1、logstash同步原理

常用的logstash的插件是:logstash_input_jdbc實現關係型資料庫到Elasticsearch等的同步。

實際上,核心logstash的同步原理的掌握,有助於大家理解類似的各種庫之間的同步。

logstash核心原理:輸入生成事件,過濾器修改它們,輸出將它們發送到其他地方。

logstash核心三部分組成:input、filter、output。

input { }
filter { }
output { }

1.1 input輸入

包含但遠不限於:

jdbc:關係型資料庫:mysql、oracle等。

file:從文件系統上的文件讀取。

syslog:在已知埠514上偵聽syslog消息。

redis:redis消息。beats:處理 Beats發送的事件。

kafka:kafka實時數據流。

1.2 filter過濾器

過濾器是Logstash管道中的中間處理設備。您可以將過濾器與條件組合,以便在事件滿足特定條件時對其執行操作。

可以把它比作數據處理的ETL環節。

一些有用的過濾包括:

grok:解析並構造任意文本。Grok是目前Logstash中將非結構化日誌數據解析為結構化和可查詢內容的最佳方式。有了內置於Logstash的120種模式,您很可能會找到滿足您需求的模式!

mutate:對事件欄位執行常規轉換。您可以重命名,刪除,替換和修改事件中的欄位。

drop:完全刪除事件,例如調試事件。

clone:製作事件的副本,可能添加或刪除欄位。

geoip:添加有關IP位址的地理位置的信息。

1.3 output輸出

輸出是Logstash管道的最後階段。一些常用的輸出包括:

elasticsearch:將事件數據發送到Elasticsearch。

file:將事件數據寫入磁碟上的文件。

kafka:將事件寫入Kafka。

詳細的filter demo參考:http://t.cn/EaAt4zP

2、同步Mysql到kafka配置參考

input {
    jdbc {
      jdbc_connection_string => "jdbc:mysql://192.168.1.12:3306/news_base"
      jdbc_user => "root"
      jdbc_password => "xxxxxxx"
      jdbc_driver_library => "/home/logstash-6.4.0/lib/mysql-connector-java-5.1.47.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #schedule => "* * * * *"
      statement => "SELECT * from news_info WHERE id > :sql_last_value  order by id"
      use_column_value => true
      tracking_column => "id"        
      tracking_column_type => "numeric"
      record_last_run => true
      last_run_metadata_path => "/home/logstash-6.4.0/sync_data/news_last_run"    

    }

}

filter {
   ruby{
        code => "event.set('gather_time_unix',event.get('gather_time').to_i*1000)"
    }
    ruby{
        code => "event.set('publish_time_unix',event.get('publish_time').to_i*1000)"
    }
  mutate {
    remove_field => [ "@version" ]
    remove_field => [ "@timestamp" ]
    remove_field => [ "gather_time" ]
    remove_field => [ "publish_time" ]
  }
}

 output {
      kafka {
            bootstrap_servers => "192.168.1.13:9092"
            codec => json_lines
            topic_id => "mytopic"

    }
    file {
            codec => json_lines
            path => "/tmp/output_a.log"
    }
 }

以上內容不複雜,不做細講。

注意:  
Mysql藉助logstash同步後,日期類型格式:「2019-04-20 13:55:53」已經被識別為日期格式。

code =>
"event.set('gather_time_unix',event.get('gather_time').to_i*1000)",

是將Mysql中的時間格式轉化為時間戳格式。

3、坑總結3.1 坑1欄位大小寫問題

from星友:使用logstash同步mysql數據的,因為在jdbc.conf裡面沒有添加 lowercase_column_names
=> "false"  這個屬性,所以logstash默認把查詢結果的列明改為了小寫,同步進了es,所以就導致es裡面看到的欄位名稱全是小寫。

最後總結:es是支持大寫欄位名稱的,問題出在logstash沒用好,需要在同步配置中加上 lowercase_column_names => "false"  。記錄下來希望可以幫到更多人。

3.2 同步到ES中的數據會不會重複?

想將關係資料庫的數據同步至ES中,如果在集群的多臺伺服器上同時啟動logstash。

解讀:實際項目中就是沒用隨機id  使用指定id作為es的_id ,指定id可以是url的md5.這樣相同數據就會走更新覆蓋以前數據

3.3 相同配置logstash,升級6.3之後不能同步數據。

解讀:高版本基於時間增量有優化。

tracking_column_type => "timestamp"應該是需要指定標識為時間類型,默認為數字類型numeric

3.4 ETL欄位統一在哪處理?

解讀:可以logstash同步mysql的時候sql查詢階段處理,如:select a_value as avalue***。

或者filter階段處理,mutate rename處理。

mutate {
        rename => ["shortHostname", "hostname" ]
    }

或者kafka階段藉助kafka stream處理。

4、小結

推薦閱讀:
1、實戰 | canal 實現Mysql到Elasticsearch實時增量同步
2、乾貨 | Debezium實現Mysql到Elasticsearch高效實時同步
3、一張圖理清楚關係型資料庫與Elasticsearch同步 http://t.cn/EaAceD3

4、新的實現:http://t.cn/EaAt60O

5、mysql2mysql: http://t.cn/EaAtK7r
6、推薦開源實現:http://t.cn/EaAtjqN

加入星球,更短時間更快習得更多乾貨!

相關焦點

  • Logstash讀取Kafka數據寫入HDFS詳解
    強大的功能,豐富的插件,讓logstash在數據處理的行列中出類拔萃通常日誌數據除了要入ES提供實時展示和簡單統計外,還需要寫入大數據集群來提供更為深入的邏輯處理,前邊幾篇ELK的文章介紹過利用logstash將kafka的數據寫入到elasticsearch集群,這篇文章將會介紹如何通過logstash將數據寫入HDFS本文所有演示均基於logstash
  • Filebeat+Kafka+Logstash+Elasticsearch+Kibana 構建日誌分析系統
    :5601"output.kafka:    hosts: ["kafka:9092"]    topic: 'logs'    codec.json:      pretty: false參數說明:「參數」「說明」
  • 數據管道 Logstash 入門
    { # kafka consumer 配置 }}filter { # 數據處理配置}output { elasticsearch { # elasticsearch 輸出配置 }}然後運行 logstash 就可以了。
  • 乾貨 | Debezium實現Mysql到Elasticsearch高效實時同步
    logstash和kafka_connector都僅支持基於自增id或者時間戳更新的方式增量同步數據。回到問題本身:如果庫表裡沒有相關欄位,該如何處理呢?本文給出相關探討和解決方案。1、 binlog認知1.1 啥是 binlog?
  • 利用flume+kafka+storm+mysql構建大數據實時系統
    >1).數據採集負責從各節點上實時採集數據,選用cloudera的flume來實現2).數據接入由於採集數據的速度和數據處理的速度不一定同步,因此添加一個消息中間件來作為緩衝,選用apache的kafka3).流式計算對採集到的數據進行實時分析,選用apache的storm4).數據輸出對分析後的結果持久化,暫定用mysql詳細介紹各個組件及安裝配置:作業系統:centos6.4FlumeFlume是Cloudera提供的一個分布式、可靠、和高可用的海量日誌採集
  • 「Spring和Kafka」Kafka整合Spring 深入挖掘 -第1部分
    接下來是《如何在您的Spring啟動應用程式中使用Apache Kafka》「Spring和Kafka」如何在您的Spring啟動應用程式中使用Kafka ,這展示了如何開始使用Spring啟動和Apache Kafka®,這裡我們將更深入地挖掘Apache Kafka項目的Spring提供的一些附加功能。
  • 聊聊 Kafka:編譯 Kafka 源碼並搭建源碼環境
    進入 kafka 源碼包,修改 build.gradle 文件,在原來配置上,添加 ali 私服配置。streams 目錄:Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters.
  • kafka使用原理介紹
    當ack=1的時候,表示producer發送出去message,同步的把message存到對應topic的partition的leader上,然後producer就返回成功,partition leader異步的把message同步到其他partition replica上。
  • 圖解 kafka 架構與工作原理
    ,kafka的數據就保存在topic。Replication:每一個分區都有多個副本,副本的作用是做備胎,主分區(Leader)會將數據同步到從分區(Follower)。當主分區(Leader)故障的時候會選擇一個備胎(Follower)上位,成為 Leader。
  • kafka入門(原理-搭建-簡單使用)
    前言公司在用kafka接受和發送數據,自己學習過Rabbitmq,不懂kafka挺不爽的,說幹就幹!網上找了許多帖子,學習了很多,小小的demo自己也搭建起來了,美滋滋,下面我認為優秀的網站和自己的步驟展現給大家。
  • CentOS7下簡單搭建zookeeper+kafka集群
    zoo.cfg修改日誌保存目錄dataDir=/opt/zookeeper/data#server.服務編號=服務地址、LF通信埠、選舉埠server.221=node1:2888:3888server.222=node2:2888:3888server.223=node3:2888:3888配置同步到
  • kafka極簡教程
    kafka是用於構建實時數據管道和流應用程式。具有橫向擴展,容錯,wicked fast(變態快)等優點,並已在成千上萬家公司運行。
  • 大白話+13張圖解 Kafka
    Topic是一個邏輯上的概念,並不能直接在圖中把Topic的相關單元畫出需要注意:kafka在0.8版本以前是沒有副本機制的,所以在面對伺服器宕機的突發情況時會丟失數據,所以儘量避免使用這個版本之前的kafkaReplica - 副本kafka中的partition
  • 股友問:Kafka為什麼會丟消息,如何解決呢?
    事務隔離級別不同生成的時機不一樣(比如 RC 是每次查詢的時候都會生成一個,之後快照讀都會讀最新;RR 是第一次查詢時會生成之後就不再生成,之後快照讀都會讀同一個),readview 本質上其實是根據每行的記錄隱藏的當前事務 ID,去讀取 undolog 的記錄來生成當前事務允許讀到的數據集,所以才有快照讀這個說法。
  • kafka異步雙活方案 mirror maker2深度解析
    mirror maker2背景通常情況下,我們都是使用一套kafka集群處理業務。但有些情況需要使用另一套kafka集群來進行數據同步和備份。在kafka早先版本的時候,kafka針對這種場景就有推出一個叫mirror maker的工具(mirror maker1,以下mm1即代表mirror maker1),用來同步兩個kafka集群的數據。
  • Apache Kafka 快速入門指南
    Kafka 對消息保存時根據 Topic 進行歸類,發送消息 者稱為 Producer,消息接受者稱為 Consumer,此外 kafka 集群有多個 kafka 實例組成,每個 實例(server)稱為 broker。
  • Kafka這些名詞都說不出所以然,您竟然敢說自己精通kafka
    kafka基於發布/訂閱模式的消息隊列kafka有那麼多優異的性能,難怪許多大公司都在使用Kafka!Follower: 每個分區多個副本的「從」副本,實時從 leader 中同步數據,保持和 leader 數據的同步。leader 發生故障時,某個 follower 還會成為新的 leader。
  • 從未如此簡單:10分鐘帶你逆襲Kafka!
    要了解 Kafka 如何執行這些操作,讓我們從頭開始深入研究 Kafka 的功能。首先幾個概念: Kafka 在一個或多個可以跨越多個數據中心的伺服器上作為集群運行。 Kafka 集群將記錄流存儲在稱為主題的類別中。 每個記錄由一個鍵,一個值和一個時間戳組成。
  • Kafka 基本原理(8000 字小結)
    1)kafka以topic來進行消息管理,每個topic包含多個partition,每個partition對應一個邏輯log,有多個segment組成。2)每個segment中存儲多條消息(見下圖),消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
  • 不了解Kafka的acks配置,怎麼能說你會Kafka?
    Kafka生產者有很多可以配置的參數,這些在kafka的說明文檔中已經有詳細的說明,它們大部分都有合理的默認值,一般情況下,我們不需要修改。不過有些參數在內存使用、性能和可靠性方面對生產者的影響比較大,今天就重點來講講acks參數對消息可靠性的影響。