springboot+kafka初尝试
发布日期:2021-05-27 02:54:16 浏览次数:30 分类:精选文章

本文共 3848 字,大约阅读时间需要 12 分钟。

消息队列(MQ)是一个在消息传输过程中的容器,主要用于提供生产和消费接口,实现数据的存储和获取。消息队列的主要分类包括点对点(Point-to-Point,简称P2P)和发布订阅( Publish/Subscribe,简称Pub/Sub)。

Kafka简介

Kafka是一种开源的分布式消息队列系统,完全支持发布订阅模式。它以其高效的批量处理能力和可扩展性而闻名于世。

Kafka的使用场景

Kafka在多个场景中表现出色,主要包括:

  • 指标监控:Kafka用于收集和存储分布式应用的性能数据,便于进行实时分析。
  • 运营数据记录:Kafka可以用来记录各种运营数据,如报警信息和用户反馈,确保数据在分布式系统中的集中处理。
  • 日志聚合:Kafka可以跨服务、跨组织收集日志,并以标准格式存储,便于后续处理。
  • 消息解耦:Kafka通过生产者和消费者的解耦,缓存消息,提升系统性能和可靠性。

Kafka的核心术语

  • Topic(主题):消息按照主题进行分类,类似于一个队列。
  • Producer(生产者):负责发送消息到Kafka集群的客户端。
  • Consumer(消费者):负责从Kafka集群中读取消息的客户端。
  • Broker(代理):单个Kafka服务器称为一个Broker,集群由多个Broker组成,每个Broker可以支持多个Topic。
  • Zookeeper:用于存储Kafka集群的元数据,确保集群的高可用性和一致性。

Spring Boot集成Kafka示例

pom.xml文件中核心依赖

com.alibaba
fastjson
1.2.75
org.springframework.boot
spring-boot-starter-web
org.springframework.kafka
spring-kafka

application.properties配置文件

# Kafka代理地址设置,可多个空格分隔spring.kafka.bootstrap-servers=192.168.0.1:9092# 消息发送的重试次数,默认为0spring.kafka.producer.retries=0# 每次批量发送消息的数量,默认为16384spring.kafka.producer.batch-size=16384# 内存缓冲区大小,默认为33554432spring.kafka.producer.buffer-memory=33554432# 消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# 消息消费者的标识符spring.kafka.consumer.group-id=consumer-groupspring.kafka.consumer.auto-offset-reset=earliest# 自动提交消息,仅在消费者调用关联主题时自动提交,默认为truespring.kafka.consumer.enable-auto-commit=truespring.kafka.consumer.auto-commit-interval=100# 消息的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer# 消息监听时,主题不存在将不报错,默认为falsespring.kafka.listener.missing-topics-fatal=false

Kafka代码示例

@RestControllerpublic class DemoController {  @Autowired  private DemoService demoService;  @GetMapping("/hello")  public Object hello() {    return "hello";  }  @GetMapping("/send")  public Object send() {    demoService.send("hello");    return "hello";  }}@Servicepublic class DemoService {  @Autowired  private KafkaProducer kafkaProducer;  public void send(String param) {    Message message = new Message();    message.setId(1L);    message.setMsg(param);    message.setSendTime(new Date());    kafkaProducer.send(message);  }}public class Message implements Serializable {  private static final long serialVersionUID = 1L;  private Long id;  private String msg;  private Date sendTime;  // Getters and Setters omitted for brevity}

Kafka生产者和消费者

@Componentpublic class KafkaProducer {  private final Logger logger = LoggerFactory.getLogger(KafkaProducer.class);  @Autowired  private KafkaTemplate
kafkaTemplate; public void send(Message message) { logger.info("send message: " + JSON.toJSONString(message)); kafkaTemplate.send("topic1", JSON.toJSONString(message)); }}@Componentpublic class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @Autowired private ConsumerFactory
consumerFactory; @KafkaListener(topics = {"topic1"}, containerFactory = "filterContainerFactory") public void listen(ConsumerRecord
record) { Optional
message = Optional.ofNullable(record.value()); if (message.isPresent()) { logger.info("record: " + record); logger.info("receive message: " + message.get()); } }}

消息过滤策略(简化示例)

@Componentpublic class KafkaConsumerFilter {  @KafkaListener(topics = {"topic1"})  public void listen(ConsumerRecord
record) { Optional
message = Optional.ofNullable(record.value()); if (message.isPresent()) { logger.info("record: " + record); logger.info("receive message: " + message.get()); } }}
上一篇:Java中分库分表粗谈
下一篇:centos7环境安装kafka过程

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2025年04月16日 19时31分55秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章