Commit eb7752de authored by Stefan Dähling's avatar Stefan Dähling
Browse files

optimize logger

parent cea2433e
Pipeline #344103 passed with stages
in 1 minute and 16 seconds
......@@ -210,6 +210,7 @@ func pingPongft(ag *agency.Agent, config CustomAgentData) (err error) {
time.Sleep(time.Second * 40)
if config.Start {
state := 0
count := 0
var rtts []int
var msg schemas.ACLMessage
msg, err = ag.ACL.NewMessage(config.PeerID, schemas.FIPAProtQuery, schemas.FIPAPerfInform,
......@@ -225,10 +226,14 @@ func pingPongft(ag *agency.Agent, config CustomAgentData) (err error) {
if err != nil {
fmt.Println(err)
}
state++
err = ag.Logger.UpdateState(strconv.Itoa(state))
if err != nil {
fmt.Println(err)
count++
if count == 10 {
state++
err = ag.Logger.UpdateState(strconv.Itoa(state))
if err != nil {
fmt.Println(err)
}
count = 0
}
msg, err = ag.ACL.RecvMessageWait()
if err != nil {
......@@ -244,10 +249,14 @@ func pingPongft(ag *agency.Agent, config CustomAgentData) (err error) {
if err != nil {
fmt.Println(err)
}
state++
err = ag.Logger.UpdateState(strconv.Itoa(state))
if err != nil {
fmt.Println(err)
count++
if count == 10 {
state++
err = ag.Logger.UpdateState(strconv.Itoa(state))
if err != nil {
fmt.Println(err)
}
count = 0
}
msg, err = ag.ACL.RecvMessageWait()
rtt := time.Since(t).Nanoseconds()
......@@ -286,10 +295,14 @@ func pingPongft(ag *agency.Agent, config CustomAgentData) (err error) {
if err != nil {
fmt.Println(err)
}
state++
err = ag.Logger.UpdateState(strconv.Itoa(state))
if err != nil {
fmt.Println(err)
count++
if count == 10 {
state++
err = ag.Logger.UpdateState(strconv.Itoa(state))
if err != nil {
fmt.Println(err)
}
count = 0
}
msg, err = ag.ACL.RecvMessageWait()
if err != nil {
......
......@@ -47,9 +47,10 @@ package logger
import (
"encoding/json"
"errors"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"github.com/gocql/gocql"
"time"
)
// cassStorage stores information regarding connection to cassandra cluster
......@@ -197,26 +198,19 @@ func (stor *cassStorage) getCommunication(masID int,
}
// updateAgentState updates the agent status
func (stor *cassStorage) updateAgentState(masID int, agentID int, state schemas.State) (err error) {
var js []byte
js, err = json.Marshal(state)
if err != nil {
return
}
func (stor *cassStorage) updateAgentState(masID int, agentID int, state []byte) (err error) {
err = stor.session.Query("INSERT INTO state (masid, agentid, state) "+
"VALUES (?, ?, ?)", masID, agentID, js).Exec()
"VALUES (?, ?, ?)", masID, agentID, state).Exec()
return
}
// getAgentState return the latest agent status
func (stor *cassStorage) getAgentState(masID int, agentID int) (state schemas.State, err error) {
func (stor *cassStorage) getAgentState(masID int, agentID int) (state []byte, err error) {
var iter *gocql.Iter
var js []byte
iter = stor.session.Query("SELECT state FROM state WHERE masid = ? AND agentid = ?", masID,
agentID).Iter()
if iter.NumRows() == 1 {
iter.Scan(&js)
err = json.Unmarshal(js, &state)
iter.Scan(&state)
}
iter.Close()
return
......
......@@ -298,21 +298,17 @@ func (logger *Logger) handleLogsTime(masID int, agentid int, logType string, sta
func (logger *Logger) handleState(masID int, agentid int, w http.ResponseWriter,
r *http.Request) (cmapErr, httpErr error) {
if r.Method == "GET" {
var state schemas.State
var state []byte
state, cmapErr = logger.getAgentState(masID, agentid)
httpErr = httpreply.Resource(w, state, cmapErr)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, httpErr = w.Write(state)
} else if r.Method == "PUT" {
var body []byte
body, cmapErr = ioutil.ReadAll(r.Body)
if cmapErr == nil {
var state schemas.State
cmapErr = json.Unmarshal(body, &state)
if cmapErr == nil {
go logger.updateAgentState(masID, agentid, state)
httpErr = httpreply.Updated(w, nil)
} else {
httpErr = httpreply.JSONUnmarshalError(w)
}
go logger.updateAgentState(masID, agentid, body)
httpErr = httpreply.Updated(w, nil)
} else {
httpErr = httpreply.InvalidBodyError(w)
}
......
......@@ -185,15 +185,13 @@ func (logger *Logger) getCommunication(masID int, agentID int) (comm []schemas.C
}
// getAgentState returns the latest agent state
func (logger *Logger) getAgentState(masID int, agentID int) (agState schemas.State,
err error) {
func (logger *Logger) getAgentState(masID int, agentID int) (agState []byte, err error) {
agState, err = logger.stor.getAgentState(masID, agentID)
return
}
// updateAgentState updates agent state
func (logger *Logger) updateAgentState(masID int, agentID int,
agState schemas.State) (err error) {
func (logger *Logger) updateAgentState(masID int, agentID int, agState []byte) (err error) {
err = logger.stor.updateAgentState(masID, agentID, agState)
return
}
......@@ -76,10 +76,10 @@ type storage interface {
getCommunication(masID int, agentID int) (commData []schemas.Communication, err error)
// updateAgentState updates the agent status
updateAgentState(masID int, agentID int, state schemas.State) (err error)
updateAgentState(masID int, agentID int, satte []byte) (err error)
// getAgentState return the latest agent status
getAgentState(masID int, agentID int) (state schemas.State, err error)
getAgentState(masID int, agentID int) (state []byte, err error)
// deleteAgentState deletes the status of an agent
deleteAgentState(masID int, agentID int) (err error)
......@@ -108,7 +108,7 @@ type agentStorage struct {
msgLogs []LogMessage
statLogs []LogMessage
appLogs []LogMessage
state schemas.State
state []byte
commData []schemas.Communication
}
......@@ -350,7 +350,7 @@ func (stor *localStorage) getCommunication(masID int,
}
// updateAgentState updates the agent status
func (stor *localStorage) updateAgentState(masID int, agentID int, state schemas.State) (err error) {
func (stor *localStorage) updateAgentState(masID int, agentID int, state []byte) (err error) {
stor.mutex.Lock()
numMAS := len(stor.mas)
if numMAS <= masID {
......@@ -370,7 +370,7 @@ func (stor *localStorage) updateAgentState(masID int, agentID int, state schemas
}
// getAgentState return the latest agent status
func (stor *localStorage) getAgentState(masID int, agentID int) (state schemas.State, err error) {
func (stor *localStorage) getAgentState(masID int, agentID int) (state []byte, err error) {
stor.mutex.Lock()
if masID < len(stor.mas) {
if agentID < len(stor.mas[masID].agents) {
......@@ -386,7 +386,7 @@ func (stor *localStorage) deleteAgentState(masID int, agentID int) (err error) {
stor.mutex.Lock()
if masID < len(stor.mas) {
if agentID < len(stor.mas[masID].agents) {
stor.mas[masID].agents[agentID].state.State = ""
stor.mas[masID].agents[agentID].state = nil
}
}
stor.mutex.Unlock()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment