
项目实战 从 0 到 1 学习之Flink(22)读mysql并写入mysql
发布日期:2021-05-14 00:16:42
浏览次数:32
分类:博客文章
本文共 5200 字,大约阅读时间需要 17 分钟。
���Flink������������������connector������������������������������������������������������������������������������������������connector���������������������������������������������������IO(Asynchronous I/O)������������������������������������������������
���������������������������RichSourceFunction������������������������������������������
������mysql���������
package com.my.flink.utils.streaming.mysql; import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory; import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import com.my.flink.utils.config.ConfigKeys; /** * @Description mysql source * @Author jiangxiaozhi * @Date 2018/10/15 17:05 **/public class JdbcReader extends RichSourceFunction> { private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class); private Connection connection = null; private PreparedStatement ps = null; //������������������������������������������������������ConfigKeys������������������������ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName(ConfigKeys.DRIVER_CLASS());//��������������������� connection = DriverManager.getConnection(ConfigKeys.SOURCE_DRIVER_URL(), ConfigKeys.SOURCE_USER(), ConfigKeys.SOURCE_PASSWORD());//������������ ps = connection.prepareStatement(ConfigKeys.SOURCE_SQL()); } //��������������������������� @Override public void run(SourceContext > ctx) throws Exception { try { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { String name = resultSet.getString("nick"); String id = resultSet.getString("user_id"); logger.error("readJDBC name:{}", name); Tuple2 tuple2 = new Tuple2<>(); tuple2.setFields(id,name); ctx.collect(tuple2);//������������������������tuple2���������2������������������������������������������������ } } catch (Exception e) { logger.error("runException:{}", e); } } //��������������������� @Override public void cancel() { try { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } catch (Exception e) { logger.error("runException:{}", e); } }}
������mysql���������
package com.my.flink.utils.streaming.mysql; import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.apache.flink.streaming.api.functions.sink.SinkFunction;import scala.Tuple2; import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import com.my.flink.utils.config.ConfigKeys; /** * @Description mysql sink * @Author jiangxiaozhi * @Date 2018/10/15 18:31 **/public class JdbcWriter extends RichSinkFunction> { private Connection connection; private PreparedStatement preparedStatement; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // ������JDBC������ Class.forName(ConfigKeys.DRIVER_CLASS()); // ��������������������� connection = DriverManager.getConnection(ConfigKeys.SINK_DRIVER_URL(),ConfigKeys.SINK_USER(),ConfigKeys.SINK_PASSWORD());//������mysql��������� preparedStatement = connection.prepareStatement(ConfigKeys.SINK_SQL());//insert sql������������������ super.open(parameters); } @Override public void close() throws Exception { super.close(); if(preparedStatement != null){ preparedStatement.close(); } if(connection != null){ connection.close(); } super.close(); } @Override public void invoke(Tuple1 value, Context context) throws Exception { try { String name = value._1;//������JdbcReader��������������������� preparedStatement.setString(1,name); preparedStatement.executeUpdate(); }catch (Exception e){ e.printStackTrace(); } }}
���������������������������
//scala������ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) val dataStream = env.addSource(new JdbcReader())//���������mysql���������������dataStream������������������������������������������ dataStream.addSink(new JdbcWriter())//������mysql env.execute("flink mysql demo")//������������
������������������������������������������������������������������
发表评论
最新留言
逛到本站,mark一下
[***.202.152.39]2025年04月09日 07时13分20秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
上周热点回顾(1.23-1.29)
2021-05-09
centos7一步一步搭建docker jenkins 及自定义访问路径重点讲解
2021-05-09
【Flink】Flink 底层RPC框架分析
2021-05-09
MySQL错误日志(Error Log)
2021-05-09
解决:angularjs radio默认选中失效问题
2021-05-09
windows环境下安装zookeeper(仅本地使用)
2021-05-09
缓冲区溢出实例(一)--Windows
2021-05-09
Python中字符串前添加r ,b, u, f前缀的含义
2021-05-09
Hadoop学习笔记—Yarn
2021-05-09
JSONPath小试牛刀之Snack3
2021-05-09
Jenkins - 部署在Tomcat容器里的Jenkins,提示“反向代理设置有误”
2021-05-09
wxWidgets源码分析(3) - 消息映射表
2021-05-09
wxWidgets源码分析(5) - 窗口管理
2021-05-09
wxWidgets源码分析(7) - 窗口尺寸
2021-05-09
wxWidgets源码分析(8) - MVC架构
2021-05-09
wxWidgets源码分析(9) - wxString
2021-05-09
Mybatis Generator最完整配置详解
2021-05-09
[白话解析] 深入浅出熵的概念 & 决策树之ID3算法
2021-05-09
[梁山好汉说IT] 梁山好汉和抢劫银行
2021-05-09