springboot整合Kafka,使用zookeeper做服務治理

2020-08-28 Java成長催化師


一.springboot自動配置方式整合kafka

springboot提供自動配置整合kafka的方式,需要做一下步驟:

1. 引入kafka依賴包:

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.7.RELEASE</version> </dependency>

2.在springboot配置中加入kafka相關配置,springboot啟動時候會自動加載這些配置,完成連結kafka,創建producer,consumer等。

spring: kafka: 消費者配置 consumer: bootstrap-servers: 127.0.0.1:9092 group-id: myGroup enable-auto-commit: true auto-offset-reset: earliest auto-commit-interval: 1000 max-poll-records: 10 34;test_topic&34; rev &34; error &34;test_topic&34;spring.kafka& zk地址address: 192.168.1.30:2181 zk數據組groupdefault: groupdefault

3.下載噹噹的config-toolkit,訪問http://localhost:8080/,加入相關配置,github上有詳細說明。4. 新建一個ZKConfiguration類,實現EnvironmentAware接口,實現EnvironmentAware接口的setEnvironment可以在項目啟動時設置項目的環境變量,可以在這個類中結合config-toolkit,把zk的配置加載到項目環境變量當中:

@Componentpublic class ZKConfiguration implements EnvironmentAware { @Autowired private Environment env; private static Map<String, GeneralConfigGroup> GROUPMAP = new HashMap<>(); public ZKConfiguration() { } // 加載zk的基本配置 @Bean public ZookeeperConfigProfile zookeeperConfigProfile() { ZookeeperConfigProfile configProfile = new ZookeeperConfigProfile( Objects.requireNonNull(this.env.getProperty(&34;)), Objects.requireNonNull(this.env.getProperty(&34;)), this.env.getProperty(&34;)); return configProfile; } //得到具體組裡的配置 @Bean({&34;}) public GeneralConfigGroup generalConfigGroupDefault() { ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile(); GeneralConfigGroup group = new ZookeeperConfigGroup(configProfile, this.env.getProperty(&34;)); return group; } /** * 獲取配置組 * @return */ public GeneralConfigGroup getConfigGroup(String group) { return GROUPMAP.get(group); } /** * * 項目啟動時會調用這個方法,把zk裡的配置組存在臨時變量GROUPMAP裡,以後會用到 * 所以 數據源初始化,就設置在這個方法裡 * @param environment */ @Override public void setEnvironment(Environment environment) { this.env = environment; ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile(); GROUPMAP.put(&34;, new ZookeeperConfigGroup(configProfile, this.env.getProperty(&34;))); }}

5.獲得所有配置項後,就是讓springboot去建立kafka連結了,這裡相當於要重新實現KafkaAutoConfiguration的配置。建立一個KafkaConfig配置類,這裡主要是配置所有kafka需要的bean:

@Configuration

@ConditionalOnClass({KafkaTemplate.class})

@EnableKafka

public class KafkaConfig {

// 把剛剛加載zk配置的類注入進來@Autowiredprivate ZKConfiguration zkConfiguration;// 創建 消費者工廠@Bean(&34;)@ConditionalOnMissingBean({ConsumerFactory.class})public ConsumerFactory<String, String> consumerFactory() { // 創建工廠需要三個參數: // 1. 消費者配置的map // 2. key的反序列化實現類 // 3. value的反序列化實現類 return new DefaultKafkaConsumerFactory<String, String>(makeKafkaConfig(), new StringDeserializer(), new StringDeserializer());}// 創建生產者工廠@Bean(&34;)@ConditionalOnMissingBean({ProducerFactory.class})public ProducerFactory<String, String> kafkaProducerFactory() { // 生產者工廠的參數如消費者工廠 return new DefaultKafkaProducerFactory(makeKafkaConfig(), new StringSerializer(), new StringSerializer());}// 創建 kafkaTemplate 這個bean,有了這個bean才能在實際業務中使用kafka@Bean(&34;)@ConditionalOnMissingBean({com.seckill.boot.common.util.KafkaTemplate.class})public KafkaTemplate<String, Protobufable> kafkaTemplate(@Qualifier(&34;) ProducerFactory<String, String> kafkaProducerFactory, @Qualifier(&34;) ProducerListener<String, Protobufable> producerListener) { KafkaTemplate<String, Protobufable> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory); kafkaTemplate.setProducerListener(producerListener); kafkaTemplate.setDefaultTopic(&34;); return kafkaTemplate;}@Bean(&34;)@ConditionalOnMissingBean({ProducerListener.class})public ProducerListener<String, Protobufable> kafkaProducerListener() { return new LoggingProducerListener();}@Bean@ConditionalOnProperty( name = {&34;})@ConditionalOnMissingBeanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) { return new KafkaTransactionManager(producerFactory);}// zk裡拿到的配置取出來private Map<String, Object> makeKafkaConfig() { // 獲得配置的group GeneralConfigGroup configGroup = zkConfiguration.getConfigGroup(&34;); Map<String, Object> kafkaConfig = new HashMap<>(); kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(&34;, configGroup.get(&34;)); kafkaConfig.put(&34;, configGroup.get(&34;)); kafkaConfig.put(ProducerConfig.ACKS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, configGroup.get(&34;)); kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, configGroup.get(&34;)); return kafkaConfig;}

}

6. 將kafka需要的bean配置好後,就能在實際業務中使用KafkaTemplate操作消息了

@Componentpublic class MqProviderImpl{ @Autowired private KafkaTemplate<String, String> kafkaTemplate;

來源:https://www.tuicool.com/articles/rmeYJrz

相關焦點

  • springboot整合kafka實現消息的發送消費
    如下是springboot整合kafka的一個案例,方便需要的小夥伴。啟動kafka Servercd 到kafka的bin目錄下:前提是啟動zk./kafka-server-start.sh /Users/hz/programs/kafka_2.12-2.2.1/config/server.properties &kafka創建topic:kafka-topics.sh --create --zookeeper localhost
  • springboot整合dubbo+zookeeper(三)
    現在越來越多的公司開發項目中使用了springboot作為搭建服務的框架,而dubbo是一款國內使用較多的SOA架構開發的中間件,其主要由三個部分組成:生產者,消費者,註冊中心。註冊中心一般使用的是zookeeper。於是我們從頭來捋一捋如何搭建一個springboot+dubbo項目。
  • 從零開始搭建Kafka+SpringBoot分布式消息系統
    zookeeper,所以需先搭建好zookeeper集群。本教程所有下載均放在/usr/local目錄下)(ps4:kafka可能有內置zookeeper,感覺可以越過zookeeper教程,但是這裡也配置出來了。
  • springboot整合dubbo+zookeeper(一)
    現在越來越多的公司開發項目中使用了springboot作為搭建服務的框架,而dubbo是一款國內使用較多的SOA架構開發的中間件,其主要由三個部分組成:生產者,消費者,註冊中心。註冊中心一般使用的是zookeeper。於是我們從頭來捋一捋如何搭建一個springboot+dubbo項目。
  • springboot整合dubbo+zookeeper(二)
    現在越來越多的公司開發項目中使用了springboot作為搭建服務的框架,而dubbo是一款國內使用較多的SOA架構開發的中間件,其主要由三個部分組成:生產者,消費者,註冊中心。註冊中心一般使用的是zookeeper。於是我們從頭來捋一捋如何搭建一個springboot+dubbo項目。
  • kafka入門(原理-搭建-簡單使用)
    看 kafka 官方給出的圖:多個 broker 協同合作,producer 和 consumer 部署在各個業務邏輯中被頻繁的調用,三者通過 zookeeper管理協調請求和轉發。這樣一個高性能的分布式消息發布訂閱系統就完成了。
  • springboot + kafka的使用
    Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)稱為broker。無論是kafka集群,還是consumer都依賴於zookeeper集群保存一些meta信息,來保證系統可用性。
  • ELK + Filebeat + Kafka 分布式日誌管理平臺搭建
    : true paths: - /var/logs/springboot/sparksys-gateway.log fields: log_source: gateway- type: log enabled: true paths: - /var/logs/springboot/sparksys-file.log fields: log_source: file-
  • 62.Kafka消息隊列訂閱發布
    服務地址,那麼消息的發送會直接發送到broker提供的地址中.).broker.id=0 使用的認證協議security.inter.broker.protocol=SASL_PLAINTEXT 完成身份驗證的類authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer此文件是服務端 設置用戶名和密碼KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule
  • CentOS7下簡單搭建zookeeper+kafka集群
    dataDir=/opt/zookeeper/data#server.服務編號=服務地址、LF通信埠、選舉埠server.221=node1:2888:3888server.222=node2:2888:3888server.223=node3:2888:3888配置同步到node2和node3scp zoo.cfg
  • Springboot整合zookeeper增刪改查入門教程
    本教程的工程,使用maven、jdk8、springboot、zookeeper 3.4.12   重點:大家學會增刪改查後,不妨動腦想下,zookeeper如何實現分布式鎖,小小的提示下,競爭創建臨時節點,創建成功者,則獲得鎖。
  • 使用SpringBoot+Dubbo 搭建一個簡單的分布式服務
    :Dubbo、RPC、分布式、由於本文的目的是帶大家使用SpringBoot+Dubbo 搭建一個簡單的分布式服務,所以這些概念我只會簡單給大家普及一下,不會做深入探究。簡單來說 Dubbo 是一個分布式服務框架,致力於提供高性能和透明化的RPC遠程服務調用方案,以及SOA服務治理方案。Dubbo 目前已經有接近 23k 的 Star ,Dubbo的Github 地址:https://github.com/apache/incubator-dubbo。
  • 一文詳解 SparkStreaming 如何整合 Kafka!附代碼可實踐
    整合kafka兩種模式說明這同時也是一個面試題的熱點。Receiver方式是通過zookeeper來連接kafka隊列,調用Kafka高階API,offset存儲在zookeeper,由Receiver維護。
  • NET Core 下使用 Kafka
    CentOS安裝KafakaKafka : http://kafka.apache.org/downloadsZooLeeper : https://zookeeper.apache.org/releases.html
  • .NET Core 下使用 Kafka
    /releases.ht…下載並解壓 下載 zookeeper,解壓$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz$ tar -zxvf
  • Kafka-manager部署與使用簡單介紹
    ,並可以選擇要使用的代理 批量運行分區的多個主題的重新分配 將分區添加到現有主題 更新現有主題的配置Kafka Manager下載地址 https://github.com/yahoo/kafka-manager/releases下載下來的是源碼包,需要進行sbt編譯,為了方便我這裡直接下載編譯好的kafka-manager
  • kafka_2.12-1.0.0-單節點的安裝部署使用
    提高協調服務的工具。kafka已經內置了一個zookeeper伺服器以及一些啟動腳本編輯配置文件zookeeper.properties配置內容dataDir=/usr/local/kafka_2.12-1.0.0/data/clientPort=2181啟動zookeeper.
  • SpringBoot與Kafka整合實現簡單分布式消息隊列
    來源 | urlify.cn/jMjuAj1、此處只是單純的梳理一下SpringBoot整合--        ## SpringBoot 整合 kafka核心依賴 ##-->        <dependency>     
  • kafka偽集群模式
    kafka偽集群模式是指由一臺zookeeper和一臺kafka服務組成1、安裝jdk使用root用戶安裝jdkmkdir /usr/javatar -zvxf jdk-8u231-linux-x64.tar.gz -C /usr/java
  • Kafka 2.2.0基礎入門
    > zookeeper是一個為分布式應用提供一致性服務的軟體,它是開源的Hadoop項目的一個子項目,並根據google發表的一篇論文來實現的。zookeeper為分布式系統提供了高效且易於使用的協同服務,它可以為分布式應用提供相當多的服務,諸如統一命名服務,配置管理,狀態同步和組服務等。zookeeper接口簡單,不必過多地糾結在分布式系統編程難於處理的同步和一致性問題上,可以使用zookeeper提供的現成(off-the-shelf)服務來實現來實現分布式系統額配置管理,組管理,Leader選舉等功能。