六、取消与关闭
发布日期:2021-09-12 09:57:56 浏览次数:23 分类:技术文章

本文共 13579 字,大约阅读时间需要 45 分钟。

文章目录

取消与关闭

任务和线程的启动很容易,在大部分时候,我们都会让他们运行直到结束,或者让他们自行停止。有的时候我们希望提前结束任务或者线程,或许是因为用户取消了操作或者应用程序需要被快速关闭。

要使得任务和线程能够安全、快速、可靠的停止下来,并不是一件容易的事,Java没有提供任何机制来安全地终止线程,但是它提供了中断(Interruption),这是一种协作机制,能够使一个线程终止另一个线程的当前工作。
这种协作式的方法是必要的,我们很少希望某个任务、线程或者服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态,相反,在编写任务和服务时可以使用一种协作的方式,当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。这提供了更好的灵活性,因为任务本身的代码比发出取消请求的代码更清楚如何执行清除工作。
一个在行为良好的软件与勉强运行的软件之间的最主要的区别就是,行为良好的软件能完善的处理失败、关闭和取消等过程。

一、任务取消

如果外部代码能在某个操作正常完成之前将其置入“完成”状态,那么这个操作就可以称为可取消的(Callcellable)。

取消某个操作的原因有很多:

  • 用户请求取消
  • 有时间限制的操作
  • 应用程序事件:例如应用程序对某个问题空间进行分解并搜索,从而使得不同的任务可以搜索问题空间中的不同区域。当其中一个任务找到了解决方案的时候,所有其他仍在搜索的任务都取消。
  • 错误
  • 关闭

在Java中没有一种安全的抢占式方法来停止线程,因此也就没有安全的抢占式方法来停止任务。只有一些协作机制,使请求取消的任务和代码都遵循一种协商好的协议。

其中一种协作机制能设置某个“已请求取消”标志,任务定期查看这个标志。如下面的案例所示:

class PrimeGenerator implements Runnable{
private final ArrayList
primes = new ArrayList
(); private volatile boolean cancelled; public void run() {
BigInteger p = BigInteger.ONE; while(! cancelled) {
p = p.nextProbablePrime(); synchronized(this) {
primes.add(p); } } System.out.println("result is : "+p); } public void cancel() {
cancelled = true; } public synchronized List
get() {
return new ArrayList
(primes); } }public class Test{
public static void main(String[] args) {
Test testaSencondPrimes = new Test(); try {
List
result = testaSencondPrimes.aSecondOfPrimes(); System.out.println("reseach primes result:"); for(BigInteger i:result) System.out.println(i); } catch (InterruptedException e) {
e.printStackTrace(); } } public List
aSecondOfPrimes() throws InterruptedException { PrimeGenerator generator = new PrimeGenerator(); new Thread(generator).start(); try { Thread.sleep(1); } finally { generator.cancel(); } return generator.get(); }}

一个可取消的任务必须拥有可取消策略,在这个策略中将详细的定义取消操作的“How”、“When”以及“What”,即其他代码如何(How)请求取消任务,任务在何时(When)检查是否已经请求了取消,以及在响应取消时应该执行哪些(What)操作。

1、中断

如果使用这种方法的任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会发生一个更严重的问题—任务可能永远不会检查取消标志,因此永远不会结束。

public class BrokenPrimeProducer  extends Thread{
private final BlockingQueue
queue; private volatile boolean cancelled = false; public BrokenPrimeProducer(BlockingQueue
queue) {
this.queue = queue; } public void run() {
try {
BigInteger pBigInteger = BigInteger.ONE; while(! cancelled) {
queue.put(pBigInteger = pBigInteger.nextProbablePrime()); System.out.println("成功往队列中插入一个素数"); } } catch (InterruptedException consumed) {
} System.out.println("producer执行结束"); } public void cancel() {
cancelled = true; }}

消费者

public class TestBrokenPrimeProducer{
public static void main(String[] args) throws InterruptedException {
BlockingQueue
primes = new ArrayBlockingQueue
(2);//FIFO 队列大小设置为2 BrokenPrimeProducer producer = new BrokenPrimeProducer(primes); producer.start(); try {
System.out.println(primes.take()); System.out.println(primes.take()); } catch (InterruptedException e) {
// TODO Auto-generated catch block e.printStackTrace(); } finally {
System.out.println("主线程休眠1s producer进入阻塞状态"); Thread.sleep(1000); System.out.println("producer阻塞1s后 中断produer"); producer.cancel(); System.out.println("中断producer线程后"); for(BigInteger integer:primes) {
System.out.println("队列中仍有:"+integer); } } }}

输出结果

在队列中放一个数字在队列中放一个数字在队列中放一个数字23在队列中放一个数字主线程休眠1s producer进入阻塞状态producer阻塞1s后 中断produer中断producer线程后队列中仍有:5队列中仍有:7

上面的输出之后,阻塞队列阻塞,并没有结束。

线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前工作,并转而执行其他的线程。

每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置为true。

Thread中包含了目标线程以及查询线程中断状态的方法。

public class Thread{
public void interrupt(){
...}; //中断目标线程 public boolean isInterrupted(){
...}; //返回目标线程的中断状态 public static boolean interrupted(){
...}; //清除当前线程的中断状态,并返回它之前的值,这也是清除中断状态的唯一方法即若返回true原有的中断状态会被清除,必须对其作出处理。 ...}

中断发生的情况

  • 阻塞状态下发生中断:阻塞库方法,如Thread.sleep和Object.wait等,都会去检查线程何时中断,并且在发生中断的时候提前返回也就是阻塞提前结束。他们在响应中断执行的操作包括:清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束。
  • 非阻塞状态下发生中断:中断状态将被设置,然后根据将要被取消的操作来检查中断状态以判断发生了中断。通过这样的方法,中断操作将变得"有黏性"----------如果不触发InterruptedException,那么中断状态将一直保持,直到明确清除中断状态。不阻塞的时候好像不会触发InterruptedException

调用interrupt并不意味着立即停止目标线程正在执行的工作,而只是传递了请求中断的消息。

中断并不会真正中断一个正在运行的线程,而只是发出了中断请求,然后由线程在下一个合适的时刻中断自己。

使用静态的interrupted时应该小心,因为它会清除当前的中断状态。如果在调用interrupted时返回了true,那么除非你想屏蔽这个中断,否则必须对它进行处理------可以抛出InterruptedException,或者通过再次调用Interrupt来恢复中断状态。

通常,中断是实现取消的最合理的方式。

有了中断,那么BrokenPrimeProducer中的问题则很容易解决:使用中断而不是使用标志老要求取消。在每次迭代中,有两个位置可以检测出中断:在阻塞的put方法调用中,以及在循环开始处查询中断状态时。

class PrimeProducer extends Thread{
private final BlockingQueue
queue; PrimeProducer(BlockingQueue
queue) {
this.queue = queue; } public void run() {
try {
BigInteger p = BigInteger.ONE; while(!Thread.currentThread().isInterrupted()) {
queue.put(p=p.nextProbablePrime()); } } catch(InterruptedException consumed) {
//允许线程退出 } } public void cancle() {
interrupt(); } }

2、中断策略

概念

中断策略规定线程如何解释某个中断请求—当发现中断请求时,应该做那些工作,那些单元是原子操作,以及以多快速度来响应中断。

简单说,线程是不可以随便结束的,想要结束一个线程你得明白线程如何结束(how),什么时候结束合适(when),结束以后应该如何做好收尾工作,干些什么(what),得有一个很好应对策略才行
由于每个线程拥有各自的中断策略,除非你清楚中断对该线程的含义,否则就不该中断这个线程,捕获InterruptedException之后也要恢复中断状态。调用 Thread.currentThread().interrupt();

3、响应中断

当调用可中断的阻塞函数,例如Thread.sleep或者BlockingQueue.put等,有两种实用策略可用于处理InterruptedException:

  • 传递异常,throws InterruptedException,从而使你的方法也成为可中断的阻塞方法。如果你不传递,在catch块中捕获了异常缺不做任何处理,你一定要清楚线程的中断策略,在调用栈中已经没有上层代码需要指定中断信息,否则都应该保存中断状态。
  • 恢复中断状态,从而使调用栈的上层代码能对其进行处理
import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingDeque;public class Main{
public String getNextTask(BlockingQueue
queue) {
boolean interrupted = false; try {
while (true) {
try {
return queue.take(); } catch (InterruptedException e) {
interrupted = true; Thread.currentThread().interrupt(); //结束的过位置不对,应该在finally中结束,不然就死循环了,因为每次take都会阻塞,导致抛出异常,再调用interrupt,就这样一直循环。 //System.out.println(222); // fall through and retry } } } finally {
//if (interrupted) // Thread.currentThread().interrupt(); //把上面的中文那个地方屏蔽了,写在这就对了,走不到这里。 } } //测试的代码,把上面那个方法随便扔到一个Test类中 public static void main(String[] args) throws InterruptedException {
BlockingQueue
queue = new LinkedBlockingDeque<>(2); Main t = new Main(); //设置一个结束的标识,为了让阻塞队列抛出异常 Thread.currentThread().interrupt(); t.getNextTask(queue); }}
  • 已检查异常,指的是一个函数的代码逻辑没有错误,但程序运行时会因为IO等错误导致异常,你在编写程序阶段是预料不到的。如果不处理这些异常,程序将来肯定会出错。所以编译器会提示你要去捕获并处理这种可能发生的异常,不处理就不能通过编译。
  • 未检查异常,指的是你的程序逻辑本身有问题,比如数组越界、访问null对象,这种错误你自己是可以避免的。编译器不会强制你检查这种异常。也检查不过来,太多了。

interrupt可能会抛出一个未检查的异常。

public class MyTest{
public static void get() throws InterruptedException {
System.out.println(111); Thread.sleep(111); } public static void main(String[] args) throws InterruptedException {
Thread.currentThread().interrupt(); get(); }}

4、通过Future来实现取消

ExecutorService.submit将返回一个Future来描述任务,Future拥有cancel方法,该方法带有一个boolean类型参数mayInterruptIfRunning。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。

Future.cancel(boolean mayInterruptIfRunning) true:如果应该中断执行此任务的线程,则为 true
Future.cancel(boolean mayInterruptIfRunning) false:允许正在运行的任务运行完成,
当executorService.submit一个Callable任务,且call方法有返回值,future的get方法才能返回任务的计算结果,若submit一个Runnable任务,get没有计算结果。

上面的代码中都是通过interrupt来实现线程的中断,下面通过ExecutorService.submit后,得到Future来描述任务,再通过cancel方法取消任务。

public static void timeRun(Runnable r,long timeout,TimeUnit unit,throws InterruptedException){
Future
task = taskExec.submit(r); try {
task.get(timeout,unit); } catch(TimeoutException e) {
} catch(ExecutionException e) {
throw launderThrowable(e.getCause()); } finally {
task.cancel(true); //如果任务已经结束,那么执行取消操作也不会带来任何影响,这行代码如果任务正在执行,那么将被中断 }}

当Future.get抛出InterruptedException或者TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务

5、处理不可中断的阻塞

许多可阻塞方法都是提前返回或者抛出InterruptedException来响应中断请求,然而并非所有的可阻塞方法或者机制都是能响应中断,对于特殊的不可中断操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但是我们必须知道线程阻塞的原因(即具体问题具体分析):

  • java.io包中同步Socket I/O:在服务器应用程序中,最常见的阻塞I/O形式就是对套接字的读写,虽然InputStream和OutputStream的read和write方法都不会响应中断,但是可以通过关闭底层的套接字,可以使执行read或者write方法而阻塞的线程抛出一个SocketException
    - java.io包中同步 I/O:当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(这回使其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在此链路上阻塞的线程都抛出AsynchronousCloseException,大多数标准的Channel都实现了InterruptibleChannel。
  • Selector的异步I/O:如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或者wakeup方法会使线程抛出ClosedSelectorException并提前返回。
  • 等待内置锁:若一个线程由于等待某个内置锁而阻塞,那么将无法响应中断,因为线程认为它肯定会获得锁,所以它不理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍然响应中断。
    对于同步Socket阻塞情形,重新重写Thread的interrupt方法,即可关闭套接字从而中断socket阻塞,也可中断线程。
public class ReaderThread extends Thread{
private final Socket socket; private final InputStream in; public ReaderThread(Socket socket) throws IOException {
this.socket = socket; this.in = socket.getInputStream(); } public void interrupt() {
try {
socket.close(); } catch (Exception ignored) {
} finally {
super.interrupt(); } } public void run() {
try {
byte[] buf = new byte[BUFSZ]; while(true) {
int count = in.read(buf); if(count < 0) break; else if(count > 0) processBuffer(buf,count); } } catch (Exception e) {
// 运行线程退出 } }}

二、停止基于线程的服务

如果应用程序准备退出,那么这些服务所拥有的线程也需要结束,由于无法通过抢占式的方法来停止线程,因此需要提供生命周期方法。

线程的所有权是不可以传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能停止工作者线程。相反,服务应该提供生命周期方法来关闭它自己以及它所拥有的线程。

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期方法。

1、关闭ExecutorService

ExecutorService有两种关闭方式:使用shutdown正常关闭,以及使用shutdownNow强行关闭。在进行强行关闭的时候,shutdownNow首先关闭当前正在执行的任务,然后返回所以尚未启动的任务清单。

这两种方式的区别在于各自的安全性和响应性:强行关闭的速度更快,但是风险也更大,因为任务很可能在执行到一般时被结束;而正常关闭虽然速度慢,但是却更安全,因为ExecutorService会一直等到队列中的所有任务都执行完成之后才关闭,在其他拥有线程的服务中也应该考虑提供类似的关闭方式以供选择。

2、“毒丸”对象

另外一种关闭生产者-消费者服务的方式是使用毒丸对象。毒丸的含义:当得到这个对象时,线程立即停止。

在FIFO队列中,毒丸对象将确保消费者在关闭之前首先完成队列中所有的工作,在提交毒丸对象之前提交的消费者所有工作都会被清理,而生产者在提交毒丸对象后,将不会再提交。书上说的很难懂,看了博客的简单理解是在生产者消费者实现相约好,生产者生产某个对象、数据时,消费者消费到指定的数据、对象时,两者都停止工作。

3、只执行一次的服务

如果某个方法要处理一批任务,并且所有的任务都处理完后才返回,那么可以通过一个私有的Executor来简化服务的生命周期管理,其中该executor的生命周期是由该方法控制的。

例如:checkMail方法能在多台主机上并行检查新邮件,通过创建一个私有的Executor,并向每台主机提交一个任务,当所有邮件检查任务都执行完后,关闭executor等待结束。

boolean checkMail(Set
hosts, long timeout, TimeUnit unit) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool(); final AtomicBoolean hasNewMail = new AtomicBoolean(false); try {
for (final String host : hosts) //try执行之后此执行finally,所以只能等下面的线程run结束才能shutdown {
exec.execute(new Runnable() {
@Override public void run() {
if (check(host)) hasNewMail.set(true); } }); } } finally {
exec.shutdown(); exec.awaitTermination(timeout, unit); } return hasNewMail.get(); }

4、shutdownNow的局限性

当通过shutdownNow来强行关闭ExecutorService,会尝试取消正在执行的任务,并返回所有已经提交但是尚未开始任务,但是如何知道哪些任务已经开始但是尚未结束被强行取消了?

解决办法:重写了一个TrackingExecutor(实现了ExecutorService接口),在execute(Runnable runnable)中,尝试执行完runnable的run()方法后finally进行判断该runnable所在的线程是否发生过中断,若发生过中断视为被执行过程中shutdown的任务。

public class TrackingExecutor extends AbstractExcuyorService{
private final ExecutorService exec; private final Set
tasksCancelledAtShutdown = new Collections.synchronizedSet(new HashSet
()); public List
getCancelledTasks() {
if(!exec.isTerminated()) throw new IllegalStateException(...); return new ArrayList
(tasksCancelledAtShutdown); } public void execute(final Runnable runnable) {
exec.execute( public void run() {
try {
runnable.run(); } finally {
if(isShutdown()&&Thread.currentThread().isInterrupted()) {
tasksCancelledAtShutdown.add(runnable); } } }); }}

三、处理非正常的线程终止

1、Runtime类注册关闭钩子

关闭钩子(shutdown hook)指通过Runtime.addShutdownHook注册的但尚未开始的线程。JVM不能保证关闭钩子的调用顺序,当所有的关闭钩子执行结束,那么JVM运行终结器。

  关闭钩子用于实现服务或者应用程序的清理工作,例如删除临时文件夹、清除无法由操作系统自动清除的资源。关闭钩子(即尚未开始执行的线程)要为线程安全。例如通过注册一个关闭钩子来停止日志服务。

public void start(){
Runtime.getRuntime().addShutdownHook(new Thread(){
//注册一个关闭钩子 public void run() {
try{
LogService.this.stop();} cath(InterruptedException ignored){
} } }) }

2、守护线程

有时候,你希望创建一个线程来执行辅助工作,但是又不希望这个线程阻碍JVM的关闭。在这种情况下就需要使用守护线程。

线程可以分为两种:普通线程和守护线程,在JVM启动时候创建的线程中,除了主线程之外,其他的线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。当创建一个新线程时,新线程将继承创建它的线程的守护状态,因此在默认情况下,主线程创建的所有线程都是普通线程。
普通线程和守护线程的区别在于线程退出时发生的操作,当一个线程退出的时候,JVM会检查其他正在执行的线程,如果这些线程都是守护线程,那么JVM会正常退出操作,当JVM停止的时候,所有仍然存在的守护线程都将被抛弃。

转载地址:https://blog.csdn.net/weixin_38367817/article/details/103795579 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:六.1、interrupt方法的介绍
下一篇:Future、FutureTask、Runnable、Callable的关系

发表评论

最新留言

关注你微信了!
[***.104.42.241]2023年11月22日 15时09分05秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

Springboot+Vue实现在线聊天室项目-登录、注册接口的实现 2019-03-28
Springboot+Vue实现在线聊天室项目-修改头像、添加好友接口的实现 2019-03-28
Springboot+Vue实现在线聊天室项目-集成spring-WebSocket配置 2019-03-28
Springboot+Vue实现在线聊天室项目-webSocket实现的消息转发 2019-03-28
Springboot+Vue实现在线聊天室项目-推送好友请求消息接口 2019-03-28
Springboot+Vue实现在线聊天室项目-聊天室获取房间信息、分页获取消息的接口 2019-03-28
Springboot+Vue实现在线聊天室项目-主页面及聊天框页面webSocket的消息推送 2019-03-28
Vue聊天室中WebSocket的封装实现不同页面接收相同消息后进行的不同操作 2019-03-28
SpringBoot-WebSocket 使用SimpMessagingTemplate发送后前端无法监听到消息 2019-03-28
Springboot+Vue实现在线聊天室项目-总结反思 2019-03-28
小程序后端D1-服务器验证码生成以及Swagger2接口文档初次使用 2019-03-28
egret踩坑-父组件has-a子组件时子组件调用动画失效 2019-03-28
egret+java小游戏服务端01建立Netty的WebSocket服务 2019-03-28
egret+java小游戏服务端02创建基础pojo类和构建游戏逻辑 2019-03-28
egret+java小游戏服务端03重构pojo类(简单工厂+单一职责接口) 2019-03-28
egret+java小游戏服务端04重构指令解析功能(反射实现开闭原则) 2019-03-28
egret+java小游戏前端05-egret的初步使用 2019-03-28
egret+java小游戏服务端06-重构指令策略(策略模式+装饰者模式) 2019-03-28
Java不同方式实现线程的生产者消费者(synchronized、Lock、BlockingQueue) 2019-03-28
【Codeforces Global Round 13】- C.Pekora and Trampoline - 贪心 2019-03-28