Getting Started with Kafka (3): The Kafka Producer
Published: 2021-05-18 04:42:49 · Views: 24 · Category: Featured Articles

This article is about 6,974 characters long; expect roughly 23 minutes of reading time.

Kafka Producer Basics

The Kafka producer is the core component for publishing data to Kafka topics. With proper configuration, a producer can publish data efficiently and reliably. This article covers the producer's send modes, a code walkthrough, and a custom partitioning (load-balancing) scheme.


1.1 Producer Send Modes

A Kafka producer can send data in three main modes:

  • Synchronous send (asynchronous + blocking)

    In this mode, the producer blocks until the broker acknowledges the message. With acks="all", it waits until all in-sync replicas have received the data. This guarantees message durability, but it also adds latency. Configuring retries additionally lets the client retry transient failures, such as a leader change, instead of losing the message.

  • Asynchronous send

    The producer submits the record and returns immediately without waiting for a response. This suits latency-sensitive scenarios, but without an acknowledgment the application cannot confirm delivery, so message reliability is not guaranteed. Whether to use it depends on business requirements.

  • Asynchronous send with callback

    This mode builds on asynchronous sending by adding a callback: when the send completes or fails, the corresponding callback is invoked with the result. It suits scenarios that care about per-message delivery status, at the cost of slightly more complex producer code.
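The three modes above are really three ways of consuming the result of one API: Kafka's `producer.send()` returns a `Future<RecordMetadata>` and optionally accepts a `Callback`. The same pattern can be sketched with the JDK's `CompletableFuture` alone, with no broker required (an illustration of the call styles, not Kafka API code; `send` here is a hypothetical stand-in):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SendModes {
    // Stand-in for producer.send(): completes asynchronously with a fake "offset".
    static CompletableFuture<Long> send(String value) {
        return CompletableFuture.supplyAsync(() -> (long) value.hashCode());
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. Fire-and-forget (asynchronous): submit and move on; no delivery confirmation.
        send("v1");

        // 2. Synchronous (asynchronous + blocking): block on the future for the result.
        long offset = send("v2").get();
        System.out.println("synchronous result: " + offset);

        // 3. Asynchronous with callback: react when the send completes or fails.
        send("v3").whenComplete((res, err) -> {
            if (err != null) {
                System.out.println("send failed: " + err.getMessage());
            } else {
                System.out.println("callback result: " + res);
            }
        }).get(); // block here only so the demo prints before the JVM exits
    }
}
```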


1.2 Code Example

    The following producer example demonstrates all four variants: plain asynchronous send, blocking (synchronous) send, asynchronous send with a callback, and asynchronous send with a callback plus a custom partitioner:

    package com.item.kafka.producer;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    public class ProducerSample {

        final static String TOPIC_NAME = "demo_topic1";

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            producerSend();                          // asynchronous send
            producerSendBlock();                     // blocking (synchronous) send
            producerSendWithCallback();              // asynchronous send with a callback
            producerSendWithCallbackAndPartition();  // callback + custom partitioner
        }

        /** Shared configuration for all four variants. */
        private static Properties baseProperties() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }

        /** Asynchronous send: fire the record and do not wait for a result. */
        public static void producerSend() {
            Producer<String, String> producer = new KafkaProducer<>(baseProperties());
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC_NAME, "key" + i, "value" + i);
                producer.send(record);
            }
            producer.close();
        }

        /** Blocking (synchronous) send: call get() on the returned Future. */
        public static void producerSendBlock() throws ExecutionException, InterruptedException {
            Producer<String, String> producer = new KafkaProducer<>(baseProperties());
            for (int i = 0; i < 10; i++) {
                String key = "key-" + i;
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC_NAME, key, "value" + i);
                Future<RecordMetadata> send = producer.send(record);
                RecordMetadata recordMetadata = send.get();  // blocks until acknowledged
                System.out.println(key + ":" + recordMetadata.partition()
                        + " , offset : " + recordMetadata.offset());
            }
            producer.close();
        }

        /** Asynchronous send with a callback invoked on completion or failure. */
        public static void producerSendWithCallback() {
            Producer<String, String> producer = new KafkaProducer<>(baseProperties());
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            System.out.println("send failed: " + e.getMessage());
                        } else {
                            System.out.println("partition : " + recordMetadata.partition()
                                    + " , offset : " + recordMetadata.offset());
                        }
                    }
                });
            }
            producer.close();
        }

        /** Asynchronous send with a callback and a custom partitioner. */
        public static void producerSendWithCallbackAndPartition() {
            Properties properties = baseProperties();
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                    "com.item.kafka.producer.SamplePartition");
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i);
                producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println("partition : " + recordMetadata.partition()
                                + " , offset : " + recordMetadata.offset());
                    }
                });
            }
            producer.close();
        }
    }

    Custom Partitioning (Load Balancing)

    By implementing a custom Partitioner, you can control how records are assigned to a topic's partitions according to business rules. A simple implementation:

    package com.item.kafka.producer;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    import java.util.Map;

    public class SamplePartition implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // Keys look like "key-0", "key-1", ...: parse the numeric suffix
            // and route even numbers to partition 0, odd numbers to partition 1.
            String keyStr = key + "";
            String keyInt = keyStr.substring(4);
            try {
                int i = Integer.parseInt(keyInt);
                return i % 2;
            } catch (NumberFormatException e) {
                return 0;  // fall back to partition 0 for unexpected keys
            }
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }

    This partitioner extracts the numeric suffix from the record key (e.g. "key-3") and routes even-numbered keys to partition 0 and odd-numbered keys to partition 1; the same structure can host any other key-based partitioning rule.
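The key-to-partition rule can be checked in isolation without a broker. The sketch below reimplements just the suffix-parsing logic of SamplePartition (`partitionFor` is a hypothetical helper name, not part of the Kafka API):

```java
public class PartitionRule {
    // Mirrors SamplePartition: parse the digits after "key-" and take them
    // modulo the partition count (2 in this article's example).
    static int partitionFor(String key, int numPartitions) {
        try {
            int n = Integer.parseInt(key.substring(4));
            return n % numPartitions;
        } catch (NumberFormatException | StringIndexOutOfBoundsException e) {
            return 0; // fall back to partition 0 on unexpected keys
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
    }
}
```

Running this shows the alternating 0/1 assignment the partitioner produces for the article's "key-0" through "key-9" records.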


    The sections above should give a clear picture of how the Kafka producer works, how it is configured, and how custom partitioning can be implemented. This is a useful reference when tuning producer configurations for real workloads.

