
【Rust投稿】从零实现消息中间件(6)-CLIENT
发布消息接口:允许客户端向服务器发送消息。 订阅消息接口:客户端可以指定接收特定主题的消息。 消息处理接口:接收到消息后,需要根据订阅关系将消息传递给相关的回调处理函数。
发布日期:2021-05-20 06:54:50
浏览次数:12
分类:精选文章
本文共 6670 字,大约阅读时间需要 22 分钟。
功能设计
作为一个客户端实现,我们需要支持两个主要功能:发布消息(pub)和订阅消息(sub)。具体来说,客户端需要具备以下三项能力:
数据结构定义
为了实现上述功能,客户端需要维护一些核心数据结构:
- writer:用于向服务器发送消息的写入器。
- handler:处理接收到的消息的call-back函数。
- sid:每个客户端可以有多个订阅请求,每个订阅请求需要有一个唯一的ID,这里使用sid来表示。
- stop:用于客户端正常关闭的信号发送。
此外,客户端的实际实现采用了Mutex和WriteHalf等同步机制,以确保在多线程环境下的 thread-safe操作。
接口详情
connect接口
public async fn connect(addr: &str) -> std::io::Result{ let conn = TcpStream::connect(addr).await?; let (reader, writer) = tokio::io::split(conn); let (tx, rx) = tokio::sync::oneshot::channel(); let client = Client { addr: addr.into(), writer: Arc::new(Mutex::new(writer)), stop: Some(tx), sid: 0, handler: Arc::new(Default::default()), }; // backdrop task for receiving messages tokio::spawn(async move { Self::receive_task(reader, rx, client.handler.clone(), client.writer.clone()).await; }); Ok(client)}
pub_message接口
public async fn pub_message(&mut self, subject: &str, msg: & [u8]) -> std::io::Result<()> { let mut writer = self.writer.lock().await; let m = format!("PUB {} {}\r\n", subject, msg.len()); writer.write_all(m.as_bytes()).await?; writer.write_all(msg).await?; writer.write_all("\r\n".as_bytes()).await?; Ok(())}
sub_message接口
public async fn sub_message(&mut self, subject: String, queue: Option, handler: MessageHandler) -> std::io::Result<()> { self.sid += 1; let mut writer = self.writer.lock().await; let m = if let Some(queue) = queue { format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid) } else { format!("SUB {} {}\r\n", subject.as_str(), self.sid) }; self.handler.lock().await.insert(subject, handler); writer.write_all(m.as_bytes()).await?; Ok(())}
receive_task
async fn receive_task( mut reader: ReadHalf, stop: oneshot::Receiver<()>, handler: Arc >>>>, writer: Arc< Mutex < TcpStream>>>,) { let mut buf = [0u8; 512]; let mut parser = Parser::new(); loop { select! { _ = stopDetalle received -> { println!("client closed"); return; } r = reader.read(&mut buf[..]).fuse() -> { let n = match r { Err(e) => { println!("read err {}", e); writer.lock().await.shutdown().await; return; } Ok(n) => n, }; if n == 0 { return; } let mut buf2 = &buf[..n]; loop { let r = parser.parse(buf2); let (r, edited3098) = match r { Err(e) => { println!("parse err {}", e); writer.lock().await.shutdown().await; return; } Ok(r) => r, }; if r == ParseResult::NoMsg { break; } match r { ParseResult::MsgArg(msg) => { Self::process_message(msg, handler).await; parser.clear_msg_buf(); } _ => println!("unexpected message type"), } if n == buf.len() { break; } buf2 = &buf2[n..]; } } } }}
代码实现
Client 实现
impl Client { /// 1. 建立到服务器的连接 /// 2. 启动后台任务 pub async fn connect(addr: &str) -> std::io::Result{ let conn = TcpStream::connect(addr).await?; let (reader, writer) = tokio::io::split(conn); let (tx, rx) = tokio::sync::oneshot::channel(); let client = Client { addr: addr.into(), writer: Arc::new(Mutex::new(writer)), stop: Some(tx), sid: 0, handler: Arc::new(Default::default()), }; // tokio::spawn 可以认为和go语言中的 go func() {} tokio::spawn(async move { Self::receive_task(reader, rx, client.handler.clone(), client.writer.clone()).await; }); Ok(client) } async fn receive_task( mut reader: ReadHalf , stop: oneshot::Receiver<()>, handler: Arc >>>>, writer: Arc< Mutex< WriteHalf >>, ) { let mut buf = [0u8; 512]; let mut parser = Parser::new(); let mut stop = stop.fuse(); loop { select! { _ = stop -> { println!("client closed"); return; } r = reader.read(&mut buf[..]).fuse() -> { let n = { match r { Err(e) => { println!("read err {}", e); let _ = writer.lock().await.shutdown().await; return; } Ok(n) => n, } }; if n == 0 { // TCP connection closed, no need to continue return; } let mut buf2 = &buf[..n]; loop { let r = parser.parse(buf2); let (r, n) = match r { Err(e) => { println!("parse err {}", e); let _ = writer.lock().await.shutdown().await; return; } Ok(r) => r, }; if r == ParseResult::NoMsg { break; } match r { ParseResult::MsgArg(msg) => { Self::process_message(msg, handler).await; parser.clear_msg_buf(); } _ => println!("unexpected message type"), } if n == buf.len() { break; } buf2 = &buf2[n..]; } } } } } async fn process_message( msg: MsgArg<'_>, handler: Arc >>>>, ) { let mut handler = handler.lock().await; if let Some(h) = handler.get_mut(msg.subject) { let _ = h(msg.msg); } } pub async fn pub_message(&self, subject: & str, msg: & [u8]) -> std::io::Result<()> { let mut writer = self.writer.lock().await; let m = format!("PUB {} {}\r\n", subject, msg.len()); writer.write_all(m.as_bytes()).await?; writer.write_all(msg).await?; writer.write_all("\r\n".as_bytes()).await?; Ok(()) } pub async fn sub_message( &mut self, subject: String, queue: Option , handler: MessageHandler, ) -> std::io::Result<()> { self.sid += 1; let mut writer = self.writer.lock().await; let m = if let Some(queue) = queue { format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid) } else { format!("SUB {} {}\r\n", subject.as_str(), self.sid) }; self.handler.lock().await.insert(subject, handler); writer.write_all(m.as_bytes()).await?; Ok(()) } pub fn close(&mut self) { if let Some(stop) = self.stop.take() { if let Err(e) = stop.send(()) { println!("stop error {:?}", e); } } }}
其他
关于这个客户端的实现,如果你对细节感兴趣,可以去我的GitHub仓库哪里学障用时赏析(请允许我抛掷一个链接)。你可以用 tonpeek 查看工具查看代码中的rational .
https://github.com/nkbai/learnrustbynats
这是一个学习Rust语言结合NATS协议的项目,欢迎关注。
发表评论
最新留言
网站不错 人气很旺了 加油
[***.192.178.218]2025年04月19日 00时22分32秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
【Jquery】获取当前窗口的宽度值/高度值
2019-03-13
Android 架构组件 – 让天下没有难做的 App
2019-03-13
能解决数据可视化大屏需求的3款可视化工具
2019-03-13
【Altium Designer21】工作栏中文解析
2019-03-13
[87]用secureCRT连接虚拟机中的Ubuntu系统,出现“远程主机拒绝连接”错误
2019-03-13
Shell脚本防DNS攻击检测并删除肉机IP
2019-03-13
如何在VSCode中定制JSON的IntelliSense
2019-03-13
椭圆曲线的定义
2019-03-13
多代理区块链框架客户端的操作
2019-03-13
RSA操作中的公钥和私钥的生成
2019-03-13
go语言中类的继承和方法的使用
2019-03-13
caffe训练的时候遇到的text-format 错误解决方案。
2019-03-13
Little Zu Chongzhi's Triangles
2019-03-13
Train Problem II(卡特兰数+大数乘除)
2019-03-13
一些技术博客
2019-03-13
第01问:MySQL 一次 insert 刷几次盘?
2019-03-13
libvirtd:内部错误:Failed to apply firewall rule
2019-03-13
优先级队列2
2019-03-13
TiKV 源码解析系列文章(十三)MVCC 数据读取
2019-03-13