Mqtt

来自Jack's Lab
2020年3月23日 (一) 11:43Comcat (讨论 | 贡献)的版本

跳转到: 导航, 搜索

目录

1 Overview

MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。

MQTT官网: http://mqtt.org

MQTT V3.1.1协议规范: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html


特点:

  • 开放消息协议,简单易实现
  • 发布订阅模式,一对多消息发布
  • 基于TCP/IP网络连接
  • 1字节固定报头,2字节心跳报文,报文结构紧凑
  • 消息QoS支持,可靠传输保证


应用:

MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。

  • 物联网M2M通信,物联网大数据采集
  • Android消息推送,WEB消息推送
  • 移动即时消息,例如Facebook Messenger
  • 智能硬件、智能家具、智能电器
  • 车联网通信,电动车站桩采集
  • 智慧城市、远程医疗、远程教育
  • 电力、石油与能源等行业市场


2 Quickstart

MQTT 协议基于主题 (Topic) 进行消息路由

主题 (Topic) 类似 URL 路径,例如:

dev/gateway
dev2srv/DEV1202
srv2dev/DEV1203
  • 发布者通过主题 (Topic) 发送消息
  • 所有订阅 (Subscribe) 该主题的订阅者,都会收到消息


主题 (Topic) 通过 '/' 分割层级,支持 '+', '#' 通配符:

  '+': 表示通配一个层级,例如 a/+,匹配 a/x, a/y
  '#': 表示通配多个层级,例如 a/#,匹配 a/x, a/b/c

订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息


3 Python

3.1 Install Python3


Install pip:

$ curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py
$ sudo python get-pip.py

3.2 Install mqtt package

$ python3 --version
Python 3.5.3rc1

$ pip3 install paho-mqtt
Collecting paho-mqtt
  Downloading paho-mqtt-1.2.tar.gz (49kB)
    100% |████████████████████████████████| 51kB 308kB/s 
Building wheels for collected packages: paho-mqtt
  Running setup.py bdist_wheel for paho-mqtt ... done
  Stored in directory: /home/comcat/.cache/pip/wheels/fa/db/fb/b495e37057e2f40534726b3c00ab26a58fc80fb8d17223df07
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.2


More infor: https://pypi.python.org/pypi/paho-mqtt


3.3 mqtt client

import json
import paho.mqtt.client as mqtt
import sys

# replace it to your id
###########################################
mqtt_uname = 'test'
mqtt_pass = 'test'
devid = '12020030001'
###########################################

def help_msg():
    print("Supported cmd:")
    print("\tmqtt.py message")

if len(sys.argv) >= 2:
    set_state = sys.argv[1]
else:
    help_msg()
    sys.exit(-2)

mqtt_tx_topic = 'dev/gw'

mqtt_rx_topic = 'app2dev/' + devid

mqtt_host = '192.168.1.97'

mqtt_port = 1883
mqtt_msg = set_state

def on_connect(client, u_dat, rc):
    if rc == 0:
        print("Connected successfully")
    else:
        print("Connection failed. rc = "+str(rc))

def on_publish(client, u_dat, rc):
    print("Message "+str(rc)+" published.")

def on_subscribe(client, u_dat, mid, qos):
    print("Subscribe with "+str(mid)+" received")

def on_message(client, udat, msg):
    print("Message received on topic "+msg.topic+" and payload "+str(msg.payload))

mqttc = mqtt.Client()

mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
mqttc.on_message = on_message

mqttc.username_pw_set(mqtt_uname, mqtt_pass)
mqttc.connect(mqtt_host, mqtt_port)

mqttc.publish(mqtt_tx_topic, mqtt_msg)



3.4 Go 语言

3.4.1 Prepare

安装 Go lang 编译环境:

$ sudo apt-get install golang-go
$ sudo apt-get install golang-eclipse-paho-dev
$ export GOPATH=/usr/share/gocode

Refer to:


3.4.2 Turn On/Off

package main

import (
	"fmt"
	"os"
	"strings"
	"net/http"
	"encoding/json"
	MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)

//define a function for the default message handler
var f MQTT.MessageHandler = func(client *MQTT.Client, msg MQTT.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}

var uid string = "YOUR_mqtt_uname"

func main() {
	if len(os.Args[1:]) >= 2 {

		devid := os.Args[1]
		cmd := os.Args[2]
		msg := ""

		if cmd == "on" {
			msg = "on"
		} else if cmd == "off" {
			msg = "off"
		} else if cmd == "ota" {
			msg = "ota"
		} 

		pass := get_token(uid)
		send_msg(devid, msg, pass)

	} else {
		help_msg()
	}
}

func help_msg() {
	fmt.Println("Supported cmd:")
	fmt.Println("\tmqtt dev_id on")
	fmt.Println("\tmqtt dev_id off")
}

// 往主题 app2dev/DEV_ID 处发送消息
func send_msg(devid string, msg string, pass string) {

		opts := MQTT.NewClientOptions().AddBroker("tcp://mqtt.xxx.net:1883")
		opts.SetClientID("cid_" + devid)
		opts.SetUsername(devid)
		opts.SetPassword(pass)
		opts.SetDefaultPublishHandler(f)

		c := MQTT.NewClient(opts)
		if token := c.Connect(); token.Wait() && token.Error() != nil {
			panic(token.Error())
		}

		mqtt_tx_topic := "app2dev/" + devid

		token := c.Publish(mqtt_tx_topic, 0, false, msg)
		token.Wait()
}

func get_token(user_id string) (string) {
	url := "http://api.xxx.net/user/token"

	type Body struct {
		UID string `json:"user_id"`
	}

	var body = Body {
		UID: user_id,
	}

	data, err := json.Marshal(body)
	//fmt.Println(string(data))

	if err == nil {
		resp, err := http.Post(url, "application/json", strings.NewReader(string(data)))
		if err != nil {
			fmt.Println(err)
		}
		defer resp.Body.Close()

		dec := json.NewDecoder(resp.Body)
		var v map[string]interface{}
		if err := dec.Decode(&v); err != nil {
			fmt.Printf("error: %v\n", err)
		} else {
			if val, ok := v["user_token"].(string); ok {
				return val
			}
		}
	}

	return ""
}


$ go build mqtt.go
$ ./mqtt 12020030001  on
$ ./mqtt 12020030001  off



4 Reference



个人工具
名字空间

变换
操作
导航
工具箱