(九)Kafka网络设计
发布日期:2021-11-18 17:47:32
浏览次数:8
分类:技术文章
本文共 12501 字,大约阅读时间需要 41 分钟。
文章目录
前期回顾
/** * 步骤二: * 首先是判断哪些partition有消息可以发送,获取到这个partition的leader partition * 对应的broker主机。 * * 哪些broker上面需要我们去发送消息? */ RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); while (iter.hasNext()) { Node node = iter.next(); /** * 步骤四:检查与要发送数据的主机的网络是否已经建立好。 */ if (!this.client.ready(node, now)) { //如果返回的是false !false 代码就进来 //移除result 里面要发送消息的主机。 //所以我们会看到这儿所有的主机都会被移除 iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); } } //所以我们发现 如果网络没有建立的话,这儿的代码是不执行的 Map> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained //如果batches 空的话,这而的代码也就不执行了。 for (List batchList : batches.values()) { for (RecordBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } } //如果网络连接没有建立好 batches其实是为空。 //也就说其实这段代码也是不会执行。 List requests = createProduceRequests(batches, now); 。。。。。。。。。。。。。 。。。。。。。。。。。。。 //我们猜这儿可能就是去建立连接。 this.client.poll(pollTimeout, now);
因此很重要:this.client.poll(pollTimeout, now);
在步骤二的ready()里面,建立连接之前需要做网络的初始化:
private void initiateConnect(Node node, long now) { String nodeConnectionId = node.idString(); try { log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port()); this.connectionStates.connecting(nodeConnectionId, now); //TODO 尝试建立连接 selector.connect(nodeConnectionId, new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(nodeConnectionId, now); /* maybe the problem is our metadata, update it */ metadataUpdater.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); } }
典型的NIO操作
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException { if (this.channels.containsKey(id)) throw new IllegalStateException("There is already a connection for id " + id); //要是了解java NIO编程的同学,这些代码就是一些基本的 //代码,跟我们NIO编程的代码是一模一样。 //获取到SocketChannel SocketChannel socketChannel = SocketChannel.open(); //设置为非阻塞的模式 socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); //设置一些参数 //这些网络的参数,我们之前在分析Producer的时候给大家看过 //都有一些默认值。 if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setSendBufferSize(sendBufferSize); if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socket.setReceiveBufferSize(receiveBufferSize); //这个的默认值是false,代表要开启Nagle的算法 //它会把网络中的一些小的数据包收集起来,组合成一个大的数据包 //然后再发送出去。因为它认为如果网络中有大量的小的数据包在传输 //其实是会影响网络拥塞。 //kafka一定不能把这儿设置为false,因为我们有些时候可能有些数据包就是比较 //小,他这儿就不帮我们发送了,显然是不合理的。 socket.setTcpNoDelay(true); boolean connected; try { //尝试去服务器去连接。 //因为这儿非阻塞的 //有可能就立马连接成功,如果成功了就返回true //也有可能需要很久才能连接成功,返回false。 connected = socketChannel.connect(address); } catch (UnresolvedAddressException e) { socketChannel.close(); throw new IOException("Can't resolve address: " + address, e); } catch (IOException e) { socketChannel.close(); throw e; } //SocketChannel往Selector上注册了一个OP_CONNECT SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT); //根据根据SocketChannel 封装出来一个KafkaChannel KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize); //把key和KafkaChannel关联起来 //后面使用起来会比较方便 //我们可以根据key就找到KafkaChannel //也可以根据KafkaChannel找到key key.attach(channel); //缓存起来了 this.channels.put(id, channel); //所以正常情况下,这儿网络不能完成连接。 //如果这儿不能完成连接。大家猜一下 //kafka会在哪儿完成网络最后的连接呢? //如果里面就连接上了 if (connected) { // OP_CONNECT won't trigger for immediately connected channels log.debug("Immediately connected to node {}", channel.id()); immediatelyConnectedKeys.add(key); // 取消前面注册 OP_CONNECT 事件。 key.interestOps(0); } }
public Listpoll(long timeout, long now) { /** * 在这个方法里面有涉及到kafka的网络的方法,但是 * 目前我们还没有给大家讲kafka的网络,所以我们分析的时候 * 暂时不用分析得特别的详细,我们大概知道是如何获取到元数据 * 即可。等我们分析完了kafka的网络以后,我们在回头看这儿的代码 * 的时候,其实代码就比较简单了。 */ //步骤一:封装了一个要拉取元数据请求 long metadataTimeout = metadataUpdater.maybeUpdate(now); try { //步骤二: 发送请求,进行复杂的网络操作 //但是我们目前还没有学习到kafka的网络 //所以这儿大家就只需要知道这儿会发送网络请求。 //TODO 执行网络IO的操作。 this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); //步骤三:处理响应,响应里面就会有我们需要的元数据。 /** * 这个地方是我们在看生产者是如何获取元数据的时候,看的。 * 其实Kafak获取元数据的流程跟我们发送消息的流程是一模一样。 * 获取元数据 -》 判断网络连接是否建立好 -》 建立网络连接 * -》 发送请求(获取元数据的请求) -》 服务端发送回来响应(带了集群的元数据信息) * */ handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); //处理超时的请求 handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse response : responses) { if (response.request().hasCallback()) { try { //调用的响应的里面的我们之前发送出去的请求的回调函数 //看到了这儿,我们回头再去看一下 //我们当时发送请求的时候,是如何封装这个请求。 //不过虽然目前我们还没看到,但是我们可以大胆猜一下。 //当时封装网络请求的时候,肯定是给他绑定了一个回调函数。 response.request().callback().onComplete(response); } catch (Exception e) { log.error("Uncaught error in request completion:", e); } } } return responses; }
再调用 selector.poll()
public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException("timeout should be >= 0"); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time.nanoseconds(); //从Selector上找到有多少个key注册了 int readyKeys = select(timeout); long endSelect = time.nanoseconds(); this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds()); //因为我们用场景驱动的方式 //我们刚刚确实是注册了一个key if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) { //立马就要对这个Selector上面的key要进行处理。 pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect); pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } //TODO 对stagedReceives里面的数据要进行处理 addToCompletedReceives(); long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); }
private void pollSelectionKeys(IterableselectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { //获取到所有key Iterator iterator = selectionKeys.iterator(); //遍历所有的key while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); //根据key找到对应的KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ /** * * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是 * SelectionKey key = socketChannel.register(nioSelector, * SelectionKey.OP_CONNECT); * */ if (isImmediatelyConnected || key.isConnectable()) { //TODO 核心的代码来了 //去最后完成网络的连接 //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你 //完成网络的连接。 if (channel.finishConnect()) { //网络连接已经完成了以后,就把这个channel存储到 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ //核心代码,处理发送请求的事件 //selector 注册了一个OP_WRITE //selector 注册了一个OP_READ if (channel.ready() && key.isWritable()) { //获取到我们要发送的那个网络请求。 //是这句代码就是要往服务端发送数据了。 Send send = channel.write(); if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); } } }
转载地址:https://blog.csdn.net/weixin_37850264/article/details/112335148 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
表示我来过!
[***.240.166.169]2024年04月22日 12时48分18秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
洛谷 P2580 于是他错误的点名开始了【字典树/Map】
2019-04-28
HDU 3336 Count the string【KMP的next数组性质】
2019-04-28
洛谷 P1196 [NOI2002]银河英雄传说【带权并查集】
2019-04-28
HDU 4825 Xor Sum【01字典树/贪心】(两数最大/最小异或和)
2019-04-28
洛谷 P4551 最长异或路径【01字典树/贪心】
2019-04-28
LeetCode 921. 使括号有效的最少添加(栈)
2019-04-28
LeetCode 1018. 可被 5 整除的二进制前缀
2019-04-28
LeetCode 961. 重复 N 次的元素
2019-04-28
LeetCode 925. 长按键入(双指针)
2019-04-28
LeetCode 1309. 解码字母到整数映射
2019-04-28
动态规划应用--最长递增子序列 LeetCode 300
2019-04-28
LeetCode 53. 最大子序和(动态规划)
2019-04-28
图Graph--拓扑排序(Topological Sorting)
2019-04-28
图Graph--最短路径算法(Shortest Path Algorithm)
2019-04-28
LeetCode 674. 最长连续递增序列
2019-04-28
LeetCode 70. 爬楼梯(动态规划)
2019-04-28
数据结构--位图 BitMap
2019-04-28
朴素贝叶斯算法--过滤垃圾短信
2019-04-28
向量空间 Vector Space -- 推荐系统
2019-04-28