
MapReduce Kmeans聚类算法
发布日期:2025-04-11 23:39:18
浏览次数:9
分类:精选文章
本文共 4788 字,大约阅读时间需要 15 分钟。
MapReduce 实现 K-means 算法:基于 Hadoop 的分布式聚类
近期在学习 MapReduce 实现 K-means 算法的相关代码,虽然例子不错,但注释较少,参数较多,对新手理解有一定难度。因此,根据个人理解对代码进行了详细注释和优化,现将实现过程详细说明如下:
一、核心实现逻辑
Map 阶段:
- 每读取一条数据,与中心点进行对比,确定该条记录属于哪个聚类中心。
- 以聚类中心 ID 为 key,以记录为 value 的形式输出。
Reduce 阶段:
- 对同一聚类中心的所有记录进行归并,计算这些记录的平均值,得到新的聚类中心。
- 比较新聚类中心与原聚类中心的值,如果不同,则清空原聚类中心的数据文件,将新聚类中心写入中心文件中。
- 如果新聚类中心与原聚类中心相同,则删除 Reduce 阶段的输出目录,运行无 Reduce 的任务将聚类中心 ID 与值对应输出。
二、代码解析
Map 类:
public class MapReduce { public static class Map extends Mapper{ private List centers; private int k; protected void setup(Context context) throws IOException, InterruptedException { centers = Utils.getCentersFromHDFS(context.getConfiguration().get("centersPath"), false); k = centers.size(); } protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { ArrayList fileds = Utils.textToArray(value); int fileSize = fileds.size(); double minDistance = 99999999; int centerIndex = 0; for (int i = 0; i < k; i++) { double currentDistance = 0.0; for (int j = 0; j < fileSize; j++) { double centerPoint = Math.abs(centers.get(i).get(j)); double filedValue = Math.abs(fileds.get(j)); currentDistance += Math.abs(filedValue - centerPoint); } if (currentDistance < minDistance) { minDistance = currentDistance; centerIndex = i; } } context.write(new IntWritable(centerIndex + 1), value); } }}
Reduce 类:
public static class Reduce extends Reducer{ protected void reduce(IntWritable key, Iterable value, Context context) throws IOException, InterruptedException { ArrayList > filedsList = new ArrayList<>(); while (value.iterator().hasNext()) { Text line = value.iterator().next(); ArrayList tempList = Utils.textToArray(line); filedsList.add(tempList); } int fileSize = filedsList.get(0).size(); double[] avg = new double[fileSize]; for (int i = 0; i < fileSize; i++) { for (int j = 0; j < filedsList.size(); j++) { avg[i] += filedsList.get(j).get(i); } } double minDistance = 0.0; int bestK = 0; for (int k = 0; k < k; k++) { double distance = 0.0; for (int i = 0; i < fileSize; i++) { distance += Math.abs(avg[i] - centers.get(k).get(i)); } if (distance < minDistance) { minDistance = distance; bestK = k; } } if (bestK != -1) { context.write(new Text(String.format("%s,%f", bestK + 1, avg[0]))); } }}
三、核心工具类1. Utils 类: ```java public static class Utils { public static ArrayList> getCentersFromHDFS(String centersPath, boolean isDirectory) throws IOException { ArrayList > result = new ArrayList<>(); Path path = new Path(centersPath); Configuration conf = new Configuration(); FileSystem fileSystem = path.getFileSystem(conf); if (isDirectory) { FileStatus[] listFile = fileSystem.listStatus(path); for (int i = 0; i < listFile.length; i++) { result.addAll(getCentersFromHDFS(listFile[i].getPath().toString(), false)); } return result; } FSDataInputStream fsis = fileSystem.open(path); LineReader lineReader = new LineReader(fsis, conf); Text line = new Text(); while (lineReader.readLine(line) > 0) { ArrayList tempList = textToArray(line); result.add(tempList); } lineReader.close(); return result; } public static void deletePath(String pathStr) throws IOException { Configuration conf = new Configuration(); Path path = new Path(pathStr); FileSystem hdfs = path.getFileSystem(conf); hdfs.delete(path, true); } public static ArrayList textToArray(Text text) { ArrayList list = new ArrayList<>(); String[] fields = text.toString().split(","); for (int i = 0; i < fields.length; i++) { list.add(Double.parseDouble(fields[i])); } return list; } }
四、数据集与运行结果
- 数据集:提供一个简单的数值数据集,用于测试 MapReduce K-means 算法。
- 运行结果:可以通过对比初始中心与最终聚类中心进行对比,验证算法的正确性。
以上代码通过 MapReduce 实现了一个分布式的 K-means 算法,适用于大规模数据集的聚类分析。
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2025年05月20日 12时54分36秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
lodash常用API
2025-04-11
Log4j 1使用教程
2025-04-11
Log4j XML 配置
2025-04-11
Log4j 被曝核弹级漏洞,开发者炸锅了
2025-04-11
Log4j.xml和Log4j2.xml的简单认识 - log4j2/log4j的区别
2025-04-11
Log4j2 中format增加自定义的参数
2025-04-11
Log4j2 消停了,Logback 开始塌房了?
2025-04-11
log4j分离日志输出 自定义过滤 自定义日志文件
2025-04-11
Log4j日志级别
2025-04-11
log4j框架搭建
2025-04-11
Log4J的配置
2025-04-11
log4j的配置说明
2025-04-11
log4j补充
2025-04-11
Log4j输出到控制台成功,写入文件失败 - Log4j和commons log的整合
2025-04-11
Logback configuration error detected:D:\log\exchange-platform\info.2021-07-27.log (系统找不到指定的路径。)
2025-04-11
logback.xml 配置详解(1)
2025-04-11
logback.xml配置
2025-04-11
logback.xml配置导入spring无法启动:ch.qos.logback.core.joran.spi.JoranException: I/O error occurred while par
2025-04-11
logback的使用和logback.xml详解
2025-04-11