Commit f82747df authored by Stefan Dähling's avatar Stefan Dähling
Browse files

agent mqtt refactoring

parent bb93f2ae
Pipeline #440496 passed with stages
in 27 minutes and 14 seconds
......@@ -74,7 +74,7 @@ type Agency struct {
agentTask func(*Agent) error
msgIn chan []schemas.ACLMessage
logCollector *client.LogCollector
mqttClient *mqttClient
mqttCollector *mqttCollector
dfClient *client.DFClient
amsClient *client.AMSClient
agencyClient *client.AgencyClient
......@@ -184,15 +184,11 @@ func (agency *Agency) init() (err error) {
}
agency.mutex.Lock()
agency.logCollector, err = client.NewLogCollector(agency.info.MASID, agency.info.Logger,
agency.logCollector = client.NewLogCollector(agency.info.MASID, agency.info.Logger,
agency.logError, agency.logInfo)
if err != nil {
agency.mutex.Unlock()
return
}
agency.dfClient = client.NewDFClient(agency.info.DF.Host, agency.info.DF.Port,
time.Second*60, time.Second*1, 4)
agency.mqttClient = newMQTTClient("mqtt", 1883, agency.info.Name, agency.logError,
agency.mqttCollector = newMQTTCollector(agency.info.MQTT, agency.info.Name, agency.logError,
agency.logInfo)
agency.mutex.Unlock()
......@@ -211,7 +207,7 @@ func (agency *Agency) terminate(gracefulStop chan os.Signal) {
agency.localAgents[i].Terminate()
}
agency.mutex.Unlock()
agency.mqttClient.close()
agency.mqttCollector.close()
time.Sleep(time.Second * 2)
os.Exit(0)
}
......@@ -249,7 +245,8 @@ func (agency *Agency) createAgent(agentInfo schemas.AgentInfo) (err error) {
msgIn := make(chan schemas.ACLMessage, 1000)
agency.mutex.Lock()
ag := newAgent(agentInfo, msgIn, agency.aclLookup, agency.logCollector, agency.info.Logger,
agency.mqttClient, agency.info.DF.Active, agency.dfClient, agency.logError, agency.logInfo)
agency.mqttCollector, agency.info.DF.Active, agency.dfClient, agency.logError,
agency.logInfo)
agency.localAgents[agentInfo.ID] = ag
agency.mutex.Unlock()
ag.startAgent(agency.agentTask)
......
......@@ -71,7 +71,7 @@ type Agent struct {
status int // Status of agent
ACL *ACL // agent communication
Logger *client.AgentLogger // logger object
MQTT *MQTT // mqtt object
MQTT *AgentMQTT // mqtt object
DF *client.AgentDF
logError *log.Logger
logInfo *log.Logger
......@@ -81,7 +81,7 @@ type Agent struct {
// newAgent creates a new agent
func newAgent(info schemas.AgentInfo, msgIn chan schemas.ACLMessage,
aclLookup func(int) (*ACL, error), logCol *client.LogCollector, logConfig schemas.LoggerConfig,
mqtt *mqttClient, dfActive bool, dfClient *client.DFClient, logErr *log.Logger,
mqttCol *mqttCollector, dfActive bool, dfClient *client.DFClient, logErr *log.Logger,
logInf *log.Logger) (ag *Agent) {
ag = &Agent{
id: info.ID,
......@@ -99,12 +99,16 @@ func newAgent(info schemas.AgentInfo, msgIn chan schemas.ACLMessage,
}
// in, out := ag.ACL.getCommDataChannels()
if logCol != nil {
ag.Logger = logCol.NewAgentLogger(ag.id, logConfig, ag.logError, ag.logInfo)
ag.Logger = logCol.NewAgentLogger(ag.id, ag.logError, ag.logInfo)
}
ag.ACL = newACL(info.ID, msgIn, aclLookup, ag.Logger, logErr, logInf)
ag.MQTT = newMQTT(ag.id, mqtt, ag.Logger, ag.logError, ag.logInfo)
ag.DF = client.NewAgentDF(ag.masID, ag.id, ag.nodeID, dfActive, dfClient, ag.logError,
ag.logInfo)
if mqttCol != nil {
ag.MQTT = mqttCol.newAgentMQTT(ag.id, ag.Logger, ag.logError, ag.logInfo)
}
if dfClient != nil {
ag.DF = client.NewAgentDF(ag.masID, ag.id, ag.nodeID, dfActive, dfClient, ag.logError,
ag.logInfo)
}
return
}
......
/*
Copyright 2020 Institute for Automation of Complex Power Systems,
E.ON Energy Research Center, RWTH Aachen University
This project is licensed under either of
- Apache License, Version 2.0
- MIT License
at your option.
Apache License, Version 2.0:
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
MIT License:
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package agency
import (
"errors"
"log"
"os"
"sync"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
// DF provides access to the functionality of the DF
type DF struct {
agentID int
masID int
nodeID int
mutex *sync.Mutex
registeredServices map[string]schemas.Service
active bool // indicates if df is active (switch via env)
dfClient *client.DFClient
logError *log.Logger
logInfo *log.Logger
}
// RegisterService registers a new service with the DF
func (df *DF) RegisterService(svc schemas.Service) (id string, err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
df.mutex.Unlock()
id = "-1"
if svc.Desc == "" {
err = errors.New("empty description not allowed")
return
}
df.mutex.Lock()
_, ok := df.registeredServices[svc.Desc]
df.mutex.Unlock()
if ok {
err = errors.New("service already registered")
return
}
df.mutex.Lock()
masID := df.masID
agentID := df.agentID
nodeID := df.nodeID
df.mutex.Unlock()
svc.MASID = masID
svc.AgentID = agentID
svc.NodeID = nodeID
svc.CreatedAt = time.Now()
svc.ChangedAt = svc.CreatedAt
svc, _, err = df.dfClient.PostSvc(masID, svc)
id = svc.GUID
if err != nil {
return
}
df.mutex.Lock()
df.registeredServices[svc.Desc] = svc
df.mutex.Unlock()
return
}
// SearchForService search for a service with given description
func (df *DF) SearchForService(desc string) (svc []schemas.Service, err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
masID := df.masID
df.mutex.Unlock()
var temp []schemas.Service
temp, _, err = df.dfClient.GetSvc(masID, desc)
if err != nil {
return
}
for i := range temp {
if temp[i].AgentID != df.agentID {
svc = append(svc, temp[i])
}
}
return
}
// SearchForLocalService search for a service with given description
func (df *DF) SearchForLocalService(desc string, dist float64) (svc []schemas.Service, err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
masID := df.masID
nodeID := df.nodeID
df.mutex.Unlock()
var temp []schemas.Service
temp, _, err = df.dfClient.GetLocalSvc(masID, desc, nodeID, dist)
if err != nil {
return
}
for i := range temp {
if temp[i].AgentID != df.agentID {
svc = append(svc, temp[i])
}
}
return
}
// DeregisterService registers a new service with the DF
func (df *DF) DeregisterService(svcID string) (err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
df.mutex.Unlock()
desc := ""
df.mutex.Lock()
masID := df.masID
for i := range df.registeredServices {
if df.registeredServices[i].GUID == svcID {
desc = i
break
}
}
df.mutex.Unlock()
if desc == "" {
err = errors.New("no such service")
return
}
df.mutex.Lock()
delete(df.registeredServices, desc)
df.mutex.Unlock()
_, err = df.dfClient.DeleteSvc(masID, svcID)
return
}
// newDF creates a new DF object
func newDF(masID int, agentID int, nodeID int, dfCli *client.DFClient, logErr *log.Logger,
logInf *log.Logger) (df *DF) {
df = &DF{
agentID: agentID,
masID: masID,
nodeID: nodeID,
mutex: &sync.Mutex{},
active: false,
logError: logErr,
logInfo: logInf,
}
act := os.Getenv("CLONEMAP_DF")
if act == "ON" {
df.active = true
}
df.registeredServices = make(map[string]schemas.Service)
return
}
// close closes the DF module
func (df *DF) close() {
for d := range df.registeredServices {
svc := df.registeredServices[d]
df.DeregisterService(svc.GUID)
}
df.mutex.Lock()
df.logInfo.Println("Closing DF of agent ", df.agentID)
df.active = false
df.mutex.Unlock()
}
......@@ -47,7 +47,6 @@ package agency
import (
"errors"
"log"
"os"
"strconv"
"sync"
......@@ -56,9 +55,189 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// MQTT provides functions to subscribe and publish via mqtt
type MQTT struct {
client *mqttClient
// mqttCollector is the agency client for mqtt
type mqttCollector struct {
client mqtt.Client // mqtt client
msgIn chan schemas.MQTTMessage // mqtt message inbox
name string // agency name
config schemas.MQTTConfig // indicates if mqtt is active (switch via env)
mutex *sync.Mutex // mutex for message inbox map
subscription map[string][]*AgentMQTT // map for subscription topics to agents' mqtt object
// numDeliverer int // number of go routines for delivery
logError *log.Logger
logInfo *log.Logger
}
// newMQTTCollector creates a new mqtt agency client
func newMQTTCollector(config schemas.MQTTConfig, name string, logErr *log.Logger,
logInf *log.Logger) (col *mqttCollector) {
col = &mqttCollector{
name: name,
mutex: &sync.Mutex{},
logError: logErr,
logInfo: logInf,
config: config,
}
col.msgIn = make(chan schemas.MQTTMessage, 1000)
col.subscription = make(map[string][]*AgentMQTT)
col.logInfo.Println("Created new MQTT client; status: ", col.config.Active)
col.init()
return
}
func (mqttCol *mqttCollector) init() (err error) {
if mqttCol.config.Active {
opts := mqtt.NewClientOptions().AddBroker("tcp://" + mqttCol.config.Host + ":" +
strconv.Itoa(mqttCol.config.Port)).SetClientID(mqttCol.name)
opts.SetDefaultPublishHandler(mqttCol.newIncomingMQTTMessage)
mqttCol.client = mqtt.NewClient(opts)
if token := mqttCol.client.Connect(); token.Wait() && token.Error() != nil {
err = errors.New("MQTTInitError")
return
}
for i := 0; i < 3; i++ {
go mqttCol.deliverMsgs()
}
// cli.numDeliverer = 3
}
return
}
func (mqttCol *mqttCollector) close() (err error) {
if mqttCol.config.Active {
mqttCol.logInfo.Println("Disconnecting MQTT client")
mqttCol.client.Disconnect(250)
}
err = nil
return
}
// newIncomingMQTTMessage adds message to channel for incoming messages
func (mqttCol *mqttCollector) newIncomingMQTTMessage(client mqtt.Client, msg mqtt.Message) {
var mqttMsg schemas.MQTTMessage
mqttMsg.Content = msg.Payload()
mqttMsg.Topic = msg.Topic()
mqttCol.msgIn <- mqttMsg
}
// subscribe subscribes to specified topics
func (mqttCol *mqttCollector) subscribe(mq *AgentMQTT, topic string, qos int) (err error) {
if !mqttCol.config.Active {
return
}
mqttCol.mutex.Lock()
ag, ok := mqttCol.subscription[topic]
mqttCol.mutex.Unlock()
if ok {
subscribed := false
for i := range ag {
if ag[i].agentID == mq.agentID {
subscribed = true
break
}
}
if !subscribed {
ag = append(ag, mq)
mqttCol.mutex.Lock()
mqttCol.subscription[topic] = ag
mqttCol.mutex.Unlock()
}
} else {
mqttCol.mutex.Lock()
token := mqttCol.client.Subscribe(topic, byte(qos), nil)
mqttCol.mutex.Unlock()
if token.Wait() && token.Error() != nil {
err = token.Error()
return
}
ag = make([]*AgentMQTT, 0)
ag = append(ag, mq)
mqttCol.mutex.Lock()
mqttCol.subscription[topic] = ag
mqttCol.mutex.Unlock()
}
// cli.mutex.Lock()
// numDel := len(cli.subscription) / 25
// if numDel > cli.numDeliverer {
// for i := 0; i < numDel-cli.numDeliverer; i++ {
// go cli.deliverMsgs()
// }
// cli.numDeliverer = numDel
// }
// cli.mutex.Unlock()
return
}
// unsubscribe to a topic
func (mqttCol *mqttCollector) unsubscribe(mq *AgentMQTT, topic string) (err error) {
if !mqttCol.config.Active {
return
}
mqttCol.mutex.Lock()
ag, ok := mqttCol.subscription[topic]
mqttCol.mutex.Unlock()
if !ok {
return
}
index := -1
for i := range ag {
if mq.agentID == ag[i].agentID {
index = i
break
}
}
if index == -1 {
return
}
if index == 0 && len(ag) == 1 {
// agent is the only one who has subscribed -> unsubscribe
delete(mqttCol.subscription, topic)
token := mqttCol.client.Unsubscribe(topic)
if token.Wait() && token.Error() != nil {
err = token.Error()
return
}
} else {
// remove agent from list of subscribed agents
ag[index] = ag[len(ag)-1]
ag[len(ag)-1] = nil
ag = ag[:len(ag)-1]
mqttCol.mutex.Lock()
mqttCol.subscription[topic] = ag
mqttCol.mutex.Unlock()
}
return
}
// publish sends a message
func (mqttCol *mqttCollector) publish(msg schemas.MQTTMessage, qos int) (err error) {
if mqttCol.config.Active {
token := mqttCol.client.Publish(msg.Topic, byte(qos), false, msg.Content)
token.Wait()
}
return
}
// deliverMsg delivers incoming messages to agents according to their topic
func (mqttCol *mqttCollector) deliverMsgs() {
var msg schemas.MQTTMessage
for {
msg = <-mqttCol.msgIn
mqttCol.mutex.Lock()
ag, ok := mqttCol.subscription[msg.Topic]
mqttCol.mutex.Unlock()
if ok {
for i := range ag {
ag[i].newIncomingMQTTMessage(msg)
}
}
}
}
// AgentMQTT provides functions to subscribe and publish via mqtt
type AgentMQTT struct {
collector *mqttCollector
mutex *sync.Mutex // mutex for message inbox map
subTopic map[string]interface{} // subscribed topics
msgInTopic map[string]chan schemas.MQTTMessage // message inbox for messages with specified topic
......@@ -70,17 +249,17 @@ type MQTT struct {
active bool
}
// newMQTT returns a new pubsub connector of type mqtt
func newMQTT(agentID int, cli *mqttClient, cmaplog *client.AgentLogger, logErr *log.Logger,
logInf *log.Logger) (mq *MQTT) {
mq = &MQTT{
client: cli,
mutex: &sync.Mutex{},
agentID: agentID,
logger: cmaplog,
logError: logErr,
logInfo: logInf,
active: true,
// newAgentMQTT returns a new pubsub connector of type mqtt
func (mqttCol *mqttCollector) newAgentMQTT(agentID int, cmaplog *client.AgentLogger,
logErr *log.Logger, logInf *log.Logger) (mq *AgentMQTT) {
mq = &AgentMQTT{
collector: mqttCol,
mutex: &sync.Mutex{},
agentID: agentID,
logger: cmaplog,
logError: logErr,
logInfo: logInf,
active: mqttCol.config.Active,
}
mq.subTopic = make(map[string]interface{})
mq.msgInTopic = make(map[string]chan schemas.MQTTMessage)
......@@ -89,7 +268,7 @@ func newMQTT(agentID int, cli *mqttClient, cmaplog *client.AgentLogger, logErr *
}
// close closes the mqtt
func (mq *MQTT) close() {
func (mq *AgentMQTT) close() {
for t := range mq.subTopic {
mq.Unsubscribe(t)
}
......@@ -100,7 +279,7 @@ func (mq *MQTT) close() {
}
// Subscribe subscribes to a topic
func (mq *MQTT) Subscribe(topic string, qos int) (err error) {
func (mq *AgentMQTT) Subscribe(topic string, qos int) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -115,12 +294,12 @@ func (mq *MQTT) Subscribe(topic string, qos int) (err error) {
mq.mutex.Lock()
mq.subTopic[topic] = nil
mq.mutex.Unlock()
err = mq.client.subscribe(mq, topic, qos)
err = mq.collector.subscribe(mq, topic, qos)
return
}
// Unsubscribe unsubscribes a topic
func (mq *MQTT) Unsubscribe(topic string) (err error) {
func (mq *AgentMQTT) Unsubscribe(topic string) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -135,12 +314,12 @@ func (mq *MQTT) Unsubscribe(topic string) (err error) {
mq.mutex.Lock()
delete(mq.subTopic, topic)
mq.mutex.Unlock()
err = mq.client.unsubscribe(mq, topic)
err = mq.collector.unsubscribe(mq, topic)
return
}
// SendMessage sends a message
func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
func (mq *AgentMQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -148,7 +327,7 @@ func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
return
}
mq.mutex.Unlock()
err = mq.client.publish(msg, qos)
err = mq.collector.publish(msg, qos)
if err != nil {
return
}
......@@ -157,7 +336,7 @@ func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
}
// NewMessage returns a new initiaized message
func (mq *MQTT) NewMessage(topic string, content []byte) (msg schemas.MQTTMessage, err error) {
func (mq *AgentMQTT) NewMessage(topic string, content []byte) (msg schemas.MQTTMessage, err error) {
msg.Topic = topic
msg.Content = content
err = nil
......@@ -165,7 +344,7 @@ func (mq *MQTT) NewMessage(topic string, content []byte) (msg schemas.MQTTMessag
}
// RecvMessages retrieves all messages since last call of this function
func (mq *MQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err error) {
func (mq *AgentMQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -187,7 +366,7 @@ func (mq *MQTT) RecvMessages() (num int, msgs []schemas.MQTTMessage, err error)
}
// RecvMessageWait retrieves next message and blocks if no message is available
func (mq *MQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
func (mq *AgentMQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
......@@ -201,7 +380,7 @@ func (mq *MQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
}