
Python多线程模拟多发送端使用mqtt向服务器thingsboard多设备接收端发送消息
发布日期:2021-05-04 18:12:40
浏览次数:22
分类:精选文章
本文共 3435 字,大约阅读时间需要 11 分钟。
下面代码是模拟3个发送设备同时向3个接收设备发送5次消息
import paho.mqtt.client as mqttimport timefrom threading import Threaddef on_connect(client, userdata, flags, rc): print("Connected with result code: " + str(rc))def on_message(client, userdata, msg): print(msg.topic + " " + str(msg.payload))send_num_global = 0errorsend_num_global = 0def func(name, message, thread_id, client, send_num): global send_num_global global errorsend_num_global for iter_num in range(5): time.sleep(1) #每隔2秒发送一次消息 print(f"{name}开始") # print(f"{message}") id = '{"id' + str(thread_id+1) + '":' + str(send_num) + '}' # print(id) rt = client.publish('v1/devices/me/telemetry', payload=id, qos=0) print(list(rt)) if list(rt)[0] != 0: errorsend_num_global += 1 #统计一共错发的次数 send_num_global += 1 #统计一共发送的次数 send_num += 1 #统计单个设备发送的次数 print({ "错发次数":errorsend_num_global, "总发次数":send_num_global}) # 开启多线程def duoxiancheng(): for username_id in range(3): #迭代服务器接受端设备的个数 username_list = ["DqleDovP7M61QtBStKX0", "KkvXOjyfiHk65qLuYCJj", "pm3jdmtK78enWR3tv7iU"] #这里是三个接收设备的令牌,要换成自己的 client = mqtt.Client() password = "" #密码设为空字符串 client.username_pw_set(username_list[username_id], password) client.on_connect = on_connect client.on_message = on_message client.connect('your_server_ip', 1883, 600) # your_server_ip为你服务器的ip,1883为默认的端口,600为keepalive的时间间隔 send_num = 1 for client_id in range(3): #迭代发送端设备的个数 test = Thread(target=func, args=("线程"+str(client_id + 1), "开始发送数据", client_id, client, send_num)) test.start() time.sleep(0.1) #每隔0.1秒开启一个线程if __name__ == '__main__': duoxiancheng()
下面代码模拟3个发送设备分别向3个接收设备一一对应发送5次消息
import paho.mqtt.client as mqttimport timefrom threading import Threaddef on_connect(client, userdata, flags, rc): print("Connected with result code: " + str(rc))def on_message(client, userdata, msg): print(msg.topic + " " + str(msg.payload))send_num_global = 0errorsend_num_global = 0def func(name, message, thread_id, client, send_num): global send_num_global global errorsend_num_global for iter_num in range(5): time.sleep(0.5) print(f"{name}开始") # print(f"{message}") id = '{"id' + str(thread_id+1) + '":' + str(send_num) + '}' # print(id) rt = client.publish('v1/devices/me/telemetry', payload=id, qos=0) if list(rt)[0] != 0: errorsend_num_global += 1 #统计一共错发的次数 send_num_global += 1 #统计一共发送的次数 send_num += 1 #统计单个设备发送的次数 # return {"错发次数":errorsend_num, "总发次数":send_num} print({ "错发次数": errorsend_num_global, "总发次数": send_num_global}) # 开启多线程def duoxiancheng(): for client_id in range(3): #迭代发送端设备的个数 username_list = ["DqleDovP7M61QtBStKX0", "KkvXOjyfiHk65qLuYCJj", "pm3jdmtK78enWR3tv7iU"] client = mqtt.Client() password = "" client.username_pw_set(username_list[client_id], password) client.on_connect = on_connect client.on_message = on_message client.connect('your_server_ip', 1883, 600) # your_server_ip为你服务器的ip,1883为默认的端口,600为keepalive的时间间隔 send_num = 1 test = Thread(target=func, args=("线程" + str(client_id + 1), "开始发送数据", client_id, client, send_num)) test.start() time.sleep(0.1)if __name__ == '__main__': duoxiancheng()
Done!
发表评论
最新留言
能坚持,总会有不一样的收获!
[***.219.124.196]2025年04月15日 23时02分48秒
关于作者

喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!
推荐文章
使用mybatis-generator生成底层
2019-03-05
Android APK 重签名
2019-03-05
Mybatis【3】-- Mybatis使用工具类读取配置文件以及从属性读取DB信息
2019-03-05
Mybatis【5】-- Mybatis多种增删改查那些你会了么?
2019-03-05
Mybatis【7】-- Mybatis如何知道增删改是否成功执行?
2019-03-05
Mybatis【9】-- Mybatis占位符#{}和拼接符${}有什么区别?
2019-03-05
计算输入的一句英文语句中单词数
2019-03-05
zabbix系列之十——添加短信告警
2019-03-05
docker复制文件到宿主机
2019-03-05
lvs+keepalive构建高可用集群
2019-03-05
Mysql高可用架构(主从同步)
2019-03-05
mysql主从延迟高的原因
2019-03-05
ATS缓存数据结构
2019-03-05
glob模块
2019-03-05
6 个 Linux 运维典型问题
2019-03-05
oracle无法启动asm实例记录
2019-03-05
取消vim打开文件全是黄色方法
2019-03-05
YAML基础教程
2019-03-05