(十二)粘包问题与拆包问题
发布日期:2021-11-18 17:47:33
浏览次数:8
分类:技术文章
本文共 8404 字,大约阅读时间需要 28 分钟。
文章目录
前言
- 粘包问题:一个请求里面带有多个响应,多个消息粘再一起给你发送回来;
- 拆包问题:一个消息拆成多个请求发送回来;
粘包
private void pollSelectionKeys(IterableselectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { //获取到所有key Iterator iterator = selectionKeys.iterator(); //遍历所有的key while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); //根据key找到对应的KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try { /* complete any connections that have finished their handshake (either normally or immediately) */ /** * * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是 * SelectionKey key = socketChannel.register(nioSelector, * SelectionKey.OP_CONNECT); * */ if (isImmediatelyConnected || key.isConnectable()) { //TODO 核心的代码来了 //去最后完成网络的连接 //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你 //完成网络的连接。 if (channel.finishConnect()) { //网络连接已经完成了以后,就把这个channel存储到 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ //核心代码,处理发送请求的事件 //selector 注册了一个OP_WRITE //selector 注册了一个OP_READ if (channel.ready() && key.isWritable()) { //获取到我们要发送的那个网络请求。 //是这句代码就是要往服务端发送数据了。 Send send = channel.write(); if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) { close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) { String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); } } }
其中
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); }
public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } //一直在读取数据。 receive(receive); //是否读完一个完整的响应消息 if (receive.complete()) { receive.payload().rewind(); result = receive; receive = null; } return result; }
public boolean complete() { //size 没有剩余空间(size读满) && return !size.hasRemaining() && !buffer.hasRemaining(); }
一直递归调用,最终处理粘包问题核心代码
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; //size是一个4字节大小的内存空间 //如果size还有剩余的内存空间。 if (size.hasRemaining()) { //先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小) int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); read += bytesRead; //一直要读取到当这个size没有剩余空间 //说明已经读取到了一个4字节的int类型的数了。 if (!size.hasRemaining()) { size.rewind(); // int receiveSize = size.getInt(); if (receiveSize < 0) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); //分配一个内存空间,这个内存空间的大小 //就是刚刚读出来的那个4字节的int的大小。 this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) { //去读取数据 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); read += bytesRead; } return read; }
拆包
50生产者|60如何处理
有两种地方可能会发生拆包:- 消息体的size
- 消息体
1. 消息体的size
public long readFromReadableChannel(ReadableByteChannel channel) throws IOException { int read = 0; //size是一个4字节大小的内存空间 //如果size还有剩余的内存空间。 if (size.hasRemaining()) { //先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小) int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); read += bytesRead; //一直要读取到当这个size没有剩余空间 //说明已经读取到了一个4字节的int类型的数了。 //这里如果size没有读满,就不会去读消息体的信息 if (!size.hasRemaining()) { size.rewind(); // int receiveSize = size.getInt(); if (receiveSize < 0) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); //分配一个内存空间,这个内存空间的大小 //就是刚刚读出来的那个4字节的int的大小。 this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) { //去读取数据 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); read += bytesRead; } return read; } public ByteBuffer payload() { return this.buffer; }}
2. 消息体
public NetworkReceive read() throws IOException { NetworkReceive result = null; if (receive == null) { receive = new NetworkReceive(maxReceiveSize, id); } //一直在读取数据。 receive(receive); //是否读完一个完整的响应消息 if (receive.complete()) { receive.payload().rewind(); result = receive; receive = null; } return result; }
public boolean complete() { //size 没有剩余空间(size读满) &&后面的条件保证 只有消息体填满才可以继续读下个消息 return !size.hasRemaining() && !buffer.hasRemaining(); }
转载地址:https://blog.csdn.net/weixin_37850264/article/details/112338914 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
初次前来,多多关照!
[***.217.46.12]2024年04月19日 17时18分38秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
C++_类和对象_对象特性_拷贝构造函数调用时机---C++语言工作笔记042
2019-04-26
C++_类和对象_对象特性_构造函数调用规则---C++语言工作笔记043
2019-04-26
C++_类和对象_对象特性_深拷贝与浅拷贝---C++语言工作笔记044
2019-04-26
AndroidStudio_android中实现对properties文件的读写操作_不把properties文件放在assets文件夹中_支持读写---Android原生开发工作笔记238
2019-04-26
弹框没反应使用Looper解决_the caller should invoke Looper.prepare() and Looper.loop()---Android原生开发工作笔记239
2019-04-26
Command line is too long. Shorten command line for Application---微服务升级_SpringCloud Alibaba工作笔记0067
2019-04-26
C++_类和对象_对象特性_初始化列表---C++语言工作笔记045
2019-04-26
kivy制作安卓APP--简单音乐播放器
2019-04-26
十年(程序员改编)
2019-04-26
c++排序算法个人总结
2019-04-26
看完你就知道的乐观锁和悲观锁
2019-04-26
Docker入门
2019-04-26
Spring Aop 扫盲
2019-04-26
看完这篇操作系统,和面试官扯皮就没问题了
2019-04-26
安卓开发入门教程-Fragment
2019-04-26