// File: MQTTListener/mqtt/mqtt.go (191 lines, 4.5 KiB, Go)

package mqtt
import (
"crypto/tls"
"fmt"
"log"
"net/url"
"strconv"
"strings"
2020-08-26 09:54:26 +02:00
2020-09-02 09:02:30 +02:00
"mqttListener/env"
"mqttListener/models"
"regexp"
2020-08-26 09:54:26 +02:00
"time"
2020-08-31 07:01:40 +02:00
MQTT "github.com/eclipse/paho.mqtt.golang"
"encoding/json"
2020-08-26 09:54:26 +02:00
)
/*
Broker URI = <scheme>://<user>:<pass>@<server>:<port>
where <scheme> is "tcp", or "ssl" when TLS is enabled (see Setup).
*/
var client MQTT.Client
var opts *MQTT.ClientOptions
var topic string
var uri url.URL
2020-09-02 09:02:30 +02:00
var ev *env.Env
2020-08-26 09:54:26 +02:00
func Connect() {
client = MQTT.NewClient(opts)
token := client.Connect()
for !token.WaitTimeout(3 * time.Second) {
}
if err := token.Error(); err != nil {
log.Fatal(err)
return
}
fmt.Println("connected to broker, topic = ", topic)
}
func Listen() {
if !client.IsConnected() {
log.Fatal("Client is not connected")
return
}
client.Subscribe(topic, 0, subscriptionHandler)
/*
client.Subscribe(topic, 0, func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("---------------------------------------------------\n")
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: \n%s\n", msg.Payload())
//fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload()))
})
*/
2020-08-26 09:54:26 +02:00
}
2020-09-02 09:02:30 +02:00
func Setup(environment *env.Env) {
ev = environment
2020-08-26 09:54:26 +02:00
scheme := "tcp://"
2020-09-02 09:02:30 +02:00
if ev.Config.EnableTLS {
2020-08-26 09:54:26 +02:00
scheme = "ssl://"
}
urlString := scheme
2020-09-02 09:02:30 +02:00
urlString += ev.Config.BrokerUsername + ":" + ev.Config.BrokerPassword + "@"
urlString += ev.Config.BrokerAddress + ":"
urlString += strconv.FormatInt(ev.Config.BrokerPort, 10)
2020-08-26 09:54:26 +02:00
fmt.Println("broker urlString: ", urlString)
uri, err := url.Parse(urlString)
if err != nil {
log.Fatal(err)
}
fmt.Println("broker URI: ", *uri)
//fmt.Println("customer: ", *customer)
//fmt.Println("device: ", *device)
topic = "/ATB/#"
2020-09-02 09:02:30 +02:00
opts = createClientOptions(ev.Config.BrokerClientID, uri)
2020-08-26 09:54:26 +02:00
}
func createClientOptions(ClientID string, uri *url.URL) *MQTT.ClientOptions {
opts := MQTT.NewClientOptions()
opts.AddBroker(uri.Scheme + "://" + uri.Host)
opts.SetUsername(uri.User.Username())
password, _ := uri.User.Password()
opts.SetPassword(password)
opts.SetClientID(ClientID)
opts.SetDefaultPublishHandler(f)
2020-09-02 09:02:30 +02:00
if ev.Config.EnableTLS {
2020-08-26 09:54:26 +02:00
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
opts.SetTLSConfig(tlsConfig)
}
return opts
}
// Disconnect ends the broker connection, passing a quiesce time of 250 ms to
// the client. It does nothing when no client has been created yet, so it is
// safe to call before Connect.
func Disconnect() {
	// Quiesce time in milliseconds handed to the client's Disconnect.
	const quiesceMillis = 250

	if client == nil {
		return
	}
	client.Disconnect(quiesceMillis)
}
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
fmt.Printf("---------------------------------------------------\n")
fmt.Printf("Default Message Handler:\n")
2020-08-26 09:54:26 +02:00
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
var subscriptionHandler = func(client MQTT.Client, msg MQTT.Message) {
// DEBUG
fmt.Printf("---------------------------------------------------\n")
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: \n%s\n", msg.Payload())
//fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload()))
2020-08-31 07:01:40 +02:00
//-------------------------------------------------------------------
// create ID from topic
regexpWithType := "ATB\\/[A-Z]+\\/[\\-]*[0-9]+\\/[\\-]*[0-9]+\\/mo"
regexpDefault := "ATB\\/[0-9]+\\/[\\-]*[0-9]+\\/[\\-]*[0-9]+\\/[0-9]+\\/mo"
topicSlice := strings.Split(msg.Topic(), "/")
matchedWithType, err := regexp.MatchString(regexpWithType, msg.Topic())
if err != nil {
log.Printf("ERROR: matchedWithType: %s", err.Error())
}
matchedDefault, err := regexp.MatchString(regexpDefault, msg.Topic())
if err != nil {
log.Printf("ERROR: matchedDefault: %s", err.Error())
}
2020-09-13 18:37:22 +02:00
var customerID string
var deviceID string
if matchedWithType {
fmt.Printf("Topic matched regexpWithType\n")
2020-09-13 18:37:22 +02:00
customerID = topicSlice[3]
deviceID = topicSlice[4]
} else if matchedDefault {
fmt.Printf("Topic matched regexpDefault\n")
2020-09-13 18:37:22 +02:00
customerID = topicSlice[2]
deviceID = topicSlice[5]
} else {
log.Printf("ERROR: no matching topic: %s", msg.Topic())
return
}
fmt.Printf("Generated deviceID = %s\n", deviceID)
var device models.Device
2020-09-13 18:37:22 +02:00
device.TOPIC = msg.Topic()
device.CustomerID = customerID
device.DeviceID = deviceID
device.ProjectName = "projectName t.b.d."
device.MAC = "t.b.d."
device.SN = "t.b.d."
device.LastMsg = "t.b.d"
2020-08-31 07:01:40 +02:00
//-------------------------------------------------------------------
// extract data from payload
bytes := []byte(msg.Payload())
mo := models.Mo{}
err = json.Unmarshal(bytes, &mo)
if err != nil {
fmt.Println(err.Error())
return
}
device.MAC = mo.ID.PTU4_MAC
device.SN = mo.ID.PTU4_SN
fmt.Printf("Device MAC = %s\n", device.MAC)
fmt.Printf("Device SN = %s\n", device.SN)
2020-09-13 18:37:22 +02:00
// store this device in database
2020-09-02 09:02:30 +02:00
err = ev.DB.InsertDevice(&device)
if err != nil {
fmt.Println(err.Error())
return
}
}