MapReduce Kmeans聚类算法
发布日期:2025-04-11 23:39:18 浏览次数:9 分类:精选文章

本文共 4788 字,大约阅读时间需要 15 分钟。

MapReduce 实现 K-means 算法:基于 Hadoop 的分布式聚类

近期在学习 MapReduce 实现 K-means 算法的相关代码,虽然例子不错,但注释较少,参数较多,对新手理解有一定难度。因此,根据个人理解对代码进行了详细注释和优化,现将实现过程详细说明如下:

一、核心实现逻辑

  • Map 阶段:

    • 每读取一条数据,与中心点进行对比,确定该条记录属于哪个聚类中心。
    • 以聚类中心 ID 为 key,以记录为 value 的形式输出。
  • Reduce 阶段:

    • 对同一聚类中心的所有记录进行归并,计算这些记录的平均值,得到新的聚类中心。
    • 比较新聚类中心与原聚类中心的值,如果不同,则清空原聚类中心的数据文件,将新聚类中心写入中心文件中。
    • 如果新聚类中心与原聚类中心相同,则删除 Reduce 阶段的输出目录,运行无 Reduce 的任务将聚类中心 ID 与值对应输出。
  • 二、代码解析

  • Map 类:

    public class MapReduce {    public static class Map extends Mapper
    { private List
    centers; private int k; protected void setup(Context context) throws IOException, InterruptedException { centers = Utils.getCentersFromHDFS(context.getConfiguration().get("centersPath"), false); k = centers.size(); } protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { ArrayList
    fileds = Utils.textToArray(value); int fileSize = fileds.size(); double minDistance = 99999999; int centerIndex = 0; for (int i = 0; i < k; i++) { double currentDistance = 0.0; for (int j = 0; j < fileSize; j++) { double centerPoint = Math.abs(centers.get(i).get(j)); double filedValue = Math.abs(fileds.get(j)); currentDistance += Math.abs(filedValue - centerPoint); } if (currentDistance < minDistance) { minDistance = currentDistance; centerIndex = i; } } context.write(new IntWritable(centerIndex + 1), value); } }}
  • Reduce 类:

    public static class Reduce extends Reducer
    { protected void reduce(IntWritable key, Iterable
    value, Context context) throws IOException, InterruptedException { ArrayList
    > filedsList = new ArrayList<>(); while (value.iterator().hasNext()) { Text line = value.iterator().next(); ArrayList
    tempList = Utils.textToArray(line); filedsList.add(tempList); } int fileSize = filedsList.get(0).size(); double[] avg = new double[fileSize]; for (int i = 0; i < fileSize; i++) { for (int j = 0; j < filedsList.size(); j++) { avg[i] += filedsList.get(j).get(i); } } double minDistance = 0.0; int bestK = 0; for (int k = 0; k < k; k++) { double distance = 0.0; for (int i = 0; i < fileSize; i++) { distance += Math.abs(avg[i] - centers.get(k).get(i)); } if (distance < minDistance) { minDistance = distance; bestK = k; } } if (bestK != -1) { context.write(new Text(String.format("%s,%f", bestK + 1, avg[0]))); } }}
  • 三、核心工具类1. Utils 类:   ```java   public static class Utils {       public static ArrayList
    > getCentersFromHDFS(String centersPath, boolean isDirectory) throws IOException { ArrayList
    > result = new ArrayList<>(); Path path = new Path(centersPath); Configuration conf = new Configuration(); FileSystem fileSystem = path.getFileSystem(conf); if (isDirectory) { FileStatus[] listFile = fileSystem.listStatus(path); for (int i = 0; i < listFile.length; i++) { result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(), false)); } return result; } FSDataInputStream fsis = fileSystem.open(path); LineReader lineReader = new LineReader(fsis, conf); Text line = new Text(); while (lineReader.readLine(line) > 0) { ArrayList
    tempList = textToArray(line); result.add(tempList); } lineReader.close(); return result; } public static void deletePath(String pathStr) throws IOException { Configuration conf = new Configuration(); Path path = new Path(pathStr); FileSystem hdfs = path.getFileSystem(conf); hdfs.delete(path, true); } public static ArrayList
    textToArray(Text text) { ArrayList
    list = new ArrayList<>(); String[] fields = text.toString().split(","); for (int i = 0; i < fields.length; i++) { list.add(Double.parseDouble(fields[i])); } return list; } }

    四、数据集与运行结果

    • 数据集:提供一个简单的数值数据集,用于测试 MapReduce K-means 算法。
    • 运行结果:可以通过对比初始中心与最终聚类中心进行对比,验证算法的正确性。

    以上代码通过 MapReduce 实现了一个分布式的 K-means 算法,适用于大规模数据集的聚类分析。

    上一篇:MapReduce 使用案例
    下一篇:MapReduce Java API实例-统计平均成绩

    发表评论

    最新留言

    能坚持,总会有不一样的收获!
    [***.219.124.196]2025年05月20日 12时54分36秒