Commit f9202041 authored by Qianwen Yuan's avatar Qianwen Yuan
Browse files

fix

parent 646ff488
Pipeline #491841 passed with stages
in 3 minutes and 21 seconds
......@@ -73,7 +73,8 @@ func handleDefault(msg schemas.ACLMessage) (err error) {
return
}
func task_test(ag *agency.Agent) (err error) {
func task(ag *agency.Agent) (err error) {
time.Sleep(10 * time.Second)
id := ag.GetAgentID()
// agent 0 subsribes the topic1
......@@ -98,33 +99,8 @@ func task_test(ag *agency.Agent) (err error) {
}
}
// new protocol behavior
/* handlePerformative := make(map[int]func(schemas.ACLMessage) error)
handlePerformative[0] = handleDefault
behPro, err := ag.NewMessageBehavior(0, handlePerformative, handleDefault)
if err != nil {
fmt.Println("protocol started with error")
ag.Logger.NewLog("beh", "protocol error", "")
} else {
behPro.Start()
time.Sleep(30 * time.Second)
} */
return
}
func task(ag *agency.Agent) (err error) {
time.Sleep(10 * time.Second)
id := ag.GetAgentID()
// app logs
cnt := rand.Intn(10)
for i := 0; i < cnt; i++ {
ag.Logger.NewLog("app", "This is agent "+strconv.Itoa(id), "")
time.Sleep(2 * time.Second)
}
// sends 40 messages randomly to other agents
for i := 0; i < 40; i++ {
// sends 20 messages randomly to other agents
for i := 0; i < 20; i++ {
interval := rand.Intn(5)
time.Sleep(time.Duration(interval) * time.Second)
recv := rand.Intn(20)
......
......@@ -393,8 +393,7 @@ data:
CREATE TABLE clonemap.logging_debug ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_beh ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_series ( masid int, agentid int, name varchar, t timestamp, series varchar, PRIMARY KEY ((masid, agentid), name, t)) WITH CLUSTERING ORDER BY (name ASC, t ASC);
CREATE TABLE clonemap.beh_statis ( masid int, agentid int, behType varchar, start timestamp, end timestamp, duration int, PRIMARY KEY ((masid, agentid, behType), start)) WITH CLUSTERING ORDER BY (start ASC);
CREATE TABLE clonemap.heatmap(masid int, sender int, receiver int, count counter, PRIMARY KEY(masid, sender, receiver));
CREATE TABLE clonemap.beh_stats ( masid int, agentid int, behType varchar, start timestamp, end timestamp, duration int, PRIMARY KEY ((masid, agentid, behType), start)) WITH CLUSTERING ORDER BY (start ASC);
CREATE TABLE clonemap.state ( masid int, agentid int, state varchar, PRIMARY KEY (masid, agentid));
EOF
......
......@@ -156,10 +156,10 @@ func (cli *LoggerClient) GetLogSeriesByName(masID int, agentID int, name string,
}
// GetMsgHeatMap gets msg communication frequency
func (cli *LoggerClient) GetMsgHeatmap(masID int) (heatmap []string, httpStatus int, err error) {
func (cli *LoggerClient) GetMsgHeatmap(masID int, start string, end string) (heatmap []string, httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/stats/"+
strconv.Itoa(masID)+"/heatmap", time.Second*2, 4)
strconv.Itoa(masID)+"/heatmap"+"/"+start+"/"+end, time.Second*2, 4)
if err != nil {
return
}
......
......@@ -204,7 +204,7 @@ func (fe *Frontend) server(port int) (serv *http.Server) {
// api for logger
s.Path("/logging/series/{masid}/{agentid}/names").Methods("GET").HandlerFunc(fe.handleGetLogSeriesNames)
s.Path("/logging/series/{masid}/{agentid}/{name}/time/{start}/{end}").Methods("GET").HandlerFunc(fe.handleGetLogSeriesByName)
s.Path("/logging/stats/{masid}/heatmap").Methods("GET").HandlerFunc(fe.handleGetMsgHeatmap)
s.Path("/logging/stats/{masid}/heatmap/{start}/{end}").Methods("GET").HandlerFunc(fe.handleGetMsgHeatmap)
s.Path("/logging/stats/{masid}/{agentid}/{behtype}/{start}/{end}").Methods("GET").HandlerFunc(fe.handleGetStats)
s.Path("/logging/{masid}/{agentid}/{topic}/latest/{num}").Methods("GET").HandlerFunc(fe.handleGetNLatestLogs)
......
......@@ -223,8 +223,10 @@ func (fe *Frontend) handleGetMsgHeatmap(w http.ResponseWriter, r *http.Request)
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
start := vars["start"]
end := vars["end"]
var heatmap []string
heatmap, _, cmapErr = fe.logClient.GetMsgHeatmap(masID)
heatmap, _, cmapErr = fe.logClient.GetMsgHeatmap(masID, start, end)
if cmapErr != nil {
httpErr = httpreply.CMAPError(w, cmapErr.Error())
fe.logErrors(r.URL.Path, cmapErr, httpErr)
......
......@@ -249,30 +249,38 @@ func (stor *cassStorage) getAgentLogSeriesNames(masID int, agentID int) (names [
return
}
// deleteAgentLogMessages deletes all log messages og an agent
// deleteAgentLogMessages deletes all log messages of an agent
func (stor *cassStorage) deleteAgentLogMessages(masID int, agentID int) (err error) {
return
}
// getMsgHeatmap get the msg communication frequency
func (stor *cassStorage) getMsgHeatmap(masID int) (heatmap map[[2]int]int, err error) {
scanner := stor.session.Query("SELECT sender, receiver, count FROM heatmap WHERE masid = ? ", masID).Iter().Scanner()
func (stor *cassStorage) getMsgHeatmap(masID int, start time.Time, end time.Time) (heatmap map[[2]int]int, err error) {
heatmap = make(map[[2]int]int)
for scanner.Next() {
var (
sender int
receiver int
count int
)
err = scanner.Scan(&sender, &receiver, &count)
var iter *gocql.Iter
iter = stor.session.Query("SELECT log FROM logging_msg WHERE masid = ? AND "+
"t > ? AND t < ?", masID, start, end).Iter()
var js []byte
for iter.Scan(&js) {
var logmsg schemas.LogMessage
err = json.Unmarshal(js, &logmsg)
if err != nil {
return
}
idx := [2]int{sender, receiver}
heatmap[idx] = count
if logmsg.Message == "ACL send" {
sender := logmsg.AgentID
recvnfo := strings.Split(logmsg.AdditionalData, ";")[1]
receiver, err := strconv.Atoi(strings.Split(recvnfo, ": ")[1])
if err == nil {
idx := [2]int{sender, receiver}
heatmap[idx] += 1
}
}
}
iter.Close()
return
}
// getStats get the data of a certain behtype
......@@ -353,7 +361,7 @@ func (stor *cassStorage) deleteAgentState(masID int, agentID int) (err error) {
return
}
// storeLogs stores the logs in a batch operation and store the communication frequency
// storeLogs stores the logs in a batch operation
func (stor *cassStorage) storeLogs(topic string) {
var logIn chan schemas.LogMessage
var err error
......@@ -382,12 +390,6 @@ func (stor *cassStorage) storeLogs(topic string) {
if err != nil {
fmt.Println(err)
}
if topic == "msg" && log.Message == "ACL send" {
recvStr := strings.Split(log.AdditionalData, ";")[1]
rec, _ := strconv.Atoi(strings.Split(recvStr, " ")[2])
stor.session.Query("UPDATE heatmap SET count=count+1 WHERE masid = ? AND sender = ? AND receiver = ?",
log.MASID, log.AgentID, rec).Exec()
}
batch.Query(stmt, log.MASID, log.AgentID, log.Timestamp, js)
size := len(js)
for i := 0; i < 9; i++ {
......@@ -403,12 +405,6 @@ func (stor *cassStorage) storeLogs(topic string) {
fmt.Println(err)
}
batch.Query(stmt, log.MASID, log.AgentID, log.Timestamp, js)
if topic == "msg" && log.Message == "ACL send" {
recvStr := strings.Split(log.AdditionalData, ";")[1]
rec, _ := strconv.Atoi(strings.Split(recvStr, " ")[2])
stor.session.Query("UPDATE heatmap SET count=count+1 WHERE masid = ? AND sender = ? AND receiver = ?",
log.MASID, log.AgentID, rec).Exec()
}
size += len(js)
default:
empty = true
......@@ -463,10 +459,10 @@ func (stor *cassStorage) storeSeries() {
}
}
// storeStatis stores the log series in a batch operation
// storeStats stores the stats info in a batch operation
func (stor *cassStorage) storeStats() {
var err error
stmt := "INSERT INTO beh_stats (masid, agentid, type, start, end, duration) VALUES (?, ?, ?, ?, ?, ?)"
stmt := "INSERT INTO beh_stats (masid, agentid, behType, start, end, duration) VALUES (?, ?, ?, ?, ?, ?)"
for {
batch := gocql.NewBatch(gocql.UnloggedBatch)
......
......@@ -335,8 +335,20 @@ func (logger *Logger) handleGetMsgHeatmap(w http.ResponseWriter, r *http.Request
if cmapErr != nil {
return
}
start, cmapErr := time.Parse("20060102150405", vars["start"])
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
end, cmapErr := time.Parse("20060102150405", vars["end"])
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
var heatmap map[[2]int]int
heatmap, cmapErr = logger.getMsgHeatmap(masID)
heatmap, cmapErr = logger.getMsgHeatmap(masID, start, end)
if cmapErr != nil {
httpErr := httpreply.CMAPError(w, cmapErr.Error())
logger.logErrors(r.URL.Path, cmapErr, httpErr)
......@@ -537,7 +549,7 @@ func (logger *Logger) server(port int) (serv *http.Server) {
s.Path("/series/{masid}").Methods("POST").HandlerFunc(logger.handlePostLogSeries)
s.Path("/stats/{masid}").Methods("POST").HandlerFunc(logger.handlePostBehsStats)
s.Path("/stats/{masid}/heatmap").Methods("GET").HandlerFunc(logger.handleGetMsgHeatmap)
s.Path("/stats/{masid}/heatmap/{start}/{end}").Methods("GET").HandlerFunc(logger.handleGetMsgHeatmap)
s.Path("/stats/{masid}/{agentid}/{behtype}/{start}/{end}").Methods("GET").HandlerFunc(logger.handleGetStats)
s.Path("/state/{masid}/{agentid}").Methods("GET").HandlerFunc(logger.handleGetState)
......
......@@ -171,8 +171,8 @@ func (logger *Logger) getAgentLogSeriesNames(masID int, agentID int) (names []st
// getMsgHeatmap return the frequency of message communication
func (logger *Logger) getMsgHeatmap(masID int) (heatmap map[[2]int]int, err error) {
heatmap, err = logger.stor.getMsgHeatmap(masID)
func (logger *Logger) getMsgHeatmap(masID int, start time.Time, end time.Time) (heatmap map[[2]int]int, err error) {
heatmap, err = logger.stor.getMsgHeatmap(masID, start, end)
return
}
......
......@@ -81,8 +81,8 @@ type storage interface {
// getAgentLogSeries get the log series
getAgentLogSeries(masID int, agentID int, name string, start time.Time, end time.Time) (series []schemas.LogSeries, err error)
// getMsgHeatmap get the msg communication frequency
getMsgHeatmap(masID int) (heatmap map[[2]int]int, err error)
// getMsgHeatmap get the msg communication frequency within a range
getMsgHeatmap(masID int, start time.Time, end time.Time) (heatmap map[[2]int]int, err error)
// getStats get the data of a certain behtype
getStats(masID int, agentID int, behType string, start time.Time, end time.Time) (statsInfo schemas.StatsInfo, err error)
......@@ -121,7 +121,6 @@ type localStorage struct {
type masStorage struct {
agents []agentStorage
msgCnt map[[2]int]int
}
type agentStorage struct {
......@@ -162,15 +161,6 @@ func (stor *localStorage) addAgentLogMessage(log schemas.LogMessage) (err error)
case "msg":
stor.mas[log.MASID].agents[log.AgentID].msgLogs = append(stor.mas[log.MASID].agents[log.AgentID].msgLogs,
log)
if log.Message == "ACL send" {
if stor.mas[log.MASID].msgCnt == nil {
stor.mas[log.MASID].msgCnt = make(map[[2]int]int)
}
recvStr := strings.Split(log.AdditionalData, ";")[1]
rec, _ := strconv.Atoi(strings.Split(recvStr, " ")[1])
idx := [2]int{log.AgentID, rec}
stor.mas[log.MASID].msgCnt[idx] += 1
}
case "status":
stor.mas[log.MASID].agents[log.AgentID].statLogs = append(stor.mas[log.MASID].agents[log.AgentID].statLogs,
log)
......@@ -449,11 +439,35 @@ func (stor *localStorage) getAgentLogSeries(masID int, agentID int, name string,
return
}
// getMsgHeatmap return the msg communication frequency
func (stor *localStorage) getMsgHeatmap(masID int) (heatmap map[[2]int]int, err error) {
// getMsgHeatmap return the msg communication frequency within a range
func (stor *localStorage) getMsgHeatmap(masID int, start time.Time, end time.Time) (heatmap map[[2]int]int, err error) {
heatmap = make(map[[2]int]int)
stor.mutex.Lock()
if masID < len(stor.mas) {
heatmap = stor.mas[masID].msgCnt
for agentID := 0; agentID < len(stor.mas[masID].agents); agentID++ {
length := len(stor.mas[masID].agents[agentID].msgLogs)
if length > 0 {
startIndex := sort.Search(length,
func(i int) bool {
return stor.mas[masID].agents[agentID].msgLogs[i].Timestamp.After(start)
})
endIndex := sort.Search(length,
func(i int) bool {
return stor.mas[masID].agents[agentID].msgLogs[i].Timestamp.After(end)
})
if endIndex-startIndex >= 0 {
for _, log := range stor.mas[masID].agents[agentID].msgLogs[startIndex:endIndex] {
if log.Message == "ACL send" {
sender := log.AgentID
recvnfo := strings.Split(log.AdditionalData, ";")[1]
receiver, err := strconv.Atoi(strings.Split(recvnfo, ": ")[1])
if err == nil {
idx := [2]int{sender, receiver}
heatmap[idx] += 1
}
}
}
}
}
}
stor.mutex.Unlock()
return
......
......@@ -340,7 +340,7 @@
[attr.x] = "750"
[attr.y]= "(legendWidth + 10) * i + 70"
[attr.fill]="gray"
> {{text[0]}}-{{text[1]}}</text>
> {{text}}</text>
<!-- <text class="xlabel" x="350" y="730"> agent ID of sender</text>
......
......@@ -30,7 +30,7 @@ export class LoggerComponent implements OnInit {
// parameters and variables for drawing logs
searchStartTime: string = "20210301000000";
searchEndTime: string = "20210331000000"
searchEndTime: string = "20211231000000"
isTopicSelected: boolean[] = [true, true, true, true, true, true];
topics: string[] = ["error", "debug", "msg", "status", "app", "beh" ];
width: number = 1500;
......@@ -103,7 +103,7 @@ export class LoggerComponent implements OnInit {
autoPartition: boolean = false;
partitionNum: number = 5;
colorPartitionEle: string[] = [];
colorLegendTexts: number[][] = [];
colorLegendTexts: string[] = [];
legendWidth = 30;
......@@ -613,15 +613,25 @@ export class LoggerComponent implements OnInit {
return colors;
}
getColorLegendTexts(values: number[]): number[][] {
getColorLegendTexts(values: number[]): string[] {
const quo: number = Math.floor(values.length / this.partitionNum)
const remainder: number = values.length % this.partitionNum
let res: number[][] = []
let res: string[] = []
for (let i = 0; i < remainder; i++) {
res.push([values[i * (quo + 1)], values[(i + 1) * (quo + 1) - 1]])
if (values[i * (quo + 1)] === values[(i + 1) * (quo + 1) - 1]) {
res.push(values[i * (quo + 1)].toString())
} else {
res.push( values[i * (quo + 1)].toString() + "-" +
values[(i + 1) * (quo + 1) - 1].toString())
}
}
for (let i = remainder; i < this.partitionNum; i++) {
res.push([values[i * quo + remainder], values[(i + 1) * quo + remainder - 1]])
if (values[i * quo + remainder] === values[(i + 1) * quo + remainder - 1]) {
res.push(values[i * quo + remainder].toString())
} else {
res.push(values[i * quo + remainder].toString() + "-" +
values[(i + 1) * quo + remainder - 1].toString())
}
}
return res;
}
......@@ -649,7 +659,7 @@ export class LoggerComponent implements OnInit {
}
drawHeatmap() {
this.loggerService.getMsgHeatmap(this.selectedMASID.toString()).subscribe( (res: any) => {
this.loggerService.getMsgHeatmap(this.selectedMASID.toString(), this.searchStartTime, this.searchEndTime).subscribe( (res: any) => {
let values: number[] = [];
for (let i = 0; i < res.length; i++) {
values.push(parseInt(res[i].split("-")[2]))
......
......@@ -36,8 +36,8 @@ export class LoggerService {
return this.webReqService.get(`api/logging/series/${masid}/${agentid}/${name}/time/${start}/${end}`)
}
getMsgHeatmap(masid: string) {
return this.webReqService.get(`api/logging/stats/${masid}/heatmap`)
getMsgHeatmap(masid: string, start: string, end: string) {
return this.webReqService.get(`api/logging/stats/${masid}/heatmap/${start}/${end}`)
}
getBehavior(masid:string, agentid: string, behtype: string, start: string, end: string) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment