Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

MQTT协议、REST API和并发优化策略.md 8.7 KiB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. # MQTT协议
  2. MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传递协议,采用发布-订阅模式,旨在实现在物联网和M2M通信中的可靠、高效的消息传递。
  3. ## MQTT协议的工作原理
  4. ### MQTT协议中的三种角色
  5. 客户端角色:MQTT协议涉及两种类型的客户端:发布者(Publisher)和订阅者(Subscriber)。发布者负责发布消息到特定的主题(Topic),而订阅者则订阅感兴趣的主题。
  6. 代理(Broker):MQTT通信需要通过一个称为代理(Broker)的中间件来进行。代理是负责接收、路由和分发消息的服务器。它扮演着消息传递的中心角色,管理发布者和订阅者之间的通信。
  7. 主题(Topic):主题是MQTT中消息的分类标识。发布者将消息发布到特定的主题上,而订阅者可以订阅感兴趣的主题以接收相关的消息。主题可以使用层级结构,例如"sensor/temperature",其中"sensor"是高层级,"temperature"是低层级。订阅者可以使用通配符来订阅多个主题。
  8. ### MQTT协议中的QoS级别
  9. QoS级别:MQTT协议支持三个不同的服务质量(QoS)级别,用于控制消息传递的可靠性和保证。这些级别是:
  10. QoS 0:最多一次传递。消息发布者将消息发送给代理,但不会进行确认或保证消息的传递,也不会进行重试。这是最低的QoS级别,适用于不需要可靠传递的场景。
  11. QoS 1:至少一次传递。消息发布者将消息发送给代理,代理确保至少将消息传递给订阅者一次。如果没有收到确认,代理将重试直到消息传递成功。
  12. QoS 2:恰好一次传递。消息发布者将消息发送给代理,代理确保将消息只传递一次给每个订阅者。这是最高的QoS级别,提供了最强的消息传递保证。
  13. 连接和会话:客户端与代理之间通过TCP/IP建立连接。在连接建立后,客户端可以选择保持持久连接,即会话(Session)。在会话期间,客户端可以发布和订阅消息,并保持订阅状态。持久连接允许客户端在断开连接后重新连接时恢复之前的订阅状态。
  14. ### MQTT协议的消息传递流程
  15. 发布者将消息发布到特定的主题。
  16. 代理接收到发布者的消息,并根据订阅者的订阅信息,将消息路由到匹配的订阅者。
  17. 订阅者接收到匹配的消息,并可以进行相应的处理。
  18. 保留消息:MQTT协议支持保留消息的功能。发布者可以选择将消息标记为保留消息,这意味着代理将存储最新的保留消息,并在订阅者订阅相关主题时立即发送给它们。这使得订阅者可以获取到最新的状态信息,即使它们在消息发布之前已经订阅了主题。
  19. ### MQTT协议小例子
  20. 该示例为:一个发布者发布温度数据,而订阅者接收并打印该数据的过程。
  21. 假设我们有一个温度传感器设备,它每隔一段时间会采集当前的温度值,并通过MQTT协议发布到主题 `sensor/temperature` 上。订阅者可以订阅该主题,以便接收到温度数据。
  22. ```python
  23. import paho.mqtt.client as mqtt
  24. # MQTT回调函数,接收到消息时被调用
  25. def on_message(client, userdata, msg):
  26. print(f"收到消息:{msg.topic} {msg.payload.decode()}")
  27. # 创建MQTT客户端
  28. client = mqtt.Client()
  29. # 设置回调函数
  30. client.on_message = on_message
  31. # 连接到MQTT代理
  32. broker_address = "mqtt.example.com" # 这里要根据实际情况,替换为实际的MQTT代理地址
  33. client.connect(broker_address)
  34. # 订阅主题
  35. topic = "sensor/temperature"
  36. client.subscribe(topic)
  37. # 循环监听MQTT消息
  38. client.loop_start()
  39. # 程序持续运行,等待消息到达
  40. try:
  41. while True:
  42. pass
  43. except KeyboardInterrupt:
  44. pass
  45. # 断开MQTT连接
  46. client.loop_stop()
  47. client.disconnect()
  48. ```
  49. 上面代码创建了一个MQTT客户端,并设置了回调函数 `on_message`,用于接收到消息时的处理。然后,我们连接到MQTT代理,并订阅了主题 `sensor/temperature`。最后,通过循环监听MQTT消息的方式,程序会一直运行并等待消息到达。
  50. 下面代码是创建一个发布者,用于模拟温度传感器设备。它会定期采集温度数据,并通过MQTT协议发布到主题 `sensor/temperature` 上。
  51. ```python
  52. import paho.mqtt.client as mqtt
  53. import time
  54. import random
  55. # 创建MQTT客户端
  56. client = mqtt.Client()
  57. # 连接到MQTT代理
  58. broker_address = "mqtt.example.com" # 替换为实际的MQTT代理地址
  59. client.connect(broker_address)
  60. # 模拟温度数据发布
  61. try:
  62. while True:
  63. temperature = random.uniform(20, 30) # 随机生成温度值
  64. payload = str(temperature)
  65. client.publish("sensor/temperature", payload)
  66. print(f"发布温度数据:{payload}")
  67. time.sleep(5) # 每隔5秒发布一次
  68. except KeyboardInterrupt:
  69. pass
  70. # 断开MQTT连接
  71. client.disconnect()
  72. ```
  73. 我们创建了另一个MQTT客户端,并连接到MQTT代理。然后,我们使用一个while循环,模拟温度数据的采集和发布过程。每隔5秒,它会生成一个随机的温度值,并将其作为消息发布到主题 `sensor/temperature` 上。
  74. 这样,当订阅者程序运行时,它将收到发布者发送的温度数据,并将其打印出来。
  75. 请注意,示例中的MQTT代理地址需要替换为实际使用的MQTT代理地址。
  76. ### 自我感悟
  77. 这种模式与计算机网络中的SDN很像,有一个中间层(远程控制器)维护路由转发和链路信息,Qos级别类似于TCP和UDP,分别对应可靠传输和不可靠传输。
  78. # REST API
  79. REST API(Representational State Transfer Application Programming Interface)是一种用于构建网络应用程序的架构风格和通信协议。它是一种基于现有网络协议的设计原则,主要用于在客户端和服务器之间进行数据传输和交互。
  80. REST API的设计基于以下几个关键概念:
  81. 1. 资源(Resources):在REST中,所有的数据都被视为资源。每个资源都有一个唯一的标识符(URI),通过该标识符可以对资源进行访问和操作。
  82. 1. 统一接口(Uniform Interface):REST API使用统一的接口进行通信。这意味着客户端和服务器之间的交互遵循一组统一的原则和规范,如使用HTTP方法(GET、POST、PUT、DELETE等)对资源进行操作。
  83. 1. 无状态(Stateless):REST API是无状态的,即服务器不会在请求之间保留客户端的状态信息。每个请求都包含足够的信息,使服务器能够理解和处理请求,而不需要依赖先前的请求。
  84. 1. 消息驱动(Message-Driven):REST API通过传递消息进行通信。客户端发送请求消息给服务器,并从服务器接收响应消息。这些消息通常使用标准的HTTP协议进行传输。
  85. 通过使用REST API,开发人员可以创建可扩展、可维护和可互操作的网络应用程序。REST API常用于构建Web服务、移动应用程序后端和云服务等各种类型的应用程序。它的设计原则简单明了,易于理解和实现,因此被广泛采用。
  86. ### 自我感悟
  87. 看了REST API的描述,感觉和之前使用的天气付费接口、百度文本翻译付费接口、图片文字识别付费接口、Chatgpt付费接口等等接口没有区别,都是将HTTP请求接口提供给第三方使用,并提供接口的消息传递方式。
  88. # 多设备时的并发处理优化策略
  89. 对于30个相同设备的状态反馈,可以通过以下几个点来优化延迟控制在500ms以下:
  90. 1. 为每个设备创建一个独立的MQTT连接,避免同一个连接的阻塞影响其他设备。
  91. ```python
  92. # 创建设备连接
  93. device_clients = []
  94. for i in range(30):
  95. client = mqtt.Client()
  96. client.connect(host, port)
  97. device_clients.append(client)
  98. ```
  99. 2. 设备状态主题使用独立的Queue队列收集,避免设备主题处理互相阻塞。
  100. ```python
  101. from queue import Queue
  102. # 状态队列
  103. device_queues = []
  104. for i in range(30):
  105. q = Queue()
  106. device_queues.append(q)
  107. # 订阅设备状态主题到独立队列
  108. for i, client in enumerate(device_clients):
  109. client.subscribe(device_topic[i], q.put)
  110. ```
  111. 3. 使用多线程并发处理设备队列数据,加快主题处理速度。
  112. ```python
  113. from threading import Thread
  114. def handle_queue(i):
  115. while True:
  116. data = device_queues[i].get()
  117. # 处理数据并发送到API
  118. # 启动队列处理线程
  119. threads = []
  120. for i in range(30):
  121. t = Thread(target=handle_queue, args=(i,))
  122. t.start()
  123. threads.append(t)
  124. ```
  125. 4. 对API使用异步框架,如Sanic,FastAPI,提高并发能力。
  126. 5. 对数据库使用连接池,提高写入效率。
  127. 通过以上优化,可以提高系统的并发处理能力,将30个设备的状态反馈延迟控制在500ms以内。关键是避免不同设备间的相互阻塞。