Spring Boot provides an auto-configuration based way to integrate Kafka. The following steps are required:
1. Add the Kafka dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>
2. Add the Kafka settings to the Spring Boot configuration. Spring Boot loads them automatically at startup, connects to Kafka, and creates the producer, consumer, and so on.
spring:
  kafka:
    # consumer configuration
    consumer:
      bootstrap-servers: 127.0.0.1:9092
      group-id: myGroup
      enable-auto-commit: true
      auto-offset-reset: earliest
      auto-commit-interval: 1000
      max-poll-records: 10
    # application topic names, e.g. test_topic

# zk address
address: 192.168.1.30:2181
# zk configuration group
groupdefault: groupdefault
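With just these properties, Spring Boot's KafkaAutoConfiguration already builds a consumer factory and the listener infrastructure, so messages can be consumed without declaring any extra beans. The later steps replace this auto-configuration with ZooKeeper-driven settings, but the usage pattern stays the same. A minimal sketch, assuming the topic name test_topic from the example configuration (the class name DemoConsumer is illustrative):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoConsumer {

    // Uses the auto-configured consumer factory; group-id, offset reset and
    // auto-commit behaviour come from the spring.kafka.consumer properties above.
    @KafkaListener(topics = "test_topic")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}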
3. Download Dangdang's config-toolkit, visit http://localhost:8080/, and add the relevant configuration; detailed instructions are available on GitHub.
4. Create a ZKConfiguration class that implements the EnvironmentAware interface. Its setEnvironment method lets you set the project's environment when the application starts, so this class can use config-toolkit to load the ZooKeeper configuration into the project environment:
@Component
public class ZKConfiguration implements EnvironmentAware {

    @Autowired
    private Environment env;

    private static Map<String, GeneralConfigGroup> GROUPMAP = new HashMap<>();

    public ZKConfiguration() {
    }

    // Load the basic ZooKeeper connection settings.
    // The property keys ("address", "rootNode", "version") are examples; use the
    // keys actually defined in your application configuration.
    @Bean
    public ZookeeperConfigProfile zookeeperConfigProfile() {
        ZookeeperConfigProfile configProfile = new ZookeeperConfigProfile(
                Objects.requireNonNull(this.env.getProperty("address")),
                Objects.requireNonNull(this.env.getProperty("rootNode")),
                this.env.getProperty("version"));
        return configProfile;
    }

    // Expose the configuration of one specific group as a bean
    @Bean({"generalConfigGroupDefault"})
    public GeneralConfigGroup generalConfigGroupDefault() {
        ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();
        GeneralConfigGroup group = new ZookeeperConfigGroup(configProfile,
                this.env.getProperty("groupdefault"));
        return group;
    }

    /**
     * Get a configuration group by name.
     */
    public GeneralConfigGroup getConfigGroup(String group) {
        return GROUPMAP.get(group);
    }

    /**
     * Called when the application starts. It stores the configuration groups from
     * ZooKeeper in the temporary GROUPMAP variable for later use, which is why
     * data-source initialization is also placed in this method.
     */
    @Override
    public void setEnvironment(Environment environment) {
        this.env = environment;
        ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();
        GROUPMAP.put("groupdefault",
                new ZookeeperConfigGroup(configProfile, this.env.getProperty("groupdefault")));
    }
}
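After setEnvironment has run, any bean can read values from the cached group. A minimal sketch of how the group might be consumed elsewhere; the group name "groupdefault" and the key "bootstrap.servers" are assumptions based on the configuration above, and KafkaAddressReader is an illustrative class name:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class KafkaAddressReader {

    @Autowired
    private ZKConfiguration zkConfiguration;

    // Reads one value from the ZooKeeper-backed configuration group loaded above.
    public String bootstrapServers() {
        return String.valueOf(
                zkConfiguration.getConfigGroup("groupdefault").get("bootstrap.servers"));
    }
}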
5. Once all the configuration items are available, the next step is to have Spring Boot set up the Kafka connection, which effectively means re-implementing what KafkaAutoConfiguration does. Create a KafkaConfig configuration class that declares all of the beans Kafka needs:
@Configuration
@ConditionalOnClass({KafkaTemplate.class})
@EnableKafka
public class KafkaConfig {
    // Inject the class that loaded the ZooKeeper configuration
    @Autowired
    private ZKConfiguration zkConfiguration;

    // Create the consumer factory
    @Bean("consumerFactory")
    @ConditionalOnMissingBean({ConsumerFactory.class})
    public ConsumerFactory<String, String> consumerFactory() {
        // The factory needs three arguments:
        // 1. the consumer configuration map
        // 2. the key deserializer
        // 3. the value deserializer
        return new DefaultKafkaConsumerFactory<String, String>(makeKafkaConfig(),
                new StringDeserializer(), new StringDeserializer());
    }

    // Create the producer factory
    @Bean("kafkaProducerFactory")
    @ConditionalOnMissingBean({ProducerFactory.class})
    public ProducerFactory<String, String> kafkaProducerFactory() {
        // The producer factory takes the same kind of arguments as the consumer factory
        return new DefaultKafkaProducerFactory(makeKafkaConfig(),
                new StringSerializer(), new StringSerializer());
    }

    // Create the kafkaTemplate bean; this is the bean business code uses to work with Kafka
    @Bean("kafkaTemplate")
    @ConditionalOnMissingBean({com.seckill.boot.common.util.KafkaTemplate.class})
    public KafkaTemplate<String, Protobufable> kafkaTemplate(
            @Qualifier("kafkaProducerFactory") ProducerFactory<String, String> kafkaProducerFactory,
            @Qualifier("kafkaProducerListener") ProducerListener<String, Protobufable> producerListener) {
        KafkaTemplate<String, Protobufable> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.setDefaultTopic("test_topic");
        return kafkaTemplate;
    }

    @Bean("kafkaProducerListener")
    @ConditionalOnMissingBean({ProducerListener.class})
    public ProducerListener<String, Protobufable> kafkaProducerListener() {
        return new LoggingProducerListener();
    }

    // Same condition KafkaAutoConfiguration uses for its transaction manager
    @Bean
    @ConditionalOnProperty(name = {"spring.kafka.producer.transaction-id-prefix"})
    @ConditionalOnMissingBean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager(producerFactory);
    }

    // Turn the values fetched from ZooKeeper into a Kafka configuration map.
    // The keys passed to configGroup.get() are examples that follow the standard
    // Kafka property names; use whatever keys are actually stored in the group.
    private Map<String, Object> makeKafkaConfig() {
        // Get the configured group
        GeneralConfigGroup configGroup = zkConfiguration.getConfigGroup("groupdefault");
        Map<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configGroup.get("bootstrap.servers"));
        kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, configGroup.get("group.id"));
        kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, configGroup.get("auto.offset.reset"));
        kafkaConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, configGroup.get("enable.auto.commit"));
        kafkaConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, configGroup.get("auto.commit.interval.ms"));
        kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, configGroup.get("key.deserializer"));
        kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, configGroup.get("value.deserializer"));
        kafkaConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configGroup.get("max.poll.records"));
        kafkaConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configGroup.get("max.poll.interval.ms"));
        // kafkaConfig.put("...", configGroup.get("..."));  // further settings as needed
        kafkaConfig.put(ProducerConfig.ACKS_CONFIG, configGroup.get("acks"));
        kafkaConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, configGroup.get("batch.size"));
        kafkaConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configGroup.get("buffer.memory"));
        kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configGroup.get("key.serializer"));
        kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, configGroup.get("value.serializer"));
        kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, configGroup.get("retries"));
        return kafkaConfig;
    }
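    // (Sketch) KafkaAutoConfiguration also registers a kafkaListenerContainerFactory
    // bean, which @KafkaListener methods rely on. Because this class replaces that
    // auto-configuration, a similar bean would have to be declared here as well;
    // the version below simply reuses the consumerFactory bean defined above.
    @Bean("kafkaListenerContainerFactory")
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            @Qualifier("consumerFactory") ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }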
}
6. With the Kafka beans in place, KafkaTemplate can be used in real business code to work with messages:
@Component
public class MqProviderImpl {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // example: send a message to the default topic configured on the template
    public void sendMessage(String message) {
        kafkaTemplate.sendDefault(message);
    }
}
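Beyond sendDefault, the template can target an explicit topic and react to the broker acknowledgement. A minimal sketch, assuming the topic name test_topic used earlier (the class name MqCallbackSender is illustrative):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class MqCallbackSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends to an explicit topic and logs success or failure of the send.
    public void send(String key, String value) {
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send("test_topic", key, value);
        future.addCallback(
                result -> System.out.println("sent to " + result.getRecordMetadata().topic()),
                ex -> System.err.println("send failed: " + ex.getMessage()));
    }
}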
Source: https://www.tuicool.com/articles/rmeYJrz