从 demo 到生产 - 手把手写出实战需求的 Flink 广播程序
发布日期:2021-05-19 01:19:08 浏览次数:16 分类:博客文章

本文共 25542 字,大约阅读时间需要 85 分钟。

Flink 广播变量在实时处理程序中扮演着很重要的角色,适当的使用广播变量会大大提升程序处理效率。

 

本文从简单的 demo 场景出发,引入生产中实际的需求并提出思路与部分示例代码,应对一般需求应该没有什么问题,话不多说,赶紧来看看这篇干货满满的广播程序使用实战吧。

 

1 啥是广播 

 

Flink 支持广播变量,允许在每台机器上保留一个只读的缓存变量,数据存在内存中,在不同的 task 所在的节点上的都能获取到,可以减少大量的 shuffle 操作。

换句话说,广播变量可以理解为一个公共的共享变量,可以把一个 dataset 的数据集广播出去,然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。

如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集,比较浪费内存 (也就是一个节点中可能会存在多份 dataset 数据)

 

2 用法总结

 

//1 初始化数据DataSet
toBroadcast = env.fromElements(1,2,3)//2 广播数据 apiwithBroadcastSet(toBroadcast,"broadcastSetName")//3 获取数据Collection
broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");

 

注意

 

  • 广播变量由于要常驻内存,程序结束时才会失效,所以数据量不宜过大

     

  • 广播变量广播在初始化后不支持修改 (修改场景也有办法)

 

3 基础案例演示

 

  • 基础案例广播变量使用

 

这种场景下广播变量就是加载参数表,参数表不会变化,记住第二部分常用总结公式即可。

 

/** * @author 大数据江湖 * @version 1.0 * @date 2021/5/17. * */public class BaseBroadCast {    /**     * broadcast广播变量     * 需求:     *  flink会从数据源中获取到用户的姓名     *  最终需要把用户的姓名和年龄信息打印出来     *  分析:     *  所以就需要在中间的map处理的时候获取用户的年龄信息     *  建议吧用户的关系数据集使用广播变量进行处理     *     */    public static void main(String[] args) throws Exception {        //获取运行环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        //1:准备需要广播的数据        ArrayList
> broadData = new ArrayList<>(); broadData.add(new Tuple2<>("zs", 18)); broadData.add(new Tuple2<>("ls", 20)); broadData.add(new Tuple2<>("ww", 17)); DataSet
> tupleData = env.fromCollection(broadData); //1.1:处理需要广播的数据,把数据集转换成map类型,map中的key就是用户姓名,value就是用户年龄 DataSet
> toBroadcast = tupleData.map(new MapFunction
, HashMap
>() { @Override public HashMap
map(Tuple2
value) throws Exception { HashMap
res = new HashMap<>(); res.put(value.f0, value.f1); return res; } }); //源数据 DataSource
data = env.fromElements("zs", "ls", "ww"); //注意:在这里需要使用到RichMapFunction获取广播变量 DataSet
result = data.map(new RichMapFunction
() { List
> broadCastMap = new ArrayList
>(); HashMap
allMap = new HashMap
(); /** * 这个方法只会执行一次 * 可以在这里实现一些初始化的功能 * 所以,就可以在open方法中获取广播变量数据 */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //3:获取广播数据 this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName"); for (HashMap map : broadCastMap) { allMap.putAll(map); } } @Override public String map(String value) throws Exception { Integer age = allMap.get(value); return value + "," + age; } }).withBroadcastSet(toBroadcast, "broadCastMapName");//2:执行广播数据的操作 result.print(); }}

 

 

生产案例演示

 

 

实际生产中有时候是需要更新广播变量的,但不是实时更新的,一般会设置一个更新周期,几分钟,几小时的都很常见,根据业务而定。

 

由于广播变量需要更新,解决办法一般是需要将广播变量做成另一个 source,进行流与流之间的 connect 操作,定时刷新广播的source,从而达到广播变量修改的目的。

 

4.1.1 使用 redis 中的数据作为广播变量的思路:

 

消费 kafka 中的数据,使用 redis 中的数据作为广播数据,进行数据清洗后 写到 kafka中。

 

示例代码分为三个部分:kafka 生产者,redis 广播数据源,执行入口类

 

  • 构建 kafka 生成者,模拟数据 (以下代码的消费消息来源均是此处生产)

 

/** * 模拟数据源 */public class kafkaProducer {    public static void main(String[] args) throws Exception{        Properties prop = new Properties();        //指定kafka broker地址        prop.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");        //指定key value的序列化方式        prop.put("key.serializer", StringSerializer.class.getName());        prop.put("value.serializer", StringSerializer.class.getName());        //指定topic名称        String topic = "data_flink_bigdata_test";        //创建producer链接        KafkaProducer
producer = new KafkaProducer
(prop); //{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]} while(true){ String message = "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\",\"data\":[{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"},{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"}]}"; System.out.println(message); //同步的方式,往Kafka里面生产数据 producer.send(new ProducerRecord
(topic,message)); Thread.sleep(2000); } //关闭链接 //producer.close(); } public static String getCurrentTime(){ SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); return sdf.format(new Date()); } public static String getCountryCode(){ String[] types = {"US","TW","HK","PK","KW","SA","IN"}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; } public static String getRandomType(){ String[] types = {"s1","s2","s3","s4","s5"}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; } public static double getRandomScore(){ double[] types = {0.3,0.2,0.1,0.5,0.8}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; } public static String getRandomLevel(){ String[] types = {"A","A+","B","C","D"}; Random random = new Random(); int i = random.nextInt(types.length); return types[i]; }}

 

 

 

  • redis 数据作为广播数据

 

/** * redis中准备的数据源 * source: * * hset areas AREA_US US * hset areas AREA_CT TW,HK * hset areas AREA_AR PK,KW,SA * hset areas AREA_IN IN * * result: * * HashMap * * US,AREA_US * TW,AREA_CT * HK,AREA_CT * */public class BigDataRedisSource implements SourceFunction
> { private Logger logger= LoggerFactory.getLogger(BigDataRedisSource.class); private Jedis jedis; private boolean isRunning=true; @Override public void run(SourceContext
> cxt) throws Exception { this.jedis = new Jedis("localhost",6379); HashMap
map = new HashMap<>(); while(isRunning){ try{ map.clear(); Map
areas = jedis.hgetAll("areas"); /** * AREA_CT TT,AA * * map: * TT,AREA_CT * AA,AREA_CT */ for(Map.Entry
entry: areas.entrySet()){ String area = entry.getKey(); String value = entry.getValue(); String[] fields = value.split(","); for(String country:fields){ map.put(country,area); } } if(map.size() > 0 ){ cxt.collect(map); } Thread.sleep(60000); }catch (JedisConnectionException e){ logger.error("redis连接异常",e.getCause()); this.jedis = new Jedis("localhost",6379); }catch (Exception e){ logger.error("数据源异常",e.getCause()); } } } @Override public void cancel() { isRunning=false; if(jedis != null){ jedis.close(); } }}

 

 
  • 程序入口类

 

/** * @author 大数据江湖 * @version 1.0 * @date 2021/4/25. * * * 使用 kafka 输出流和 redis 输出流 进行合并清洗 * * */public class 广播方式1分两个流进行connnect操作 {    public static void main(String[] args) throws Exception {        //1 获取执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(3);//并行度取决于 kafka 中的分区数 保持与kafka 一致        //2 设置 checkpoint        //开启checkpoint 一分钟一次        env.enableCheckpointing(60000);        //设置checkpoint 仅一次语义        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);        //两次checkpoint的时间间隔        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);        //最多只支持1个checkpoint同时执行        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        //checkpoint超时的时间        env.getCheckpointConfig().setCheckpointTimeout(60000);        // 任务失败后也保留 checkPonit数据        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(                3, // 尝试重启的次数                Time.of(10, TimeUnit.SECONDS) // 间隔        ));        // 设置 checkpoint 路径       // env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint"));        //3 设置 kafka Flink 消费        //创建 Kafka 消费信息        String topic="data_flink_bigdata_test";        Properties consumerProperties = new Properties();        consumerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");        consumerProperties.put("group.id","data_test_new_1");        consumerProperties.put("enable.auto.commit", "false");        consumerProperties.put("auto.offset.reset","earliest");        //4 获取 kafka 与 redis 数据源        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer
(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource
kafkaSourceData = env.addSource(consumer); //直接使用广播的方式 后续作为两个数据流来操作 DataStream
> redisSourceData = env.addSource(new NxRedisSource()).broadcast(); //5 两个数据源进行 ETL 处理 使用 connect 连接处理 SingleOutputStreamOperator
etlData = kafkaSourceData.connect(redisSourceData).flatMap(new MyETLProcessFunction()); //6 新创建一个 kafka 生产者 进行发送 String outputTopic="allDataClean"; // 输出给下游 kafka Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); FlinkKafkaProducer
producer = new FlinkKafkaProducer<>(outputTopic, new KeyedSerializationSchemaWrapper
(new SimpleStringSchema()), producerProperties); etlData.addSink(producer); //7 提交任务执行 env.execute("DataClean"); } /** * in 1 kafka source : * * {"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]} * * * in 2 redis source * * * US,AREA_US * TW,AREA_CT * HK,AREA_CT * * * * out 合并后的source */ private static class MyETLProcessFunction implements CoFlatMapFunction
,String> { //用来存储 redis 中的数据 HashMap
allMap = new HashMap
(); @Override public void flatMap1(String line, Collector
collector) throws Exception { //将 kafka 数据 按 redis 数据进行替换 // s -> kafka 数据 //allMap -> redis 数据 JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt"); String countryCode = jsonObject.getString("countryCode"); //可以根据countryCode获取大区的名字 String area = allMap.get(countryCode); JSONArray data = jsonObject.getJSONArray("data"); for (int i = 0; i < data.size(); i++) { JSONObject dataObject = data.getJSONObject(i); System.out.println("大区:"+area); dataObject.put("dt", dt); dataObject.put("area", area); //下游获取到数据的时候,也就是一个json格式的数据 collector.collect(dataObject.toJSONString()); } } @Override public void flatMap2(HashMap
stringStringHashMap, Collector
collector) throws Exception { //将 redis 中 数据进行赋值 allMap = stringStringHashMap; } }}

 

 

 

4.1.2 使用 MapState 进行广播程序优化:

 

优化的点在于 (下面代码中 TODO 标识点):

 

  1. 进行数据广播时需要使用 MapStateDescriptor 进行注册

     

  2. 进行两个流合并处理时 使用 process 函数

     

  3. 处理函数中使用 MapState  来存取 redis 中的数据

 

/** * @author 大数据江湖 * @version 1.0 * @date 2021/4/25. * 

* 使用 kafka 输出流和 redis 输出流 进行合并清洗 *

* 线上使用的方式 */public class 广播方式2使用MapState对方式1改造 { public static void main(String[] args) throws Exception { //1 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3);//并行度取决于 kafka 中的分区数 保持与kafka 一致 //2 设置 checkpoint //开启checkpoint 一分钟一次 env.enableCheckpointing(60000); //设置checkpoint 仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //两次checkpoint的时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); //最多只支持1个checkpoint同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //checkpoint超时的时间 env.getCheckpointConfig().setCheckpointTimeout(60000); // 任务失败后也保留 checkPonit数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 Time.of(10, TimeUnit.SECONDS) // 间隔 )); // 设置 checkpoint 路径 //env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint")); //3 设置 kafka Flink 消费 //创建 Kafka 消费信息 String topic = "data_flink_bigdata_test"; Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); consumerProperties.put("group.id", "data_flink_fpy_test_consumer"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("auto.offset.reset", "earliest"); //4 获取 kafka 与 redis 数据源 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer

(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource
kafkaSourceData = env.addSource(consumer); // 获取 redis 数据源并且进行广播 线上的广播也是 source + 广播方法 MapStateDescriptor
descriptor = new MapStateDescriptor
( "RedisBdStream", String.class, String.class ); //5 两个数据源进行 ETL 处理 使用 connect 连接处理 TODO process 替换 FlatMap //TODO 使用 MapState 来进行广播 BroadcastStream
> redisSourceData = env.addSource(new NxRedisSource()).broadcast(descriptor); SingleOutputStreamOperator
etlData = kafkaSourceData.connect(redisSourceData).process(new MyETLProcessFunction()); //6 新创建一个 kafka 生产者 进行发送 String outputTopic = "allDataClean"; // 输出给下游 kafka Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); FlinkKafkaProducer
producer = new FlinkKafkaProducer<>(outputTopic, new KeyedSerializationSchemaWrapper
(new SimpleStringSchema()), producerProperties); etlData.addSink(producer); etlData.print(); //7 提交任务执行 env.execute("DataClean"); } /** * in 1 kafka source * in 2 redis source *

* out 合并后的source */ private static class MyETLProcessFunction extends BroadcastProcessFunction

, String> { // TODO 注意此处 descriptor 的名称需要与 广播时 (99行代码) 名称一致 MapStateDescriptor
descriptor = new MapStateDescriptor
( "RedisBdStream", String.class, String.class ); //逻辑的处理方法 kafka 的数据 @Override public void processElement(String line, ReadOnlyContext readOnlyContext, Collector
collector) throws Exception { //将 kafka 数据 按 redis 数据进行替换 // s -> kafka 数据 //allMap -> redis 数据 System.out.println("into processElement "); JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt"); String countryCode = jsonObject.getString("countryCode"); //可以根据countryCode获取大区的名字 // String area = allDataMap.get(countryCode); //TODO 从MapState中获取对应的Code String area = readOnlyContext.getBroadcastState(descriptor).get(countryCode); JSONArray data = jsonObject.getJSONArray("data"); for (int i = 0; i < data.size(); i++) { JSONObject dataObject = data.getJSONObject(i); System.out.println("大区:" + area); dataObject.put("dt", dt); dataObject.put("area", area); //下游获取到数据的时候,也就是一个json格式的数据 collector.collect(dataObject.toJSONString()); } } //广播流的处理方法 @Override public void processBroadcastElement(HashMap
stringStringHashMap, Context context, Collector
collector) throws Exception { // 将接收到的控制数据放到 broadcast state 中 //key , flink // 将 RedisMap中的值放入 MapState 中 for (Map.Entry
entry : stringStringHashMap.entrySet()) { //TODO 使用 MapState 存储 redis 数据 context.getBroadcastState(descriptor).put(entry.getKey(), entry.getValue()); System.out.println(entry); } } }}

 

 

4.2 关系型数据库广播变量案例思路:

 

需求:

 

在 flink 流式处理中常常需要加载数据库中的数据作为条件进行数据处理,有些表作为系统表,实时查询效率很低,这时候就需要将这些数据作为广播数据,而同时这些数据可能也需要定期的更新。

 

思路:

 

数据库表的广播变量思路同redis等缓存广播数据的思路类似,也是使用 两个source 进行 connect 处理 , 在数据库表的 source 中定时刷新数据就可以了。

 

不同点在于这里把数据库查询的操作转成另一个工具类,在初始化时使用了静态代码块,在广播时使用了流的 connect 操作。

 

示例代码分为三个部分:数据库表广播源,数据库操作类,执行入口类

 

  • 数据库表广播源

 

/** * @author 大数据江湖 * @Date:2021-5-17 * DB source 源头 进行广播 */public class BigDataDBBroadSource extends RichSourceFunction
> { private final Logger logger = LoggerFactory.getLogger(BigDataDBBroadSource.class); private volatile boolean isRunning = true; public BigDataDBBroadSource() { } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public void run(SourceContext
> sourceContext) throws Exception { while (isRunning) { //TODO 使用的是一个 DB 源头的 source 60 s 刷新一次 进行往下游发送 TimeUnit.SECONDS.sleep(60); Map
map = new HashMap
(); //规则匹配关键词 final DbBroadCastListInitUtil.Build ruleListInitUtil = new DbBroadCastListInitUtil.Build(); ruleListInitUtil.reloadRule(); map.put("dbsource", ruleListInitUtil); if(map.size() > 0) { sourceContext.collect(map); } } } @Override public void cancel() { this.isRunning = false; } @Override public void close() throws Exception { super.close(); }}
 

 

  • 执行数据库操作类

 

/** * 数据库规则表初始化 * * @author 大数据江湖 * @Date:2021-5-17 * * US,AREA_US * TW,AREA_CT * HK,AREA_CT * */public class DbBroadCastListInitUtil implements Serializable {    private static final Logger LOG = LoggerFactory.getLogger(DbBroadCastListInitUtil.class);    // 数据库规则信息    public static Map
areasMap = new HashMap
(); static { LOG.info("初始化 db 模块"); Connection dbConn = null; try { if (dbConn == null || dbConn.isClosed()) { LOG.info("init dbConn start...."); LOG.info("init dbConn end...."); } HashMap
map = Maps.newHashMap(); map.put("US","AREA_US"); map.put("TW","AREA_CT"); map.put("HK","AREA_CT"); areasMap = map; } catch (Exception e) { LOG.error("init database [status:error]", e); throw new RuntimeException(" static article rule list db select error! , "+e.getMessage()) ; } finally { if(dbConn != null) { try { dbConn.close(); } catch (SQLException e) { LOG.error("dbConn conn close error!",e); } } } } public static class Build { // 数据库规则信息 public static Map
newAreasMap = new HashMap
(); public void reloadRule() throws Exception { LOG.info("重新初始化 DB reloadRule 模块"); Connection dbConn = null; try { if (dbConn == null || dbConn.isClosed()) { LOG.info("init dbConn start...."); LOG.info("init dbConn end...."); } HashMap
map = Maps.newHashMap(); map.put("US","AREA_US"); map.put("TW","AREA_CT"); map.put("HK","AREA_CT"); map.put("AM","AREA_CT"); newAreasMap = map; } catch (Exception e) { LOG.error("init database [status:error]", e); throw e; } finally { if(dbConn != null) { try { dbConn.close(); } catch (SQLException e) { LOG.error("dbConn conn close error!",e); } } } } public static Map
getNewAreasMap() { return newAreasMap; } } public static Build build() throws Exception { final DbBroadCastListInitUtil.Build build = new DbBroadCastListInitUtil.Build(); build.reloadRule(); return build; }}

 

 
  • 程序入口类

 

/** * @author 大数据江湖 * @version 1.0 * @date 2021/4/25. * 

* 使用 kafka 输出流和 redis 输出流 进行合并清洗 *

* 线上使用的方式 */public class 广播方式3使用DB对方式广播 { public static void main(String[] args) throws Exception { //1 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3);//并行度取决于 kafka 中的分区数 保持与kafka 一致 //2 设置 checkpoint //开启checkpoint 一分钟一次 env.enableCheckpointing(60000); //设置checkpoint 仅一次语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //两次checkpoint的时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); //最多只支持1个checkpoint同时执行 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //checkpoint超时的时间 env.getCheckpointConfig().setCheckpointTimeout(60000); // 任务失败后也保留 checkPonit数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 Time.of(10, TimeUnit.SECONDS) // 间隔 )); // 设置 checkpoint 路径 //env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint")); //3 设置 kafka Flink 消费 //创建 Kafka 消费信息 String topic = "data_flink_bigdata_test"; Properties consumerProperties = new Properties(); consumerProperties.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); consumerProperties.put("group.id", "data_flink_bigdata_test_consumer"); consumerProperties.put("enable.auto.commit", "false"); consumerProperties.put("auto.offset.reset", "earliest"); //4 获取 kafka 与 redis 数据源 FlinkKafkaConsumer consumer = new FlinkKafkaConsumer

(topic, new SimpleStringSchema(), consumerProperties); DataStreamSource
kafkaSourceData = env.addSource(consumer); // 获取 redis 数据源并且进行广播 线上的广播也是 source + 广播方法 MapStateDescriptor
descriptor = new MapStateDescriptor
( "RedisBdStream", String.class, String.class ); //使用 数据库源 来进行广播 BroadcastStream
> broadcast = env.addSource(new BigDataDBBroadSource()).broadcast(descriptor); //5 两个数据源进行 ETL 处理 使用 connect 连接处理 数据库表信息进行广播 SingleOutputStreamOperator
etlData = kafkaSourceData.connect(broadcast).process(new MyETLProcessFunction()); //6 新创建一个 kafka 生产者 进行发送 String outputTopic = "allDataClean"; // 输出给下游 kafka /* Properties producerProperties = new Properties(); producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092"); FlinkKafkaProducer
producer = new FlinkKafkaProducer<>(outputTopic, new KeyedSerializationSchemaWrapper
(new SimpleStringSchema()), producerProperties); etlData.addSink(producer);*/ etlData.print(); //7 提交任务执行 env.execute("DataClean"); } /** * in 1 kafka source * in 2 redis source *

* out 合并后的source * * * TODO 程序启动后发生的事: * * 1 运行 open 方法 ,触发静态方法给 areasMap 赋值 * 2 运行 processElement 方法前, areasMap 肯定是值的,正常进行处理 * 3 当到 BigDataDBBroadSource 轮训的时间后 ,刷新数据库表数据到 areasMap ,此时 areasMap 会加入新值,完成广播变量的更新 * 4 广播变量更新后 继续进行 processElement 数据处理 * */ private static class MyETLProcessFunction extends BroadcastProcessFunction

, String> { public Map
areasMap = new HashMap
(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //触发静态方法去赋值 areasMap = DbBroadCastListInitUtil.areasMap; } //逻辑的处理方法 kafka 的数据 @Override public void processElement(String line, ReadOnlyContext readOnlyContext, Collector
collector) throws Exception { //将 kafka 数据 按 redis 数据进行替换 // s -> kafka 数据 //allMap -> redis 数据 System.out.println("into processElement "); JSONObject jsonObject = JSONObject.parseObject(line); String dt = jsonObject.getString("dt"); String countryCode = jsonObject.getString("countryCode"); //可以根据countryCode获取大区的名字 // String area = allDataMap.get(countryCode); //从MapState中获取对应的Code String area =areasMap.get(countryCode); JSONArray data = jsonObject.getJSONArray("data"); for (int i = 0; i < data.size(); i++) { JSONObject dataObject = data.getJSONObject(i); System.out.println("大区:" + area); dataObject.put("dt", dt); dataObject.put("area", area); //下游获取到数据的时候,也就是一个json格式的数据 collector.collect(dataObject.toJSONString()); } } @Override public void processBroadcastElement(Map
value, Context ctx, Collector
out) throws Exception { //广播算子定时刷新后 将数据发送到下游 if (value != null && value.size() > 0) { Object obj = value.getOrDefault("dbsource", null); if (obj != null) { DbBroadCastListInitUtil.Build biulder = (DbBroadCastListInitUtil.Build) obj; //更新了 数据库数据 areasMap = biulder.getNewAreasMap(); System.out.println("数据库刷新算子运行完成!"); } } } }}

 

 

 

注意看最后处理函数启动后发生的事:

     

  1.  运行 open 方法 ,触发数据库操作工具类静态方法给 areasMap 赋值

     

  2.  运行执行类 processElement 方法前,此时 areasMap 肯定是值的,正常进行处理

     

  3.  当到数据库源轮训的时间后 ,刷新数据库表数据到 areasMap ,此时 areasMap 会加入新值,完成广播变量的更新

     

  4.  广播变量更新后 继续进行执行类 processElement 数据处理

 

 

 


 

至此 广播程序的使用介绍完了, 对于广播数据不需要改变的情况 参考基础样例;对于从缓存或数据库等获取广播变量,同时又需要改变的情况,参考生成样例即可。

 

PS:  文中代码地址  ----   https://gitee.com/fanpengyi0922/flink-window-broadcast

 

 

   THE END  

 

上一篇:大数据存储利器 - Hbase 基础图解
下一篇:图说线性表-搞懂链表从这篇文章开始

发表评论

最新留言

路过,博主的博客真漂亮。。
[***.116.15.85]2025年05月11日 03时47分06秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

element-plus修改主题颜色 2023-01-24
element-plus的el-date-picker日期范围选择控件,根据开始日期限定结束日期的可选范围为开始日期到开始日期+30天 2023-01-24
18 个一线工作中常用 Shell 脚本【实用版】 2023-01-24
element-ui:el-input输入数字-整数和小数 2023-01-24
ElementUI-el-progress改变进度条颜色跟文字样式 2023-01-24
element事件(change,click)不触发 2023-01-24
10个高级的 SQL 查询技巧,你掌握了几个? 2023-01-24
ELK原理与介绍(转) 2023-01-24
ELK学习笔记(三)单台服务器多节点部署 2023-01-24
ELK应用日志收集实战 2023-01-24
elTable火狐浏览器换行 2023-01-24
15个Python数据处理技巧(非常详细)零基础入门到精通,收藏这一篇就够了 2023-01-24
2023年深信服、奇安信、360等大厂网络安全校招面试真题合集(附答案),让你面试轻松无压力! 2023-01-24
2024年全国程序员平均薪资排名:同样是程序员,为什么差这么多?零基础到精通,收藏这篇就够了 2023-01-24
0基础成功转行网络安全工程师,年薪30W+,经验总结都在这(建议收藏) 2023-01-24
100个电脑常用组合键大全(非常详细)零基础入门到精通,收藏这篇就够了 2023-01-24
10个程序员可以接私活的平台 2023-01-24
10个程序员可以接私活的平台(非常详细)零基础入门到精通,收藏这篇就够了 2023-01-24
10个运维拿来就用的 Shell 脚本,用了才知道有多爽,零基础入门到精通,收藏这一篇就够了 2023-01-24
10条sql语句优化的建议 2023-01-24