Flink-dataSource的种类和基本使用
发布日期:2021-05-07 14:49:35 浏览次数:17 分类:原创文章

本文共 7814 字,大约阅读时间需要 26 分钟。



Flink-dataSource的种类和基本使用





























前言:

对于一个flink任务,我们拿java的API层面来解释:

一个任务基本上我们分为3种























名称 作用
dataSource 表示flink的数据来源
dataStream 表示一个数据处理的过程,一个dataSource一旦经过某种转换,就成为了dataStream的一部分,比如fliter,flatMap等等操作(和spark的RDD相似)
sink 相当于spark的action动作,就是真正触发flink任务的一个动作。

那么本文讲的主要是Flink的数据源头:dataSource的相关讲解。


并行的dataSource


并行的dataSource,指的是并行度为1的dataSource。
而并行度可以通过这个方法进行查看:


dataSource.getParallelism();

无界集并行dataSource


socketTextStream()


从socket当中去获取文本数据的一种方法、
为了更加直观,没有使用lambda表达式。略显繁琐


public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 1.创建一个flink stream 序执行的环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 2.通过这个环境创建一个抽象的的数据集dataStream // DataStreamSource<String> dataStream = environment.socketTextStream("192.168.237.130", 8888); DataStreamSource<String> dataStream = environment.socketTextStream(args[0], Integer.parseInt(args[1])); // 3.调用dataStream上的方法 ,如:transformation(可以不调用) 和sink(必须调用,类似于spark的action,提交动作)。 // 调用transformation会将一个dataStream转换为一个新的dataStream SingleOutputStreamOperator<String> dataStream2 = dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override public void flatMap(String line, Collector<String> out) throws Exception {
// 将一行单词进行切分 String[] words = line.split(" "); for (String word : words) {
// 切分后输出 out.collect(word); } } }); // 4.将单词和数字1进行组合,返回一个dataStream SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream3 = dataStream2.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override public Tuple2<String, Integer> map(String word) throws Exception {
return Tuple2.of(word, 1); } }); // 5.进行分组聚合,根据单词进行keyBy,然后把对应的第一个数据进行累加。这里的数字是下标,对应的Tuple2<String,Integer> SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream4 = dataStream3.keyBy(0).sum(1); // 到这里transformation结束 // 6.调用sink并启动 dataStream4.print(); environment.execute("StreamWordCount"); }}

通过socketTextStream这种方式获取的数据集可以为无界的。
因为只要有数据从对应的socket端口中流过,那么flink就可以进行实时的运算。


有界集并行dataSource


有界的dataSource也就是,一旦任务创建,他的数据集是有限的,不会有新的数据产生,也就是说,flink不用一直执行或者等待新的数据,只要把有限的数据执行完了,程序也就停止了。


fromCollection()


用于存放数组数据的,这里为了简便,用List
顾名思义,从Collection,也就是集合当中获取数据,但是请大家注意,一般这个方法和下面的fromElements是用来测试和验证的。
为什么这么说,解释下:比如我们要做一个flink的WordCount的小demo,那么我们可能习惯的用第一种方式,也就是socketTextStream来获取数据,但是注意,这里的数据我们是需要手动从客户端当中去输入的,比较麻烦,那么如果用fromCollection这种方式,我们就可以在一开始就定义好一定的数据,比较方便。


public class SourceDemo1 {
public static void main(String[] args) throws Exception {
// 创建实时计算的一个执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 将客户端的集合并行化成一个抽象的数据集,通常这个方法是用作测试的,即模拟flink的数据。 // 就不需要通过socket去传递数据了 // 但是 fromElements是一个有界的数据流,一旦数据处理完,就会退出。 DataStream<Integer> nums = environment.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9)); // 这里结果是1,并行度为1,就代表只有一个subTask来产生数据。 System.out.println("=========" + nums.getParallelism() + "========="); DataStream<Integer> sum = nums.filter(new FilterFunction<Integer>() {
@Override public boolean filter(Integer value) throws Exception {
return value % 2 == 0; } }); sum.print(); environment.execute("SourceDemo1"); }}

fromElements()


指定存放相应的元素作为数据源。


public class SourceDemo1 {
public static void main(String[] args) throws Exception {
// 创建实时计算的一个执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建抽象的数据集【创建原始的抽象数据集的方法,Source】 DataStream<Integer> nums = environment.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9); // 这里结果是1,并行度为1,就代表只有一个subTask来产生数据。 System.out.println("=========" + nums.getParallelism() + "========="); DataStream<Integer> sum = nums.filter(new FilterFunction<Integer>() {
@Override public boolean filter(Integer value) throws Exception {
return value % 2 == 0; } }); sum.print(); environment.execute("SourceDemo1"); }}

到这里为止,以上的3种dataSource的并行度都为1,接下来介绍并行度不是1的dataSource


非并行的dataSource


非并行的dataSource指的是并行度>1的dataSource


fromParallelCollection()


public class SourceDemo2 {
public static void main(String[] args) throws Exception {
// 创建实时计算的一个执行环境 StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // 创建抽象的数据集【创建原始的抽象数据集的方法,Source】 DataStreamSource<Long> nums = environment.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class); System.out.println("=========" + nums.getParallelism() + "========="); // DataStream<Integer> nums = environment.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9); SingleOutputStreamOperator<Long> sum = nums.filter(new FilterFunction<Long>() {
@Override public boolean filter(Long value) throws Exception {
return value % 2 == 0; } }); sum.print(); environment.execute("SourceDemo2"); }}

注意:fromParallelCollection需要指定类型为Long
最终打印的部分结果如下:
输出的并行度为8
在这里插入图片描述
输出的数据
顺便提一下,这里的输出的格式问题:



  1. 第一列代表的是subTask的编号+1

  2. 第二列代表的是输出的结果(我们demo里面输出的是偶数)


在这里插入图片描述


generateSequence()


只需要把这一部分改为:


DataStreamSource<Long> nums = environment.generateSequence(1100);

即可,那么代表数据源为1到100的整型。


readTextFile()


从文件当中去获取数据,也是属于有界集的dataSource
代码部分:


public class TextFileSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = environment.readTextFile(args[0]); // 获取并行度 int parallelism = dataStreamSource.getParallelism(); System.out.println("-------------->:" + parallelism); SingleOutputStreamOperator<Tuple2<String, Integer>> words = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = line.split(" "); for (String word : words) {
out.collect(Tuple2.of(word, 1)); } } }); System.out.println("===============>:" + words.getParallelism()); SingleOutputStreamOperator<Tuple2<String, Integer>> sum = words.keyBy(0).sum(1); sum.print(); environment.execute("TextFileSource"); }}

设置部分:
点击设置:
在这里插入图片描述
加上你文件的地址:
在这里插入图片描述
我这里的地址是桌面的一个叫words.txt的文件


在这里插入图片描述


注意:一定要配置完后,再执行代码,否则报错
输出结果:
在这里插入图片描述


小结


dataSource分为有界集和无界集:
所以flink也可以做离线计算,也可以做实时计算



  1. 如果数据是有界集,一般用的执行环境为ExecutionEnvironment,数据量是定死的,flink做的是离线计算。(哪怕用的是实时的StreamExecutionEnvironment,但是数据一旦计算完毕,程序就会停止,所以相当于还是做得离线计算,就是批计算)


  2. 如果数据是无限集,一般用的执行环境为StreamExecutionEnvironment,Flink也就是做实时的流计算。


  3. dataSource根据环境environment的不同方法,可以分为并行dataSource和非并行dataSource。


  4. 并行的dataSource也就是 并行度>1的,通俗来讲就是允许多个线程(多个subTask,subTask为真正执行的一个单位)去同时分配的执行这个任务。而非并行的dataSource也就是并行度为1的,只有1个线程去执行。



















































- 并行 非并行 是否可以作为无界集(用于实时的流处理)
socketTextStream() Y ----- Y
fromCollection() Y ----- N(一般用于测试)
fromElements Y ----- N
fromParallelCollection ----- Y N
generateSequence ----- Y N
readTextFile ----- Y Y

这张表格的数据不一定准确(是否可以作为无界集),因为严格来说,只要数据源源不断的进入,比如在你程序结束前就有新的数据添加了,那么就是一个流处理。流处理的时间也有长有短,因此也不能随意下结论。


这里提供一个参考。
比如readTextFile,假如从Hdfs当中读取文件,而另一方面又有类似于spark,hadoop,ES等地方,不断的往Hdfs当中存放数据,那么Flink也是做的流处理。如果像本文当中的demo,那么就是一个离线处理了。所以大家要合理的思考这个事情。

上一篇:Flink-简介以及standalone集群安装和简单的测试
下一篇:ElasticSearch 快照备份和还原

发表评论

最新留言

网站不错 人气很旺了 加油
[***.192.178.218]2025年03月27日 09时46分06秒