(八)Sender线程运行流程初探
发布日期:2021-11-18 17:47:31
浏览次数:7
分类:技术文章
本文共 13534 字,大约阅读时间需要 45 分钟。
文章目录
总览
void run(long now) { //获取元数据 //因为我们是根据场景驱动的方式,目前是我们第一次代码进来, //目前还没有获取到元数据 //所以这个cluster里面是没有元数据 //如果这儿没有元数据的话,这个方法里面接下来的代码就不用看了 //是以为接下来的这些代码依赖这个元数据。 //TODO 我们直接看这个方法的最后一行代码 //就是这行代码去拉取的元数据。 /** * 我们用场景驱动的方式,现在我们的代码是第二次进来 * 第二次进来的时候,已经有元数据了,所以cluster这儿是有元数据。 * * 步骤一: * 获取元数据 * * */ Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send /** * 步骤二: * 首先是判断哪些partition有消息可以发送,获取到这个partition的leader partition * 对应的broker主机。 * * 哪些broker上面需要我们去发送消息? */ RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); /** * 步骤三: * 标识还没有拉取到元数据的topic */ if (!result.unknownLeaderTopics.isEmpty()) { // The set of topics with unknown leader contains topics with leader election pending as well as // topics which may have expired. Add the topic again to metadata to ensure it is included // and request metadata update, since there are messages to send to the topic. for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); this.metadata.requestUpdate(); } // remove any nodes we aren't ready to send to Iteratoriter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); /** * 步骤四:检查与要发送数据的主机的网络是否已经建立好。 */ if (!this.client.ready(node, now)) { //如果返回的是false !false 代码就进来 //移除result 里面要发送消息的主机。 //所以我们会看到这儿所有的主机都会被移除 iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } /** * 步骤五: * * 我们有可能要发送的partition有很多个, * 很有可能有一些partition的leader partition是在同一台服务器上面。 * p0:leader:0 * p1:leader: 0 * p2:leader: 1 * p3:leader: 2 * 假设我们集群只有3台服务器 * 当我们的分区的个数大于集群的节点的个数的时候,一定会有多个leader partition在同一台服务器上面。 * * 按照broker进行分组,同一个broker的partition为同一组 * 0:{p0,p1} * 1:{p2} * 2:{p3} * */ //所以我们发现 如果网络没有建立的话,这儿的代码是不执行的 Map > batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained //如果batches 空的话,这而的代码也就不执行了。 for (List batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } /** * 步骤六: * 对超时的批次是如何处理的? * */ List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); /** * 步骤七: * 创建发送消息的请求 * * * 创建请求 * 我们往partition上面去发送消息的时候,有一些partition他们在同一台服务器上面 * ,如果我们一分区一个分区的发送我们网络请求,那网络请求就会有一些频繁 * 我们要知道,我们集群里面网络资源是非常珍贵的。 * 会把发往同个broker上面partition的数据 组合成为一个请求。 * 然后统一一次发送过去,这样子就减少了网络请求。 */ //如果网络连接没有建立好 batches其实是为空。 //也就说其实这段代码也是不会执行。 List requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } //发送请求 for (ClientRequest request : requests) client.send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; //TODO 重点就是去看这个方法 //就是用这个方法拉取的元数据。 /** * 步骤八: * 真正执行网络操作的都是这个NetWordClient这个组件 * 包括:发送请求,接受响应(处理响应) */ //我们猜这儿可能就是去建立连接。 this.client.poll(pollTimeout, now); }
ready(Cluster cluster, long nowMs)
public ReadyCheckResult ready(Cluster cluster, long nowMs) { SetreadyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set unknownLeaderTopics = new HashSet<>(); //waiters里面有数据,说明我们的这个内存池里面内存不够了。 //如果exhausted的值等于true,说明内存池里面的内存不够用了。 boolean exhausted = this.free.queued() > 0; //遍历所有的分区 for (Map.Entry > entry : this.batches.entrySet()) { //获取到分区 TopicPartition part = entry.getKey(); //获取到分区对应的队列 Deque deque = entry.getValue(); //根据分区 可以获取到这个分区的leader partition在哪一台kafka的主机上面。 Node leader = cluster.leaderFor(part); synchronized (deque) { //如果没有找到对应主机。 unknownLeaderTopics 下次去拉取元数据 if (leader == null && !deque.isEmpty()) { // This is a partition for which leader is not known, but messages are available to send. // Note that entries are currently not removed from batches when deque is empty. unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !muted.contains(part)) { //首先从队列的队头获取到批次 RecordBatch batch = deque.peekFirst(); //如果这个catch不null,我们判断一下是否可以发送这个批次。 if (batch != null) { /** * batch.attempts:重试的次数 * batch.lastAttemptMs:上一次重试的时间 * retryBackoffMs:重试的时间间隔 * * backingOff:重新发送数据的时间到了 */ boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; /** * nowMs: 当前时间 * batch.lastAttemptMs: 上一次重试的时间。 * waitedTimeMs=这个批次已经等了多久了。 */ long waitedTimeMs = nowMs - batch.lastAttemptMs; /** * 但是我们用场景驱动的方式去分析,因为我们第一次发送数据。 * 所以之前也没有消息发送出去过,也就没有重试这一说。 * * timeToWaitMs =lingerMs * lingerMs * 这个值默认是0,如果这个值默认是0 的话,那代表着来一条消息 * 就发送一条消息,那很明显是不合适的。 * 所以我们发送数据的时候,大家一定要记得去配置这个参数。 * 假设我们配置的是100ms * timeToWaitMs = linerMs = 100ms * 消息最多存多久就必须要发送出去了。 */ long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; /** * timeToWaitMs: 最多能等待多久 * waitedTimeMs: 已经等待了多久 * timeLeftMs: 还要在等待多久 */ long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); /** *如果队列大于一,说明这个队列里面至少有一个批次肯定是写满了 * 如果批次写满了肯定是可以发送数据了。 *当然也有可能就是这个队列里面只有一个批次,然后刚好这个批次 * 写满了,也可以发送数据。 * * full:是否有写满的批次 */ boolean full = deque.size() > 1 || batch.records.isFull(); /** * waitedTimeMs:已经等待了多久 * timeToWaitMs:最多需要等待多久 * expired: 时间到了,到了发送消息的时候了 * 如果expired=true 代表就是时间到了,到了发送消息的时候了 */ boolean expired = waitedTimeMs >= timeToWaitMs; /** * 1)full: 如果一个批次写满了(无论时间有没有到) * 2)expired:时间到了(批次没写满也得发送) * 3)exhausted:内存不够(消息发送出去以后,就会释放内存) */ boolean sendable = full || expired || exhausted || closed || flushInProgress(); //可以发送消息 if (sendable && !backingOff) { //把可以发送批次的partition的leader partition所在的主机加入到 //readyNodes readyNodes.add(leader); } else { // Note that this results in a conservative estimate since an un-sendable partition may have // a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); }
ready()方法
public boolean ready(Node node, long now) { //如果当前检查的节点为null,就报异常。 if (node.isEmpty()) throw new IllegalArgumentException("Cannot connect to empty node " + node); //判断要发送消息的主机,是否具备发送消息的条件 if (isReady(node, now)) return true; //判断是否可以尝试去建立网络 if (connectionStates.canConnect(node.idString(), now)) // if we are interested in sending to a node and we don't have a connection to it, initiate one //初始化连接 initiateConnect(node, now); return false; }
public boolean isReady(Node node, long now) { //我们要发送写数据请求的时候,不能是正在更新元数据的时候。 return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString()); } private boolean canSendRequest(String node) { /** * 1.connectionStates.isConnected(node): * 生产者:多个连接,缓存多个连接(跟我们的broker的节点数是一样的) * 判断缓存里面是否已经把这个连接给建立好了。 * 2.selector.isChannelReady(node): * java NIO:selector * selector -> 绑定了多个KafkaChannel(java socketChannel) * 一个kafkaChannel就代表一个连接。 * * 3.inFlightRequests.canSendMore(node): * 每个往broker主机上面发送消息的连接,最多能容忍5个消息,发送出去了 * 但是还没有接受到响应。 * 发送数据的顺序。 * 1,2,3,4,5 * * 2,3,4,5,1 */ return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); }
canConnect()
public boolean canConnect(String id, long now) { //首先从缓存里面获取当前主机的连接。 NodeConnectionState state = nodeState.get(id); //如果值为null,说明从来没有连接过。 if (state == null) return true; else //可以从缓存里面获取到连接。 //但是连接的状态是DISCONNECTED 并且 // now - state.lastConnectAttemptMs >= this.reconnectBackoffMs 说明可以进行重试,重试连接。 return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs; }
initiateConnect()
private void initiateConnect(Node node, long now) { String nodeConnectionId = node.idString(); try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); this.connectionStates.connecting(nodeConnectionId, now); //TODO 尝试建立连接 selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(nodeConnectionId, now); /* maybe the problem is our metadata, update it */ metadataUpdater.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); } }
Sender 线程流程
转载地址:https://blog.csdn.net/weixin_37850264/article/details/112332774 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2024年04月01日 00时26分57秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
数据人上班划水都聊什么?
2019-04-26
统计学(1)|白话统计学发展(含统计学必知必会)
2019-04-26
TiDB在知乎万亿量级业务数据下的实践和挑战
2019-04-26
数据治理方案技术调研 Atlas VS Datahub VS Amundsen
2019-04-26
企业级数据查询毫秒级响应,Elastic search开启搜索应用新境界
2019-04-26
Hive千亿级数据倾斜解决方案(好文收藏)
2019-04-26
放假了,数据人可以看点小视频
2019-04-26
终于有人把A/B测试讲明白了
2019-04-26
推荐几个小视频
2019-04-26
2万文字,一文搞懂Kafka
2019-04-26
传统BI如何转大数据数仓
2019-04-26
【数据文化】Uber的数据治理
2019-04-26
金融行业大数据治理之路——数据模型篇
2019-04-26
阿里数据中台到底是怎么建设的?
2019-04-26
数据人如何提高核心竞争力
2019-04-26
Pulsar,做大数据一定要牢牢掌握的世界级产品!
2019-04-26
网易湖仓一体的探索与实践
2019-04-26
送30本数据畅销书!
2019-04-26
面试官:谈谈大数据采集和常见问题
2019-04-26
数据人都应该懂点数据库知识
2019-04-26