Scala计数器开发
发布日期:2021-07-01 02:23:23 浏览次数:2 分类:技术文章

本文共 1153 字,大约阅读时间需要 3 分钟。

package com.imooc.flink.course04

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

/**
 * Batch job demonstrating Flink accumulators: counts how many elements
 * pass through a map operator (across all parallel subtasks) and reads
 * the total from the `JobExecutionResult` after the job finishes.
 */
object CounterApp {

  def main(args: Array[String]): Unit = {
    val executionEnv = ExecutionEnvironment.getExecutionEnvironment

    // Small in-memory dataset to run the counter against.
    val words = executionEnv.fromElements("hadoop", "spark", "flink", "strom")

    val counted = words.map(new RichMapFunction[String, String]() {

      // Step 1: create the counter. It is a field of the rich function,
      // so each parallel instance carries its own and Flink merges them.
      val counter = new LongCounter()

      // Step 2: register the counter under a job-wide name before any
      // records are processed.
      override def open(parameters: Configuration): Unit = {
        getRuntimeContext.addAccumulator("ele-counts", counter)
      }

      // Increment once per record; the record itself passes through unchanged.
      override def map(in: String): String = {
        counter.add(1)
        in
      }
    })

    // Sink with parallelism 2 so the accumulator is exercised by
    // more than one subtask.
    val outputPath = "file:///F://data/"
    counted.writeAsText(outputPath, WriteMode.OVERWRITE).setParallelism(2)

    val result: JobExecutionResult = executionEnv.execute("CounterApp")

    // Step 3: fetch the merged counter value from the finished job.
    val total = result.getAccumulatorResult[Long]("ele-counts")
    println(s"num:$total")
  }
}

 

转载地址:https://mapengsen.blog.csdn.net/article/details/108899940 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:4-36Flink Distributed Cache分布式缓存
下一篇:IDEA自动勾选显示类型(specify type)

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年04月10日 18时13分06秒