项目实战 从 0 到 1 学习之Flink (28)FlinkSql教程(二)
发布日期:2021-05-14 00:18:20 浏览次数:11 分类:博客文章

本文共 17222 字,大约阅读时间需要 57 分钟。

���kafka���mysql

������Java������

  • ������������������������������������������������������������curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0���������������������������������������������������������������������������pom.xml���������������������������������������������������������������������������������������������������������������������������scope������������������������������������������������������Run -> Edit Configurations������������Include dependencies with "Provided" scope������������resources���������������log4j������������������������������������������������������������������

  • ���������������������������������������������������������������������Flink ���������Hello World������������������������������������������������

     
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    DataStream dataStream = env.fromElements("Hello World");    dataStream.print();      env.execute("test");

    ������������������

    Hello World

    ������������������������������������������������������4>������������������������������4������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������������CPU���������������������������������������������������������������������������������setParallelism(1)������������������������������������������������������������DataStream ������Hellow World������������������������������������������������������������������������������������������������������������kafka���mysql������������������������������DataStream���������������������������������������������Flink DataStream���������������������������������

������kafka������������

������������������������������������������������

import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.types.Row;public class FlinkSql02 {    public static final String  KAFKA_TABLE_SOURCE_DDL = "" +            "CREATE TABLE user_behavior (\n" +            "    user_id BIGINT,\n" +            "    item_id BIGINT,\n" +            "    category_id BIGINT,\n" +            "    behavior STRING,\n" +            "    ts TIMESTAMP(3)\n" +            ") WITH (\n" +            "    'connector.type' = 'kafka',  -- ���������������������kafka\n" +            "    'connector.version' = '0.11',  -- ���������������Docker���������kafka���������������\n" +            "    'connector.topic' = 'mykafka', -- ���������������topic \n" +            "    'connector.properties.group.id' = 'flink-test-0', -- ������������������������������������������\n" +            "    'connector.startup-mode' = 'earliest-offset',  --���������������������\n" +            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk������\n" +            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker������\n" +            "    'format.type' = 'json'  -- json������������topic������������������������������\n" +            ")";    public static void main(String[] args) throws Exception {        //������StreamExecutionEnvironment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                //������EnvironmentSettings ���������Blink Planner        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();                //������StreamTableEnvironment         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);                //������DDL���������kafka������������        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);                //������������        Table table = tEnv.sqlQuery("select * from user_behavior");                //������DataStream���������        tEnv.toAppendStream(table, Row.class).print().setParallelism(1);        //������������������������������������        env.execute("test");    }}

���������������������������������������������������run������������������

543462,1715,1464116,pv,2017-11-26T01:00543462,1715,1464116,pv,2017-11-26T01:00543462,1715,1464116,pv,2017-11-26T01:00543462,1715,1464116,pv,2017-11-26T01:00

���������������������kafka������������������������������������

������������������������������������Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in������������������������������������DDL���������������������������������������������������������������������������������������������������������������������������������������������������������������

������mysql���������������

  • ������mysql������������������������flink������������������������������������������������������������������������
CREATE TABLE `user_behavior` (  `user_id` bigint(20) DEFAULT NULL,  `item_id` bigint(20) DEFAULT NULL,  `behavior` varchar(255) DEFAULT NULL,  `category_id` bigint(20) DEFAULT NULL,  `ts` timestamp(6) NULL DEFAULT NULL)

���mysql���������������������������������������������������mysql���������������������������kafka������������������������������mysql���������������������������������������������kafka������������������������

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.StreamTableEnvironment;import org.apache.flink.types.Row;public class FlinkSql02 {    public static final String  KAFKA_TABLE_SOURCE_DDL = "" +            "CREATE TABLE user_behavior (\n" +            "    user_id BIGINT,\n" +            "    item_id BIGINT,\n" +            "    category_id BIGINT,\n" +            "    behavior STRING,\n" +            "    ts TIMESTAMP(3)\n" +            ") WITH (\n" +            "    'connector.type' = 'kafka',  -- ���������������������kafka\n" +            "    'connector.version' = '0.11',  -- ���������������Docker���������kafka���������������\n" +            "    'connector.topic' = 'mykafka', -- ���������������topic \n" +            "    'connector.properties.group.id' = 'flink-test-0', -- ������������������������������������������\n" +            "    'connector.startup-mode' = 'earliest-offset',  --���������������������\n" +            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk������\n" +            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker������\n" +            "    'format.type' = 'json'  -- json������������topic������������������������������\n" +            ")";    public static final String MYSQL_TABLE_SINK_DDL=""+            "CREATE TABLE `user_behavior_mysql` (\n" +            "  `user_id` bigint  ,\n" +            "  `item_id` bigint  ,\n" +            "  `behavior` varchar  ,\n" +            "  `category_id` bigint  ,\n" +            "  `ts` timestamp(3)   \n" +            ")WITH (\n" +            "  'connector.type' = 'jdbc', -- ������������\n" +            "  'connector.url' = 'jdbc:mysql://localhost:3306/mysql', -- jdbc���url\n" +            "  'connector.table' = 'user_behavior',  -- ������\n" +            "  'connector.driver' = 'com.mysql.jdbc.Driver', -- ���������������������������������������������������jdbc url������ \n" +            "  'connector.username' = 'root', -- ������������ ���������\n" +            "  'connector.password' = '123456' , -- ������\n" +            "  'connector.write.flush.max-rows' = '5000', -- ��������������������������������������� \n" +            "  'connector.write.flush.interval' = '2s' -- ���������������������������������������������2���������������������������������������������������������������\n"+            ")"            ;    public static void main(String[] args) throws Exception {        //������StreamExecutionEnvironment         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();                //������EnvironmentSettings ���������Blink Planner        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();                //������StreamTableEnvironment         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);                //������DDL���������kafka������������        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);        //������DDL���������mysql���������������        tEnv.sqlUpdate(MYSQL_TABLE_SINK_DDL);                //������kafka���������������������������mysql���        tEnv.sqlUpdate("insert into user_behavior_mysql select user_id,item_id,behavior,category_id,ts from user_behavior");                //������������������������������������        env.execute("test");    }}

���������������Navicat���������������������������������������������mysql������

user_id item_id behavior category_id ts
543462 1715 pv 1464116 2017-11-26 01:00:00.000
543462 1715 pv 1464116 2017-11-26 01:00:00.000
543462 1715 pv 1464116 2017-11-26 01:00:00.000
543462 1715 pv 1464116 2017-11-26 01:00:00.000

������������������������������kafka������������������������������������������������������������������Java������kafka������������������������������������������������������������������������������������Flink Sql������������������������������������������������������������������������������������������������������������������Flink Sql���������������������Join������������������������������������������������������������������������������������������������������������������������������������������������������������Flink ���������

������

pom.xml

1.10.0
2.11
org.apache.flink
flink-table-api-java
${flink.version}
provided
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}
provided
scala-library
org.scala-lang
slf4j-api
org.slf4j
org.apache.flink
flink-json
1.10.0
org.apache.flink
flink-table-planner_${scala.binary.version}
${flink.version}
provided
org.apache.flink
flink-jdbc_2.11
${flink.version}
provided
org.apache.flink
flink-clients_2.11
${flink.version}
provided
javassist
org.javassist
scala-parser-combinators_2.11
org.scala-lang.modules
slf4j-api
org.slf4j
snappy-java
org.xerial.snappy
org.apache.flink
flink-java
${flink.version}
provided
org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
provided
org.apache.kafka
kafka-clients
0.11.0.3
slf4j-api
org.slf4j
org.apache.flink
flink-connector-kafka-0.11_${scala.binary.version}
${flink.version}
kafka-clients
org.apache.kafka
mysql
mysql-connector-java
5.1.37
org.apache.flink
flink-connector-redis_2.11
1.1.5
force-shading
org.apache.flink
slf4j-api
org.slf4j
com.fasterxml.jackson.core
jackson-core
2.9.5
io.lettuce
lettuce-core
5.0.5.RELEASE
com.alibaba
fastjson
1.2.46
org.apache.flink
flink-table-api-java-bridge_2.11
1.10.0
provided
io.netty
netty-all
4.1.4.Final
org.apache.flink
flink-jdbc_2.11
1.10.0
org.apache.maven.plugins
maven-compiler-plugin
3.8.1
UTF-8
1.8
1.8
org.apache.maven.plugins
maven-shade-plugin
2.4.3
package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
junit:junit

������������������������������������������������������������������

log4j.xml

���������������resource���������������������������

 
上一篇:大数据集群运维(62)pycharm2020安装破解方法
下一篇:项目实战 从 0 到 1 学习之Flink (27)FlinkSql教程(一)

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2025年04月22日 10时32分18秒