
项目实战 从 0 到 1 学习之Flink(25)Flink从redis中获取数据作为source源
发布日期:2021-05-14 00:16:44
浏览次数:18
分类:博客文章
本文共 2290 字,大约阅读时间需要 7 分钟。
redis中的数据:
需要实现SourceFunction接口,指定泛型<>,也就是获取redis里的数据,处理完后的数据输入的数据类型 这里我们需要的是(我们需要返回kv对的,就要考虑HashMap)pom.xmlredis.clients jedis 2.9.3
Java代码:
package ryx.source;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.exceptions.JedisConnectionException;import javax.swing.plaf.TableHeaderUI;import java.util.HashMap;import java.util.Map;/** * * 在redis中保存的有国家和大区的关系 * hset areas AREA_US US * hset areas AREA_CT TW,HK * hset areas AREA_AR PK,KW,SA * hset areas AREA_IN IN *./bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic allDataClean--from-beginning * * 我们需要返回kv对的,就要考虑HashMap */public class MyRedisSource implements SourceFunction> { private Logger logger= LoggerFactory.getLogger(MyRedisSource.class); private boolean isRunning =true; private Jedis jedis=null; private final long SLEEP_MILLION=5000; public void run(SourceContext > ctx) throws Exception { this.jedis = new Jedis("hadoop01", 6379); HashMap kVMap = new HashMap (); while(isRunning){ try{ kVMap.clear(); Map areas = jedis.hgetAll("areas"); for(Map.Entry entry:areas.entrySet()){ // key :大区 value:国家 String key = entry.getKey(); String value = entry.getValue(); String[] splits = value.split(","); System.out.println("key:"+key+",--value:"+value); for (String split:splits){ // key :国家value:大区 kVMap.put(split, key); } } if(kVMap.size()>0){ ctx.collect(kVMap); }else { logger.warn("从redis中获取的数据为空"); } Thread.sleep(SLEEP_MILLION); }catch (JedisConnectionException e){ logger.warn("redis连接异常,需要重新连接",e.getCause()); jedis = new Jedis("hadoop01", 6379); }catch (Exception e){ logger.warn(" source 数据源异常",e.getCause()); } } } public void cancel() { isRunning=false; while(jedis!=null){ jedis.close(); } }}
结果为:
key:AREA_US,–value:USkey:AREA_CT,–value:TW,HKkey:AREA_AR,–value:PK,KW,SAkey:AREA_IN,–value:IN接着将value数据进行分割单个的单词,和key进行进行组合装到HashMap中,通过Run方法的SourceContext对象,作为Source源进行输出!
发表评论
最新留言
路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年04月24日 16时30分20秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
解决Chrome播放视频闪屏黑屏无法播放
2019-03-11
Java中final的理解
2019-03-11
Git简单理解与使用
2019-03-11
echarts 基本图表开发小结
2019-03-11
二分查找.基于有序数组的查找方法.704
2019-03-11
C语言文档操作
2019-03-11
制作JS验证码(简易)
2019-03-11
adb通过USB或wifi连接手机
2019-03-11
vue使用ecahrts词云图
2019-03-11
【README】回溯算法基本框架
2019-03-11
数组中常见的算法
2019-03-11
泛型机制 Generic
2019-03-11
包装类
2019-03-11
JDK9-15新特性
2019-03-11
集合继承结构
2019-03-11
ArrayList 实现类
2019-03-11
LinkedList 实现类
2019-03-11
Vector 实现类
2019-03-11
HashMap类、HashSet
2019-03-11