Kafka笔记-kafka外网搭建及构建生产者
发布日期:2021-06-30 10:41:37
浏览次数:2
分类:技术文章
本文共 5222 字,大约阅读时间需要 17 分钟。
程序运行截图如下:
后端如下:
消费者如下:
这里启动kafka之前要先运行zookeeper
./zookeeper-server-start.sh ../config/zookeeper.properties
这里要先修改下配置文件,在config下的server.properties:
advertised.listeners=PLAINTEXT://122.51.245.141:9092
然后运行kafka
./kafka-server-start.sh ../config/server.properties
web端关键代码如下:
配置相关KafkaProducerConfig.java
package com.kafkatest.kafkatest.config;import com.kafkatest.kafkatest.common.MessageEntity;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.support.serializer.JsonSerializer;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; public MapproducerConfigs(){ Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory producerFactory(){ return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer<>()); } @Bean public KafkaTemplate kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); }}
生产者如下:
ProducerCallback.java
package com.kafkatest.kafkatest.producer;import com.google.gson.Gson;import com.kafkatest.kafkatest.common.MessageEntity;import org.apache.kafka.clients.producer.RecordMetadata;import org.springframework.kafka.support.SendResult;import org.springframework.util.concurrent.ListenableFutureCallback;public class ProducerCallback implements ListenableFutureCallback> { private final long startTime; private final String key; private final MessageEntity message; private final Gson gson = new Gson(); public ProducerCallback(long startTime, String key, MessageEntity message) { this.startTime = startTime; this.key = key; this.message = message; } @Override public void onFailure(Throwable throwable) { throwable.printStackTrace(); } @Override public void onSuccess(SendResult result) { if(result == null){ return; } long elapsedTime = System.currentTimeMillis() - startTime; RecordMetadata metadata = result.getRecordMetadata(); if (metadata != null) { StringBuilder record = new StringBuilder(); record.append("message(") .append("key = ").append(key).append(",") .append("message = ").append(gson.toJson(message)).append(")") .append("sent to partition(").append(metadata.partition()).append(")") .append("with offset(").append(metadata.offset()).append(")") .append("in ").append(elapsedTime).append(" ms"); } }}
SimpleProducer.java
package com.kafkatest.kafkatest.producer;import com.kafkatest.kafkatest.common.MessageEntity;import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.kafka.core.KafkaOperations;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;@Componentpublic class SimpleProducer { @Autowired @Qualifier("kafkaTemplate") private KafkaTemplatekafkaTemplate; public void send(String topic, MessageEntity message){ kafkaTemplate.send(topic, message); } public void send(String topic, String key, MessageEntity entity){ ProducerRecord record = new ProducerRecord<>( topic, key, entity ); long startTime = System.currentTimeMillis(); ListenableFuture > future = kafkaTemplate.send(record); future.addCallback(new ProducerCallback(startTime, key, entity)); }}
源码下载如下:
转载地址:https://it1995.blog.csdn.net/article/details/104262199 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月09日 02时13分13秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
基础架构系列篇-系统centos7中DOCKER安装REDIS(已更新)
2019-04-30
基础架构系列篇-系统centos7安装docker+COMPOSE
2019-04-30
基础架构系列篇-系统centos7中docker安装rabbitmq
2019-04-30
基础架构系列篇-NGINX部署VUE
2019-04-30
个人电商项目,基于uni-app+ springcloud +VUE技术
2019-04-30
基础架构系列篇-系统centos7安装kafka
2019-04-30
基础架构系列篇-系统centos7中docker安装分布式文件存储服务minio
2019-04-30
知识点记录-java判断系统是linux或windows
2019-04-30
知识点记录-springboot静态资源映射路径
2019-04-30
知识点记录-vue-cli+webpack打包运行图标显示异常
2019-04-30
知识点记录-springboot2.1集成rabbitmq
2019-04-30
微服务springcloud2系列篇-配置与注册nacos组件
2019-04-30
数据库系列篇mysql8-分库分表中间件mycat(WINDOWS环境)
2019-04-30
用户权限设计-基于RBAC模型
2019-04-30
微服务springcloud2系列篇-网关GATEWAY跨域问题
2019-04-30
基础架构系列篇-系统centos7中docker安装gitlab
2019-04-30
map的几种常用遍历方式
2019-04-30