本文共 5740 字,大约阅读时间需要 19 分钟。
一.flink简介
1.传统数据处理架构
事务处理:java 后端->数据库
分析处理:离线数仓 有状态的流式处理:实时处理流处理的演变:第二代流失处理架构(lambda)
2.Flink 的主要特点
3.Flink vs Spark Streaming
延迟:flink 毫秒级延迟,sparkStreaming 秒级延迟
架构:flink 真正的流,sparkStreaming 微批 flink没有Stage的概念,每个个节点的计算不需要等待。4.WordCount
dataSet
dataSet是数据集不是数据流
package com.atguigu.wc;/** * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved ** Project: FlinkTutorial * Package: com.atguigu.wc * Version: 1.0 *
* Created by wushengran on 2020/11/6 11:22 */import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.util.Collector;/** * @ClassName: WordCount * @Description: * @Author: wushengran on 2020/11/6 11:22 * @Version: 1.0 */// 批处理word countpublic class WordCount {
public static void main(String[] args) throws Exception{ // 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //data/sensor.txt // 从文件中读取数据 // String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"; String inputPath = "data/hello.txt"; DataSetinputDataSet = env.readTextFile(inputPath); // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计 DataSet > resultSet = inputDataSet.flatMap(new MyFlatMapper()) .groupBy(0) // 按照第一个位置的word分组 .sum(1); // 将第二个位置上的数据求和 resultSet.print(); } // 自定义类,实现FlatMapFunction接口 public static class MyFlatMapper implements FlatMapFunction > { @Override public void flatMap(String value, Collector > out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历所有word,包成二元组输出 for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } }}
DataStream
流式处理API
package com.atguigu.wc;/** * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved ** Project: FlinkTutorial * Package: com.atguigu.wc * Version: 1.0 *
* Created by wushengran on 2020/11/6 11:48 */import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.net.URL;/** * @ClassName: StreamWordCount * @Description: * @Author: wushengran on 2020/11/6 11:48 * @Version: 1.0 */public class StreamWordCount {
public static void main(String[] args) throws Exception{ // 创建流处理执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1);// env.disableOperatorChaining();// // 从文件中读取数据// String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt";// DataStreaminputDataStream = env.readTextFile(inputPath); // 用parameter tool工具从程序启动参数中提取配置项 ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port"); // 从socket文本流读取数据 DataStream inputDataStream = env.socketTextStream(host, port); // 基于数据流进行转换计算 DataStream > resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()).slotSharingGroup("green") .keyBy(0) .sum(1).setParallelism(2).slotSharingGroup("red"); resultStream.print().setParallelism(1); // 执行任务 env.execute(); }}
nc -lk 7777
第三章 Flink 部署
flink下载
https://flink.apache.org/zh/ flink 1.10之前的版本都包含了对hadoop的依赖 flink1.10之后的版本flink的安装包和hadoop的依赖分离开来了 flink/conf/flink-conf.yaml的主要内容3.1 Standalone 模式
3.1.1 安装
解压缩 flink-1.10.1-bin-scala_2.12.tgz,进入 conf 目录中。
1)修改 flink/conf/flink-conf.yaml 文件:3)分发给另外两台机子:
4)启动 : flink/conf/flink-conf.yaml访问 http://localhost:8081 可以对 flink 集群和任务进行监控管理。
3.1.2 提交任务
web页面提交任务
task任务需要的taskSloat 一般等于执行任务需要最大一个并行度命令提交任务
-
准备数据文件(如果需要)
-
把含数据文件的文件夹,分发到 taskmanage 机器
如 果 从 文 件 中 读 取 数 据 , 由 于 是 从 本 地 磁 盘 读 取 , 实 际 任 务 会 被 分 发 到 taskmanage 的机器中,所以要把目标文件分发 -
执行程序
./flink run -p 2 -c com.atguigu.wc.StreamWordCount /root/ysw/flink/jar/FlinkTutorial-1.0-SNAPSHOT.jar --host kafka1 --port 66664) 查看计算结果 注意:如果输出到控制台,应该在 taskmanager 下查看;如果计算结果输出到文 件,同样会保存到 taskmanage 的机器下,不会在 jobmanage 下。
- 在 webui 控制台查看计算过程
3.2 Yarn 模式
现在流行的资源管理平台主要是yarn和k8s
3.2.1 Flink on Yarn
3.2.2 Session Cluster
- 启动 hadoop 集群(略)
- 启动 yarn-session
/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d3) 执行任务
./flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
- 去 yarn 控制台查看任务状态
- 取消 yarn-session
yarn application --kill application_1577588252906_0001
3.2.2 Per Job Cluster
- 启动 hadoop 集群(略)
- 不启动 yarn-session,直接执行 job
./flink run –m yarn-cluster -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
3.3 Kubernetes 部署
// 启动 jobmanager-service 服务kubectl create -f jobmanager-service.yaml// 启动 jobmanager-deployment 服务kubectl create -f jobmanager-deployment.yaml// 启动 taskmanager-deployment 服务kubectl create -f taskmanager-deployment.yaml
4)访问 Flink UI 页面
集群启动后,就可以通过 JobManagerServicers 中配置的 WebUI 端口,用浏览器 输入以下 url 来访问 Flink UI 页面了: http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanage r:ui/proxy转载地址:https://blog.csdn.net/yangshengwei230612/article/details/116502663 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!