Commit b292dd0d authored by Qianwen's avatar Qianwen
Browse files

add statistics

parent 087dc483
Pipeline #486446 passed with stages
in 3 minutes and 21 seconds
......@@ -59,15 +59,14 @@ RUN cd cmd/frontend; CGO_ENABLED=0 GOOS=linux go build -ldflags '-s' -o frontend
FROM node:14.15.1 AS ANGULAR_BUILD
RUN npm install -g @angular/cli@11.0.7
COPY web web
WORKDIR web
WORKDIR /web
RUN npm install && ng build --prod
FROM alpine:latest
WORKDIR /root/
#RUN apk add --update netbase ca-certificates
COPY --from=ANGULAR_BUILD /web/dist/clonemap-frontend/* ./web/
COPY --from=ANGULAR_BUILD /web/dist/clonemap-frontend ./web
COPY --from=fe_builder /clonemap/frontend .
EXPOSE 30013
CMD ["./frontend"]
......@@ -56,22 +56,86 @@ import (
)
func main() {
err := agency.StartAgency(task)
err := agency.StartAgency(task_test)
if err != nil {
fmt.Println(err)
}
}
func display(msg schemas.MQTTMessage) (err error) {
time.Sleep(10 * time.Second)
fmt.Println(string(msg.Content))
return
}
func handleDefault(msg schemas.ACLMessage) (err error) {
fmt.Println(string(msg.Content))
return
}
func task_test(ag *agency.Agent) (err error) {
id := ag.GetAgentID()
// agent 0 subsribes the topic1
if id == 0 {
ag.MQTT.Subscribe("topic1", 1)
behMQTT, err := ag.NewMQTTTopicBehavior("topic1", display)
if err == nil {
behMQTT.Start()
}
}
// agent 1 publishes the topic1
if id == 1 {
for i := 0; i < 10; i++ {
time.Sleep(5 * time.Second)
msg := "test message" + strconv.Itoa(i)
MQTTMsg, err := ag.MQTT.NewMessage("topic1", []byte(msg))
if err == nil {
ag.MQTT.SendMessage(MQTTMsg, 1)
}
}
}
// new protocol behavior
/* handlePerformative := make(map[int]func(schemas.ACLMessage) error)
handlePerformative[0] = handleDefault
behPro, err := ag.NewMessageBehavior(0, handlePerformative, handleDefault)
if err != nil {
fmt.Println("protocol started with error")
ag.Logger.NewLog("beh", "protocol error", "")
} else {
behPro.Start()
time.Sleep(30 * time.Second)
} */
return
}
func task(ag *agency.Agent) (err error) {
time.Sleep(2 * time.Second)
time.Sleep(10 * time.Second)
id := ag.GetAgentID()
recv := (id + 1) % 2
msg, _ := ag.ACL.NewMessage(recv, 0, 0, "test message")
ag.ACL.SendMessage(msg)
ag.Logger.NewLog("app", "This is agent "+strconv.Itoa(id), "")
time.Sleep(2 * time.Second)
ag.Logger.NewLog("beh", "This is the behavior of the agent"+strconv.Itoa(id), "")
ag.Logger.NewLog("debug", "This is the debug of the agent"+strconv.Itoa(id), "")
// sends 40 messages randomly to other agents
for i := 0; i < 40; i++ {
interval := rand.Intn(5)
time.Sleep(time.Duration(interval) * time.Second)
recv := rand.Intn(20)
if recv == id {
continue
}
msg, _ := ag.ACL.NewMessage(recv, 0, 0, "test message")
ag.ACL.SendMessage(msg)
}
// app logs
cnt := rand.Intn(10)
for i := 0; i < cnt; i++ {
ag.Logger.NewLog("app", "This is agent "+strconv.Itoa(id), "")
time.Sleep(2 * time.Second)
}
// service
svc := schemas.Service{
Desc: "agent" + strconv.Itoa(id),
}
......@@ -81,10 +145,10 @@ func task(ag *agency.Agent) (err error) {
}
for i := 0; i < 5; i++ {
time.Sleep(2 * time.Second)
/* idx := rand.Intn(4) + 1 */
for idx := 1; idx < 5; idx++ {
ag.Logger.NewLogSeries("type"+strconv.Itoa(idx), rand.Float64())
}
}
return
}
......@@ -348,6 +348,7 @@ spec:
containers:
- name: ams-container
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/ams
imagePullPolicy: Always
env:
- name: CLONEMAP_DEPLOYMENT_TYPE
value: "production"
......@@ -392,6 +393,8 @@ data:
CREATE TABLE clonemap.logging_debug ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_beh ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_series ( masid int, agentid int, name varchar, t timestamp, series varchar, PRIMARY KEY ((masid, agentid), name, t)) WITH CLUSTERING ORDER BY (name ASC, t ASC);
CREATE TABLE clonemap.beh_statis ( masid int, agentid int, behType varchar, start timestamp, end timestamp, duration int, PRIMARY KEY ((masid, agentid, behType), start)) WITH CLUSTERING ORDER BY (start ASC);
CREATE TABLE clonemap.heatmap(masid int, sender int, receiver int, count counter, PRIMARY KEY(masid, sender, receiver));
CREATE TABLE clonemap.state ( masid int, agentid int, state varchar, PRIMARY KEY (masid, agentid));
EOF
......@@ -564,6 +567,7 @@ spec:
containers:
- name: logger-container
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/logger
imagePullPolicy: Always
env:
- name: CLONEMAP_DEPLOYMENT_TYPE
value: "production"
......@@ -698,6 +702,7 @@ spec:
containers:
- name: fe-container
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/frontend
imagePullPolicy: Always
env:
- name: CLONEMAP_DEPLOYMENT_TYPE
value: "production"
......
......@@ -40,7 +40,7 @@
"name":"test3"
},
{
"nodeid":1,
"nodeid":0,
"name":"test4"
},
{
......@@ -56,11 +56,11 @@
"name":"test7"
},
{
"nodeid":2,
"nodeid":1,
"name":"test8"
},
{
"nodeid":2,
"nodeid":1,
"name":"test9"
},
{
......@@ -72,15 +72,15 @@
"name":"test11"
},
{
"nodeid":3,
"nodeid":2,
"name":"test12"
},
{
"nodeid":3,
"nodeid":2,
"name":"test13"
},
{
"nodeid":3,
"nodeid":2,
"name":"test14"
},
{
......@@ -88,19 +88,19 @@
"name":"test15"
},
{
"nodeid":4,
"nodeid":3,
"name":"test16"
},
{
"nodeid":4,
"nodeid":3,
"name":"test17"
},
{
"nodeid":4,
"nodeid":3,
"name":"test18"
},
{
"nodeid":4,
"nodeid":3,
"name":"test19"
}
]
......@@ -109,24 +109,16 @@
"graph":{
"node":[
{
"id": 0,
"agents": [0, 1, 2, 3]
"id": 0
},
{
"id": 1,
"agents": [4, 5, 6, 7]
"id": 1
},
{
"id": 2,
"agents": [8, 9, 10, 11]
"id": 2
},
{
"id": 3,
"agents": [12, 13, 14, 15]
},
{
"id": 4,
"agents": [16, 17, 18, 19]
"id": 3
}
],
"edge":[
......@@ -147,14 +139,9 @@
},
{
"n1": 3,
"n2": 4,
"weight": 2
},
{
"n1": 4,
"n2": 0,
"weight": 2
}
]
}
}
\ No newline at end of file
}
\ No newline at end of file
{
"config":{
"name":"test",
"agentsperagency":2,
"mqtt":{
"active":true
},
"df":{
"active":true
},
"logger":{
"active":true,
"msg":true,
"app":true,
"status":true,
"debug":true,
"beh": true
}
},
"imagegroups":[
{
"config":{
"image": "registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/agency_test"
},
"agents":[
{
"nodeid":0,
"name":"test0"
},
{
"nodeid":1,
"name":"test1"
}
]
}
],
"graph":{
"node":null,
"edge":null
}
}
\ No newline at end of file
import json
import numpy as np
import datetime
curr = datetime.datetime.now()
topics = ["error", "debug", "msg", "status", "app"]
data = []
for x in range(0, 10):
for topic in topics:
# select the agentid randomly from 0 to 9
agentid = np.random.randint(0, 5)
interval = np.random.randint(1, 60 * 60 * 24 * 30)
timestamp = curr - datetime.timedelta(seconds=interval)
#topic = np.random.choice(topics)
logMessage = {
"masid": 0,
"agentid": agentid,
"timestamp": timestamp,
"topic": topic,
"msg": "msg",
"data": ""
}
if topic == "msg":
logMessage["msg"] = "ACL send"
agentids = [0, 1, 2, 3, 4]
agentids.remove(agentid)
receiver = np.random.choice(agentids)
logMessage["data"] = "Sender: " + str(agentid) + ";Receiver: " + str(receiver) + ";Timestamp: " + logMessage["timestamp"].strftime("%Y-%m%dT%H:%M:%SZ")
logMessageReceiver = {
"masid": 0,
"agentid": int(receiver),
"timestamp": timestamp,
"topic": topic,
"msg": "ACL receive",
"data": "Sender: " + str(agentid) + ";Receiver: " + str(receiver) + ";Timestamp: " +logMessage["timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ")
}
data.append(logMessage)
data.append(logMessageReceiver)
else:
data.append(logMessage)
data = sorted(data, key=lambda k: k["timestamp"], reverse=True)
for log in data:
log["timestamp"] = log["timestamp"].strftime('%Y-%m-%dT%H:%M:%SZ')
with open("logs.json", "w") as write_file:
json.dump(data, write_file)
\ No newline at end of file
......@@ -92,6 +92,8 @@ func (agent *Agent) NewMessageBehavior(protocol int,
// Start initiates the handling of messages
func (protBehavior *aclProtocolBehavior) Start() {
// log the start
protBehavior.ag.Logger.NewLog("beh", "protocol start", "")
// register protocol handler
protBehavior.ag.ACL.registerProtocolChannel(protBehavior.protocol, protBehavior.msgIn)
// execute
......@@ -112,9 +114,15 @@ func (protBehavior *aclProtocolBehavior) task() {
select {
case msg := <-protBehavior.msgIn:
if handle, ok := protBehavior.handlePerformative[msg.Performative]; ok {
start := time.Now()
handle(msg)
end := time.Now()
protBehavior.ag.Logger.NewBehStats(start, end, "protocol")
} else {
start := time.Now()
protBehavior.handleDefault(msg)
end := time.Now()
protBehavior.ag.Logger.NewBehStats(start, end, "protocol")
}
case command := <-protBehavior.ctrl:
switch command {
......@@ -129,6 +137,8 @@ func (protBehavior *aclProtocolBehavior) task() {
// Stop terminates the message handling
func (protBehavior *aclProtocolBehavior) Stop() {
// log the stop
protBehavior.ag.Logger.NewLog("beh", "protocol stop", "")
// deregister handler
protBehavior.ag.ACL.deregisterProtocolChannel(protBehavior.protocol)
// stop message handling
......@@ -166,6 +176,8 @@ func (agent *Agent) NewMQTTTopicBehavior(topic string,
// Start initiates the handling of messages
func (mqttBehavior *mqttTopicBehavior) Start() {
// log the start
mqttBehavior.ag.Logger.NewLog("beh", "mqtt behavior starts; Topic:"+mqttBehavior.topic, "")
// register protocol handle
mqttBehavior.ag.MQTT.registerTopicChannel(mqttBehavior.topic, mqttBehavior.msgIn)
// execute
......@@ -174,7 +186,7 @@ func (mqttBehavior *mqttTopicBehavior) Start() {
// task performs the execution of the handle function
func (mqttBehavior *mqttTopicBehavior) task() {
mqttBehavior.logInfo.Println("Starting mqtt behavior for agent ", mqttBehavior.ag.GetAgentID())
mqttBehavior.ag.Logger.NewLog("app", "executing mqtt task......", "")
for {
mqttBehavior.ag.mutex.Lock()
act := mqttBehavior.ag.active
......@@ -184,7 +196,12 @@ func (mqttBehavior *mqttTopicBehavior) task() {
}
select {
case msg := <-mqttBehavior.msgIn:
start := time.Now()
mqttBehavior.handle(msg)
end := time.Now()
mqttBehavior.ag.Logger.NewBehStats(start, end, "mqtt")
mqttBehavior.ag.Logger.NewLog("beh", msg.String()+";start: "+start.String()+
";end: "+end.String()+";duration:"+end.Sub(start).String(), "")
case command := <-mqttBehavior.ctrl:
switch command {
case -1:
......@@ -198,6 +215,8 @@ func (mqttBehavior *mqttTopicBehavior) task() {
// Stop terminates the message handling
func (mqttBehavior *mqttTopicBehavior) Stop() {
// log the stop
mqttBehavior.ag.Logger.NewLog("beh", "mqtt stop", "")
// deregister handler
mqttBehavior.ag.MQTT.deregisterTopicChannel(mqttBehavior.topic)
// stop message handling
......@@ -233,6 +252,8 @@ func (agent *Agent) NewPeriodicBehavior(period time.Duration,
// Start initiates the handling of messages
func (periodBehavior *periodicBehavior) Start() {
// log the start
periodBehavior.ag.Logger.NewLog("beh", "period-start", "")
// execute
go periodBehavior.task()
}
......@@ -258,13 +279,19 @@ func (periodBehavior *periodicBehavior) task() {
return
}
default:
start := time.Now()
periodBehavior.handle()
end := time.Now()
periodBehavior.ag.Logger.NewBehStats(start, end, "protocol")
}
}
}
// Stop terminates the message handling
func (periodBehavior *periodicBehavior) Stop() {
// log the stop
periodBehavior.ag.Logger.NewLog("beh", "period-stop", "")
// stop message handling
periodBehavior.ctrl <- -1
}
......@@ -299,6 +326,8 @@ func (agent *Agent) NewCustomUpdateBehavior(
// Start initiates the handling of messages
func (custUpBehavior *customUpdateBehavior) Start() {
// log the start
custUpBehavior.ag.Logger.NewLog("beh", "custom", "start")
custUpBehavior.ag.registerCustomUpdateChannel(custUpBehavior.customIn)
// execute
go custUpBehavior.task()
......@@ -317,7 +346,10 @@ func (custUpBehavior *customUpdateBehavior) task() {
}
select {
case custom := <-custUpBehavior.customIn:
start := time.Now()
custUpBehavior.handle(custom)
end := time.Now()
custUpBehavior.ag.Logger.NewBehStats(start, end, "custom")
case command := <-custUpBehavior.ctrl:
switch command {
case -1:
......@@ -331,6 +363,8 @@ func (custUpBehavior *customUpdateBehavior) task() {
// Stop terminates the behavior
func (custUpBehavior *customUpdateBehavior) Stop() {
// log the stop
custUpBehavior.ag.Logger.NewLog("beh", "custom-stop", "")
custUpBehavior.ag.deregisterCustomUpdateChannel()
// stop behavior
custUpBehavior.ctrl <- -1
......
......@@ -368,10 +368,10 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
masInfo.Agents.Inst[agentID].ImageGroupID = i
masInfo.Agents.Inst[agentID].Address.Agency = "-im-" + strconv.Itoa(i) + "-agency-" +
strconv.Itoa(j/masSpec.Config.NumAgentsPerAgency)
for j := range masInfo.Graph.Node {
if masInfo.Graph.Node[j].ID == masInfo.Agents.Inst[i].Spec.NodeID {
masInfo.Graph.Node[j].Agent = append(masInfo.Graph.Node[j].Agent,
masInfo.Agents.Inst[i].ID)
for k := range masInfo.Graph.Node {
if masInfo.Graph.Node[k].ID == masInfo.Agents.Inst[j].Spec.NodeID {
masInfo.Graph.Node[k].Agent = append(masInfo.Graph.Node[k].Agent,
masInfo.Agents.Inst[j].ID)
break
}
}
......
......@@ -125,6 +125,21 @@ func (cli *LoggerClient) PostLogSeries(masID int, logs []schemas.LogSeries) (htt
return
}
// GetLogSeriesNames gets log series by its name
func (cli *LoggerClient) GetLogSeriesNames(masID int, agentID int) (names []string, httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/series/"+
strconv.Itoa(masID)+"/"+strconv.Itoa(agentID)+"/names", time.Second*2, 4)
if err != nil {
return
}
err = json.Unmarshal(body, &names)
if err != nil {
names = []string{}
}
return
}
// GetLogSeriesByName gets log series by its name
func (cli *LoggerClient) GetLogSeriesByName(masID int, agentID int, name string, start string, end string) (series []schemas.LogSeries, httpStatus int, err error) {
var body []byte
......@@ -140,18 +155,38 @@ func (cli *LoggerClient) GetLogSeriesByName(masID int, agentID int, name string,
return
}
// GetLogSeriesNames gets log series by its name
func (cli *LoggerClient) GetLogSeriesNames(masID int, agentID int) (names []string, httpStatus int, err error) {
// GetMsgHeatMap gets msg communication frequency
func (cli *LoggerClient) GetMsgHeatmap(masID int) (heatmap []string, httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/series/"+
strconv.Itoa(masID)+"/"+strconv.Itoa(agentID)+"/names", time.Second*2, 4)
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/stats/"+
strconv.Itoa(masID)+"/heatmap", time.Second*2, 4)
if err != nil {
return
}
err = json.Unmarshal(body, &names)
err = json.Unmarshal(body, &heatmap)
if err != nil {
names = []string{}
heatmap = []string{}
}
return
}
// PostBehStats posts new behavior logs to logger
func (cli *LoggerClient) PostBehStats(masID int, logs []schemas.BehStats) (httpStatus int, err error) {
js, _ := json.Marshal(logs)
_, httpStatus, err = httpretry.Post(cli.httpClient, cli.prefix()+"/api/stats/"+
strconv.Itoa(masID), "application/json", js, time.Second*2, 4)
return
}
// GetMsgHeatMap gets msg communication frequency
func (cli *LoggerClient) GetStats(masID int, agentID int, method string, behtype string, start string, end string) (data float32, httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/stats/"+
strconv.Itoa(masID)+"/"+strconv.Itoa(agentID)+"/"+method+"/"+behtype+"/"+start+"/"+end, time.Second*2, 4)
if err != nil {
return
}
err = json.Unmarshal(body, &data)
return
}
......@@ -213,6 +248,7 @@ type LogCollector struct {
logIn chan schemas.LogMessage // logging inbox
seriesName string
logSeriesIn chan schemas.LogSeries
behStatsIn chan schemas.BehStats
stateIn chan schemas.State
client *LoggerClient
config schemas.LoggerConfig
......@@ -258,6 +294,44 @@ func (logCol *LogCollector) storeLogSeries() (err error) {
}
}
// storeLogBeh periodically requests the logging service to store behavior logs
func (logCol *LogCollector) storeBehStats() (err error) {
if logCol.config.Active {
for {
if len(logCol.behStatsIn) > 0 {
numLogs := len(logCol.behStatsIn)
behStats := make([]schemas.BehStats, numLogs)
for i := 0; i < numLogs; i++ {
item := <-logCol.behStatsIn
behStats[i] = item
behStats[i].MASID = logCol.masID
}
_, err = logCol.client.PostBehStats(logCol.masID, behStats)
if err != nil {
logCol.logError.Println(err)
for i := range behStats {
logCol.behStatsIn <- behStats[i]
}
continue
}
}
tempTime := time.Now()
for {
time.Sleep