(六)RecordAccumulator封装消息程总体流程
发布日期:2021-11-18 17:47:30 浏览次数:9 分类:技术文章

本文共 6038 字,大约阅读时间需要 20 分钟。

文章目录


ensureValidRecordSize

private void ensureValidRecordSize(int size) {
//如果一条消息的大小超过了 1M,那么就会报错 if (size > this.maxRequestSize) //自定义了异常。 throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the maximum request size you have configured with the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration."); //如果你的一条消息的大小超过32M也会报错。 if (size > this.totalMemorySize) throw new RecordTooLargeException("The message is " + size + " bytes when serialized which is larger than the total memory buffer you have configured with the " + ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration."); }

原理图

在这里插入图片描述

每个消息根据分区号找到对应的队列(每个队列都有一个TopicPartition),
kafka设计了内存池,每个批次的默认大小是16k,当批次发完出去之后,就把内存放入内存池,当需要内存时就从内存池拿取,减少了内存的GC;

源码剖析

public  RecordAppendResult append(TopicPartition tp,                                     long timestamp,                                     byte[] key,                                     byte[] value,                                     Callback callback,                                     long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in // abortIncompleteBatches(). appendsInProgress.incrementAndGet(); try {
// check if we have an in-progress batch /** * 步骤一:先根据分区找到应该插入到哪个队列里面。 * 如果有已经存在的队列,那么我们就使用存在队列 * 如果队列不存在,那么我们新创建一个队列 * * 我们肯定是有了存储批次的队列,但是大家一定要知道一个事 * 我们代码第一次执行到这儿,获取其实就是一个空的队列。 * * 现在代码第二次执行进来。 * 假设 分区还是之前的那个分区。 * * 这个方法里面我们之前分析,里面就是针对batchs进行的操作 * 里面kafka自己封装了一个数据结构:CopyOnWriteMap (这个数据结构本来就是线程安全的) */ Deque
dq = getOrCreateDeque(tp); /** * 假设我们现在有线程一,线程二,线程三 * */ synchronized (dq) {
//首先进来的是第一个线程 if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); /** * 步骤二: * 尝试往队列里面的批次里添加数据 * * 一开始添加数据肯定是失败的,我们目前只是以后了队列 * 数据是需要存储在批次对象里面(这个批次对象是需要分配内存的) * 我们目前还没有分配内存,所以如果按场景驱动的方式, * 代码第一次运行到这儿其实是不成功的。 */ RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); //第一次进来的时候appendResult的值就为null if (appendResult != null) return appendResult; }//释放锁 // we don't have an in-progress record batch try to allocate a new batch /** * 步骤三:计算一个批次的大小 * 在消息的大小和批次的大小之间取一个最大值,用这个值作为当前这个批次的大小。 * 有可能我们的一个消息的大小比一个设定好的批次的大小还要大。 * 默认一个批次的大小是16K。 * 所以我们看到这段代码以后,应该给我们一个启示。 * 如果我们生产者发送数的时候,如果我们的消息的大小都是超过16K, * 说明其实就是一条消息就是一个批次,那也就是说消息是一条一条被发送出去的。 * 那如果是这样的话,批次这个概念的设计就没有意义了 * 所以大家一定要根据自定公司的数据大小的情况去设置批次的大小。 */ int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); /** * 步骤四: * 根据批次的大小去分配内存 * * * 线程一,线程二,线程三,执行到这儿都会申请内存 * 假设每个线程 都申请了 16k的内存。 */ ByteBuffer buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) {
//假设线程一 进来了。 //线程二就进来了 // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new IllegalStateException("Cannot send after the producer is closed."); /** * 步骤五: * 尝试把数据写入到批次里面。 * 代码第一次执行到这儿的时候 依然还是失败的(appendResult==null) * 目前虽然已经分配了内存 * 但是还没有创建批次,那我们向往批次里面写数据 * 还是不能写的。 * * 线程二进来执行这段代码的时候,是成功的。 */ RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq); //失败的意思就是appendResult 还是会等于null if (appendResult != null) {
//释放内存 //线程二到这儿,其实他自己已经把数据写到批次了。所以 //他的内存就没有什么用了,就把内存个释放了(还给内存池了。) free.deallocate(buffer); return appendResult; } /** * 步骤六: * 根据内存大小封装批次 * * * 线程一到这儿 会根据内存封装出来一个批次。 */ MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize); RecordBatch batch = new RecordBatch(tp, records, time.milliseconds()); //尝试往这个批次里面写数据,到这个时候 我们的代码会执行成功。 //线程一,就往批次里面写数据,这个时候就写成功了。 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds())); /** * 步骤七: * 把这个批次放入到这个队列的队尾 * * * 线程一把批次添加到队尾 */ dq.addLast(batch); incomplete.add(batch); return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true); }//释放锁 } finally {
appendsInProgress.decrementAndGet(); } }

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112318681 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(七)RecordAccumulator详细剖析
下一篇:(五)生产者源码之分区选择

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年04月07日 14时17分07秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

史上最简单的spring-boot集成websocket的实现方式 2019-04-27
带你玩转属于自己的spring-boot-starter系列(一) 2019-04-27
带你玩转属于自己自己的spring-boot-starter系列(二) 2019-04-27
带你玩转属于自己的spring-boot-starter系列(三) 2019-04-27
基于SnowFlake算法如何让分库分表中不同的ID落在同一个库的算法的实现 2019-04-27
基于springboot的ShardingSphere5.X的分库分表的解决方案之分库解决方案(二) 2019-04-27
基于springboot的ShardingSphere5.X的分库分表的解决方案之分表解决方案(一) 2019-04-27
基于springboot的ShardingSphere5.X的分库分表的解决方案之关联查询解决方案(三) 2019-04-27
基于springboot的ShardingSphere5.X的分库分表的解决方案之基于seata的分布式事务的解决方案(十五) 2019-04-27
Linux文件管理参考 2019-04-27
FTP文件管理项目(本地云)项目日报(一) 2019-04-27
FTP文件管理项目(本地云)项目日报(二) 2019-04-27
FTP文件管理项目(本地云)项目日报(三) 2019-04-27
FTP文件管理项目(本地云)项目日报(四) 2019-04-27
【C++】勉强能看的线程池详解 2019-04-27
FTP文件管理项目(本地云)项目日报(五) 2019-04-27
FTP文件管理项目(本地云)项目日报(关于不定长包的测试) 2019-04-27
FTP文件管理项目(本地云)项目日报(六) 2019-04-27
FTP文件管理项目(本地云)项目日报(七) 2019-04-27
FTP文件管理项目(本地云)项目日报(八) 2019-04-27