
Learning Flink from 0 to 1 Through Project Practice (24): Writing Kafka Data to Redis with Flink
Published: 2021-05-14 00:16:43
1. Dependencies
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.2</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.version>2.11.8</scala.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.51</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.2</version>
    </dependency>
</dependencies>
2. Code implementation 1 (Scala):
package com.baizhi.demo03

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.kafka.clients.consumer.ConsumerConfig

object KafkaToRedisWordCount { // wrapper object added so the snippet compiles as-is
  def main(args: Array[String]): Unit = {
    // 1. Create the StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOS:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1")
    // Kafka 0.11 consumer, matching the connector declared in the pom above
    val kafkaConsumer = new FlinkKafkaConsumer011("topic01", new SimpleStringSchema(), props)
    val redisConfig = new FlinkJedisPoolConfig.Builder().setHost("CentOS").setPort(6379).build()
    val redisSink = new RedisSink(redisConfig, new WordPairRedisMapper)
    // 2. Set up the source
    val lines: DataStream[String] = env.addSource[String](kafkaConsumer)
    // 3. Count words and write the (word, count) pairs to Redis
    lines.flatMap(_.split("\\s+")).map((_, 1)).keyBy(0).sum(1).addSink(redisSink)
    // 4. Execute the job
    env.execute("wordcount")
  }
}
package com.baizhi.demo03

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Maps each (word, count) pair to an HSET command on the Redis hash "clicks"
class WordPairRedisMapper extends RedisMapper[(String, Int)] {

  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "clicks")

  // Hash field: the word
  override def getKeyFromData(t: (String, Int)): String = t._1

  // Hash value: the count as a string
  override def getValueFromData(t: (String, Int)): String = t._2.toString
}
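With this mapper, the Bahir sink effectively issues HSET clicks <word> <count> for every update. A minimal read-back sketch to verify the counts is shown below, assuming the plain Jedis client (pulled in transitively by flink-connector-redis) and the same CentOS:6379 instance as in the job above; the class name ClicksCheck is only for illustration:

import java.util.Map;
import redis.clients.jedis.Jedis;

public class ClicksCheck {
    public static void main(String[] args) {
        // Same host/port as the FlinkJedisPoolConfig in the job above (assumption: Redis is reachable from here)
        try (Jedis jedis = new Jedis("CentOS", 6379)) {
            // The mapper writes HSET clicks <word> <count>, so read the whole hash back
            Map<String, String> clicks = jedis.hgetAll("clicks");
            clicks.forEach((word, count) -> System.out.println(word + " -> " + count));
        }
    }
}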
3. Code implementation 2 (Java):
package ryx;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ryx.source.MyRedisSource;

import java.util.HashMap;
import java.util.Properties;

public class DataClean {

    private static Logger logger = LoggerFactory.getLogger(DataClean.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint configuration
        env.enableCheckpointing(60000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Set the state backend (optional)
        //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints", true));

        String topicS = "allData";
        Properties prop = new Properties();
        prop.setProperty("group.id", "cleanData");
        prop.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        FlinkKafkaConsumer011<String> myConsumer =
                new FlinkKafkaConsumer011<>(topicS, new SimpleStringSchema(), prop);

        // Read data from Kafka; sample records:
        // {"dt":"2019-10-10 16:45:32","countryCode":"US","data":[{"type":"s5","score":0.5,"level":"B"},{"type":"s1","score":0.8,"level":"B"}]}
        // {"dt":"2019-10-10 16:45:34","countryCode":"HK","data":[{"type":"s5","score":0.5,"level":"A"},{"type":"s3","score":0.3,"level":"D"}]}
        // {"dt":"2019-10-10 16:45:36","countryCode":"KW","data":[{"type":"s1","score":0.1,"level":"B"},{"type":"s4","score":0.2,"level":"A"}]}
        // {"dt":"2019-10-10 16:45:38","countryCode":"HK","data":[{"type":"s2","score":0.2,"level":"A+"},{"type":"s3","score":0.1,"level":"C"}]}
        DataStreamSource<String> data = env.addSource(myConsumer);

        // From the earlier post in this series: read the country -> area mapping from Redis as a source
        DataStreamSource<HashMap<String, String>> mapData = env.addSource(new MyRedisSource());

        DataStream<String> resData = data.connect(mapData)
                .flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {

            // Holds the country -> area mapping
            private HashMap<String, String> allMap = new HashMap<String, String>();

            // flatMap1 handles the records coming from Kafka
            public void flatMap1(String value, Collector<String> out) throws Exception {
                JSONObject jsonObject = JSONObject.parseObject(value);
                String dt = jsonObject.getString("dt");
                String area = jsonObject.getString("countryCode");
                logger.info("timestamp: " + dt + " --- country: " + area);
                System.out.println("timestamp: " + dt + " --- country: " + area);
                // Look up the area for this country code
                String ar = allMap.get(area);
                JSONArray jsonArr = jsonObject.getJSONArray("data");
                logger.info("data array size: " + jsonArr.size() + ", area: " + ar);
                for (int i = 0; i < jsonArr.size(); i++) {
                    JSONObject jsonOb = jsonArr.getJSONObject(i);
                    logger.info("data element: " + jsonOb);
                    jsonOb.put("area", ar);
                    jsonOb.put("dt", dt);
                    out.collect(jsonOb.toJSONString());
                    System.out.println("collected JSON: " + jsonOb.toJSONString());
                }
            }

            // flatMap2 handles the mapping emitted by the Redis source
            public void flatMap2(HashMap<String, String> value, Collector<String> out) throws Exception {
                this.allMap = value;
            }
        });

        String outTopic = "allDataClean";
        Properties outProp = new Properties();
        outProp.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // Transaction timeout for the exactly-once producer
        outProp.setProperty("transaction.timeout.ms", 60000 * 15 + "");

        FlinkKafkaProducer011<String> writeKafka = new FlinkKafkaProducer011<>(
                outTopic,
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                outProp,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

        resData.addSink(writeKafka);
        env.execute("DataClean");
    }
}
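DataClean depends on MyRedisSource from the earlier post in this series (Flink reading data from Redis as a source), which is not reproduced here. For completeness, the following is a minimal sketch of what such a source can look like; the Redis hash name "areas", the host/port, and the refresh interval are assumptions, and only the country -> area output format (e.g. HK -> AREA_CT, matching the log below) is taken from this article:

package ryx.source;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

// Sketch: periodically reads a Redis hash (field = area code such as AREA_CT,
// value = comma-separated country codes such as "TW,HK") and emits a
// country -> area map downstream.
public class MyRedisSource extends RichSourceFunction<HashMap<String, String>> {

    private volatile boolean running = true;
    private Jedis jedis;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Host/port are assumptions; point this at the Redis instance that holds the mapping
        this.jedis = new Jedis("hadoop01", 6379);
    }

    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        while (running) {
            HashMap<String, String> countryToArea = new HashMap<>();
            // "areas" is a hypothetical hash name: field = AREA_XX, value = "C1,C2,..."
            Map<String, String> areas = jedis.hgetAll("areas");
            for (Map.Entry<String, String> entry : areas.entrySet()) {
                for (String country : entry.getValue().split(",")) {
                    countryToArea.put(country, entry.getKey());
                }
            }
            ctx.collect(countryToArea);
            Thread.sleep(60000); // refresh once a minute (assumption)
        }
    }

    @Override
    public void cancel() {
        running = false;
        if (jedis != null) {
            jedis.close();
        }
    }
}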
Result:
key: AREA_US, value: US
key: AREA_CT, value: TW,HK
key: AREA_AR, value: PK,KW,SA
key: AREA_IN, value: IN
timestamp: 2019-10-10 16:45:38 --- country: HK
collected JSON: {"area":"AREA_CT","dt":"2019-10-10 16:45:38","score":0.2,"level":"A+","type":"s2"}
collected JSON: {"area":"AREA_CT","dt":"2019-10-10 16:45:38","score":0.1,"level":"C","type":"s3"}
timestamp: 2019-10-10 16:45:40 --- country: KW
collected JSON: {"area":"AREA_AR","dt":"2019-10-10 16:45:40","score":0.2,"level":"A+","type":"s3"}
collected JSON: {"area":"AREA_AR","dt":"2019-10-10 16:45:40","score":0.2,"level":"A+","type":"s5"}
timestamp: 2019-10-10 16:45:42 --- country: US
collected JSON: {"area":"AREA_US","dt":"2019-10-10 16:45:42","score":0.2,"level":"D","type":"s3"}
collected JSON: {"area":"AREA_US","dt":"2019-10-10 16:45:42","score":0.2,"level":"C","type":"s4"}
key: AREA_US, value: US
key: AREA_CT, value: TW,HK
key: AREA_AR, value: PK,KW,SA
key: AREA_IN, value: IN
timestamp: 2019-10-10 16:45:44 --- country: IN
collected JSON: {"area":"AREA_IN","dt":"2019-10-10 16:45:44","score":0.2,"level":"A","type":"s1"}
collected JSON: {"area":"AREA_IN","dt":"2019-10-10 16:45:44","score":0.2,"level":"B","type":"s1"}
timestamp: 2019-10-10 16:45:46 --- country: US
collected JSON: {"area":"AREA_US","dt":"2019-10-10 16:45:46","score":0.5,"level":"A","type":"s5"}
collected JSON: {"area":"AREA_US","dt":"2019-10-10 16:45:46","score":0.8,"level":"C","type":"s3"}
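To double-check that the cleaned records actually reach the allDataClean output topic, a throwaway consumer built on the kafka-clients 0.11 dependency from section 1 can be used. The broker list and group id below are placeholders mirroring the job configuration, and read_committed matters because the job's producer writes transactionally:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class AllDataCleanChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.setProperty("group.id", "allDataClean-checker"); // hypothetical group id
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "earliest");
        // Only read records committed by the exactly-once producer
        props.setProperty("isolation.level", "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("allDataClean"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}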