
本文共 8423 字,大约阅读时间需要 28 分钟。
相关文章:
netty版本 4.1.6Netty 的优雅退出
在实际项目中,Netty 作为高性能的异步 NIO 通信框架,往往用作基础通信框架负责各种协议的接入、解析和调度等,例如在 RPC 和分布式服务框架中,往往会使用 Netty 作为内部私有协议的基础通信框架。
当应用进程优雅退出时,作为通信框架的 Netty 也需要优雅退出,主要原因如下:- 尽快的释放 NIO 线程、句柄等资源;
- 如果使用 flush 做批量消息发送,需要将积攒在发送队列中的待发送消息发送完成;
- 正在 write 或者 read 的消息,需要继续处理;
- 设置在 NioEventLoop 线程调度器中的定时任务,需要执行或者清理。
下面我们看下 Netty 优雅退出涉及的主要操作和资源对象:

- 把 NIO 线程的状态位设置成
ST_SHUTTING_DOWN
状态,不再处理新的消息(不允许再对外发送消息); - 退出前的预处理操作:把发送队列中
尚未发送或者正在发送的消息
发送完、把已经到期或者在退出超时之前到期的定时任务
执行完成、把用户注册到NIO
线程的退出Hook 任务
执行完成; - 资源的释放操作:所有
Channel
的释放、多路复用器
的去注册和关闭、所有队列
和定时任务
的清空取消,最后是NIO
线程的退出。
下面我们具体看下如何实现 Netty 的优雅退出:
Netty 优雅退出的接口和总入口在EventLoopGroup
,调用它的 shutdownGracefully
方法即可,相关代码如下:
bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();
除了无参
的 shutdownGracefully
方法,还可以指定退出的超时时间
和周期
,相关接口定义如下:

// 统一定义 JVM 退出事件,并将 JVM 退出事件作为主题对进程内部发布 // 所有需要优雅退出的消费者订阅 JVM 退出事件主题 // 监听 JVM 退出的 ShutdownHook 被启动之后,发布 JVM 退出事件 // 消费者监听到 JVM 退出事件,开始执行自身的优雅退出 // 如果所有的非守护线程都成功完成优雅退出,进程主动退出 // 如果到了退出的超时时间仍然没正常退出,则由停机脚本通过 kill -9 pid 强杀进程,强制退出
总结一下:JVM 的 ShutdownHook 被触发之后,调用所有 EventLoopGroup 实例的 shutdownGracefully 方法进行优雅退出。由于 Netty 自身对优雅退出有较完善的支持,所以实现起来相对比较简单。
一些误区
在实际工作中,由于对优雅退出和资源释放的原理不太清楚,或者对 Netty 的接口不太了解,很容易把优雅退出和资源释放混淆,导致出现各种问题。
如下案例:本意是想把某个 Channel 关闭,但是却调用了 Channel 关联的 EventLoop 的 shutdownGracefully,导致把 EventLoop 线程和注册在该线程持有的多路复用器上所有的 Channel 都关闭了,错误代码
如下所示:
ctx.channel().eventLoop().shutdownGracefully();
正确的做法如下所示:调用 channel 的 close
方法,关闭链路,释放与该 Channel 相关的资源:
ctx.channel().close();
除非是整个进程优雅退出,一般情况下不会调用 EventLoopGroup 和 EventLoop 的 shutdownGracefully 方法,更多的是链路 channel 的关闭和资源释放。
原理分析
Netty 优雅退出涉及到线程组、线程、链路、定时任务等,底层实现细节非常复杂,下面我们就层层分解,通过源码来剖析它的实现原理。
建议先查阅前一篇文章中的类关系图
,比较清晰
NioEventLoopGroup
NioEventLoopGroup
实际是NioEventLoop
的线程组
,它的优雅退出比较简单,直接遍历 EventLoop
数组,循环调用它们的 shutdownGracefully
方法,源码如下:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { //循环调用 shutdownGracefully 方法 for (EventExecutor l: children) { l.shutdownGracefully(quietPeriod, timeout, unit); } return terminationFuture(); }
NioEventLoop
调用NioEventLoop
的 shutdownGracefully
方法,首先就是要修改线程状态为正在关闭状态,它的实现在父类 SingleThreadEventExecutor 中,它们的继承关系如下: 在eclipse中,选中NioEventLoop类,右键,quick type hierarchy
,查看当前所在类的继承层次,包括实现接口,类继承层次

SingleThreadEventExecutor
的 shutdownGracefully
: 1.我们抓住其中的关键点,该方法只是将线程状态修改为ST_SHUTTING_DOWN
,并不执行具体的关闭操作
(类似shutdown方法将线程状态修改为ST_SHUTDOWN)。
2.for()循环是为了保证修改state的线程(原生线程或者外部线程)有且只有一个,并且通过CAS操作来确保线程安全
代码如下:public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { ... for (;;) { //加锁 if (isShuttingDown()) { return terminationFuture(); }
与原文相比,此处没有使用synchronized
,使用CAS方案在优雅退出的时候做了并发保护
。
完成状态修改之后,剩下的操作主要在 NioEventLoop
中进行,代码如下:
public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { .... if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } }
我们继续看下 closeAll
的实现,它的原理是把注册在 selector
上的所有 Channel
都关闭,但是有些 Channel
正在发送消息,暂时还不能关,需要稍后再执行,核心代码如下:
循环调用 Channel Unsafe
的 close
方法,下面我们跳转到 Unsafe
中,对 close
方法进行分析。
public final class NioEventLoop extends SingleThreadEventLoop { private void closeAll() { selectAgain(); Setkeys = selector.keys(); Collection channels = new ArrayList (keys.size()); for (SelectionKey k: keys) { Object a = k.attachment(); if (a instanceof AbstractNioChannel) { channels.add((AbstractNioChannel) a); } else { k.cancel(); @SuppressWarnings("unchecked") NioTask task = (NioTask ) a; invokeChannelUnregistered(task, k, null); } } //循环调用 `Channel Unsafe`的 `close` 方法 for (AbstractNioChannel ch: channels) { ch.unsafe().close(ch.unsafe().voidPromise()); }
AbstractUnsafe
AbstractUnsafe 的 close 方法主要做了如下几件事:
1.判断当前该链路是否有消息正在发送,如果有则将关闭操作封装成 Task 放到 eventLoop 中稍后再执行:
2.将发送队列清空,不再允许发送新的消息:
3.调用 SocketChannel
的 close
方法,关闭链路:
4.调用 pipeline
的 fireChannelInactive
,触发链路关闭通知事件:
handler
的channelInactive
方法 public class MyHandler extends ChannelInboundHandlerAdapter { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
5.最后是调用 deregister,从多路复用器上取消 SelectionKey:
protected abstract class AbstractUnsafe implements Unsafe public final void close(final ChannelPromise promise) { ... //触发关闭链路通知,并注销 fireChannelInactiveAndDeregister(wasActive);
TaskQueue
NioEventLoop 执行完 closeAll()操作之后,需要调用 confirmShutdown 看是否真的能够退出,它的处理逻辑如下:
1.执行 TaskQueue
中排队的 Task
,代码如下:
if (runAllTasks() || runShutdownHooks()) {
2.执行注册到 NioEventLoop 中的 ShutdownHook
,代码如下:
if (runAllTasks() || runShutdownHooks()) {
3.判断是否到达优雅退出的指定超时时间
,如果达到或者过了超时时间,则立即退出,代码如下:
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; }
4.如果没到达指定的超时时间,暂时不退出,每隔 100MS 检测下是否有新的任务加入,有则继续执行:
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // Check if any tasks were added to the queue every 100ms. // TODO: Change the behavior of takeTask() so that it returns on timeout. wakeup(true); try { Thread.sleep(100); } catch (InterruptedException e) { // Ignore } return false; }
到此为止,confirmShutdown 方法讲解完毕,confirmShutdown 返回 true,则 NioEventLoop 线程正式退出,Netty 的优雅退出完成。
疑问解答
1. runAllTasks 重复执行问题
在 NioEventLoop 的 run 方法中,已经调用了runAllTasks
方法,为何紧随其后,在 confirmShutdown 中有继续调用 runAllTasks 方法呢,疑问代码如下: public final class NioEventLoop extends SingleThreadEventLoop { protected void run() { runAllTasks(ioTime * (100 - ioRatio) / ioRatio); ... if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; }
原因主要有两个:
1.为了防止定时任务 Task 或者用户自定义的线程 Task 的执行过多占用 NioEventLoop 线程的调度资源,Netty 对 NioEventLoop 线程 I/O 操作和非 I/O 操作时间做了比例限制,即限制非 I/O 操作的执行时间,如上图红框中代码所示。有了执行时间限制,因此可能会导致已经到期的定时任务、普通任务没有执行完,需要等待下次 Selector 轮询继续执行。在线程退出之前,需要对本该执行但是没有执行完成的 Task 进行扫尾处理,所以在 confirmShutdown 中再次调用了 runAllTasks 方法;
2.在调用 runAllTasks 方法之后,执行 confirmShutdown 之前,用户向 NioEventLoop 中添加了新的普通任务或者定时任务,因此需要在退出之前再次遍历并处理一遍 Task Queue。
2. 优雅退出是否能够保证所有在通信线程排队的消息全部发送出去
实际是无法保证
的,它只能保证如果现在正在发送消息过程中,调用了优雅退出方法,此时不会关闭链路,继续发送,如果发送操作完成,无论是否还有消息尚未发送出去,在下一轮 Selector 的轮询中,链路将会关闭,没有发送完成的消息将会被丢弃,甚至是半包消息。它的处理原理图如下: 
调用优雅退出之后,是否关闭链路
,判断标准是 inFlush0 是否为 true
,如果为 False,则会执行链路关闭操作;
场景 B:如果一次没有把消息发送完成,此时 Netty 会监听写事件,触发 Selector 的下一次轮询并发送消息,代码如下:

Selector 轮询时,首先处理读写事件,然后再处理定时任务和普通任务,因此在链路关闭之前,还有最后一次继续发送的机会,代码如下:
try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); }
如果非常不幸,再次发送仍然没有把积压的消息全部发送完毕,再次发生了写半包,那无论是否有积压消息,执行 AbstractUnsafe.close 的 Task 还是会把链路给关闭掉,原因是只要完成一次消息发送操作,Netty 就会把 inFlush0 置为 false,代码如下:
try { doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { outboundBuffer.failFlushed(t, true); } } finally { inFlush0 = false; }
链路关闭之后,所有尚未发送的消息都将被丢弃。
可能有些读者会有疑问,如果在第二次发送之后,执行 AbstractUnsafe.close 之前,业务正好又调用了 flush 操作,inFlush0 是否会被修改成 True 呢?这个是不可能的,因为从 Netty 4.X 之后线程模型发生了变更,flush 操作不是由用户线程执行,而是由 Channel 对应的 NioEventLoop 线程执行,所以在两者之间不会发生 inFlush0 被修改的情况。
Netty 4.X 之后的线程模型如下所示:

对于上述场景,需要应用层来保证相关的可靠性,或者对 Netty 的优雅退出机制进行优化。
发表评论
最新留言
关于作者
