(十)生产者发送请求
发布日期:2021-11-18 17:47:33
浏览次数:10
分类:技术文章
本文共 19464 字,大约阅读时间需要 64 分钟。
文章目录
前期回顾
private void pollSelectionKeys(IterableselectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { //获取到所有key Iterator iterator = selectionKeys.iterator(); //遍历所有的key while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); //根据key找到对应的KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ /** * * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是 * SelectionKey key = socketChannel.register(nioSelector, * SelectionKey.OP_CONNECT); * */ if (isImmediatelyConnected || key.isConnectable()) { //TODO 核心的代码来了 //去最后完成网络的连接 //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你 //完成网络的连接。 if (channel.finishConnect()) { //网络连接已经完成了以后,就把这个channel存储到 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ //核心代码,处理发送请求的事件 //selector 注册了一个OP_WRITE //selector 注册了一个OP_READ if (channel.ready() && key.isWritable()) { //获取到我们要发送的那个网络请求。 //是这句代码就是要往服务端发送数据了。 Send send = channel.write(); if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); } } }
其中channel.finishConnect()中完成建立连接,调用了
public boolean finishConnect() throws IOException { //完成的最后的网络的连接 boolean connected = socketChannel.finishConnect(); //如果连接完成了以后。 if (connected) //取消了OP_CONNECT事件 //增加了OP_READ 事件 我们这儿的这个key对应的KafkaChannel是不是就可以接受服务 //端发送回来的响应了。 key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); return connected; }
sender的run(),继续分析
void run(long now) { //获取元数据 //因为我们是根据场景驱动的方式,目前是我们第一次代码进来, //目前还没有获取到元数据 //所以这个cluster里面是没有元数据 //如果这儿没有元数据的话,这个方法里面接下来的代码就不用看了 //是以为接下来的这些代码依赖这个元数据。 //TODO 我们直接看这个方法的最后一行代码 //就是这行代码去拉取的元数据。 /** * 我们用场景驱动的方式,现在我们的代码是第二次进来 * 第二次进来的时候,已经有元数据了,所以cluster这儿是有元数据。 * * 步骤一: * 获取元数据 * * */ Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send /** * 步骤二: * 首先是判断哪些partition有消息可以发送,获取到这个partition的leader partition * 对应的broker主机。 * * 哪些broker上面需要我们去发送消息? */ RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); /** * 步骤三: * 标识还没有拉取到元数据的topic */ if (!result.unknownLeaderTopics.isEmpty()) { // The set of topics with unknown leader contains topics with leader election pending as well as // topics which may have expired. Add the topic again to metadata to ensure it is included // and request metadata update, since there are messages to send to the topic. for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); this.metadata.requestUpdate(); } // remove any nodes we aren't ready to send to Iteratoriter = result.readyNodes.iterator(); long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); /** * 步骤四:检查与要发送数据的主机的网络是否已经建立好。 */ if (!this.client.ready(node, now)) { //如果返回的是false !false 代码就进来 //移除result 里面要发送消息的主机。 //所以我们会看到这儿所有的主机都会被移除 iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } /** * 步骤五: * * 我们有可能要发送的partition有很多个, * 很有可能有一些partition的leader partition是在同一台服务器上面。 * p0:leader:0 * p1:leader: 0 * p2:leader: 1 * p3:leader: 2 * 假设我们集群只有3台服务器 * 当我们的分区的个数大于集群的节点的个数的时候,一定会有多个leader partition在同一台服务器上面。 * * 按照broker进行分组,同一个broker的partition为同一组 * 0:{p0,p1} * 1:{p2} * 2:{p3} * */ //所以我们发现 如果网络没有建立的话,这儿的代码是不执行的 Map > batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained //如果batches 空的话,这而的代码也就不执行了。 for (List batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } /** * 步骤六: * 对超时的批次是如何处理的? * */ List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); /** * 步骤七: * 创建发送消息的请求 * * * 创建请求 * 我们往partition上面去发送消息的时候,有一些partition他们在同一台服务器上面 * ,如果我们一分区一个分区的发送我们网络请求,那网络请求就会有一些频繁 * 我们要知道,我们集群里面网络资源是非常珍贵的。 * 会把发往同个broker上面partition的数据 组合成为一个请求。 * 然后统一一次发送过去,这样子就减少了网络请求。 */ //如果网络连接没有建立好 batches其实是为空。 //也就说其实这段代码也是不会执行。 List requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } //发送请求 for (ClientRequest request : requests) client.send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; //TODO 重点就是去看这个方法 //就是用这个方法拉取的元数据。 /** * 步骤八: * 真正执行网络操作的都是这个NetWordClient这个组件 * 包括:发送请求,接受响应(处理响应) */ //我们猜这儿可能就是去建立连接。 this.client.poll(pollTimeout, now); }
其中步骤五和步骤七: 会把发往同个broker上面partition的数据组合成为一个请求,然后统一一次发送过去,这样子就减少了网络请求。调用send()
public void send(ClientRequest request, long now) { String nodeId = request.request().destination(); if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); //TODO 看上去就是关键的代码。 doSend(request, now); } private void doSend(ClientRequest request, long now) { request.setSendTimeMs(now); //这儿往inFlightRequests 组件里存 Request请求。 //存储的就是还没有收到响应的请求。 //这个里面默认最多能存5个请求。 //其实我们可以猜想一个事,如果我们的请求发送出去了 //然后也成功的接受到了响应,后面就会到这儿把这个请求移除。 this.inFlightRequests.add(request); //TODO selector.send(request.request()); }
调用selector的send()
public void send(Send send) { //获取到一个KafakChannel KafkaChannel channel = channelOrFail(send.destination()); try { //TODO channel.setSend(send); } catch (CancelledKeyException e) { this.failedSends.add(send.destination()); close(channel); } }
调用kafkachannel的setsend()
public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress."); //往KafkaChannel里面绑定一个发送出去的请求。 this.send = send; //关键的代码来了 //这儿绑定了一个OP_WRITE事件。 //一旦绑定了这个事件以后,我们就可以往服务端发送请求了。 this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); }
开始发送数据 sender里面的poll
public Listpoll(long timeout, long now) { /** * 在这个方法里面有涉及到kafka的网络的方法,但是 * 目前我们还没有给大家讲kafka的网络,所以我们分析的时候 * 暂时不用分析得特别的详细,我们大概知道是如何获取到元数据 * 即可。等我们分析完了kafka的网络以后,我们在回头看这儿的代码 * 的时候,其实代码就比较简单了。 */ //步骤一:封装了一个要拉取元数据请求 long metadataTimeout = metadataUpdater.maybeUpdate(now); try { //步骤二: 发送请求,进行复杂的网络操作 //但是我们目前还没有学习到kafka的网络 //所以这儿大家就只需要知道这儿会发送网络请求。 //TODO 执行网络IO的操作。 this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); //步骤三:处理响应,响应里面就会有我们需要的元数据。 /** * 这个地方是我们在看生产者是如何获取元数据的时候,看的。 * 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。 * 获取元数据 -》 判断网络连接是否建立好 -》 建立网络连接 * -》 发送请求(获取元数据的请求) -》 服务端发送回来响应(带了集群的元数据信息) * */ handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); //处理超时的请求 handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse response : responses) { if (response.request().hasCallback()) { try { //调用的响应的里面的我们之前发送出去的请求的回调函数 //看到了这儿,我们回头再去看一下 //我们当时发送请求的时候,是如何封装这个请求。 //不过虽然目前我们还没看到,但是我们可以大胆猜一下。 //当时封装网络请求的时候,肯定是给他绑定了一个回调函数。 response.request().callback().onComplete(response); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } } return responses; }
public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); //从Selector上找到有多少个key注册了 int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); //因为我们用场景驱动的方式 //我们刚刚确实是注册了一个key if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { //立马就要对这个Selector上面的key要进行处理。 pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } //TODO 对stagedReceives里面的数据要进行处理 addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }
调用pollSelectionKeys
private void pollSelectionKeys(IterableselectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { //获取到所有key Iterator iterator = selectionKeys.iterator(); //遍历所有的key while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); //根据key找到对应的KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ /** * * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是 * SelectionKey key = socketChannel.register(nioSelector, * SelectionKey.OP_CONNECT); * */ if (isImmediatelyConnected || key.isConnectable()) { //TODO 核心的代码来了 //去最后完成网络的连接 //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你 //完成网络的连接。 if (channel.finishConnect()) { //网络连接已经完成了以后,就把这个channel存储到 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ //核心代码,处理发送请求的事件 //selector 注册了一个OP_WRITE //selector 注册了一个OP_READ if (channel.ready() && key.isWritable()) { //获取到我们要发送的那个网络请求。 //是这句代码就是要往服务端发送数据了。 Send send = channel.write(); if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); } } }
调用write()
public Send write() throws IOException { Send result = null; //send方法就是发送网络请求的方法 if (send != null && send(send)) { result = send; send = null; } return result; }
调用send()
private boolean send(Send send) throws IOException { //最终执行发送请求的代码是在这儿 send.writeTo(transportLayer); //如果已经完成网络请求的发送。 if (send.completed()) //然后就移除OP_WRITE transportLayer.removeInterestOps(SelectionKey.OP_WRITE); return send.completed(); }
流程
转载地址:https://blog.csdn.net/weixin_37850264/article/details/112337443 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
初次前来,多多关照!
[***.217.46.12]2024年03月26日 00时16分12秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
@Value注解不能注入static修饰的属性
2019-04-27
spring boot 2.x 接口返回时间类型不再自动序列化为timestamp
2019-04-27
Ubuntu Linux 创建root用户并且允许远程登录
2019-04-27
Linux shell 关于 2>&1 的含义
2019-04-27
Ubuntu Linux系统使用apt-get install安装的软件的相关位置
2019-04-27
nginx同一server配置多个前端工程location访问404问题
2019-04-27
前端嫌弃原生Swagger界面太low,于是我给她开通了超级VIP
2019-04-27
小白都能学会的Java注解与反射机制
2019-04-27
Java高并发测试框架JCStress
2019-04-27
阿里P8大神教我yaml语法,我终于不再只是使用字符串类型了
2019-04-27
Springboot 集成 i8n,两行代码实现国际化,你不想学吗?
2019-04-27
LeetCode 每日一题「判定字符是否唯一」
2019-04-27
Oracle中wm_concat的使用
2019-04-27
国庆第四天出行归来
2019-04-27
宝宝游乐园的优化思路(r6笔记第72天)
2019-04-27
UI5_INFO_FETCH_FROM_DB
2019-04-27
SAP CRM WebClient UI的配置存储数据库表
2019-04-27
SAP C4C Mashup port bindingF4帮助对话框里的数据源
2019-04-27
SAP C4C产品主数据OData服务的ETag处理
2019-04-27