4-36 Flink Distributed Cache
Published: 2021-07-01 02:23:23 | Category: Technical articles


Flink offers a distributed cache, similar to Apache Hadoop, to make files locally accessible to parallel instances of user functions. This functionality can be used to share files that contain static external data such as dictionaries or machine-learned regression models.

The cache works as follows. A program registers a file or directory of a remote filesystem such as HDFS or S3 under a specific name in its ExecutionEnvironment as a cached file. When the program is executed, Flink automatically copies the file or directory to the local filesystem of all workers. A user function can look up the file or directory under the specified name and access it from the worker's local filesystem.

The distributed cache is used as follows:

Register the file or directory in the ExecutionEnvironment.

    val env = ExecutionEnvironment.getExecutionEnvironment

    // register a file from HDFS
    env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")

    // register a local executable file (script, executable, ...)
    env.registerCachedFile("file:///path/to/exec/file", "localExecFile", true)

    // define your program and execute
    ...
    val input: DataSet[String] = ...
    // MyMapper is a RichMapFunction[String, Int], so the result is a DataSet[Int]
    val result: DataSet[Int] = input.map(new MyMapper())
    ...
    env.execute()

Access the cached file in a user function (here a MapFunction). The function must extend a RichFunction class because it needs access to the RuntimeContext.

    // extend a RichFunction to have access to the RuntimeContext
    class MyMapper extends RichMapFunction[String, Int] {

      override def open(config: Configuration): Unit = {
        // access cached file via RuntimeContext and DistributedCache
        val myFile: File = getRuntimeContext.getDistributedCache.getFile("hdfsFile")
        // read the file (or navigate the directory)
        ...
      }

      override def map(value: String): Int = {
        // use content of cached file
        ...
      }
    }

Example code:

    package com.imooc.flink.course04

    import java.io.File
    import java.util

    import org.apache.commons.io.FileUtils
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala._
    import org.apache.flink.configuration.Configuration

    object DistributedCacheApp {

      def main(args: Array[String]): Unit = {
        val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
        val filePath = "file:///F://data/hello.txt"

        // 1. Register a local or HDFS file
        env.registerCachedFile(filePath, "localFile")

        val data: DataSet[String] = env.fromElements("hadoop", "spark", "flink", "storm")

        data.map(new RichMapFunction[String, String] {

          // 2. Read the contents of the cached file in open()
          override def open(parameters: Configuration): Unit = {
            val localFile: File = getRuntimeContext.getDistributedCache.getFile("localFile")
            val lines: util.List[String] = FileUtils.readLines(localFile) // returns a Java list

            // Java and Scala collections are not compatible, so iterating the
            // Java list directly fails here; convert it with asScala first.
            import scala.collection.JavaConverters._
            for (ele <- lines.asScala) {
              println(ele)
            }
          }

          override def map(value: String): String = {
            value
          }
        }).print()
      }
    }
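
The example above only prints the cached lines in open(). As a rough sketch of the "use content of cached file" step, the lines could instead be kept in a field and consulted in map(). The names KnownWordTagger, KnownWordApp and parseWords are hypothetical and not part of Flink or the original example; the sketch also assumes the cached file holds one word per line in UTF-8 and reuses the "localFile" registration from above.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import scala.collection.JavaConverters._

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Hypothetical mapper: tags each word by whether it appears in the cached file.
class KnownWordTagger extends RichMapFunction[String, (String, Boolean)] {

  // Filled once per parallel instance in open(), then only read in map().
  private var knownWords: Set[String] = Set.empty

  override def open(parameters: Configuration): Unit = {
    // Same lookup as in the example above; "localFile" is the registered name.
    val cached: File = getRuntimeContext.getDistributedCache.getFile("localFile")
    // Assumption: one word per line, UTF-8 encoded.
    val lines = Files.readAllLines(cached.toPath, StandardCharsets.UTF_8).asScala
    knownWords = KnownWordTagger.parseWords(lines)
  }

  override def map(value: String): (String, Boolean) =
    (value, knownWords.contains(value))
}

object KnownWordTagger {
  // Pure helper, kept separate so it can be checked without running a Flink job.
  def parseWords(lines: Iterable[String]): Set[String] =
    lines.map(_.trim).filter(_.nonEmpty).toSet
}

object KnownWordApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Path taken from the example above; adjust to your environment.
    env.registerCachedFile("file:///F://data/hello.txt", "localFile")

    env.fromElements("hadoop", "spark", "flink", "storm")
      .map(new KnownWordTagger)
      .print()
  }
}
```

Loading the file in open() rather than in map() means each parallel instance reads it once, not once per record.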

 

Reposted from: https://mapengsen.blog.csdn.net/article/details/108899976
