
springboot+kafka初尝试
发布日期:2021-05-27 02:54:16
浏览次数:30
分类:精选文章
本文共 3848 字,大约阅读时间需要 12 分钟。
消息队列(MQ)是一个在消息传输过程中的容器,主要用于提供生产和消费接口,实现数据的存储和获取。消息队列的主要分类包括点对点(Point-to-Point,简称P2P)和发布订阅( Publish/Subscribe,简称Pub/Sub)。
Kafka简介
Kafka是一种开源的分布式消息队列系统,完全支持发布订阅模式。它以其高效的批量处理能力和可扩展性而闻名于世。
Kafka的使用场景
Kafka在多个场景中表现出色,主要包括:
- 指标监控:Kafka用于收集和存储分布式应用的性能数据,便于进行实时分析。
- 运营数据记录:Kafka可以用来记录各种运营数据,如报警信息和用户反馈,确保数据在分布式系统中的集中处理。
- 日志聚合:Kafka可以跨服务、跨组织收集日志,并以标准格式存储,便于后续处理。
- 消息解耦:Kafka通过生产者和消费者的解耦,缓存消息,提升系统性能和可靠性。
Kafka的核心术语
- Topic(主题):消息按照主题进行分类,类似于一个队列。
- Producer(生产者):负责发送消息到Kafka集群的客户端。
- Consumer(消费者):负责从Kafka集群中读取消息的客户端。
- Broker(代理):单个Kafka服务器称为一个Broker,集群由多个Broker组成,每个Broker可以支持多个Topic。
- Zookeeper:用于存储Kafka集群的元数据,确保集群的高可用性和一致性。
Spring Boot集成Kafka示例
pom.xml文件中核心依赖
com.alibaba fastjson 1.2.75 org.springframework.boot spring-boot-starter-web org.springframework.kafka spring-kafka
application.properties配置文件
# Kafka代理地址设置,可多个空格分隔spring.kafka.bootstrap-servers=192.168.0.1:9092# 消息发送的重试次数,默认为0spring.kafka.producer.retries=0# 每次批量发送消息的数量,默认为16384spring.kafka.producer.batch-size=16384# 内存缓冲区大小,默认为33554432spring.kafka.producer.buffer-memory=33554432# 消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# 消息消费者的标识符spring.kafka.consumer.group-id=consumer-groupspring.kafka.consumer.auto-offset-reset=earliest# 自动提交消息,仅在消费者调用关联主题时自动提交,默认为truespring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# 消息的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 消息监听时,主题不存在将不报错,默认为falsespring.kafka.listener.missing-topics-fatal=false
Kafka代码示例
@RestControllerpublic class DemoController { @Autowired private DemoService demoService; @GetMapping("/hello") public Object hello() { return "hello"; } @GetMapping("/send") public Object send() { demoService.send("hello"); return "hello"; }}@Servicepublic class DemoService { @Autowired private KafkaProducer kafkaProducer; public void send(String param) { Message message = new Message(); message.setId(1L); message.setMsg(param); message.setSendTime(new Date()); kafkaProducer.send(message); }}public class Message implements Serializable { private static final long serialVersionUID = 1L; private Long id; private String msg; private Date sendTime; // Getters and Setters omitted for brevity}
Kafka生产者和消费者
@Componentpublic class KafkaProducer { private final Logger logger = LoggerFactory.getLogger(KafkaProducer.class); @Autowired private KafkaTemplatekafkaTemplate; public void send(Message message) { logger.info("send message: " + JSON.toJSONString(message)); kafkaTemplate.send("topic1", JSON.toJSONString(message)); }}@Componentpublic class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @Autowired private ConsumerFactory consumerFactory; @KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory") public void listen(ConsumerRecord record) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { logger.info("record: " + record); logger.info("receive message: " + message.get()); } }}
消息过滤策略(简化示例)
@Componentpublic class KafkaConsumerFilter { @KafkaListener(topics = {"topic1"}) public void listen(ConsumerRecordrecord) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { logger.info("record: " + record); logger.info("receive message: " + message.get()); } }}
发表评论
最新留言
初次前来,多多关照!
[***.217.46.12]2025年04月16日 19时31分55秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
有了Trae,人人都是程序员的时代来了
2023-01-23
CentOS 系列:CentOS 7文件系统的组成
2023-01-23
Docker部署postgresql-11以及主从配置
2023-01-23
EnvironmentNotWritableError: The current user does not have write permissions to the target environm
2023-01-23
kali安装docker(亲测有效)
2023-01-23
PHP系列:PHP 基础编程 2(时间函数、数组---实现登录&注册&修改)
2023-01-23
PHP系列:使用PHP实现登录注册功能的完整指南
2023-01-23
"WARNING: Increasing RAM size to 1GB" and "Cannot set up guest memory 'xxx.ram': Invalid argument".
2023-01-23
04-docker-commit构建自定义镜像
2023-01-23
05-docker系列-使用dockerfile构建镜像
2023-01-23
09-docker系列-docker网络你了解多少(下)
2023-01-23
#C8# UVM中的factory机制 #S8.2.3# 重载sequence哪些情形
2023-01-24
cytoscape安装java_Cytoscape史上最全攻略
2023-01-24
c语言编写单片机中断,C语言AVR单片机中断程序写法
2023-01-24
java教学团队管理系统(ssm)
2023-01-24
java教师管理系统(ssm)
2023-01-24
java教师课堂助手app(ssm)
2023-01-24
java教育辅导班信息网(ssm)
2023-01-24