Commit 6c33f0cd authored by Stefan Dähling's avatar Stefan Dähling
Browse files

use paho mqtt message type

parent e590f503
Pipeline #547507 passed with stages
in 3 minutes and 16 seconds
......@@ -52,7 +52,9 @@ import (
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
// "git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
......@@ -62,9 +64,9 @@ func main() {
}
}
func display(msg schemas.MQTTMessage) (err error) {
func display(msg mqtt.Message) (err error) {
time.Sleep(5 * time.Second)
fmt.Println(string(msg.Content))
fmt.Println(string(msg.Payload()))
return
}
......@@ -98,10 +100,7 @@ func task(ag *agency.Agent) (err error) {
for i := 0; i < 20; i++ {
time.Sleep(5 * time.Second)
msg := "test message" + strconv.Itoa(i)
MQTTMsg, err := ag.MQTT.NewMessage("topic1", []byte(msg))
if err == nil {
ag.MQTT.SendMessage(MQTTMsg, 1)
}
ag.MQTT.Publish("topic1", 1, msg)
}
}
......@@ -175,10 +174,7 @@ func task_test(ag *agency.Agent) (err error) {
for i := 0; i < 20; i++ {
time.Sleep(5 * time.Second)
msg := "test message" + strconv.Itoa(i)
MQTTMsg, err := ag.MQTT.NewMessage("topic1", []byte(msg))
if err == nil {
ag.MQTT.SendMessage(MQTTMsg, 1)
}
ag.MQTT.Publish("topic1", 1, msg)
}
}
......
......@@ -50,6 +50,7 @@ import (
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// Behavior defines execution of a certain behavior
......@@ -151,17 +152,17 @@ func (protBehavior *aclProtocolBehavior) Stop() {
// mqttTopicBehavior describes how mqtt messages with a certain topic should be handled
type mqttTopicBehavior struct {
ag *Agent // agent
topic string // indicates for which protocol handler should be used
handle func(schemas.MQTTMessage) error // handler function
msgIn chan schemas.MQTTMessage // msg inbox
ctrl chan int // control signals
ag *Agent // agent
topic string // indicates for which protocol handler should be used
handle func(mqtt.Message) error // handler function
msgIn chan mqtt.Message // msg inbox
ctrl chan int // control signals
logInfo *log.Logger
}
// NewMQTTTopicBehavior creates a new handler for messages of the specified topic
func (agent *Agent) NewMQTTTopicBehavior(topic string,
handle func(schemas.MQTTMessage) error) (behavior Behavior, err error) {
handle func(mqtt.Message) error) (behavior Behavior, err error) {
if handle == nil {
err = errors.New("illegal handler")
return
......@@ -170,7 +171,7 @@ func (agent *Agent) NewMQTTTopicBehavior(topic string,
ag: agent,
topic: topic,
handle: handle,
msgIn: make(chan schemas.MQTTMessage, 100),
msgIn: make(chan mqtt.Message, 100),
ctrl: make(chan int, 10),
logInfo: agent.logInfo,
}
......@@ -205,7 +206,7 @@ func (mqttBehavior *mqttTopicBehavior) task() {
end := time.Now()
mqttBehavior.ag.Logger.NewBehStats(start, end, "mqtt")
mqttBehavior.ag.Logger.NewLog("beh", "mqtt topic behavior task", "start: "+start.String()+
";end: "+end.String()+";duration:"+end.Sub(start).String()+";"+msg.String())
";end: "+end.String()+";duration:"+end.Sub(start).String()+";"+mqttMsgToString(msg))
case command := <-mqttBehavior.ctrl:
switch command {
case -1:
......
......@@ -46,6 +46,7 @@ package agency
import (
"errors"
"fmt"
"log"
"strconv"
"sync"
......@@ -57,12 +58,12 @@ import (
// mqttCollector is the agency client for mqtt
type mqttCollector struct {
client mqtt.Client // mqtt client
msgIn chan schemas.MQTTMessage // mqtt message inbox
name string // agency name
config schemas.MQTTConfig // indicates if mqtt is active (switch via env)
mutex *sync.Mutex // mutex for message inbox map
subscription map[string][]*AgentMQTT // map for subscription topics to agents' mqtt object
client mqtt.Client // mqtt client
msgIn chan mqtt.Message // mqtt message inbox
name string // agency name
config schemas.MQTTConfig // indicates if mqtt is active (switch via env)
mutex *sync.Mutex // mutex for message inbox map
subscription map[string][]*AgentMQTT // map for subscription topics to agents' mqtt object
// numDeliverer int // number of go routines for delivery
logError *log.Logger
logInfo *log.Logger
......@@ -78,7 +79,7 @@ func newMQTTCollector(config schemas.MQTTConfig, name string, logErr *log.Logger
logInfo: logInf,
config: config,
}
col.msgIn = make(chan schemas.MQTTMessage, 1000)
col.msgIn = make(chan mqtt.Message, 1000)
col.subscription = make(map[string][]*AgentMQTT)
col.logInfo.Println("Created new MQTT client; status: ", col.config.Active)
col.init()
......@@ -114,10 +115,10 @@ func (mqttCol *mqttCollector) close() (err error) {
// newIncomingMQTTMessage adds message to channel for incoming messages
func (mqttCol *mqttCollector) newIncomingMQTTMessage(client mqtt.Client, msg mqtt.Message) {
var mqttMsg schemas.MQTTMessage
mqttMsg.Content = msg.Payload()
mqttMsg.Topic = msg.Topic()
mqttCol.msgIn <- mqttMsg
// var mqttMsg schemas.MQTTMessage
// mqttMsg.Content = msg.Payload()
// mqttMsg.Topic = msg.Topic()
mqttCol.msgIn <- msg
}
// subscribe subscribes to specified topics
......@@ -211,9 +212,9 @@ func (mqttCol *mqttCollector) unsubscribe(mq *AgentMQTT, topic string) (err erro
}
// publish sends a message
func (mqttCol *mqttCollector) publish(msg schemas.MQTTMessage, qos int) (err error) {
func (mqttCol *mqttCollector) publish(topic string, qos int, payload interface{}) (err error) {
if mqttCol.config.Active {
token := mqttCol.client.Publish(msg.Topic, byte(qos), false, msg.Content)
token := mqttCol.client.Publish(topic, byte(qos), false, payload)
token.Wait()
}
return
......@@ -221,11 +222,11 @@ func (mqttCol *mqttCollector) publish(msg schemas.MQTTMessage, qos int) (err err
// deliverMsg delivers incoming messages to agents according to their topic
func (mqttCol *mqttCollector) deliverMsgs() {
var msg schemas.MQTTMessage
var msg mqtt.Message
for {
msg = <-mqttCol.msgIn
mqttCol.mutex.Lock()
ag, ok := mqttCol.subscription[msg.Topic]
ag, ok := mqttCol.subscription[msg.Topic()]
mqttCol.mutex.Unlock()
if ok {
for i := range ag {
......@@ -238,10 +239,10 @@ func (mqttCol *mqttCollector) deliverMsgs() {
// AgentMQTT provides functions to subscribe and publish via mqtt
type AgentMQTT struct {
collector *mqttCollector
mutex *sync.Mutex // mutex for message inbox map
subTopic map[string]interface{} // subscribed topics
msgInTopic map[string]chan schemas.MQTTMessage // message inbox for messages with specified topic
msgIn chan schemas.MQTTMessage // mqtt message inbox
mutex *sync.Mutex // mutex for message inbox map
subTopic map[string]interface{} // subscribed topics
msgInTopic map[string]chan mqtt.Message // message inbox for messages with specified topic
msgIn chan mqtt.Message // mqtt message inbox
agentID int
logger *client.AgentLogger
logError *log.Logger
......@@ -262,8 +263,8 @@ func (mqttCol *mqttCollector) newAgentMQTT(agentID int, cmaplog *client.AgentLog
active: mqttCol.config.Active,
}
mq.subTopic = make(map[string]interface{})
mq.msgInTopic = make(map[string]chan schemas.MQTTMessage)
mq.msgIn = make(chan schemas.MQTTMessage)
mq.msgInTopic = make(map[string]chan mqtt.Message)
mq.msgIn = make(chan mqtt.Message)
return
}
......@@ -318,8 +319,8 @@ func (mq *AgentMQTT) Unsubscribe(topic string) (err error) {
return
}
// SendMessage sends a message
func (mq *AgentMQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
// Publish sends a message
func (mq *AgentMQTT) Publish(topic string, qos int, payload interface{}) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -327,24 +328,24 @@ func (mq *AgentMQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
return
}
mq.mutex.Unlock()
err = mq.collector.publish(msg, qos)
err = mq.collector.publish(topic, qos, payload)
if err != nil {
return
}
err = mq.logger.NewLog("msg", "MQTT publish", msg.String())
err = mq.logger.NewLog("msg", "MQTT publish", "Topic: "+topic+";Content: "+fmt.Sprintf("%v", payload))
return
}
// NewMessage returns a new initiaized message
func (mq *AgentMQTT) NewMessage(topic string, content []byte) (msg schemas.MQTTMessage, err error) {
msg.Topic = topic
msg.Content = content
err = nil
return
}
// // NewMessage returns a new initiaized message
// func (mq *AgentMQTT) NewMessage(topic string, content []byte) (msg mqtt.Message, err error) {
// msg.Topic = topic
// msg.Content = content
// err = nil
// return
// }
// RecvMessages retrieves all messages since last call of this function
func (mq *AgentMQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err error) {
func (mq *AgentMQTT) RecvMessages() (num int, msgs []mqtt.Message, err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -366,7 +367,7 @@ func (mq *AgentMQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err er
}
// RecvMessageWait retrieves next message and blocks if no message is available
func (mq *AgentMQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
func (mq *AgentMQTT) RecvMessageWait() (msg mqtt.Message, err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -380,16 +381,16 @@ func (mq *AgentMQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
}
// newIncomingMQTTMessage adds message to channel for incoming messages
func (mq *AgentMQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
func (mq *AgentMQTT) newIncomingMQTTMessage(msg mqtt.Message) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
return
}
mq.mutex.Unlock()
mq.logger.NewLog("msg", "MQTT receive", msg.String())
mq.logger.NewLog("msg", "MQTT receive", mqttMsgToString(msg))
mq.mutex.Lock()
inbox, ok := mq.msgInTopic[msg.Topic]
inbox, ok := mq.msgInTopic[msg.Topic()]
mq.mutex.Unlock()
if ok {
inbox <- msg
......@@ -399,7 +400,7 @@ func (mq *AgentMQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
}
func (mq *AgentMQTT) registerTopicChannel(topic string,
topicChan chan schemas.MQTTMessage) (err error) {
topicChan chan mqtt.Message) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -426,3 +427,9 @@ func (mq *AgentMQTT) deregisterTopicChannel(topic string) (err error) {
mq.mutex.Unlock()
return
}
// String outputs message
func mqttMsgToString(msg mqtt.Message) (ret string) {
ret = "Topic: " + msg.Topic() + ";Content: " + string(msg.Payload())
return
}
......@@ -53,11 +53,11 @@ type MQTTConfig struct {
Port int `json:"port,omitempty"` // port of MQTT
}
// MQTTMessage struct representing mqtt message
type MQTTMessage struct {
Topic string // Topic of message
Content []byte // Denotes the content of the message
}
// // MQTTMessage struct representing mqtt message
// type MQTTMessage struct {
// Topic string // Topic of message
// Content []byte // Denotes the content of the message
// }
// behStats contains behavior information
type BehStats struct {
......@@ -77,9 +77,3 @@ type StatsInfo struct {
Average float32 `json:"average"`
List []BehStats `json:"list"`
}
// String outputs message
func (msg MQTTMessage) String() (ret string) {
ret = "Topic: " + msg.Topic + ";Content: " + string(msg.Content)
return
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment