项目实战 从 0 到 1 学习之Flink (23)Flink 读取hive并写入hive
发布日期:2021-05-14 00:16:43 浏览次数:19 分类:博客文章

本文共 8848 字,大约阅读时间需要 29 分钟。

1���������������������������������������������������������������

HCatInputFormat

HCatInputFormatBase

��������������������������������� ��������������� RichInputFormat���

public abstract class HCatInputFormatBase
extends RichInputFormat
implements ResultTypeQueryabl

������������������jar������������������������

 

������������������������������

org.apache.flink
flink-hadoop-fs
1.6.2
com.jolbox
bonecp
0.8.0.RELEASE
com.twitter
parquet-hive-bundle
1.6.0
org.apache.hive
hive-exec
2.1.0
org.apache.hive
hive-metastore
2.1.0
org.apache.hive
hive-cli
2.1.0
org.apache.hive
hive-common
2.1.0
org.apache.hive
hive-service
2.1.0
org.apache.hive
hive-shims
2.1.0
org.apache.hive.hcatalog
hive-hcatalog-core
2.1.0
org.apache.thrift
libfb303
0.9.3
pom
org.apache.flink
flink-hadoop-compatibility_2.11
1.6.2
org.apache.flink
flink-shaded-hadoop2
1.6.2

������hive���������

package com.coder.flink.core.FlinkHive  import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.hadoop.conf.Configurationimport org.apache.flink.api.scala._  //������hive���������object ReadHive {  def main(args: Array[String]): Unit = {       val conf = new Configuration()      conf.set("hive.metastore.local", "false")       conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")       //������������������ ������������nameserver//      conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083")       val env = ExecutionEnvironment.getExecutionEnvironment       //todo ������������      val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf))       dataset.first(10).print()//      env.execute("flink hive test")    } }

������������ Flink 1.9���������Hive���������������������������������Hive Jdbc������������������hive������������������������������������

org.apache.hive
hive-jdbc
2.1.0
package com.coder.flink.core.FlinkHive; import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement; import java.sql.*; public class FlinkReadHive {    public static void main(String[] args) throws ClassNotFoundException, SQLException {         Class.forName("org.apache.hive.jdbc.HiveDriver");        Connection con = DriverManager.getConnection("jdbc:hive2://172.10.4.143:10000/aijiami","hive","hive");        Statement st = con.createStatement();        ResultSet rs = st.executeQuery("SELECT * from ods_scenes_detail_new limit 10");        while (rs.next()){            System.out.println(rs.getString(1) + "," + rs.getString(2));        }        rs.close();        st.close();        con.close();      }}
public class HiveApp {         private static String driver = "org.apache.hive.jdbc.HiveDriver";    private static String url = "jdbc:hive2://Master:10000/default";    private static String user = "root"; //���������������������������������������������������������������root���������������Hive������������������������������root    private static String password = "";     public static void main(String[] args) {        ResultSet res = null;                 try {            /**             * ���������������JDBC���������������������������������������             */            Class.forName(driver);                         /**             * ������������������JDBC���������Hive������������������������������10000������������������������������������             */            Connection conn = DriverManager.getConnection(url, user, password);                          /**             * ������������������Statement������������������������������SQL������������������             */            Statement stmt = conn.createStatement();                         /**             * ���������������SQL������������������             * ���4.1���������������Table,���������������������������������������������             */            String tableName = "testHiveDriverTable";            stmt.execute("drop table if exists " + tableName );                                     stmt.execute("create table " + tableName + " (id int, name string)" + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'");            /**             *  ���4.2������������������������Table���             */            String sql = "show tables '" + tableName + "'";            System.out.println("Running: " + sql);            res = stmt.executeQuery(sql);            if (res.next()) {              System.out.println(res.getString(1));            }            /**             *  ���4.3������������������������Table���schema���             */            sql = "describe " + tableName;            System.out.println("Running: " + sql);            res = stmt.executeQuery(sql);            while (res.next()) {              System.out.println(res.getString(1) + "\t" + res.getString(2));            }                      /**             *  ���4.4���������������������������Hive������Table���             */            String filepath = "/root/Documents/data/sql/testHiveDriver.txt";            sql = "load data local inpath '" + filepath + "' into table " + tableName;            System.out.println("Running: " + sql);            stmt.execute(sql);                      /**             *  ���4.5���������������������Hive������Table������������             */            sql = "select * from " + tableName;            System.out.println("Running: " + sql);            res = stmt.executeQuery(sql);            while (res.next()) {              System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));            }                      /**             *  ���4.6���������Hive���������Table���������������������             */            sql = "select count(1) from " + tableName;   //���������select count(*) ���������������mapreduce ������  ������������������������������������ yarn  ��� start-yarn.sh             System.out.println("Running: " + sql);            res = stmt.executeQuery(sql);                       while (res.next()) {              System.out.println("Total lines :" + res.getString(1));            }                         } catch (Exception e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                          } }

������HDFS������������������

package com.coder.flink.core.test_demo import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}import org.apache.flink.core.fs.FileSystem.WriteMode object WriteToHDFS {  def main(args: Array[String]): Unit = {    val env = ExecutionEnvironment.getExecutionEnvironment    //2.������������ stu(age,name,height)    val stu: DataSet[(Int, String, String)] = env.fromElements(      (19, "zhangsan","aaaa"),      (1449, "zhangsan","aaaa"),      (33, "zhangsan","aaaa"),      (22, "zhangsan","aaaa")    )     //todo ���������������    stu.setParallelism(1).writeAsText("file:///C:/Users/Administrator/Desktop/Flink������/������������/test001.txt",      WriteMode.OVERWRITE)    env.execute()      //todo ���������hdfs���������������,���������������������������������������    stu.setParallelism(1).writeAsText("hdfs:///output/flink/datasink/test001.txt",      WriteMode.OVERWRITE)    env.execute()     //todo ���������hdfs���CSV������    //3.1������csv������    val inPath = "hdfs:///input/flink/sales.csv"    case class Sales(transactionId: String, customerId: Int, itemId: Int, amountPaid: Double)    val ds2 = env.readCsvFile[Sales](      filePath = inPath,      lineDelimiter = "\n",      fieldDelimiter = ",",      lenient = false,      ignoreFirstLine = true,      includedFields = Array(0, 1, 2, 3),      pojoFields = Array("transactionId", "customerId", "itemId", "amountPaid")    )    //3.2���CSV���������������hdfs    val outPath = "hdfs:///output/flink/datasink/sales.csv"    ds2.setParallelism(1).writeAsCsv(filePath = outPath, rowDelimiter = "\n",fieldDelimiter = "|", WriteMode.OVERWRITE)     env.execute()  }}

 

上一篇:项目实战从 0 到 1 学习之Flink (24)Flink将kafka的数据存到redis中
下一篇:项目实战 从 0 到 1 学习之Flink(22)读mysql并写入mysql

发表评论

最新留言

逛到本站,mark一下
[***.202.152.39]2025年04月15日 09时54分57秒