flink异步IO --黑马
发布日期:2021-06-28 21:02:43 浏览次数:2 分类:技术文章

本文共 6643 字,大约阅读时间需要 22 分钟。

异步IO实现拉宽操作

Async I/O 是阿里巴巴贡献给社区的一个呼声非常高的特性,于1.2版本引入。主要目的是为了解决与外部系统交互时网络延迟成为了系统瓶颈的问题。

异步IO操作的需求

Flink在做流数据计算时,很多时候需要与外部系统进行交互(比如数据库、Redis、Hive、HBase等等存储系统)。

 往往需要注意系统间通信延迟是否会拖慢整个Flink作业,影响整体吞吐量和实时性。

场景:

流计算系统中经常需要于外部系统进行交互,比如需要查询外部数据库以关联上用户的额外信息,通常,我们的实现方式是向数据库发送用户a的查询请求(例如在MapFunction中),然后等待结果返回,在这之前,我们无法发送用户b的查询请求,这是一种同步访问的模式,如下图左边所示。

在这里插入图片描述

图中棕色的长条标识等待时间,可以发现网络等待时间极大的阻碍了吞吐和延迟,为了解决同步访问的问题,异步模式可以并发的处理多个请求和回复,也就是说,你可以连续的向数据库发送用户a、b、c、d等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示,这也正是Async I/O的实现原理。

实现的目标:提高吞吐量

*方式* *说明*
同步、提高并行度(MapFunction) 非常高的资源成本;高并行度MapFunction意味着更多的subtask,线程,网络连接,数据库连接,缓冲区等等
Async I/O 与数据库的异步交互意味着一个并行函数实例可以同时处理多个请求并同时接收响应(资源复用),这样等待时间可以与发送其他请求和接收响应重叠,至少等待时间是在多个请求上平摊的,这在大多数据情况下会导致更高的流吞吐量
使用Aysnc I/O的前提条件
  1. 对外部系统进行异步IO访问的客户端API。(比如使用vertx,但是目前只支持scala 2.12的版本,可以使用java类库来做)

  2. 或者在没有这样的客户端的情况下,可以通过创建多个客户端并使用线程池处理同步调用来尝试将同步客户端转变为有限的并发客户端。但是,这种方法通常比适当的异步客户端效率低。(比如可以写ExecutorService来实现)

Async I/O API实现异步流式转换

Async I/O API允许用户在数据流中使用异步客户端访问外部存储,该API处理与数据流的集成,以及消息顺序性(Order),事件时间(EventTime),一致性(容错)等脏活累活,用户只专注于业务

如果目标数据库中有异步客户端,则三步即可实现异步流式转换操作(针对该数据库的异步):

  1. 实现用来分发请求的AsyncFunction,用来向数据库发送异步请求并设置回调

  2. 获取操作结果的callback,并将它提交给ResultFuture

  3. 将异步I/O操作应用于DataStream

伪代码如下:

在这里插入图片描述

当异步I/O请求超时时,默认情况下会抛出异常并重新启动Job,如果希望处理超时,可以覆盖AsyncFunction的timeout方法

在这里插入图片描述

Asycn IO应用于DataStream

AsyncDataStream是一个工具类,用于将AsyncFunction应用于DataStream,AsyncFunction发出的并发请求都是无序的,该顺序基于哪个请求先完成,为了控制结果记录的发出顺序,flink提供了两种模式,分别对应AsyncDataStream的两个静态方法,OrderedWait和unorderedWait

AsyncDataStream.orderedWait();

AsyncDataStream.unorderWait();

orderedWait(有序):消息的发送顺序与接收到的顺序相同(包括 watermark ),也就是先进先出。

unorderWait(无序):

1)在ProcessingTime中,完全无序,即哪个请求先返回结果就先发送(最低延迟和最低消耗)。

2)在EventTime中,以watermark为边界,介于两个watermark之间的消息可以乱序,但是watermark和消息之间不能乱序,这样既认为在无序中又引入了有序,这样就有了与有序一样的开销。(具体我们会在后面的原理中讲解)。

代码实现java

Flink实现异步IO实战 java

代码实现scala
class AsyncOrderDetailRedisRequest extends RichAsyncFunction[RowData, OrderGoodsWideEntity]{
var jedis:Jedis = _ override def open(parameters: Configuration): Unit = {
jedis = RedisUtil.getJedis() jedis.select(1) } override def close(): Unit = {
if(jedis != null) {
jedis.close() } } override def timeout(input: RowData, resultFuture: ResultFuture[OrderGoodsWideEntity]): Unit = {
println("超时") } // 2. 定义Future回调的执行上下文(当前线程) // ExecutionContext: scala.concurrent.ExecutionContext // Executors: org.apache.flink.runtime.concurrent.Executors implicit lazy val executor = ExecutionContext.fromExecutor(Executors.directExecutor()) override def asyncInvoke(rowData: RowData, resultFuture: ResultFuture[OrderGoodsWideEntity]): Unit = {
// 发起异步请求,获取请求结果Future,调用的是Future代码 Future {
if(!jedis.isConnected){
println("重新获取redis连接") jedis = RedisUtil.getJedis() jedis.select(1) } // 1. 根据 goodsId 获取商品名称 val goodsJSON = jedis.hget("itcast_shop:dim_goods", rowData.getColumns.get("goodsId")) //print(goodsJSON) val dimGoods = DimGoodsDBEntity(goodsJSON) val shopJSON = jedis.hget("itcast_shop:dim_shops", dimGoods.shopId + "") //print(shopJSON) val dimShop = DimShopsDBEntity(shopJSON) val thirdCatJSON = jedis.hget("itcast_shop:dim_goods_cats", dimGoods.goodsCatId + "") //print(thirdCatJSON) val dimThirdCat = DimGoodsCatDBEntity(thirdCatJSON) val secondCatJSON = jedis.hget("itcast_shop:dim_goods_cats", dimThirdCat.parentId) //print(secondCatJSON) val dimSecondCat = DimGoodsCatDBEntity(secondCatJSON) val firstCatJSON = jedis.hget("itcast_shop:dim_goods_cats", dimSecondCat.parentId) //print(firstCatJSON) val dimFirstCat = DimGoodsCatDBEntity(firstCatJSON) val cityJSON = jedis.hget("itcast_shop:dim_org", dimShop.areaId + "") //print(cityJSON) val dimOrgCity = DimOrgDBEntity(cityJSON) val regionJSON = jedis.hget("itcast_shop:dim_org", dimOrgCity.parentId + "") //print(regionJSON) val dimOrgRegion = DimOrgDBEntity(regionJSON) val orderGoods = OrderGoodsWideEntity(rowData.getColumns.get("ogId").toLong, rowData.getColumns.get("orderId").toLong, rowData.getColumns.get("goodsId").toLong, rowData.getColumns.get("goodsNum").toLong, rowData.getColumns.get("goodsPrice").toDouble, dimGoods.goodsName, dimShop.shopId, dimThirdCat.catId.toInt, dimThirdCat.catName, dimSecondCat.catId.toInt, dimSecondCat.catName, dimFirstCat.catId.toInt, dimThirdCat.catName, dimShop.areaId, dimShop.shopName, dimShop.shopCompany, dimOrgCity.orgId, dimOrgCity.orgName, dimOrgRegion.orgId, dimOrgRegion.orgName) // 异步IO拉取数据 print("异步IO拉取数据" + orderGoods) resultFuture.complete(Array(orderGoods)) } }}
原理实现

AsyncDataStream.(un)orderedWait 的主要工作就是创建了一个 AsyncWaitOperator。AsyncWaitOperator 是支持异步 IO 访问的算子实现,该算子会运行 AsyncFunction 并处理异步返回的结果,其内部原理如下图所示。

在这里插入图片描述
如图所示,AsyncWaitOperator 主要由两部分组成:StreamElementQueue 和 Emitter。StreamElementQueue 是一个 Promise 队列,所谓 Promise 是一种异步抽象表示将来会有一个值(海底捞排队给你的小票),这个队列是未完成的 Promise 队列,也就是进行中的请求队列。Emitter 是一个单独的线程,负责发送消息(收到的异步回复)给下游。

图中E5表示进入该算子的第五个元素(”Element-5”),在执行过程中首先会将其包装成一个 “Promise” P5,然后将P5放入队列。最后调用 AsyncFunction 的 ayncInvoke 方法,该方法会向外部服务发起一个异步的请求,并注册回调。该回调会在异步请求成功返回时调用 AsyncCollector.collect 方法将返回的结果交给框架处理。实际上 AsyncCollector 是一个 Promise ,也就是 P5,在调用 collect 的时候会标记 Promise 为完成状态,并通知 Emitter 线程有完成的消息可以发送了。Emitter 就会从队列中拉取完成的 Promise ,并从 Promise 中取出消息发送给下游。

消息的顺序性

上文提到 Async I/O 提供了两种输出模式。其实细分有三种模式: 有序,ProcessingTime 无序,EventTime 无序。Flink 使用队列来实现不同的输出模式,并抽象出一个队列的接口(StreamElementQueue),这种分层设计使得AsyncWaitOperator和Emitter不用关心消息的顺序问题。StreamElementQueue有两种具体实现,分别是 OrderedStreamElementQueue 和

UnorderedStreamElementQueue。UnorderedStreamElementQueue 比较有意思,它使用了一套逻辑巧妙地实现完全无序和 EventTime 无序。

*有序*

有序比较简单,使用一个队列就能实现。所有新进入该算子的元素(包括 watermark),都会包装成 Promise 并按到达顺序放入该队列。如下图所示,尽管P4的结果先返回,但并不会发送,只有 P1 (队首)的结果返回了才会触发 Emitter 拉取队首元素进行发送。

在这里插入图片描述
*ProcessingTime 无序*

ProcessingTime 无序也比较简单,因为没有 watermark,不需要协调 watermark 与消息的顺序性,所以使用两个队列就能实现,一个 uncompletedQueue 一个 completedQueue。所有新进入该算子的元素,同样的包装成 Promise 并放入 uncompletedQueue 队列,当uncompletedQueue队列中任意的Promise返回了数据,则将该 Promise 移到 completedQueue 队列中,并通知 Emitter 消费。如下图所示:

在这里插入图片描述

*EventTime 无序*

EventTime 无序类似于有序与 ProcessingTime 无序的结合体。因为有 watermark,需要协调 watermark 与消息之间的顺序性,所以uncompletedQueue中存放的元素从原先的 Promise 变成了 Promise 集合。如果进入算子的是消息元素,则会包装成 Promise 放入队尾的集合中。如果进入算子的是 watermark,也会包装成 Promise 并放到一个独立的集合中,再将该集合加入到 uncompletedQueue 队尾,最后再创建一个空集合加到 uncompletedQueue 队尾。这样,watermark 就成了消息顺序的边界。只有处在队首的集合中的 Promise 返回了数据,才能将该 Promise 移到 completedQueue 队列中,由 Emitter 消费发往下游。只有队首集合空了,才能处理第二个集合。这样就保证了当且仅当某个 watermark 之前所有的消息都已经被发送了,该 watermark 才能被发送。过程如下图所示:

在这里插入图片描述

转载地址:https://blog.csdn.net/yangshengwei230612/article/details/116425305 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:IDEA 导入模块
下一篇:scala中样例类和apply方法的使用

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2024年04月28日 20时44分43秒