(十二)粘包问题与拆包问题
发布日期:2021-11-18 17:47:33 浏览次数:8 分类:技术文章

本文共 8404 字,大约阅读时间需要 28 分钟。

文章目录


前言

  1. 粘包问题:一个请求里面带有多个响应,多个消息粘再一起给你发送回来;
  2. 拆包问题:一个消息拆成多个请求发送回来;

粘包

private void pollSelectionKeys(Iterable
selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
//获取到所有key Iterator
iterator = selectionKeys.iterator(); //遍历所有的key while (iterator.hasNext()) {
SelectionKey key = iterator.next(); iterator.remove(); //根据key找到对应的KafkaChannel KafkaChannel channel = channel(key); // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); try {
/* complete any connections that have finished their handshake (either normally or immediately) */ /** * * 我们代码第一次进来应该要走的是这儿分支,因为我们前面注册的是 * SelectionKey key = socketChannel.register(nioSelector, * SelectionKey.OP_CONNECT); * */ if (isImmediatelyConnected || key.isConnectable()) {
//TODO 核心的代码来了 //去最后完成网络的连接 //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你 //完成网络的连接。 if (channel.finishConnect()) {
//网络连接已经完成了以后,就把这个channel存储到 this.connected.add(channel.id()); this.sensors.connectionCreated.record(); SocketChannel socketChannel = (SocketChannel) key.channel(); log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}", socketChannel.socket().getReceiveBufferSize(), socketChannel.socket().getSendBufferSize(), socketChannel.socket().getSoTimeout(), channel.id()); } else continue; } /* if channel is not ready finish prepare */ if (channel.isConnected() && !channel.ready()) channel.prepare(); /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ //核心代码,处理发送请求的事件 //selector 注册了一个OP_WRITE //selector 注册了一个OP_READ if (channel.ready() && key.isWritable()) {
//获取到我们要发送的那个网络请求。 //是这句代码就是要往服务端发送数据了。 Send send = channel.write(); if (send != null) {
this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); } } /* cancel any defunct sockets */ if (!key.isValid()) {
close(channel); this.disconnected.add(channel.id()); } } catch (Exception e) {
String desc = channel.socketDescription(); if (e instanceof IOException) log.debug("Connection with {} disconnected", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); } } }

其中

if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive; //接受服务端发送回来的响应(请求) //networkReceive 代表的就是一个服务端发送 //回来的响应 while ((networkReceive = channel.read()) != null) addToStagedReceives(channel, networkReceive); }
public NetworkReceive read() throws IOException {
NetworkReceive result = null; if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id); } //一直在读取数据。 receive(receive); //是否读完一个完整的响应消息 if (receive.complete()) {
receive.payload().rewind(); result = receive; receive = null; } return result; }
public boolean complete() {
//size 没有剩余空间(size读满) && return !size.hasRemaining() && !buffer.hasRemaining(); }

一直递归调用,最终处理粘包问题核心代码

public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
int read = 0; //size是一个4字节大小的内存空间 //如果size还有剩余的内存空间。 if (size.hasRemaining()) {
//先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小) int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); read += bytesRead; //一直要读取到当这个size没有剩余空间 //说明已经读取到了一个4字节的int类型的数了。 if (!size.hasRemaining()) {
size.rewind(); // int receiveSize = size.getInt(); if (receiveSize < 0) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); //分配一个内存空间,这个内存空间的大小 //就是刚刚读出来的那个4字节的int的大小。 this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) {
//去读取数据 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); read += bytesRead; } return read; }

拆包

50生产者|60如何处理

有两种地方可能会发生拆包

  1. 消息体的size
  2. 消息体

1. 消息体的size

public long readFromReadableChannel(ReadableByteChannel channel) throws IOException {
int read = 0; //size是一个4字节大小的内存空间 //如果size还有剩余的内存空间。 if (size.hasRemaining()) {
//先读取4字节的数据,(代表的意思就是后面跟着的消息体的大小) int bytesRead = channel.read(size); if (bytesRead < 0) throw new EOFException(); read += bytesRead; //一直要读取到当这个size没有剩余空间 //说明已经读取到了一个4字节的int类型的数了。 //这里如果size没有读满,就不会去读消息体的信息 if (!size.hasRemaining()) {
size.rewind(); // int receiveSize = size.getInt(); if (receiveSize < 0) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")"); if (maxSize != UNLIMITED && receiveSize > maxSize) throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")"); //分配一个内存空间,这个内存空间的大小 //就是刚刚读出来的那个4字节的int的大小。 this.buffer = ByteBuffer.allocate(receiveSize); } } if (buffer != null) {
//去读取数据 int bytesRead = channel.read(buffer); if (bytesRead < 0) throw new EOFException(); read += bytesRead; } return read; } public ByteBuffer payload() {
return this.buffer; }}

2. 消息体

public NetworkReceive read() throws IOException {
NetworkReceive result = null; if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id); } //一直在读取数据。 receive(receive); //是否读完一个完整的响应消息 if (receive.complete()) {
receive.payload().rewind(); result = receive; receive = null; } return result; }
public boolean complete() {
//size 没有剩余空间(size读满) &&后面的条件保证 只有消息体填满才可以继续读下个消息 return !size.hasRemaining() && !buffer.hasRemaining(); }

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112338914 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(十五)top10热门品类之需求回顾以及实现思路分析
下一篇:(十)生产者发送请求

发表评论

最新留言

初次前来,多多关照!
[***.217.46.12]2024年04月19日 17时18分38秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

C++_类和对象_对象特性_拷贝构造函数调用时机---C++语言工作笔记042 2019-04-26
C++_类和对象_对象特性_构造函数调用规则---C++语言工作笔记043 2019-04-26
C++_类和对象_对象特性_深拷贝与浅拷贝---C++语言工作笔记044 2019-04-26
AndroidStudio_java.util.ConcurrentModificationException---Android原生开发工作笔记237 2019-04-26
AndroidStudio_android中实现对properties文件的读写操作_不把properties文件放在assets文件夹中_支持读写---Android原生开发工作笔记238 2019-04-26
弹框没反应使用Looper解决_the caller should invoke Looper.prepare() and Looper.loop()---Android原生开发工作笔记239 2019-04-26
Command line is too long. Shorten command line for Application---微服务升级_SpringCloud Alibaba工作笔记0067 2019-04-26
AndroidStudio_android实现双击_3击_监听实现---Android原生开发工作笔记240 2019-04-26
C++_类和对象_对象特性_初始化列表---C++语言工作笔记045 2019-04-26
AndroidStudio安卓原生开发_UI高级_DrawerLayout_侧滑菜单控件---Android原生开发工作笔记120 2019-04-26
AndroidStudio安卓原生开发_UI高级_Shape的使用_虚线_直线_矩形_渐变_径向渐变_线性渐变_扫描渐变---Android原生开发工作笔记122 2019-04-26
AndroidStudio安卓原生开发_UI高级_StateListDrawable状态选择器_按钮按下和抬起显示不同颜色---Android原生开发工作笔记124 2019-04-26
kivy制作安卓APP--简单音乐播放器 2019-04-26
十年(程序员改编) 2019-04-26
c++排序算法个人总结 2019-04-26
看完你就知道的乐观锁和悲观锁 2019-04-26
Docker入门 2019-04-26
Spring Aop 扫盲 2019-04-26
看完这篇操作系统,和面试官扯皮就没问题了 2019-04-26
安卓开发入门教程-Fragment 2019-04-26