Kafka笔记-kafka外网搭建及构建生产者
发布日期:2021-06-30 10:41:37 浏览次数:2 分类:技术文章

本文共 5222 字,大约阅读时间需要 17 分钟。

程序运行截图如下:

后端如下:

消费者如下:

这里启动kafka先要运行zookpeer

./zookeeper-server-start.sh ../config/zookeeper.properties

这里要先修改下配置文件,在config下的server.properties:

advertised.listeners=PLATNTEXT://122.51.245.141:9092

然后运行kafka

./kafka-server-start.sh ../config/server.properties

web端代码关键代码如下:

配置相关KafkaProducerConfig.java

package com.kafkatest.kafkatest.config;import com.kafkatest.kafkatest.common.MessageEntity;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.common.serialization.StringSerializer;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.annotation.EnableKafka;import org.springframework.kafka.core.DefaultKafkaProducerFactory;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.core.ProducerFactory;import org.springframework.kafka.support.serializer.JsonSerializer;import java.util.HashMap;import java.util.Map;@Configuration@EnableKafkapublic class KafkaProducerConfig {    @Value("${kafka.producer.servers}")    private String servers;    @Value("${kafka.producer.retries}")    private int retries;    @Value("${kafka.producer.batch.size}")    private int batchSize;    @Value("${kafka.producer.linger}")    private int linger;    @Value("${kafka.producer.buffer.memory}")    private int bufferMemory;    public Map
producerConfigs(){ Map
props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory
producerFactory(){ return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer<>()); } @Bean public KafkaTemplate
kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); }}

生产者如下:

ProducerCallback.java

package com.kafkatest.kafkatest.producer;import com.google.gson.Gson;import com.kafkatest.kafkatest.common.MessageEntity;import org.apache.kafka.clients.producer.RecordMetadata;import org.springframework.kafka.support.SendResult;import org.springframework.util.concurrent.ListenableFutureCallback;public class ProducerCallback implements ListenableFutureCallback
> { private final long startTime; private final String key; private final MessageEntity message; private final Gson gson = new Gson(); public ProducerCallback(long startTime, String key, MessageEntity message) { this.startTime = startTime; this.key = key; this.message = message; } @Override public void onFailure(Throwable throwable) { throwable.printStackTrace(); } @Override public void onSuccess(SendResult
result) { if(result == null){ return; } long elapsedTime = System.currentTimeMillis() - startTime; RecordMetadata metadata = result.getRecordMetadata(); if (metadata != null) { StringBuilder record = new StringBuilder(); record.append("message(") .append("key = ").append(key).append(",") .append("message = ").append(gson.toJson(message)).append(")") .append("sent to partition(").append(metadata.partition()).append(")") .append("with offset(").append(metadata.offset()).append(")") .append("in ").append(elapsedTime).append(" ms"); } }}

SimpleProducer.java

package com.kafkatest.kafkatest.producer;import com.kafkatest.kafkatest.common.MessageEntity;import org.apache.kafka.clients.producer.ProducerRecord;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.kafka.core.KafkaOperations;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;@Componentpublic class SimpleProducer {    @Autowired    @Qualifier("kafkaTemplate")    private KafkaTemplate
kafkaTemplate; public void send(String topic, MessageEntity message){ kafkaTemplate.send(topic, message); } public void send(String topic, String key, MessageEntity entity){ ProducerRecord
record = new ProducerRecord<>( topic, key, entity ); long startTime = System.currentTimeMillis(); ListenableFuture
> future = kafkaTemplate.send(record); future.addCallback(new ProducerCallback(startTime, key, entity)); }}

源码下载如下:

转载地址:https://it1995.blog.csdn.net/article/details/104262199 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Kafka笔记-Spring Boot消费者构造
下一篇:Leaflet工作笔记-GIS地图上构造echarts的3D图

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月09日 02时13分13秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章