
项目实战 从 0 到 1 学习之Flink (29)UDF实现
发布日期:2021-05-14 00:18:23
浏览次数:18
分类:博客文章
本文共 60411 字,大约阅读时间需要 201 分钟。
1、pom.xml
4.0.0 org.example FlinkUdf 1.0-SNAPSHOT test http://www.example.com UTF-8 1.7 1.7 1.11.1 2.11 2.11.0 3.0.0 3.0.0 2.3.0 3.0.0 3.0.0 org.scala-lang scala-library ${scala.version} org.apache.flink flink-table-api-java-bridge_2.11 ${flink.version} org.apache.flink flink-table-api-java ${flink.version} org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} scala-library org.scala-lang slf4j-api org.slf4j org.apache.flink flink-table-planner_${scala.binary.version} ${flink.version} org.apache.flink flink-table-common ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} com.alibaba fastjson 1.2.58 javassist org.javassist scala-parser-combinators_2.11 org.scala-lang.modules slf4j-api org.slf4j snappy-java org.xerial.snappy org.apache.flink flink-java ${flink.version} org.apache.kafka kafka-clients 0.11.0.3 slf4j-api org.slf4j org.apache.flink flink-connector-kafka-0.11_${scala.binary.version} ${flink.version} kafka-clients org.apache.kafka org.apache.flink flink-csv ${flink.version} org.apache.flink flink-json 1.10.0 org.apache.flink flink-hbase_2.12 1.10.1 org.apache.flink flink-jdbc_2.12 1.10.2 mysql mysql-connector-java 5.1.37 org.apache.flink flink-connector-redis_2.11 1.1.5 force-shading org.apache.flink slf4j-api org.slf4j com.fasterxml.jackson.core jackson-core 2.9.5 io.lettuce lettuce-core 5.0.5.RELEASE io.netty netty-all 4.1.4.Final redis.clients jedis ${jedis.version} org.slf4j slf4j-log4j12 1.7.7 runtime log4j log4j 1.2.17 runtime net.sf.json-lib json-lib 2.4 jdk15 junit junit 4.12 org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-common ${hadoop.version} org.apache.hadoop hadoop-hdfs ${hadoop.version} org.apache.hadoop hadoop-mapreduce-client-common ${hadoop.version} org.apache.hadoop hadoop-mapreduce-client-core 
${hadoop.version} org.apache.hive hive-jdbc ${hive.version} org.apache.hive hive-exec ${hive.version} org.apache.hbase hbase-server ${hbase.version} org.apache.hbase hbase-client ${hbase.version} org.apache.spark spark-core_2.12 ${spark.version} provided org.apache.spark spark-streaming_2.12 ${spark.version} provided org.apache.spark spark-sql_2.12 ${spark.version} provided org.apache.spark spark-mllib_2.12 ${spark.version} provided org.apache.spark spark-hive_2.12 ${spark.version} apache.snapshots Apache Development Snapshot Repository https://repository.apache.org/content/repositories/snapshots/ false true maven-clean-plugin 3.1.0 maven-resources-plugin 3.0.2 maven-compiler-plugin 3.8.0 maven-surefire-plugin 2.22.1 maven-jar-plugin 3.0.2 maven-install-plugin 2.5.2 maven-deploy-plugin 2.8.2 maven-site-plugin 3.7.1 maven-project-info-reports-plugin 3.0.0 org.apache.maven.plugins maven-compiler-plugin 8 8
2、数组转字符串
1、方法①
package Udf;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;//方法1:数组按照指定的分隔符转成字符串public class ArrToString extends ScalarFunction { private static final long serialVersionUID=1L; public ArrToString() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(String[] bodyRow, String split) { try { if(bodyRow !=null && bodyRow.length>0) { StringBuilder stringBuilder=new StringBuilder(); for(int i=0;i
2、方法②
package Udf;import org.apache.commons.lang3.ArrayUtils;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;//方法3:数组按照指定的分隔符转成字符串public class ArrToString2 extends ScalarFunction { private static final long serialVersionUID=1L; public ArrToString2() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(String[] arr, String split) { try { if(arr !=null && arr.length>0) { return ArrayUtils.toString(arr,split); } else { return null; } } catch(Exception ex) { return ex.getMessage(); } }}
3、方法③
package Udf;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import java.util.Arrays;//方法4:数组转成数组字符串public class ArrToString3 extends ScalarFunction { private static final long serialVersionUID=1L; public ArrToString3() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(String[] arr) { try { if(arr !=null && arr.length>0) { return Arrays.toString(arr); } else { return null; } } catch(Exception ex) { return ex.getMessage(); } }}
3、字符串转数组
package Udf;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;//方法1:字符串按照指定的分隔符转成数组public class StringToArr extends ScalarFunction { private static final long serialVersionUID=1L; public StringToArr() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String[] eval(String arr, String split) { try { if(arr !=null) { return arr.split(split); } else { return null; } } catch(Exception ex) { return null; } }}
4、字符串转map
①
package Udf;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import java.util.HashMap;import java.util.Map;//方法1:字符串按照指定的分隔符转成map类型public class StringToMap extends ScalarFunction { private static final long serialVersionUID=1L; public StringToMap() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public Map eval(String str, String split, String split1) { try { if(str !=null) { String [] arr=str.split(split); Mapmap = new HashMap<>(); for (int i=0;i
②
import org.junit.Test;import java.util.HashMap;import java.util.Map; public class StringUtilsTest { @Test public void testDataToMap() { String data = "certificatetype=0&certificateno=220182&depositacct=622848"; Map map = new HashMap(); if (null != data) { String[] param = data.split("&"); for (int i = 0; i < param.length; i++) { int index = param[i].indexOf('='); map.put(param[i].substring(0,index), param[i].substring((index + 1))); } } System.out.println(map); System.out.println("----------------分割线---------------"); Map result = new HashMap(); String[] params = data.split("\\&"); for (String entry : params) { if (entry.contains("=")) { String[] sub = entry.split("\\="); if (sub.length > 1) { result.put(sub[0], sub[1]); } else { result.put(sub[0], ""); } } } System.out.println(result); } }String str1 = "d3fe1e186e41475ea965f4722f5488a8"; String str2 = "5093"; String str3 = "公共设施"; String str = str1 + "\1" + str2 + "\1" + str3; System.out.println(str); // 也就是下面的字符串,分隔符为 \u0001 str = "d3fe1e186e41475ea965f4722f5488a8\u00015093\u0001公共设施"; String[] split = str.split("\1"); for (String s : split) { System.out.println(s); } System.out.println(split.length);
③
package Flink.Udf;import org.apache.commons.lang.StringUtils;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.LinkedHashMap;import java.util.Map;/** * mapStr转Map */public class MapStrToMap extends ScalarFunction { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(MapStrToMap.class); @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(String data,String spt1,String spt2) { Mapmap = new LinkedHashMap<>(); try { if (StringUtils.isEmpty(data)) { return ""; } else { String[] split = data.split(spt1); for (String s : split) { String[] s1 = s.split(spt2); if (s1.length > 1) { map.put(s1[0], s1[1]); } else { map.put(s1[0], null); } } } } catch (Exception e) { logger.error("MapStr to Json Error!,mapStr={}", e); } return map.toString(); }}
5、map转字符串
package Udf;import org.apache.flink.table.functions.ScalarFunction;import org.apache.flink.table.functions.TableFunction;import java.util.Arrays;import java.util.Map;import java.util.Set;//方法1:map按照指定的分隔符转成字符串public class MapToString extends ScalarFunction { private static final long serialVersionUID=1L; public MapToString() { } public String eval(Mapmap,String split,String split1) { try { Set keySet = map.keySet(); //将set集合转换为数组 String[] keyArray = keySet. toArray(new String[keySet.size()]); //给数组排序(升序) Arrays.sort(keyArray); //因为String拼接效率会很低的,所以转用StringBuilder StringBuilder sb = new StringBuilder(); for (int i = 0; i < keyArray.length; i++) { // 参数值为空,则不参与签名 这个方法trim()是去空格 if ((String.valueOf(map.get(keyArray[i]))).trim().length() > 0) { sb.append(keyArray[i]).append(split1).append(String.valueOf(map.get(keyArray[i])).trim()); } if(i != keyArray.length-1){ sb.append(split); } } return sb.toString(); } catch(Exception ex) { ex.printStackTrace(); return null; } }}
6、map转Json
①
package Flink.Udf;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang.StringUtils;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import java.util.LinkedHashMap;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * mapStr转Json */public class MapStrToJson extends ScalarFunction { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class); @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(String data,String spt1,String spt2) { Mapmap = new LinkedHashMap<>(); try { if (StringUtils.isEmpty(data)) { return ""; } else { String[] split = data.split(spt1); for (String s : split) { String[] s1 = s.split(spt2); if (s1.length > 1) { map.put(s1[0], s1[1]); } else { map.put(s1[0], null); } } } } catch (Exception e) { logger.error("MapStr to Json Error!,mapStr={}", e); } return JSONObject.toJSONString(map); }}
②
package com.oppo.dc.ostream.udf.common;import com.alibaba.fastjson.JSON;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import java.util.Map;public classMapToJSONString extends ScalarFunction { private static final long serialVersionUID = 1L; @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public String eval(Map map) { if (map == null) { return null; } String jsonStr = JSON.toJSONString(map); if (jsonStr.length() > 5000) { jsonStr = jsonStr.substring(0, 5000); } return jsonStr; }}
③
package com.oppo.dc.ostream.udf.common;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Serialises a map to a JSON string after un-escaping {@code "%2C"} sequences in the
 * {@code data} entry.
 *
 * Author: Fisher Xiang
 * Date: 2019-10-28 15:21
 *
 * @version 1.0.0
 */
public class MapToJSONStr extends ScalarFunction {

    /** Delegates to the superclass. */
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    /** Delegates to the superclass. */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Serialises {@code map} with fastjson. If the map holds a String under the key
     * {@code "data"}, every occurrence of {@code "%2C"} (quotes included) in that
     * value is replaced by {@code ","} before serialising.
     *
     * <p>The replacement is applied to a copy, so the caller's map is never modified,
     * and a {@code data} value that is {@code null} or not a String is left as is
     * instead of raising an exception. The length cap used by the sibling
     * MapToJSONString is intentionally not applied here.
     *
     * @param map the map to serialise; may be {@code null}
     * @return the JSON string, or {@code null} for a {@code null} map
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public String eval(Map map) {
        if (map == null) {
            return null;
        }
        // LinkedHashMap keeps the source map's iteration order in the copy.
        Map<Object, Object> copy = new LinkedHashMap<Object, Object>(map);
        Object data = copy.get("data");
        if (data instanceof String) {
            copy.put("data", ((String) data).replace("\"%2C\"", "\",\""));
        }
        return JSON.toJSONString(copy);
    }
}
④map字符串转json(使用HashMap,输出的键顺序由哈希决定,不保证与输入顺序一致)
package FlinkDW.Udf;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang.StringUtils;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import java.util.HashMap;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * mapStr转Json */public class MapStrToJson extends ScalarFunction{ private static final long serialVersionUID = 1 L; private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);@ Override public void open(FunctionContext context) throws Exception { super.open(context); }@ Override public void close() throws Exception { super.close(); } public String eval(String data, String spt1, String spt2) { Map < String, String > map = new HashMap < > (); try { if(StringUtils.isEmpty(data)) { return ""; } else { String[] split = data.split(spt1); for(String s: split) { String[] s1 = s.split(spt2); if(s1.length > 1) { map.put(s1[0], s1[1]); } else { map.put(s1[0], "null"); } } } } catch(Exception e) { logger.error("MapStr to Json Error!,mapStr={}", data, e); } return JSONObject.toJSONString(map); }}
⑤优化(固定顺序)
package Flink.Udf;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang.StringUtils;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import java.util.LinkedHashMap;import java.util.Map;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * mapStr转Json */public class MapStrToJson extends ScalarFunction{ private static final long serialVersionUID = 1 L; private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);@ Override public void open(FunctionContext context) throws Exception { super.open(context); }@ Override public void close() throws Exception { super.close(); } public String eval(String data, String spt1, String spt2) { Map < String, String > map = new LinkedHashMap < > (); try { if(StringUtils.isEmpty(data)) { return ""; } else { String[] split = data.split(spt1); for(String s: split) { String[] s1 = s.split(spt2); if(s1.length > 1) { map.put(s1[0], s1[1]); } else { map.put(s1[0], null); } } } } catch(Exception e) { logger.error("MapStr to Json Error!,mapStr={}", e); } return JSONObject.toJSONString(map); }}
测试
package Flink.Udf;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.*;public class Test { private static final Logger logger = LoggerFactory.getLogger(Test.class); /** * @Description: 1、map字符串转json * @Param: [data, spt1, spt2] * @return: java.lang.String * @Author: BigData * @Date: 2020/12/3 */ public static String testDataToJson(String data,String spt1,String spt2) { Mapmap = new LinkedHashMap<>(); try { if (StringUtils.isEmpty(data)) { return ""; } else { String[] split = data.split(spt1); for (String s : split) { String[] s1 = s.split(spt2); if (s1.length > 1) { map.put(s1[0], s1[1]); } else { map.put(s1[0], "null"); } } } } catch (Exception e) { logger.error("MapStr to Json Error!,mapStr={}", data, e); } return JSONObject.toJSONString(map); } /** * @Description: 2、map字符串转map格式 * @Param: [data, spt1, spt2] * @return: java.lang.String * @Author: BigData * @Date: 2020/12/3 */ public static String testDataToMap(String data,String spt1,String spt2) { Map map = new LinkedHashMap<>(); try { if (data instanceof String) { if (StringUtils.isEmpty(data) || data.length() <= 2) { return null; } else { String[] split = data.split(spt1); for (String s : split) { String[] s1=s.split(spt2); if(s1.length>1) { map.put(s1[0], s1[1]); } else { map.put(s1[0],"null"); } } } } else if(data!=null || data.length()>2) { return null; } } catch (Exception e) { e.getMessage(); } return map.toString(); } public static String getType(Object o){ //获取变量类型方法 return o.getClass().toString(); } public static void main(String[] args) { String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name"; System.out.println(testDataToJson(data1,"&", "=")); String str="k1:,k2:V2"; System.out.println(testDataToJson(str, ",", ":")); String 
str1="stayTime\u00020\u0001firstScreenTime\u00020\u0001apiName\u0002stringValue\u0001apiParam\u0002id\u003d44\u0001methodType\u0002GET\u0001loadingTime\u0002453\u0001websiteId\u0002stringValue\u0001pagePath\u0002/stringValue\u0001pageName\u0002/value\u0001sessionKey\u0002eyJhbX0AwqPisoE6V8A\u0001userName\u000280254861\u0001ipAddress\u0002XX.XX.XCX.XXX\u0001uploadTime\u00022020-11-23 13:07:13\u0001receiveTime\u00022020-11-26 11:08:55\u0001eventCode\u00025"; System.out.println(testDataToMap(str1,"\u0001", "\u0002")); System.out.println(testDataToJson(str1,"\u0001", "\u0002")); String data ="client_timestamp\u00011587375750618\u0002apply_value\u00013\u0002pre_page_id\u0001DraftActivity\u0002close_value\u00010\u0002item_id\u0001no_apply\u0002start_timestamp\u00011587375592167\u0002template_id_auto\u0001-1\u0002music_name\u0001少年2\u0002play_cnt\u00010\u0002duration_value\u00015506\u0002video_id\u00011587375633000\u0002is_story\u0001false"; System.out.println(testDataToJson(data,"\u0002", "\u0001")); String str2="ts=1529995388&channel_id=164292&program_id=9081951&play_duration=20&position=10&os=iOS&os_version=4.3&app_name=aliyun&app_version=1.0&device_model=Samsumg_br_FM__samsung_SCH-N719"; System.out.println(testDataToJson(str2,"&","=")); System.out.println(testDataToMap(str2,"&","=")); String logMap ="websiteId\u0002ota-recruit-TEST_ENV\u0001pagePath\u0002/configuration/index\u0001pageName\u0002????\u0001sessionKey\u0002sessionKey\u0001userName\u00020\u0001ipAddress\u0002172.17.75.8\u0001uploadTime\u00022020-12-09 15:10:13\u0001receiveTime\u00022020-12-09 15:10:20\u0001eventCode\u00021"; System.out.println(testDataToJson(logMap,"\u0001", "\u0002")); System.out.println(testDataToMap(logMap, "\u0001", "\u0002")); }}
测试结果
"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=54638:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1090826265.jar Flink.Udf.Test
SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]{"certificatetype":"0","certificateno":"220182","depositacct":"622848","name":"null"}{"k1":"null","k2":"V2"}{stayTime=0, firstScreenTime=0, apiName=stringValue, apiParam=id=44, methodType=GET, loadingTime=453, websiteId=stringValue, pagePath=/stringValue, pageName=/value, sessionKey=eyJhbX0AwqPisoE6V8A, userName=80254861, ipAddress=XX.XX.XCX.XXX, uploadTime=2020-11-23 13:07:13, receiveTime=2020-11-26 11:08:55, eventCode=5}{"stayTime":"0","firstScreenTime":"0","apiName":"stringValue","apiParam":"id=44","methodType":"GET","loadingTime":"453","websiteId":"stringValue","pagePath":"/stringValue","pageName":"/value","sessionKey":"eyJhbX0AwqPisoE6V8A","userName":"80254861","ipAddress":"XX.XX.XCX.XXX","uploadTime":"2020-11-23 13:07:13","receiveTime":"2020-11-26 11:08:55","eventCode":"5"}{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}{"ts":"1529995388","channel_id":"164292","program_id":"9081951","play_duration":"20","position":"10","os":"iOS","os_version":"4.3","app_name":"aliyun","app_version":"1.0","device_model":"Samsumg_br_FM__samsung_SCH-N719"}{ts=1529995388, channel_id=164292, program_id=9081951, play_duration=20, position=10, os=iOS, os_version=4.3, app_name=aliyun, app_version=1.0, 
device_model=Samsumg_br_FM__samsung_SCH-N719}{"websiteId":"ota-recruit-TEST_ENV","pagePath":"/configuration/index","pageName":"????","sessionKey":"sessionKey","userName":"0","ipAddress":"172.17.75.8","uploadTime":"2020-12-09 15:10:13","receiveTime":"2020-12-09 15:10:20","eventCode":"1"}{websiteId=ota-recruit-TEST_ENV, pagePath=/configuration/index, pageName=????, sessionKey=sessionKey, userName=0, ipAddress=172.17.75.8, uploadTime=2020-12-09 15:10:13, receiveTime=2020-12-09 15:10:20, eventCode=1}Process finished with exit code 0
测试2
package FlinkDW.Udf;import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang.StringUtils;import java.util.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class Test { public static String testDataToMap(String data,String spt1,String spt2) { Mapmap = new HashMap (); try { if (data instanceof String) { if (StringUtils.isEmpty(data) || data.length() <= 2) { return data; } else { String[] split = data.split(spt1); for (String s : split) { String[] s1=s.split(spt2); if(s1.length>1) { map.put(s1[0], s1[1]); } else { map.put(s1[0],"null"); } } } } else if(data!=null || data.length()>2) { return "please input type of string "+getType(data); } } catch (Exception e) { e.getMessage(); } return JSONObject.toJSONString(map); } public static String getType(Object o){ //获取变量类型方法 return o.getClass().toString(); } public static void main(String[] args) { String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name"; String data ="client_timestamp\u00011587375750618\u0002apply_value\u00013\u0002pre_page_id\u0001DraftActivity\u0002close_value\u00010\u0002item_id\u0001no_apply\u0002start_timestamp\u00011587375592167\u0002template_id_auto\u0001-1\u0002music_name\u0001少年2\u0002play_cnt\u00010\u0002duration_value\u00015506\u0002video_id\u00011587375633000\u0002is_story\u0001false"; System.out.println(testDataToMap(data,"\u0002", "\u0001")); System.out.println(testDataToMap(data1,"&", "=")); String str="k1:,k2:V2"; System.out.println(testDataToMap(str, ",", ":")); }}
结果:
"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=65426:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1898420598.jar FlinkDW.Udf.Test
{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}{"name":"null","certificatetype":"0","certificateno":"220182","depositacct":"622848"}{"k1":"null","k2":"V2"}Process finished with exit code 0
7、json转map
package Udf;import com.alibaba.fastjson.JSONObject;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import java.util.Map;//方法1:json转成mappublic class JsonToMap extends ScalarFunction { private static final long serialVersionUID=1L; public JsonToMap() { } @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public Map eval(String str) { try { if(str !=null) { JSONObject jsonObject = JSONObject.parseObject(str); Mapmap = jsonObject; return map; } else { return null; } } catch(Exception ex) { ex.printStackTrace(); return null; } }}
注意:FastJson的json转map的6种方式
package Udf;import java.util.Map;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;public class Test1 { public static void main(String[] args){ String str = "{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"}"; //第一种方式 Map maps = (Map) JSON.parse(str); System.out.println("这个是用JSON类来解析JSON字符串!!!"); for (Object map : maps.entrySet()){ System.out.println(((Map.Entry)map).getKey()+" " + ((Map.Entry)map).getValue()); } //第二种方式 Map mapTypes = JSON.parseObject(str); System.out.println("这个是用JSON类的parseObject来解析JSON字符串!!!"); for (Object obj : mapTypes.keySet()){ System.out.println("key为:"+obj+"值为:"+mapTypes.get(obj)); } //第三种方式 Map mapType = JSON.parseObject(str,Map.class); System.out.println("这个是用JSON类,指定解析类型,来解析JSON字符串!!!"); for (Object obj : mapType.keySet()){ System.out.println("key为:"+obj+"值为:"+mapType.get(obj)); } //第四种方式 /** * JSONObject是Map接口的一个实现类 */ Map json = (Map) JSONObject.parse(str); System.out.println("这个是用JSONObject类的parse方法来解析JSON字符串!!!"); for (Object map : json.entrySet()){ System.out.println(((Map.Entry)map).getKey()+" "+((Map.Entry)map).getValue()); } //第五种方式 /** * JSONObject是Map接口的一个实现类 */ JSONObject jsonObject = JSONObject.parseObject(str); System.out.println("这个是用JSONObject的parseObject方法来解析JSON字符串!!!"); for (Object map : json.entrySet()){ System.out.println(((Map.Entry)map).getKey()+" "+((Map.Entry)map).getValue()); } //第六种方式 /** * JSONObject是Map接口的一个实现类 */ Map mapObj = JSONObject.parseObject(str,Map.class); System.out.println("这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!!"); for (Object map: json.entrySet()){ System.out.println(((Map.Entry)map).getKey()+" "+((Map.Entry)map).getValue()); } String strArr = "{{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"}," + "{\"00\":\"zhangsan\",\"11\":\"lisi\",\"22\":\"wangwu\",\"33\":\"maliu\"}}"; // JSONArray.parse() System.out.println(json); }}
结果:
"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=55064:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_221\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\resources.jar;C:\Program 
Files\Java\jdk1.8.0_221\jre\lib\rt.jar;C:\app\FlinkUdf\target\test-classes;C:\Users\defaultuser0\.m2\repository\junit\junit\4.11\junit-4.11.jar;C:\Users\defaultuser0\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-api-java-bridge_2.11\1.11.1\flink-table-api-java-bridge_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-api-java\1.11.1\flink-table-api-java-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-java\1.11.1\flink-java-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\defaultuser0\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-streaming-java_2.11\1.11.1\flink-streaming-java_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-runtime_2.11\1.11.1\flink-runtime_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.11.1\flink-queryable-state-client-java-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-hadoop-fs\1.11.1\flink-hadoop-fs-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.39.Final-11.0\flink-shaded-netty-4.1.39.Final-11.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-jackson\2.10.1-11.0\flink-shaded-jackson-2.10.1-11.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-zookeeper-3\3.4.14-11.0\flink-shaded-zookeeper-3-3.4.14-11.0.jar;C:\Users\defaultuser0\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-actor_2.11\2.5.21\akka-actor_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;C:
\Users\defaultuser0\.m2\repository\org\scala-lang\modules\scala-java8-compat_2.11\0.7.0\scala-java8-compat_2.11-0.7.0.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-stream_2.11\2.5.21\akka-stream_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\ssl-config-core_2.11\0.3.7\ssl-config-core_2.11-0.3.7.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-protobuf_2.11\2.5.21\akka-protobuf_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-slf4j_2.11\2.5.21\akka-slf4j_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\org\clapper\grizzled-slf4j_2.11\1.3.2\grizzled-slf4j_2.11-1.3.2.jar;C:\Users\defaultuser0\.m2\repository\com\github\scopt\scopt_2.11\3.5.0\scopt_2.11-3.5.0.jar;C:\Users\defaultuser0\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\defaultuser0\.m2\repository\com\twitter\chill_2.11\0.7.6\chill_2.11-0.7.6.jar;C:\Users\defaultuser0\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\defaultuser0\.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-11.0\flink-shaded-guava-18.0-11.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-streaming-java_2.11\1.11.1\flink-streaming-java_2.11-1.11.1-tests.jar;C:\Users\defaultuser0\.m2\repository\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;C:\Users\defaultuser0\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\force-shading\1.11.1\force-shading-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-planner-blink_2.11\1.11.1\flink-table-planner-blink_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-api-scala_2.11\1.11.1\flink-table-api-scala_2.11-1.11.1.jar;C:\Users\def
aultuser0\.m2\repository\org\apache\flink\flink-table-api-scala-bridge_2.11\1.11.1\flink-table-api-scala-bridge_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-runtime-blink_2.11\1.11.1\flink-table-runtime-blink_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\codehaus\janino\janino\3.0.9\janino-3.0.9.jar;C:\Users\defaultuser0\.m2\repository\org\codehaus\janino\commons-compiler\3.0.9\commons-compiler-3.0.9.jar;C:\Users\defaultuser0\.m2\repository\org\apache\calcite\avatica\avatica-core\1.16.0\avatica-core-1.16.0.jar;C:\Users\defaultuser0\.m2\repository\org\reflections\reflections\0.9.10\reflections-0.9.10.jar;C:\Users\defaultuser0\.m2\repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-streaming-scala_2.11\1.11.1\flink-streaming-scala_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-scala_2.11\1.11.1\flink-scala_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\scala-compiler\2.11.12\scala-compiler-2.11.12.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\modules\scala-xml_2.11\1.0.5\scala-xml_2.11-1.0.5.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.4\scala-parser-combinators_2.11-1.0.4.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-common\1.11.1\flink-table-common-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-core\1.11.1\flink-core-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-annotations\1.11.1\flink-annotations-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-metrics-core\1.11.1\flink-metrics-core-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\com\esoteri
csoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\defaultuser0\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\defaultuser0\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\commons\commons-compress\1.20\commons-compress-1.20.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-asm-7\7.1-11.0\flink-shaded-asm-7-7.1-11.0.jar;C:\Users\defaultuser0\.m2\repository\com\alibaba\fastjson\1.2.73\fastjson-1.2.73.jar;C:\Users\defaultuser0\.m2\repository\org\slf4j\slf4j-log4j12\1.7.7\slf4j-log4j12-1.7.7.jar;C:\Users\defaultuser0\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;C:\Users\defaultuser0\.m2\repository\net\sf\json-lib\json-lib\2.4\json-lib-2.4-jdk15.jar;C:\Users\defaultuser0\.m2\repository\commons-beanutils\commons-beanutils\1.8.0\commons-beanutils-1.8.0.jar;C:\Users\defaultuser0\.m2\repository\commons-collections\commons-collections\3.2.1\commons-collections-3.2.1.jar;C:\Users\defaultuser0\.m2\repository\commons-lang\commons-lang\2.5\commons-lang-2.5.jar;C:\Users\defaultuser0\.m2\repository\commons-logging\commons-logging\1.1.1\commons-logging-1.1.1.jar;C:\Users\defaultuser0\.m2\repository\net\sf\ezmorph\ezmorph\1.0.6\ezmorph-1.0.6.jar" Udf.Test1这个是用JSON类来解析JSON字符串!!!0 zhangsan1 lisi2 wangwu3 maliu这个是用JSON类的parseObject来解析JSON字符串!!!key为:0值为:zhangsankey为:1值为:lisikey为:2值为:wangwukey为:3值为:maliu这个是用JSON类,指定解析类型,来解析JSON字符串!!!key为:0值为:zhangsankey为:1值为:lisikey为:2值为:wangwukey为:3值为:maliu这个是用JSONObject类的parse方法来解析JSON字符串!!!0 zhangsan1 lisi2 wangwu3 maliu这个是用JSONObject的parseObject方法来解析JSON字符串!!!0 zhangsan1 lisi2 wangwu3 maliu这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!!0 zhangsan1 lisi2 wangwu3 maliu{"0":"zhangsan","1":"lisi","2":"wangwu","3":"maliu"}Process finished with exit code 0
8、数组转数组字符串
package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import java.util.Arrays;

/**
 * Approach 4: scalar UDF that renders a string array as a single string
 * using {@link Arrays#toString(Object[])}, e.g. {@code [a, b, c]}.
 */
public class ArrToString3 extends ScalarFunction {

    private static final long serialVersionUID = 1L;

    public ArrToString3() {
    }

    /** Delegates to the parent implementation; this function holds no resources of its own. */
    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    /** Delegates to the parent implementation; this function holds no resources of its own. */
    @Override
    public void close() throws Exception {
        super.close();
    }

    /**
     * Converts the array to its {@code Arrays.toString} representation.
     *
     * @param arr the array to render; may be {@code null}
     * @return the rendered string, or {@code null} when {@code arr} is {@code null} or empty
     */
    public String eval(String[] arr) {
        if (arr == null || arr.length == 0) {
            return null;
        }
        // Arrays.toString cannot throw for a non-null array, so the former
        // catch-all (which returned the exception message as the result value)
        // was unreachable and has been dropped.
        return Arrays.toString(arr);
    }
}
9、数组转json
package Udf;import org.apache.flink.table.functions.FunctionContext;import org.apache.flink.table.functions.ScalarFunction;import org.json.JSONArray;//方法1:数组转成json数组public class ArrToJson extends ScalarFunction { private static final long serialVersionUID = 1L; @Override public void open(FunctionContext context) throws Exception { super.open(context); } @Override public void close() throws Exception { super.close(); } public ArrToJson() { } public String eval(String[] arr) { try { if (arr != null && arr.length > 0) { JSONArray jsonArray=new JSONArray(arr); return jsonArray.toString(); } else { return null; } } catch (Exception ex) { return ex.getMessage(); } }}
10、LogMapInfo数据格式处理(logmap字段解析udf)
1、模型包
1.CellInfo
package Test.Model;/** * @program: FlinkUdf * @description: CellInfo * @author: BigData * @create: 2020-11-16 14:50 **/public class CellInfo { private String mcc; private String mnc; private String ci; private String pci; private String tac; private String type; public String getMcc() { return mcc; } public void setMcc(String mcc) { this.mcc = mcc; } public String getMnc() { return mnc; } public void setMnc(String mnc) { this.mnc = mnc; } public String getCi() { return ci; } public void setCi(String ci) { this.ci = ci; } public String getPci() { return pci; } public void setPci(String pci) { this.pci = pci; } public String getTac() { return tac; } public void setTac(String tac) { this.tac = tac; } public String getType() { return type; } public void setType(String type) { this.type = type; }}
2.GPS
package Test.Model;/** * @program: FlinkUdf * @description: Gps * @author: BigData * @create: 2020-11-16 14:51 **/public class Gps { private String longitude; private String latitude; public String getLongitude() { return longitude; } public void setLongitude(String longitude) { this.longitude = longitude; } public String getLatitude() { return latitude; } public void setLatitude(String latitude) { this.latitude = latitude; }}
package Test.Model;/** * @program: FlinkUdf * @description: Gps * @author: BigData * @create: 2020-11-16 14:51 **/public class Gps { private String longitude; private String latitude; public String getLongitude() { return longitude; } public void setLongitude(String longitude) { this.longitude = longitude; } public String getLatitude() { return latitude; } public void setLatitude(String latitude) { this.latitude = latitude; }}
3.StationInfo
package Test.Model;

import java.util.List;

/**
 * Subway station model: the bean that the uploaded JSON payload is
 * deserialised into. Property names use snake_case to match the field
 * names used by the payload.
 *
 * <p>Program: FlinkUdf. Author: BigData. Created: 2020-11-16 14:49.
 */
public final class StationInfo {

    private String station;
    private CellInfo cell_info;
    // The element type had been lost (the declaration read "Listwifi_infos" and
    // did not compile); callers copy this list into a String[].
    private List<String> wifi_infos;
    private String timestamp;
    private String reason;
    private String imei;
    private String guid;
    private Gps gps;
    private String city;
    private String group;
    private String version_name;
    private String version_code;

    public String getStation() {
        return station;
    }

    public void setStation(String station) {
        this.station = station;
    }

    public CellInfo getCell_info() {
        return cell_info;
    }

    public void setCell_info(CellInfo cell_info) {
        this.cell_info = cell_info;
    }

    /** @return the wifi entries, or {@code null} if none were set */
    public List<String> getWifi_infos() {
        return wifi_infos;
    }

    public void setWifi_infos(List<String> wifi_infos) {
        this.wifi_infos = wifi_infos;
    }

    /** @return the timestamp exactly as set (stored as a string) */
    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getReason() {
        return reason;
    }

    public void setReason(String reason) {
        this.reason = reason;
    }

    public String getImei() {
        return imei;
    }

    public void setImei(String imei) {
        this.imei = imei;
    }

    public String getGuid() {
        return guid;
    }

    public void setGuid(String guid) {
        this.guid = guid;
    }

    public Gps getGps() {
        return gps;
    }

    public void setGps(Gps gps) {
        this.gps = gps;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getVersion_name() {
        return version_name;
    }

    public void setVersion_name(String version_name) {
        this.version_name = version_name;
    }

    public String getVersion_code() {
        return version_code;
    }

    public void setVersion_code(String version_code) {
        this.version_code = version_code;
    }
}
2、工具包
1、LogMapInfoSplit(logmap数据格式解析)
package Test.Util;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import Test.Model.StationInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;

/**
 * Table function that parses the LogMap field: it Base64-decodes the
 * argument, deserialises the resulting JSON into a {@link StationInfo} and
 * emits one 13-field {@link Row} per successfully parsed input.
 *
 * <p>Program: FlinkUdf. Author: BigData. Created: 2020-11-16 14:53.
 */
public class LogMapInfoSplit extends TableFunction<Row> {

    private static final long serialVersionUID = 5485869053798534732L;

    private static final Gson gson = new Gson();

    private static final Logger logger = LoggerFactory.getLogger(LogMapInfoSplit.class);

    /**
     * Decodes and parses the parameter, then collects a single row.
     *
     * <p>Nothing is emitted when the parameter is blank, decodes to blank
     * text, deserialises to {@code null}, or when any step throws (the
     * exception is logged and swallowed so one bad record does not fail the job).
     *
     * @param param Base64 text of the uploaded JSON; {@code '$'} stands in for
     *              the {@code '='} padding character
     */
    public void eval(String param) {
        if (StringUtils.isBlank(param)) {
            logger.debug("param is empty");
            return;
        }
        try {
            // The original code restored '+' from ' ' but then used the URL-safe
            // decoder, which rejects '+' and '/'. Map both alphabets onto the
            // URL-safe one so either encoding decodes; '$' is restored to '='.
            final String normalized = param
                    .replace(" ", "+")
                    .replace("$", "=")
                    .replace('+', '-')
                    .replace('/', '_');
            final Base64.Decoder decoder = Base64.getUrlDecoder();
            final String ocaUploadJson = new String(
                    decoder.decode(normalized.getBytes(StandardCharsets.UTF_8)),
                    StandardCharsets.UTF_8);
            logger.info("sourceParam :{}", ocaUploadJson);
            if (StringUtils.isBlank(ocaUploadJson)) {
                return;
            }

            final StationInfo stationInfo = gson.fromJson(ocaUploadJson, StationInfo.class);
            if (stationInfo == null) {
                return;
            }
            logger.info("received wifiInfo:{}", stationInfo);

            // Stays null when the list is absent or empty.
            String[] wifiInfos = null;
            if (stationInfo.getWifi_infos() != null && stationInfo.getWifi_infos().size() != 0) {
                wifiInfos = stationInfo.getWifi_infos()
                        .toArray(new String[stationInfo.getWifi_infos().size()]);
            }

            // Field order must match getResultType().
            final Row row = new Row(13);
            row.setField(0, stationInfo.getStation());
            row.setField(1, stationInfo.getImei());
            row.setField(2, objectToMap(stationInfo.getCell_info()));
            row.setField(3, wifiInfos);
            row.setField(4, stationInfo.getCity());
            row.setField(5, stationInfo.getReason());
            logger.info("station is:{}", stationInfo.getStation());
            final Map<String, String> objMap = objectToMap(stationInfo.getGps());
            logger.info("obj map is:{}", objMap);
            row.setField(6, objMap);
            // Parsed once and reused for both the timestamp and the date string;
            // a null or non-numeric timestamp throws and is handled below.
            final long timestampMillis = Long.parseLong(stationInfo.getTimestamp());
            row.setField(7, new Timestamp(timestampMillis));
            row.setField(8, stationInfo.getGroup());
            row.setField(9, DateFormatUtils.ISO_DATE_FORMAT.format(timestampMillis));
            row.setField(10, stationInfo.getGuid());
            row.setField(11, stationInfo.getVersion_name());
            row.setField(12, stationInfo.getVersion_code());
            collect(row);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Declares the type of the rows produced by {@link #eval(String)}.
     *
     * @return a 13-field row type whose positions match the fields set in {@code eval}
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(
                Types.STRING,
                Types.STRING,
                Types.MAP(Types.STRING, Types.STRING),
                Types.OBJECT_ARRAY(Types.STRING),
                Types.STRING,
                Types.STRING,
                Types.MAP(Types.STRING, Types.STRING),
                Types.SQL_TIMESTAMP,
                Types.STRING,
                Types.STRING,
                Types.STRING,
                Types.STRING,
                Types.STRING);
    }

    /**
     * Converts an object into a map of its declared fields.
     *
     * @param obj the object to convert; may be {@code null}
     * @return {@code null} when {@code obj} is {@code null}; otherwise a map from
     *         each declared field name to {@code String.valueOf} of its value
     *         (so a {@code null} field value becomes the string {@code "null"});
     *         fields that cannot be read are logged and omitted
     */
    public static Map<String, String> objectToMap(Object obj) {
        if (obj == null) {
            return null;
        }
        final Map<String, String> map = new HashMap<>();
        final Class<?> clazz = obj.getClass();
        for (Field field : clazz.getDeclaredFields()) {
            field.setAccessible(true);
            final String fieldName = field.getName();
            try {
                map.put(fieldName, String.valueOf(field.get(obj)));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
        return map;
    }
}
flinksql代码:
-- Expands log_map['oca_upload_json'] through the logMapInfoSplit table function
-- and writes the resulting columns, together with model and event_id from the
-- source table, into subwayaware_info.
-- LEFT JOIN ... ON TRUE keeps source rows for which the function emits nothing.
INSERT INTO software_research.subwayaware_info
SELECT
    station,
    logMap_imei as `imei`,
    cell_info,
    wifi_infos,
    city,
    reason,
    gps,
    CAST(createtime as VARCHAR) as `timestamp`,
    logMap_group as `group`,
    datestr,
    model,
    event_id,
    guid,
    version_name,
    version_code
FROM software_research.subwayaware_dcs
LEFT JOIN LATERAL TABLE(logMapInfoSplit(log_map ['oca_upload_json'])) AS T(
    station,
    logMap_imei,
    cell_info,
    wifi_infos,
    city,
    reason,
    gps,
    createtime,
    logMap_group,
    datestr,
    guid,
    version_name,
    version_code
) ON TRUE
发表评论
最新留言
很好
[***.229.124.182]2025年04月10日 04时32分20秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
二 召回算法
2019-03-11
2020-11月计划实施表
2019-03-11
个人常用网络
2019-03-11
折线图
2019-03-11
常识:
2019-03-11
注册页面案例
2019-03-11
np.bincount(x)的简单解释
2019-03-11
一些面试的准备的回答
2019-03-11
django中文件的上传问题
2019-03-11
Spark Standalone模式下启动集群的基本流程
2019-03-11
LeetCode Top-100 T22-括号生成
2019-03-11
svg基础+微信公众号交互(二)
2019-03-11
webstorm 自定义快捷键
2019-03-11
vscode设置eslint保存文件时自动修复eslint错误
2019-03-11
deepin 安装过程记录
2019-03-11
JAVA 多线程
2019-03-11
接口详解
2019-03-11
Java的 arraylist类【具体案例】
2019-03-11
FileWriter
2019-03-11
Java中IO流的打印流-PrintWriter
2019-03-11