112 lines
2.4 KiB
Go
112 lines
2.4 KiB
Go
package mqtt
|
|
|
|
import (
|
|
"crypto/tls"
|
|
//"crypto/x509"
|
|
"fmt"
|
|
"log"
|
|
"net/url"
|
|
"strconv"
|
|
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
//"os"
|
|
"mqttListener/Config"
|
|
"time"
|
|
)
|
|
|
|
/*
|
|
URI = mqtt://<user>:<pass>@<server>:<port>
|
|
*/
|
|
|
|
/* TODO: This options are just a suggestion
|
|
Options:
|
|
[-help] Display help
|
|
[-customer] Customer- or project number
|
|
[-device] Device number
|
|
[-uri <uri>] Broker URI
|
|
*/
|
|
|
|
var client MQTT.Client
|
|
var opts *MQTT.ClientOptions
|
|
var topic string
|
|
var uri url.URL
|
|
|
|
func Connect() {
|
|
client = MQTT.NewClient(opts)
|
|
token := client.Connect()
|
|
for !token.WaitTimeout(3 * time.Second) {
|
|
}
|
|
if err := token.Error(); err != nil {
|
|
log.Fatal(err)
|
|
return
|
|
}
|
|
fmt.Println("connected to broker, topic = ", topic)
|
|
}
|
|
|
|
func Listen() {
|
|
if !client.IsConnected() {
|
|
log.Fatal("Client is not connected")
|
|
return
|
|
}
|
|
client.Subscribe(topic, 0, func(client MQTT.Client, msg MQTT.Message) {
|
|
fmt.Printf("---------------------------------------------------\n")
|
|
fmt.Printf("TOPIC: %s\n", msg.Topic())
|
|
fmt.Printf("MSG: \n%s\n", msg.Payload())
|
|
//fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload()))
|
|
})
|
|
}
|
|
|
|
func Setup() {
|
|
|
|
scheme := "tcp://"
|
|
if Config.EnableTLS {
|
|
scheme = "ssl://"
|
|
}
|
|
|
|
urlString := scheme
|
|
urlString += Config.BrokerUsername + ":" + Config.BrokerPassword + "@"
|
|
urlString += Config.BrokerAddress + ":"
|
|
urlString += strconv.FormatInt(Config.BrokerPort, 10)
|
|
|
|
fmt.Println("broker urlString: ", urlString)
|
|
|
|
uri, err := url.Parse(urlString)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
fmt.Println("broker URI: ", *uri)
|
|
//fmt.Println("customer: ", *customer)
|
|
//fmt.Println("device: ", *device)
|
|
|
|
topic = "/ATB/#"
|
|
|
|
opts = createClientOptions(Config.BrokerClientId, uri)
|
|
|
|
}
|
|
|
|
func createClientOptions(ClientID string, uri *url.URL) *MQTT.ClientOptions {
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker(uri.Scheme + "://" + uri.Host)
|
|
opts.SetUsername(uri.User.Username())
|
|
password, _ := uri.User.Password()
|
|
opts.SetPassword(password)
|
|
opts.SetClientID(ClientID)
|
|
opts.SetDefaultPublishHandler(f)
|
|
if Config.EnableTLS {
|
|
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
|
|
opts.SetTLSConfig(tlsConfig)
|
|
}
|
|
|
|
return opts
|
|
}
|
|
|
|
func Disconnect() {
|
|
client.Disconnect(250)
|
|
}
|
|
|
|
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
|
|
fmt.Printf("TOPIC: %s\n", msg.Topic())
|
|
fmt.Printf("MSG: %s\n", msg.Payload())
|
|
}
|