package mqtt import ( "crypto/tls" "fmt" "log" "net/url" "strconv" "strings" "mqttListener/env" "mqttListener/models" "regexp" "time" MQTT "github.com/eclipse/paho.mqtt.golang" "encoding/json" ) /* URI = mqtt://:@: */ var client MQTT.Client var opts *MQTT.ClientOptions var topic string var uri url.URL var ev *env.Env func Connect() { client = MQTT.NewClient(opts) token := client.Connect() for !token.WaitTimeout(3 * time.Second) { } if err := token.Error(); err != nil { log.Fatal(err) return } fmt.Println("connected to broker, topic = ", topic) } func Listen() { if !client.IsConnected() { log.Fatal("Client is not connected") return } client.Subscribe(topic, 0, subscriptionHandler) /* client.Subscribe(topic, 0, func(client MQTT.Client, msg MQTT.Message) { fmt.Printf("---------------------------------------------------\n") fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: \n%s\n", msg.Payload()) //fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload())) }) */ } func Setup(environment *env.Env) { ev = environment scheme := "tcp://" if ev.Config.EnableTLS { scheme = "ssl://" } urlString := scheme urlString += ev.Config.BrokerUsername + ":" + ev.Config.BrokerPassword + "@" urlString += ev.Config.BrokerAddress + ":" urlString += strconv.FormatInt(ev.Config.BrokerPort, 10) fmt.Println("broker urlString: ", urlString) uri, err := url.Parse(urlString) if err != nil { log.Fatal(err) } fmt.Println("broker URI: ", *uri) //fmt.Println("customer: ", *customer) //fmt.Println("device: ", *device) topic = "/ATB/#" opts = createClientOptions(ev.Config.BrokerClientID, uri) } func createClientOptions(ClientID string, uri *url.URL) *MQTT.ClientOptions { opts := MQTT.NewClientOptions() opts.AddBroker(uri.Scheme + "://" + uri.Host) opts.SetUsername(uri.User.Username()) password, _ := uri.User.Password() opts.SetPassword(password) opts.SetClientID(ClientID) opts.SetDefaultPublishHandler(f) if ev.Config.EnableTLS { tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert} opts.SetTLSConfig(tlsConfig) } return opts } func Disconnect() { client.Disconnect(250) } var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { fmt.Printf("---------------------------------------------------\n") fmt.Printf("Default Message Handler:\n") fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) } var subscriptionHandler = func(client MQTT.Client, msg MQTT.Message) { // DEBUG fmt.Printf("---------------------------------------------------\n") fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: \n%s\n", msg.Payload()) //fmt.Printf("* [%s] %s\n", msg.Topic(), string(msg.Payload())) //------------------------------------------------------------------- // create ID from topic regexpWithType := "\\/ATB\\/[A-Z]+\\/[0-9]+\\/[0-9]+\\/mo" regexpDefault := "\\/ATB\\/[0-9]+\\/[0-9]+\\/[0-9]+\\/[0-9]+\\/mo" topicSlice := strings.Split(msg.Topic(), "/") matchedWithType, err := regexp.MatchString(regexpWithType, msg.Topic()) if err != nil { log.Printf("ERROR: matchedWithType: %s", err.Error()) } matchedDefault, err := regexp.MatchString(regexpDefault, msg.Topic()) if err != nil { log.Printf("ERROR: matchedDefault: %s", err.Error()) } var deviceID string if matchedWithType { fmt.Printf("Topic matched regexpWithType\n") deviceID = topicSlice[2] + "_" + topicSlice[3] + "_" + topicSlice[4] } else if matchedDefault { fmt.Printf("Topic matched regexpDefault\n") deviceID = topicSlice[2] + "_" + topicSlice[3] + "_" + topicSlice[4] + "_" + topicSlice[5] } else { log.Printf("ERROR: no matching topic: %s", msg.Topic()) return } fmt.Printf("Generated deviceID = %s\n", deviceID) var device models.Device device.ID = deviceID device.MAC = "t.b.d." device.SN = "t.b.d." device.LastMsg = "t.b.d" //------------------------------------------------------------------- // extract data from payload bytes := []byte(msg.Payload()) mo := models.Mo{} err = json.Unmarshal(bytes, &mo) if err != nil { fmt.Println(err.Error()) return } device.MAC = mo.ID.PTU4_MAC device.SN = mo.ID.PTU4_SN fmt.Printf("Device MAC = %s\n", device.MAC) fmt.Printf("Device SN = %s\n", device.SN) // TODO: store this device in database err = ev.DB.InsertDevice(&device) if err != nil { fmt.Println(err.Error()) return } }