
Project in Practice: Learning Flink from 0 to 1 (26): Collecting Data from Kafka with Flink and Storing It in MongoDB
Entity class
import java.io.Serializable;

public class FlinkDao implements Serializable {

    private String id;
    private String startMoney;
    private String startTime;
    private String endMoney;
    private String endTime;
    private String total;

    public FlinkDao(String id, String startMoney, String startTime, String endMoney, String endTime, String total) {
        this.id = id;
        this.startMoney = startMoney;
        this.startTime = startTime;
        this.endMoney = endMoney;
        this.endTime = endTime;
        this.total = total;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getStartMoney() { return startMoney; }
    public void setStartMoney(String startMoney) { this.startMoney = startMoney; }
    public String getStartTime() { return startTime; }
    public void setStartTime(String startTime) { this.startTime = startTime; }
    public String getEndMoney() { return endMoney; }
    public void setEndMoney(String endMoney) { this.endMoney = endMoney; }
    public String getEndTime() { return endTime; }
    public void setEndTime(String endTime) { this.endTime = endTime; }
    public String getTotal() { return total; }
    public void setTotal(String total) { this.total = total; }

    @Override
    public String toString() {
        return "FlinkDao{" +
                "id='" + id + '\'' +
                ", startMoney='" + startMoney + '\'' +
                ", startTime='" + startTime + '\'' +
                ", endMoney='" + endMoney + '\'' +
                ", endTime='" + endTime + '\'' +
                ", total='" + total + '\'' +
                '}';
    }
}
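For a quick sanity check of the record layout, the sketch below parses one comma-separated line into a FlinkDao and prints it. The sample line and its six-field format are assumptions, inferred from the split(",") mapping in FlinkTest further down; FlinkDaoExample is a hypothetical helper class, not part of the original post.

public class FlinkDaoExample {
    public static void main(String[] args) {
        // Hypothetical Kafka message: id,startMoney,startTime,endMoney,endTime,total
        String line = "1,100,2021-05-14 00:00:00,90,2021-05-14 00:05:00,10";
        String[] f = line.split(",");
        FlinkDao dao = new FlinkDao(f[0], f[1], f[2], f[3], f[4], f[5]);
        System.out.println(dao);
    }
}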
MongoDB connection utility class
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import java.util.ArrayList;
import java.util.List;

public class MongoDBUtil {

    public static MongoClient getConnect() {
        ServerAddress serverAddress = new ServerAddress("localhost", 27017);
        List<MongoCredential> credential = new ArrayList<>();
        // createScramSha1Credential() takes three arguments:
        // username, database name, and password
        MongoCredential mongoCredential1 = MongoCredential.createScramSha1Credential("root", "flink", "root".toCharArray());
        credential.add(mongoCredential1);
        // Obtain a MongoDB connection authenticated with the credential
        MongoClient mongoClient = new MongoClient(serverAddress, credential);
        return mongoClient;
    }
}
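Before wiring this into a job, it can help to verify the connection on its own. A minimal sketch: MongoConnectCheck is a hypothetical helper, and it assumes a local MongoDB with authentication enabled and the root/flink/root credentials hard-coded in MongoDBUtil.

import com.mongodb.MongoClient;

public class MongoConnectCheck {
    public static void main(String[] args) {
        // Try-with-resources closes the client; MongoClient implements Closeable
        try (MongoClient client = MongoDBUtil.getConnect()) {
            // Listing collection names forces a round trip to the server
            // and only needs access to the flink database
            for (String name : client.getDatabase("flink").listCollectionNames()) {
                System.out.println(name);
            }
        }
    }
}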
MongoDBSink
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

public class MongoDBSink extends RichSinkFunction<Tuple6<String, String, String, String, String, String>> {

    private static final long serialVersionUID = 1L;

    private MongoClient mongoClient = null;

    @Override
    public void open(Configuration parms) throws Exception {
        super.open(parms);
        // One connection per parallel sink instance, not one per record
        mongoClient = MongoDBUtil.getConnect();
    }

    @Override
    public void invoke(Tuple6<String, String, String, String, String, String> value) {
        try {
            if (mongoClient != null) {
                MongoDatabase db = mongoClient.getDatabase("flink");
                MongoCollection<Document> collection = db.getCollection("flink");
                List<Document> list = new ArrayList<>();
                Document doc = new Document();
                doc.append("id", value.f0);
                doc.append("startMoney", value.f1);
                doc.append("startTime", value.f2);
                doc.append("endMoney", value.f3);
                doc.append("endTime", value.f4);
                doc.append("total", value.f5);
                list.add(doc);
                System.out.println("Insert Starting");
                collection.insertMany(list);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }
}
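The sink follows the rich-function lifecycle: the MongoClient is created once in open() and released in close(), so each parallel sink instance holds a single connection instead of opening one per record. For a quick standalone test outside a Flink job, the sketch below drives the sink by hand; MongoDBSinkCheck is a hypothetical helper and assumes the same local MongoDB as above.

import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;

public class MongoDBSinkCheck {
    public static void main(String[] args) throws Exception {
        MongoDBSink sink = new MongoDBSink();
        sink.open(new Configuration());   // creates the MongoClient
        // Hypothetical record matching the Tuple6 layout used in FlinkTest
        sink.invoke(Tuple6.of("1", "100", "2021-05-14 00:00:00", "90", "2021-05-14 00:05:00", "10"));
        sink.close();
    }
}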
FlinkTest
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

public class FlinkTest {

    public static void main(String[] args) throws Exception {
        // Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing every 5 seconds; the Kafka consumer commits its
        // offsets back to Kafka when a checkpoint completes
        env.enableCheckpointing(5000);

        // Kafka and ZooKeeper addresses and ports
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "192.168.203.128:2181");
        properties.setProperty("bootstrap.servers", "192.168.203.128:9092");
        properties.setProperty("group.id", "test");
        //properties.setProperty("enable.auto.commit", "true");

        // Consume the topic as plain strings (data file location: /home/fkf/Documents)
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("fkfTopic", new SimpleStringSchema(), properties);
        consumer.setStartFromLatest();          // start from the latest records
        //consumer.setStartFromEarliest();      // start from the earliest records
        //consumer.setStartFromTimestamp(0);    // start from the given epoch timestamp (milliseconds)
        //consumer.setStartFromGroupOffsets();  // default: start from offsets committed by the consumer group

        // Turn the Kafka records into a Flink DataStream of Tuple6
        DataStreamSource<String> stream = env.addSource(consumer);
        DataStream<Tuple6<String, String, String, String, String, String>> map =
                stream.map(new MapFunction<String, Tuple6<String, String, String, String, String, String>>() {
                    @Override
                    public Tuple6<String, String, String, String, String, String> map(String s) throws Exception {
                        String[] split = s.split(",");
                        return Tuple6.of(split[0], split[1], split[2], split[3], split[4], split[5]);
                    }
                });
        map.addSink(new MongoDBSink());
        stream.print().setParallelism(1);

        env.execute("WordCount from kafka data");
    }
}
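To feed the job some test data, you can produce a few comma-separated lines to fkfTopic with the plain Kafka producer API. A minimal sketch: FkfTopicProducer is a hypothetical helper, the broker address matches the one configured above, and the sample payload format is an assumption based on the split(",") mapping; the kafka-clients library is pulled in transitively by the Flink Kafka connector.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class FkfTopicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.203.128:9092");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // One record = six comma-separated fields: id,startMoney,startTime,endMoney,endTime,total
        producer.send(new ProducerRecord<>("fkfTopic", "1,100,2021-05-14 00:00:00,90,2021-05-14 00:05:00,10"));
        producer.flush();
        producer.close();
    }
}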
pom file
Component versions used in this walkthrough: kafka_2.11-0.10.1.0, zookeeper-3.3.6, apache-flume-1.9.0, flink 1.6.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.flink</groupId>
    <artifactId>app</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.10.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>test</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>target/classes/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.tonytaotao.flink.FlinkKafka</mainClass>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                    </archive>
                    <classesDirectory>.</classesDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
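To build and submit the job, the usual Maven and Flink CLI steps apply. Note that the jar manifest above points at com.tonytaotao.flink.FlinkKafka, which does not match the FlinkTest class shown here, so it is safest to pass the entry class explicitly. A sketch, assuming a standalone Flink 1.6 cluster and that FlinkTest lives in the default package:

mvn clean package
flink run -c FlinkTest target/app-1.0-SNAPSHOT.jar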