Commit ea5433fa authored by Stefan Dähling's avatar Stefan Dähling
Browse files

dont use byte array in schemas

parent 9448583e
Pipeline #252286 passed with stages
in 3 minutes and 11 seconds
...@@ -61,10 +61,10 @@ func main() { ...@@ -61,10 +61,10 @@ func main() {
func task(ag *agency.Agent) (err error) { func task(ag *agency.Agent) (err error) {
id := ag.GetAgentID() id := ag.GetAgentID()
ag.Logger.NewLog("app", "This is agent "+strconv.Itoa(id), nil) ag.Logger.NewLog("app", "This is agent "+strconv.Itoa(id), "")
msg, _ := ag.ACL.NewMessage((id+1)%2, 0, 0, "Message from agent "+strconv.Itoa(id)) msg, _ := ag.ACL.NewMessage((id+1)%2, 0, 0, "Message from agent "+strconv.Itoa(id))
ag.ACL.SendMessage(msg) ag.ACL.SendMessage(msg)
msg, _ = ag.ACL.RecvMessageWait() msg, _ = ag.ACL.RecvMessageWait()
ag.Logger.NewLog("app", msg.Content, nil) ag.Logger.NewLog("app", msg.Content, "")
return return
} }
{"spec":{"id":0,"name":"example","registry":"registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap","image":"agency","secret":"","agentsperagency":1,"logging":true,"mqtt":true,"df":true,"log":{"msg":true,"app":true,"status":true,"debug":true},"uptime":"0001-01-01T00:00:00Z"},"agents":[{"masid":0,"agencyid":0,"nodeid":0,"id":0,"name":"ExampleAgent","type":"example","custom":""},{"masid":0,"agencyid":0,"nodeid":0,"id":0,"name":"ExampleAgent","type":"example","custom":""}],"graph":{"node":null,"edge":null}} {"spec":{"id":0,"name":"example","registry":"registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap","image":"agencypy","secret":"","agentsperagency":1,"logging":true,"mqtt":true,"df":true,"log":{"msg":true,"app":true,"status":true,"debug":true},"uptime":"0001-01-01T00:00:00Z"},"agents":[{"masid":0,"agencyid":0,"nodeid":0,"id":0,"name":"ExampleAgent","type":"example","custom":""},{"masid":0,"agencyid":0,"nodeid":0,"id":0,"name":"ExampleAgent","type":"example","custom":""}],"graph":{"node":null,"edge":null}}
\ No newline at end of file \ No newline at end of file
...@@ -69,7 +69,7 @@ type CustomAgentData struct { ...@@ -69,7 +69,7 @@ type CustomAgentData struct {
// Task switches the right benchmark task // Task switches the right benchmark task
func Task(ag *agency.Agent) (err error) { func Task(ag *agency.Agent) (err error) {
var config CustomAgentData var config CustomAgentData
err = json.Unmarshal(ag.GetCustomData(), &config) err = json.Unmarshal([]byte(ag.GetCustomData()), &config)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
...@@ -97,7 +97,7 @@ func Task(ag *agency.Agent) (err error) { ...@@ -97,7 +97,7 @@ func Task(ag *agency.Agent) (err error) {
func pingPong(ag *agency.Agent, config CustomAgentData) (err error) { func pingPong(ag *agency.Agent, config CustomAgentData) (err error) {
// d := time.Until(config.StartTime) // d := time.Until(config.StartTime)
err = ag.Logger.NewLog("status", "Starting PingPong Behavior; Peer: "+ err = ag.Logger.NewLog("status", "Starting PingPong Behavior; Peer: "+
strconv.Itoa(config.PeerID)+", Start: "+strconv.FormatBool(config.Start), nil) strconv.Itoa(config.PeerID)+", Start: "+strconv.FormatBool(config.Start), "")
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
...@@ -158,7 +158,7 @@ func pingPong(ag *agency.Agent, config CustomAgentData) (err error) { ...@@ -158,7 +158,7 @@ func pingPong(ag *agency.Agent, config CustomAgentData) (err error) {
avg = sum / len(rtts) avg = sum / len(rtts)
js, _ := json.Marshal(rtts) js, _ := json.Marshal(rtts)
err = ag.Logger.NewLog("status", "RTT in µs: min: "+strconv.Itoa(min)+", max: "+ err = ag.Logger.NewLog("status", "RTT in µs: min: "+strconv.Itoa(min)+", max: "+
strconv.Itoa(max)+", avg: "+strconv.Itoa(avg), js) strconv.Itoa(max)+", avg: "+strconv.Itoa(avg), string(js))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
} }
...@@ -198,7 +198,7 @@ func pingPong(ag *agency.Agent, config CustomAgentData) (err error) { ...@@ -198,7 +198,7 @@ func pingPong(ag *agency.Agent, config CustomAgentData) (err error) {
// cpuLoad is the task function for a benchmark with high cpu load // cpuLoad is the task function for a benchmark with high cpu load
func cpuLoad(ag *agency.Agent, config CustomAgentData) (err error) { func cpuLoad(ag *agency.Agent, config CustomAgentData) (err error) {
err = ag.Logger.NewLog("status", "Starting CPU Load Behavior; Peer: "+ err = ag.Logger.NewLog("status", "Starting CPU Load Behavior; Peer: "+
strconv.Itoa(config.PeerID)+", Start: "+strconv.FormatBool(config.Start), nil) strconv.Itoa(config.PeerID)+", Start: "+strconv.FormatBool(config.Start), "")
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
...@@ -248,7 +248,7 @@ func communication(ag *agency.Agent, config CustomAgentData) (err error) { ...@@ -248,7 +248,7 @@ func communication(ag *agency.Agent, config CustomAgentData) (err error) {
func dfBench(ag *agency.Agent, config CustomAgentData) (err error) { func dfBench(ag *agency.Agent, config CustomAgentData) (err error) {
err = ag.Logger.NewLog("status", "Starting DF Behavior; Peer: "+ err = ag.Logger.NewLog("status", "Starting DF Behavior; Peer: "+
strconv.Itoa(config.PeerID)+", Start: "+strconv.FormatBool(config.Start), nil) strconv.Itoa(config.PeerID)+", Start: "+strconv.FormatBool(config.Start), "")
if err != nil { if err != nil {
return return
} }
...@@ -299,29 +299,29 @@ func dfBench(ag *agency.Agent, config CustomAgentData) (err error) { ...@@ -299,29 +299,29 @@ func dfBench(ag *agency.Agent, config CustomAgentData) (err error) {
} }
js, _ := json.Marshal(rtts) js, _ := json.Marshal(rtts)
err = ag.Logger.NewLog("status", "Total time in µs: "+strconv.Itoa(rtts[10]), js) err = ag.Logger.NewLog("status", "Total time in µs: "+strconv.Itoa(rtts[10]), string(js))
return return
} }
func stateTest(ag *agency.Agent, config CustomAgentData) (err error) { func stateTest(ag *agency.Agent, config CustomAgentData) (err error) {
time.Sleep(time.Second * 10) time.Sleep(time.Second * 10)
err = ag.Logger.NewLog("status", "Starting state test Behavior", nil) err = ag.Logger.NewLog("status", "Starting state test Behavior", "")
if err != nil { if err != nil {
return return
} }
var state []byte var state string
state, err = ag.Logger.RestoreState() state, err = ag.Logger.RestoreState()
if err != nil { if err != nil {
return return
} }
if state == nil { if state == "" {
err = ag.Logger.NewLog("status", "No previous state", nil) err = ag.Logger.NewLog("status", "No previous state", "")
if err != nil { if err != nil {
return return
} }
} }
state = []byte("test state") state = "test state"
err = ag.Logger.UpdateState(state) err = ag.Logger.UpdateState(state)
if err != nil { if err != nil {
return return
...@@ -330,6 +330,6 @@ func stateTest(ag *agency.Agent, config CustomAgentData) (err error) { ...@@ -330,6 +330,6 @@ func stateTest(ag *agency.Agent, config CustomAgentData) (err error) {
if err != nil { if err != nil {
return return
} }
err = ag.Logger.NewLog("status", "State: "+string(state), nil) err = ag.Logger.NewLog("status", "State: "+string(state), "")
return return
} }
...@@ -63,7 +63,7 @@ type Agent struct { ...@@ -63,7 +63,7 @@ type Agent struct {
name string // Name of agent name string // Name of agent
aType string // Type of agent aType string // Type of agent
aSubtype string // Subtype of agent aSubtype string // Subtype of agent
custom []byte // custom data custom string // custom data
masID int // ID of MAS agent is belongs to masID int // ID of MAS agent is belongs to
status int // Status of agent status int // Status of agent
ACL *ACL // agent communication ACL *ACL // agent communication
...@@ -126,7 +126,7 @@ func (agent *Agent) GetAgentName() (ret string) { ...@@ -126,7 +126,7 @@ func (agent *Agent) GetAgentName() (ret string) {
} }
// GetCustomData returns custom data // GetCustomData returns custom data
func (agent *Agent) GetCustomData() (ret []byte) { func (agent *Agent) GetCustomData() (ret string) {
ret = agent.custom ret = agent.custom
return return
} }
......
...@@ -66,7 +66,7 @@ type Logger struct { ...@@ -66,7 +66,7 @@ type Logger struct {
} }
// NewLog sends a new logging message to the logging service // NewLog sends a new logging message to the logging service
func (log *Logger) NewLog(logType string, message string, data []byte) (err error) { func (log *Logger) NewLog(logType string, message string, data string) (err error) {
if logType != "error" && logType != "debug" && logType != "status" && logType != "msg" && if logType != "error" && logType != "debug" && logType != "status" && logType != "msg" &&
logType != "app" { logType != "app" {
err = errors.New("UnknownLogType") err = errors.New("UnknownLogType")
...@@ -93,7 +93,7 @@ func (log *Logger) NewLog(logType string, message string, data []byte) (err erro ...@@ -93,7 +93,7 @@ func (log *Logger) NewLog(logType string, message string, data []byte) (err erro
} }
// UpdateState overrides the state stored in database // UpdateState overrides the state stored in database
func (log *Logger) UpdateState(state []byte) (err error) { func (log *Logger) UpdateState(state string) (err error) {
agState := schemas.State{ agState := schemas.State{
MASID: log.client.masID, MASID: log.client.masID,
AgentID: log.agentID, AgentID: log.agentID,
...@@ -104,7 +104,7 @@ func (log *Logger) UpdateState(state []byte) (err error) { ...@@ -104,7 +104,7 @@ func (log *Logger) UpdateState(state []byte) (err error) {
} }
// RestoreState loads state saved in database and return it // RestoreState loads state saved in database and return it
func (log *Logger) RestoreState() (state []byte, err error) { func (log *Logger) RestoreState() (state string, err error) {
var agState schemas.State var agState schemas.State
agState, _, err = client.GetState(log.client.masID, log.agentID) agState, _, err = client.GetState(log.client.masID, log.agentID)
state = agState.State state = agState.State
......
...@@ -95,7 +95,7 @@ func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) { ...@@ -95,7 +95,7 @@ func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
if err != nil { if err != nil {
return return
} }
err = mq.cmapLogger.NewLog("msg", "Sent MQTT message: "+string(msg.Content), msg.Content) err = mq.cmapLogger.NewLog("msg", "Sent MQTT message: "+string(msg.Content), string(msg.Content))
return return
} }
...@@ -131,7 +131,7 @@ func (mq *MQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) { ...@@ -131,7 +131,7 @@ func (mq *MQTT) RecvMessageWait() (msg schemas.MQTTMessage, err error) {
// newIncomingMQTTMessage adds message to channel for incoming messages // newIncomingMQTTMessage adds message to channel for incoming messages
func (mq *MQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) { func (mq *MQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
mq.cmapLogger.NewLog("msg", "Received MQTT message: "+string(msg.Content), msg.Content) mq.cmapLogger.NewLog("msg", "Received MQTT message: "+string(msg.Content), string(msg.Content))
mq.mutex.Lock() mq.mutex.Lock()
inbox, ok := mq.msgInTopic[msg.Topic] inbox, ok := mq.msgInTopic[msg.Topic]
mq.mutex.Unlock() mq.mutex.Unlock()
......
...@@ -92,6 +92,7 @@ func (stub *LocalStub) createAMS() (err error) { ...@@ -92,6 +92,7 @@ func (stub *LocalStub) createAMS() (err error) {
com += " -e CLONEMAP_DEPLOYMENT_TYPE=\"local\"" com += " -e CLONEMAP_DEPLOYMENT_TYPE=\"local\""
com += " -e CLONEMAP_STORAGE_TYPE=\"local\"" com += " -e CLONEMAP_STORAGE_TYPE=\"local\""
com += " -e CLONEMAP_SUFFIX=\".clonemap\"" com += " -e CLONEMAP_SUFFIX=\".clonemap\""
com += " -e CLONEMAP_LOG_LEVEL=\"error\""
com += " ams" com += " ams"
cmd := exec.Command("bash", "-c", com) cmd := exec.Command("bash", "-c", com)
cmdOut, err := cmd.Output() cmdOut, err := cmd.Output()
...@@ -128,6 +129,7 @@ func (stub *LocalStub) createLogger() (err error) { ...@@ -128,6 +129,7 @@ func (stub *LocalStub) createLogger() (err error) {
com += " --hostname=logger" com += " --hostname=logger"
com += " --network=clonemap-net" com += " --network=clonemap-net"
com += " -e CLONEMAP_DEPLOYMENT_TYPE=\"local\"" com += " -e CLONEMAP_DEPLOYMENT_TYPE=\"local\""
com += " -e CLONEMAP_LOG_LEVEL=\"error\""
com += " logger" com += " logger"
cmd := exec.Command("bash", "-c", com) cmd := exec.Command("bash", "-c", com)
cmdOut, err := cmd.Output() cmdOut, err := cmd.Output()
...@@ -163,6 +165,7 @@ func (stub *LocalStub) createDF() (err error) { ...@@ -163,6 +165,7 @@ func (stub *LocalStub) createDF() (err error) {
com += " --hostname=df" com += " --hostname=df"
com += " --network=clonemap-net" com += " --network=clonemap-net"
com += " -e CLONEMAP_DEPLOYMENT_TYPE=\"local\"" com += " -e CLONEMAP_DEPLOYMENT_TYPE=\"local\""
com += " -e CLONEMAP_LOG_LEVEL=\"error\""
com += " df" com += " df"
cmd := exec.Command("bash", "-c", com) cmd := exec.Command("bash", "-c", com)
cmdOut, err := cmd.Output() cmdOut, err := cmd.Output()
...@@ -249,6 +252,7 @@ func (stub *LocalStub) createAgency(image string, agents int, masID int, agencyI ...@@ -249,6 +252,7 @@ func (stub *LocalStub) createAgency(image string, agents int, masID int, agencyI
} else { } else {
com += " -e CLONEMAP_DF=\"OFF\" " com += " -e CLONEMAP_DF=\"OFF\" "
} }
com += " -e CLONEMAP_LOG_LEVEL=\"error\" "
com += image com += image
cmd := exec.Command("bash", "-c", com) cmd := exec.Command("bash", "-c", com)
......
...@@ -46,10 +46,11 @@ package logger ...@@ -46,10 +46,11 @@ package logger
import ( import (
"errors" "errors"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"sort" "sort"
"sync" "sync"
"time" "time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
) )
// storage is the interface for logging and state storage (either local or in db) // storage is the interface for logging and state storage (either local or in db)
...@@ -88,7 +89,7 @@ type storage interface { ...@@ -88,7 +89,7 @@ type storage interface {
type LogMessage struct { type LogMessage struct {
Timestamp time.Time Timestamp time.Time
Message string Message string
Data []byte Data string
} }
// IDs of agents and mas correspond to index in slices! // IDs of agents and mas correspond to index in slices!
...@@ -385,7 +386,7 @@ func (stor *localStorage) deleteAgentState(masID int, agentID int) (err error) { ...@@ -385,7 +386,7 @@ func (stor *localStorage) deleteAgentState(masID int, agentID int) (err error) {
stor.mutex.Lock() stor.mutex.Lock()
if masID < len(stor.mas) { if masID < len(stor.mas) {
if agentID < len(stor.mas[masID].agents) { if agentID < len(stor.mas[masID].agents) {
stor.mas[masID].agents[agentID].state.State = nil stor.mas[masID].agents[agentID].state.State = ""
} }
} }
stor.mutex.Unlock() stor.mutex.Unlock()
......
...@@ -81,7 +81,7 @@ type AgentSpec struct { ...@@ -81,7 +81,7 @@ type AgentSpec struct {
Name string `json:"name,omitempty"` // name/description of agent Name string `json:"name,omitempty"` // name/description of agent
AType string `json:"type,omitempty"` // type of agent (application dependent) AType string `json:"type,omitempty"` // type of agent (application dependent)
ASubtype string `json:"subtype,omitempty"` // subtype of agent (application dependent) ASubtype string `json:"subtype,omitempty"` // subtype of agent (application dependent)
Custom []byte `json:"custom,omitempty"` // custom configuration data Custom string `json:"custom,omitempty"` // custom configuration data
} }
// Address holds the address information of an agent // Address holds the address information of an agent
......
...@@ -63,7 +63,7 @@ type LogMessage struct { ...@@ -63,7 +63,7 @@ type LogMessage struct {
Timestamp time.Time `json:"timestamp"` // time of message Timestamp time.Time `json:"timestamp"` // time of message
LogType string `json:"logtype"` // log type (error, debug, msg, status, app) LogType string `json:"logtype"` // log type (error, debug, msg, status, app)
Message string `json:"msg"` // log message Message string `json:"msg"` // log message
AdditionalData []byte `json:"data,omitempty"` // additional information e.g in json AdditionalData string `json:"data,omitempty"` // additional information e.g in json
} }
// State contains the state of an agent as byte array (json) // State contains the state of an agent as byte array (json)
...@@ -71,7 +71,7 @@ type State struct { ...@@ -71,7 +71,7 @@ type State struct {
MASID int `json:"masid"` // ID of MAS agent runs in MASID int `json:"masid"` // ID of MAS agent runs in
AgentID int `json:"agentid"` // ID of agent AgentID int `json:"agentid"` // ID of agent
Timestamp time.Time `json:"timestamp"` // time of state Timestamp time.Time `json:"timestamp"` // time of state
State []byte `json:"state"` // State State string `json:"state"` // State
} }
// Communication contains information regarding communication with another agent // Communication contains information regarding communication with another agent
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment