项目实战 从 0 到 1 学习之Flink(22)读mysql并写入mysql
发布日期:2021-05-14 00:16:42 浏览次数:32 分类:博客文章

本文共 5200 字,大约阅读时间需要 17 分钟。

���Flink������������������connector������������������������������������������������������������������������������������������connector���������������������������������������������������IO(Asynchronous I/O)������������������������������������������������

���������������������������RichSourceFunction������������������������������������������

������mysql���������

package com.my.flink.utils.streaming.mysql; import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory; import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.ResultSet;import com.my.flink.utils.config.ConfigKeys; /** * @Description mysql source * @Author jiangxiaozhi * @Date 2018/10/15 17:05 **/public class JdbcReader extends RichSourceFunction
> { private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class); private Connection connection = null; private PreparedStatement ps = null; //������������������������������������������������������ConfigKeys������������������������ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); Class.forName(ConfigKeys.DRIVER_CLASS());//��������������������� connection = DriverManager.getConnection(ConfigKeys.SOURCE_DRIVER_URL(), ConfigKeys.SOURCE_USER(), ConfigKeys.SOURCE_PASSWORD());//������������ ps = connection.prepareStatement(ConfigKeys.SOURCE_SQL()); } //��������������������������� @Override public void run(SourceContext
> ctx) throws Exception { try { ResultSet resultSet = ps.executeQuery(); while (resultSet.next()) { String name = resultSet.getString("nick"); String id = resultSet.getString("user_id"); logger.error("readJDBC name:{}", name); Tuple2
tuple2 = new Tuple2<>(); tuple2.setFields(id,name); ctx.collect(tuple2);//������������������������tuple2���������2������������������������������������������������ } } catch (Exception e) { logger.error("runException:{}", e); } } //��������������������� @Override public void cancel() { try { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } catch (Exception e) { logger.error("runException:{}", e); } }}

 ������mysql���������

package com.my.flink.utils.streaming.mysql; import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import org.apache.flink.streaming.api.functions.sink.SinkFunction;import scala.Tuple2; import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import com.my.flink.utils.config.ConfigKeys; /** * @Description mysql sink * @Author jiangxiaozhi * @Date 2018/10/15 18:31 **/public class JdbcWriter extends RichSinkFunction
> { private Connection connection; private PreparedStatement preparedStatement; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // ������JDBC������ Class.forName(ConfigKeys.DRIVER_CLASS()); // ��������������������� connection = DriverManager.getConnection(ConfigKeys.SINK_DRIVER_URL(),ConfigKeys.SINK_USER(),ConfigKeys.SINK_PASSWORD());//������mysql��������� preparedStatement = connection.prepareStatement(ConfigKeys.SINK_SQL());//insert sql������������������ super.open(parameters); } @Override public void close() throws Exception { super.close(); if(preparedStatement != null){ preparedStatement.close(); } if(connection != null){ connection.close(); } super.close(); } @Override public void invoke(Tuple1
value, Context context) throws Exception { try { String name = value._1;//������JdbcReader��������������������� preparedStatement.setString(1,name); preparedStatement.executeUpdate(); }catch (Exception e){ e.printStackTrace(); } }}

���������������������������

//scala������  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment  env.enableCheckpointing(5000)  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)  val dataStream =  env.addSource(new JdbcReader())//���������mysql���������������dataStream������������������������������������������  dataStream.addSink(new JdbcWriter())//������mysql  env.execute("flink mysql demo")//������������

������������������������������������������������������������������ 

上一篇:项目实战 从 0 到 1 学习之Flink (23)Flink 读取hive并写入hive
下一篇:项目实战 从 0 到 1 学习之Flink (21)Flink读HBase并写入HBase

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2025年04月09日 07时13分20秒