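Spring for Apache Kafka 2.1.0 has been released, together with the 1.3.2 and 2.0.2 maintenance releases, which contain important bug fixes.
The main change in 2.1.0 is the upgrade of the kafka-clients library to 1.0.0, along with several improvements: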
Sometimes, when a message can't be processed, you may wish to stop the container so the condition can be corrected and the message redelivered. The framework now provides the ContainerStoppingErrorHandler for record listeners and ContainerStoppingBatchErrorHandler for batch listeners.
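To illustrate why stopping on failure leads to redelivery, here is a minimal plain-Java simulation. It is not Spring's implementation: the class StopOnErrorSimulation and its methods are hypothetical, and it assumes offsets are committed only after a record is processed successfully, so a failed record stays uncommitted and is delivered again after a restart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a listener container; not a Spring class.
class StopOnErrorSimulation {

    private final List<String> records;   // the simulated partition log
    private int committedOffset = 0;      // next offset to deliver after a (re)start
    private boolean running = false;

    StopOnErrorSimulation(List<String> records) {
        this.records = records;
    }

    // Delivers records from the committed offset until the log ends or the listener throws.
    void start(Consumer<String> listener) {
        running = true;
        while (running && committedOffset < records.size()) {
            try {
                listener.accept(records.get(committedOffset));
                committedOffset++;        // commit only after successful processing
            } catch (RuntimeException e) {
                running = false;          // stop; the failed record stays uncommitted
            }
        }
        running = false;
    }

    int committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        StopOnErrorSimulation container =
                new StopOnErrorSimulation(Arrays.asList("a", "bad", "c"));
        List<String> processed = new ArrayList<>();
        boolean[] conditionFixed = {false};

        Consumer<String> listener = record -> {
            if (record.equals("bad") && !conditionFixed[0]) {
                throw new IllegalStateException("cannot process " + record);
            }
            processed.add(record);
        };

        container.start(listener);        // stops at "bad"
        System.out.println(processed + " committed=" + container.committedOffset());

        conditionFixed[0] = true;         // the operator corrects the condition
        container.start(listener);        // "bad" is delivered again, then "c"
        System.out.println(processed + " committed=" + container.committedOffset());
    }
}
```

In the simulation the first run commits only the first record; after the condition is corrected, the restart resumes at the failed record rather than skipping it.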
The KafkaAdmin now supports increasing partitions when a NewTopic bean is detected with a larger number of partitions than currently exist on the topic.
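The rule described above can be sketched in plain Java. This is only an illustration of the decision, not KafkaAdmin's code: the class PartitionPlanSketch and the method planIncreases are hypothetical, and the maps stand in for the broker's current topic metadata and the declared NewTopic beans. The sketch assumes that topics missing from the broker are handled by topic creation rather than by a partition increase, and that a declared count equal to or smaller than the current one is left alone, since Kafka does not support reducing partitions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of Spring for Apache Kafka.
class PartitionPlanSketch {

    // Returns topic -> new total partition count, only for existing topics that must grow.
    static Map<String, Integer> planIncreases(Map<String, Integer> existing,
                                              Map<String, Integer> declared) {
        Map<String, Integer> increases = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : declared.entrySet()) {
            Integer current = existing.get(entry.getKey());
            // Skip topics that do not exist yet and topics that would not grow.
            if (current != null && entry.getValue() > current) {
                increases.put(entry.getKey(), entry.getValue());
            }
        }
        return increases;
    }

    public static void main(String[] args) {
        Map<String, Integer> existing = new LinkedHashMap<>();
        existing.put("blog", 1);
        existing.put("audit", 4);

        Map<String, Integer> declared = new LinkedHashMap<>();
        declared.put("blog", 3);   // larger than the current count: increase
        declared.put("audit", 2);  // smaller than the current count: left alone
        declared.put("fresh", 5);  // not on the broker: a creation, not an increase

        System.out.println(planIncreases(existing, declared));
    }
}
```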
StringJsonMessageConverter and JsonSerializer/JsonDeserializer now pass and consume type information in Headers. This allows multiple types to be easily sent/received on the same topic:
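
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaHandler;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.kafka.support.converter.StringJsonMessageConverter;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;

    @SpringBootApplication
    public class Kafka21Application {

        public static void main(String[] args) {
            // Run the application, then close the context once the runner returns.
            SpringApplication.run(Kafka21Application.class, args)
                .close();
        }

        @Bean
        public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
            return args -> {
                // Send an Integer and a String to the same topic.
                template.send(MessageBuilder.withPayload(42)
                        .setHeader(KafkaHeaders.TOPIC, "blog")
                        .build());
                template.send(MessageBuilder.withPayload("43")
                        .setHeader(KafkaHeaders.TOPIC, "blog")
                        .build());
                // Give the listener time to receive both records before shutdown.
                Thread.sleep(5_000);
            };
        }

        @Bean
        public StringJsonMessageConverter converter() {
            return new StringJsonMessageConverter();
        }

        // Class-level listener; each @KafkaHandler method is selected by payload type.
        @Component
        @KafkaListener(id = "multi", topics = "blog")
        public static class Listener {

            @KafkaHandler
            public void intListener(Integer in) {
                System.out.println("Got an int: " + in);
            }

            @KafkaHandler
            public void stringListener(String in) {
                System.out.println("Got a string: " + in);
            }

        }

    }

Output:

    Got an int: 42
    Got a string: 43

For more details, see the release page.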