Chapter 4 Flink Runtime Architecture
4.1 Components of the Flink Runtime
JobManager: controls the execution of a single application; it receives the job, converts the JobGraph into an ExecutionGraph, and requests slots from the ResourceManager.
TaskManager: the worker process; it offers task slots and executes the tasks of a job.
ResourceManager: manages task slots, the unit of resource scheduling in Flink.
Dispatcher: provides a REST entry point for submitting applications and starts a JobManager for each submitted job; it also serves the web UI.
4.2 Job Submission Flow
Job submission flow (YARN per-job mode)
4.3 Task Scheduling Principles
4.3.1 TaskManagers and Slots
It is recommended to set the number of task slots of a TaskManager to the number of CPU cores of the machine it runs on.
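For example, slots are configured per TaskManager in flink-conf.yaml; a minimal sketch, assuming an 8-core machine (the value 8 is an assumption, not from the original text):

```yaml
# flink-conf.yaml: one slot per CPU core on an (assumed) 8-core machine
taskmanager.numberOfTaskSlots: 8
```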
4.3.2 Programs and Dataflows (DataFlow)
4.3.3 Execution Graphs (ExecutionGraph)
4.3.4 Parallelism
Operators transfer data between subtasks in two forms: one-to-one (forwarding), which preserves partitioning and element order (e.g. map, flatMap, filter), and redistributing, which changes the partitioning (e.g. keyBy repartitions by hash, rebalance redistributes round-robin).
Within one program, different operators may have different parallelism.
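As a small self-contained sketch (the class name, data, and parallelism values are assumptions for illustration), the environment default can be overridden per operator with setParallelism:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // default parallelism for every operator in this job

        env.fromElements("sensor_1,1547718199,35.8", "sensor_6,1547718201,15.4")
           .map(new MapFunction<String, Integer>() {
               @Override
               public Integer map(String value) {
                   return value.length();
               }
           }).setParallelism(4)        // this map runs with parallelism 4
           .print().setParallelism(1); // the sink runs with parallelism 1

        env.execute();
    }
}
```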
4.3.5 Operator Chains
The client optimizes the job graph based on the form of data transfer between operators and their parallelism: when two operators have a one-to-one (forwarding) dependency and the same parallelism, they can be merged (chained) into a single task, which is then deployed into one task slot. After this optimization the example job has only five tasks, and the largest operator parallelism is 2, so 2 task slots suffice to run the job.
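The default chaining behavior can also be adjusted from the API. A minimal sketch (the pipeline itself is hypothetical, not from the original tutorial):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining(); // turn chaining off for the whole job

        env.fromElements("a", "b", "c")
           .map(value -> value.toUpperCase())
           .startNewChain()                   // start a new chain at this operator
           .filter(value -> !value.isEmpty())
           .disableChaining()                 // keep this operator out of any chain
           .print();

        env.execute();
    }
}
```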
Chapter 5 Flink Stream Processing API
5.1 Environment
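The entry point of every program is the execution environment. A brief sketch (the host, port, and jar path in the remote variant are placeholders):

```java
// Picks a local or cluster environment automatically, depending on how the job is run
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Or create one explicitly:
// StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
// StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
//         "jobmanager-host", 6123, "path/to/jobJar.jar");
```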
5.2 Source
5.2.1 Reading Data from a Collection
```java
import java.util.Arrays;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Source: read data from a collection
        DataStream<SensorReading> sensorDataStream = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_6", 1547718201L, 15.4),
                        new SensorReading("sensor_7", 1547718202L, 6.7),
                        new SensorReading("sensor_10", 1547718205L, 38.1)
                )
        );

        // 2. Print
        sensorDataStream.print();

        // 3. Execute
        env.execute();
    }
}
```
5.2.2 Reading Data from a File
```java
DataStream<String> dataStream = env.readTextFile("YOUR_FILE_PATH");
```
5.2.3 Reading Data from a Kafka Message Queue
The Kafka connector dependency needs to be added to pom.xml:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```
The code is as follows:
```java
// Kafka configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

// Read data from Kafka
DataStream<String> dataStream = env.addSource(
        new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
```
5.2.4 Custom Sources
Besides the sources above, we can also define our own. All that is needed is to pass a SourceFunction into addSource. It is invoked as follows:

```java
DataStream<SensorReading> dataStream = env.addSource(new MySensor());
```
We want to generate random sensor readings; the implementation of MySensor is as follows:
```java
public static class MySensor implements SourceFunction<SensorReading> {
    // Flag controlling the generation loop
    private boolean running = true;

    public void run(SourceContext<SensorReading> ctx) throws Exception {
        Random random = new Random();
        // Initialize ten sensors with random baseline temperatures
        HashMap<String, Double> sensorTempMap = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }
        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // Random-walk each temperature and emit a new reading
                Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newTemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
            }
            Thread.sleep(1000L);
        }
    }

    public void cancel() {
        this.running = false;
    }
}
```
5.3 Transform
Basic transformation operators
5.3.1 map
```java
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    public Integer map(String value) throws Exception {
        return value.length();
    }
});
```
5.3.2 flatMap
```java
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});
```
5.3.3 Filter
```java
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    public boolean filter(String value) throws Exception {
        // Keep only readings whose id starts with "sensor_1"
        return value.startsWith("sensor_1");
    }
});
```
```java
package com.atguigu.apitest.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // 1. map: convert each String to its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });

        // 2. flatMap: split the fields by comma
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });

        // 3. filter: keep records whose id starts with sensor_1
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });

        // Print output
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");

        env.execute();
    }
}
```
5.3.4 KeyBy
keyBy partitions records by the hash of the key: the key is hashed and then taken modulo the number of partitions to determine the partition index. Records with the same key therefore always land in the same partition, while hash collisions mean one partition may hold records of several different keys.
DataStream → KeyedStream: logically splits a stream into disjoint partitions, each containing the elements of the same key; internally this is implemented with hash partitioning. In the Flink API, a DataStream supports aggregations only after keyBy has turned it into a KeyedStream.
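As a simplified sketch of the hash-then-modulo idea (not Flink's actual implementation, which murmur-hashes keys into key groups before assigning them to subtasks):

```java
// Simplified illustration: which partition a key lands in under hash-then-modulo routing
public static int partitionFor(Object key, int numPartitions) {
    int h = key.hashCode() % numPartitions;
    return h < 0 ? h + numPartitions : h; // keep the index non-negative
}
```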
```java
package com.atguigu.apitest.transform;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Group by key: by field name, by lambda, or (commented) by method reference
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(data -> data.getId());
        // KeyedStream<SensorReading, String> keyedStream2 = dataStream.keyBy(SensorReading::getId);

        // A KeySelector can compute an arbitrary key, here the parity of each number
        DataStream<Long> dataStream1 = env.fromElements(1L, 34L, 4L, 657L, 23L);
        KeyedStream<Long, Integer> keyedStream2 = dataStream1.keyBy(new KeySelector<Long, Integer>() {
            @Override
            public Integer getKey(Long value) throws Exception {
                return value.intValue() % 2;
            }
        });

        // Rolling aggregation: keep the record with the current maximum temperature
        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");

        resultStream.print("result");
        // keyedStream.print("key");
        // keyedStream1.print("key1");
        // keyedStream2.sum(0).print("key2");

        env.execute();
    }
}
```
Aggregation operators
5.3.5 Rolling Aggregation Operators
These operators run on a KeyedStream and continuously aggregate each key's records: sum(), min(), max(), minBy(), maxBy().
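For instance, on the keyedStream of SensorReading built above, the difference between min and minBy is worth noting:

```java
// min: the minimum of the field, but the other fields come from the first record seen
DataStream<SensorReading> minTemp = keyedStream.min("temperature");
// minBy: the complete record that contains the minimum value of the field
DataStream<SensorReading> minTempRecord = keyedStream.minBy("temperature");
// sum/max/maxBy follow the same pattern
```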
5.3.6 Reduce
```java
package com.atguigu.apitest.transform;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Group by id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

        // reduce: keep the maximum temperature seen so far, together with the latest timestamp
        SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(),
                        Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });

        // Lambda equivalent:
        // keyedStream.reduce((curState, newData) ->
        //         new SensorReading(curState.getId(), newData.getTimestamp(),
        //                 Math.max(curState.getTemperature(), newData.getTemperature())));

        resultStream.print();

        env.execute();
    }
}
```
Multi-stream transformation operators
5.3.7 Split and Select / process
Flink 1.12 and newer
Side outputs: use the process method to handle each element of the stream and, depending on the result, emit it to a different OutputTag:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Demonstrates DataStream splitting and selecting.
 * split and select were deprecated and removed in Flink 1.12,
 * so OutputTag and process are used instead.
 * Requirement: split the stream into odd and even numbers and select each side.
 */
public class TransformationDemo03 {
    public static void main(String[] args) throws Exception {
        // TODO 0. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // TODO 1. source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // TODO 2. transformation: split the stream into odd and even numbers
        OutputTag<Integer> oddTag = new OutputTag<>("odd", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag<>("even", TypeInformation.of(Integer.class));

        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                // Elements collected through out stay in the main stream;
                // ctx.output routes elements to the matching OutputTag instead
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });

        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);

        // TODO 3. sink
        System.out.println(oddTag);  // OutputTag(Integer, odd)
        System.out.println(evenTag); // OutputTag(Integer, even)
        oddResult.print("odd");      // print with a prefix
        evenResult.print("even");

        // TODO 4. execute
        env.execute();
    }
}
```
Older versions (before Flink 1.12)
Split marks each element of one stream with one or more labels, logically dividing it into several streams; Select then retrieves the elements carrying a given label. As noted above, split and select were deprecated and removed in Flink 1.12, where OutputTag and process are used instead. In the old API, splitting sensor readings into high- and low-temperature streams looks like this:

```java
// Split: label each reading "high" or "low" depending on its temperature
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return (value.getTemperature() > 30)
                ? Collections.singletonList("high")
                : Collections.singletonList("low");
    }
});

// Select: pick the labeled sub-streams
DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
```
5.3.8 Connect and CoMap
Connect + CoMap merge two streams into one.
Connect: although the two streams are put together, internally they remain two separate streams whose element types may differ; only after a subsequent CoMap (or CoFlatMap) are they merged into one real stream:

```java
// Turn the high-temperature stream into (id, temperature) warning tuples
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(
        new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), value.getTemperature());
            }
        });

// connect keeps both streams, with their different element types, side by side
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams =
        warningStream.connect(lowTempStream);

// CoMap: map1 handles elements of the first stream, map2 elements of the second
DataStream<Tuple2<String, String>> resultStream = connectedStreams.map(
        new CoMapFunction<Tuple2<String, Double>, SensorReading, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple2<>(value.f0, "high temp warning");
            }

            @Override
            public Tuple2<String, String> map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });
```
5.3.9 Union
```java
DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);
```
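Unlike connect, union requires all inputs to share one element type, but it can merge more than two streams at once; a sketch (the third stream is hypothetical):

```java
DataStream<SensorReading> allStream = highTempStream.union(lowTempStream, midTempStream);
```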
Flink currently supports eight partitioning strategies (see the partitionCustom sketch after the example below):
global: all records are sent to the first instance of the downstream operator.
shuffle: records are distributed randomly among the downstream instances.
rebalance: records are sent round-robin to every downstream instance.
rescale: distributes round-robin within groups determined by the upstream/downstream parallelism ratio. For example, with upstream parallelism 2 (A, B) and downstream parallelism 4 (1, 2, 3, 4), A sends round-robin to 1 and 2 while B sends to 3 and 4; with upstream parallelism 4 (A, B, C, D) and downstream parallelism 2 (1, 2), A and B send to 1 while C and D send to 2.
broadcast: every record is sent to every instance of the downstream operator; suitable for joining a small dataset with a large one.
forward: sends each record to the locally co-located downstream instance; it requires equal upstream and downstream parallelism (this is what a plain print pipeline with unchanged parallelism uses).
keyBy: distributes records to downstream instances by the hash of the key.
partitionCustom: user-defined partitioning; implement the Partitioner interface with your own logic.

```java
package com.atguigu.apitest.transform;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest6_Partition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        dataStream.print("input");

        // 1. shuffle
        DataStream<String> shuffleStream = inputStream.shuffle();
        // shuffleStream.print("shuffle");

        // 2. keyBy
        // dataStream.keyBy("id").print("keyBy");

        // 3. global
        dataStream.global().print("global");

        env.execute();
    }
}
```
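The demo above only exercises shuffle, keyBy, and global. A hedged sketch of partitionCustom on the same dataStream (the partitioning logic here is illustrative, not from the original tutorial):

```java
import org.apache.flink.api.common.functions.Partitioner;

// Route all readings of the same sensor id to a partition chosen by our own logic
DataStream<SensorReading> customPartitioned = dataStream.partitionCustom(
        new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                int h = key.hashCode() % numPartitions;
                return h < 0 ? h + numPartitions : h;
            }
        },
        value -> value.getId()); // KeySelector: partition by sensor id
```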
5.4 Supported Data Types
Flink supports Java and Scala basic types and their boxed forms, Strings, Java and Scala tuples, Scala case classes, POJOs, and auxiliary types such as arrays, lists, and maps.
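A few sketches of streams over supported types (reusing the SensorReading POJO from earlier):

```java
// Basic types and their boxed forms
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);

// Java tuples (Tuple0 to Tuple25)
DataStream<Tuple2<String, Integer>> tupleStream =
        env.fromElements(Tuple2.of("sensor_1", 1), Tuple2.of("sensor_6", 2));

// POJOs: a public class with a public no-arg constructor and accessible fields
DataStream<SensorReading> pojoStream = env.fromElements(
        new SensorReading("sensor_1", 1547718199L, 35.8));
```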
5.5 Implementing UDF Functions: Finer-Grained Control
5.5.1 Function Classes
```java
DataStream<String> flinkTweets = tweets.filter(new FlinkFilter());

public static class FlinkFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) throws Exception {
        return value.contains("flink");
    }
}
```
The function can also be implemented as an anonymous class:
```java
DataStream<String> flinkTweets = tweets.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value.contains("flink");
    }
});
```
The filter string "flink" can also be passed in as a constructor parameter:
```java
DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink"));

public static class KeyWordFilter implements FilterFunction<String> {
    private String keyWord;

    KeyWordFilter(String keyWord) {
        this.keyWord = keyWord;
    }

    @Override
    public boolean filter(String value) throws Exception {
        return value.contains(this.keyWord);
    }
}
```
Built-in aggregations such as sum do not require implementing an interface yourself; the implementation is provided for you under the hood.
5.5.2 Lambda Functions
```java
DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter(tweet -> tweet.contains("flink"));
```
5.5.3 Rich Functions
All of Flink's function interfaces have a "rich" variant (RichMapFunction, RichFilterFunction, and so on). Rich functions add lifecycle methods (open, close) and access to the RuntimeContext of the executing subtask.
```java
public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {

    @Override
    public Tuple2<Integer, String> map(SensorReading value) throws Exception {
        return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("my map open");
        // Initialization work, e.g. establishing a connection to HDFS
    }

    @Override
    public void close() throws Exception {
        System.out.println("my map close");
        // Cleanup work, e.g. closing the HDFS connection
    }
}
```
```java
import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());

        resultStream.print();

        env.execute();
    }

    // Plain function class: no access to the runtime context
    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    // Custom rich function class
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // The runtime context also gives access to state,
            // e.g. getRuntimeContext().getState(descriptor)
            // Index of the parallel subtask (partition) executing this function:
            int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            return new Tuple2<>(value.getId(), subtaskIndex);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Initialization, typically defining state or opening database connections
            System.out.println("open");
        }

        @Override
        public void close() throws Exception {
            // Teardown, typically closing connections and clearing state
            System.out.println("close");
        }
    }
}
```
5.6 Sink
5.6.1 Kafka
Kafka is also a streaming system that processes records one at a time, so it pairs naturally with Flink.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```
```java
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Alternatively, read from a file:
        // DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from Kafka
        DataStream<String> inputStream = env.addSource(
                new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        // Convert to SensorReading and serialize back to String
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
        });

        // Write back to Kafka
        dataStream.addSink(
                new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute();
    }
}
```
5.6.2 Redis
Flink does not ship an official Redis sink; we use the third-party connector from Apache Bahir instead:
```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
```
Define a Redis mapper class that specifies the command used when saving to Redis:
```java
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Save the data as a hash table: hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}
```
print is also a sink; it simply writes the stream to the console:

```java
resultStream.print();
```
5.6.4 JDBC: Custom MySQL Sink
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
```
```java
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import com.atguigu.apitest.source.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Alternatively, read from a file and map each line to a SensorReading;
        // here we use the custom random source instead
        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // Custom SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // Connection and prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called once per record: execute the SQL over the open connection
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first; if no row was updated, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}
```
Reprinted from: https://blog.csdn.net/yangshengwei230612/article/details/116567038