Python多线程模拟多发送端使用mqtt向服务器thingsboard多设备接收端发送消息
发布日期:2021-05-04 18:12:40 浏览次数:22 分类:精选文章

本文共 3435 字,大约阅读时间需要 11 分钟。

下面代码是模拟3个发送设备同时向3个接收设备发送5次消息

import paho.mqtt.client as mqttimport timefrom threading import Threaddef on_connect(client, userdata, flags, rc):    print("Connected with result code: " + str(rc))def on_message(client, userdata, msg):    print(msg.topic + " " + str(msg.payload))send_num_global = 0errorsend_num_global = 0def func(name, message, thread_id, client, send_num):    global send_num_global    global errorsend_num_global    for iter_num in range(5):        time.sleep(1) #每隔2秒发送一次消息        print(f"{name}开始")        # print(f"{message}")        id = '{"id' + str(thread_id+1) + '":' + str(send_num) + '}'        # print(id)        rt = client.publish('v1/devices/me/telemetry', payload=id, qos=0)        print(list(rt))        if list(rt)[0] != 0:            errorsend_num_global += 1 #统计一共错发的次数        send_num_global += 1 #统计一共发送的次数        send_num += 1 #统计单个设备发送的次数        print({   "错发次数":errorsend_num_global, "总发次数":send_num_global})           # 开启多线程def duoxiancheng():    for username_id in range(3): #迭代服务器接受端设备的个数        username_list = ["DqleDovP7M61QtBStKX0", "KkvXOjyfiHk65qLuYCJj", "pm3jdmtK78enWR3tv7iU"] #这里是三个接收设备的令牌,要换成自己的        client = mqtt.Client()         password = "" #密码设为空字符串        client.username_pw_set(username_list[username_id], password)        client.on_connect = on_connect        client.on_message = on_message        client.connect('your_server_ip', 1883, 600)  # your_server_ip为你服务器的ip,1883为默认的端口,600为keepalive的时间间隔        send_num = 1        for client_id in range(3): #迭代发送端设备的个数            test = Thread(target=func, args=("线程"+str(client_id + 1), "开始发送数据", client_id, client, send_num))            test.start()            time.sleep(0.1) #每隔0.1秒开启一个线程if __name__ == '__main__':        duoxiancheng()

下面代码模拟3个发送设备分别向3个接收设备一一对应发送5次消息

import paho.mqtt.client as mqttimport timefrom threading import Threaddef on_connect(client, userdata, flags, rc):    print("Connected with result code: " + str(rc))def on_message(client, userdata, msg):    print(msg.topic + " " + str(msg.payload))send_num_global = 0errorsend_num_global = 0def func(name, message, thread_id, client, send_num):    global send_num_global    global errorsend_num_global    for iter_num in range(5):        time.sleep(0.5)        print(f"{name}开始")        # print(f"{message}")        id = '{"id' + str(thread_id+1) + '":' + str(send_num) + '}'        # print(id)        rt = client.publish('v1/devices/me/telemetry', payload=id, qos=0)        if list(rt)[0] != 0:            errorsend_num_global += 1 #统计一共错发的次数        send_num_global += 1 #统计一共发送的次数        send_num += 1 #统计单个设备发送的次数    # return {"错发次数":errorsend_num, "总发次数":send_num}    print({   "错发次数": errorsend_num_global, "总发次数": send_num_global})         # 开启多线程def duoxiancheng():    for client_id in range(3):  #迭代发送端设备的个数        username_list = ["DqleDovP7M61QtBStKX0", "KkvXOjyfiHk65qLuYCJj", "pm3jdmtK78enWR3tv7iU"]        client = mqtt.Client()         password = ""        client.username_pw_set(username_list[client_id], password)        client.on_connect = on_connect        client.on_message = on_message        client.connect('your_server_ip', 1883, 600)  # your_server_ip为你服务器的ip,1883为默认的端口,600为keepalive的时间间隔        send_num = 1        test = Thread(target=func, args=("线程" + str(client_id + 1), "开始发送数据",                client_id, client, send_num))        test.start()        time.sleep(0.1)if __name__ == '__main__':        duoxiancheng()

Done!

上一篇:用numpy实现隐马尔科夫模型的概率计算问题(前向算法)
下一篇:解决ubuntu18“无线网络激活失败”的问题,重复弹出输入密码界面

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2025年04月15日 23时02分48秒