flink1.10.1--java 版--尚硅谷1-3简介/入门/安装/提交任务
发布日期:2021-06-28 21:02:51 浏览次数:3 分类:技术文章

本文共 5740 字,大约阅读时间需要 19 分钟。

一.flink简介

1.传统数据处理架构

事务处理:java 后端->数据库

在这里插入图片描述

分析处理:离线数仓

在这里插入图片描述

有状态的流式处理:实时处理
在这里插入图片描述

流处理的演变:第二代流失处理架构(lambda)

在这里插入图片描述

2.Flink 的主要特点

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.Flink vs Spark Streaming

延迟:flink 毫秒级延迟,sparkStreaming 秒级延迟

架构:flink 真正的流,sparkStreaming 微批

在这里插入图片描述

flink没有Stage的概念,每个个节点的计算不需要等待。
在这里插入图片描述

4.WordCount

dataSet

dataSet是数据集不是数据流

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

package com.atguigu.wc;/** * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved * 

* Project: FlinkTutorial * Package: com.atguigu.wc * Version: 1.0 *

* Created by wushengran on 2020/11/6 11:22 */import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.util.Collector;/** * @ClassName: WordCount * @Description: * @Author: wushengran on 2020/11/6 11:22 * @Version: 1.0 */// 批处理word countpublic class WordCount {

public static void main(String[] args) throws Exception{
// 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //data/sensor.txt // 从文件中读取数据 // String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt"; String inputPath = "data/hello.txt"; DataSet
inputDataSet = env.readTextFile(inputPath); // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计 DataSet
> resultSet = inputDataSet.flatMap(new MyFlatMapper()) .groupBy(0) // 按照第一个位置的word分组 .sum(1); // 将第二个位置上的数据求和 resultSet.print(); } // 自定义类,实现FlatMapFunction接口 public static class MyFlatMapper implements FlatMapFunction
> {
@Override public void flatMap(String value, Collector
> out) throws Exception { // 按空格分词 String[] words = value.split(" "); // 遍历所有word,包成二元组输出 for (String word : words) { out.collect(new Tuple2<>(word, 1)); } } }}

在这里插入图片描述

DataStream

流式处理API

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

package com.atguigu.wc;/** * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved * 

* Project: FlinkTutorial * Package: com.atguigu.wc * Version: 1.0 *

* Created by wushengran on 2020/11/6 11:48 */import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.net.URL;/** * @ClassName: StreamWordCount * @Description: * @Author: wushengran on 2020/11/6 11:48 * @Version: 1.0 */public class StreamWordCount {

public static void main(String[] args) throws Exception{
// 创建流处理执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1);// env.disableOperatorChaining();// // 从文件中读取数据// String inputPath = "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\hello.txt";// DataStream
inputDataStream = env.readTextFile(inputPath); // 用parameter tool工具从程序启动参数中提取配置项 ParameterTool parameterTool = ParameterTool.fromArgs(args); String host = parameterTool.get("host"); int port = parameterTool.getInt("port"); // 从socket文本流读取数据 DataStream
inputDataStream = env.socketTextStream(host, port); // 基于数据流进行转换计算 DataStream
> resultStream = inputDataStream.flatMap(new WordCount.MyFlatMapper()).slotSharingGroup("green") .keyBy(0) .sum(1).setParallelism(2).slotSharingGroup("red"); resultStream.print().setParallelism(1); // 执行任务 env.execute(); }}

在这里插入图片描述

在这里插入图片描述

nc -lk 7777

在这里插入图片描述

第三章 Flink 部署

flink下载

https://flink.apache.org/zh/
在这里插入图片描述

在这里插入图片描述

flink 1.10之前的版本都包含了对hadoop的依赖
在这里插入图片描述
flink1.10之后的版本flink的安装包和hadoop的依赖分离开来了
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

flink/conf/flink-conf.yaml的主要内容
在这里插入图片描述
在这里插入图片描述

3.1 Standalone 模式

3.1.1 安装

解压缩 flink-1.10.1-bin-scala_2.12.tgz,进入 conf 目录中。

1)修改 flink/conf/flink-conf.yaml 文件:
在这里插入图片描述
在这里插入图片描述

3)分发给另外两台机子:

在这里插入图片描述
4)启动 :
在这里插入图片描述
在这里插入图片描述
flink/conf/flink-conf.yaml
在这里插入图片描述

访问 http://localhost:8081 可以对 flink 集群和任务进行监控管理。

在这里插入图片描述
在这里插入图片描述

3.1.2 提交任务

web页面提交任务

在这里插入图片描述

task任务需要的taskSloat 一般等于执行任务需要最大一个并行度
在这里插入图片描述
在这里插入图片描述

命令提交任务

  1. 准备数据文件(如果需要)

    在这里插入图片描述

  2. 把含数据文件的文件夹,分发到 taskmanage 机器

    在这里插入图片描述
    如 果 从 文 件 中 读 取 数 据 , 由 于 是 从 本 地 磁 盘 读 取 , 实 际 任 务 会 被 分 发 到
    taskmanage 的机器中,所以要把目标文件分发

  3. 执行程序

    在这里插入图片描述
    在这里插入图片描述

./flink run -p 2  -c com.atguigu.wc.StreamWordCount  /root/ysw/flink/jar/FlinkTutorial-1.0-SNAPSHOT.jar --host kafka1 --port 6666

在这里插入图片描述

在这里插入图片描述

4) 查看计算结果
注意:如果输出到控制台,应该在 taskmanager 下查看;如果计算结果输出到文
件,同样会保存到 taskmanage 的机器下,不会在 jobmanage 下。

  1. 在 webui 控制台查看计算过程
    在这里插入图片描述
    在这里插入图片描述

3.2 Yarn 模式

现在流行的资源管理平台主要是yarn和k8s

在这里插入图片描述

3.2.1 Flink on Yarn

在这里插入图片描述

3.2.2 Session Cluster

  1. 启动 hadoop 集群(略)
  2. 启动 yarn-session
/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d

在这里插入图片描述

3) 执行任务

./flink run -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
  1. 去 yarn 控制台查看任务状态
    在这里插入图片描述
  2. 取消 yarn-session
yarn application --kill application_1577588252906_0001

3.2.2 Per Job Cluster

  1. 启动 hadoop 集群(略)
  2. 不启动 yarn-session,直接执行 job
./flink run –m yarn-cluster -c com.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777

3.3 Kubernetes 部署

在这里插入图片描述

// 启动 jobmanager-service 服务kubectl create -f jobmanager-service.yaml// 启动 jobmanager-deployment 服务kubectl create -f jobmanager-deployment.yaml// 启动 taskmanager-deployment 服务kubectl create -f taskmanager-deployment.yaml

4)访问 Flink UI 页面

集群启动后,就可以通过 JobManagerServicers 中配置的 WebUI 端口,用浏览器
输入以下 url 来访问 Flink UI 页面了:
http://{JobManagerHost:Port}/api/v1/namespaces/default/services/flink-jobmanage
r:ui/proxy

转载地址:https://blog.csdn.net/yangshengwei230612/article/details/116502663 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:flink1.10.1--java 版--尚硅谷-第四章 Flink 运行架构+API
下一篇:hadoop 安装部署-HDFS/YARN/MR

发表评论

最新留言

关注你微信了!
[***.104.42.241]2024年04月29日 07时37分02秒