Commit 759c6fe9 authored by Stefan Dähling's avatar Stefan Dähling
Browse files

add custom config update behavior

parent c03af5c1
Pipeline #434779 passed with stages
in 9 minutes and 7 seconds
......@@ -61,8 +61,15 @@ func main() {
}
func task(ag *agency.Agent) (err error) {
beh, err := ag.NewCustomUpdateBehavior(customUpdateHandler)
beh.Start()
time.Sleep(2 * time.Second)
id := ag.GetAgentID()
ag.Logger.NewLog("app", "This is agent "+strconv.Itoa(id), "")
return
}
func customUpdateHandler(custom string) (err error) {
fmt.Println(custom)
return
}
......@@ -48,6 +48,7 @@ THE SOFTWARE.
package agency
import (
"errors"
"log"
"sync"
......@@ -58,22 +59,23 @@ import (
// Agent holds information about an agent and implements functionality for agent execution
type Agent struct {
mutex *sync.Mutex
id int // unique id of agent
nodeID int
name string // Name of agent
aType string // Type of agent
aSubtype string // Subtype of agent
custom string // custom data
masID int // ID of MAS agent is belongs to
status int // Status of agent
ACL *ACL // agent communication
Logger *Logger // logger object
MQTT *MQTT // mqtt object
DF *DF
logError *log.Logger
logInfo *log.Logger
active bool
mutex *sync.Mutex
id int // unique id of agent
nodeID int
name string // Name of agent
aType string // Type of agent
aSubtype string // Subtype of agent
custom string // custom data
customChan chan string // channel for custom update behavior
masID int // ID of MAS agent is belongs to
status int // Status of agent
ACL *ACL // agent communication
Logger *Logger // logger object
MQTT *MQTT // mqtt object
DF *DF
logError *log.Logger
logInfo *log.Logger
active bool
}
// newAgent creates a new agent
......@@ -81,18 +83,19 @@ func newAgent(info schemas.AgentInfo, msgIn chan schemas.ACLMessage,
aclLookup func(int) (*ACL, error), log *logHandler, logConfig schemas.LoggerConfig,
mqtt *mqttClient, dfClient *dfclient.Client, logErr *log.Logger, logInf *log.Logger) (ag *Agent) {
ag = &Agent{
id: info.ID,
nodeID: info.Spec.NodeID,
name: info.Spec.Name,
aType: info.Spec.AType,
aSubtype: info.Spec.ASubtype,
masID: info.MASID,
custom: info.Spec.Custom,
mutex: &sync.Mutex{},
ACL: newACL(info.ID, msgIn, aclLookup, logErr, logInf),
logError: logErr,
logInfo: logInf,
active: true,
id: info.ID,
nodeID: info.Spec.NodeID,
name: info.Spec.Name,
aType: info.Spec.AType,
aSubtype: info.Spec.ASubtype,
masID: info.MASID,
custom: info.Spec.Custom,
customChan: nil,
mutex: &sync.Mutex{},
ACL: newACL(info.ID, msgIn, aclLookup, logErr, logInf),
logError: logErr,
logInfo: logInf,
active: true,
}
// in, out := ag.ACL.getCommDataChannels()
ag.Logger = newLogger(ag.id, log, logConfig, ag.logError, ag.logInfo)
......@@ -142,15 +145,45 @@ func (agent *Agent) GetCustomData() (ret string) {
return
}
// registerCustomUpdateChannel sets the channel for a custom config update behavior if not already
// set
func (agent *Agent) registerCustomUpdateChannel(custChan chan string) (err error) {
agent.mutex.Lock()
if !agent.active {
agent.mutex.Unlock()
return errors.New("agent not active")
}
if agent.customChan != nil {
agent.mutex.Unlock()
return errors.New("custom config update is already handled")
}
agent.customChan = custChan
agent.mutex.Unlock()
return
}
// updateCustomData updates custom data
func (agent *Agent) updateCustomData(custom string) {
agent.mutex.Lock()
agent.custom = custom
agent.mutex.Unlock()
if agent.customChan != nil {
agent.mutex.Unlock()
agent.customChan <- custom
} else {
agent.mutex.Unlock()
}
agent.logInfo.Println("Updated config of agent ", agent.GetAgentID())
return
}
// deregisterCustomUpdateChannel deletes the channel for a custom config update behavior
func (agent *Agent) deregisterCustomUpdateChannel() (err error) {
agent.mutex.Lock()
agent.customChan = nil
agent.mutex.Unlock()
return
}
// Terminate terminates the agent
func (agent *Agent) Terminate() {
agent.logInfo.Println("Terminating agent ", agent.GetAgentID())
......
......@@ -268,3 +268,70 @@ func (periodBehavior *periodicBehavior) Stop() {
// stop message handling
periodBehavior.ctrl <- -1
}
// customUpdateBehavior describes an action that should be performed when the custom configuration
// is updated
type customUpdateBehavior struct {
ag *Agent // agent
handle func(custom string) error // handler function
ctrl chan int // control signals
customIn chan string // custom config inbox
logInfo *log.Logger
}
// NewCustomUpdateBehavior creates a new handler for custom config update actions
func (agent *Agent) NewCustomUpdateBehavior(
handle func(custom string) error) (behavior Behavior, err error) {
if handle == nil {
err = errors.New("illegal handler")
return
}
custUpBehavior := &customUpdateBehavior{
ag: agent,
handle: handle,
ctrl: make(chan int, 10),
customIn: make(chan string, 10),
logInfo: agent.logInfo,
}
behavior = custUpBehavior
return
}
// Start initiates the handling of messages
func (custUpBehavior *customUpdateBehavior) Start() {
custUpBehavior.ag.registerCustomUpdateChannel(custUpBehavior.customIn)
// execute
go custUpBehavior.task()
}
// task performs the execution of the handle function
func (custUpBehavior *customUpdateBehavior) task() {
custUpBehavior.logInfo.Println("Starting custom configuration update behavior for agent ",
custUpBehavior.ag.GetAgentID())
for {
custUpBehavior.ag.mutex.Lock()
act := custUpBehavior.ag.active
custUpBehavior.ag.mutex.Unlock()
if !act {
custUpBehavior.Stop()
}
select {
case custom := <-custUpBehavior.customIn:
custUpBehavior.handle(custom)
case command := <-custUpBehavior.ctrl:
switch command {
case -1:
custUpBehavior.logInfo.Println("Terminating custom configuration update ",
"behavior for agent ", custUpBehavior.ag.GetAgentID())
return
}
}
}
}
// Stop terminates the behavior
func (custUpBehavior *customUpdateBehavior) Stop() {
custUpBehavior.ag.deregisterCustomUpdateChannel()
// stop behavior
custUpBehavior.ctrl <- -1
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment