
本文共 5737 字,大约阅读时间需要 19 分钟。
使用Spring的同学应该对重试并不陌生,在Spring中有一个专门用于重试的模块。使用这个模块,只需要一个注解就能优雅的实现重试了。那么我们也知道,Spring WebFlux响应式技术栈相比经典的命令式编程技术栈Spring MVC有了很大的改变,那么在Spring WebFlux中重试是否有Spring MVC那样方便呢?
Spring WebFlux底层是使用Project Reactor,在Reactor中有一个专门的Retry动作。下面我们就一起学习一下Reactor的重试吧。
我们先看看Reactor的几个重试动作。在Reactor 3.4.3中只有下面三个动作用于重试了,在之前的版本中还有一些其他的方法。可能是出于简化操作的考虑将它们移除了。
- public final Flux<T> retry() ;
- public final Flux<T> retry(long numRetries) ;
- public final Flux<T> retryWhen(Retry retrySpec) ;
public final Flux<T> retry()
public final Flux<T> retry() 和public final Flux<T> retry(long numRetries) 其实是同一个方法。在retry()中调用了retry(long munRetries),使用的数量是Long的最大值。所以这两个方法我就一起说了。retry()动作是当操作序列发生错误后重新订阅序列。
下面有一个简单的例子:
AtomicInteger time = new AtomicInteger(-1); Flux.just("liu", "cheng").map(e -> { System.out.println("map1处理:" + e); return e; } ).map(e -> { System.out.println("map2处理:" + e); if (time.get() <= 0 && "cheng".equals(e)) { time.getAndIncrement(); throw new RuntimeException("这是一个错误"); } return e.toUpperCase(); }).retry(2).subscribe(e -> { System.out.println(e); });
执行结果为:
map1处理:liumap2处理:liuLIUmap1处理:chengmap2处理:chengmap1处理:liumap2处理:liuLIUmap1处理:chengmap2处理:chengmap1处理:liumap2处理:liuLIUmap1处理:chengmap2处理:chengCHENG
通过结果我们可以看出,当操作序列发生错误后会从头开始重新消费所有元素,即使它之前被消费过了。所以,这里就需要我们注意了,在平时业务开发过程中如果业务在重试时,已经执行过的不能再被重复执行时,我们在执行任务的时候就需要自己添加判断。如果执行过就直接跳过当前任务消费下一个任务。
public final Flux<T> retryWhen(Retry retrySpec)
retryWhen就是Reactor重试比较高级的用法了。它接收一个Retry参数,这个Retry还有几个子类。
在Retry中提供了下面这些方法:
可以看出里面有很多方法。我就从选几个比较常用的介绍一下吧。
- backoff方法返回就其实是Retry的子类RetryBackoffSpec。它需要两个参数:最大重试次数和最小间隔时间。
例子:
AtomicInteger time = new AtomicInteger(-1); Flux.just("liu", "cheng").map(e -> { System.out.println("时间:"+System.currentTimeMillis() + " map1处理:" + e); return e; } ).map(e -> { System.out.println("map2处理:" + e); if (time.get() <= 0 && "cheng".equals(e)) { time.getAndIncrement(); throw new RuntimeException("这是一个错误"); } return e.toUpperCase();//最多重试3次,每次的最短时间间隔为1秒 }).retryWhen(Retry.backoff(3, Duration.ofSeconds(1))).subscribe(e -> { System.out.println(e); }); Thread.currentThread().sleep(100000);
执行结果
时间:1614606928130 map1处理:liumap2处理:liuLIU时间:1614606928130 map1处理:chengmap2处理:cheng时间:1614606929290 map1处理:liumap2处理:liuLIU时间:1614606929290 map1处理:chengmap2处理:cheng时间:1614606931117 map1处理:liumap2处理:liuLIU时间:1614606931117 map1处理:chengmap2处理:chengCHENGDisconnected from the target VM, address: '127.0.0.1:62822', transport: 'socket'Process finished with exit code 0
从执行结果可以看出,两次重试的时间间隔分别是:1160,2987。
- fixedDelay方法返回的也是RetryBackoffSpec。它需要两个参数:最大重试次数和固定的间隔时间。
例子:
AtomicInteger time = new AtomicInteger(-1); Flux.just("liu", "cheng").map(e -> { System.out.println("时间:" + System.currentTimeMillis() + " map1处理:" + e); return e; } ).map(e -> { System.out.println("map2处理:" + e); if (time.get() <= 0 && "cheng".equals(e)) { time.getAndIncrement(); throw new RuntimeException("这是一个错误"); } return e.toUpperCase();//最大重试3次,固定延迟1秒 }).retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))).subscribe(e -> { System.out.println(e); }); Thread.currentThread().sleep(100000);
执行结果:
时间:1614607836856 map1处理:liumap2处理:liuLIU时间:1614607836856 map1处理:chengmap2处理:cheng时间:1614607837986 map1处理:liumap2处理:liuLIU时间:1614607837986 map1处理:chengmap2处理:cheng时间:1614607838988 map1处理:liumap2处理:liuLIU时间:1614607838988 map1处理:chengmap2处理:chengCHENG
从执行结果看,两次重试的时间间隔为:1130,1002。
- from方法。它需要一个Function函数。具体通过下面的例子更直观一些。
@Test public void testFrom() { AtomicInteger time = new AtomicInteger(-1); Flux.just("liu", "cheng").map(e -> { System.out.println("map1处理:" + e); return e; } ).map(e -> { System.out.println("map2处理:" + e); if (time.get() <= 0 && "cheng".equals(e)) { time.getAndIncrement(); throw new RuntimeException("这是一个错误"); } return e.toUpperCase(); }).retryWhen(Retry.from((retrySignals) -> { return retrySignals.map(rs -> getNumberOfTries(rs)); })).subscribe(); } private Long getNumberOfTries(Retry.RetrySignal rs) { System.out.println("重试:" + rs.totalRetries()); if (rs.totalRetries() < 3) { return rs.totalRetries(); } else { System.err.println("retries exhausted"); throw Exceptions.propagate(rs.failure()); } }
它实现了retry(3)的逻辑。重试超过3次后就抛出异常不在重试了。
-
withThrowable方法。它和from有一点类似也是接收一个Function,但是它的参数是异常。
@Test public void testThrow() { AtomicInteger time = new AtomicInteger(-1); Flux.just("liu", "cheng").map(e -> { System.out.println("map1处理:" + e); return e; } ).map(e -> { System.out.println("map2处理:" + e); if (time.get() <= 0 && "cheng".equals(e)) { time.getAndIncrement(); throw new RuntimeException("这是一个错误"); } return e.toUpperCase(); }).retryWhen(Retry.withThrowable((retrySignals) -> { return retrySignals.map(rs -> { if(rs instanceof Exception){ throw new RuntimeException("重试错误"); }else{ return rs; } }); })).subscribe(); }
感谢阅读,希望对你有帮助。
如果觉得写得不错就请作者喝杯喜茶吧~
参考内容:
发表评论
最新留言
关于作者
