Commit db048686 authored by Stefan Dähling's avatar Stefan Dähling
Browse files

close logger and mqtt when terminating agent

parent ba157a90
Pipeline #289062 passed with stages
in 2 minutes and 42 seconds
......@@ -105,8 +105,10 @@ func newACL(agentID int, msgIn chan schemas.ACLMessage,
// close closes the acl
func (acl *ACL) close() {
acl.mutex.Lock()
acl.logInfo.Println("Closing ACL of agent ", acl.agentID)
acl.active = false
acl.mutex.Unlock()
return
}
......@@ -127,6 +129,13 @@ func (acl *ACL) NewMessage(receiver int, prot int, perf int,
// RecvMessages retrieves all messages since last call of this function
func (acl *ACL) RecvMessages() (num int, msgs []schemas.ACLMessage, err error) {
acl.mutex.Lock()
if !acl.active {
acl.mutex.Unlock()
err = errors.New("acl not active")
return
}
acl.mutex.Unlock()
num = 0
err = nil
for {
......@@ -142,6 +151,13 @@ func (acl *ACL) RecvMessages() (num int, msgs []schemas.ACLMessage, err error) {
// RecvMessageWait retrieves next message and blocks if no message is available
func (acl *ACL) RecvMessageWait() (msg schemas.ACLMessage, err error) {
acl.mutex.Lock()
if !acl.active {
acl.mutex.Unlock()
err = errors.New("acl not active")
return
}
acl.mutex.Unlock()
err = nil
msg = <-acl.msgIn
return
......@@ -152,7 +168,13 @@ func (acl *ACL) SendMessage(msg schemas.ACLMessage) (err error) {
var aclRecv *ACL
var ok bool
msg.Timestamp = time.Now()
acl.mutex.Lock()
if !acl.active {
acl.mutex.Unlock()
return errors.New("acl not active")
}
msg.Sender = acl.agentID
aclRecv, ok = acl.addrBook[msg.Receiver]
acl.mutex.Unlock()
......@@ -191,9 +213,12 @@ func (acl *ACL) SendMessage(msg schemas.ACLMessage) (err error) {
// newIncomingMessage adds message to channel for incoming messages
func (acl *ACL) newIncomingMessage(msg schemas.ACLMessage) (err error) {
acl.mutex.Lock()
if !acl.active {
acl.mutex.Unlock()
return errors.New("acl not active")
}
acl.mutex.Unlock()
acl.logInfo.Println("New message for agent ", msg.Receiver)
acl.mutex.Lock()
inbox, ok := acl.msgInProtocol[msg.Protocol]
......@@ -216,6 +241,10 @@ func (acl *ACL) newIncomingMessage(msg schemas.ACLMessage) (err error) {
func (acl *ACL) registerProtocolChannel(prot int,
protChannel chan schemas.ACLMessage) (err error) {
acl.mutex.Lock()
if !acl.active {
acl.mutex.Unlock()
return errors.New("acl not active")
}
_, ok := acl.msgInProtocol[prot]
acl.mutex.Unlock()
if !ok {
......@@ -229,6 +258,10 @@ func (acl *ACL) registerProtocolChannel(prot int,
// deregisterProtocolChannel deregisters the protocol channel with the messaging service
func (acl *ACL) deregisterProtocolChannel(prot int) (err error) {
acl.mutex.Lock()
if !acl.active {
acl.mutex.Unlock()
return errors.New("acl not active")
}
_, ok := acl.msgInProtocol[prot]
acl.mutex.Unlock()
if ok {
......
......@@ -135,4 +135,6 @@ func (agent *Agent) GetCustomData() (ret string) {
func (agent *Agent) Terminate() {
agent.logInfo.Println("Terminating agent ", agent.GetAgentID())
agent.ACL.close()
agent.Logger.close()
agent.MQTT.close()
}
......@@ -209,8 +209,8 @@ func (agency *Agency) handleAgentID(agid int, w http.ResponseWriter, r *http.Req
httpErr error) {
if r.Method == "DELETE" {
// delete specified agent
go agency.removeAgent(agid)
httpErr = httpreply.Deleted(w, nil)
cmapErr = agency.removeAgent(agid)
httpErr = httpreply.Deleted(w, cmapErr)
} else {
httpErr = httpreply.MethodNotAllowed(w)
......
......@@ -64,10 +64,17 @@ type Logger struct {
config schemas.LogConfig
logError *log.Logger
logInfo *log.Logger
active bool
}
// NewLog sends a new logging message to the logging service
func (log *Logger) NewLog(logType string, message string, data string) (err error) {
log.mutex.Lock()
if !log.active {
log.mutex.Unlock()
return errors.New("log not active")
}
log.mutex.Unlock()
if logType != "error" && logType != "debug" && logType != "status" && logType != "msg" &&
logType != "app" {
err = errors.New("UnknownLogType")
......@@ -95,6 +102,12 @@ func (log *Logger) NewLog(logType string, message string, data string) (err erro
// UpdateState overrides the state stored in database
func (log *Logger) UpdateState(state string) (err error) {
log.mutex.Lock()
if !log.active {
log.mutex.Unlock()
return errors.New("log not active")
}
log.mutex.Unlock()
agState := schemas.State{
MASID: log.client.masID,
AgentID: log.agentID,
......@@ -106,12 +119,52 @@ func (log *Logger) UpdateState(state string) (err error) {
// RestoreState loads state saved in database and return it
func (log *Logger) RestoreState() (state string, err error) {
log.mutex.Lock()
if !log.active {
log.mutex.Unlock()
err = errors.New("log not active")
return
}
log.mutex.Unlock()
var agState schemas.State
agState, _, err = client.GetState(log.client.masID, log.agentID)
state = agState.State
return
}
// newLogger craetes a new object of type Logger
func newLogger(agentID int, client *loggerClient, config schemas.LogConfig, logErr *log.Logger,
logInf *log.Logger) (log *Logger) {
log = &Logger{
agentID: agentID,
client: client,
mutex: &sync.Mutex{},
config: config,
logError: logErr,
logInfo: logInf,
active: true,
}
return
}
// close closes the logger
func (log *Logger) close() {
log.mutex.Lock()
log.logInfo.Println("Closing Logger of agent ", log.agentID)
log.active = false
log.mutex.Unlock()
return
}
// loggerClient is the agency client for the logger
type loggerClient struct {
masID int
logIn chan schemas.LogMessage // logging inbox
active bool // indicates if logging is active (switch via env)
logError *log.Logger
logInfo *log.Logger
}
// storeLogs periodically requests the logging service to store log messages
func (log *loggerClient) storeLogs() (err error) {
if log.active {
......@@ -147,30 +200,6 @@ func (log *loggerClient) storeLogs() (err error) {
fmt.Println(logMsg)
}
}
return
}
// newLogger craetes a new object of type Logger
func newLogger(agentID int, client *loggerClient, config schemas.LogConfig, logErr *log.Logger,
logInf *log.Logger) (log *Logger) {
log = &Logger{
agentID: agentID,
client: client,
mutex: &sync.Mutex{},
config: config,
logError: logErr,
logInfo: logInf,
}
return
}
// loggerClient is the agency client for the logger
type loggerClient struct {
masID int
logIn chan schemas.LogMessage // logging inbox
active bool // indicates if logging is active (switch via env)
logError *log.Logger
logInfo *log.Logger
}
// newLoggerClient creates an agency logger client
......
......@@ -65,6 +65,7 @@ type MQTT struct {
cmapLogger *Logger
logError *log.Logger
logInfo *log.Logger
active bool
}
// newMQTT returns a new pubsub connector of type mqtt
......@@ -77,20 +78,44 @@ func newMQTT(agentID int, cli *mqttClient, cmaplog *Logger, logErr *log.Logger,
cmapLogger: cmaplog,
logError: logErr,
logInfo: logInf,
active: true,
}
mq.msgInTopic = make(map[string]chan schemas.MQTTMessage)
mq.msgIn = make(chan schemas.MQTTMessage)
return
}
// close closes the mqtt
func (mq *MQTT) close() {
mq.mutex.Lock()
mq.logInfo.Println("Closing Logger of agent ", mq.agentID)
mq.active = false
mq.mutex.Unlock()
return
}
// Subscribe subscribes to a topic
func (mq *MQTT) Subscribe(topic string, qos int) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
err = errors.New("mqtt not active")
return
}
mq.mutex.Unlock()
err = mq.client.subscribe(mq, topic, qos)
return
}
// SendMessage sends a message
func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
err = errors.New("mqtt not active")
return
}
mq.mutex.Unlock()
err = mq.client.publish(msg, qos)
if err != nil {
return
......@@ -109,6 +134,13 @@ func (mq *MQTT) NewMessage(topic string, content []byte) (msg schemas.MQTTMessag
// RecvMessages retrieves all messages since last call of this function
func (mq *MQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
err = errors.New("mqtt not active")
return
}
mq.mutex.Unlock()
num = 0
err = nil
for {
......@@ -124,6 +156,13 @@ func (mq *MQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err error)
// RecvMessageWait retrieves next message and blocks if no message is available
func (mq *MQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
err = errors.New("mqtt not active")
return
}
mq.mutex.Unlock()
err = nil
msg = <-mq.msgIn
return
......@@ -131,6 +170,12 @@ func (mq *MQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
// newIncomingMQTTMessage adds message to channel for incoming messages
func (mq *MQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
return
}
mq.mutex.Unlock()
mq.cmapLogger.NewLog("msg", "Received MQTT message: "+string(msg.Content), string(msg.Content))
mq.mutex.Lock()
inbox, ok := mq.msgInTopic[msg.Topic]
......@@ -145,6 +190,12 @@ func (mq *MQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
func (mq *MQTT) registerTopicChannel(topic string, topicChan chan schemas.MQTTMessage) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
err = errors.New("mqtt not active")
return
}
if _, ok := mq.msgInTopic[topic]; !ok {
mq.msgInTopic[topic] = topicChan
} else {
......@@ -156,6 +207,12 @@ func (mq *MQTT) registerTopicChannel(topic string, topicChan chan schemas.MQTTMe
func (mq *MQTT) deregisterTopicChannel(topic string) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
err = errors.New("mqtt not active")
return
}
if _, ok := mq.msgInTopic[topic]; ok {
delete(mq.msgInTopic, topic)
} else {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment