大数据学习之Spark——03Spark代码初体验(Word Count)
发布日期:2021-05-06 17:19:30 浏览次数:14 分类:技术文章

本文共 4257 字,大约阅读时间需要 14 分钟。

一. Spark代码流程

  1. 创建SparkConf对象
    可以设置Application name。
    可以设置运行模式及资源需求。
  2. 创建SparkContext对象
  3. 基于Spark的上下文创建一个RDD,对RDD进行处理。
  4. 应用程序中要有Action类算子来触发Transformation类算子执行。
  5. 关闭Spark上下文对象SparkContext。
1. Scala代码
  1. 代码1
    package com.hjf.coreimport org.apache.spark.rdd.RDDimport org.apache.spark.{     SparkConf, SparkContext}object WordCount {       def main(args: Array[String]): Unit = {       	// 创建SparkConf对象    val conf: SparkConf = new SparkConf()    conf.setMaster("local").setAppName("word count")    // 创建SparkContext对象    val sc: SparkContext = new SparkContext(conf)    // 指定日志等级    sc.setLogLevel("Error")    // 按行读取文件中的内容    val line: RDD[String] = sc.textFile("./data/words.txt")    // 按空格切割    val word: RDD[String] = line.flatMap(_.split(" "))    // 将单词转成(单词, 1)的格式    val map: RDD[(String, Int)] = word.map(one => new Tuple2(one, 1))    // 按key进行分组累加    val result: RDD[(String, Int)] = map.reduceByKey((v1, v2) => v1 + v2)    // 打印    result.foreach(println(_))		// 简化版本:     // sc.textFile("./data/words.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println(_))        // 关闭    sc.stop()  }}
  2. 简化版本:
    package com.hjf.coreimport org.apache.spark.rdd.RDDimport org.apache.spark.{     SparkConf, SparkContext}object WordCount {       def main(args: Array[String]): Unit = {         val conf: SparkConf = new SparkConf()    conf.setMaster("local").setAppName("word count")    val sc: SparkContext = new SparkContext(conf)    // 指定日志等级    sc.setLogLevel("Error")	    sc.textFile("./data/words.txt").flatMap(_.split(" "))    	.map((_, 1)).reduceByKey(_ + _).foreach(println(_))        // 关闭    sc.stop()  }}
2. Java代码
  1. Java7代码

    package java7;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;public class WordCount {         public static void main(String[] args) {             SparkConf conf = new SparkConf();        conf.setAppName("word count").setMaster("local");        JavaSparkContext sc = new JavaSparkContext(conf);        sc.setLogLevel("Error");        JavaRDD
    lines = sc.textFile("./data/words.txt"); JavaRDD
    words = lines.flatMap(new FlatMapFunction
    () { @Override public Iterator
    call(String s) throws Exception { String[] one = s.split(" "); return Arrays.asList(one).iterator(); } }); JavaPairRDD
    pairRDD = words.mapToPair(new PairFunction
    () { @Override public Tuple2
    call(String s) throws Exception { return new Tuple2
    (s, 1); } }); JavaPairRDD
    result = pairRDD.reduceByKey(new Function2
    () { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); result.foreach(new VoidFunction
    >() { @Override public void call(Tuple2
    ele) throws Exception { System.out.println(ele); } }); sc.stop(); }}
  2. java8代码

    package java8;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import scala.Tuple2;import java.util.Arrays;/** * @author Jiang锋时刻 * @create 2020-07-21 22:57 */public class WordCount {         public static void main(String[] args) {             SparkConf conf = new SparkConf();        conf.setMaster("local").setAppName("word count");        JavaSparkContext sc = new JavaSparkContext(conf);        sc.setLogLevel("Error");        JavaRDD
    lines = sc.textFile("./data/words.txt"); JavaRDD
    words = lines.flatMap(one -> Arrays.asList(one.split(" ")).iterator()); JavaPairRDD
    pairRDD = words.mapToPair(one -> new Tuple2<>(one, 1)); JavaPairRDD
    result = pairRDD.reduceByKey((v1, v2) -> v1 + v2); result.foreach(one -> System.out.println(one)); sc.stop(); }}
上一篇:大数据学习之Spark——04RDD概述及创建
下一篇:大数据学习之Spark——02Spark集群安装

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2025年03月12日 15时31分19秒