网络编程之每天学习一点点[day12]-----netty最佳实践之心跳检测
发布日期:2021-06-30 13:45:26
浏览次数:2
分类:技术文章
本文共 11063 字,大约阅读时间需要 36 分钟。
实际应用中,常见的有一个master机器和多个slave机器组成的集群,master常需要对slave进行心跳检测,接收来自slave的信息,这个例子中我们就使用netty来实现。
Server
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ServerHeartBeatHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); }}
服务端代码不变,主要看下server
import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import java.util.HashMap;public class ServerHeartBeatHandler extends ChannelHandlerAdapter { /** key:ip value:auth */ private static HashMapAUTH_IP_MAP = new HashMap (); private static final String SUCCESS_KEY = "auth_success_key"; static { AUTH_IP_MAP.put("169.254.90.205", "1234"); } private boolean auth(ChannelHandlerContext ctx, Object msg){ //System.out.println(msg); String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("当前主机ip为: " + info.getIp()); System.out.println("当前主机cpu情况: "); HashMap cpu = info.getCpuPercMap(); System.out.println("总使用率: " + cpu.get("combined")); System.out.println("用户使用率: " + cpu.get("user")); System.out.println("系统使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory情况: "); HashMap memory = info.getMemoryMap(); System.out.println("内存总量: " + memory.get("total")); System.out.println("当前内存使用量: " + memory.get("used")); System.out.println("当前内存剩余量: " + memory.get("free")); System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!"); } else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } }}
定义了一个静态代码块,来模拟数据库中的ip和key映射关系,当客户端发送认证信息auth时,我们将根据这个映射关系来比较:
private static HashMapAUTH_IP_MAP = new HashMap (); private static final String SUCCESS_KEY = "auth_success_key";
static { AUTH_IP_MAP.put("169.254.90.205", "1234"); }
Client代码也不变:
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ClienHeartBeattHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().closeFuture().sync(); group.shutdownGracefully(); }}
看下ClienHeartBeattHandler
import java.net.InetAddress;import java.util.HashMap;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;import org.hyperic.sigar.CpuPerc;import org.hyperic.sigar.Mem;import org.hyperic.sigar.Sigar;import io.netty.channel.ChannelHandlerAdapter;import io.netty.channel.ChannelHandlerContext;import io.netty.util.ReferenceCountUtil;public class ClienHeartBeattHandler extends ChannelHandlerAdapter { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture heartBeat; //主动向服务器发送认证信息 private InetAddress addr ; private static final String SUCCESS_KEY = "auth_success_key"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress(); String key = "1234"; //证书 String auth = ip + "," + key; ctx.writeAndFlush(auth); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String)msg; if(SUCCESS_KEY.equals(ret)){ // 握手成功,主动发送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS); System.out.println(msg); } else { System.out.println(msg); } } } finally { ReferenceCountUtil.release(msg); } } private class HeartBeatTask implements Runnable { private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { try { RequestInfo info = new RequestInfo(); //ip info.setIp(addr.getHostAddress()); Sigar sigar = new Sigar(); //cpu prec CpuPerc cpuPerc = sigar.getCpuPerc(); HashMapcpuPercMap = new HashMap (); cpuPercMap.put("combined", cpuPerc.getCombined()); cpuPercMap.put("user", cpuPerc.getUser()); cpuPercMap.put("sys", cpuPerc.getSys()); cpuPercMap.put("wait", cpuPerc.getWait()); cpuPercMap.put("idle", cpuPerc.getIdle()); // memory Mem mem = sigar.getMem(); HashMap memoryMap = new HashMap (); memoryMap.put("total", mem.getTotal() / 1024L); memoryMap.put("used", mem.getUsed() / 1024L); memoryMap.put("free", mem.getFree() / 1024L); info.setCpuPercMap(cpuPercMap); info.setMemoryMap(memoryMap); ctx.writeAndFlush(info); } catch (Exception e) { e.printStackTrace(); } } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } }}
当client连接上server后,触发channelActive方法,发送本机ip和key:1234,给服务端,服务端拿到后进行认证。
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress(); String key = "1234"; //证书 String auth = ip + "," + key; ctx.writeAndFlush(auth); }
看下服务端认证代码:
private boolean auth(ChannelHandlerContext ctx, Object msg){ //System.out.println(msg); String [] ret = ((String) msg).split(","); String auth = AUTH_IP_MAP.get(ret[0]); if(auth != null && auth.equals(ret[1])){ ctx.writeAndFlush(SUCCESS_KEY); return true; } else { ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE); return false; } }
根据发客户端发送信息拆分第一个参数ip,获取得到对应key,跟服务端AUTH_IP_MAP中的key比较,如果相等则认证成功,返回给客户端一个:
private static final String SUCCESS_KEY = "auth_success_key";
客户端拿到后,握手成功,发心跳给服务端:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { if(msg instanceof String){ String ret = (String)msg; if(SUCCESS_KEY.equals(ret)){ // 握手成功,主动发送心跳消息 this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS); System.out.println(msg); } else { System.out.println(msg); } } } finally { ReferenceCountUtil.release(msg); } }
心跳检测的实现是有一个newScheduledThreadPool,延迟0s执行,每2s执行一次HeartBeatTask,发送RequestInfo当前客户端机器信息给服务端打印出来,服务端如果没收到来自客户端的消息则断开和客户端的连接。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ auth(ctx, msg); } else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg; System.out.println("--------------------------------------------"); System.out.println("当前主机ip为: " + info.getIp()); System.out.println("当前主机cpu情况: "); HashMapcpu = info.getCpuPercMap(); System.out.println("总使用率: " + cpu.get("combined")); System.out.println("用户使用率: " + cpu.get("user")); System.out.println("系统使用率: " + cpu.get("sys")); System.out.println("等待率: " + cpu.get("wait")); System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory情况: "); HashMap memory = info.getMemoryMap(); System.out.println("内存总量: " + memory.get("total")); System.out.println("当前内存使用量: " + memory.get("used")); System.out.println("当前内存剩余量: " + memory.get("free")); System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!"); } else { ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE); } }
RequestInfo:
import java.io.Serializable;import java.util.HashMap;public class RequestInfo implements Serializable { private String ip ; private HashMapcpuPercMap ; private HashMap memoryMap; //.. other field public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public HashMap getCpuPercMap() { return cpuPercMap; } public void setCpuPercMap(HashMap cpuPercMap) { this.cpuPercMap = cpuPercMap; } public HashMap getMemoryMap() { return memoryMap; } public void setMemoryMap(HashMap memoryMap) { this.memoryMap = memoryMap; } }
转载地址:https://jeffsheng.blog.csdn.net/article/details/80822556 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
哈哈,博客排版真的漂亮呢~
[***.90.31.176]2024年05月02日 13时20分38秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
Windows Server 2012下安装Hyper-V虚拟机
2019-05-01
MacOSX和Windows 8的完美融合
2019-05-01
Iphone5S 体验(视频+截图)
2019-05-01
python 多进程之进程池的操作
2019-05-01
flask整理之 flask程序中的debug模式
2019-05-01
如何和大妈解释区块链,让他们理解区块链
2019-05-01
比特币,父母这一辈能接受吗?
2019-05-01
解释区块链钱包的意思
2019-05-01
让老百姓知道什么是比特币,只需要10分钟
2019-05-01
为什么要反对比特币,这不代表是空气币
2019-05-01
5G技术如何运用在数字资产方面
2019-05-01
区块链技术运用最广泛的还是保险
2019-05-01
我们提出了DeFi项目如何成为聚合器,有两种路径
2019-05-01
SnapEx的新感觉,对新手很友好
2019-05-01
聚合器运用在什么地方最合适
2019-05-01
区块链可以解决供应链的问题,包括食品方面
2019-05-01
首个聚合器怎么产生的,并运用领域在什么
2019-05-01
区块链的优势和劣势的对比
2019-05-01
新技术,区块链能用在什么领域
2019-05-01