网络编程之每天学习一点点[day12]-----netty最佳实践之心跳检测
发布日期:2021-06-30 13:45:26 浏览次数:2 分类:技术文章

本文共 11063 字,大约阅读时间需要 36 分钟。

实际应用中,常见的有一个master机器和多个slave机器组成的集群,master常需要对slave进行心跳检测,接收来自slave的信息,这个例子中我们就使用netty来实现。

Server

import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class Server {	public static void main(String[] args) throws Exception{				EventLoopGroup pGroup = new NioEventLoopGroup();		EventLoopGroup cGroup = new NioEventLoopGroup();				ServerBootstrap b = new ServerBootstrap();		b.group(pGroup, cGroup)		 .channel(NioServerSocketChannel.class)		 .option(ChannelOption.SO_BACKLOG, 1024)		 //设置日志		 .handler(new LoggingHandler(LogLevel.INFO))		 .childHandler(new ChannelInitializer
() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHeartBeatHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); }}

服务端启动代码不变,主要看下ServerHeartBeatHandler:

import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import java.util.HashMap;public class ServerHeartBeatHandler extends ChannelHandlerAdapter {    	/** key:ip value:auth */	private static HashMap
AUTH_IP_MAP = new HashMap
(); private static final String SUCCESS_KEY = "auth_success_key"; static { AUTH_IP_MAP.put("169.254.90.205", "1234"); } private boolean auth(ChannelHandlerContext ctx, Object msg){ //System.out.println(msg); String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("当前主机ip为: " + info.getIp()); System.out.println("当前主机cpu情况: "); HashMap
cpu = info.getCpuPercMap(); System.out.println("总使用率: " + cpu.get("combined")); System.out.println("用户使用率: " + cpu.get("user")); System.out.println("系统使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory情况: "); HashMap
memory = info.getMemoryMap(); System.out.println("内存总量: " + memory.get("total")); System.out.println("当前内存使用量: " + memory.get("used")); System.out.println("当前内存剩余量: " + memory.get("free")); System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!"); } else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } }}

定义了一个静态代码块,来模拟数据库中的ip和key映射关系,当客户端发送认证信息auth时,我们将根据这个映射关系来比较:

/** key: client ip, value: auth key expected from that ip (stands in for a database lookup). */
private static final HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();

private static final String SUCCESS_KEY = "auth_success_key";

static {
    AUTH_IP_MAP.put("169.254.90.205", "1234");
}

Client代码也不变:

import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Client {		public static void main(String[] args) throws Exception{				EventLoopGroup group = new NioEventLoopGroup();		Bootstrap b = new Bootstrap();		b.group(group)		 .channel(NioSocketChannel.class)		 .handler(new ChannelInitializer
() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClienHeartBeattHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().closeFuture().sync(); group.shutdownGracefully(); }}

看下ClienHeartBeattHandler

import java.net.InetAddress;import java.util.HashMap;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;import org.hyperic.sigar.CpuPerc;import org.hyperic.sigar.Mem;import org.hyperic.sigar.Sigar;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;public class ClienHeartBeattHandler extends ChannelHandlerAdapter {    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);        private ScheduledFuture
heartBeat; //主动向服务器发送认证信息 private InetAddress addr ; private static final String SUCCESS_KEY = "auth_success_key"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress(); String key = "1234"; //证书 String auth = ip + "," + key; ctx.writeAndFlush(auth); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String)msg; if(SUCCESS_KEY.equals(ret)){ // 握手成功,主动发送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS); System.out.println(msg); } else { System.out.println(msg); } } } finally { ReferenceCountUtil.release(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { try { RequestInfo info = new RequestInfo(); //ip info.setIp(addr.getHostAddress()); Sigar sigar = new Sigar(); //cpu prec CpuPerc cpuPerc = sigar.getCpuPerc(); HashMap
cpuPercMap = new HashMap
(); cpuPercMap.put("combined", cpuPerc.getCombined()); cpuPercMap.put("user", cpuPerc.getUser()); cpuPercMap.put("sys", cpuPerc.getSys()); cpuPercMap.put("wait", cpuPerc.getWait()); cpuPercMap.put("idle", cpuPerc.getIdle()); // memory Mem mem = sigar.getMem(); HashMap
memoryMap = new HashMap
(); memoryMap.put("total", mem.getTotal() / 1024L); memoryMap.put("used", mem.getUsed() / 1024L); memoryMap.put("free", mem.getFree() / 1024L); info.setCpuPercMap(cpuPercMap); info.setMemoryMap(memoryMap); ctx.writeAndFlush(info); } catch (Exception e) { e.printStackTrace(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } }}

当client连接上server后,触发channelActive方法,发送本机ip和key:1234,给服务端,服务端拿到后进行认证。

@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {		addr = InetAddress.getLocalHost();        String ip = addr.getHostAddress();		String key = "1234";		//证书		String auth = ip + "," + key;		ctx.writeAndFlush(auth);	}

看下服务端认证代码:

private boolean auth(ChannelHandlerContext ctx, Object msg){			//System.out.println(msg);			String [] ret = ((String) msg).split(",");			String auth = AUTH_IP_MAP.get(ret[0]);			if(auth != null && auth.equals(ret[1])){				ctx.writeAndFlush(SUCCESS_KEY);				return true;			} else {				ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);				return false;			}	}

服务端把客户端发来的认证信息按逗号拆分,用第一个参数ip在AUTH_IP_MAP中查出对应的key,再与客户端发来的第二个参数key比较,如果相等则认证成功,返回给客户端一个:

// Reply the server writes after a successful auth; the client handler compares incoming strings against the same literal.
private static final String SUCCESS_KEY = "auth_success_key";

客户端拿到后,握手成功,发心跳给服务端:

@Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    	try {        	if(msg instanceof String){        		String ret = (String)msg;        		if(SUCCESS_KEY.equals(ret)){        	    	// 握手成功,主动发送心跳消息        	    	this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS);        		    System.out.println(msg);    			        		}        		else {        			System.out.println(msg);        		}        	}		} finally {			ReferenceCountUtil.release(msg);		}    }

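心跳的实现方式是:客户端用newScheduledThreadPool创建调度线程池,以0s初始延迟、每次执行结束后间隔2s的方式(scheduleWithFixedDelay)执行HeartBeatTask,把当前客户端机器信息封装成RequestInfo发送给服务端,由服务端打印出来。需要注意:本文示例中的服务端并没有实现"收不到客户端消息就断开连接"的超时逻辑,只会在认证失败或收到无法识别的消息类型时关闭连接;如需超时断开,需要另外实现(例如在pipeline中加入ReadTimeoutHandler)。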

/**
 * Dispatches on the message type. A String is handed to auth(). A RequestInfo
 * has its ip, cpu and memory figures printed and is acknowledged with
 * "info received!". Anything else is answered with "connect failure!" and the
 * channel is closed after that write.
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof String) {
        auth(ctx, msg);
        return;
    }
    if (!(msg instanceof RequestInfo)) {
        ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
        return;
    }

    final RequestInfo report = (RequestInfo) msg;
    final String separator = "--------------------------------------------";

    System.out.println(separator);
    System.out.println("当前主机ip为: " + report.getIp());

    System.out.println("当前主机cpu情况: ");
    final HashMap<?, ?> cpuStats = report.getCpuPercMap();
    System.out.println("总使用率: " + cpuStats.get("combined"));
    System.out.println("用户使用率: " + cpuStats.get("user"));
    System.out.println("系统使用率: " + cpuStats.get("sys"));
    System.out.println("等待率: " + cpuStats.get("wait"));
    System.out.println("空闲率: " + cpuStats.get("idle"));

    System.out.println("当前主机memory情况: ");
    final HashMap<?, ?> memStats = report.getMemoryMap();
    System.out.println("内存总量: " + memStats.get("total"));
    System.out.println("当前内存使用量: " + memStats.get("used"));
    System.out.println("当前内存剩余量: " + memStats.get("free"));
    System.out.println(separator);

    ctx.writeAndFlush("info received!");
}

RequestInfo:

import java.io.Serializable;
import java.util.HashMap;

/**
 * Heartbeat payload sent from client to server: the sender's ip plus two
 * metric maps. Plain mutable bean; every field is {@code null} until set.
 */
public class RequestInfo implements Serializable {

    /** Ip address of the reporting host. */
    private String ip;

    /** CPU figures keyed by metric name (the client uses "combined", "user", "sys", "wait", "idle"). */
    private HashMap<String, Object> cpuPercMap;

    /** Memory figures keyed by metric name (the client uses "total", "used", "free"). */
    private HashMap<String, Object> memoryMap;

    /** @return the ip of the reporting host, or {@code null} if not set */
    public String getIp() {
        return ip;
    }

    /** @param ip the ip of the reporting host */
    public void setIp(String ip) {
        this.ip = ip;
    }

    /** @return the CPU metric map, or {@code null} if not set */
    public HashMap<String, Object> getCpuPercMap() {
        return cpuPercMap;
    }

    /** @param cpuPercMap the CPU metric map; stored by reference, not copied */
    public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }

    /** @return the memory metric map, or {@code null} if not set */
    public HashMap<String, Object> getMemoryMap() {
        return memoryMap;
    }

    /** @param memoryMap the memory metric map; stored by reference, not copied */
    public void setMemoryMap(HashMap<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }
}

转载地址:https://jeffsheng.blog.csdn.net/article/details/80822556 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:网络编程之每天学习一点点[day13]-----netty编解码技术
下一篇:网络编程之每天学习一点点[day11]-----netty最佳实践之数据通信

发表评论

最新留言

哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年05月02日 13时20分38秒