Commit 29314085 authored by Stefan Dähling's avatar Stefan Dähling
Browse files

df close

parent 15bd3bce
Pipeline #290170 passed with stages
in 1 minute and 9 seconds
......@@ -145,4 +145,5 @@ func (agent *Agent) Terminate() {
agent.ACL.close()
agent.Logger.close()
agent.MQTT.close()
agent.DF.close()
}
......@@ -200,3 +200,16 @@ func newDF(masID int, agentID int, nodeID int, logErr *log.Logger, logInf *log.L
df.registeredServices = make(map[string]schemas.Service)
return
}
// close closes the DF module
func (df *DF) close() {
for d := range df.registeredServices {
svc := df.registeredServices[d]
df.DeregisterService(svc.GUID)
}
df.mutex.Lock()
df.logInfo.Println("Closing DF of agent ", df.agentID)
df.active = false
df.mutex.Unlock()
return
}
......@@ -59,6 +59,7 @@ import (
type MQTT struct {
client *mqttClient
mutex *sync.Mutex // mutex for message inbox map
subTopic map[string]interface{} // subscribed topics
msgInTopic map[string]chan schemas.MQTTMessage // message inbox for messages with specified topic
msgIn chan schemas.MQTTMessage // mqtt message inbox
agentID int
......@@ -80,6 +81,7 @@ func newMQTT(agentID int, cli *mqttClient, cmaplog *Logger, logErr *log.Logger,
logInfo: logInf,
active: true,
}
mq.subTopic = make(map[string]interface{})
mq.msgInTopic = make(map[string]chan schemas.MQTTMessage)
mq.msgIn = make(chan schemas.MQTTMessage)
return
......@@ -87,8 +89,11 @@ func newMQTT(agentID int, cli *mqttClient, cmaplog *Logger, logErr *log.Logger,
// close closes the mqtt
func (mq *MQTT) close() {
for t := range mq.subTopic {
mq.Unsubscribe(t)
}
mq.mutex.Lock()
mq.logInfo.Println("Closing Logger of agent ", mq.agentID)
mq.logInfo.Println("Closing MQTT of agent ", mq.agentID)
mq.active = false
mq.mutex.Unlock()
return
......@@ -102,6 +107,13 @@ func (mq *MQTT) Subscribe(topic string, qos int) (err error) {
err = errors.New("mqtt not active")
return
}
_, ok := mq.subTopic[topic]
mq.mutex.Unlock()
if ok {
return
}
mq.mutex.Lock()
mq.subTopic[topic] = nil
mq.mutex.Unlock()
err = mq.client.subscribe(mq, topic, qos)
return
......@@ -115,6 +127,13 @@ func (mq *MQTT) Unsubscribe(topic string) (err error) {
err = errors.New("mqtt not active")
return
}
_, ok := mq.subTopic[topic]
mq.mutex.Unlock()
if !ok {
return
}
mq.mutex.Lock()
delete(mq.subTopic, topic)
mq.mutex.Unlock()
err = mq.client.unsubscribe(mq, topic)
return
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment