I. Environment:
spark-2.2.0; hive-1.1.0; scala-2.11.8; hadoop-2.6.0-cdh5.15.0; jdk-1.8; mongodb-2.4.10
II. Data:
MongoDB document format:
{ "_id" : ObjectId("5ba0569cafc9ec432bd310a3"), "id" : 7, "name" : "7mongoDBi am using mongodb now", "location" : "shanghai", "sex" : "male", "position" : "big data platform engineer" }
Hive table (non-partitioned):
create table mgtohive_2(
    id int,
    name string,
    location string,
    sex string,
    position string
) row format delimited fields terminated by '\t';
Hive partitioned table (sex is the partition column, so it is not repeated in the regular column list):
create table mg_hive_external(
    id int,
    name string,
    location string,
    position string
) PARTITIONED BY (sex string)
row format delimited fields terminated by '\t';
III. Eclipse + Maven + Java
3.1 Dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.2.2</version>
    </dependency>
</dependencies>
3.2 Code:
package com.mobanker.mongo2hive.Mongo2Hive;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.bson.Document;
import com.mongodb.spark.MongoSpark;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class Mongo2Hive {
    public static void main(String[] args) {
        // Spark 2.x: a single SparkSession carries both Hive support and the MongoDB input URI
        String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("SparkReadMgToHive")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .config("spark.mongodb.input.uri", "mongodb://10.40.20.47:27017/test_db.test_table")
                .enableHiveSupport()
                .getOrCreate();
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        // Spark 1.x equivalent:
        // SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkReadMgToHive");
        // conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.mgtest");
        // conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // JavaSparkContext sc = new JavaSparkContext(conf);
        // sc.addJar("/Users/mac/zhangchun/jar/mongo-spark-connector_2.11-2.2.2.jar");
        // sc.addJar("/Users/mac/zhangchun/jar/mongo-java-driver-3.6.3.jar");
        // HiveContext sqlContext = new HiveContext(sc);
        // // create a DataFrame directly from MongoDB
        // Dataset<Row> df = MongoSpark.read(sqlContext).load().toDF();
        // df.select("id", "name").show();

        String querysql = "select id,name,location,sex,position from mgtohive_2 b";
        String opType = "P"; // "P" = load into the partitioned table

        SQLUtils sqlUtils = new SQLUtils();
        List<String> column = sqlUtils.getColumns(querysql);

        // create an RDD from MongoDB
        JavaRDD<Document> rdd = MongoSpark.load(sc);

        // convert each Document into a Row
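        // NOTE: the original post is truncated at this point. The remainder of
        // the method below is a minimal reconstruction, not the author's
        // verbatim code; the temp view name "mongo_tmp" and the all-string
        // schema are illustrative choices (Hive casts string to int on insert).
        JavaRDD<Row> rowRDD = rdd.map(new Function<Document, Row>() {
            public Row call(Document doc) {
                // pull out only the columns named in the query, in order
                List<Object> values = new ArrayList<Object>();
                for (String c : column) {
                    Object v = doc.get(c);
                    values.add(v == null ? null : v.toString());
                }
                return RowFactory.create(values.toArray());
            }
        });

        // build the schema from the parsed column list
        List<StructField> fields = new ArrayList<StructField>();
        for (String c : column) {
            fields.add(DataTypes.createStructField(c, DataTypes.StringType, true));
        }
        StructType schema = DataTypes.createStructType(fields);

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.createOrReplaceTempView("mongo_tmp");

        if ("P".equals(opType)) {
            // partitioned load: enable dynamic partitioning so each row lands
            // in the partition named by its sex value
            spark.sql("set hive.exec.dynamic.partition=true");
            spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");
            spark.sql("insert into table mg_hive_external partition(sex) "
                    + "select id, name, location, position, sex from mongo_tmp");
        } else {
            spark.sql("insert into table mgtohive_2 "
                    + "select id, name, location, sex, position from mongo_tmp");
        }

        spark.stop();
    }
}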
Utility class:
package com.mobanker.mongo2hive.Mongo2Hive;

import java.util.ArrayList;
import java.util.List;

public class SQLUtils {
    // extract the column names between "select" and "from";
    // returns an empty list for "select *"
    public List<String> getColumns(String querysql) {
        List<String> column = new ArrayList<String>();
        String tmp = querysql.substring(querysql.indexOf("select") + 6,
                querysql.indexOf("from")).trim();
        if (tmp.indexOf("*") == -1) {
            String[] cols = tmp.split(",");
            for (String c : cols) {
                column.add(c);
            }
        }
        return column;
    }

    // extract the table name after "from" (cut at the first space to drop an alias)
    public String getTBname(String querysql) {
        String tmp = querysql.substring(querysql.indexOf("from") + 4).trim();
        int sx = tmp.indexOf(" ");
        if (sx == -1) {
            return tmp;
        } else {
            return tmp.substring(0, sx);
        }
    }
}
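For the query used in section 3.2, these helpers behave as follows (an illustrative snippet, not part of the original post):

SQLUtils sqlUtils = new SQLUtils();
String querysql = "select id,name,location,sex,position from mgtohive_2 b";
// prints [id, name, location, sex, position]
System.out.println(sqlUtils.getColumns(querysql));
// prints mgtohive_2 (the alias "b" is cut off at the first space)
System.out.println(sqlUtils.getTBname(querysql));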
IV. Troubleshooting:
Download the hive-site.xml file from the CDH cluster's Hive service, create a resources folder in the project, and put the hive-site.xml configuration file into it so it ends up on the classpath.
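Once hive-site.xml is on the classpath, a session built as in section 3.2 should see the cluster's metastore. A minimal smoke test (illustrative; the class name is hypothetical):

import org.apache.spark.sql.SparkSession;

public class HiveSmokeTest {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("HiveSmokeTest")
                .enableHiveSupport()
                .getOrCreate();
        // should list mgtohive_2 and mg_hive_external once hive-site.xml is picked up
        spark.sql("show tables").show();
        spark.stop();
    }
}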
V. Execution:
The job took 14 minutes and wrote 10,398,582 rows into the Hive table.
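To sanity-check the load, the row count and the generated partitions can be inspected from the same SparkSession used in section 3.2 (a sketch; the count shown is this run's figure and will vary):

// should match the 10,398,582 rows reported above
spark.sql("select count(*) from mg_hive_external").show();
// with dynamic partitioning, one partition per distinct sex value
spark.sql("show partitions mg_hive_external").show();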