Mqtt

来自Jack's Lab
(版本间的差异)
跳转到: 导航, 搜索
(Quickstart)
(Reference)
(未显示1个用户的13个中间版本)
第57行: 第57行:
 
<br>
 
<br>
  
== CMD ==
+
== Python ==
 +
 
 +
=== Install Python3 ===
 +
 
 +
* Install python3.8 from https://www.python.org/downloads/
 +
** Windows: https://www.python.org/ftp/python/3.8.1/python-3.8.1-amd64.exe
 +
** Mac OS X: https://www.python.org/ftp/python/3.8.1/python-3.8.1-macosx10.9.pkg
 +
 
 +
 
 +
Install pip:
  
 
<source lang=bash>
 
<source lang=bash>
#!/bin/bash
+
$ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
 +
$ sudo python get-pip.py
 +
</source>
  
SPAM_DELAY=1;
+
=== Install mqtt package ===
  
while :
+
<source lang=bash>
do
+
$ python3 --version
     mosquitto_pub -h hostname -t /test/topic -m 'test playload'
+
Python 3.5.3rc1
    sleep $SPAM_DELAY;
+
 
done
+
$ pip3 install paho-mqtt
 +
Collecting paho-mqtt
 +
  Downloading paho-mqtt-1.2.tar.gz (49kB)
 +
     100% |████████████████████████████████| 51kB 308kB/s
 +
Building wheels for collected packages: paho-mqtt
 +
  Running setup.py bdist_wheel for paho-mqtt ... done
 +
  Stored in directory: /home/comcat/.cache/pip/wheels/fa/db/fb/b495e37057e2f40534726b3c00ab26a58fc80fb8d17223df07
 +
Successfully built paho-mqtt
 +
Installing collected packages: paho-mqtt
 +
Successfully installed paho-mqtt-1.2
 
</source>
 
</source>
  
<br><br><br><br>
+
 
 +
More infor: https://pypi.python.org/pypi/paho-mqtt
 +
 
 +
<br>
 +
 
 +
=== mqtt client ===
 +
 
 +
<source lang=python>
 +
import json
 +
import paho.mqtt.client as mqtt
 +
import sys
 +
 
 +
# replace it to your id
 +
###########################################
 +
mqtt_uname = 'test'
 +
mqtt_pass = 'test'
 +
devid = '12020030001'
 +
###########################################
 +
 
 +
def help_msg():
 +
    print("Supported cmd:")
 +
    print("\tmqtt.py message")
 +
 
 +
if len(sys.argv) >= 2:
 +
    set_state = sys.argv[1]
 +
else:
 +
    help_msg()
 +
    sys.exit(-2)
 +
 
 +
mqtt_tx_topic = 'dev/gw'
 +
 
 +
mqtt_rx_topic = 'app2dev/' + devid
 +
 
 +
mqtt_host = '192.168.1.97'
 +
 
 +
mqtt_port = 1883
 +
mqtt_msg = set_state
 +
 
 +
def on_connect(client, u_dat, rc):
 +
    if rc == 0:
 +
        print("Connected successfully")
 +
    else:
 +
        print("Connection failed. rc = "+str(rc))
 +
 
 +
def on_publish(client, u_dat, rc):
 +
    print("Message "+str(rc)+" published.")
 +
 
 +
def on_subscribe(client, u_dat, mid, qos):
 +
    print("Subscribe with "+str(mid)+" received")
 +
 
 +
def on_message(client, udat, msg):
 +
    print("Message received on topic "+msg.topic+" and payload "+str(msg.payload))
 +
 
 +
mqttc = mqtt.Client()
 +
 
 +
mqttc.on_connect = on_connect
 +
mqttc.on_publish = on_publish
 +
mqttc.on_subscribe = on_subscribe
 +
mqttc.on_message = on_message
 +
 
 +
mqttc.username_pw_set(mqtt_uname, mqtt_pass)
 +
mqttc.connect(mqtt_host, mqtt_port)
 +
 
 +
mqttc.publish(mqtt_tx_topic, mqtt_msg)
 +
</source>
 +
 
 +
<br><br>
 +
 
 +
== Go 语言 ==
 +
 
 +
起一个 MQTT 客户端监控 dev/gw  或者 dev/gws(加密)主题消息,负责将这些消息解析后写入数据库
 +
 
 +
用 Go 语言实现这个模块功能,效率高,性能好
 +
 
 +
=== Prepare ===
 +
 
 +
Windows 下安装 Go 语言环境:
 +
 
 +
Go 语言官网(https://golang.google.cn/dl/)下载安装包 go1.14.1.windows-amd64.msi
 +
 
 +
详细可参考:http://c.biancheng.net/view/3992.html
 +
 
 +
 
 +
Linux 安装 Go lang 编译环境:
 +
 
 +
<source lang=bash>
 +
$ sudo apt-get install golang-go
 +
$ sudo apt-get install golang-eclipse-paho-dev
 +
$ export GOPATH=/usr/share/gocode
 +
</source>
 +
 
 +
Refer to:
 +
 
 +
* https://golang.org
 +
* http://www.eclipse.org/paho/clients/golang/
 +
 
 +
<br>
 +
 
 +
=== 消息发送===
 +
 
 +
<source lang=cpp>
 +
package main
 +
 
 +
import (
 +
"fmt"
 +
"os"
 +
"strings"
 +
"net/http"
 +
"encoding/json"
 +
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
 +
)
 +
 
 +
//define a function for the default message handler
 +
var f MQTT.MessageHandler = func(client *MQTT.Client, msg MQTT.Message) {
 +
fmt.Printf("TOPIC: %s\n", msg.Topic())
 +
fmt.Printf("MSG: %s\n", msg.Payload())
 +
}
 +
 
 +
var uid string = "YOUR_mqtt_uname"
 +
var pass string = "YOUR_mqtt_pass"
 +
 
 +
func main() {
 +
if len(os.Args[1:]) >= 2 {
 +
 
 +
devid := os.Args[1]
 +
cmd := os.Args[2]
 +
msg := ""
 +
 
 +
if cmd == "on" {
 +
msg = "on"
 +
} else if cmd == "off" {
 +
msg = "off"
 +
} else if cmd == "ota" {
 +
msg = "ota"
 +
}
 +
 
 +
//pass := get_token(uid)
 +
send_msg(devid, msg, pass)
 +
 
 +
} else {
 +
help_msg()
 +
}
 +
}
 +
 
 +
func help_msg() {
 +
fmt.Println("Supported cmd:")
 +
fmt.Println("\tmqtt dev_id on")
 +
fmt.Println("\tmqtt dev_id off")
 +
}
 +
 
 +
// 往主题 app2dev/DEV_ID 处发送消息
 +
func send_msg(devid string, msg string, pass string) {
 +
 
 +
opts := MQTT.NewClientOptions().AddBroker("tcp://mqtt.xxx.net:1883")
 +
opts.SetClientID("cid_" + devid)
 +
opts.SetUsername(devid)
 +
opts.SetPassword(pass)
 +
opts.SetDefaultPublishHandler(f)
 +
 
 +
c := MQTT.NewClient(opts)
 +
if token := c.Connect(); token.Wait() && token.Error() != nil {
 +
panic(token.Error())
 +
}
 +
 
 +
mqtt_tx_topic := "app2dev/" + devid
 +
 
 +
token := c.Publish(mqtt_tx_topic, 0, false, msg)
 +
token.Wait()
 +
}
 +
 
 +
/*
 +
func get_token(user_id string) (string) {
 +
url := "http://api.xxx.net/user/token"
 +
 
 +
type Body struct {
 +
UID string `json:"user_id"`
 +
}
 +
 
 +
var body = Body {
 +
UID: user_id,
 +
}
 +
 
 +
data, err := json.Marshal(body)
 +
//fmt.Println(string(data))
 +
 
 +
if err == nil {
 +
resp, err := http.Post(url, "application/json", strings.NewReader(string(data)))
 +
if err != nil {
 +
fmt.Println(err)
 +
}
 +
defer resp.Body.Close()
 +
 
 +
dec := json.NewDecoder(resp.Body)
 +
var v map[string]interface{}
 +
if err := dec.Decode(&v); err != nil {
 +
fmt.Printf("error: %v\n", err)
 +
} else {
 +
if val, ok := v["user_token"].(string); ok {
 +
return val
 +
}
 +
}
 +
}
 +
 
 +
return ""
 +
}
 +
*/
 +
</source>
 +
 
 +
 
 +
<source lang=bash>
 +
$ go build mqtt.go
 +
$ ./mqtt 12020030001  on
 +
$ ./mqtt 12020030001  off
 +
</source>
 +
 
 +
<br><br>
 +
 
 +
== PHP ==
 +
 
 +
* https://github.com/mgdm/Mosquitto-PHP
 +
* https://github.com/php-mqtt/client
 +
* https://segmentfault.com/a/1190000014031341
 +
 
 +
<br>
 +
 
 +
== Reference ==
 +
 
 +
* https://docs.emqx.io/broker/v3/cn/protocol.html
 +
 
 +
<br><br>

2020年4月20日 (一) 08:40的版本

目录

1 Overview

MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。

MQTT官网: http://mqtt.org

MQTT V3.1.1协议规范: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html


特点:

  • 开放消息协议,简单易实现
  • 发布订阅模式,一对多消息发布
  • 基于TCP/IP网络连接
  • 1字节固定报头,2字节心跳报文,报文结构紧凑
  • 消息QoS支持,可靠传输保证


应用:

MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。

  • 物联网M2M通信,物联网大数据采集
  • Android消息推送,WEB消息推送
  • 移动即时消息,例如Facebook Messenger
  • 智能硬件、智能家具、智能电器
  • 车联网通信,电动车站桩采集
  • 智慧城市、远程医疗、远程教育
  • 电力、石油与能源等行业市场


2 Quickstart

MQTT 协议基于主题 (Topic) 进行消息路由

主题 (Topic) 类似 URL 路径,例如:

dev/gateway
dev2srv/DEV1202
srv2dev/DEV1203
  • 发布者通过主题 (Topic) 发送消息
  • 所有订阅 (Subscribe) 该主题的订阅者,都会收到消息


主题 (Topic) 通过 '/' 分割层级,支持 '+', '#' 通配符:

  '+': 表示通配一个层级,例如 a/+,匹配 a/x, a/y
  '#': 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c

订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息


3 Python

3.1 Install Python3


Install pip:

$ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
$ sudo python get-pip.py

3.2 Install mqtt package

$ python3 --version
Python 3.5.3rc1

$ pip3 install paho-mqtt
Collecting paho-mqtt
  Downloading paho-mqtt-1.2.tar.gz (49kB)
    100% |████████████████████████████████| 51kB 308kB/s 
Building wheels for collected packages: paho-mqtt
  Running setup.py bdist_wheel for paho-mqtt ... done
  Stored in directory: /home/comcat/.cache/pip/wheels/fa/db/fb/b495e37057e2f40534726b3c00ab26a58fc80fb8d17223df07
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.2


More infor: https://pypi.python.org/pypi/paho-mqtt


3.3 mqtt client

import json
import paho.mqtt.client as mqtt
import sys

# replace it to your id
###########################################
mqtt_uname = 'test'
mqtt_pass = 'test'
devid = '12020030001'
###########################################

def help_msg():
    print("Supported cmd:")
    print("\tmqtt.py message")

if len(sys.argv) >= 2:
    set_state = sys.argv[1]
else:
    help_msg()
    sys.exit(-2)

mqtt_tx_topic = 'dev/gw'

mqtt_rx_topic = 'app2dev/' + devid

mqtt_host = '192.168.1.97'

mqtt_port = 1883
mqtt_msg = set_state

def on_connect(client, u_dat, rc):
    if rc == 0:
        print("Connected successfully")
    else:
        print("Connection failed. rc = "+str(rc))

def on_publish(client, u_dat, rc):
    print("Message "+str(rc)+" published.")

def on_subscribe(client, u_dat, mid, qos):
    print("Subscribe with "+str(mid)+" received")

def on_message(client, udat, msg):
    print("Message received on topic "+msg.topic+" and payload "+str(msg.payload))

mqttc = mqtt.Client()

mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
mqttc.on_message = on_message

mqttc.username_pw_set(mqtt_uname, mqtt_pass)
mqttc.connect(mqtt_host, mqtt_port)

mqttc.publish(mqtt_tx_topic, mqtt_msg)



4 Go 语言

起一个 MQTT 客户端监控 dev/gw 或者 dev/gws(加密)主题消息,负责将这些消息解析后写入数据库

用 Go 语言实现这个模块功能,效率高,性能好

4.1 Prepare

Windows 下安装 Go 语言环境:

Go 语言官网(https://golang.google.cn/dl/)下载安装包 go1.14.1.windows-amd64.msi

详细可参考:http://c.biancheng.net/view/3992.html


Linux 安装 Go lang 编译环境:

$ sudo apt-get install golang-go
$ sudo apt-get install golang-eclipse-paho-dev
$ export GOPATH=/usr/share/gocode

Refer to:


4.2 消息发送

package main

import (
	"fmt"
	"os"
	"strings"
	"net/http"
	"encoding/json"
	MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)

//define a function for the default message handler
var f MQTT.MessageHandler = func(client *MQTT.Client, msg MQTT.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}

var uid string = "YOUR_mqtt_uname"
var pass string = "YOUR_mqtt_pass"

func main() {
	if len(os.Args[1:]) >= 2 {

		devid := os.Args[1]
		cmd := os.Args[2]
		msg := ""

		if cmd == "on" {
			msg = "on"
		} else if cmd == "off" {
			msg = "off"
		} else if cmd == "ota" {
			msg = "ota"
		} 

		//pass := get_token(uid)
		send_msg(devid, msg, pass)

	} else {
		help_msg()
	}
}

func help_msg() {
	fmt.Println("Supported cmd:")
	fmt.Println("\tmqtt dev_id on")
	fmt.Println("\tmqtt dev_id off")
}

// 往主题 app2dev/DEV_ID 处发送消息
func send_msg(devid string, msg string, pass string) {

		opts := MQTT.NewClientOptions().AddBroker("tcp://mqtt.xxx.net:1883")
		opts.SetClientID("cid_" + devid)
		opts.SetUsername(devid)
		opts.SetPassword(pass)
		opts.SetDefaultPublishHandler(f)

		c := MQTT.NewClient(opts)
		if token := c.Connect(); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}

		mqtt_tx_topic := "app2dev/" + devid

		token := c.Publish(mqtt_tx_topic, 0, false, msg)
		token.Wait()
}

/*
func get_token(user_id string) (string) {
	url := "http://api.xxx.net/user/token"

	type Body struct {
		UID string `json:"user_id"`
	}

	var body = Body {
		UID: user_id,
	}

	data, err := json.Marshal(body)
	//fmt.Println(string(data))

	if err == nil {
		resp, err := http.Post(url, "application/json", strings.NewReader(string(data)))
		if err != nil {
			fmt.Println(err)
		}
		defer resp.Body.Close()

		dec := json.NewDecoder(resp.Body)
		var v map[string]interface{}
		if err := dec.Decode(&v); err != nil {
			fmt.Printf("error: %v\n", err)
		} else {
			if val, ok := v["user_token"].(string); ok {
				return val
			}
		}
	}

	return ""
}
*/


$ go build mqtt.go
$ ./mqtt 12020030001  on
$ ./mqtt 12020030001  off



5 PHP


6 Reference



个人工具
名字空间

变换
操作
导航
工具箱