Hands-On Project, Learning Flink from 0 to 1 (26): Flink ingests data from Kafka and stores it in MongoDB
Published: 2021-05-14 00:16:45


Entity class FlinkDao

import java.io.Serializable;

public class FlinkDao implements Serializable {

    private String id;
    private String startMoney;
    private String startTime;
    private String endMoney;
    private String endTime;
    private String total;

    public FlinkDao(String id, String startMoney, String startTime, String endMoney, String endTime, String total) {
        this.id = id;
        this.startMoney = startMoney;
        this.startTime = startTime;
        this.endMoney = endMoney;
        this.endTime = endTime;
        this.total = total;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getStartMoney() { return startMoney; }
    public void setStartMoney(String startMoney) { this.startMoney = startMoney; }

    public String getStartTime() { return startTime; }
    public void setStartTime(String startTime) { this.startTime = startTime; }

    public String getEndMoney() { return endMoney; }
    public void setEndMoney(String endMoney) { this.endMoney = endMoney; }

    public String getEndTime() { return endTime; }
    public void setEndTime(String endTime) { this.endTime = endTime; }

    public String getTotal() { return total; }
    public void setTotal(String total) { this.total = total; }

    @Override
    public String toString() {
        return "FlinkDao{" +
                "id='" + id + '\'' +
                ", startMoney='" + startMoney + '\'' +
                ", startTime='" + startTime + '\'' +
                ", endMoney='" + endMoney + '\'' +
                ", endTime='" + endTime + '\'' +
                ", total='" + total + '\'' +
                '}';
    }
}
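The six fields correspond to one comma-separated Kafka message per record (the same order the Flink job below uses when it splits each line). A minimal sketch of that mapping, with made-up sample values:

// A made-up sample line in the comma-separated format the job expects:
// id, startMoney, startTime, endMoney, endTime, total
String line = "1,100.00,2021-05-14 00:00:00,90.00,2021-05-14 00:05:00,10.00";
String[] f = line.split(",");
FlinkDao dao = new FlinkDao(f[0], f[1], f[2], f[3], f[4], f[5]);
System.out.println(dao);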

MongoDB connection utility class (MongoDBUtil)

import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import java.util.ArrayList;
import java.util.List;

public class MongoDBUtil {

    public static MongoClient getConnect() {
        ServerAddress serverAddress = new ServerAddress("localhost", 27017);
        List<MongoCredential> credential = new ArrayList<>();
        // createScramSha1Credential() takes the user name, the authentication database and the password
        MongoCredential mongoCredential1 = MongoCredential.createScramSha1Credential("root", "flink", "root".toCharArray());
        credential.add(mongoCredential1);
        // Obtain a MongoDB client authenticated with the credential above
        MongoClient mongoClient = new MongoClient(serverAddress, credential);
        return mongoClient;
    }
}
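Before wiring the sink into the job, it can help to confirm that this utility actually authenticates. A small smoke test, assuming the root user on the flink database from the code above already exists on the local MongoDB instance (the class name is just for illustration):

import com.mongodb.MongoClient;
import org.bson.Document;

public class MongoDBConnectionCheck {
    public static void main(String[] args) {
        // Obtain a client from the utility above and ping the "flink" database.
        MongoClient client = MongoDBUtil.getConnect();
        try {
            Document result = client.getDatabase("flink").runCommand(new Document("ping", 1));
            System.out.println("ping result: " + result.toJson());
        } finally {
            client.close();
        }
    }
}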

MongoDBSink

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.bson.Document;

import java.util.ArrayList;
import java.util.List;

public class MongoDBSink extends RichSinkFunction<Tuple6<String, String, String, String, String, String>> {

    private static final long serialVersionUID = 1L;

    MongoClient mongoClient = null;

    @Override
    public void invoke(Tuple6<String, String, String, String, String, String> value) {
        try {
            // Reuse the client created in open(); reconnect only if it is missing.
            if (mongoClient == null) {
                mongoClient = MongoDBUtil.getConnect();
            }
            MongoDatabase db = mongoClient.getDatabase("flink");
            MongoCollection<Document> collection = db.getCollection("flink");
            List<Document> list = new ArrayList<>();
            Document doc = new Document();
            doc.append("id", value.f0);
            doc.append("startMoney", value.f1);
            doc.append("startTime", value.f2);
            doc.append("endMoney", value.f3);
            doc.append("endTime", value.f4);
            doc.append("total", value.f5);
            list.add(doc);
            System.out.println("Insert Starting");
            collection.insertMany(list);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void open(Configuration parms) throws Exception {
        super.open(parms);
        mongoClient = MongoDBUtil.getConnect();
    }

    @Override
    public void close() throws Exception {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }
}
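Each parallel sink instance opens one MongoClient in open() and closes it in close(), so invoke() only has to build the document and insert it. To verify that records actually land in the flink.flink collection, a quick read-back sketch using the same utility (class name is hypothetical):

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class MongoDBReadCheck {
    public static void main(String[] args) {
        MongoClient client = MongoDBUtil.getConnect();
        try {
            MongoCollection<Document> collection = client.getDatabase("flink").getCollection("flink");
            // Print every document the sink has written so far.
            for (Document doc : collection.find()) {
                System.out.println(doc.toJson());
            }
        } finally {
            client.close();
        }
    }
}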

FlinkTest

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

public class FlinkTest {

    public static void main(String[] args) throws Exception {
        // Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Very important: enable checkpointing (every 5 seconds)
        env.enableCheckpointing(5000);

        // Kafka and ZooKeeper hosts and ports
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "192.168.203.128:2181");
        properties.setProperty("bootstrap.servers", "192.168.203.128:9092");
        properties.setProperty("group.id", "test");
        //properties.setProperty("enable.auto.commit", "true");

        // Kafka consumer for the topic; the source data files live under /home/fkf/Documents
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("fkfTopic", new SimpleStringSchema(), properties);
        consumer.setStartFromLatest();          // start from the latest records
        //consumer.setStartFromEarliest();      // start from the earliest records
        //consumer.setStartFromTimestamp(0);    // start from the given epoch timestamp (milliseconds)
        //consumer.setStartFromGroupOffsets();  // default: start from the offsets committed by the consumer group

        // Read the Kafka data into a Flink DataStream.
        // (An alternative is a flatMap that parses each line into a FlinkDao object instead of a Tuple6.)
        DataStreamSource<String> stream = env.addSource(consumer);
        DataStream<Tuple6<String, String, String, String, String, String>> map =
                stream.map(new MapFunction<String, Tuple6<String, String, String, String, String, String>>() {
                    @Override
                    public Tuple6<String, String, String, String, String, String> map(String s) throws Exception {
                        String[] split = s.split(",");
                        return Tuple6.of(split[0], split[1], split[2], split[3], split[4], split[5]);
                    }
                });
        map.addSink(new MongoDBSink());
        stream.print().setParallelism(1);
        env.execute("WordCount from kafka data");
    }
}
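If the Flume pipeline is not set up yet, a throwaway Kafka producer can push a few comma-separated test lines into fkfTopic. A sketch using the plain kafka-clients producer API; the broker address and topic come from the job above, the record values are made up, and the class name is only for illustration:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.203.128:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // One made-up record: id, startMoney, startTime, endMoney, endTime, total
        producer.send(new ProducerRecord<>("fkfTopic",
                "1,100.00,2021-05-14 00:00:00,90.00,2021-05-14 00:05:00,10.00"));
        producer.flush();
        producer.close();
    }
}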

Environment versions and pom.xml

kafka_2.11-0.10.1.0
zookeeper-3.3.6
apache-flume-1.9.0
flink 1.6

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.flink</groupId>
    <artifactId>app</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>3.10.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>test</phase>
                        <goals><goal>copy-dependencies</goal></goals>
                        <configuration>
                            <outputDirectory>target/classes/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.tonytaotao.flink.FlinkKafka</mainClass>
                            <classpathPrefix>lib/</classpathPrefix>
                        </manifest>
                        <manifestEntries>
                            <Class-Path>.</Class-Path>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 
