(二)NameNode剖析
发布日期:2021-11-18 17:47:38 浏览次数:9 分类:技术文章

本文共 35762 字,大约阅读时间需要 119 分钟。

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录


前言

* NameNode 服务管理了 HDFS 集群的命名空间和 "inode table"。
* 一个 HDFS 集群里面只有一个 NameNode（除了 HA 方案，或者是联邦）。
* NameNode 管理了两张比较重要的表：
  1) 一张表管理了文件与 block 之间的关系；
  2) 另一张表管理了 block 文件块与 DataNode 主机之间的关系。
* 第一张表非常珍贵，存储到了磁盘上面（因为文件与 block 块之间的关系是不会发生变化的）。
* 第二张表是每次 NameNode 重启的时候重新构建出来的。
* NameNode 服务是由三个重要的类支撑的：
  1) NameNode 类：管理配置的参数。
  2) NameNode server：
     - IPC Server —— NameNodeRpcServer：开放端口，等待别人调用，比如 8020/9000；
     - HTTP Server —— NameNodeHttpServer：开放 50070 界面，我们可以通过这个界面了解 HDFS 的情况。
  3) FSNamesystem：这个类非常重要，管理了 HDFS 的元数据。

根据上一讲所介绍,NameNode属于服务端会有一个启动服务进程的过程,也就是main方法,

所以NameNode类的main方法如下:

public static void main(String argv[]) throws Exception {
//解析参数 if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
//参数异常退出 System.exit(0); } try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG); //TODO 创建NameNode的核心代码 NameNode namenode = createNameNode(argv, null); if (namenode != null) {
//让线程阻塞在这儿。这也就是为什么大家敲jps命令的时候能一直看到 namenode namenode.join(); } } catch (Throwable e) {
LOG.error("Failed to start namenode.", e); terminate(1, e); } }
public static NameNode createNameNode(String argv[], Configuration conf)      throws IOException {
LOG.info("createNameNode " + Arrays.asList(argv)); if (conf == null) conf = new HdfsConfiguration(); /** * 我们操作HDFS集群的时候会传进来如下的参数: * hdfs namenode -format * * hadoop-daemon.sh start namenode */ StartupOption startOpt = parseArguments(argv); if (startOpt == null) {
printUsage(System.err); return null; } setStartupOption(conf, startOpt); switch (startOpt) {
case FORMAT: {
boolean aborted = format(conf, startOpt.getForceFormat(), startOpt.getInteractiveFormat()); terminate(aborted ? 1 : 0); return null; // avoid javac warning } case GENCLUSTERID: {
System.err.println("Generating new cluster id:"); System.out.println(NNStorage.newClusterID()); terminate(0); return null; } case FINALIZE: {
System.err.println("Use of the argument '" + StartupOption.FINALIZE + "' is no longer supported. To finalize an upgrade, start the NN " + " and then run `hdfs dfsadmin -finalizeUpgrade'"); terminate(1); return null; // avoid javac warning } case ROLLBACK: {
boolean aborted = doRollback(conf, true); terminate(aborted ? 1 : 0); return null; // avoid warning } case BOOTSTRAPSTANDBY: {
String toolArgs[] = Arrays.copyOfRange(argv, 1, argv.length); int rc = BootstrapStandby.run(toolArgs, conf); terminate(rc); return null; // avoid warning } case INITIALIZESHAREDEDITS: {
boolean aborted = initializeSharedEdits(conf, startOpt.getForceFormat(), startOpt.getInteractiveFormat()); terminate(aborted ? 1 : 0); return null; // avoid warning } case BACKUP: case CHECKPOINT: {
NamenodeRole role = startOpt.toNodeRole(); DefaultMetricsSystem.initialize(role.toString().replace(" ", "")); return new BackupNode(conf, role); } case RECOVER: {
NameNode.doRecovery(startOpt, conf); return null; } case METADATAVERSION: {
printMetadataVersion(conf); terminate(0); return null; // avoid javac warning } case UPGRADEONLY: {
DefaultMetricsSystem.initialize("NameNode"); new NameNode(conf); terminate(0); return null; } default: {
DefaultMetricsSystem.initialize("NameNode"); //因为我们现在分析的是启动namenode的代码,所以代码肯定是走到这儿了。 //TODO 关键代码 return new NameNode(conf); } } }

开始创建NameNode

/**
 * Construct a NameNode with the given role, resolve HA state, and run
 * {@link #initialize(Configuration)}. Stops the node on any failure so
 * partially-started services are torn down.
 */
protected NameNode(Configuration conf, NamenodeRole role)
    throws IOException {
  this.conf = conf;
  this.role = role;
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    // When reading source, key methods like this deserve attention.
    // TODO The initialization method.
    initialize(conf);
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stop();
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stop();
    throw e;
  }
  this.started.set(true);
}

进行初始化

protected void initialize(Configuration conf) throws IOException {
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY); if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS, intervals); } } UserGroupInformation.setConfiguration(conf); loginAsNameNodeUser(conf); NameNode.initMetrics(conf, this.getRole()); StartupProgressMetrics.register(startupProgress); if (NamenodeRole.NAMENODE == role) {
// 需要大家有java基础 // hadoop1:50070 网站 // hadoop2:8088 网站 // 校园网 网站 //TODO 启动HTTPServer startHttpServer(conf); } this.spanReceiverHost = SpanReceiverHost.getInstance(conf); //TODO 加载元数据 //加载元数据这个事,目前对集群刚启动的时候,我们不做重点分析。 //在后面分析到管理元数据的时候,我们会回过头来重点的分析。 //为什么现在不分析(场景驱动)我们启动刚初始化,所以其实没什么元数据。 loadNamesystem(conf); //TODO 重要,这个就是Hadoop RPC //NameNodeRPCserver里面有两个主要的RPC服务: //1)ClientRPCServer: 主要管理的协议是:hdfs的客户端(用户)去操作HDFS的方法 //2)ServiceRPCServer: 主要管理的协议:服务之间互相进行的方法的调用(注册,心跳等) rpcServer = createRpcServer(conf); if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using // the RPC server's bind address. clientNamenodeAddress = NetUtils.getHostPortString(rpcServer.getRpcAddress()); LOG.info("Clients are to use " + clientNamenodeAddress + " to access" + " this namenode/service."); } if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); } pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor.start(); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); //TODO 启动一些公共的服务。NameNode RPC的服务就是在里面启动的 //1)进行资源检查,检查是否有磁盘足够存储元数据 hadoop-deamon.sh start namenode 检查存储元数据的磁盘 100M //2)进入安全模式检查,检查是否可以退出安全模式。 startCommonServices(conf); }

启动Http服务

private void startHttpServer(final Configuration conf) throws IOException {
//TODO getHttpServerBindAddress 里面设置了主机名和端口号 httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf)); httpServer.start(); httpServer.setStartupProgress(startupProgress); }
protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
//TODO 里面设置的是50070端口 InetSocketAddress bindAddress = getHttpServerAddress(conf); // If DFS_NAMENODE_HTTP_BIND_HOST_KEY exists then it overrides the // host name portion of DFS_NAMENODE_HTTP_ADDRESS_KEY. final String bindHost = conf.getTrimmed(DFS_NAMENODE_HTTP_BIND_HOST_KEY); if (bindHost != null && !bindHost.isEmpty()) {
bindAddress = new InetSocketAddress(bindHost, bindAddress.getPort()); } return bindAddress; }
void start() throws IOException {
HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); final String infoHost = bindAddress.getHostName(); final InetSocketAddress httpAddr = bindAddress; final String httpsAddrString = conf.getTrimmed( DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT); InetSocketAddress httpsAddr = NetUtils.createSocketAddr(httpsAddrString); if (httpsAddr != null) {
// If DFS_NAMENODE_HTTPS_BIND_HOST_KEY exists then it overrides the // host name portion of DFS_NAMENODE_HTTPS_ADDRESS_KEY. final String bindHost = conf.getTrimmed(DFSConfigKeys.DFS_NAMENODE_HTTPS_BIND_HOST_KEY); if (bindHost != null && !bindHost.isEmpty()) {
httpsAddr = new InetSocketAddress(bindHost, httpsAddr.getPort()); } } /** * Hadoop喜欢自己封装东西。举个例子,本来就有RPC的服务。 * 但是NameNode自己封装了一个HadoopRPC * * * 这个地方也是类似,本来有HttpServer,Hadoop经过封装封装了 * HttpServer2服务 * */ HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf, httpAddr, httpsAddr, "hdfs", DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY); httpServer = builder.build(); if (policy.isHttpsEnabled()) {
// assume same ssl port for all datanodes InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.getTrimmed( DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT)); httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY, datanodeSslPort.getPort()); } initWebHdfs(conf); httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn); httpServer.setAttribute(JspHelper.CURRENT_CONF, conf); //TODO 绑定一堆servlet //servlet越多,支持的功能就越多 setupServlets(httpServer, conf); //TODO 启动httpServer服务,对外开放50070端口 httpServer.start(); int connIdx = 0; if (policy.isHttpEnabled()) {
httpAddress = httpServer.getConnectorAddress(connIdx++); conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, NetUtils.getHostPortString(httpAddress)); } if (policy.isHttpsEnabled()) {
httpsAddress = httpServer.getConnectorAddress(connIdx); conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, NetUtils.getHostPortString(httpsAddress)); } }

在 HttpServer 上绑定 servlet。每个 servlet 可以看作一项功能（其实 HTTP 本身也可以看作一种 RPC）。

private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
httpServer.addInternalServlet("startupProgress", StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class); httpServer.addInternalServlet("getDelegationToken", GetDelegationTokenServlet.PATH_SPEC, GetDelegationTokenServlet.class, true); httpServer.addInternalServlet("renewDelegationToken", RenewDelegationTokenServlet.PATH_SPEC, RenewDelegationTokenServlet.class, true); httpServer.addInternalServlet("cancelDelegationToken", CancelDelegationTokenServlet.PATH_SPEC, CancelDelegationTokenServlet.class, true); httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class, true); //TODO 上传元数据的请求 //SecondaryNameNode合并出来的FSImage需要替换Active NameNode的fsimage //发送的就是http的请求,请求就会转发给这个servlet httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC, ImageServlet.class, true); //TODO 我们可以在50070界面上浏览目录信息,就是因为这儿有这个servlet // http://hadoop1:50070/listpahts/?path = /usr/hive/warehouse httpServer.addInternalServlet("listPaths", "/listPaths/*", ListPathsServlet.class, false); httpServer.addInternalServlet("data", "/data/*", FileDataServlet.class, false); httpServer.addInternalServlet("checksum", "/fileChecksum/*", FileChecksumServlets.RedirectServlet.class, false); httpServer.addInternalServlet("contentSummary", "/contentSummary/*", ContentSummaryServlet.class, false); }

创建RPC

createRpcServer(conf);函数里面是创建了一个NameNodeRpcServer(Configuration conf, NameNode nn)对象,它是NameNode的一个成员变量

class NameNodeRpcServer implements NamenodeProtocols {
。。。。。。。。 private final String minimumDataNodeVersion; }

里面启动了两个服务端

public NameNodeRpcServer(Configuration conf, NameNode nn)      throws IOException {
this.nn = nn; this.namesystem = nn.getNamesystem(); this.retryCache = namesystem.getRetryCache(); this.metrics = NameNode.getNameNodeMetrics(); int handlerCount = conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, DFS_NAMENODE_HANDLER_COUNT_DEFAULT); RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); /** * TODO 如下就是一段协议,协议里面就会有一堆方法 */ //客户端调用namenode的那些方法,都在这个协议里面。 ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorPB(this); BlockingService clientNNPbService = ClientNamenodeProtocol. newReflectiveBlockingService(clientProtocolServerTranslator); //datanode之间需要互相调用的协议。 DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = new DatanodeProtocolServerSideTranslatorPB(this); BlockingService dnProtoPbService = DatanodeProtocolService .newReflectiveBlockingService(dnProtoPbTranslator); //namenode之间互相调用的协议 NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = new NamenodeProtocolServerSideTranslatorPB(this); BlockingService NNPbService = NamenodeProtocolService .newReflectiveBlockingService(namenodeProtocolXlator); RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator = new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this); BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService .newReflectiveBlockingService(refreshAuthPolicyXlator); RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator = new RefreshUserMappingsProtocolServerSideTranslatorPB(this); BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService .newReflectiveBlockingService(refreshUserMappingXlator); RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator = new RefreshCallQueueProtocolServerSideTranslatorPB(this); BlockingService refreshCallQueueService = RefreshCallQueueProtocolService .newReflectiveBlockingService(refreshCallQueueXlator); 
GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator = new GenericRefreshProtocolServerSideTranslatorPB(this); BlockingService genericRefreshService = GenericRefreshProtocolService .newReflectiveBlockingService(genericRefreshXlator); GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator = new GetUserMappingsProtocolServerSideTranslatorPB(this); BlockingService getUserMappingService = GetUserMappingsProtocolService .newReflectiveBlockingService(getUserMappingXlator); HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = new HAServiceProtocolServerSideTranslatorPB(this); BlockingService haPbService = HAServiceProtocolService .newReflectiveBlockingService(haServiceProtocolXlator); TraceAdminProtocolServerSideTranslatorPB traceAdminXlator = new TraceAdminProtocolServerSideTranslatorPB(this); BlockingService traceAdminService = TraceAdminService .newReflectiveBlockingService(traceAdminXlator); WritableRpcEngine.ensureInitialized(); InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf); if (serviceRpcAddr != null) {
String bindHost = nn.getServiceRpcServerBindHost(conf); if (bindHost == null) {
bindHost = serviceRpcAddr.getHostName(); } LOG.info("Service RPC server is binding to " + bindHost + ":" + serviceRpcAddr.getPort()); int serviceHandlerCount = conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); //TODO 启动ServiceRPCServer // 这个服务起来是服务于 Namenode之间,DataNode与NameNode之间 //进来调用的。 this.serviceRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build(); // Add all the RPC protocols that the namenode implements,添加协议 DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class, refreshAuthService, serviceRpcServer); DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, serviceRpcServer); // We support Refreshing call queue here in case the client RPC queue is full DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, serviceRpcServer); DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, genericRefreshService, serviceRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, serviceRpcServer); DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService, serviceRpcServer); // Update the address with the correct port InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress(); serviceRPCAddress = new InetSocketAddress( serviceRpcAddr.getHostName(), listenAddr.getPort()); nn.setRpcServiceServerAddress(conf, serviceRPCAddress); } else {
serviceRpcServer = null; serviceRPCAddress = null; } InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); String bindHost = nn.getRpcServerBindHost(conf); if (bindHost == null) {
bindHost = rpcAddr.getHostName(); } LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort()); //TODO 启动了clietnRpcServer //这个服务是主要服务于 用户使用的客户端与Namenode,与DataNode进行交互服务调用的。 this.clientRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService).setBindAddress(bindHost) .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build(); // Add all the RPC protocols that the namenode implements DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class, refreshAuthService, clientRpcServer); DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, clientRpcServer); DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, clientRpcServer); DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, genericRefreshService, clientRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, clientRpcServer); DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService, clientRpcServer); // set service-level authorization security policy if (serviceAuthEnabled = conf.getBoolean( CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); if (serviceRpcServer != null) {
serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); } } // The rpc-server port can be ephemeral... ensure we have the correct info InetSocketAddress listenAddr = clientRpcServer.getListenerAddress(); clientRpcAddress = new InetSocketAddress( rpcAddr.getHostName(), listenAddr.getPort()); nn.setRpcServerAddress(conf, clientRpcAddress); minimumDataNodeVersion = conf.get( DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT); // Set terse exception whose stack trace won't be logged this.clientRpcServer.addTerseExceptions(SafeModeException.class, FileNotFoundException.class, HadoopIllegalArgumentException.class, FileAlreadyExistsException.class, InvalidPathException.class, ParentNotDirectoryException.class, UnresolvedLinkException.class, AlreadyBeingCreatedException.class, QuotaExceededException.class, RecoveryInProgressException.class, AccessControlException.class, InvalidToken.class, LeaseExpiredException.class, NSQuotaExceededException.class, DSQuotaExceededException.class, AclException.class, FSLimitException.PathComponentTooLongException.class, FSLimitException.MaxDirectoryItemsExceededException.class, UnresolvedPathException.class); }

重新回到initialize方法,进行启动服务

//TODO 启动一些公共的服务。NameNode RPC的服务就是在里面启动的    //1)进行资源检查,检查是否有磁盘足够存储元数据 hadoop-deamon.sh start namenode 检查存储元数据的磁盘 100M    //2)进入安全模式检查,检查是否可以退出安全模式。    startCommonServices(conf);
private void startCommonServices(Configuration conf) throws IOException {
//TODO FSNameSystem是管理HDFS的元数据的。里面涉及关于元数据的东西 namesystem.startCommonServices(conf, haContext); registerNNSMXBean(); if (NamenodeRole.NAMENODE != role) {
startHttpServer(conf); httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); } //服务给启动起来了 rpcServer.start(); plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class); for (ServicePlugin p: plugins) {
try {
p.start(this); } catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t); } } LOG.info(getRole() + " RPC up at: " + rpcServer.getRpcAddress()); if (rpcServer.getServiceRpcAddress() != null) {
LOG.info(getRole() + " service RPC up at: " + rpcServer.getServiceRpcAddress()); } }

startCommonServices

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState writeLock(); this.haContext = haContext; try {
//NameNode资源检查 通过core-site.xml hdfs-site.xml两个文件,就知道了元数据存在哪儿? //需要检查三个目录,因为这三个目录都涉及到了元数据 //NameNode的两个目录:存储fsiamge的目录,存储editlog的目录。但是一般情况下,或者默认情况这两个使用的是同一个目录。 //Journlanode里面有也有存储元数据的目录。高可用的模式了 //TODO 里面是获取到了要检查的目录 nnResourceChecker = new NameNodeResourceChecker(conf); //TODO 检查是否有足够的磁盘存储元数据 checkAvailableResources(); assert safeMode != null && !isPopulatingReplQueues(); StartupProgress prog = NameNode.getStartupProgress(); prog.beginPhase(Phase.SAFEMODE); prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, getCompleteBlocksTotal()); //TODO HDFS的安全模式的检查(面试的时候很多同学遇到过) /** * 1) 面试的时候经常 * 2)你操作hadoop的时候经常会遇到进入安全模式,但是你自己又不是很清楚为什么进入了模式。 */ setBlockTotal(); //TODO 启动重要服务 blockManager.activate(conf); } finally {
writeUnlock(); } registerMXBean(); DefaultMetricsSystem.instance().register(this); if (inodeAttributeProvider != null) {
inodeAttributeProvider.start(); dir.setINodeAttributeProvider(inodeAttributeProvider); } snapshotManager.registerMXBean(); }

NameNodeResourceChecker是namenode的验证

/**
 * Collect every local directory that holds NameNode metadata (edit-log
 * dirs, plus any extra configured volumes) so their free space can be
 * monitored. NOTE(review): generic type parameters and the tail of this
 * constructor were lost in the original excerpt; restored here from the
 * upstream Hadoop source — verify against your Hadoop version.
 */
public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();

  // TODO Threshold of remaining disk space below which we alarm;
  // the default is 100 MB.
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);

  Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
      .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));

  Collection<URI> localEditDirs = Collections2.filter(
      // TODO Important — resolves the namespace edits directories.
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
            return true;
          }
          return false;
        }
      });

  // Add all the local edits dirs, marking some as required if they are
  // configured as such.
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
        FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
            editsDirToCheck));
  }

  // Restored tail (truncated in the original excerpt):
  // all extra checked volumes are marked required.
  for (URI extraDirToCheck : extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }

  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}

getNamespaceEditsDirs里面涉及获取journalnode和namenode的获取

//fsimage  public static final String  DFS_NAMENODE_NAME_DIR_KEY = "dfs.namenode.name.dir";  //editLog  public static final String  DFS_NAMENODE_EDITS_DIR_KEY = "dfs.namenode.edits.dir";
public static List
getNamespaceEditsDirs(Configuration conf, boolean includeShared) throws IOException {
// Use a LinkedHashSet so that order is maintained while we de-dup // the entries. LinkedHashSet
editsDirs = new LinkedHashSet
(); if (includeShared) {
//TODO journalnode List
sharedDirs = getSharedEditsDirs(conf); // Fail until multiple shared edits directories are supported (HDFS-2782) if (sharedDirs.size() > 1) {
throw new IOException( "Multiple shared edits directories are not yet supported"); } // First add the shared edits dirs. It's critical that the shared dirs // are added first, since JournalSet syncs them in the order they are listed, // and we need to make sure all edits are in place in the shared storage // before they are replicated locally. See HDFS-2874. for (URI dir : sharedDirs) {
if (!editsDirs.add(dir)) {
LOG.warn("Edits URI " + dir + " listed multiple times in " + DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates."); } } } // Now add the non-shared dirs. //TODO namenode for (URI dir : getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY)) {
if (!editsDirs.add(dir)) {
LOG.warn("Edits URI " + dir + " listed multiple times in " + DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " and " + DFS_NAMENODE_EDITS_DIR_KEY + ". Ignoring duplicates."); } } if (editsDirs.isEmpty()) {
// If this is the case, no edit dirs have been explicitly configured. // Image dirs are to be used for edits too. // return Lists.newArrayList(getNamespaceDirs(conf)); } else {
return Lists.newArrayList(editsDirs); } }

把需要检测的目录加入到volumes

private void addDirToCheck(URI directoryToCheck, boolean required)      throws IOException {
File dir = new File(directoryToCheck.getPath()); if (!dir.exists()) {
throw new IOException("Missing directory "+dir.getAbsolutePath()); } CheckedVolume newVolume = new CheckedVolume(dir, required); CheckedVolume volume = volumes.get(newVolume.getVolume()); if (volume == null || !volume.isRequired()) {
//TODO volume 一个目录就是一个volume // volumes.put(newVolume.getVolume(), newVolume); } }

对目录磁盘进行检测

void checkAvailableResources() {
Preconditions.checkState(nnResourceChecker != null, "nnResourceChecker not initialized"); //是否有足够的磁盘空间 //hasResourcesAvailable = false; //TODO 如果资源不够那么就返回false hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace(); }
public boolean hasAvailableDiskSpace() {
//关键调用的是这儿的代码 return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(), minimumRedundantVolumes); }
static boolean areResourcesAvailable(      Collection
resources, int minimumRedundantResources) {
// TODO: workaround: // - during startup, if there are no edits dirs on disk, then there is // a call to areResourcesAvailable() with no dirs at all, which was // previously causing the NN to enter safemode if (resources.isEmpty()) {
return true; } int requiredResourceCount = 0; int redundantResourceCount = 0; int disabledRedundantResourceCount = 0; //遍历每个目录 for (CheckableNameNodeResource resource : resources) {
if (!resource.isRequired()) {
redundantResourceCount++; if (!resource.isResourceAvailable()) {
disabledRedundantResourceCount++; } } else {
requiredResourceCount++; //TODO 判断磁盘资源是否充足 if (!resource.isResourceAvailable()) {
// Short circuit - a required resource is not available. return false; } } }
public boolean isResourceAvailable() {
//TODO 获取当前目录的空间大小 //这个里面的调用就是最跟的jdk的代码 long availableSpace = df.getAvailable(); if (LOG.isDebugEnabled()) {
LOG.debug("Space available on volume '" + volume + "' is " + availableSpace); } //TODO 如果空间大小小于 100M 就返回false //你经常会见到,你在公司里面不会太看到,但是你在学习的时候,你很有可能就是 //你的虚拟机的空间不足了。你启动namenode的时候,发现也没报错,服务也正常起来 //集群不能正常使用。 if (availableSpace < duReserved) {
//ERROR LOG.warn("Space available on volume '" + volume + "' is " + availableSpace + ", which is below the configured reserved amount " + duReserved); return false; } else {
return true; } }

进入安全模式

//TODO HDFS的安全模式的检查(面试的时候很多同学遇到过)      /**       * 1) 面试的时候经常       * 2)你操作hadoop的时候经常会遇到进入安全模式,但是你自己又不是很清楚为什么进入了模式。       */      setBlockTotal();      //TODO 启动重要服务      blockManager.activate(conf);
public void setBlockTotal() {
// safeMode is volatile, and may be set to null at any time SafeModeInfo safeMode = this.safeMode; if (safeMode == null) return; //TODO 设置安全模式 //getCompleteBlocksTotal() 获取集群所有正常使用的block的个数 safeMode.setBlockTotal((int)getCompleteBlocksTotal()); }
private long getCompleteBlocksTotal() {
// Calculate number of blocks under construction long numUCBlocks = 0; readLock(); //获取所有正在构建的block的个数 numUCBlocks = leaseManager.getNumUnderConstructionBlocks(); try {
//获取所有的block - 正在构建的block = 应该能正常使用的block的个数 return getBlocksTotal() - numUCBlocks; } finally {
readUnlock(); } }
synchronized long getNumUnderConstructionBlocks() {
assert this.fsnamesystem.hasReadLock() : "The FSNamesystem read lock wasn't" + "acquired before counting under construction blocks"; long numUCBlocks = 0; /** * * block的个数 * 目录 -> 文件 -> block * */ //Lease对应一个HDFS的目录 for (Lease lease : sortedLeases) {
//遍历所有的目录 for (String path : lease.getPaths()) {
//获取目录 final INodeFile cons; try {
//读取文件 cons = this.fsnamesystem.getFSDirectory().getINode(path).asFile(); Preconditions.checkState(cons.isUnderConstruction()); } catch (UnresolvedLinkException e) {
throw new AssertionError("Lease files should reside on this FS"); } //一个文件对应很多个文件块 //TODO 获取一个文件对应的所有block BlockInfoContiguous[] blocks = cons.getBlocks(); if(blocks == null) continue; //遍历所有的文件块 for(BlockInfoContiguous b : blocks) {
if(!b.isComplete()) {
numUCBlocks++; } } } } LOG.info("Number of blocks under construction: " + numUCBlocks); //返回集群中还不能使用的块的个数 return numUCBlocks; }

其中

* 在 HDFS 里面 block 有两种状态：
  * COMPLETE：已经是一个可以使用的完整的 block；
  * UnderConstruction：这个 block 处于正在构建状态，还不能使用。

计算阈值:

private synchronized void setBlockTotal(int total) {
this.blockTotal = total; //TODO 计算阈值 //举例:1000 * 0.999 = 999 this.blockThreshold = (int) (blockTotal * threshold); //999 this.blockReplQueueThreshold = (int) (blockTotal * replQueueThreshold); if (haEnabled) {
// After we initialize the block count, any further namespace // modifications done while in safe mode need to keep track // of the number of total blocks in the system. this.shouldIncrementallyTrackBlocks = true; } if(blockSafe < 0) this.blockSafe = 0; //TODO 检查安全模式 checkMode(); }
private void checkMode() {
// Have to have write-lock since leaving safemode initializes // repl queues, which requires write lock assert hasWriteLock(); if (inTransitionToActive()) {
return; } // if smmthread is already running, the block threshold must have been // reached before, there is no need to enter the safe mode again //TODO 判断是否进入安全模式 if (smmthread == null && needEnter()) {
//true //TODO 进入安全模式 enter(); // check if we are ready to initialize replication queues if (canInitializeReplQueues() && !isPopulatingReplQueues() && !haEnabled) {
initializeReplQueues(); } reportStatus("STATE* Safe mode ON.", false); return; } // the threshold is reached or was reached before if (!isOn() || // safe mode is off extension <= 0 || threshold <= 0) {
// don't need to wait this.leave(); // leave safe mode return; } if (reached > 0) {
// threshold has already been reached before reportStatus("STATE* Safe mode ON.", false); return; } // start monitor reached = monotonicNow(); reachedTimestamp = now(); if (smmthread == null) {
smmthread = new Daemon(new SafeModeMonitor()); smmthread.start(); reportStatus("STATE* Safe mode extension entered.", true); } // check if we are ready to initialize replication queues if (canInitializeReplQueues() && !isPopulatingReplQueues() && !haEnabled) {
initializeReplQueues(); } }
/**      * TODO  条件一     * threshold != 0 && blockSafe < blockThreshold     *   HDFS的元数据那儿程序总计分析出来上一次关闭集群之前     *   假设有1000个complete的block,默认是阈值的计算比例是0.999     *   这样blockThreshold的值是999     *   现在集群起来了以后,发现累计datanode汇报过来的complete的block个数(blockSafe)     *   如果小于999就让集群处于安全模式。     *     * TODO 条件二     *  datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold     *  如果存活的datanode的个数小于一定的数目的时候,也会进去安全模式     *  默认是0,所以相当于没启用,但是我们也可以配置,如果存活的datanode个数     *  少于多少就让HDFS集群出入安全模式。     * TODO 条件三     *  !nameNodeHasResourcesAvailable()     *  就是前面 检查NameNode写的元数据的目录空间是否大于100M,     *  如果目录的空间小于100M,nameNodeHasResourcesAvailable 就为false     *  hdfs就会进入安全模式。     */    private boolean needEnter() {
//999 阈值 // namenode启动起来 然后接着我们datanode也要启动起来 //datanode启动会上报主机的block的信息,然后还需要进行与namenode进行心跳 return (threshold != 0 && blockSafe < blockThreshold) || //一个集群里面存活的datanode的个数少于几个节点就让集群处于安全模式 (datanodeThreshold != 0 && getNumLiveDataNodes() < datanodeThreshold) || //磁盘空间是否充足 !flase = true (!nameNodeHasResourcesAvailable()); }

启动一些重要服务,

public void activate(Configuration conf) {
//启动了等待复制的线程 pendingReplications.start(); //TODO 启动了管理心跳的服务 datanodeManager.activate(conf); this.replicationThread.start(); }

整体流程:

（此处原文为插图：NameNode 启动整体流程图，图片未随文本保留。）

转载地址:https://blog.csdn.net/weixin_37850264/article/details/112391455 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:(三)DataNode的剖析
下一篇:(十八)生产者源码的总结

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2024年04月20日 00时52分43秒