(七)RecordAccumulator详细剖析
发布日期:2021-11-18 17:47:30 浏览次数:8 分类:技术文章

本文共 11063 字,大约阅读时间需要 36 分钟。

文章目录


getOrCreateDeque

private Deque
getOrCreateDeque(TopicPartition tp) {
//直接从batches里面获取当前分区对应的存储队列 Deque
d = this.batches.get(tp); //我们现在用到是场景驱动的方式,代码第一次执行到这儿的死活 //是获取不到队列的,也就是说d 这个变量的值为null if (d != null) return d; //代码继续执行,创建出来一个新的空队列, d = new ArrayDeque<>(); //把这个空的队列存入batches 这个数据结构里面 Deque
previous = this.batches.putIfAbsent(tp, d); if (previous == null) return d; else //直接返回新的结果 return previous; }

batches的数据结构

//TopicPartition 分区 -》  Deque
队列 private final ConcurrentMap
> batches;
//CopyOnWriteMap的这样的一个数据类型。        //这个数据结构在jdk里面是没有的,是kafka自己设计的。        //这个数据结构设计得很好,因为有了这个数据结构        //整体的提升了封装批次的这个流程的性能!!        //它的设计主要参考 JDK  juc包下面:CopyOnWriteArrayList        this.batches = new CopyOnWriteMap<>();

CopyOnWriteMap的设计

/** * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification * * 1) 这个数据结构是在高并发的情况下是线程安全的。 * 2)  采用的读写分离的思想设计的数据结构 *      每次插入(写数据)数据的时候都开辟新的内存空间 *      所以会有个小缺点,就是插入数据的时候,会比较耗费内存空间。 * 3)这样的一个数据结构,适合写少读多的场景。 *      读数据的时候性能很高。 * * batchs这个对象存储数据的时候,就是使用的这个数据结构。 * 对于batches来说,它面对的场景就是读多写少的场景。 * *batches: *   读数据: *      每生产一条消息,都会从batches里面读取数据。 *      假如每秒中生产10万条消息,是不是就意味着每秒要读取10万次。 *      所以绝对是一个高并发的场景。 *   写数据: *     假设有100个分区,那么就是会插入100次数据。 *     并且队列只需要插入一次就可以了。 *     所以这是一个低频的操作。 * * * */public class CopyOnWriteMap
implements ConcurrentMap
{
/** * 核心的变量就是一个map * 这个map有个特点,它的修饰符是volatile关键字。 * 在多线程的情况下,如果这个map的值发生变化,其他线程也是可见的。 * * get * put */ private volatile Map
map; /** * 没有加锁,读取数据的时候性能很高(高并发的场景下,肯定性能很高) * 并且是线程安全的。 * 因为人家采用的读写分离的思想。 * @param k * @return */ @Override public V get(Object k) {
return map.get(k); } /** * 1): * 整个方法使用的是synchronized关键字去修饰的,说明这个方法是线程安全。 * 即使加了锁,这段代码的性能依然很好,因为里面都是纯内存的操作。 * 2) * 这种设计方式,采用的是读写分离的设计思想。 * 读操作和写操作 是相互不影响的。 * 所以我们读数据的操作就是线程安全的。 *3) * 最后把值赋给了map,map是用volatile关键字修饰的。 * 说明这个map是具有可见性的,这样的话,如果get数据的时候,这儿的值发生了变化,也是能感知到的。 */ @Override public synchronized V put(K k, V v) {
//新的内存空间,在读的时候先开辟一个新内存,再合并给map Map
copy = new HashMap
(this.map); //插入数据 V prev = copy.put(k, v); //赋值给map this.map = Collections.unmodifiableMap(copy); return prev; } @Override public synchronized void putAll(Map
entries) {
Map
copy = new HashMap
(this.map); copy.putAll(entries); this.map = Collections.unmodifiableMap(copy); } @Override public synchronized V remove(Object key) { Map
copy = new HashMap
(this.map); V prev = copy.remove(key); this.map = Collections.unmodifiableMap(copy); return prev; } @Override public synchronized V putIfAbsent(K k, V v) { //如果我们传进来的这个key不存在 if (!containsKey(k)) //那么就调用里面内部的put方法 return put(k, v); else //返回结果 return get(k); }}

tryAppend

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque
deque) {
//首先要获取到队列里面一个批次 RecordBatch last = deque.peekLast(); //第一次进来是没有批次的,所以last肯定为null //线程二进来的时候,这个last不为空 if (last != null) {
//线程二就插入数据就ok了 FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds()); if (future == null) last.records.close(); else //返回值就不为null了 return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false); } //返回结果就是一个null值 return null; }

分段加锁:

该加锁的地方就加锁,不该加锁的地方就不要加锁。

在高并发的情况下,尽可能的提升代码的性能。

//这个方法,肯定是超过并发被调用的。//高并发调用的情况下,我们就必须要保证线程安全。append(){
xxxx sync{
xxx } xxxx sync{
xxx }}

详情可以看(六)的核心流程

内存池的设计

public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
//如果你想要申请的内存的大小,如果超过32M if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of " + this.totalMemory + " on memory allocations."); //加锁的代码 this.lock.lock(); try {
// check if we have a free buffer of the right size pooled //poolableSize 代表的是一个批次的大小,默认情况一下,一个批次的大小是16K。 //如果我们这次申请的批次的大小等于 我们设定好的一个批次的大小 //并且我们的内存池不为空,那么直接从内存池里面获取一个块内存就可以使用了。 //跟我们连接池是一个道理。 //我们场景驱动的方式,第一次进来 //内存池里面里面是没有内存的,所以这儿是获取不到内存。 if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block //内存的个数 * 批次的大小 = free的大小 int freeListSize = this.free.size() * this.poolableSize; // size: 我们这次要申请的内存 //this.availableMemory + freeListSize 目前可用的总内存 //this.availableMemory + freeListSize 目前可用的总内存 大于你要申请的内存。 if (this.availableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately // satisfy the request freeUp(size); //进行内存扣减 this.availableMemory -= size; lock.unlock(); //直接分配内存 return ByteBuffer.allocate(size); } else {
//还有一种情况就是,我们整个内存池 还剩10k的内存,但是我们这次申请的内存是32k //批次可能就是16k,但是我们的一条消息,就是32K -> max(15,32) = 当前批次 = 32K // we are out of memory and will have to block //统计分配的内存 int accumulated = 0; ByteBuffer buffer = null; Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); //等待 被人释放内存 this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one /** * 总的分配的思路,可能一下子分配不了这么大的内存,但是可以先有点分配一点。 * */ //如果分配的内存的大小 还是没有要申请的内存大小大。 //内存池就会一直分配的内存,一点一点的去分配。 //等着别人会释放内存。 while (accumulated < size) {
long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try {
//在等待,等待别人释放内存。 //如果这儿的代码是等待wait操作 //那么我们可以猜想一下,当有人释放内存的时候 //肯定不是得唤醒这儿的代码 //目前代码一直在等待着 //假设,突然有人 往内存池里面还了内存。 //那么这儿的代码就可以被唤醒了。代码就继续往下执行。 waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) {
this.waiters.remove(moreMemory); throw e; } finally {
long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } if (waitingTimeElapsed) {
this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory //再次方式看一下,内存池里面有没有数据了。 //如果内存池里面有数据 //并且你申请的内存的大小就是一个批次的大小 if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list //这儿就可以直接获取到内存。 buffer = this.free.pollFirst(); accumulated = size; } else {
// we'll need to allocate memory, but we may only get // part of what we need on this iteration freeUp(size - accumulated); // 可以给你分配的内存 int got = (int) Math.min(size - accumulated, this.availableMemory); //做内存扣减 this.availableMemory -= got; //累加已经分配了多少内存。 accumulated += got; } } // remove the condition for this thread to let the next thread // in line start getting memory Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) throw new IllegalStateException("Wrong condition: this shouldn't happen."); // signal any additional waiters if there is more memory left // over for them if (this.availableMemory > 0 || !this.free.isEmpty()) {
if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } // unlock and return the buffer lock.unlock(); if (buffer == null) return ByteBuffer.allocate(size); else return buffer; } } finally {
if (lock.isHeldByCurrentThread()) lock.unlock(); } }

内存的数据结构

在这里插入图片描述

//池子就是一个队列,队列里面放的就是一个块一块的内存    //就是跟一个连接池是一个道理。    private final Deque
free;

dosend()中释放内存,会唤醒正在等待的分配内存的线程

public void deallocate(ByteBuffer buffer, int size) {
lock.lock(); try {
//如果你还回来的内存的大小 就等于一个批次的大小, //我们的参数设置的内存是16K,你计算出来一个批次的大小也是16,申请的内存也是16k if (size == this.poolableSize && size == buffer.capacity()) {
//内存里面的东西清空 buffer.clear(); //把内存放入到内存池 this.free.add(buffer); } else {
//但是如果 我们释放的内存的大小 //不是一个批次的大小,那就把归为可用内存 //等着垃圾回收即可 this.availableMemory += size; } Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) //释放了内存(或者是还了内存以后) //都会唤醒等待内存的线程。 //接下来是不是还是要唤醒正在等待分配内存的线程。 moreMem.signal(); } finally {
lock.unlock(); } }

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112320589 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(八)Sender线程运行流程初探
下一篇:(六)RecordAccumulator封装消息程总体流程

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年03月28日 00时46分59秒