Hands-on Flink from 0 to 1 (29): Implementing UDFs

1、pom.xml

Project coordinates: org.example:FlinkUdf:1.0-SNAPSHOT (modelVersion 4.0.0, name "test", URL http://www.example.com).

Properties:
- project.build.sourceEncoding: UTF-8
- maven.compiler.source / maven.compiler.target: 1.7
- flink.version: 1.11.1
- scala.binary.version: 2.11
- scala.version: 2.11.0
- hadoop/hive/hbase/spark/jedis version properties: 3.0.0, 3.0.0, 2.3.0, 3.0.0, 3.0.0

Dependencies (groupId:artifactId:version):
- org.scala-lang:scala-library:${scala.version}
- org.apache.flink:flink-table-api-java-bridge_2.11:${flink.version}
- org.apache.flink:flink-table-api-java:${flink.version}
- org.apache.flink:flink-table-planner-blink_${scala.binary.version}:${flink.version} (excluding org.scala-lang:scala-library and org.slf4j:slf4j-api)
- org.apache.flink:flink-table-planner_${scala.binary.version}:${flink.version}
- org.apache.flink:flink-table-common:${flink.version}
- org.apache.flink:flink-streaming-java_${scala.binary.version}:${flink.version}
- org.apache.flink:flink-streaming-scala_${scala.binary.version}:${flink.version}
- org.apache.flink:flink-clients_${scala.binary.version}:${flink.version}
- com.alibaba:fastjson:1.2.58 (excluding org.javassist:javassist, org.scala-lang.modules:scala-parser-combinators_2.11, org.slf4j:slf4j-api and org.xerial.snappy:snappy-java)
- org.apache.flink:flink-java:${flink.version}
- org.apache.kafka:kafka-clients:0.11.0.3 (excluding org.slf4j:slf4j-api)
- org.apache.flink:flink-connector-kafka-0.11_${scala.binary.version}:${flink.version} (excluding org.apache.kafka:kafka-clients)
- org.apache.flink:flink-csv:${flink.version}
- org.apache.flink:flink-json:1.10.0
- org.apache.flink:flink-hbase_2.12:1.10.1
- org.apache.flink:flink-jdbc_2.12:1.10.2
- mysql:mysql-connector-java:5.1.37
- org.apache.flink:flink-connector-redis_2.11:1.1.5 (excluding org.apache.flink:force-shading and org.slf4j:slf4j-api)
- com.fasterxml.jackson.core:jackson-core:2.9.5
- io.lettuce:lettuce-core:5.0.5.RELEASE
- io.netty:netty-all:4.1.4.Final
- redis.clients:jedis:${jedis.version}
- org.slf4j:slf4j-log4j12:1.7.7 (runtime scope)
- log4j:log4j:1.2.17 (runtime scope)
- net.sf.json-lib:json-lib:2.4 (classifier jdk15)
- junit:junit:4.12
- org.apache.hadoop:hadoop-client:${hadoop.version}
- org.apache.hadoop:hadoop-common:${hadoop.version}
- org.apache.hadoop:hadoop-hdfs:${hadoop.version}
- org.apache.hadoop:hadoop-mapreduce-client-common:${hadoop.version}
- org.apache.hadoop:hadoop-mapreduce-client-core:${hadoop.version}
- org.apache.hive:hive-jdbc:${hive.version}
- org.apache.hive:hive-exec:${hive.version}
- org.apache.hbase:hbase-server:${hbase.version}
- org.apache.hbase:hbase-client:${hbase.version}
- org.apache.spark:spark-core_2.12:${spark.version} (provided)
- org.apache.spark:spark-streaming_2.12:${spark.version} (provided)
- org.apache.spark:spark-sql_2.12:${spark.version} (provided)
- org.apache.spark:spark-mllib_2.12:${spark.version} (provided)
- org.apache.spark:spark-hive_2.12:${spark.version}

Repository:
- apache.snapshots (Apache Development Snapshot Repository), https://repository.apache.org/content/repositories/snapshots/, releases disabled, snapshots enabled

Build plugins (pluginManagement): maven-clean-plugin 3.1.0, maven-resources-plugin 3.0.2, maven-compiler-plugin 3.8.0, maven-surefire-plugin 2.22.1, maven-jar-plugin 3.0.2, maven-install-plugin 2.5.2, maven-deploy-plugin 2.8.2, maven-site-plugin 3.7.1, maven-project-info-reports-plugin 3.0.0. In addition, org.apache.maven.plugins:maven-compiler-plugin is configured with source and target level 8.

2、Converting an array to a string

1、Method ①

package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Method 1: join an array into a single string using the given separator
public class ArrToString extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToString() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(String[] bodyRow, String split) {
        try {
            if (bodyRow != null && bodyRow.length > 0) {
                StringBuilder stringBuilder = new StringBuilder();
                for (int i = 0; i < bodyRow.length; i++) {
                    stringBuilder.append(bodyRow[i]);
                    // no separator after the last element
                    if (i != bodyRow.length - 1) {
                        stringBuilder.append(split);
                    }
                }
                return stringBuilder.toString();
            } else {
                return null;
            }
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}

2、Method ②

package Udf;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Method 2: join an array into a single string with the given separator using commons-lang3.
// Note: StringUtils.join(arr, split) joins with the separator; ArrayUtils.toString(arr, split)
// would instead treat the second argument as the value to return when the array is null.
public class ArrToString2 extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToString2() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(String[] arr, String split) {
        try {
            if (arr != null && arr.length > 0) {
                return StringUtils.join(arr, split);
            } else {
                return null;
            }
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}

 

3、Method ③

package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;

// Method 3: render an array as an array-style string, e.g. [a, b, c]
public class ArrToString3 extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public ArrToString3() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(String[] arr) {
        try {
            if (arr != null && arr.length > 0) {
                return Arrays.toString(arr);
            } else {
                return null;
            }
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}
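The classes above only define the functions; they still have to be registered with a TableEnvironment before SQL can call them. The sketch below is my own example, not part of the original project: the table t_source and its ARRAY<STRING> column tags are assumptions, and it uses the older registerFunction API that is still available (though deprecated) in Flink 1.11.

package Udf;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegisterUdfExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // register the scalar UDF under a SQL-callable name
        tableEnv.registerFunction("arr_to_string", new ArrToString());

        // t_source and its ARRAY<STRING> column `tags` are assumed to exist already
        tableEnv.executeSql("SELECT arr_to_string(tags, ',') AS tag_str FROM t_source");
    }
}

The newer createTemporarySystemFunction registration works as well in 1.11; registerFunction is used here only to keep the sketch short.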

 

3、Converting a string to an array

package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Method 1: split a string into an array by the given separator (String.split treats it as a regex)
public class StringToArr extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public StringToArr() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String[] eval(String arr, String split) {
        try {
            if (arr != null) {
                return arr.split(split);
            } else {
                return null;
            }
        } catch (Exception ex) {
            return null;
        }
    }
}

4、Converting a string to a map

package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.HashMap;
import java.util.Map;

// Method 1: split a string into a map using the given entry and key/value separators
public class StringToMap extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public StringToMap() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public Map<String, String> eval(String str, String split, String split1) {
        try {
            if (str != null) {
                String[] arr = str.split(split);
                Map<String, String> map = new HashMap<>();
                for (int i = 0; i < arr.length; i++) {
                    // split each entry into key and value; an entry without a value keeps a null value
                    String[] kv = arr[i].split(split1);
                    if (kv.length > 1) {
                        map.put(kv[0], kv[1]);
                    } else {
                        map.put(kv[0], null);
                    }
                }
                return map;
            } else {
                return null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}

import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class StringUtilsTest {

    @Test
    public void testDataToMap() {
        String data = "certificatetype=0&certificateno=220182&depositacct=622848";
        Map map = new HashMap();
        if (null != data) {
            String[] param = data.split("&");
            for (int i = 0; i < param.length; i++) {
                int index = param[i].indexOf('=');
                map.put(param[i].substring(0, index), param[i].substring(index + 1));
            }
        }
        System.out.println(map);

        System.out.println("----------------分割线---------------");
        Map result = new HashMap();
        String[] params = data.split("\\&");
        for (String entry : params) {
            if (entry.contains("=")) {
                String[] sub = entry.split("\\=");
                if (sub.length > 1) {
                    result.put(sub[0], sub[1]);
                } else {
                    result.put(sub[0], "");
                }
            }
        }
        System.out.println(result);
    }
}

// Standalone snippet: "\1" and "\u0001" denote the same separator character
String str1 = "d3fe1e186e41475ea965f4722f5488a8";
String str2 = "5093";
String str3 = "公共设施";
String str = str1 + "\1" + str2 + "\1" + str3;
System.out.println(str);
// i.e. the string below, whose separator is \u0001
str = "d3fe1e186e41475ea965f4722f5488a8\u00015093\u0001公共设施";
String[] split = str.split("\1");
for (String s : split) {
    System.out.println(s);
}
System.out.println(split.length);

 ③

package Flink.Udf;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Converts a map-formatted string into a Map and returns its string form.
 */
public class MapStrToMap extends ScalarFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToMap.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            } else {
                String[] split = data.split(spt1);
                for (String s : split) {
                    String[] s1 = s.split(spt2);
                    if (s1.length > 1) {
                        map.put(s1[0], s1[1]);
                    } else {
                        map.put(s1[0], null);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("MapStr to Map error! mapStr={}", data, e);
        }
        return map.toString();
    }
}
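Outside of a Flink job, the quickest sanity check is to call eval directly; the sketch below is my own (the class name MapStrToMapSmokeTest and the sample string are not from the original post), and the \u0001/\u0002 separators match the test data used later in this article.

package Flink.Udf;

public class MapStrToMapSmokeTest {
    public static void main(String[] args) {
        MapStrToMap udf = new MapStrToMap();
        String data = "k1\u0002v1\u0001k2\u0002v2\u0001k3";
        // entries are split on \u0001, key/value pairs on \u0002; a key without a value gets null
        // expected output: {k1=v1, k2=v2, k3=null}
        System.out.println(udf.eval(data, "\u0001", "\u0002"));
    }
}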

5、Converting a map to a string

package Udf;

import org.apache.flink.table.functions.ScalarFunction;

import java.util.Arrays;
import java.util.Map;
import java.util.Set;

// Method 1: turn a map into a string using the given entry and key/value separators
public class MapToString extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public MapToString() {
    }

    public String eval(Map<String, String> map, String split, String split1) {
        try {
            Set<String> keySet = map.keySet();
            // copy the key set into an array
            String[] keyArray = keySet.toArray(new String[keySet.size()]);
            // sort the keys in ascending order
            Arrays.sort(keyArray);
            // use StringBuilder instead of repeated String concatenation
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < keyArray.length; i++) {
                // blank values are skipped (trim() removes surrounding whitespace)
                if ((String.valueOf(map.get(keyArray[i]))).trim().length() > 0) {
                    sb.append(keyArray[i]).append(split1).append(String.valueOf(map.get(keyArray[i])).trim());
                }
                if (i != keyArray.length - 1) {
                    sb.append(split);
                }
            }
            return sb.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}
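Because the keys are sorted and blank values are dropped, the result is not always obvious; the small sketch below is mine (class name and sample data are assumptions) and shows one edge case.

package Udf;

import java.util.HashMap;
import java.util.Map;

public class MapToStringSmokeTest {
    public static void main(String[] args) {
        MapToString udf = new MapToString();
        Map<String, String> map = new HashMap<>();
        map.put("b", "2");
        map.put("a", "1");
        map.put("c", " ");   // blank value: the pair is skipped, but its separator slot is not
        // keys are emitted in ascending order; expected output: a=1&b=2&
        System.out.println(udf.eval(map, "&", "="));
    }
}

The trailing "&" appears because the separator is appended for every non-last key even when its value was skipped; moving the separator append inside the if block would avoid it.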

6、Converting a map to JSON

package Flink.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Converts a map-formatted string into a JSON string.
 */
public class MapStrToJson extends ScalarFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            } else {
                String[] split = data.split(spt1);
                for (String s : split) {
                    String[] s1 = s.split(spt2);
                    if (s1.length > 1) {
                        map.put(s1[0], s1[1]);
                    } else {
                        map.put(s1[0], null);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("MapStr to Json error! mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}

package com.oppo.dc.ostream.udf.common;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

public class MapToJSONString extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(Map map) {
        if (map == null) {
            return null;
        }
        String jsonStr = JSON.toJSONString(map);
        // cap very long values at 5000 characters
        if (jsonStr.length() > 5000) {
            jsonStr = jsonStr.substring(0, 5000);
        }
        return jsonStr;
    }
}
package com.oppo.dc.ostream.udf.common;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

/**
 * Author: Fisher Xiang
 * Date: 2019-10-28 15:21
 *
 * @description: MapToJSONStr
 * @version: 1.0.0
 */
public class MapToJSONStr extends ScalarFunction {

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(Map map) {
        if (map == null) {
            return null;
        }
        // replace the encoded comma sequence "%2C" between quoted values in the "data" field with a real ","
        if (map.containsKey("data")) {
            map.put("data", ((String) map.get("data")).replace("\"%2C\"", "\",\""));
        }
        String jsonStr = JSON.toJSONString(map);
        /*if (jsonStr.length() > 5000) {
            jsonStr = jsonStr.substring(0, 5000);
        }*/
        return jsonStr;
    }
}
④ Converting a map string to JSON (keys in hash order)
package FlinkDW.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Converts a map-formatted string into a JSON string (HashMap, so key order follows the hash).
 */
public class MapStrToJson extends ScalarFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new HashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            } else {
                String[] split = data.split(spt1);
                for (String s : split) {
                    String[] s1 = s.split(spt2);
                    if (s1.length > 1) {
                        map.put(s1[0], s1[1]);
                    } else {
                        map.put(s1[0], "null");
                    }
                }
            }
        } catch (Exception e) {
            logger.error("MapStr to Json Error! mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}
⑤ Optimized version (fixed key order)
package Flink.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Converts a map-formatted string into a JSON string (LinkedHashMap, so the input order is preserved).
 */
public class MapStrToJson extends ScalarFunction {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(MapStrToJson.class);

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public String eval(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            } else {
                String[] split = data.split(spt1);
                for (String s : split) {
                    String[] s1 = s.split(spt2);
                    if (s1.length > 1) {
                        map.put(s1[0], s1[1]);
                    } else {
                        map.put(s1[0], null);
                    }
                }
            }
        } catch (Exception e) {
            logger.error("MapStr to Json Error! mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }
}
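The only functional difference between ④ and ⑤ is HashMap versus LinkedHashMap. The small sketch below is mine (assuming fastjson, as declared in the pom) and shows why that changes the field order of the resulting JSON.

import com.alibaba.fastjson.JSONObject;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyOrderDemo {
    public static void main(String[] args) {
        String[] keys = {"certificatetype", "certificateno", "depositacct"};
        Map<String, String> hashed = new HashMap<>();
        Map<String, String> linked = new LinkedHashMap<>();
        for (String k : keys) {
            hashed.put(k, "x");
            linked.put(k, "x");
        }
        // HashMap iterates in hash-bucket order, LinkedHashMap in insertion order,
        // so only the second JSON string is guaranteed to keep the input field order.
        System.out.println(JSONObject.toJSONString(hashed));
        System.out.println(JSONObject.toJSONString(linked));
    }
}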

Test

package Flink.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class Test {
    private static final Logger logger = LoggerFactory.getLogger(Test.class);

    /**
     * @Description: 1. Convert a map-formatted string to JSON
     * @Param: [data, spt1, spt2]
     * @return: java.lang.String
     * @Author: BigData
     * @Date: 2020/12/3
     */
    public static String testDataToJson(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (StringUtils.isEmpty(data)) {
                return "";
            } else {
                String[] split = data.split(spt1);
                for (String s : split) {
                    String[] s1 = s.split(spt2);
                    if (s1.length > 1) {
                        map.put(s1[0], s1[1]);
                    } else {
                        map.put(s1[0], "null");
                    }
                }
            }
        } catch (Exception e) {
            logger.error("MapStr to Json Error! mapStr={}", data, e);
        }
        return JSONObject.toJSONString(map);
    }

    /**
     * @Description: 2. Convert a map-formatted string to a map-style string
     * @Param: [data, spt1, spt2]
     * @return: java.lang.String
     * @Author: BigData
     * @Date: 2020/12/3
     */
    public static String testDataToMap(String data, String spt1, String spt2) {
        Map<String, String> map = new LinkedHashMap<>();
        try {
            if (data instanceof String) {
                if (StringUtils.isEmpty(data) || data.length() <= 2) {
                    return null;
                } else {
                    String[] split = data.split(spt1);
                    for (String s : split) {
                        String[] s1 = s.split(spt2);
                        if (s1.length > 1) {
                            map.put(s1[0], s1[1]);
                        } else {
                            map.put(s1[0], "null");
                        }
                    }
                }
            } else if (data != null || data.length() > 2) {
                return null;
            }
        } catch (Exception e) {
            e.getMessage();
        }
        return map.toString();
    }

    // helper that returns the runtime type of a variable
    public static String getType(Object o) {
        return o.getClass().toString();
    }

    public static void main(String[] args) {
        String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
        System.out.println(testDataToJson(data1, "&", "="));

        String str = "k1:,k2:V2";
        System.out.println(testDataToJson(str, ",", ":"));

        String str1 = "stayTime\u00020\u0001firstScreenTime\u00020\u0001apiName\u0002stringValue\u0001apiParam\u0002id\u003d44\u0001methodType\u0002GET\u0001loadingTime\u0002453\u0001websiteId\u0002stringValue\u0001pagePath\u0002/stringValue\u0001pageName\u0002/value\u0001sessionKey\u0002eyJhbX0AwqPisoE6V8A\u0001userName\u000280254861\u0001ipAddress\u0002XX.XX.XCX.XXX\u0001uploadTime\u00022020-11-23 13:07:13\u0001receiveTime\u00022020-11-26 11:08:55\u0001eventCode\u00025";
        System.out.println(testDataToMap(str1, "\u0001", "\u0002"));
        System.out.println(testDataToJson(str1, "\u0001", "\u0002"));

        String data = "client_timestamp\u00011587375750618\u0002apply_value\u00013\u0002pre_page_id\u0001DraftActivity\u0002close_value\u00010\u0002item_id\u0001no_apply\u0002start_timestamp\u00011587375592167\u0002template_id_auto\u0001-1\u0002music_name\u0001少年2\u0002play_cnt\u00010\u0002duration_value\u00015506\u0002video_id\u00011587375633000\u0002is_story\u0001false";
        System.out.println(testDataToJson(data, "\u0002", "\u0001"));

        String str2 = "ts=1529995388&channel_id=164292&program_id=9081951&play_duration=20&position=10&os=iOS&os_version=4.3&app_name=aliyun&app_version=1.0&device_model=Samsumg_br_FM__samsung_SCH-N719";
        System.out.println(testDataToJson(str2, "&", "="));
        System.out.println(testDataToMap(str2, "&", "="));

        String logMap = "websiteId\u0002ota-recruit-TEST_ENV\u0001pagePath\u0002/configuration/index\u0001pageName\u0002????\u0001sessionKey\u0002sessionKey\u0001userName\u00020\u0001ipAddress\u0002172.17.75.8\u0001uploadTime\u00022020-12-09 15:10:13\u0001receiveTime\u00022020-12-09 15:10:20\u0001eventCode\u00021";
        System.out.println(testDataToJson(logMap, "\u0001", "\u0002"));
        System.out.println(testDataToMap(logMap, "\u0001", "\u0002"));
    }
}

Test output

"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=54638:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1090826265.jar Flink.Udf.Test

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/defaultuser0/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.10.0/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
{"certificatetype":"0","certificateno":"220182","depositacct":"622848","name":"null"}
{"k1":"null","k2":"V2"}
{stayTime=0, firstScreenTime=0, apiName=stringValue, apiParam=id=44, methodType=GET, loadingTime=453, websiteId=stringValue, pagePath=/stringValue, pageName=/value, sessionKey=eyJhbX0AwqPisoE6V8A, userName=80254861, ipAddress=XX.XX.XCX.XXX, uploadTime=2020-11-23 13:07:13, receiveTime=2020-11-26 11:08:55, eventCode=5}
{"stayTime":"0","firstScreenTime":"0","apiName":"stringValue","apiParam":"id=44","methodType":"GET","loadingTime":"453","websiteId":"stringValue","pagePath":"/stringValue","pageName":"/value","sessionKey":"eyJhbX0AwqPisoE6V8A","userName":"80254861","ipAddress":"XX.XX.XCX.XXX","uploadTime":"2020-11-23 13:07:13","receiveTime":"2020-11-26 11:08:55","eventCode":"5"}
{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
{"ts":"1529995388","channel_id":"164292","program_id":"9081951","play_duration":"20","position":"10","os":"iOS","os_version":"4.3","app_name":"aliyun","app_version":"1.0","device_model":"Samsumg_br_FM__samsung_SCH-N719"}
{ts=1529995388, channel_id=164292, program_id=9081951, play_duration=20, position=10, os=iOS, os_version=4.3, app_name=aliyun, app_version=1.0, device_model=Samsumg_br_FM__samsung_SCH-N719}
{"websiteId":"ota-recruit-TEST_ENV","pagePath":"/configuration/index","pageName":"????","sessionKey":"sessionKey","userName":"0","ipAddress":"172.17.75.8","uploadTime":"2020-12-09 15:10:13","receiveTime":"2020-12-09 15:10:20","eventCode":"1"}
{websiteId=ota-recruit-TEST_ENV, pagePath=/configuration/index, pageName=????, sessionKey=sessionKey, userName=0, ipAddress=172.17.75.8, uploadTime=2020-12-09 15:10:13, receiveTime=2020-12-09 15:10:20, eventCode=1}

Process finished with exit code 0

Test 2

package FlinkDW.Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class Test {

    public static String testDataToMap(String data, String spt1, String spt2) {
        Map<String, String> map = new HashMap<>();
        try {
            if (data instanceof String) {
                if (StringUtils.isEmpty(data) || data.length() <= 2) {
                    return data;
                } else {
                    String[] split = data.split(spt1);
                    for (String s : split) {
                        String[] s1 = s.split(spt2);
                        if (s1.length > 1) {
                            map.put(s1[0], s1[1]);
                        } else {
                            map.put(s1[0], "null");
                        }
                    }
                }
            } else if (data != null || data.length() > 2) {
                return "please input type of string " + getType(data);
            }
        } catch (Exception e) {
            e.getMessage();
        }
        return JSONObject.toJSONString(map);
    }

    // helper that returns the runtime type of a variable
    public static String getType(Object o) {
        return o.getClass().toString();
    }

    public static void main(String[] args) {
        String data1 = "certificatetype=0&certificateno=220182&depositacct=622848&name";
        String data = "client_timestamp\u00011587375750618\u0002apply_value\u00013\u0002pre_page_id\u0001DraftActivity\u0002close_value\u00010\u0002item_id\u0001no_apply\u0002start_timestamp\u00011587375592167\u0002template_id_auto\u0001-1\u0002music_name\u0001少年2\u0002play_cnt\u00010\u0002duration_value\u00015506\u0002video_id\u00011587375633000\u0002is_story\u0001false";
        System.out.println(testDataToMap(data, "\u0002", "\u0001"));
        System.out.println(testDataToMap(data1, "&", "="));
        String str = "k1:,k2:V2";
        System.out.println(testDataToMap(str, ",", ":"));
    }
}

Output:

"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=65426:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\defaultuser0\AppData\Local\Temp\classpath1898420598.jar FlinkDW.Udf.Test

{"client_timestamp":"1587375750618","apply_value":"3","pre_page_id":"DraftActivity","close_value":"0","item_id":"no_apply","start_timestamp":"1587375592167","template_id_auto":"-1","music_name":"少年2","play_cnt":"0","duration_value":"5506","video_id":"1587375633000","is_story":"false"}
{"name":"null","certificatetype":"0","certificateno":"220182","depositacct":"622848"}
{"k1":"null","k2":"V2"}

Process finished with exit code 0

 

7、Converting JSON to a map

package Udf;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.Map;

// Method 1: parse a JSON string into a Map
public class JsonToMap extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public JsonToMap() {
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public Map eval(String str) {
        try {
            if (str != null) {
                // fastjson's JSONObject implements Map, so the parsed object can be returned directly
                JSONObject jsonObject = JSONObject.parseObject(str);
                Map map = jsonObject;
                return map;
            } else {
                return null;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            return null;
        }
    }
}
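Since the returned object is just a Map, the UDF can be exercised without a Flink runtime; the sketch below is mine (class name and sample JSON are assumptions).

package Udf;

import java.util.Map;

public class JsonToMapSmokeTest {
    public static void main(String[] args) {
        JsonToMap udf = new JsonToMap();
        String json = "{\"userName\":\"80254861\",\"eventCode\":\"5\"}";
        Map result = udf.eval(json);
        // prints 80254861
        System.out.println(result.get("userName"));
    }
}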

Note: six ways to convert JSON to a Map with FastJson

package Udf;

import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class Test1 {
    public static void main(String[] args) {
        String str = "{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"}";

        // Approach 1: JSON.parse
        Map maps = (Map) JSON.parse(str);
        System.out.println("这个是用JSON类来解析JSON字符串!!!");
        for (Object map : maps.entrySet()) {
            System.out.println(((Map.Entry) map).getKey() + "     " + ((Map.Entry) map).getValue());
        }

        // Approach 2: JSON.parseObject
        Map mapTypes = JSON.parseObject(str);
        System.out.println("这个是用JSON类的parseObject来解析JSON字符串!!!");
        for (Object obj : mapTypes.keySet()) {
            System.out.println("key为:" + obj + "值为:" + mapTypes.get(obj));
        }

        // Approach 3: JSON.parseObject with an explicit target type
        Map mapType = JSON.parseObject(str, Map.class);
        System.out.println("这个是用JSON类,指定解析类型,来解析JSON字符串!!!");
        for (Object obj : mapType.keySet()) {
            System.out.println("key为:" + obj + "值为:" + mapType.get(obj));
        }

        // Approach 4: JSONObject.parse (JSONObject is an implementation of the Map interface)
        Map json = (Map) JSONObject.parse(str);
        System.out.println("这个是用JSONObject类的parse方法来解析JSON字符串!!!");
        for (Object map : json.entrySet()) {
            System.out.println(((Map.Entry) map).getKey() + "  " + ((Map.Entry) map).getValue());
        }

        // Approach 5: JSONObject.parseObject (JSONObject is an implementation of the Map interface)
        JSONObject jsonObject = JSONObject.parseObject(str);
        System.out.println("这个是用JSONObject的parseObject方法来解析JSON字符串!!!");
        for (Object map : jsonObject.entrySet()) {
            System.out.println(((Map.Entry) map).getKey() + "  " + ((Map.Entry) map).getValue());
        }

        // Approach 6: JSONObject.parseObject with an explicit target type
        Map mapObj = JSONObject.parseObject(str, Map.class);
        System.out.println("这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!!");
        for (Object map : mapObj.entrySet()) {
            System.out.println(((Map.Entry) map).getKey() + "  " + ((Map.Entry) map).getValue());
        }

        String strArr = "{{\"0\":\"zhangsan\",\"1\":\"lisi\",\"2\":\"wangwu\",\"3\":\"maliu\"}," +
                "{\"00\":\"zhangsan\",\"11\":\"lisi\",\"22\":\"wangwu\",\"33\":\"maliu\"}}";
        // JSONArray.parse()
        System.out.println(json);
    }
}

Output:

"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=55064:C:\Program Files\JetBrains\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.8.0_221\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\jfxrt.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jfxswt.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_221\jre\lib\rt.jar;C:\app\FlinkUdf\target\test-classes;C:\Users\defaultuser0\.m2\repository\junit\junit\4.11\junit-4.11.jar;C:\Users\defaultuser0\.m2\repository\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-api-java-bridge_2.11\1.11.1\flink-table-api-java-bridge_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-api-java\1.11.1\flink-table-api-java-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-java\1.11.1\flink-java-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\commons\commons-lang3\3.3.2\commons-lang3-3.3.2.jar;C:\Users\defaultuser0\.m2\repository\org\apache\commons\commons-math3\3.5\commons-math3-3.5.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-streaming-java_2.11\1.11.1\flink-streaming-java_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-runtime_2.11\1.11.1\flink-runtime_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-queryable-state-client-java\1.11.1\flink-queryable-state-client-java-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-hadoop-fs\1.11.1\flink-hadoop-fs-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\commons-io\commons-io\2.4\commons-io-2.4.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-netty\4.1.39.Final-11.0\flink-shaded-netty-4.1.39.Final-11.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-jackson\2.10.1-11.0\flink-shaded-jackson-2.10.1-11.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-zookeeper-3\3.4.14-11.0\flink-shaded-zookeeper-3-3.4.14-11.0.jar;C:\Users\defaultuser0\.m2\repository\commons-cli\commons-cli\1.3.1\commons-cli-1.3.1.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-actor_2.11\2.5.21\akka-actor_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\config\1.3.3\config-1.3.3.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang
\modules\scala-java8-compat_2.11\0.7.0\scala-java8-compat_2.11-0.7.0.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-stream_2.11\2.5.21\akka-stream_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\ssl-config-core_2.11\0.3.7\ssl-config-core_2.11-0.3.7.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-protobuf_2.11\2.5.21\akka-protobuf_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\com\typesafe\akka\akka-slf4j_2.11\2.5.21\akka-slf4j_2.11-2.5.21.jar;C:\Users\defaultuser0\.m2\repository\org\clapper\grizzled-slf4j_2.11\1.3.2\grizzled-slf4j_2.11-1.3.2.jar;C:\Users\defaultuser0\.m2\repository\com\github\scopt\scopt_2.11\3.5.0\scopt_2.11-3.5.0.jar;C:\Users\defaultuser0\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\defaultuser0\.m2\repository\com\twitter\chill_2.11\0.7.6\chill_2.11-0.7.6.jar;C:\Users\defaultuser0\.m2\repository\com\twitter\chill-java\0.7.6\chill-java-0.7.6.jar;C:\Users\defaultuser0\.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-guava\18.0-11.0\flink-shaded-guava-18.0-11.0.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-streaming-java_2.11\1.11.1\flink-streaming-java_2.11-1.11.1-tests.jar;C:\Users\defaultuser0\.m2\repository\org\slf4j\slf4j-api\1.7.15\slf4j-api-1.7.15.jar;C:\Users\defaultuser0\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\force-shading\1.11.1\force-shading-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-planner-blink_2.11\1.11.1\flink-table-planner-blink_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-api-scala_2.11\1.11.1\flink-table-api-scala_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-api-scala-bridge_2.11\1.11.1\flink-table-api-scala-bridge_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-runtime-blink_2.11\1.11.1\flink-table-runtime-blink_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\codehaus\janino\janino\3.0.9\janino-3.0.9.jar;C:\Users\defaultuser0\.m2\repository\org\codehaus\janino\commons-compiler\3.0.9\commons-compiler-3.0.9.jar;C:\Users\defaultuser0\.m2\repository\org\apache\calcite\avatica\avatica-core\1.16.0\avatica-core-1.16.0.jar;C:\Users\defaultuser0\.m2\repository\org\reflections\reflections\0.9.10\reflections-0.9.10.jar;C:\Users\defaultuser0\.m2\repository\org\javassist\javassist\3.19.0-GA\javassist-3.19.0-GA.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-streaming-scala_2.11\1.11.1\flink-streaming-scala_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-scala_2.11\1.11.1\flink-scala_2.11-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\scala-reflect\2.11.12\scala-reflect-2.11.12.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\scala-library\2.11.12\scala-library-2.11.12.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\scala-compiler\2.11.12\scala-compiler-2.11.12.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\modules\scala-xml_2.11\1.0.5\scala-xml_2.11-1.0.5.jar;C:\Users\defaultuser0\.m2\repository\org\scala-lang\modules\scala-parser-combinators_2.11\1.0.4\scala-parser-combinators_2.11-1.0.4.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-table-common\1
.11.1\flink-table-common-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-core\1.11.1\flink-core-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-annotations\1.11.1\flink-annotations-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-metrics-core\1.11.1\flink-metrics-core-1.11.1.jar;C:\Users\defaultuser0\.m2\repository\com\esotericsoftware\kryo\kryo\2.24.0\kryo-2.24.0.jar;C:\Users\defaultuser0\.m2\repository\com\esotericsoftware\minlog\minlog\1.2\minlog-1.2.jar;C:\Users\defaultuser0\.m2\repository\org\objenesis\objenesis\2.1\objenesis-2.1.jar;C:\Users\defaultuser0\.m2\repository\org\apache\commons\commons-compress\1.20\commons-compress-1.20.jar;C:\Users\defaultuser0\.m2\repository\org\apache\flink\flink-shaded-asm-7\7.1-11.0\flink-shaded-asm-7-7.1-11.0.jar;C:\Users\defaultuser0\.m2\repository\com\alibaba\fastjson\1.2.73\fastjson-1.2.73.jar;C:\Users\defaultuser0\.m2\repository\org\slf4j\slf4j-log4j12\1.7.7\slf4j-log4j12-1.7.7.jar;C:\Users\defaultuser0\.m2\repository\log4j\log4j\1.2.17\log4j-1.2.17.jar;C:\Users\defaultuser0\.m2\repository\net\sf\json-lib\json-lib\2.4\json-lib-2.4-jdk15.jar;C:\Users\defaultuser0\.m2\repository\commons-beanutils\commons-beanutils\1.8.0\commons-beanutils-1.8.0.jar;C:\Users\defaultuser0\.m2\repository\commons-collections\commons-collections\3.2.1\commons-collections-3.2.1.jar;C:\Users\defaultuser0\.m2\repository\commons-lang\commons-lang\2.5\commons-lang-2.5.jar;C:\Users\defaultuser0\.m2\repository\commons-logging\commons-logging\1.1.1\commons-logging-1.1.1.jar;C:\Users\defaultuser0\.m2\repository\net\sf\ezmorph\ezmorph\1.0.6\ezmorph-1.0.6.jar" Udf.Test1这个是用JSON类来解析JSON字符串!!!0     zhangsan1     lisi2     wangwu3     maliu这个是用JSON类的parseObject来解析JSON字符串!!!key为:0值为:zhangsankey为:1值为:lisikey为:2值为:wangwukey为:3值为:maliu这个是用JSON类,指定解析类型,来解析JSON字符串!!!key为:0值为:zhangsankey为:1值为:lisikey为:2值为:wangwukey为:3值为:maliu这个是用JSONObject类的parse方法来解析JSON字符串!!!0  zhangsan1  lisi2  wangwu3  maliu这个是用JSONObject的parseObject方法来解析JSON字符串!!!0  zhangsan1  lisi2  wangwu3  maliu这个是用JSONObject的parseObject方法并执行返回类型来解析JSON字符串!!!0  zhangsan1  lisi2  wangwu3  maliu{"0":"zhangsan","1":"lisi","2":"wangwu","3":"maliu"}Process finished with exit code 0

8、Converting an array to an array-style string

This reuses the ArrToString3 class already shown as Method ③ in section 2: Arrays.toString renders the array as a bracketed string such as [a, b, c].

9、Converting an array to JSON

package Udf;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.json.JSONArray;

// Method 1: convert an array into a JSON array string
public class ArrToJson extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    public ArrToJson() {
    }

    public String eval(String[] arr) {
        try {
            if (arr != null && arr.length > 0) {
                JSONArray jsonArray = new JSONArray(arr);
                return jsonArray.toString();
            } else {
                return null;
            }
        } catch (Exception ex) {
            return ex.getMessage();
        }
    }
}
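ArrToJson builds on org.json's JSONArray, which is not among the dependencies declared in the pom above; if only fastjson is available, an equivalent function could look roughly like the sketch below (my own code, not from the original post).

package Udf;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.functions.ScalarFunction;

// sketch: the same array-to-JSON conversion using fastjson instead of org.json
public class ArrToJsonFastjson extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public String eval(String[] arr) {
        if (arr == null || arr.length == 0) {
            return null;
        }
        // serializes the array as a JSON array, e.g. ["a","b","c"]
        return JSON.toJSONString(arr);
    }
}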

10、Parsing the LogMapInfo data format (a UDF for the logmap field)

1、Model classes

1.CellInfo
package Test.Model;

/**
 * @program: FlinkUdf
 * @description: CellInfo
 * @author: BigData
 * @create: 2020-11-16 14:50
 **/
public class CellInfo {
    private String mcc;
    private String mnc;
    private String ci;
    private String pci;
    private String tac;
    private String type;

    public String getMcc() { return mcc; }
    public void setMcc(String mcc) { this.mcc = mcc; }
    public String getMnc() { return mnc; }
    public void setMnc(String mnc) { this.mnc = mnc; }
    public String getCi() { return ci; }
    public void setCi(String ci) { this.ci = ci; }
    public String getPci() { return pci; }
    public void setPci(String pci) { this.pci = pci; }
    public String getTac() { return tac; }
    public void setTac(String tac) { this.tac = tac; }
    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
}
2.GPS
package Test.Model;

/**
 * @program: FlinkUdf
 * @description: Gps
 * @author: BigData
 * @create: 2020-11-16 14:51
 **/
public class Gps {
    private String longitude;
    private String latitude;

    public String getLongitude() { return longitude; }
    public void setLongitude(String longitude) { this.longitude = longitude; }
    public String getLatitude() { return latitude; }
    public void setLatitude(String latitude) { this.latitude = latitude; }
}
3.StationInfo
package Test.Model;

import java.util.List;

/**
 * @program: FlinkUdf
 * @description: subway station model
 * @author: BigData
 * @create: 2020-11-16 14:49
 **/
public final class StationInfo {
    private String station;
    private CellInfo cell_info;
    private List<String> wifi_infos;
    private String timestamp;
    private String reason;
    private String imei;
    private String guid;
    private Gps gps;
    private String city;
    private String group;
    private String version_name;
    private String version_code;

    public String getStation() { return station; }
    public void setStation(String station) { this.station = station; }
    public CellInfo getCell_info() { return cell_info; }
    public void setCell_info(CellInfo cell_info) { this.cell_info = cell_info; }
    public List<String> getWifi_infos() { return wifi_infos; }
    public void setWifi_infos(List<String> wifi_infos) { this.wifi_infos = wifi_infos; }
    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
    public String getReason() { return reason; }
    public void setReason(String reason) { this.reason = reason; }
    public String getImei() { return imei; }
    public void setImei(String imei) { this.imei = imei; }
    public String getGuid() { return guid; }
    public void setGuid(String guid) { this.guid = guid; }
    public Gps getGps() { return gps; }
    public void setGps(Gps gps) { this.gps = gps; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public String getGroup() { return group; }
    public void setGroup(String group) { this.group = group; }
    public String getVersion_name() { return version_name; }
    public void setVersion_name(String version_name) { this.version_name = version_name; }
    public String getVersion_code() { return version_code; }
    public void setVersion_code(String version_code) { this.version_code = version_code; }
}

2、Utility classes

1、LogMapInfoSplit (parses the logmap data format)
package Test.Util;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

import Test.Model.StationInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;

/**
 * @program: FlinkUdf
 * @description: parses the logMap field
 * @author: BigData
 * @create: 2020-11-16 14:53
 **/
public class LogMapInfoSplit extends TableFunction<Row> {
    private static final long serialVersionUID = 5485869053798534732L;
    private static Gson gson = new Gson();
    private static Logger logger = LoggerFactory.getLogger(LogMapInfoSplit.class);

    /**
     * Decodes the incoming parameter and emits one row per call.
     * @param param URL-safe Base64 encoded JSON
     */
    public void eval(String param) {
        if (StringUtils.isBlank(param)) {
            logger.debug("param is empty");
            return;
        }
        try {
            final Base64.Decoder decoder = Base64.getUrlDecoder();
            String ocaUploadJson = new String(
                    decoder.decode(param.replace(" ", "+").replace("$", "=").getBytes(StandardCharsets.UTF_8)),
                    StandardCharsets.UTF_8);
            logger.info("sourceParam :" + ocaUploadJson);
            if (StringUtils.isBlank(ocaUploadJson)) {
                return;
            }
            StationInfo stationInfo = gson.fromJson(ocaUploadJson, StationInfo.class);
            if (stationInfo == null) {
                return;
            }
            logger.info("received wifiInfo:" + stationInfo.toString());
            String[] wifiInfos = null;
            if (stationInfo.getWifi_infos() != null && stationInfo.getWifi_infos().size() != 0) {
                wifiInfos = stationInfo.getWifi_infos().toArray(new String[stationInfo.getWifi_infos().size()]);
            }
            Row row = new Row(13);
            row.setField(0, stationInfo.getStation());
            row.setField(1, stationInfo.getImei());
            row.setField(2, objectToMap(stationInfo.getCell_info()));
            row.setField(3, wifiInfos);
            row.setField(4, stationInfo.getCity());
            row.setField(5, stationInfo.getReason());
            logger.info("station is:" + stationInfo.getStation());
            Map<String, String> objMap = objectToMap(stationInfo.getGps());
            logger.info("obj map is:" + objMap);
            row.setField(6, objMap);
            row.setField(7, new Timestamp(Long.parseLong(stationInfo.getTimestamp())));
            row.setField(8, stationInfo.getGroup());
            row.setField(9, DateFormatUtils.ISO_DATE_FORMAT.format(Long.parseLong(stationInfo.getTimestamp())));
            row.setField(10, stationInfo.getGuid());
            row.setField(11, stationInfo.getVersion_name());
            row.setField(12, stationInfo.getVersion_code());
            collect(row);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Declares the result row type.
     */
    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING, Types.STRING, Types.MAP(Types.STRING, Types.STRING),
                Types.OBJECT_ARRAY(Types.STRING), Types.STRING, Types.STRING,
                Types.MAP(Types.STRING, Types.STRING), Types.SQL_TIMESTAMP, Types.STRING,
                Types.STRING, Types.STRING, Types.STRING, Types.STRING);
    }

    /**
     * Converts an object's fields to a Map via reflection.
     * @param obj
     * @return
     */
    public static Map<String, String> objectToMap(Object obj) {
        if (obj == null) {
            return null;
        } else {
            Map<String, String> map = new HashMap<>();
            Class<?> clazz = obj.getClass();
            for (Field field : clazz.getDeclaredFields()) {
                field.setAccessible(true);
                String fieldName = field.getName();
                try {
                    map.put(fieldName, String.valueOf(field.get(obj)));
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
            }
            return map;
        }
    }
}
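Before the SQL below can call logMapInfoSplit, the table function has to be registered under that name. The helper below is my own sketch (the class and method names are assumptions, and the surrounding environment setup is omitted); the old-style registerFunction call keeps the Row type declared in getResultType().

package Test.Util;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegisterLogMapSplit {
    // call once before executing the INSERT statement shown below
    public static void register(StreamTableEnvironment tableEnv) {
        tableEnv.registerFunction("logMapInfoSplit", new LogMapInfoSplit());
    }
}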

Flink SQL:

INSERT INTO software_research.subwayaware_info
SELECT
  station,
  logMap_imei as `imei`,
  cell_info,
  wifi_infos,
  city,
  reason,
  gps,
  CAST(createtime as VARCHAR) as `timestamp`,
  logMap_group as `group`,
  datestr,
  model,
  event_id,
  guid,
  version_name,
  version_code
FROM
  software_research.subwayaware_dcs
  LEFT JOIN LATERAL TABLE(logMapInfoSplit(log_map['oca_upload_json'])) AS T(
    station,
    logMap_imei,
    cell_info,
    wifi_infos,
    city,
    reason,
    gps,
    createtime,
    logMap_group,
    datestr,
    guid,
    version_name,
    version_code
  ) ON TRUE

 
