Write data in database table
This commit is contained in:
parent
8f7a285289
commit
ac3bb1ebe9
|
@ -1,32 +0,0 @@
|
||||||
package Config
|
|
||||||
|
|
||||||
import (
|
|
||||||
//"fmt"
|
|
||||||
"log"
|
|
||||||
|
|
||||||
"github.com/pelletier/go-toml"
|
|
||||||
)
|
|
||||||
|
|
||||||
var BrokerAddress = ""
|
|
||||||
var BrokerPort int64 = 8883
|
|
||||||
var BrokerClientId = "go_client"
|
|
||||||
var BrokerUsername = ""
|
|
||||||
var BrokerPassword = ""
|
|
||||||
var EnableTLS = true
|
|
||||||
|
|
||||||
func ReadConfig() {
|
|
||||||
config, err := toml.LoadFile("config.toml")
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
//fmt.Println("Error ", err.Error())
|
|
||||||
log.Fatal(err.Error())
|
|
||||||
} else {
|
|
||||||
// retrieve data directly
|
|
||||||
BrokerAddress = config.Get("BROKER.Address").(string)
|
|
||||||
BrokerPort = config.Get("BROKER.Port").(int64)
|
|
||||||
BrokerClientId = config.Get("BROKER.ClientId").(string)
|
|
||||||
BrokerUsername = config.Get("BROKER.Username").(string)
|
|
||||||
BrokerPassword = config.Get("BROKER.Password").(string)
|
|
||||||
EnableTLS = config.Get("BROKER.EnableTLS").(bool)
|
|
||||||
}
|
|
||||||
}
|
|
39
config/config.go
Normal file
39
config/config.go
Normal file
|
@ -0,0 +1,39 @@
|
||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
//"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/pelletier/go-toml"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Config struct {
|
||||||
|
BrokerAddress string
|
||||||
|
BrokerPort int64
|
||||||
|
BrokerClientID string
|
||||||
|
BrokerUsername string
|
||||||
|
BrokerPassword string
|
||||||
|
EnableTLS bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConfig(configFilename string) (*Config, error) {
|
||||||
|
tomlData, err := toml.LoadFile(configFilename)
|
||||||
|
|
||||||
|
conf := Config{"", 8883, "go_client", "", "", true}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
//fmt.Println("Error ", err.Error())
|
||||||
|
log.Printf("ERROR: NewConfig(): %s", err.Error())
|
||||||
|
return nil, err
|
||||||
|
} else {
|
||||||
|
// retrieve data directly
|
||||||
|
conf.BrokerAddress = tomlData.Get("BROKER.Address").(string)
|
||||||
|
conf.BrokerPort = tomlData.Get("BROKER.Port").(int64)
|
||||||
|
conf.BrokerClientID = tomlData.Get("BROKER.ClientId").(string)
|
||||||
|
conf.BrokerUsername = tomlData.Get("BROKER.Username").(string)
|
||||||
|
conf.BrokerPassword = tomlData.Get("BROKER.Password").(string)
|
||||||
|
conf.EnableTLS = tomlData.Get("BROKER.EnableTLS").(bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &conf, nil
|
||||||
|
}
|
31
env/env.go
vendored
Normal file
31
env/env.go
vendored
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
package env
|
||||||
|
|
||||||
|
import (
|
||||||
|
"mqttListener/config"
|
||||||
|
"mqttListener/models"
|
||||||
|
|
||||||
|
"fmt"
|
||||||
|
//"log"
|
||||||
|
|
||||||
|
"net/http"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Env struct {
|
||||||
|
DB models.Datastore
|
||||||
|
Config *config.Config
|
||||||
|
}
|
||||||
|
|
||||||
|
func (env *Env) DevicesIndex(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != "GET" {
|
||||||
|
http.Error(w, http.StatusText(405), 405)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
devices, err := env.DB.AllDevices()
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, http.StatusText(500), 500)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, device := range devices {
|
||||||
|
fmt.Fprintf(w, "%s, %s, %s, £%.2f\n", device.ID, device.MAC, device.SN, device.LastMsg)
|
||||||
|
}
|
||||||
|
}
|
2
go.mod
2
go.mod
|
@ -8,4 +8,6 @@ require (
|
||||||
github.com/mmcdole/gofeed v1.0.0
|
github.com/mmcdole/gofeed v1.0.0
|
||||||
github.com/pelletier/go-toml v1.2.0
|
github.com/pelletier/go-toml v1.2.0
|
||||||
github.com/spf13/cobra v1.0.0
|
github.com/spf13/cobra v1.0.0
|
||||||
|
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
|
||||||
|
golang.org/x/tools v0.0.0-20200831203904-5a2aa26beb65 // indirect
|
||||||
)
|
)
|
||||||
|
|
35
main.go
35
main.go
|
@ -1,7 +1,8 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"mqttListener/Config"
|
"mqttListener/config"
|
||||||
|
"mqttListener/env"
|
||||||
"mqttListener/models"
|
"mqttListener/models"
|
||||||
"mqttListener/mqtt"
|
"mqttListener/mqtt"
|
||||||
|
|
||||||
|
@ -15,32 +16,31 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Env struct {
|
|
||||||
db models.Datastore
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
c := make(chan os.Signal, 1)
|
c := make(chan os.Signal, 1)
|
||||||
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
Config.ReadConfig()
|
config, err := config.NewConfig("config.toml")
|
||||||
|
if err != nil {
|
||||||
|
log.Panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
// DEBUG Config
|
// DEBUG Config
|
||||||
// fmt.Println("BrokerAddress = ", Config.BrokerAddress)
|
fmt.Println("BrokerAddress = ", config.BrokerAddress)
|
||||||
|
|
||||||
db, err := models.NewDB("simple.sqlite")
|
db, err := models.NewDB("simple.sqlite")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic(err)
|
log.Panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
env := &Env{db}
|
env := &env.Env{db, config}
|
||||||
|
|
||||||
mqtt.Setup()
|
mqtt.Setup(env)
|
||||||
mqtt.Connect()
|
mqtt.Connect()
|
||||||
go mqtt.Listen()
|
go mqtt.Listen()
|
||||||
|
|
||||||
http.HandleFunc("/devices", env.devicesIndex)
|
http.HandleFunc("/devices", env.DevicesIndex)
|
||||||
go http.ListenAndServe(":3000", nil)
|
go http.ListenAndServe(":3000", nil)
|
||||||
|
|
||||||
fmt.Println("awaiting signal")
|
fmt.Println("awaiting signal")
|
||||||
|
@ -49,18 +49,3 @@ func main() {
|
||||||
|
|
||||||
mqtt.Disconnect()
|
mqtt.Disconnect()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (env *Env) devicesIndex(w http.ResponseWriter, r *http.Request) {
|
|
||||||
if r.Method != "GET" {
|
|
||||||
http.Error(w, http.StatusText(405), 405)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
devices, err := env.db.AllDevices()
|
|
||||||
if err != nil {
|
|
||||||
http.Error(w, http.StatusText(500), 500)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, device := range devices {
|
|
||||||
fmt.Fprintf(w, "%s, %s, %s, £%.2f\n", device.ID, device.MAC, device.SN, device.LastMsg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
package models
|
package models
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
type Device struct {
|
type Device struct {
|
||||||
ID string
|
ID string
|
||||||
MAC string
|
MAC string
|
||||||
|
@ -30,12 +34,15 @@ func (db *DB) AllDevices() ([]*Device, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) InsertDevice(device *Device) error {
|
func (db *DB) InsertDevice(device *Device) error {
|
||||||
sqlStmt := `INSERT OR REPLACE INTO table (id, mac, sn, lastMsg)
|
sqlStmt := `INSERT OR REPLACE INTO devices (id, mac, sn, lastMsg)
|
||||||
VALUES($1, $2, $3, $4);`
|
VALUES($1, $2, $3, $4);`
|
||||||
_, err := db.Exec(sqlStmt, device.ID, device.MAC, device.SN, device.LastMsg)
|
_, err := db.Exec(sqlStmt, device.ID, device.MAC, device.SN, device.LastMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DEBUG
|
||||||
|
fmt.Printf("Sucessfully inserted device with ID %s to database\n", device.ID)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
32
mqtt/mqtt.go
32
mqtt/mqtt.go
|
@ -8,7 +8,7 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"mqttListener/Config"
|
"mqttListener/env"
|
||||||
"mqttListener/models"
|
"mqttListener/models"
|
||||||
"regexp"
|
"regexp"
|
||||||
"time"
|
"time"
|
||||||
|
@ -22,18 +22,11 @@ import (
|
||||||
URI = mqtt://<user>:<pass>@<server>:<port>
|
URI = mqtt://<user>:<pass>@<server>:<port>
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* TODO: This options are just a suggestion
|
|
||||||
Options:
|
|
||||||
[-help] Display help
|
|
||||||
[-customer] Customer- or project number
|
|
||||||
[-device] Device number
|
|
||||||
[-uri <uri>] Broker URI
|
|
||||||
*/
|
|
||||||
|
|
||||||
var client MQTT.Client
|
var client MQTT.Client
|
||||||
var opts *MQTT.ClientOptions
|
var opts *MQTT.ClientOptions
|
||||||
var topic string
|
var topic string
|
||||||
var uri url.URL
|
var uri url.URL
|
||||||
|
var ev *env.Env
|
||||||
|
|
||||||
func Connect() {
|
func Connect() {
|
||||||
client = MQTT.NewClient(opts)
|
client = MQTT.NewClient(opts)
|
||||||
|
@ -64,17 +57,19 @@ func Listen() {
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
func Setup() {
|
func Setup(environment *env.Env) {
|
||||||
|
|
||||||
|
ev = environment
|
||||||
|
|
||||||
scheme := "tcp://"
|
scheme := "tcp://"
|
||||||
if Config.EnableTLS {
|
if ev.Config.EnableTLS {
|
||||||
scheme = "ssl://"
|
scheme = "ssl://"
|
||||||
}
|
}
|
||||||
|
|
||||||
urlString := scheme
|
urlString := scheme
|
||||||
urlString += Config.BrokerUsername + ":" + Config.BrokerPassword + "@"
|
urlString += ev.Config.BrokerUsername + ":" + ev.Config.BrokerPassword + "@"
|
||||||
urlString += Config.BrokerAddress + ":"
|
urlString += ev.Config.BrokerAddress + ":"
|
||||||
urlString += strconv.FormatInt(Config.BrokerPort, 10)
|
urlString += strconv.FormatInt(ev.Config.BrokerPort, 10)
|
||||||
|
|
||||||
fmt.Println("broker urlString: ", urlString)
|
fmt.Println("broker urlString: ", urlString)
|
||||||
|
|
||||||
|
@ -89,7 +84,7 @@ func Setup() {
|
||||||
|
|
||||||
topic = "/ATB/#"
|
topic = "/ATB/#"
|
||||||
|
|
||||||
opts = createClientOptions(Config.BrokerClientId, uri)
|
opts = createClientOptions(ev.Config.BrokerClientID, uri)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,7 +96,7 @@ func createClientOptions(ClientID string, uri *url.URL) *MQTT.ClientOptions {
|
||||||
opts.SetPassword(password)
|
opts.SetPassword(password)
|
||||||
opts.SetClientID(ClientID)
|
opts.SetClientID(ClientID)
|
||||||
opts.SetDefaultPublishHandler(f)
|
opts.SetDefaultPublishHandler(f)
|
||||||
if Config.EnableTLS {
|
if ev.Config.EnableTLS {
|
||||||
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
|
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
|
||||||
opts.SetTLSConfig(tlsConfig)
|
opts.SetTLSConfig(tlsConfig)
|
||||||
}
|
}
|
||||||
|
@ -180,4 +175,9 @@ var subscriptionHandler = func(client MQTT.Client, msg MQTT.Message) {
|
||||||
fmt.Printf("Device SN = %s\n", device.SN)
|
fmt.Printf("Device SN = %s\n", device.SN)
|
||||||
|
|
||||||
// TODO: store this device in database
|
// TODO: store this device in database
|
||||||
|
err = ev.DB.InsertDevice(&device)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user