《高级》Flink异步io链接Redis--Java和Scala版
发布日期:2021-07-17 15:49:22
浏览次数:4
分类:技术文章
本文共 5328 字,大约阅读时间需要 17 分钟。
最近发现好多小伙伴不知道如何异步链接redis
我准备了两个版本 java版本和scala版本
直接上代码,大部分同学看了应该会懂
刚开始学习flink的同学中间细节的东西,不明白的可以微信联系我,可以进入我的flink微信交流群。
喜欢flink的朋友,支持一下原创,可以关注我的公众号:
先看java版本
import com.alibaba.fastjson.JSON;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.AsyncDataStream;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.async.ResultFuture;import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import java.util.Collections;import java.util.Properties;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;import java.util.function.Supplier;public class AsynFlinkRedisJava { public static void main(String args[]) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(500); Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("zookeeper.connect", "localhost:2181"); props.setProperty("group.id", "flink-kafka"); FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08("flink1", new SimpleStringSchema(), props); DataStreamstream = env.addSource(consumer); DataStream resultStream = AsyncDataStream.unorderedWait(stream, new AsyncRedis(),1000, TimeUnit.MICROSECONDS, 100); resultStream.print(); env.execute("kaishi"); }}class AsyncRedis extends RichAsyncFunction { private transient JedisPool pool; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); pool= new JedisPool(new JedisPoolConfig(), "localhost", 6379); } @Override public void asyncInvoke(String input, final ResultFuture resultFuture) throws Exception { CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { try { String imei = JSON.parseObject(input).get("imei").toString(); Jedis jedis = pool.getResource(); String result = jedis.hget("DC_IMEI_APPID",imei); pool.returnResource(jedis); return result; } catch (Exception e) { System.out.println(e); return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(dbResult)); }); }}
再看scala版本:
import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _}import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08import com.alibaba.fastjson.JSONimport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}import java.util.concurrent.TimeUnitimport org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}import scala.concurrent.{ExecutionContext, Future}object AsynFlinkRedis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() //kafka位置 老版本的 kafka是配置zookeeper地址 properties.setProperty("bootstrap.servers","localhost:9092") properties.setProperty("zookeeper.connect","localhost:2181") val topic = "flink1" properties.setProperty("group.id", "test-flink") val kafkStream = new FlinkKafkaConsumer08(topic,new SimpleStringSchema(),properties) val stream = env.addSource(kafkStream) stream.print() val resultStream=AsyncDataStream.unorderedWait(stream,new RedisAsyncFunction(), 1000, TimeUnit.MILLISECONDS, 100) resultStream.print() env.execute() }}class RedisAsyncFunction extends AsyncFunction[String,String]{ lazy val pool = new JedisPool(new JedisPoolConfig,"localhost",6379) override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = { Future { //获取kafka日志的imei号 val imei = JSON.parseObject(input).get("imei").toString //从redis中获取imei对应的userid println(pool.getNumActive) val jedis = pool.getResource val useridJson =jedis.hget("DC_IMEI_APPID",imei) print(useridJson) resultFuture.complete(Seq(useridJson)) pool.returnResource(jedis) }(ExecutionContext.global)}}
kafka练习日志:
{"accStatus":"NULL","addr":"","alertType":"stayAlert","fenceId":"NULL","gpsTime":"2019-01-29 23:45:01","iccid":"NULL","imei":"868620190220000","imsi":"NULL","lat":"46.795862","lng":"134.011538","offlineTime":"NULL","postTime":"2019-01-30 00:00:00","time":"NULL","type":"DEVICE"}
redis测试数据:
hset DC_IMEI_APPID 868620190220000 "{\"allFullId\":\"1,130396,130395,129659\",\"appId\":\"TRACKER\",\"userId\":\"129659\",\"mcType\":\"GT06N\",\"timeZone\":\"UTCA08:00\"}"
转载地址:https://blog.csdn.net/huzechen/article/details/99980622 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年03月27日 14时06分36秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
git log 你学废了吗?
2019-04-27
40张动图揭示各种传感器工作原理!
2019-04-27
别错过校招
2019-04-27
90后中国程序员“黑吃黑”博彩网站,半年获利256万,判刑11年半
2019-04-27
用一张图片告诉你芯片设计
2019-04-27
昨天小米股价大涨的背后:UWB芯片到底是个什么鬼?
2019-04-27
MTK 平台TP调试遇坑
2019-04-27
当你使用微信和QQ的时候,请不要忘记ICQ这个伟大的公司!
2019-04-27
db 是个什么鬼?
2019-04-27
我看完大连理工研究生的遗书之后
2019-04-27
你见过出道即巅峰吗?
2019-04-27
面试官:如何写出让 CPU 跑得更快的代码?
2019-04-27
代码优化导致的奇葩问题
2019-04-27
想领取开发套件,就来参加AIoT开发者大赛
2019-04-27
深度:关于Linux内核最硬核的文章
2019-04-27
Type-C PD充电简介
2019-04-27
[教程]win10 ,ubuntu双系统安装避坑指南
2019-04-27
我要不要离职?
2019-04-27
Linux字符设备驱动内幕
2019-04-27
【速来抢】iPhone12、STM32开发板、1024元现金红包…打包免费送!!!
2019-04-27