Spring for Apache Kafka 2.1.0 and 1.3.2 Released

2020-12-27 OSChina (開源中國)

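Spring for Apache Kafka 2.1.0 has been released, together with the 1.3.2 and 2.0.2 maintenance releases, which contain important bug fixes.

The main change in 2.1.0 is the upgrade of the kafka-clients library to 1.0.0, along with several improvements: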

  • Sometimes, when a message can’t be processed, you may wish to stop the container so the condition can be corrected and the message re-delivered. The framework now provides the ContainerStoppingErrorHandler for record listeners and ContainerStoppingBatchErrorHandler for batch listeners.

  • The KafkaAdmin now supports increasing partitions when a NewTopic bean is detected with a larger number of partitions than currently exist on the topic.

  • StringJsonMessageConverter and JsonSerializer/JsonDeserializer now pass and consume type information in Headers. This allows multiple types to be easily sent/received on the same topic:

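import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class Kafka21Application {

    public static void main(String[] args) {
        // Run the application, then close the context once the runner has finished.
        SpringApplication.run(Kafka21Application.class, args)
            .close();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> {
            // Send an Integer and a String to the same topic.
            template.send(MessageBuilder.withPayload(42)
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            template.send(MessageBuilder.withPayload("43")
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            // Give the listener time to receive both records before the context closes.
            Thread.sleep(5_000);
        };
    }

    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    // Class-level listener: each record goes to the handler matching its payload type.
    @Component
    @KafkaListener(id = "multi", topics = "blog")
    public static class Listener {

        @KafkaHandler
        public void intListener(Integer in) {
            System.out.println("Got an int: " + in);
        }

        @KafkaHandler
        public void stringListener(String in) {
            System.out.println("Got a string: " + in);
        }

    }

}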

Got an int: 42
Got a string: 43
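To make the type-information-in-headers idea more concrete, here is a minimal plain-Java sketch with no Spring or Kafka dependencies. It is only a model of the mechanism, not the framework's implementation: the `Record` class, the `type-id` header key, and the `deserialize` and `dispatch` helpers are all hypothetical names introduced for illustration. The producer side stores the payload's class name in a header, and the consumer side uses that header to rebuild the right type before routing it to a handler.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class TypeHeaderDispatchSketch {

    // Hypothetical header key for this sketch; the framework defines its own header names.
    static final String TYPE_HEADER = "type-id";

    /** Minimal stand-in for a Kafka record: headers plus a serialized payload. */
    static final class Record {
        final Map<String, String> headers = new HashMap<>();
        final byte[] value;

        Record(Object payload) {
            // Producer side: store the payload's class name next to the serialized bytes.
            headers.put(TYPE_HEADER, payload.getClass().getName());
            value = String.valueOf(payload).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Consumer side: rebuild the payload using the type named in the header. */
    static Object deserialize(Record record) {
        String text = new String(record.value, StandardCharsets.UTF_8);
        String type = record.headers.get(TYPE_HEADER);
        if (Integer.class.getName().equals(type)) {
            return Integer.valueOf(text);
        }
        if (String.class.getName().equals(type)) {
            return text;
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    /** Routes a payload to the handler that matches its runtime type. */
    static String dispatch(Object payload) {
        if (payload instanceof Integer) {
            return "Got an int: " + payload;
        }
        if (payload instanceof String) {
            return "Got a string: " + payload;
        }
        throw new IllegalArgumentException("No handler for " + payload.getClass());
    }

    public static void main(String[] args) {
        // Two different payload types travel through the same "topic".
        System.out.println(dispatch(deserialize(new Record(42))));
        System.out.println(dispatch(deserialize(new Record("43"))));
    }
}
```

Without the header, the consumer would see only the bytes `43` and could not tell whether the sender meant a number or a string, which is why carrying the type alongside the payload lets one topic serve several types.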
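The container-stopping behavior mentioned in the first improvement can be modeled in a similar way. The following plain-Java sketch is an assumption-laden simulation rather than the real ContainerStoppingErrorHandler: `SimpleContainer`, its in-memory record list, and its `committedOffset` counter are hypothetical stand-ins for a listener container and Kafka's offset tracking. It shows why stopping on a failure allows re-delivery: the offset of the failed record is never advanced, so a restart resumes from that same record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class ContainerStoppingSketch {

    /** Hypothetical stand-in for a listener container reading a single partition. */
    static final class SimpleContainer {
        private final List<String> records;      // the records in the partition
        private final Consumer<String> listener; // user code that may throw
        private int committedOffset = 0;         // next offset to read after a (re)start
        private boolean running = false;

        SimpleContainer(List<String> records, Consumer<String> listener) {
            this.records = records;
            this.listener = listener;
        }

        /** Processes records from the committed offset and stops on the first failure. */
        void start() {
            running = true;
            while (running && committedOffset < records.size()) {
                try {
                    listener.accept(records.get(committedOffset));
                    committedOffset++; // advance only after successful processing
                } catch (RuntimeException e) {
                    running = false;   // stop; the failed record's offset is not advanced
                }
            }
        }

        boolean isRunning() {
            return running;
        }

        int committedOffset() {
            return committedOffset;
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        Set<String> unprocessable = new HashSet<>(Arrays.asList("bad"));

        SimpleContainer container = new SimpleContainer(
                Arrays.asList("a", "bad", "c"),
                record -> {
                    if (unprocessable.contains(record)) {
                        throw new IllegalStateException("Cannot process " + record);
                    }
                    processed.add(record);
                });

        container.start();     // processes "a", fails on "bad", and stops
        System.out.println("Running after failure: " + container.isRunning());

        unprocessable.clear(); // the condition is corrected
        container.start();     // "bad" is delivered again, followed by "c"
        System.out.println("Processed: " + processed);
    }
}
```

In this model the listener sees `bad` a second time only because the container stopped before moving past it. A handler that merely logged the error and continued would have skipped the record.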

For more details, see the release page.
