
Kafka入门(三)Kafka 生产者 Producer
发布日期:2021-05-18 04:42:49
浏览次数:24
分类:精选文章
本文共 6974 字,大约阅读时间需要 23 分钟。
Kafka生产者基础
Kafka生产者是将数据发布到Kafka topics的核心组件。通过合理配置Kafka生产者的方式,能够实现高效、可靠的数据发布。本文将详细介绍Kafka生产者的工作模式、代码实现以及负载均衡方案。
1.1 Producer发送模式
Kafka生产者在发送数据时,主要有以下三种发送模式:
同步发送(异步阻塞)
在这种模式下,生产者会等待消息的确认_receipt。Kafka从变成生产者发送消息后,至少等待acks="all"
,确保所有副本都成功接收到数据。这保证了消息的完整性,但同时也增加了延迟。而通过配置retries
,可以配置重试次数,可以避免因leader
失效而导致消息丢失。异步发送
异步发送允许生产者立即提交消息请求,而不等待响应。这种方式适合对延迟敏感的场景,但由于缺乏确认机制,不能保证消息的可靠性。需要根据业务需求决定是否使用。异步回调发送
这种模式结合了异步发送的特点,同时引入了回调机制。当消息发送完成或失败时,会触发相应的回调函数。这种方式适合对消息状态感兴趣的场景,但会增加生产者的复杂性。1.2代码片段
以下是一个典型的Kafka生产者代码示例,展示了异步发送模式和异步回调发送模式的实现:
package com.item.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import java.util.Properties;import java.util.concurrent.ExecutionException;public class ProducerSample { final static String TOPIC_NAME = "demo_topic1"; public static void main(String[] args) throws ExecutionException, InterruptedException { // 异步发送 producerSend(); // 异步阻塞(同步)发送 producerSendBlock(); // 异步发送带回调函数 producerSendWithCallback(); // 异步发送带回调函数和Partition负载均衡 producerSendWithCallbackAndPartition(); } /** * 异步发送 */ public static void producerSend() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, "0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(properties); for (int i = 0; i < 10; i++) { ProducerRecord stringStringProducerRecord = new ProducerRecord<>(TOPIC_NAME, "key" + i, "value" + i); producer.send(stringStringProducerRecord); } producer.close(); } /** * 异步阻塞(同步)发送 */ public static void producerSendBlock() throws ExecutionException, InterruptedException { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, "0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer<>(properties); for (int i = 0; i < 10; i++) { String key = "key-" + i; ProducerRecord stringStringProducerRecord = new ProducerRecord<>(TOPIC_NAME, key, "value" + i); Future send = producer.send(stringStringProducerRecord); RecordMetadata recordMetadata = send.get(); System.out.println(key + ":" + recordMetadata.partition() + " , offset : " + recordMetadata.offset()); } producer.close(); } /** * 异步发送带回调函数 */ public static void producerSendWithCallback() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, "0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer<>(properties); for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { System.out.println("发送失败:" + e.getMessage()); } else { System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset()); } } }); } producer.close(); } /** * 异步发送带回调函数和Partition负载均衡 */ public static void producerSendWithCallbackAndPartition() { Properties properties = new Properties(); properties.put(ProducerConfig.ACKS_CONFIG, "all"); properties.put(ProducerConfig.RETRIES_CONFIG, "0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.item.kafka.producer.SamplePartition"); Producer producer = new KafkaProducer<>(properties); for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset()); } }); } producer.close(); }}
自定义负载均衡方案
通过自定义负载均衡器,可以根据业务需求实现Topic的数据分区策略。在本文中,提供一个简单的负载均衡器实现:
package com.item.kafka.producer;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class SamplePartition implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String keyStr = key + ""; String keyInt = keyStr.substring(4); try { int i = Integer.parseInt(keyInt); return i % 2; } catch (NumberFormatException e) { return 0; } } @Override public void close() { } @Override public void configure(Mapconfigs) { }}
该负载均衡器会对Topic的数据按照Key的特定规则进行分区,支持多种负载均衡策略的灵活配置。
通过上述内容,可以对Kafka生产者的工作原理、配置方式以及负载均衡方案有清晰的了解。这对于优化实际业务中的Kafka生产者配置具有重要参考价值。
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2025年04月30日 05时35分07秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Shell脚本防DNS攻击检测并删除肉机IP
2019-03-13
如何在VSCode中定制JSON的IntelliSense
2019-03-13
椭圆曲线的定义
2019-03-13
多代理区块链框架客户端的操作
2019-03-13
RSA操作中的公钥和私钥的生成
2019-03-13
go语言中类的继承和方法的使用
2019-03-13
caffe训练的时候遇到的text-format 错误解决方案。
2019-03-13
Java 8新特性(一):Lambda表达式
2019-03-13
Little Zu Chongzhi's Triangles
2019-03-13
Train Problem II(卡特兰数+大数乘除)
2019-03-13
一些技术博客
2019-03-13
第01问:MySQL 一次 insert 刷几次盘?
2019-03-13
分布式 | DBLE 3.20.07.0 来啦!
2019-03-13
振荡器指标
2019-03-13
libvirtd:内部错误:Failed to apply firewall rule
2019-03-13
优先级队列2
2019-03-13
TiKV 源码解析系列文章(十三)MVCC 数据读取
2019-03-13
1900分图论 : 1183E1 LCA + Kruskal
2019-03-13
(建议收藏)计算机网络:传输层概述、UDP协议与可靠传输协议习题解析与拓展
2019-03-13
Android 开发常用的工具类(更新ing)
2019-03-13