【RPC系列】2、从Reactor模型到Netty核心
发布日期:2021-06-24 15:28:51 浏览次数:2 分类:技术文章

本文共 7793 字,大约阅读时间需要 25 分钟。

一、NIO编程到底层交互

JDK1.4的java.nio.*包下引入了全新的Java I/O类库,它最初是由select/poll模型,JDK1.5之后又增加了对epoll的支持,不过只有Linux内核版本2.6以上才能生效。NIO适用于连接数多且连接比较短的轻量级操作架构。

NIO通过事件模型的异步通知机制去处理输入/输出的相关操作。如图:

以上可以看出:

(1)Acceptor负责接收客户端Socket发起的新建连接请求;

(2)Socket绑定到了一个Reactor线程上(Selector选择器与通信管道Channel);

(3)服务端将通信管道注册到Selector事件选择器上;

(4)Selector接管事件状态的监听,派发工作线程处理。

有分发功能Selector选择器其实就是久仰大名的多路复用选择器,扩展下多路复用原理:

1、每一个连接对应一个 Channel(多路指多个 Channel,复用指多个连接复用了一个线程或少量线程,在 Netty 指 EventLoop)2、一个 Channel 对应唯一的 ChannelPipeline,3、多个 Handler 串行的加入到 Pipeline 中,每个 Handler 关联唯一的 ChannelHandlerContext。

这样看Reactor模型不算复杂,但是涉及到服务器的多核心,还需要并发处理大量的客户连接,所以需要多线程Reactor。

一般原则,Reactor的数量与CPU核心数保持一致,这样一个8核的服务器面对如果有8000个连接,每个CPU承担1000个连接。

那么就会有人说,如果做到这么平均,没看到有负载均衡呢,这是个复杂的问题,后面考虑。

Selector选择器通过不断轮询注册在其上的Channel来选择并分发已处理就绪的事件,它可以同时轮旋多个Channel,一个Selector接入成千上万个客户端也不会产生明显的性能瓶颈。

当Selector轮询时发现Channel的数据状态有变化时,会通过SelectorKey触发对应的事件,并由相关的Handler实现事件处理。使用单线程的Selector来处理多个Channel可以极大的减少多个线程对系统资源的占用,同样也降低了线程上下文切换的开销。

最常见的Selector监听事件有(参考SelectionKey中):

//If the selector detects that the corresponding channel is ready for reading, //has reached end-of-stream, has been remotely shut down for further reading, or has an error pending//then it will add OP_READ to the key's ready-operation set and add the key to its selected-keyOP_READ:可读事件//If the selector detects that the corresponding channel is ready for writing//has been remotely shut down for further writing, or has an error pending//then it will add OP_WRITE to the key's ready set and add the key to its selected-keyOP_WRITE:可写事件//If the selector detects that the corresponding socket channel is ready to complete its connection sequence//or has an error pending, then it will add OP_CONNECT to the key's ready set and add the key to its selected-keyOP_CONNECT:客户端连接服务端事件//If the selector detects that the corresponding server-socket channel is ready to accept another connection//or has an error pending, then it will add OP_ACCEPT to the key's ready set and add the key to its selected-keyOP_ACCEPT:服务端接收客户端事件

这章主要介绍设计模型和处理流程,不涉及具体代码,后续对应展开(学习Selector模型是理解NIO的关键)。

除了Selector多路复用选择器外,NIO还有几个核心的概念:

1、Buffer(包含需要读取或者写入的数据的缓冲区)

在RPC系列1中,描述过接收缓冲区和发送缓冲区,在NIO中也叫直接缓冲区,可以直接操作JVM的对外内存,还有一直就是非直接缓冲区,只能操作JVM的堆中内存。缓冲区是建立在Socket之上,保存从Socket中读取的输入字节流,并循环利用,以降低GC的压力。Buffer的实现类都包含三个基本属性:capacity、limit、position。先看下读写内存模型设计:

Capacity是缓冲区可容纳的最大数据量,在缓冲区创建时被初始化,limit是缓冲区当前数据量的边界,position是下一个将要被读或写的元素索引位置,这三个属性的关系是capacity>=limit>=position>=0

在写入时,limit与capacity相同,每写入一组数据后position会加1,直到position到达capacity的位置或者写入结束,最终limit指向了position的位置。

在读取时,每读取一组数据,position会加1,读取到limit所在的位置结束,如果缓冲区完全被数据充满,那么limit则等于capacity。

2、Channel(双向的数据读写通道)

什么是双向读写呢?Channel可以实现读和写同时操作,Channel同时支持阻塞和非阻塞模式,正如官方给的解释that is capable of performing one or more distinct I/O operations。可以实现一个或多个I/O操作,Channels are, in general, intended to be safe for multithreaded access,通道的目的是为了多线程访问安全。

01ddb70f051528a2b3b47f6385b0b212cd2.jpg

那么回到题目,nio是如何与底层交互的,其实也就是如何在Java中执行机器命令,拿ByteBuffer的put方法来说,上面我们说了ByteBuffer有直接内存的处理方法,那就是直接跳过了JVM,与堆外的内存进行交互

我们跟踪DirectByteBuffer进去看到下面的代码:

public ByteBuffer put(byte arg0) {	unsafe.putByte(this.ix(this.nextPutIndex()), arg0);	return this;}

看到unsafe就应该知道这是个native方法

/**     * Stores a value into a given memory address.  If the address is zero, or     * does not point into a block obtained from {@link #allocateMemory}, the     * results are undefined.     *     * @see #getByte(long)     */    public native void    putByte(long address, byte x);

毕竟程序都是程序员写出来的,那肯定能追踪到幕后黑手,

二、Netty的设计模式

先看下netty的架构图

从图中就可以看出Netty是由核心(Core)、传输服务(Transport Services)以及协议支持(Protocal Support)这几个模块组成。上面看不懂也没关系,我们先跟着流程学习如何使用Netty创建自己的通信。

首先,咱们使用netty-all-4.1.31创建服务端:

//初始化工作线程组,可以指定合理的线程池大小,默认值为当前服务器CPU核数的2倍EventLoopGroup workerGroup = new NioEventLoopGroup();//初始化分发与监听事件的轮询线程组。Netty使用的是与NIO相同的Selector方式,通过EventLoopGroup//初始化线程池,这个线程池只需要一个线程用于监听事件状态即可EventLoopGroup parentGroup = new NioEventLoopGroup(1);//初始化服务端Netty启动类ServerBootstrap boot = new ServerBootstrap();try {    //设置监听线程组与工作线程组	boot.group(parentGroup, workerGroup)        //设置网络通道相关参数,具体参数意思下面介绍		.option(ChannelOption.SO_BACKLOG, 128)        //将处理I/O的通道设置为使用NIO		.channel(NioServerSocketChannel.class)        //添加事件回调方法处理器,即相应的事件触发后的监听处理器,通过        //自定义的回调处理器处理业务逻辑		.childHandler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //添加编码回调处理器,用于将网络传过来的二进制字节数组解码成服务需要的对象 //涉及到序列化的知识后面章节会提到 ch.pipeline().addLast(new RpcDecoder(Request.class) //添加编码回调处理器,用于将服务端回写至客户端的对象编码成二进制 //字节数组,网络传输的约束。 , new RpcEncoder(Response.class) //添加定制化业务的回调处理器 , new MyServerHandler()); } }); //绑定提供服务的端口并且开始准备接收客户端发送过来的请求 future = boot.bind(9999).sync(); //帅气的关闭线程组 future.channel().closeFuture().sync();} catch (InterruptedException e) { e.printStackTrace();} finally { workerGroup.shutdownGracefully(); parentGroup.shutdownGracefully();}

以上就是使用Netty启动服务端的程序,比较简单清晰,编解码的回调程序可以参考ByteToMessageDecoder和MessageToByteEncoder这两个类,下面看下定制类的写法

public class MyServerHandler extends ChannelInboundHandlerAdapter {	private ApplicationContext applicationContext;		public void setApplicationContext(ApplicationContext applicationContext) {		this.applicationContext = applicationContext;	}	@SuppressWarnings("static-access")	@Override	public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {        //通过转换成定制的对象		Request msg = (Request) obj;        //这里模仿了rpc的服务调用,写了个代理反射,获取到本地的服务实例		Class
clazz = Thread.currentThread().getContextClassLoader().getClass().forName(msg.getApi()); Method method = clazz.getMethod(msg.getMethodName(), msg.getParamTypes()); Object ref = applicationContext.getBean(msg.getMapper()); //业务逻辑处理调用 String result = (String) method.invoke(ref, msg.getArgs()); //响应封装成定制的对象 Response response = new Response(); response.setRequestId(msg.getRequestId()); response.setParams(msg.getResult() + " 请求成功,反馈结果请接受处理。"); response.setResult(result); //上下文写出去 ctx.writeAndFlush(response); //释放 ReferenceCountUtil.release(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); }}

channelRead方法在客户端发送消息到服务端时触发。这里可以定制化实现业务逻辑,最后将对象写入缓冲区并刷新缓冲区至客户端。这里如果不调用writeAndFlush方法而是调用write方法,那么消息只会写入缓冲区,而不会真正写到客户端,如果在多次write方法之后再进行一次flush方法,那么就会合并缓冲区写入客户端的次数,减少了交互的次数。

客户端的代码也比较类似,简单看下下面的代码即可

//创建工作线程组,客户端无需监听,直接连就完了EventLoopGroup workerGroup = new NioEventLoopGroup();try {        //初始化netty的客户端启动程序	Bootstrap boot = new Bootstrap();        //设置工作线程组	boot.group(workerGroup)            //将处理I/O的通道设置为使用NIO		.channel(NioSocketChannel.class)            //配置与网络通道相关的参数		.option(ChannelOption.AUTO_READ, true)            //添加事件回调处理方法		.handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel ch) throws Exception { //以下与服务端的添加回调处理器一致 ch.pipeline().addLast( new RpcDecoder(Response.class) , new RpcEncoder(Request.class) , new MyClientHandler()); } }); //连接到服务端主机和端口 channelFuture = boot.connect(host, port).sync(); //帅气的关闭连接 channelFuture.channel().closeFuture().sync();}catch(Exception e) { e.printStackTrace();}finally { workerGroup.shutdownGracefully(); }

客户端的定制化业务回调处理器与服务端的一致,就当作互相通信的的业务逻辑处理。

通过上述代码的分析,可以看出,Netty分离了业务处理以及序列化/反序列化与服务端主进程的耦合,让代码更加清晰易懂,Netty极大了简化了NIO的开发,建议大家使用netty-all-4.*以上的API进行开发。

附上常用的网络参数:

  • ChannelOption.SO_BACKLOG——tcp/ip 协议listen 函数中的backlog 参数,指定了队列的大小
  • ChannelOption.SO_REUSEADDR——表示允许重复使用本地地址和端口,允许的话可以多个进程使用
  • ChannelOption.SO_KEEPALIVE——如果在两小时内没有数据的通信时,TCP 会自动发送一个活动探测数据报文。
  • ChannelOption.SO_SNDBUF 和ChannelOption.SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区的大小
  • ChannelOption.SO_LINGER——使用SO_LINGER 可以阻塞close()的调用时间,直到数据完全发送
  • ChannelOption.TCP_NODELAY——Nagle 算法是将小的数据包组装为更大的帧然后进行发送

以上内容参考并摘抄书籍Leader-us著《架构解密·从分布式到微服务》、张亮等著《未来架构·从服务化到云原生》

转载地址:https://blog.csdn.net/weixin_33602978/article/details/105226019 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:【RPC系列】3、序列化技术(必读)
下一篇:【RPC系列】1、聊一聊Rpc实现流程

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年04月14日 21时09分05秒