Commit e3098b4b authored by Qianwen's avatar Qianwen
Browse files

fix

parent f9202041
Pipeline #492293 passed with stages
in 3 minutes and 19 seconds
......@@ -81,7 +81,6 @@ func task(ag *agency.Agent) (err error) {
if id == 0 {
ag.MQTT.Subscribe("topic1", 1)
behMQTT, err := ag.NewMQTTTopicBehavior("topic1", display)
if err == nil {
behMQTT.Start()
}
......@@ -119,6 +118,8 @@ func task(ag *agency.Agent) (err error) {
if err != nil {
fmt.Println(err)
}
// log series
for i := 0; i < 5; i++ {
time.Sleep(2 * time.Second)
for idx := 1; idx < 5; idx++ {
......
......@@ -201,7 +201,7 @@ spec:
ports:
- port: 1883
protocol: TCP
nodePort: 31883
nodePort: 30883
name: mqtt
selector:
app: mosquitto
......@@ -347,8 +347,8 @@ spec:
spec:
containers:
- name: ams-container
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/ams
imagePullPolicy: Always
image: localtest/ams:latest
imagePullPolicy: Never
env:
- name: CLONEMAP_DEPLOYMENT_TYPE
value: "production"
......@@ -393,7 +393,8 @@ data:
CREATE TABLE clonemap.logging_debug ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_beh ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_series ( masid int, agentid int, name varchar, t timestamp, series varchar, PRIMARY KEY ((masid, agentid), name, t)) WITH CLUSTERING ORDER BY (name ASC, t ASC);
CREATE TABLE clonemap.beh_stats ( masid int, agentid int, behType varchar, start timestamp, end timestamp, duration int, PRIMARY KEY ((masid, agentid, behType), start)) WITH CLUSTERING ORDER BY (start ASC);
CREATE TABLE clonemap.heatmap ( masid int, t timestamp, item varchar, PRIMARY KEY (masid, t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.beh_stats ( masid int, agentid int, behType varchar, start timestamp, stats varchar, PRIMARY KEY ((masid, agentid, behType), start)) WITH CLUSTERING ORDER BY (start ASC);
CREATE TABLE clonemap.state ( masid int, agentid int, state varchar, PRIMARY KEY (masid, agentid));
EOF
......@@ -450,7 +451,7 @@ spec:
containers:
- name: cass-container
image: cassandra
imagePullPolicy: Always
imagePullPolicy: Never
ports:
- containerPort: 7000
name: intra-node
......@@ -565,8 +566,8 @@ spec:
spec:
containers:
- name: logger-container
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/logger
imagePullPolicy: Always
image: localtest/logger:latest
imagePullPolicy: Never
env:
- name: CLONEMAP_DEPLOYMENT_TYPE
value: "production"
......@@ -700,8 +701,8 @@ spec:
spec:
containers:
- name: fe-container
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/frontend
imagePullPolicy: Always
image: localtest/frontend:latest
imagePullPolicy: Never
env:
- name: CLONEMAP_DEPLOYMENT_TYPE
value: "production"
......
......@@ -259,24 +259,17 @@ func (stor *cassStorage) deleteAgentLogMessages(masID int, agentID int) (err err
func (stor *cassStorage) getMsgHeatmap(masID int, start time.Time, end time.Time) (heatmap map[[2]int]int, err error) {
heatmap = make(map[[2]int]int)
var iter *gocql.Iter
iter = stor.session.Query("SELECT log FROM logging_msg WHERE masid = ? AND "+
iter = stor.session.Query("SELECT item FROM heatmap WHERE masid = ? AND "+
"t > ? AND t < ?", masID, start, end).Iter()
var js []byte
for iter.Scan(&js) {
var logmsg schemas.LogMessage
err = json.Unmarshal(js, &logmsg)
var heatmapItem schemas.Heatmap
err = json.Unmarshal(js, &heatmapItem)
if err != nil {
return
}
if logmsg.Message == "ACL send" {
sender := logmsg.AgentID
recvnfo := strings.Split(logmsg.AdditionalData, ";")[1]
receiver, err := strconv.Atoi(strings.Split(recvnfo, ": ")[1])
if err == nil {
idx := [2]int{sender, receiver}
heatmap[idx] += 1
}
}
idx := [2]int{heatmapItem.Sender, heatmapItem.Receiver}
heatmap[idx] += 1
}
iter.Close()
return
......@@ -286,7 +279,7 @@ func (stor *cassStorage) getMsgHeatmap(masID int, start time.Time, end time.Time
// getStats get the data of a certain behtype
func (stor *cassStorage) getStats(masID int, agentID int, behType string, start time.Time, end time.Time) (statsInfo schemas.StatsInfo, err error) {
var iter *gocql.Iter
iter = stor.session.Query("SELECT series FROM beh_stats WHERE masid = ? AND agentid = ? AND "+
iter = stor.session.Query("SELECT stats FROM beh_stats WHERE masid = ? AND agentid = ? AND "+
"behType = ? AND start > ? AND start < ?", masID, agentID, behType, start, end).Iter()
var js []byte
for iter.Scan(&js) {
......@@ -366,6 +359,7 @@ func (stor *cassStorage) storeLogs(topic string) {
var logIn chan schemas.LogMessage
var err error
stmt := "INSERT INTO logging_" + topic + " (masid, agentid, t, log) VALUES (?, ?, ?, ?)"
stmtHeatmap := "INSERT INTO heatmap (masid, t, item) VALUES (?, ?, ?)"
if topic == "status" {
logIn = stor.logStatusIn
} else if topic == "app" {
......@@ -384,6 +378,7 @@ func (stor *cassStorage) storeLogs(topic string) {
for {
batch := gocql.NewBatch(gocql.UnloggedBatch)
batchHeatmap := gocql.NewBatch(gocql.UnloggedBatch)
log := <-logIn
var js []byte
js, err = json.Marshal(log)
......@@ -392,6 +387,19 @@ func (stor *cassStorage) storeLogs(topic string) {
}
batch.Query(stmt, log.MASID, log.AgentID, log.Timestamp, js)
size := len(js)
if log.Topic == "msg" && log.Message == "ACL send" {
sender := log.AgentID
recvnfo := strings.Split(log.AdditionalData, ";")[1]
receiver, err := strconv.Atoi(strings.Split(recvnfo, ": ")[1])
heatmapItem := schemas.Heatmap{
Sender: sender,
Receiver: receiver,
}
js, err = json.Marshal(heatmapItem)
if err == nil {
batchHeatmap.Query(stmtHeatmap, log.MASID, log.Timestamp, js)
}
}
for i := 0; i < 9; i++ {
// maximum of 10 operations in batch
if size > 25000 {
......@@ -406,6 +414,19 @@ func (stor *cassStorage) storeLogs(topic string) {
}
batch.Query(stmt, log.MASID, log.AgentID, log.Timestamp, js)
size += len(js)
if log.Topic == "msg" && log.Message == "ACL send" {
sender := log.AgentID
recvnfo := strings.Split(log.AdditionalData, ";")[1]
receiver, err := strconv.Atoi(strings.Split(recvnfo, ": ")[1])
heatmapItem := schemas.Heatmap{
Sender: sender,
Receiver: receiver,
}
js, err = json.Marshal(heatmapItem)
if err == nil {
batchHeatmap.Query(stmtHeatmap, log.MASID, log.Timestamp, js)
}
}
default:
empty = true
}
......@@ -417,6 +438,10 @@ func (stor *cassStorage) storeLogs(topic string) {
if err != nil {
fmt.Println(err)
}
err = stor.session.ExecuteBatch(batchHeatmap)
if err != nil {
fmt.Println(err)
}
}
}
......@@ -440,6 +465,7 @@ func (stor *cassStorage) storeSeries() {
if size > 25000 {
break
}
empty := false
select {
case series = <-stor.logSeriesIn:
js, err = json.Marshal(series)
......@@ -449,6 +475,9 @@ func (stor *cassStorage) storeSeries() {
batch.Query(stmt, series.MASID, series.AgentID, series.Name, series.Timestamp, js)
size += len(js)
default:
empty = true
}
if empty {
break
}
}
......@@ -462,7 +491,7 @@ func (stor *cassStorage) storeSeries() {
// storeStats stores the stats info in a batch operation
func (stor *cassStorage) storeStats() {
var err error
stmt := "INSERT INTO beh_stats (masid, agentid, behType, start, end, duration) VALUES (?, ?, ?, ?, ?, ?)"
stmt := "INSERT INTO beh_stats (masid, agentid, behType, start, stats) VALUES (?, ?, ?, ?, ?)"
for {
batch := gocql.NewBatch(gocql.UnloggedBatch)
......@@ -479,6 +508,7 @@ func (stor *cassStorage) storeStats() {
if size > 25000 {
break
}
empty := false
select {
case behStats = <-stor.behStatsIn:
js, err = json.Marshal(behStats)
......@@ -488,6 +518,9 @@ func (stor *cassStorage) storeStats() {
batch.Query(stmt, behStats.MASID, behStats.AgentID, behStats.BehType, behStats.Start, js)
size += len(js)
default:
empty = true
}
if empty {
break
}
}
......
......@@ -93,3 +93,8 @@ type Communication struct {
NumMsgSent int `json:"numsent"` // number of messages sent to this agent
NumMsgRecv int `json:"numrecv"` // number of messages received from this agent
}
type Heatmap struct {
Sender int `json:"sender"`
Receiver int `json:"receiver"`
}
......@@ -263,10 +263,9 @@ svg {
fill: var(--light-primary-color);
}
/* text {
.agentbox-text {
fill: white;
color: var(--primary-color);
} */
}
.text-title {
font: bold 30px;
......
......@@ -190,7 +190,12 @@
<path fill="var(--light-primary-color)" d="M0,-5L10,0L0,5"></path>
</marker>
<rect class="agentBox" *ngFor="let box of agentBox" [attr.x]="box.x" [attr.y]="box.y" [attr.width]="boxWidth" [attr.height]="boxHeight"></rect>
<text font-size="20px" *ngFor="let text of texts, index as i" [attr.x]="text.x" [attr.y]="text.y"> Agent {{text.textID}} </text>
<text font-size="20px" *ngFor="let text of texts, index as i"
class="agentbox-text"
[attr.x]="text.x"
[attr.y]="text.y"
[attr.fill]="white">
Agent {{text.textID}} </text>
<line stroke-dasharray="5, 5" class="vertical" *ngFor="let line of timeline" [attr.x1]="line.x1" [attr.y1]="line.y1" [attr.x2]="line.x2" [attr.y2]="height"/>
<line stroke="var(--light-primary-color)" stroke-width="4" *ngFor="let line of communications" [class.hidden]="line.hidden" [attr.x1]="line.x1" [attr.y1]="line.y1" [attr.x2]="line.x2" [attr.y2]="line.y2" marker-end="url(#arrow)"/>
<rect class="logBox" *ngFor="let box of logBoxes, index as i"
......@@ -232,6 +237,7 @@
xAxisLabel="time"
[xAxisTicks]="xAxisTicks"
[xAxisTickFormatting]="xAxisTickFormatting"
[rotateXAxisTicks]="true"
yAxisLabel="value"
[minRadius]="minRadius"
[maxRadius]="maxRadius"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment