From cf4274c1e16ce62cda32ebb7aad937eaaea0e1b6 Mon Sep 17 00:00:00 2001 From: Siegfried Siegert Date: Wed, 26 Aug 2020 09:54:26 +0200 Subject: [PATCH] Initial commit --- Config/Config.go | 32 ++++++++++++++ main.go | 34 +++++++++++++++ mqtt/mqtt.go | 111 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+) create mode 100644 Config/Config.go create mode 100644 main.go create mode 100644 mqtt/mqtt.go diff --git a/Config/Config.go b/Config/Config.go new file mode 100644 index 0000000..58456e7 --- /dev/null +++ b/Config/Config.go @@ -0,0 +1,32 @@ +package Config + +import ( + //"fmt" + "log" + + "github.com/pelletier/go-toml" +) + +var BrokerAddress = "" +var BrokerPort int64 = 8883 +var BrokerClientId = "go_client" +var BrokerUsername = "" +var BrokerPassword = "" +var EnableTLS = true + +func ReadConfig() { + config, err := toml.LoadFile("config.toml") + + if err != nil { + //fmt.Println("Error ", err.Error()) + log.Fatal(err.Error()) + } else { + // retrieve data directly + BrokerAddress = config.Get("BROKER.Address").(string) + BrokerPort = config.Get("BROKER.Port").(int64) + BrokerClientId = config.Get("BROKER.ClientId").(string) + BrokerUsername = config.Get("BROKER.Username").(string) + BrokerPassword = config.Get("BROKER.Password").(string) + EnableTLS = config.Get("BROKER.EnableTLS").(bool) + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..12c530e --- /dev/null +++ b/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "mqttListener/Config" + "mqttListener/mqtt" + + //"mqttListener/cmd" + + //"fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + Config.ReadConfig() + + // DEBUG Config + // fmt.Println("BrokerAddress = ", Config.BrokerAddress) + + mqtt.Setup() + mqtt.Connect() + go mqtt.Listen() + + //cmd.Exec() + + <-c // wait for SIGTERM + + mqtt.Disconnect() +} diff --git a/mqtt/mqtt.go b/mqtt/mqtt.go new file mode 100644 index 0000000..ef3dbef --- /dev/null +++ b/mqtt/mqtt.go @@ -0,0 +1,111 @@ +package mqtt + +import ( + "crypto/tls" + //"crypto/x509" + "fmt" + "log" + "net/url" + "strconv" + + MQTT "github.com/eclipse/paho.mqtt.golang" + //"os" + "mqttListener/Config" + "time" +) + +/* + URI = mqtt://:@: +*/ + +/* TODO: This options are just a suggestion +Options: + [-help] Display help + [-customer] Customer- or project number + [-device] Device number + [-uri ] Broker URI +*/ + +var client MQTT.Client +var opts *MQTT.ClientOptions +var topic string +var uri url.URL + +func Connect() { + client = MQTT.NewClient(opts) + token := client.Connect() + for !token.WaitTimeout(3 * time.Second) { + } + if err := token.Error(); err != nil { + log.Fatal(err) + return + } + fmt.Println("connected to broker, topic = ", topic) +} + +func Listen() { + if !client.IsConnected() { + log.Fatal("Client is not connected") + return + } + client.Subscribe(topic, 0, func(client MQTT.Client, msg MQTT.Message) { + fmt.Printf("---------------------------------------------------\n") + fmt.Printf("TOPIC: %s\n", msg.Topic()) + fmt.Printf("MSG: \n%s\n", msg.Payload()) + //fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload())) + }) +} + +func Setup() { + + scheme := "tcp://" + if Config.EnableTLS { + scheme = "ssl://" + } + + urlString := scheme + urlString += Config.BrokerUsername + ":" + Config.BrokerPassword + "@" + urlString += Config.BrokerAddress + ":" + urlString += strconv.FormatInt(Config.BrokerPort, 10) + + fmt.Println("broker urlString: ", urlString) + + uri, err := url.Parse(urlString) + if err != nil { + log.Fatal(err) + } + + fmt.Println("broker URI: ", *uri) + //fmt.Println("customer: ", *customer) + //fmt.Println("device: ", *device) + + topic = "/ATB/#" + + opts = createClientOptions(Config.BrokerClientId, uri) + +} + +func createClientOptions(ClientID string, uri *url.URL) *MQTT.ClientOptions { + opts := MQTT.NewClientOptions() + opts.AddBroker(uri.Scheme + "://" + uri.Host) + opts.SetUsername(uri.User.Username()) + password, _ := uri.User.Password() + opts.SetPassword(password) + opts.SetClientID(ClientID) + opts.SetDefaultPublishHandler(f) + if Config.EnableTLS { + tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert} + opts.SetTLSConfig(tlsConfig) + } + + return opts +} + +func Disconnect() { + client.Disconnect(250) +} + +var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { + fmt.Printf("TOPIC: %s\n", msg.Topic()) + fmt.Printf("MSG: %s\n", msg.Payload()) +}