RFE模型从入门到实践
发布日期:2022-02-27 02:38:02 浏览次数:31 分类:技术文章

本文共 11483 字,大约阅读时间需要 38 分钟。

  • RFE
    • RFE模型基于用户的普通行为(非转化或交易行为)产生----采集数据得到
  • 含义
    • 最近一次访问时间R( Recency)
    • 访问频率 F(Frequency)
    • 页面互动度 E(Engagements)
  • RFE模型用途
    • 未登录用户的价值数据分析
    • 登录用户价值数据分析
    • 用户活跃分群或价值区分
  • 数据来源
    • 对于RFE的数据来源, 从企业自己监控的用户行为日志获取。
  • RFE模型开发思路分析
    • 思路1:基于三个维度值做用户群体划分和解读,对用户的活跃度做分析。哪一个维度数值较低可以通过业务去提高
    • 思路2:通过构建的R-访问时间,F-访问网站的频率,E访问的互动程度等作为数据挖掘或机器学习的特征实现机器学习算法的建模,根据模型进行预测输出。
  • 给定分成几个层次的划分
    • 划分方法1:
      - 1流失 (7天以上无访问)
      - 2不活跃(7天内未访问)
      - 3新增 (注册并访问)
      - 4回流 (3天内访问至少1次)
      - 5活跃 (2天内访问至少1次)
      - 6忠诚(1天内访问2次及以上,每次访问页面不重复)

RFE模型的开发

模型开发的初步步骤

  • 从Hbase中获取业务数据
  • 获取四级标签以及对应的五级标签
  • 使用SparkSql的函数计算Recency、Frequency、Engagements数据
  • 使用业务规则将RFE的数据进行类别化的
  • 得到RFE的打分的数据
  • 将RFE的特征数据进行特征工程
  • 使用KMeans聚类的算法对数据进行聚类—4类
  • 模型预测分析–模型保存
  • 将model的聚类中心获取
  • 将五级标签和聚类中心通过zip函数拉链操作完成PredictToTags规则定义
  • 使用自定义的UDF函数将预测值Predict与Map类型的规则进行匹配得到预测值
  • 完成Hbase的工具类将数据保存在Hbase中tbl_profile

五级标签和四级标签的数据

  • 四级标签

    • id=45的数据
    • inType=HBase##  输入的数据源来自于Hbase中zkHosts=192.168.0.100##  Host的ipzkPort=2181##  Host的port端口hbaseTable=tbl_logs##  业务数据的表family=detail##selectFields=global_user_id,loc_url,log_time 选择的数据字段
    • 具体的含义:
    • 标签名称:用户活跃度标签分类:电商-某商城-行为属性更新周期:1天业务含义:用户活跃度分为非常活跃、活跃、不活跃及非常不活跃四类标签规则:inType=hbasezkHosts=bigdata-cdh01.ljf.cnzkPort=2181hbaseTable=tbl_tag_logsfamily=detailselectFieldNames=global_user_id,loc_url,log_time程序入口:cn.ljf.tags.models.ml.RfeModel算法名称:KMEANS算法引擎:tags-model_2.11.jar模型参数:--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1
  • 五级标签:

    • 1)、属性值【非常活跃】标签名称:非常活跃标签含义:用户活跃度为非常活跃标签规则:12)、属性值【活跃】标签名称:活跃标签含义:用户活跃度为活跃标签规则:23)、属性值【不活跃】标签名称:不活跃标签含义:用户活跃度为不活跃标签规则:34)、属性值【非常不活跃】标签名称:非常不活跃标签含义:用户活跃度为非常不活跃标签规则:4
  • Hbase表中的字段

    • 1 column=detail:global_user_id, timestamp=1574240607242,value=4241 column=detail:loc_url, timestamp=1574240607242,value=http://m.eshop.com/m obile/coupon/getCoupons.html?couponsId=33771 column=detail:log_time, timestamp=1574240607242, value=2019-08-13 03:03:55
    • 用户的ID
    • 用户的URL
    • 用户日志产生的时间
  • RFE的打分规则

    • 需要跟产品经理对接或业务专家对接业务规则数据
    • 计算R值:最近一次访问时间,距离今天的天数 - max -> datediff计算F值:所有访问浏览量(PV) - count---page view计算E值:所有访问页面量(不包含重复访问页面)(UV) - count distinctR:0-15天=5分,16-30天=4分,31-45天=3分,46-60天=2分,大于61天=1分F:≥400=5分,300-399=4分,200-299=3分,100-199=2分,≤99=1分E:≥250=5分,200-249=4分,150-199=3分,149-50=2分,≤49=1分
  • 目标:

    • 45 用户活跃度46 非常活跃 147 活跃 248 不活跃 349 非常不活跃 4

模型的详细开发步骤

1-主函数

  • 获取四级标签
  • 根据四级标签获取业务数据表
  • 重写compute方法

2-计算RFE的值

  • 参考RFM的计算
  • SparkSql的datediff(start,end)-----R最近登录时间
  • SparkSql的Count直接计算------F访问频率
  • SparkSql的CountDIstinct-------E访问程度

3-计算RFE的打分数据

  • R:0-15天=5分,16-30天=4分,31-45天=3分,46-60天=2分,大于61天=1分F:≥400=5分,300-399=4分,200-299=3分,100-199=2分,≤99=1分E:≥250=5分,200-249=4分,150-199=3分,149-50=2分,≤49=1分
  • 代码:
  • package cn.ljf.up.mltestimport cn.ljf.up.base.BaseModelimport org.apache.spark.sql._import org.apache.spark.sql.functions._object RFEModelTest extends BaseModel {  def main(args: Array[String]): Unit = {    execute()  }  /**    * 获取标签id(即模型id,该方法应该在编写不同模型时进行实现)    *    * @return    */  override def getTagID(): Int = 45  /**    * 开始计算    *    * @param fiveDF  MySQL中的5级规则 id,rule    * @param hbaseDF 根据selectFields查询出来的HBase中的数据    * @return userid,tagIds    */  override def compute(fiveDF: DataFrame, hbaseDF: DataFrame): DataFrame = {    import spark.implicits._    println("===================1、fiveDF================")    fiveDF.show(10, false)    //    +---+----+    //    |id |rule|    //    +---+----+    //    |46 |1   |    //      |47 |2   |    //      |48 |3   |    //      |49 |4   |    //      +---+----+    println("===================2、hBaseDF================")    hbaseDF.show(10, false)    hbaseDF.printSchema()    //    +--------------+-------------------------------------------------------------------+-------------------+    //    |global_user_id|loc_url                                                            |log_time           |    //    +--------------+-------------------------------------------------------------------+-------------------+    //    |424           |http://m.eshop.com/mobile/coupon/getCoupons.html?couponsId=3377    |2019-08-13 03:03:55|    //      |619           |http://m.eshop.com/?source=mobile                                  |2019-07-29 15:07:41|    //      |898           |http://m.eshop.com/mobile/item/11941.html                          |2019-08-14 09:23:44|    //      |642           |http://www.eshop.com/l/2729-2931.html                              |2019-08-11 03:20:17|    //      |130           |http://www.eshop.com/                                              |2019-08-12 11:59:28|    //      |515           |http://www.eshop.com/l/2723-0-0-1-0-0-0-0-0-0-0-0.html             |2019-07-23 14:39:25|    //      |274           |http://www.eshop.com/                                              |2019-07-24 15:37:12|    //      |772           |http://ck.eshop.com/login.html                                     |2019-07-24 07:56:49|    //      |189           |http://m.eshop.com/mobile/item/9673.html                           |2019-07-26 19:17:00|    //      |529           |http://m.eshop.com/mobile/search/_bplvbiwq_XQS75_btX_ZY1328-se.html|2019-07-25 23:18:37|    //      +--------------+-------------------------------------------------------------------+-------------------+    //    only showing top 10 rows    //    //    root    //    |-- global_user_id: string (nullable = true)    //    |-- loc_url: string (nullable = true)    //    |-- log_time: string (nullable = true)    //0.定义常量字符串,避免后续拼写错误    val recencyStr = "recency"    val frequencyStr = "frequency"    val engagementsStr = "engagements"    val featureStr = "feature"    val scaleFeatureStr = "scaleFeature"    val predictStr = "predict"    println("===================3、从原始数据中计算RFE的各个值================")    val recencyAggColum: Column = functions.datediff(date_sub(current_timestamp(), 60), max("log_time")) as recencyStr    val frequecyAggColumn: Column = functions.count("loc_url") as frequencyStr    val engagementAggColumn: Column = functions.countDistinct("loc_url") as engagementsStr    //* Returns the number of days from `start` to `end`.    val tempDF: DataFrame = hbaseDF      .groupBy("global_user_id")      .agg(recencyAggColum, frequecyAggColumn, engagementAggColumn)    tempDF.show(10, false)    /* +--------------+-------+---------+-----------+     |global_user_id|recency|frequency|engagements|     +--------------+-------+---------+-----------+     |296           |18     |380      |227        |       |467           |18     |405      |267        |       |675           |18     |370      |240        |       |691           |18     |387      |244        |       |829           |18     |404      |269        |       |125           |18     |375      |246        |       |451           |18     |347      |224        |*/    println("===================4、计算RFE的各个值的评分RFEScore================")    /*    * R:0-15天=5分,16-30天=4分,31-45天=3分,46-60天=2分,大于61天=1分    * F:≥400=5分,300-399=4分,200-299=3分,100-199=2分,≤99=1分    * E:≥250=5分,200-249=4分,150-199=3分,149-50=2分,≤49=1分    * */    val recencyScore: Column = functions      .when(col(recencyStr).between(0, 15), 5)      .when(col(recencyStr).between(16, 30), 4)      .when(col(recencyStr).between(31, 45), 3)      .when(col(recencyStr).between(46, 60), 2)      .when(col(recencyStr) > 61, 5)      .as(recencyStr)    val frequencyScore: Column = when(col(frequencyStr).geq(400), 5)      .when(col(frequencyStr).between(300, 399), 4)      .when(col(frequencyStr).between(200, 299), 3)      .when(col(frequencyStr).between(100, 199), 2)      .when(col(frequencyStr).leq(99), 1)      .as(frequencyStr)    val engagementsScore: Column = when(col(engagementsStr).geq(250), 5)      .when(col(engagementsStr).between(200, 249), 4)      .when(col(engagementsStr).between(150, 199), 3)      .when(col(engagementsStr).between(50, 149), 2)      .when(col(engagementsStr).leq(49), 1)      .as(engagementsStr)    val RFMScore: Dataset[Row] = tempDF      .select($"global_user_id".as("userId"), recencyScore, frequencyScore, engagementsScore)      .where('userId.isNotNull and col(recencyStr).isNotNull and col(frequencyStr).isNotNull and col(engagementsStr).isNotNull)    RFMScore.show(10,false)    /*    only showing top 10 rows    +------+-------+---------+-----------+    |userId|recency|frequency|engagements|    +------+-------+---------+-----------+    |296   |4      |4        |4          |    |467   |4      |5        |5          |    |675   |4      |4        |4          |    |691   |4      |4        |4          |    |829   |4      |5        |5          |    |125   |4      |4        |4          |    |451   |4      |4        |4          |    |800   |4      |4        |4          |    |853   |4      |4        |5          |    |944   |4      |4        |5          |    +------+-------+---------+-----------+    only showing top 10 rows    */    null  }}
  • 以上的代码仅仅完成RFE的打分的值的求解

4-特征工程

  • 代码
  • println("===================5、利用RFEScore实现特征工程================")val vectorAssembler: VectorAssembler = new VectorAssembler().setInputCols(Array(recencyStr, frequencyStr, engagementsStr)).setOutputCol(featureStr)val vecDF: DataFrame = vectorAssembler.transform(RFMScoreDF)//MinMaxSclaerval scaler: MinMaxScaler = new MinMaxScaler().setInputCol(featureStr).setOutputCol("minMaxScaler")val scalerModel: MinMaxScalerModel = scaler.fit(vecDF)val scalerDF: DataFrame = scalerModel.transform(vecDF)

5-训练模型并给出结果

  • 代码
  • println("===================6、建立KMeans算法模型---训练并保存模型================")val path = "/model/RFEModel123"var model: KMeansModel = nullif (HDFSUtils.getInstance().exists(path)) {  println("模型已经存在,直接进行加载模型")  model = KMeansModel.load(path)} else {  println("模型不存在,重新训练模型")  val means: KMeans = new KMeans()    .setK(4)    .setPredictionCol(predictStr)    .setFeaturesCol("minMaxScaler")  model = means.fit(scalerDF)  val WSSSE: Double = model.computeCost(scalerDF)  println("WSSSE result is:",WSSSE)  model.save(path) //fit的数据必须包含有minMaxScaler字段的数据}val result: DataFrame = model.transform(scalerDF)

6-获取聚类类簇中心点,进行数据转换

  • 思路:
    • 获取聚类中心
    • 将聚类中心的R的值、F的值、E的值进行累加
    • 对求和之后的数据进行降序排序
    • 得到的由高度活跃到非常不活跃的映射
  • 代码:
    • //model.clusterCenters(0)----【R、F、E】的值val tempRDD: immutable.IndexedSeq[(Int, Double)] = model.clusterCenters.indices.map(i => (i, model.clusterCenters(i).toArray.sum))tempRDD.foreach(println(_))//    (0,12.0)//    (1,13.0)//    (2,14.0)//    (3,13.0)//上述得到的结果一个是index,一个对应的是累加的和   (1,【R\F\E】)  R+F+E//排序-----业务是具有比较高的RFE之和的数据应该成为高活跃度val sortIndexAndRFE: immutable.IndexedSeq[(Int, Double)] = tempRDD.sortBy(_._2).reversesortIndexAndRFE.foreach(println(_))

7-和五级标签对应为用户打标签

  • 思路:
    • 获取五级标签和sortIndexAndRFE转化为RDD进行zip的拉链操作
      • zip拉链的效果是将predict+tages+tagid+rule
    • 从拉链的结果总选择
      • predict+tagid
  • 代码:
    • println("===================9、将上面的排好序的聚类编号和聚类中心与5级规则进行对应================")//7.将上面的排好序的聚类编号和聚类中心与5级规则进行对应val fiveDS: Dataset[(Long, String)] = fiveDF.as[(Long,String)]val indexAndRFE: Dataset[(Int, Double)] = sortIndexAndRFE.toDS()val tempRDDTest: RDD[((Long, String), (Int, Double))] = fiveDS.rdd.repartition(1).zip(indexAndRFE.rdd.repartition(1))tempRDDTest.foreach(println(_))/*(predict 聚类中心),(tagid,rule) ((2,14.0),(46,1))((3,13.0),(47,2))((1,13.0),(48,3))((0,12.0),(49,4)) */val ruleDF: DataFrame = tempRDDTest.map(x=>(x._1._1,x._2._1)).toDF("predict","tagId")

8-使用UDF函数为用户打上标签并存储在HBase中

  • 自定义函数UDF实现
  • 转化为userid和tagid存储在Hbase中
  • 代码
  • println("===================10、使用UDF用户组定义函数实现标签映射================")//1-ruleDF转化为Map结构val ruleMap: collection.Map[Int, Long] = ruleDF.as[(Int, Long)].rdd.collectAsMap() //"predict"=>"tagId"//2-自定义UDF函数将key对应的value找出来val predictToTages: UserDefinedFunction = udf((predcit: Int) => {  ruleMap(predcit)})//使用udf函数进行映射分析val newDF: DataFrame = result.select($"userId",predictToTages('predict).as("tagId"))newDF.show(10,false)null

转载地址:https://blog.csdn.net/weixin_43401381/article/details/112380557 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:VS2019+VisualGDB+STM32CubeMx开发stm32程序
下一篇:四 Nifi 处理器 初体验以及常用组件说明

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年04月06日 13时09分27秒