Spring WebFlux(Reactor3)重试
发布日期:2021-05-06 01:36:29 浏览次数:23 分类:精选文章

本文共 5737 字,大约阅读时间需要 19 分钟。

使用Spring的同学应该对重试并不陌生,在Spring中有一个专门用于重试的模块。使用这个模块,只需要一个注解就能优雅的实现重试了。那么我们也知道,Spring WebFlux响应式技术栈相比经典的命令式编程技术栈Spring MVC有了很大的改变,那么在Spring WebFlux中重试是否有Spring MVC那样方便呢?

Spring WebFlux底层是使用Project Reactor,在Reactor中有一个专门的Retry动作。下面我们就一起学习一下Reactor的重试吧。

 

我们先看看Reactor的几个重试动作。在Reactor 3.4.3中只有下面三个动作用于重试了,在之前的版本中还有一些其他的方法。可能是出于简化操作的考虑将它们移除了。

  •    public final Flux<T> retry() ;
  •    public final Flux<T> retry(long numRetries) ;
  •    public final Flux<T> retryWhen(Retry retrySpec) ;

public final Flux<T> retry()

public final Flux<T> retry() 和public final Flux<T> retry(long numRetries) 其实是同一个方法。在retry()中调用了retry(long munRetries),使用的数量是Long的最大值。所以这两个方法我就一起说了。retry()动作是当操作序列发生错误后重新订阅序列。

下面有一个简单的例子:

AtomicInteger time = new AtomicInteger(-1);        Flux.just("liu", "cheng").map(e -> {                    System.out.println("map1处理:" + e);                    return e;                }        ).map(e -> {            System.out.println("map2处理:" + e);            if (time.get() <= 0 && "cheng".equals(e)) {                time.getAndIncrement();                throw new RuntimeException("这是一个错误");            }            return e.toUpperCase();        }).retry(2).subscribe(e -> {            System.out.println(e);        });

执行结果为:

map1处理:liumap2处理:liuLIUmap1处理:chengmap2处理:chengmap1处理:liumap2处理:liuLIUmap1处理:chengmap2处理:chengmap1处理:liumap2处理:liuLIUmap1处理:chengmap2处理:chengCHENG

通过结果我们可以看出,当操作序列发生错误后会从头开始重新消费所有元素,即使它之前被消费过了。所以,这里就需要我们注意了,在平时业务开发过程中如果业务在重试时,已经执行过的不能再被重复执行时,我们在执行任务的时候就需要自己添加判断。如果执行过就直接跳过当前任务消费下一个任务。

 

 public final Flux<T> retryWhen(Retry retrySpec)

retryWhen就是Reactor重试比较高级的用法了。它接收一个Retry参数,这个Retry还有几个子类。

在Retry中提供了下面这些方法:

可以看出里面有很多方法。我就从选几个比较常用的介绍一下吧。

  • backoff方法返回就其实是Retry的子类RetryBackoffSpec。它需要两个参数:最大重试次数和最小间隔时间。

例子:

AtomicInteger time = new AtomicInteger(-1);        Flux.just("liu", "cheng").map(e -> {                    System.out.println("时间:"+System.currentTimeMillis() + " map1处理:" + e);                    return e;                }        ).map(e -> {            System.out.println("map2处理:" + e);            if (time.get() <= 0 && "cheng".equals(e)) {                time.getAndIncrement();                throw new RuntimeException("这是一个错误");            }            return e.toUpperCase();//最多重试3次,每次的最短时间间隔为1秒        }).retryWhen(Retry.backoff(3, Duration.ofSeconds(1))).subscribe(e -> {            System.out.println(e);        });        Thread.currentThread().sleep(100000);

 执行结果

时间:1614606928130 map1处理:liumap2处理:liuLIU时间:1614606928130 map1处理:chengmap2处理:cheng时间:1614606929290 map1处理:liumap2处理:liuLIU时间:1614606929290 map1处理:chengmap2处理:cheng时间:1614606931117 map1处理:liumap2处理:liuLIU时间:1614606931117 map1处理:chengmap2处理:chengCHENGDisconnected from the target VM, address: '127.0.0.1:62822', transport: 'socket'Process finished with exit code 0

从执行结果可以看出,两次重试的时间间隔分别是:1160,2987。

  • fixedDelay方法返回的也是RetryBackoffSpec。它需要两个参数:最大重试次数和固定的间隔时间。

例子:

AtomicInteger time = new AtomicInteger(-1);        Flux.just("liu", "cheng").map(e -> {                    System.out.println("时间:" + System.currentTimeMillis() + " map1处理:" + e);                    return e;                }        ).map(e -> {            System.out.println("map2处理:" + e);            if (time.get() <= 0 && "cheng".equals(e)) {                time.getAndIncrement();                throw new RuntimeException("这是一个错误");            }            return e.toUpperCase();//最大重试3次,固定延迟1秒        }).retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1))).subscribe(e -> {            System.out.println(e);        });        Thread.currentThread().sleep(100000);

执行结果:

时间:1614607836856 map1处理:liumap2处理:liuLIU时间:1614607836856 map1处理:chengmap2处理:cheng时间:1614607837986 map1处理:liumap2处理:liuLIU时间:1614607837986 map1处理:chengmap2处理:cheng时间:1614607838988 map1处理:liumap2处理:liuLIU时间:1614607838988 map1处理:chengmap2处理:chengCHENG

从执行结果看,两次重试的时间间隔为:1130,1002。 

  • from方法。它需要一个Function函数。具体通过下面的例子更直观一些。
@Test    public void testFrom() {        AtomicInteger time = new AtomicInteger(-1);        Flux.just("liu", "cheng").map(e -> {                    System.out.println("map1处理:" + e);                    return e;                }        ).map(e -> {            System.out.println("map2处理:" + e);            if (time.get() <= 0 && "cheng".equals(e)) {                time.getAndIncrement();                throw new RuntimeException("这是一个错误");            }            return e.toUpperCase();        }).retryWhen(Retry.from((retrySignals) -> {            return retrySignals.map(rs -> getNumberOfTries(rs));        })).subscribe();    }    private Long getNumberOfTries(Retry.RetrySignal rs) {        System.out.println("重试:" + rs.totalRetries());        if (rs.totalRetries() < 3) {            return rs.totalRetries();        } else {            System.err.println("retries exhausted");            throw Exceptions.propagate(rs.failure());        }    }

它实现了retry(3)的逻辑。重试超过3次后就抛出异常不在重试了。

  • withThrowable方法。它和from有一点类似也是接收一个Function,但是它的参数是异常。

@Test    public void testThrow() {        AtomicInteger time = new AtomicInteger(-1);        Flux.just("liu", "cheng").map(e -> {                    System.out.println("map1处理:" + e);                    return e;                }        ).map(e -> {            System.out.println("map2处理:" + e);            if (time.get() <= 0 && "cheng".equals(e)) {                time.getAndIncrement();                throw new RuntimeException("这是一个错误");            }            return e.toUpperCase();        }).retryWhen(Retry.withThrowable((retrySignals) -> {            return retrySignals.map(rs -> {                if(rs instanceof Exception){                    throw new RuntimeException("重试错误");                }else{                    return rs;                }            });        })).subscribe();    }

感谢阅读,希望对你有帮助。

如果觉得写得不错就请作者喝杯喜茶吧~

参考内容:

上一篇:分布式微服务权限校验方式一览
下一篇:Spring WebFlux(Reactor3)响应式编程处理异常

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2025年05月01日 18时26分04秒