RabbitMQ笔记-使用rabbitmq-c实现Fair dispatch(公平分发)
发布日期:2021-06-30 11:02:00
浏览次数:3
分类:技术文章
本文共 3121 字,大约阅读时间需要 10 分钟。
目录
概念及注意
这里C接口中有2个函数一个是:
amqp_basic_get()
另外一个是:
amqp_basic_consume()
前者可以一条一条的读取,后者是一次读取一大坨数据。所以目前要使用
amqp_basic_get()去读取RabbitMQ的数据,同时使用amqp_read_message()把数据给读出来。
读完一条数据就发一个确认,这样的话即能者多劳。
代码与实例
程序运行截图如下:
消息队列端:
源码如下:
consumer端:
#define _CRT_SECURE_NO_WARNINGS#include#include #include #include #include #include #include using namespace std;void delay_msec(int msec){ clock_t now = clock(); while(clock() - now < msec);}int main(int *argc, int *argv[]){ string name = (char *)argv[1]; string delayStr = (char*)argv[2]; //cout << "The delayStr is : " << delayStr << endl; int delay = std::stoi(delayStr); //cout << "The delay is : " << delay << endl; //string name = "one"; //int delay = 1; string hostName = "127.0.0.1"; int port = 5672; amqp_socket_t *socket = nullptr; amqp_connection_state_t conn; conn = amqp_new_connection(); socket = amqp_tcp_socket_new(conn); if(!socket){ cout << "create socket failed!"; exit(1); } if(amqp_socket_open(socket, hostName.c_str(), port)){ cout << "opening TCP socket failed" << endl; exit(1); } //登录 if(1 != amqp_login(conn, "/vhost_cff", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "cff", "123").reply_type){ cout << "login failed" << endl; exit(1); } amqp_channel_open(conn, 1); while(1){ amqp_basic_get(conn, 1, amqp_cstring_bytes("test_work_queue"), 1); amqp_message_t *msg = new amqp_message_t; amqp_read_message(conn, 1, msg, 0); cout << "【" << name << "】 The result is : " << (char *)msg->body.bytes << endl; amqp_destroy_message(msg); delete msg; delay_msec(1000 * delay); } amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); getchar(); return 0;}
producer端:
#define _CRT_SECURE_NO_WARNINGS#include#include #include #include #include #include #include using namespace std;void delay_msec(int msec){ clock_t now = clock(); while(clock() - now < msec);}int main(int *argc, int *argv[]){ string hostName = "127.0.0.1"; int port = 5672; amqp_socket_t *socket = nullptr; amqp_connection_state_t conn; conn = amqp_new_connection(); socket = amqp_tcp_socket_new(conn); if(!socket){ cout << "create socket failed!"; exit(1); } if(amqp_socket_open(socket, hostName.c_str(), port)){ cout << "opening TCP socket failed" << endl; exit(1); } //登录 if(1 != amqp_login(conn, "/vhost_cff", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "cff", "123").reply_type){ cout << "login failed" << endl; } //管道 amqp_channel_open(conn, 1); int i = 0; while(1){ string str = "Hello " + to_string(i); char message[64] = {'\0'}; strcpy(message, str.c_str()); amqp_bytes_t message_bytes; message_bytes.len = sizeof(message); message_bytes.bytes = message; amqp_basic_publish(conn, 1, amqp_cstring_bytes(""), amqp_cstring_bytes("test_work_queue"), 0, 0, nullptr, message_bytes); cout << "send msg over!" << str << endl; i++; delay_msec(100); if(i == 10000) break; } amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS); amqp_connection_close(conn, AMQP_REPLY_SUCCESS); getchar(); return 0;}
源码下载地址:
转载地址:https://it1995.blog.csdn.net/article/details/94012784 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!
发表评论
最新留言
逛到本站,mark一下
[***.202.152.39]2024年04月10日 19时43分44秒
关于作者
喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
在centos7上部署 redis 和基本操作
2019-04-30
redis配置文件的持久化(详细对比)
2019-04-30
squid代理-----透明代理模式
2019-04-30
squid代理介绍----ACL控制应用+sarg日志分析+反向代理
2019-04-30
redis集群之主从模式+哨兵模式
2019-04-30
rsync远程同步(rsync源服务器+inotify实时同步)
2019-04-30
GlusterFS原理及如何配置使用
2019-04-30
shell之条件测试和if语句
2019-04-30
shell脚本之case-for-while-until语句
2019-04-30
shell脚本小案例之九九乘法表、幸运大抽奖、简易计算器
2019-04-30
shell脚本之函数和数组(含案例,适合新手练习)
2019-04-30
shell脚本之数组的升降序排序,插入排序
2019-04-30
shell脚本之正则表达式(grep 和 egrep命令详解)
2019-04-30
shell脚本之sed工具使用
2019-04-30
shell脚本之awk工具详解
2019-04-30
shell脚本之排序工具(sort、uniq)
2019-04-30
shell脚本之expect免交互
2019-04-30
shell编程之实战----MAC记录与端口扫描脚本、开发系统监控脚本
2019-04-30
ELK日志分析系统原理与部署
2019-04-30
Nginx访问状态以及基于多域名、多端口、多IP配置虚拟主机
2019-04-30