Kafka笔记-Spring Boot消费者构造
发布日期:2021-06-30 10:41:37 浏览次数:2 分类:技术文章

本文共 3524 字,大约阅读时间需要 11 分钟。

程序运行截图如下:

生产者端

消费者打印:

那个HOW ARE YOU就是了!

这里关键是:

这个@KafkaListener注解,监听了数据。

相关的配置文件如下:

package com.kafkatest.kafkatest.config;import com.kafkatest.kafkatest.common.MessageEntity;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.StringDeserializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.ConsumerFactory;import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class KafkaConsumerConfig {    @Value("${kafka.consumer.servers}")    private String servers;    @Value("${kafka.consumer.enable.auto.commit}")    private boolean enableAutoCommit;    @Value("${kafka.consumer.session.timeout}")    private String sessionTimeout;    @Value("${kafka.consumer.auto.commit.interval}")    private String autoCommitInterval;    @Value("${kafka.consumer.group.id}")    private String groupId;    @Value("${kafka.consumer.auto.offset.reset}")    private String autoOffsetReset;    @Value("${kafka.consumer.concurrency}")    private int concurrency;    @Bean    public KafkaListenerContainerFactory
> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory
factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); return factory; } private ConsumerFactory
consumerFactory() { return new DefaultKafkaConsumerFactory<>( consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(MessageEntity.class) ); } private Map
consumerConfigs() { Map
propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; }}

application.properties如下:

kafka.producer.servers=122.51.245.141:9092kafka.producer.retries=0kafka.producer.batch.size=4096kafka.producer.linger=1kafka.producer.buffer.memory=40960kafka.topic.default=TESTkafka.consumer.zookeeper.connect=122.51.245.141:2181kafka.consumer.servers=122.51.245.141:9092kafka.consumer.enable.auto.commit=truekafka.consumer.session.timeout=6000kafka.consumer.auto.commit.interval=100kafka.consumer.auto.offset.reset=latestkafka.consumer.topic=TESTkafka.consumer.group.id=TESTkafka.consumer.concurrency=10

源码下载地址:

 

转载地址:https://it1995.blog.csdn.net/article/details/104267877 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Spring Boot笔记-mysql5.7使用@Table后提示doesn't exist问题
下一篇:Kafka笔记-kafka外网搭建及构建生产者

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月26日 23时34分48秒