189 lines
4.5 KiB
Go
189 lines
4.5 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"fmt"
|
|
"log"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"mqttListener/env"
|
|
"mqttListener/models"
|
|
"regexp"
|
|
"time"
|
|
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
"encoding/json"
|
|
)
|
|
|
|
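/*
The broker URI assembled by Setup has the form

	<scheme>://<user>:<pass>@<server>:<port>

where <scheme> is "ssl" when TLS is enabled and "tcp" otherwise.
*/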
|
|
|
|
var client MQTT.Client
|
|
var opts *MQTT.ClientOptions
|
|
var topic string
|
|
var uri url.URL
|
|
var ev *env.Env
|
|
|
|
func Connect() {
|
|
client = MQTT.NewClient(opts)
|
|
token := client.Connect()
|
|
for !token.WaitTimeout(3 * time.Second) {
|
|
}
|
|
if err := token.Error(); err != nil {
|
|
log.Fatal(err)
|
|
return
|
|
}
|
|
fmt.Println("connected to broker, topic = ", topic)
|
|
}
|
|
|
|
func Listen(topic string) {
|
|
if !client.IsConnected() {
|
|
log.Fatal("Client is not connected")
|
|
return
|
|
}
|
|
|
|
client.Subscribe(topic, 0, subscriptionHandler)
|
|
/*
|
|
client.Subscribe(topic, 0, func(client MQTT.Client, msg MQTT.Message) {
|
|
fmt.Printf("---------------------------------------------------\n")
|
|
fmt.Printf("TOPIC: %s\n", msg.Topic())
|
|
fmt.Printf("MSG: \n%s\n", msg.Payload())
|
|
//fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload()))
|
|
})
|
|
*/
|
|
}
|
|
|
|
func Setup(environment *env.Env) {
|
|
|
|
ev = environment
|
|
|
|
scheme := "tcp://"
|
|
if ev.Config.EnableTLS {
|
|
scheme = "ssl://"
|
|
}
|
|
|
|
urlString := scheme
|
|
urlString += ev.Config.BrokerUsername + ":" + ev.Config.BrokerPassword + "@"
|
|
urlString += ev.Config.BrokerAddress + ":"
|
|
urlString += strconv.FormatInt(ev.Config.BrokerPort, 10)
|
|
|
|
fmt.Println("broker urlString: ", urlString)
|
|
|
|
uri, err := url.Parse(urlString)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
fmt.Println("broker URI: ", *uri)
|
|
//fmt.Println("customer: ", *customer)
|
|
//fmt.Println("device: ", *device)
|
|
|
|
opts = createClientOptions(ev.Config.BrokerClientID, uri)
|
|
|
|
}
|
|
|
|
func createClientOptions(ClientID string, uri *url.URL) *MQTT.ClientOptions {
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker(uri.Scheme + "://" + uri.Host)
|
|
opts.SetUsername(uri.User.Username())
|
|
password, _ := uri.User.Password()
|
|
opts.SetPassword(password)
|
|
opts.SetClientID(ClientID)
|
|
opts.SetDefaultPublishHandler(f)
|
|
if ev.Config.EnableTLS {
|
|
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
|
|
opts.SetTLSConfig(tlsConfig)
|
|
}
|
|
|
|
return opts
|
|
}
|
|
|
|
func Disconnect() {
|
|
client.Disconnect(250)
|
|
}
|
|
|
|
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
|
|
fmt.Printf("---------------------------------------------------\n")
|
|
fmt.Printf("Default Message Handler:\n")
|
|
fmt.Printf("TOPIC: %s\n", msg.Topic())
|
|
fmt.Printf("MSG: %s\n", msg.Payload())
|
|
}
|
|
|
|
var subscriptionHandler = func(client MQTT.Client, msg MQTT.Message) {
|
|
// DEBUG
|
|
fmt.Printf("---------------------------------------------------\n")
|
|
fmt.Printf("TOPIC: %s\n", msg.Topic())
|
|
fmt.Printf("MSG: \n%s\n", msg.Payload())
|
|
//fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload()))
|
|
|
|
//-------------------------------------------------------------------
|
|
// create ID from topic
|
|
regexpWithType := "ATB\\/[A-Z]+\\/[\\-]*[0-9]+\\/[\\-]*[0-9]+\\/mo"
|
|
regexpDefault := "ATB\\/[0-9]+\\/[\\-]*[0-9]+\\/[\\-]*[0-9]+\\/[0-9]+\\/mo"
|
|
|
|
topicSlice := strings.Split(msg.Topic(), "/")
|
|
|
|
matchedWithType, err := regexp.MatchString(regexpWithType, msg.Topic())
|
|
if err != nil {
|
|
log.Printf("ERROR: matchedWithType: %s", err.Error())
|
|
}
|
|
matchedDefault, err := regexp.MatchString(regexpDefault, msg.Topic())
|
|
if err != nil {
|
|
log.Printf("ERROR: matchedDefault: %s", err.Error())
|
|
}
|
|
|
|
var customerID string
|
|
var deviceID string
|
|
if matchedWithType {
|
|
fmt.Printf("Topic matched regexpWithType\n")
|
|
customerID = topicSlice[3]
|
|
deviceID = topicSlice[4]
|
|
} else if matchedDefault {
|
|
fmt.Printf("Topic matched regexpDefault\n")
|
|
customerID = topicSlice[2]
|
|
deviceID = topicSlice[5]
|
|
} else {
|
|
log.Printf("ERROR: no matching topic: %s", msg.Topic())
|
|
return
|
|
}
|
|
|
|
fmt.Printf("Generated deviceID = %s\n", deviceID)
|
|
|
|
var device models.Device
|
|
device.TOPIC = msg.Topic()
|
|
device.CustomerID = customerID
|
|
device.DeviceID = deviceID
|
|
device.ProjectName = "projectName t.b.d."
|
|
|
|
device.MAC = "t.b.d."
|
|
device.SN = "t.b.d."
|
|
device.LastMsg = "t.b.d"
|
|
|
|
//-------------------------------------------------------------------
|
|
// extract data from payload
|
|
bytes := []byte(msg.Payload())
|
|
mo := models.Mo{}
|
|
err = json.Unmarshal(bytes, &mo)
|
|
if err != nil {
|
|
fmt.Println(err.Error())
|
|
return
|
|
}
|
|
|
|
device.MAC = mo.ID.PTU4_MAC
|
|
device.SN = mo.ID.PTU4_SN
|
|
|
|
fmt.Printf("Device MAC = %s\n", device.MAC)
|
|
fmt.Printf("Device SN = %s\n", device.SN)
|
|
|
|
// store this device in database
|
|
err = ev.DB.InsertDevice(&device)
|
|
if err != nil {
|
|
fmt.Println(err.Error())
|
|
return
|
|
}
|
|
}
|