Flink 1.10.1 -- Java -- 尚硅谷 -- Chapter 4: Flink Runtime Architecture + API

Chapter 4: Flink Runtime Architecture

4.1 Components of the Flink Runtime

JobManager

The JobManager controls the execution of a single application. It receives the application for execution (the JobGraph plus the jar with all required classes), converts the JobGraph into an ExecutionGraph, requests the necessary slots from the ResourceManager, distributes the tasks to the TaskManagers, and coordinates checkpoints.

TaskManager

TaskManagers are the worker processes of Flink. Each TaskManager offers a certain number of task slots, which limit how many tasks it can execute concurrently. After starting, a TaskManager registers its slots with the ResourceManager and offers them to the JobManager, which deploys tasks into them; at runtime, TaskManagers exchange data with one another.

ResourceManager

The ResourceManager manages task slots, the unit of resource scheduling in Flink. When a JobManager requests slots, it assigns idle slots from registered TaskManagers; if there are not enough, it can ask the resource platform (e.g. YARN) to start containers for new TaskManagers.

Dispatcher

The Dispatcher provides a REST interface for submitting applications, starts a new JobManager for each submitted job, and runs the Web UI. It is not required in every deployment scenario.

4.2 Job Submission Flow

In the abstract, framework-independent flow: the client submits the application to the Dispatcher, which starts a JobManager for it; the JobManager requests slots from the ResourceManager; once TaskManagers have registered and offered their slots, the JobManager deploys the tasks into those slots for execution.

Job submission flow (YARN per-job mode):

1. The Flink client uploads the job jar and configuration to HDFS and submits the application to the YARN ResourceManager.
2. YARN allocates a container and starts the ApplicationMaster, in which the JobManager (and Flink's own ResourceManager) run.
3. The JobManager requests containers from YARN to launch TaskManagers; the TaskManagers start and register their slots.
4. The JobManager deploys the tasks into the slots, and the job runs.

4.3 Task Scheduling

The client turns the program code into a dataflow graph and submits it, together with the jar, to the JobManager. The JobManager expands the dataflow graph into an ExecutionGraph and schedules the resulting tasks onto the slots of the TaskManagers, which execute them and report status and results back.

4.3.1 TaskManager and Slots

It is recommended to set the number of task slots of a TaskManager to the number of CPU cores of the machine it runs on. Each slot receives a fixed share of the TaskManager's managed memory; by default, Flink also allows subtasks of different operators of the same job to share a slot (slot sharing), so a single slot may hold an entire pipeline of the job. The configuration sketch below shows the relevant setting.
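A minimal flink-conf.yaml sketch (taskmanager.numberOfTaskSlots and parallelism.default are Flink's standard options; the value 8 assumes an 8-core machine):

# flink-conf.yaml: offer one slot per CPU core on an 8-core machine
taskmanager.numberOfTaskSlots: 8

# optional: default parallelism for jobs that do not set their own
parallelism.default: 2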


4.3.2 Programs and Dataflows

Every Flink program consists of three parts: sources (reading input), transformations (processing), and sinks (writing output). At runtime the program is mapped to a streaming dataflow, a directed graph whose nodes are operators connected by streams; in most cases the transformations in the program correspond one-to-one with the operators in the dataflow.

4.3.3 Execution Graphs (ExecutionGraph)

Flink builds its graphs in four layers: the StreamGraph, generated from the user code; the JobGraph, produced by the client from the StreamGraph with eligible operators chained together; the ExecutionGraph, the parallelized version of the JobGraph built by the JobManager; and the physical execution graph after deployment.

The client's chaining optimization is based on the data-transfer pattern between operators and their parallelism: when two operators are connected one-to-one (a narrow dependency) and have the same parallelism, they can be merged into a single task, which ultimately runs in a single task slot.

4.3.4 Parallelism

Each operator of a dataflow has a parallelism: the number of its parallel subtasks. The parallelism of the whole program is the maximum parallelism among its operators.

Data transfer between operators takes two forms: one-to-one, where the stream preserves partitioning and element order (e.g. from source to map, similar to a narrow dependency), and redistributing, where the partitioning changes (e.g. keyBy repartitions by key hash, while broadcast and rebalance redistribute data, similar to a shuffle).

Within one program, different operators may have different parallelism; a sketch of setting it at different scopes follows.
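A minimal sketch, assuming the data/sensor.txt input used throughout this tutorial (the class name ParallelismSketch is made up for illustration); setParallelism on an operator overrides the environment default:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);                        // default parallelism for every operator

        env.readTextFile("data/sensor.txt")
           .map(String::length).setParallelism(4)     // this map runs with 4 subtasks
           .print().setParallelism(1);                // the print sink runs with 1 subtask

        env.execute();
    }
}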

4.3.5 Operator Chains

Flink chains operators that are connected one-to-one and have the same parallelism into a single task, removing the serialization and network cost between them (the same criterion described in 4.3.3). In the course example, chaining reduces the job to just five tasks, and since the largest operator parallelism is 2, two task slots are enough to run it. Chaining can also be controlled explicitly, as sketched below.
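A short sketch of the standard chaining hooks in the DataStream API (dataStream stands for any DataStream<String>, as in the examples of Chapter 5):

// begin a new chain at this map: the filter before it stays in the previous chain
dataStream.filter(value -> value.startsWith("sensor"))
          .map(String::length).startNewChain()
          .print();

// keep an operator out of any chain
dataStream.map(String::length).disableChaining();

// or turn chaining off for the whole job
env.disableOperatorChaining();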

Chapter 5: Flink Streaming API


5.1 Environment

getExecutionEnvironment creates an execution environment that represents the context of the current program: if the program was started standalone it returns a local environment, and if it was submitted to a cluster it returns that cluster's environment. An environment can also be created explicitly with createLocalEnvironment or createRemoteEnvironment, as sketched below.
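A minimal sketch of the three creation methods (host, port and jar path are placeholder values):

// the usual way: adapts to local or cluster execution automatically
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// explicitly local, with parallelism 1
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(1);

// explicitly remote: JobManager host and port, plus the jar to ship
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 6123, "YOUR_JAR_PATH.jar");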

5.2 Source

5.2.1 Reading Data from a Collection

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. Source: read data from a collection
        DataStream<SensorReading> sensorDataStream = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 1547718199L, 35.8),
                        new SensorReading("sensor_6", 1547718201L, 15.4),
                        new SensorReading("sensor_7", 1547718202L, 6.7),
                        new SensorReading("sensor_10", 1547718205L, 38.1)
                ));

        // 2. Print
        sensorDataStream.print();

        // 3. Execute
        env.execute();
    }
}

5.2.2 Reading Data from a File

DataStream<String> dataStream = env.readTextFile("YOUR_FILE_PATH");

5.2.3 Reading from a Kafka Message Queue

You need to add the Kafka connector dependency in pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

The code is as follows:

// Kafka configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");

// Read data from Kafka
DataStream<String> dataStream = env.addSource(
        new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

5.2.4 Custom Source

Besides the sources above, we can also define a custom source; all that is needed is to pass in a SourceFunction. It is invoked as follows:

DataStream<SensorReading> dataStream = env.addSource(new MySensor());

We want to generate random sensor readings; the concrete implementation of MySensor looks like this:

public static class MySensor implements SourceFunction<SensorReading> {
    private boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        Random random = new Random();

        // Initialize ten sensors with a random baseline temperature
        HashMap<String, Double> sensorTempMap = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }

        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // Random walk around the previous temperature
                double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newTemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
            }
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }
}

5.3 Transform

Basic transformation operators

5.3.1 map


DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    public Integer map(String value) throws Exception {
        return value.length();
    }
});

5.3.2 flatMap

DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});

5.3.3 Filter


DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    public boolean filter(String value) throws Exception {
        return value.startsWith("sensor_1");
    }
});
package com.atguigu.apitest.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // 1. map: convert each line to its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return value.length();
            }
        });

        // 2. flatMap: split each line into fields by comma
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] fields = value.split(",");
                for (String field : fields) {
                    out.collect(field);
                }
            }
        });

        // 3. filter: keep only records whose id starts with sensor_1
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("sensor_1");
            }
        });

        // Print the results
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");

        env.execute();
    }
}


5.3.4 KeyBy

keyBy partitions the data by the hash of the key: the key is hashed and then taken modulo the number of partitions to determine the partition index. Records with the same key always land in the same partition; because of hash collisions, a single partition may hold records of several different keys.

DataStream → KeyedStream: logically splits a stream into disjoint partitions, each containing the elements of the same key; internally this is implemented with hash partitioning.

In the Flink API, a DataStream can only be aggregated after keyBy has turned it into a KeyedStream. A simplified sketch of the partition assignment follows.
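A simplified illustration of the assignment described above (a sketch only: Flink actually hashes keys into key groups rather than using a plain modulo):

// simplified sketch of key -> partition assignment (not Flink's exact algorithm)
int numPartitions = 4;
String key = "sensor_1";
int partition = Math.abs(key.hashCode()) % numPartitions;
// every record with key "sensor_1" lands in the same partition;
// different keys may collide into the same partition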

package com.atguigu.apitest.transform;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Key by field name, by lambda, or by method reference
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(data -> data.getId());
        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);

        // Key by a custom KeySelector (odd/even buckets)
        DataStream<Long> dataStream1 = env.fromElements(1L, 34L, 4L, 657L, 23L);
        KeyedStream<Long, Integer> keyedStream2 = dataStream1.keyBy(new KeySelector<Long, Integer>() {
            @Override
            public Integer getKey(Long value) throws Exception {
                return value.intValue() % 2;
            }
        });

        // Rolling aggregation: keep the record with the max temperature seen so far
        DataStream<SensorReading> resultStream = keyedStream.maxBy("temperature");
        resultStream.print("result");
        // keyedStream2.sum(0).print("key2");

        env.execute();
    }
}

Aggregation operators

5.3.5 Rolling Aggregation Operators

These operators aggregate a KeyedStream incrementally, producing a running result per key: sum(), min(), max(), minBy(), maxBy(). min() only updates the aggregated field of the stored record, while minBy() returns the entire record that holds the minimum (likewise for max()/maxBy()). A short sketch follows.
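A sketch of the rolling aggregation calls, assuming the keyedStream from the example in 5.3.4:

// min() only updates the temperature field; minBy() returns the whole record with the minimum
DataStream<SensorReading> minTemp   = keyedStream.min("temperature");
DataStream<SensorReading> minByTemp = keyedStream.minBy("temperature");
DataStream<SensorReading> sumTemp   = keyedStream.sum("temperature");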

5.3.6 Reduce

KeyedStream → DataStream: reduce combines the current element with the last reduced value to produce a new value of the same type, which allows richer aggregations than the rolling operators above.

package com.atguigu.apitest.transform;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Key by id
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

        // reduce: keep the max temperature seen so far, together with the latest timestamp
        SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
                return new SensorReading(value1.getId(), value2.getTimestamp(),
                        Math.max(value1.getTemperature(), value2.getTemperature()));
            }
        });

        // Equivalent lambda version:
        // keyedStream.reduce((curState, newData) -> new SensorReading(curState.getId(),
        //         newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature())));

        resultStream.print();
        env.execute();
    }
}

Multi-stream operators

5.3.7 Split and Select / process

Flink 1.12 and newer

Side outputs: use the process() method to handle the elements of a stream and, depending on the result, emit each element to a different OutputTag.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Split/select demo: split and select were deprecated and removed in Flink 1.12,
 * so OutputTag + process is used instead.
 * Requirement: split the stream into odd and even numbers and select each side.
 */
public class TransformationDemo03 {
    public static void main(String[] args) throws Exception {
        // 0. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 1. source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // 2. transformation: split into odd and even
        OutputTag<Integer> oddTag = new OutputTag<>("odd", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag<>("even", TypeInformation.of(Integer.class));

        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                // out would emit into the main stream; ctx routes each element to a side output
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });

        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);

        // 3. sink
        oddResult.print("odd");    // print with a prefix
        evenResult.print("even");

        // 4. execute
        env.execute();
    }
}

Legacy API (before Flink 1.12)

Split splits one stream into several logical streams; Select retrieves the chosen streams from the resulting SplitStream. Note that split and select were deprecated and removed in Flink 1.12, which is why the OutputTag + process approach above is now required. The legacy example below splits sensor readings into high- and low-temperature streams.

Split

DataStream → SplitStream: stamps each element with one or more tags, splitting one stream into several logical streams.

Select

SplitStream → DataStream: retrieves one or more streams from a SplitStream by tag.

SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return (value.getTemperature() > 30)
                ? Collections.singletonList("high")
                : Collections.singletonList("low");
    }
});

DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");

5.3.8 Connect and CoMap

Connect and CoMap combine two streams into one.

Connect

DataStream, DataStream → ConnectedStreams: after connect, the two streams are held together but internally remain two independent streams, and their element types may differ. Only a subsequent CoMap (or CoFlatMap) actually merges them into a single stream.

CoMap / CoFlatMap

Applied to ConnectedStreams: map1/flatMap1 handles elements of the first stream, map2/flatMap2 handles elements of the second, and both outputs are emitted into one result stream.

// Wrap the high-temperature stream into (id, temperature) warning tuples
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});

// Connect the warning stream with the low-temperature stream (different element types are allowed)
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

// CoMap: map1 handles the first stream, map2 the second; both outputs go into one merged stream
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        return new Tuple3<>(value.f0, value.f1, "warning");
    }

    @Override
    public Object map2(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), "healthy");
    }
});

5.3.9 Union

DataStream → DataStream: union merges two or more streams of the same type into a single stream containing all of their elements. Unlike connect, union can combine more than two streams, but all of them must have the same element type.

DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream);

Flink currently supports eight partitioning strategies:

global: every record is sent to the first instance of the downstream operator.
shuffle: records are distributed randomly across the downstream instances.
rebalance: records are distributed round-robin across the downstream instances.
rescale: round-robin within groups determined by the upstream and downstream parallelism. For example, with upstream parallelism 2 (A, B) and downstream parallelism 4 (1, 2, 3, 4), A sends round-robin to 1 and 2 while B sends to 3 and 4; with upstream parallelism 4 (A, B, C, D) and downstream parallelism 2 (1, 2), A and B send to 1 while C and D send to 2.
broadcast: every record is sent to every downstream instance; suitable for joining a large dataset with a small one.
forward: records go to the corresponding local downstream instance; it requires the upstream and downstream parallelism to be equal.
keyBy: records are distributed to downstream instances by the hash of their key.
partitionCustom: a user-defined partitioner; implement the Partitioner interface to define your own partitioning logic.

A sketch of these calls follows.
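A sketch of the corresponding DataStream calls (dataStream is the DataStream<SensorReading> from the class below; the routing rule inside the custom partitioner is made up for illustration):

import org.apache.flink.api.common.functions.Partitioner;

dataStream.global();      // everything to the first downstream instance
dataStream.shuffle();     // random
dataStream.rebalance();   // round-robin
dataStream.rescale();     // round-robin within local groups
dataStream.broadcast();   // every record to every instance
dataStream.forward();     // one-to-one, requires equal parallelism

// custom partitioner: ids starting with "sensor_1" go to partition 0, the rest to the last partition
DataStream<SensorReading> customStream = dataStream.partitionCustom(
        new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                return key.startsWith("sensor_1") ? 0 : numPartitions - 1;
            }
        },
        SensorReading::getId);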


package com.atguigu.apitest.transform;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest6_Partition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("data/sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        dataStream.print("input");

        // 1. shuffle
        DataStream<String> shuffleStream = inputStream.shuffle();
        // shuffleStream.print("shuffle");

        // 2. keyBy
        // dataStream.keyBy("id").print("keyBy");

        // 3. global
        dataStream.global().print("global");

        env.execute();
    }
}

5.4 Supported Data Types

Flink supports all common data types: Java and Scala base types and their boxed wrappers (Integer, Double, Long, String, ...), Java and Scala tuples, Scala case classes, POJOs (public class, public no-argument constructor, accessible fields), and other types such as arrays, lists, maps and enums. A few examples follow.
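A short sketch of the main categories (SensorReading is the POJO used throughout this tutorial; env as in the earlier examples):

// base types and their wrappers
DataStream<Integer> numberStream = env.fromElements(1, 2, 3, 4);

// Java tuples (Tuple0 through Tuple25)
DataStream<Tuple2<String, Integer>> personStream = env.fromElements(
        new Tuple2<>("Adam", 17),
        new Tuple2<>("Sarah", 23));

// POJOs
DataStream<SensorReading> sensorStream = env.fromElements(
        new SensorReading("sensor_1", 1547718199L, 35.8));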

5.5 Implementing UDFs: Finer-Grained Control

5.5.1 Function Classes

Flink exposes all user-defined function types as interfaces or abstract classes, e.g. MapFunction, FilterFunction, ProcessFunction. For example, implementing FilterFunction as a named class:

DataStream<String> flinkTweets = tweets.filter(new FlinkFilter());

public static class FlinkFilter implements FilterFunction<String> {
    @Override
    public boolean filter(String value) throws Exception {
        return value.contains("flink");
    }
}

The function can also be implemented as an anonymous class:

DataStream<String> flinkTweets = tweets.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value.contains("flink");
    }
});

The keyword we filter on, "flink", can also be passed in as a constructor parameter:

DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter(new KeyWordFilter("flink"));

public static class KeyWordFilter implements FilterFunction<String> {
    private String keyWord;

    KeyWordFilter(String keyWord) {
        this.keyWord = keyWord;
    }

    @Override
    public boolean filter(String value) throws Exception {
        return value.contains(this.keyWord);
    }
}

Aggregations such as sum() do not require implementing a function interface yourself; Flink provides the implementation under the hood.

5.5.2 Lambda Functions

DataStream<String> tweets = env.readTextFile("INPUT_FILE");
DataStream<String> flinkTweets = tweets.filter(tweet -> tweet.contains("flink"));

5.5.3 Rich Functions

Rich functions are function interfaces with extra lifecycle methods: open() for initialization, close() for teardown, and getRuntimeContext() for access to the function's runtime context (subtask index, state, and so on). Every regular function interface has a rich variant, e.g. RichMapFunction, RichFlatMapFunction, RichFilterFunction.

public static class MyMapFunction extends RichMapFunction<SensorReading, Tuple2<Integer, String>> {

    @Override
    public Tuple2<Integer, String> map(SensorReading value) throws Exception {
        return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value.getId());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("my map open");
        // initialization work, e.g. establishing a connection to HDFS
    }

    @Override
    public void close() throws Exception {
        System.out.println("my map close");
        // cleanup work, e.g. closing the HDFS connection
    }
}
package com.atguigu.apitest.transform;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapper());
        resultStream.print();
        env.execute();
    }

    // A plain (non-rich) function for comparison
    public static class MyMapper0 implements MapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    // A custom rich function
    public static class MyMapper extends RichMapFunction<SensorReading, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // the runtime context gives access to state and to the index of this parallel subtask
            return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // initialization, typically defining state or opening a database connection
            System.out.println("open");
        }

        @Override
        public void close() throws Exception {
            // teardown, typically closing connections and clearing state
            System.out.println("close");
        }
    }
}

5.6 Sink

All output to external systems goes through a sink: stream.addSink(new MySink(...)). Flink ships official connector sinks (Kafka, Cassandra, Elasticsearch, HDFS, RabbitMQ, ...), and Apache Bahir provides more (Redis, Flume, ActiveMQ, Netty, ...).

5.6.1 Kafka

Kafka also processes records one at a time as they arrive, which makes it a natural companion to Flink.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from Kafka (reading from a file also works, as in the earlier examples)
        DataStream<String> inputStream = env.addSource(
                new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        // Convert to SensorReading and back to String
        DataStream<String> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])).toString();
        });

        // Write to Kafka
        dataStream.addSink(new FlinkKafkaProducer011<String>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute();
    }
}

5.6.2 Redis

Flink itself does not ship an official Redis sink; use the third-party connector (built for Scala 2.11) provided by Apache Bahir:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Define a Redis mapper class that specifies the command used when saving data to Redis:

package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkTest2_Redis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // Convert to SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .build();

        dataStream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom RedisMapper
    public static class MyRedisMapper implements RedisMapper<SensorReading> {
        // Save data into a Redis hash: hset sensor_temp id temperature
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "sensor_temp");
        }

        @Override
        public String getKeyFromData(SensorReading data) {
            return data.getId();
        }

        @Override
        public String getValueFromData(SensorReading data) {
            return data.getTemperature().toString();
        }
    }
}

print

print is also a sink; it simply writes the stream to the console.

resultStream.print();

5.6.4 JDBC: Custom MySQL Sink

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
package com.atguigu.apitest.sink;

import com.atguigu.apitest.beans.SensorReading;
import com.atguigu.apitest.source.SourceTest4_UDF;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkTest4_Jdbc {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Use the custom random source from 5.2.4 (reading from a file also works)
        DataStream<SensorReading> dataStream = env.addSource(new SourceTest4_UDF.MySensorSource());

        dataStream.addSink(new MyJdbcSink());

        env.execute();
    }

    // Custom SinkFunction
    public static class MyJdbcSink extends RichSinkFunction<SensorReading> {
        // Declare the connection and the prepared statements
        Connection connection = null;
        PreparedStatement insertStmt = null;
        PreparedStatement updateStmt = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
            insertStmt = connection.prepareStatement("insert into sensor_temp (id, temp) values (?, ?)");
            updateStmt = connection.prepareStatement("update sensor_temp set temp = ? where id = ?");
        }

        // Called once per record: run the SQL against the open connection
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            // Try the update first; if nothing was updated, insert instead
            updateStmt.setDouble(1, value.getTemperature());
            updateStmt.setString(2, value.getId());
            updateStmt.execute();
            if (updateStmt.getUpdateCount() == 0) {
                insertStmt.setString(1, value.getId());
                insertStmt.setDouble(2, value.getTemperature());
                insertStmt.execute();
            }
        }

        @Override
        public void close() throws Exception {
            insertStmt.close();
            updateStmt.close();
            connection.close();
        }
    }
}

Reposted from: https://blog.csdn.net/yangshengwei230612/article/details/116567038
