(三)Producer源码核心流程初探
发布日期:2021-11-18 17:47:29 浏览次数:7 分类:技术文章

本文共 14109 字,大约阅读时间需要 47 分钟。

文章目录


源码剖析

KafkaProducer

private KafkaProducer(ProducerConfig config, Serializer
keySerializer, Serializer
valueSerializer) {
try {
log.trace("Starting the Kafka producer"); // 配置一些用户自定义的参数 Map
userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = new SystemTime(); //配置clientId clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); Map
metricTags = new LinkedHashMap
(); metricTags.put("client-id", clientId); //metric一些东西,我们一般分析源码的时候 不需要去关心 MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .tags(metricTags); List
reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); //设置分区器 this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); /** */ //重试时间 retry.backoff.ms 默认100ms long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); //设置序列化器 if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); this.keySerializer.configure(config.originals(), true); } else { config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); this.keySerializer = keySerializer; } if (valueSerializer == null) { this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); this.valueSerializer.configure(config.originals(), false); } else { config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); this.valueSerializer = valueSerializer; } // load interceptors and make sure they get clientId // userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); //设置拦截器 //类似于一个过滤器 List
> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList); ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); //这个元数据的结构,可能就是生产者从服务端那儿拉取过来的kafka的元数据。 //生产者要想去拉取元数据, 发送网络请求,重试, //metadata.max.age.ms 生产者每隔一段时间都要去更新一下自己的元数据。 //默认就是每隔5分钟 去服务端那儿重新拉取元数据。 this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners); //max.request.size 生产者往服务端发送消息的时候,规定一条消息最大多大? //如果你超过了这个规定消息的大小,你的消息就不能发送过去。 //默认是1M,这个值偏小,在生产环境中,我们需要修改这个值。 //经验值是10M。但是大家也可以根据自己公司的情况来。 this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); //指的是缓存大小 //默认值是32M,这个值一般是够用,如果有特殊情况的时候,我们可以去修改这个值。 this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); //kafka是支持压缩数据的,这儿设置压缩格式。 //提高你的系统的吞吐量,你可以设置压缩格式。 //一次发送出去的消息就更多。生产者这儿会消耗更多的cpu. this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); /* check for user defined settings. * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG. * This should be removed with release 0.9 when the deprecated configs are removed. */ if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) { log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG); if (blockOnBufferFull) { this.maxBlockTimeMs = Long.MAX_VALUE; } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); } else { this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); } } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) { log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " + "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG); this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG); } else { this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); } /* check for user defined settings. * If the TIME_OUT config is set use that for request timeout. * This should be removed with release 0.9 */ if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) { log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " + ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG); } else { this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } //TODO 创建了一个核心的组件 this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, this.compressionType, config.getLong(ProducerConfig.LINGER_MS_CONFIG), retryBackoffMs, metrics, time); List
addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); //去更新元数据 //addresses 这个地址其实就是我们写producer代码的时候,传参数的时候,传进去了一个broker的地址。 //所以这段代码看起来像是去服务端拉取元数据,所以我们去验证一下,是否真的去拉取元数据。 //TODO update方法初始化的时候并没有去服务端拉取元数据。 this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); //TODO 初始化了一个重要的管理网路的组件。 /** *connections.max.idle.ms: 默认值是9分钟,一个网络连接最多空闲多久,超过这个空闲时间,就关闭这个网络连接。 * * max.in.flight.requests.per.connection:默认是5,producer -》 broker 。 * 发送数据的时候,其实是有多个网络连接。每个网络连接可以忍受 producer端发送给broker 消息然后消息没有响应的个数。 * 因为kafka有重试机制,所以有可能会造成数据乱序,如果想要保证有序,这个值要把设置为1. * * send.buffer.bytes:socket发送数据的缓冲区的大小,默认值是128K * receive.buffer.bytes:socket接受数据的缓冲区的大小,默认值是32K。 */ NetworkClient client = new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), this.metadata, clientId, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), this.requestTimeoutMs, time); //retries:重试的次数 //我们在项目中一般都会去设置重试, /*** * acks: * 0: * producer发送数据到broker后,就完了,没有返回值,不管写成功还是写失败都不管了。 * 1: * producer发送数据到broker后,数据成功写入leader partition以后返回响应。 * 数据 -》 broker(leader partition) * -1: * producer发送数据到broker后,数据要写入到leader partition里面,并且数据同步到所有的 * follower partition里面以后,才返回响应。 * * 这样我们才能保证不丢数据。 * * */ //这个就是一个线程 this.sender = new Sender(client, this.metadata, this.accumulator, config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), this.metrics, new SystemTime(), clientId, this.requestTimeoutMs); String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : ""); //创建了一个线程,然后里面传进去了一个sender对象。 //把业务的代码和关于线程的代码给隔离开来。 //关于线程的这种代码设计的方式,其实也值得大家积累的。 this.ioThread = new KafkaThread(ioThreadName, this.sender, true); //启动线程。 this.ioThread.start(); this.errors = this.metrics.sensor("errors"); config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed // this is to prevent resource leak. see KAFKA-2121 close(0, TimeUnit.MILLISECONDS, true); // now propagate the exception throw new KafkaException("Failed to construct kafka producer", t); } }

doSend

private Future
doSend(ProducerRecord
record, Callback callback) {
TopicPartition tp = null; try {
/** * 步骤一: * 同步等待拉取元数据。 * maxBlockTimeMs 最多能等待多久。 */ ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); //clusterAndWaitTime.waitedOnMetadataMs 代表的是拉取元数据用了多少时间。 //maxBlockTimeMs -用了多少时间 = 还剩余多少时间可以使用。 long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); //更新集群的元数据 Cluster cluster = clusterAndWaitTime.cluster; /** * 步骤二: * 对消息的key和value进行序列化。 */ byte[] serializedKey; try {
serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } byte[] serializedValue; try {
serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } /** * 步骤三: * 根据分区器选择消息应该发送的分区。 * * 因为前面我们已经获取到了元数据Ulead * 这儿我们就可以根据元数据的信息 * 计算一下,我们应该要把这个数据发送到哪个分区上面。 */ int partition = partition(record, serializedKey, serializedValue, cluster); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); /** * 步骤四: * 确认一下消息的大小是否超过了最大值。 * KafkaProdcuer初始化的时候,指定了一个参数,代表的是Producer这儿最大能发送的是一条消息能有多大 * 默认最大是1M,我们一般都回去修改它。 */ ensureValidRecordSize(serializedSize); /** * 步骤五: * 根据元数据信息,封装分区对象 */ tp = new TopicPartition(record.topic(), partition); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback /** * 步骤六: * 给每一条消息都绑定他的回调函数。因为我们使用的是异步的方式发送的消息。 */ Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); /** * 步骤七: * 把消息放入accumulator(32M的一个内存) * 然后有accumulator把消息封装成为一个批次一个批次的去发送。 */ RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); //如果批次满了 //或者新创建出来一个批次 if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); /** * 步骤八: * 唤醒sender线程。他才是真正发送数据的线程。 */ this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) {
log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); return new FutureFailure(e); } catch (InterruptedException e) {
this.errors.record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); } catch (BufferExhaustedException e) {
this.errors.record(); this.metrics.sensor("buffer-exhausted-records").record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw e; } catch (KafkaException e) {
this.errors.record(); if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw e; } catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method if (this.interceptors != null) this.interceptors.onSendError(record, tp, e); throw e; } /** * * 大型项目架构,在设计异常体系的可能没有思路。 * 所以我们可以学习一下kafak(全世界比较优秀的开源的项目) * 核心流程捕捉各种异常 * 底层代码的异常往上抛: * 为了能报错的时候让用户能直观的知道程序出了什么问题, * 我们会发现kafka这儿会自定义了很多异常。 * 这种方式也是值得大家学习。 * * */ }

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112314453 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(五)生产者源码之分区选择
下一篇:(四)producer拉取元数据剖析

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年04月12日 18时35分34秒