(八)Sender线程运行流程初探
发布日期:2021-11-18 17:47:31 浏览次数:7 分类:技术文章

本文共 13534 字,大约阅读时间需要 45 分钟。

文章目录


总览

void run(long now) {
//获取元数据 //因为我们是根据场景驱动的方式,目前是我们第一次代码进来, //目前还没有获取到元数据 //所以这个cluster里面是没有元数据 //如果这儿没有元数据的话,这个方法里面接下来的代码就不用看了 //是以为接下来的这些代码依赖这个元数据。 //TODO 我们直接看这个方法的最后一行代码 //就是这行代码去拉取的元数据。 /** * 我们用场景驱动的方式,现在我们的代码是第二次进来 * 第二次进来的时候,已经有元数据了,所以cluster这儿是有元数据。 * * 步骤一: * 获取元数据 * * */ Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send /** * 步骤二: * 首先是判断哪些partition有消息可以发送,获取到这个partition的leader partition * 对应的broker主机。 * * 哪些broker上面需要我们去发送消息? */ RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); /** * 步骤三: * 标识还没有拉取到元数据的topic */ if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as // topics which may have expired. Add the topic again to metadata to ensure it is included // and request metadata update, since there are messages to send to the topic. for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); this.metadata.requestUpdate(); } // remove any nodes we aren't ready to send to Iterator
iter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) {
Node node = iter.next(); /** * 步骤四:检查与要发送数据的主机的网络是否已经建立好。 */ if (!this.client.ready(node, now)) {
//如果返回的是false !false 代码就进来 //移除result 里面要发送消息的主机。 //所以我们会看到这儿所有的主机都会被移除 iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } /** * 步骤五: * * 我们有可能要发送的partition有很多个, * 很有可能有一些partition的leader partition是在同一台服务器上面。 * p0:leader:0 * p1:leader: 0 * p2:leader: 1 * p3:leader: 2 * 假设我们集群只有3台服务器 * 当我们的分区的个数大于集群的节点的个数的时候,一定会有多个leader partition在同一台服务器上面。 * * 按照broker进行分组,同一个broker的partition为同一组 * 0:{p0,p1} * 1:{p2} * 2:{p3} * */ //所以我们发现 如果网络没有建立的话,这儿的代码是不执行的 Map
> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) {
// Mute all the partitions drained //如果batches 空的话,这而的代码也就不执行了。 for (List
batchList : batches.values()) {
for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } /** * 步骤六: * 对超时的批次是如何处理的? * */ List
expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); /** * 步骤七: * 创建发送消息的请求 * * * 创建请求 * 我们往partition上面去发送消息的时候,有一些partition他们在同一台服务器上面 * ,如果我们一分区一个分区的发送我们网络请求,那网络请求就会有一些频繁 * 我们要知道,我们集群里面网络资源是非常珍贵的。 * 会把发往同个broker上面partition的数据 组合成为一个请求。 * 然后统一一次发送过去,这样子就减少了网络请求。 */ //如果网络连接没有建立好 batches其实是为空。 //也就说其实这段代码也是不会执行。 List
requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } //发送请求 for (ClientRequest request : requests) client.send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; //TODO 重点就是去看这个方法 //就是用这个方法拉取的元数据。 /** * 步骤八: * 真正执行网络操作的都是这个NetWordClient这个组件 * 包括:发送请求,接受响应(处理响应) */ //我们猜这儿可能就是去建立连接。 this.client.poll(pollTimeout, now); }

ready(Cluster cluster, long nowMs)

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set
readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set
unknownLeaderTopics = new HashSet<>(); //waiters里面有数据,说明我们的这个内存池里面内存不够了。 //如果exhausted的值等于true,说明内存池里面的内存不够用了。 boolean exhausted = this.free.queued() > 0; //遍历所有的分区 for (Map.Entry
> entry : this.batches.entrySet()) {
//获取到分区 TopicPartition part = entry.getKey(); //获取到分区对应的队列 Deque
deque = entry.getValue(); //根据分区 可以获取到这个分区的leader partition在哪一台kafka的主机上面。 Node leader = cluster.leaderFor(part); synchronized (deque) {
//如果没有找到对应主机。 unknownLeaderTopics 下次去拉取元数据 if (leader == null && !deque.isEmpty()) {
// This is a partition for which leader is not known, but messages are available to send. // Note that entries are currently not removed from batches when deque is empty. unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
//首先从队列的队头获取到批次 RecordBatch batch = deque.peekFirst(); //如果这个catch不null,我们判断一下是否可以发送这个批次。 if (batch != null) {
/** * batch.attempts:重试的次数 * batch.lastAttemptMs:上一次重试的时间 * retryBackoffMs:重试的时间间隔 * * backingOff:重新发送数据的时间到了 */ boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; /** * nowMs: 当前时间 * batch.lastAttemptMs: 上一次重试的时间。 * waitedTimeMs=这个批次已经等了多久了。 */ long waitedTimeMs = nowMs - batch.lastAttemptMs; /** * 但是我们用场景驱动的方式去分析,因为我们第一次发送数据。 * 所以之前也没有消息发送出去过,也就没有重试这一说。 * * timeToWaitMs =lingerMs * lingerMs * 这个值默认是0,如果这个值默认是0 的话,那代表着来一条消息 * 就发送一条消息,那很明显是不合适的。 * 所以我们发送数据的时候,大家一定要记得去配置这个参数。 * 假设我们配置的是100ms * timeToWaitMs = linerMs = 100ms * 消息最多存多久就必须要发送出去了。 */ long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; /** * timeToWaitMs: 最多能等待多久 * waitedTimeMs: 已经等待了多久 * timeLeftMs: 还要在等待多久 */ long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); /** *如果队列大于一,说明这个队列里面至少有一个批次肯定是写满了 * 如果批次写满了肯定是可以发送数据了。 *当然也有可能就是这个队列里面只有一个批次,然后刚好这个批次 * 写满了,也可以发送数据。 * * full:是否有写满的批次 */ boolean full = deque.size() > 1 || batch.records.isFull(); /** * waitedTimeMs:已经等待了多久 * timeToWaitMs:最多需要等待多久 * expired: 时间到了,到了发送消息的时候了 * 如果expired=true 代表就是时间到了,到了发送消息的时候了 */ boolean expired = waitedTimeMs >= timeToWaitMs; /** * 1)full: 如果一个批次写满了(无论时间有没有到) * 2)expired:时间到了(批次没写满也得发送) * 3)exhausted:内存不够(消息发送出去以后,就会释放内存) */ boolean sendable = full || expired || exhausted || closed || flushInProgress(); //可以发送消息 if (sendable && !backingOff) {
//把可以发送批次的partition的leader partition所在的主机加入到 //readyNodes readyNodes.add(leader); } else {
// Note that this results in a conservative estimate since an un-sendable partition may have // a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); }

ready()方法

public boolean ready(Node node, long now) {
//如果当前检查的节点为null,就报异常。 if (node.isEmpty()) throw new IllegalArgumentException("Cannot connect to empty node " + node); //判断要发送消息的主机,是否具备发送消息的条件 if (isReady(node, now)) return true; //判断是否可以尝试去建立网络 if (connectionStates.canConnect(node.idString(), now)) // if we are interested in sending to a node and we don't have a connection to it, initiate one //初始化连接 initiateConnect(node, now); return false; }
public boolean isReady(Node node, long now) {
//我们要发送写数据请求的时候,不能是正在更新元数据的时候。 return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString()); } private boolean canSendRequest(String node) {
/** * 1.connectionStates.isConnected(node): * 生产者:多个连接,缓存多个连接(跟我们的broker的节点数是一样的) * 判断缓存里面是否已经把这个连接给建立好了。 * 2.selector.isChannelReady(node): * java NIO:selector * selector -> 绑定了多个KafkaChannel(java socketChannel) * 一个kafkaChannel就代表一个连接。 * * 3.inFlightRequests.canSendMore(node): * 每个往broker主机上面发送消息的连接,最多能容忍5个消息,发送出去了 * 但是还没有接受到响应。 * 发送数据的顺序。 * 1,2,3,4,5 * * 2,3,4,5,1 */ return connectionStates.isConnected(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node); }

canConnect()

public boolean canConnect(String id, long now) {
//首先从缓存里面获取当前主机的连接。 NodeConnectionState state = nodeState.get(id); //如果值为null,说明从来没有连接过。 if (state == null) return true; else //可以从缓存里面获取到连接。 //但是连接的状态是DISCONNECTED 并且 // now - state.lastConnectAttemptMs >= this.reconnectBackoffMs 说明可以进行重试,重试连接。 return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs; }

initiateConnect()

private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString(); try {
log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); this.connectionStates.connecting(nodeConnectionId, now); //TODO 尝试建立连接 selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) {
/* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(nodeConnectionId, now); /* maybe the problem is our metadata, update it */ metadataUpdater.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); } }

Sender 线程流程

在这里插入图片描述

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112332774 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(九)Kafka网络设计
下一篇:(七)RecordAccumulator详细剖析

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2024年04月01日 00时26分57秒