《高级》Flink异步io链接Redis--Java和Scala版
发布日期:2021-07-17 15:49:22 浏览次数:4 分类:技术文章

本文共 5328 字,大约阅读时间需要 17 分钟。

最近发现好多小伙伴不知道如何异步链接redis

我准备了两个版本 java版本和scala版本

直接上代码,大部分同学看了应该会懂

刚开始学习flink的同学中间细节的东西,不明白的可以微信联系我,可以进入我的flink微信交流群。

 

喜欢flink的朋友,支持一下原创,可以关注我的公众号:

 

先看java版本

import com.alibaba.fastjson.JSON;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.AsyncDataStream;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.async.ResultFuture;import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import java.util.Collections;import java.util.Properties;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;import java.util.function.Supplier;public class AsynFlinkRedisJava {
public static void main(String args[]) throws Exception{
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(500); Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("zookeeper.connect", "localhost:2181"); props.setProperty("group.id", "flink-kafka"); FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08("flink1", new SimpleStringSchema(), props); DataStream
stream = env.addSource(consumer); DataStream
resultStream = AsyncDataStream.unorderedWait(stream, new AsyncRedis(),1000, TimeUnit.MICROSECONDS, 100); resultStream.print(); env.execute("kaishi"); }}class AsyncRedis extends RichAsyncFunction
 {
private transient JedisPool pool; @Override public void open(Configuration parameters) throws Exception {
super.open(parameters); pool= new JedisPool(new JedisPoolConfig(), "localhost", 6379);    } @Override public void asyncInvoke(String input, final ResultFuture
resultFuture) throws Exception {
        CompletableFuture.supplyAsync(new Supplier
() {
@Override public String get() {
try {
String imei = JSON.parseObject(input).get("imei").toString(); Jedis jedis = pool.getResource(); String result = jedis.hget("DC_IMEI_APPID",imei); pool.returnResource(jedis); return result; } catch (Exception e) {
System.out.println(e); return null; } } }).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(dbResult)); }); }}

再看scala版本:

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _}import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08import com.alibaba.fastjson.JSONimport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}import java.util.concurrent.TimeUnitimport org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}import scala.concurrent.{ExecutionContext, Future}object AsynFlinkRedis {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() //kafka位置 老版本的 kafka是配置zookeeper地址 properties.setProperty("bootstrap.servers","localhost:9092") properties.setProperty("zookeeper.connect","localhost:2181") val topic = "flink1" properties.setProperty("group.id", "test-flink") val kafkStream = new FlinkKafkaConsumer08(topic,new SimpleStringSchema(),properties) val stream = env.addSource(kafkStream) stream.print() val resultStream=AsyncDataStream.unorderedWait(stream,new RedisAsyncFunction(), 1000, TimeUnit.MILLISECONDS, 100) resultStream.print() env.execute()  }}class RedisAsyncFunction extends  AsyncFunction[String,String]{
lazy val pool = new JedisPool(new JedisPoolConfig,"localhost",6379) override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
Future {
//获取kafka日志的imei号 val imei = JSON.parseObject(input).get("imei").toString //从redis中获取imei对应的userid println(pool.getNumActive) val jedis = pool.getResource val useridJson =jedis.hget("DC_IMEI_APPID",imei) print(useridJson) resultFuture.complete(Seq(useridJson)) pool.returnResource(jedis) }(ExecutionContext.global)}}

kafka练习日志:

{"accStatus":"NULL","addr":"","alertType":"stayAlert","fenceId":"NULL","gpsTime":"2019-01-29 23:45:01","iccid":"NULL","imei":"868620190220000","imsi":"NULL","lat":"46.795862","lng":"134.011538","offlineTime":"NULL","postTime":"2019-01-30 00:00:00","time":"NULL","type":"DEVICE"}

redis测试数据:

hset DC_IMEI_APPID 868620190220000 "{\"allFullId\":\"1,130396,130395,129659\",\"appId\":\"TRACKER\",\"userId\":\"129659\",\"mcType\":\"GT06N\",\"timeZone\":\"UTCA08:00\"}"

转载地址:https://blog.csdn.net/huzechen/article/details/99980622 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:浅入快出--递归之斐波那契数列(一)
下一篇:Flink广播状态实战——设备异常报警

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年03月27日 14时06分36秒