项目实战 从 0 到 1 学习之Flink(25)Flink从redis中获取数据作为source源
发布日期:2021-05-14 00:16:44 浏览次数:18 分类:博客文章

本文共 2290 字,大约阅读时间需要 7 分钟。

redis中的数据:

需要实现SourceFunction接口,指定泛型<>,也就是获取redis里的数据,处理完后的数据输入的数据类型 这里我们需要的是
(我们需要返回kv对的,就要考虑HashMap)
pom.xml

redis.clients
jedis
2.9.3

Java代码:

package ryx.source;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.exceptions.JedisConnectionException;import javax.swing.plaf.TableHeaderUI;import java.util.HashMap;import java.util.Map;/** * * 在redis中保存的有国家和大区的关系 * hset  areas AREA_US US * hset  areas AREA_CT TW,HK * hset  areas AREA_AR PK,KW,SA * hset  areas AREA_IN IN *./bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic allDataClean--from-beginning * * 我们需要返回kv对的,就要考虑HashMap */public class MyRedisSource implements SourceFunction
> { private Logger logger= LoggerFactory.getLogger(MyRedisSource.class); private boolean isRunning =true; private Jedis jedis=null; private final long SLEEP_MILLION=5000; public void run(SourceContext
> ctx) throws Exception { this.jedis = new Jedis("hadoop01", 6379); HashMap
kVMap = new HashMap
(); while(isRunning){ try{ kVMap.clear(); Map
areas = jedis.hgetAll("areas"); for(Map.Entry
entry:areas.entrySet()){ // key :大区 value:国家 String key = entry.getKey(); String value = entry.getValue(); String[] splits = value.split(","); System.out.println("key:"+key+",--value:"+value); for (String split:splits){ // key :国家value:大区 kVMap.put(split, key); } } if(kVMap.size()>0){ ctx.collect(kVMap); }else { logger.warn("从redis中获取的数据为空"); } Thread.sleep(SLEEP_MILLION); }catch (JedisConnectionException e){ logger.warn("redis连接异常,需要重新连接",e.getCause()); jedis = new Jedis("hadoop01", 6379); }catch (Exception e){ logger.warn(" source 数据源异常",e.getCause()); } } } public void cancel() { isRunning=false; while(jedis!=null){ jedis.close(); } }}

结果为:

key:AREA_US,–value:US
key:AREA_CT,–value:TW,HK
key:AREA_AR,–value:PK,KW,SA
key:AREA_IN,–value:IN

接着将value数据进行分割单个的单词,和key进行进行组合装到HashMap中,通过Run方法的SourceContext对象,作为Source源进行输出!

上一篇:项目实战 从 0 到 1 学习之Flink (26)Flink采集kafka数据后存到mongodb
下一篇:项目实战从 0 到 1 学习之Flink (24)Flink将kafka的数据存到redis中

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年04月24日 16时30分20秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章