Netty网络聊天室之会话管理
发布日期:2021-05-06 20:09:49 浏览次数:24 分类:原创文章

本文共 4749 字,大约阅读时间需要 15 分钟。

写过web的同学们应该对Session这个东西很熟悉。浏览器第一次与服务器建立连接的时候,服务器就会自动为之分配一个Session。Session可以用来判断用户是否经过登录验证,也可以保存用户的各种信息。

其实,Session是很常用的技术。不管是WEB,还是游戏服务,还是联网的桌面程序,都有session的身影。有了Session,我们可以向里面保存各种个人参数,还可以利用session来向客户端发送消息。极大方便了程序对客户端的管理。

Mina IO框架默认有IoSession这个对象,Netty可就没有了。所以我们可以自己创建一个Session抽象。

关于Session,我们希望它有这样的作用。

1. 客户端链路第一次激活时,服务端为之创建一个Session;

2. 可以使用Session向用户发送消息;

3. 可以保存一些重要的且不需要持久化的用户信息;

4. 只能由服务端控制它的生命周期消亡;

public class IoSession {		private static final Logger logger = LoggerFactory.getLogger(IoSession.class);	/** 网络连接channel */	private Channel channel;	private User user;	/** ip地址 */	private String ipAddr;	private boolean reconnected;		/** 拓展用,保存一些个人数据  */	private Map<String, Object> attrs = new HashMap<>();	public IoSession() {	}	public IoSession(Channel channel) {		this.channel = channel;		this.ipAddr = ChannelUtils.getIp(channel);	}		public void setUser(User user) {		this.user = user;	}		/**	 * 向客户端发送消息	 * @param packet	 */	public void sendPacket(Packet packet) {		if (packet == null) {			return;		}		if (channel != null) {			channel.writeAndFlush(packet);		}	}	public String getIpAddr() {		return ipAddr;	}	public void setIpAddr(String ipAddr) {		this.ipAddr = ipAddr;	}	public boolean isReconnected() {		return reconnected;	}	public void setReconnected(boolean reconnected) {		this.reconnected = reconnected;	}	public User getUser() {		return user;	}		public boolean isClose() {		if (channel == null) {			return true;		}		return !channel.isActive() ||			   !channel.isOpen();	}		/**	 * 关闭session 	 * @param reason {@link SessionCloseReason}	 */	public void close(SessionCloseReason reason) {		try{			if (this.channel == null) {				return;			}			if (channel.isOpen()) {				channel.close();				logger.info("close session[{}], reason is {}", getUser().getUserId(), reason);			}else{				logger.info("session[{}] already close, reason is {}", getUser().getUserId(), reason);			}		}catch(Exception e){		}	}}

Session被关闭可以有一系列原因,所以我们最后有一个枚举保存各种原因,像这样

package com.kingston.net;public enum SessionCloseReason {		/** 正常退出 */	NORMAL,		/** 链接超时 */	OVER_TIME,	}

在Netty,channel是通讯的载体,为了方便对channel的各种操作,加了一个channel的工具类(ChannelUtils.java)

public final class ChannelUtils {		public static AttributeKey<IoSession> SESSION_KEY = AttributeKey.valueOf("session");		/**	 * 添加新的会话	 * @param channel	 * @param session	 * @return	 */	public static boolean addChannelSession(Channel channel, IoSession session) {		Attribute<IoSession> sessionAttr = channel.attr(SESSION_KEY);		return sessionAttr.compareAndSet(null, session);	}		public static IoSession getSessionBy(Channel channel) {		Attribute<IoSession> sessionAttr = channel.attr(SESSION_KEY);		return sessionAttr.get() ;	}		public static String getIp(Channel channel) {		return ((InetSocketAddress)channel.remoteAddress()).getAddress().toString().substring(1);	}}

使用了IoSession,先前用于管理用户通讯的工具类,也相应发生变化

public enum ServerManager {	INSTANCE;	private Logger logger = LoggerFactory.getLogger(ServerManager.class);	/** 缓存通信上下文环境对应的登录用户(主要用于服务) */ 	private Map<IoSession, Long> session2UserIds  = new ConcurrentHashMap<>();	/** 缓存用户id与对应的会话 */	private ConcurrentMap<Long, IoSession> userId2Sessions = new ConcurrentHashMap<>();	public void sendPacketTo(Packet pact,Long userId){		if(pact == null || userId <= 0) return;		IoSession session = userId2Sessions.get(userId);		if (session != null) {			session.sendPacket(pact);		}	}	/**	 *  向所有在线用户发送数据包	 */	public void sendPacketToAllUsers(Packet pact){		if(pact == null ) return;		userId2Sessions.values().forEach( (session) -> session.sendPacket(pact));	}	/**	 *  向单一在线用户发送数据包	 */	public void sendPacketTo(Packet pact,ChannelHandlerContext targetContext ){		if(pact == null || targetContext == null) return;		targetContext.writeAndFlush(pact);	}	public IoSession getSessionBy(long userId) {		return this.userId2Sessions.get(userId);	}	public boolean registerSession(User user, IoSession session) {		session.setUser(user);		userId2Sessions.put(user.getUserId(), session);		logger.info("[{}] registered...", user.getUserId());		return true;	}	/**	 *   注销用户通信渠道	 */	public void ungisterUserContext(Channel context ){		if(context  == null){			return;		}		IoSession session = ChannelUtils.getSessionBy(context);		Long userId = session2UserIds.remove(session);		userId2Sessions.remove(userId);		if (session != null) {			session.close(SessionCloseReason.OVER_TIME);		}	}}

加入IoSession后,先前的业务需要做点修改,比如在客户端链路建立后,需要创建新的session对象

在MessageTransportHandler类增加方法

@Override	public void channelActive(ChannelHandlerContext ctx) throws Exception {		if (!ChannelUtils.addChannelSession(ctx.channel(), new IoSession(ctx.channel()))) {			ctx.channel().close();			logger.error("Duplicate session,IP=[{}]",ChannelUtils.getIp(ctx.channel()));		}	}


全部代码已在github上托管

 

服务端代码请移步 --> 

客户端代码请移步 --> 

 

 

上一篇:Netty网络聊天室之使用spring管理各种组件
下一篇:游戏服务端框架之GM金手指的设计

发表评论

最新留言

关注你微信了!
[***.104.42.241]2025年04月10日 19时33分12秒