Commit c6eea1c2 authored by Qianwen's avatar Qianwen
Browse files

add log series

parent 1fa45805
Pipeline #462795 passed with stages
in 3 minutes and 18 seconds
......@@ -46,6 +46,7 @@ package main
import (
"fmt"
"math/rand"
"strconv"
"time"
......@@ -75,5 +76,10 @@ func task(ag *agency.Agent) (err error) {
if err != nil {
fmt.Println(err)
}
for i := 0; i < 20; i++ {
time.Sleep(2 * time.Second)
idx := rand.Intn(4) + 1
ag.Logger.NewLogSeries("type"+strconv.Itoa(idx), rand.Intn(100))
}
return
}
......@@ -19,7 +19,7 @@
"imagegroups":[
{
"config":{
"image":"agency"
"image":"qianwen12/agency"
},
"agents":[
{
......
{
"config":{
"name":"test_log_series",
"agentsperagency":2,
"mqtt":{
"active":true
},
"df":{
"active":true
},
"logger":{
"active":true,
"msg":true,
"app":true,
"status":true,
"debug":true
}
},
"imagegroups":[
{
"config":{
"image":"qianwen12/agency1"
},
"agents":[
{
"nodeid":0,
"name":"agent0"
},
{
"nodeid":0,
"name":"agent1"
}
]
}
],
"graph":{
"node":[
{
"id": 0,
"agents": [0, 1]
}
],
"edge":[
]
}
}
\ No newline at end of file
......@@ -117,6 +117,29 @@ func (cli *LoggerClient) GetLogsInRange(masID int, agentID int, topic string, st
return
}
// PostLogSeries posts new log series to logger
func (cli *LoggerClient) PostLogSeries(masID int, logs []schemas.LogSeries) (httpStatus int, err error) {
js, _ := json.Marshal(logs)
_, httpStatus, err = httpretry.Post(cli.httpClient, cli.prefix()+"/api/series/"+
strconv.Itoa(masID), "application/json", js, time.Second*2, 4)
return
}
// GetLogSeries gets log series with its name
func (cli *LoggerClient) GetLogSeries(masID int, agentID int) (series []schemas.LogSeries, httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/series/"+
strconv.Itoa(masID)+"/"+strconv.Itoa(agentID), time.Second*2, 4)
if err != nil {
return
}
err = json.Unmarshal(body, &series)
if err != nil {
series = []schemas.LogSeries{}
}
return
}
// PutState updates the state
func (cli *LoggerClient) PutState(state schemas.State) (httpStatus int, err error) {
js, _ := json.Marshal(state)
......@@ -171,13 +194,52 @@ func NewLoggerClient(host string, port int, timeout time.Duration, del time.Dura
// LogCollector collects logs and states and sends them to the Logger service
// one LogCollector per agency is used; each agent obtains one AgentLogger
type LogCollector struct {
masID int
logIn chan schemas.LogMessage // logging inbox
stateIn chan schemas.State
client *LoggerClient
config schemas.LoggerConfig
logError *log.Logger
logInfo *log.Logger
masID int
logIn chan schemas.LogMessage // logging inbox
logSeriesIn chan schemas.LogSeries
stateIn chan schemas.State
client *LoggerClient
config schemas.LoggerConfig
logError *log.Logger
logInfo *log.Logger
}
// storeLogSeries periodically requests the logging service to store log series
func (logCol *LogCollector) storeLogSeries() (err error) {
if logCol.config.Active {
for {
if len(logCol.logSeriesIn) > 0 {
numSeries := len(logCol.logSeriesIn)
logSeries := make([]schemas.LogSeries, numSeries)
for i := 0; i < numSeries; i++ {
logMsg := <-logCol.logSeriesIn
logSeries[i] = logMsg
logSeries[i].MASID = logCol.masID
}
_, err = logCol.client.PostLogSeries(logCol.masID, logSeries)
if err != nil {
logCol.logError.Println(err)
for i := range logSeries {
logCol.logSeriesIn <- logSeries[i]
}
continue
}
}
tempTime := time.Now()
for {
time.Sleep(100 * time.Millisecond)
if time.Since(tempTime).Seconds() > 15 || len(logCol.logSeriesIn) > 50 {
break
}
}
}
} else {
for {
// print messages to stdout if logger is turned off
logMsg := <-logCol.logSeriesIn
logCol.logInfo.Println(logMsg)
}
}
}
// storeLogs periodically requests the logging service to store log messages
......@@ -275,8 +337,10 @@ func NewLogCollector(masID int, config schemas.LoggerConfig, logErr *log.Logger,
logCol.client = NewLoggerClient(config.Host, config.Port, time.Second*60, time.Second*1, 4)
}
logCol.logIn = make(chan schemas.LogMessage, 10000)
logCol.logSeriesIn = make(chan schemas.LogSeries, 10000)
logCol.stateIn = make(chan schemas.State, 10000)
go logCol.storeLogs()
go logCol.storeLogSeries()
go logCol.storeState()
logCol.logInfo.Println("Created new logger client; status: ", logCol.config.Active)
return
......@@ -284,15 +348,16 @@ func NewLogCollector(masID int, config schemas.LoggerConfig, logErr *log.Logger,
// AgentLogger is the endpoint for agents
type AgentLogger struct {
agentID int
masID int
client *LoggerClient
logOut chan schemas.LogMessage // logging inbox
stateOut chan schemas.State
mutex *sync.Mutex
logError *log.Logger
logInfo *log.Logger
active bool
agentID int
masID int
client *LoggerClient
logOut chan schemas.LogMessage // logging inbox
logSeriesOut chan schemas.LogSeries
stateOut chan schemas.State
mutex *sync.Mutex
logError *log.Logger
logInfo *log.Logger
active bool
}
// NewLog sends a new logging message to the logging service
......@@ -325,6 +390,29 @@ func (agLog *AgentLogger) NewLog(topic string, message string, data string) (err
return
}
// NewLogSeries sends a new logging series to
func (agLog *AgentLogger) NewLogSeries(name string, value int) (err error) {
if agLog == nil {
return
}
agLog.mutex.Lock()
if !agLog.active {
agLog.mutex.Unlock()
return errors.New("logger not active")
}
time.Sleep(time.Millisecond * 5)
tStamp := time.Now()
agLog.mutex.Unlock()
logSeries := schemas.LogSeries{
AgentID: agLog.agentID,
Timestamp: tStamp,
Name: name,
Value: value,
}
agLog.logSeriesOut <- logSeries
return
}
// UpdateState overrides the state stored in database
func (agLog *AgentLogger) UpdateState(state string) (err error) {
agLog.mutex.Lock()
......@@ -357,19 +445,20 @@ func (agLog *AgentLogger) RestoreState() (state string, err error) {
return
}
// NewAgentLogger craetes a new object of type AgentLogger
// NewAgentLogger creates a new object of type AgentLogger
func (logCol *LogCollector) NewAgentLogger(agentID int, logErr *log.Logger,
logInf *log.Logger) (agLog *AgentLogger) {
agLog = &AgentLogger{
agentID: agentID,
masID: logCol.masID,
client: logCol.client,
logOut: logCol.logIn,
stateOut: logCol.stateIn,
mutex: &sync.Mutex{},
logError: logErr,
logInfo: logInf,
active: logCol.config.Active,
agentID: agentID,
masID: logCol.masID,
client: logCol.client,
logOut: logCol.logIn,
logSeriesOut: logCol.logSeriesIn,
stateOut: logCol.stateIn,
mutex: &sync.Mutex{},
logError: logErr,
logInfo: logInf,
active: logCol.config.Active,
}
return
}
......
......@@ -221,6 +221,7 @@ func (fe *Frontend) server(port int) (serv *http.Server) {
s.Path("/df/{masid}/svc/desc/{desc}/node/{nodeid}/dist/{dist}").Methods("Get").HandlerFunc(fe.handleSvcWithDist)
// api for logger
s.Path("/logging/series/{masid}/{agentid}").Methods("GET").HandlerFunc(fe.handleGetLogSeries)
s.Path("/logging/{masid}/{agentid}/{topic}/latest/{num}").Methods("GET").HandlerFunc(fe.handleGetNLatestLogs)
s.Path("/logging/{masid}/list").Methods("POST").HandlerFunc(fe.handlePostLogs)
s.Path("/logging/{masid}/{agentid}/{topic}/time/{start}/{end}").Methods("GET").HandlerFunc(fe.handleGetLogsInRange)
......
......@@ -159,5 +159,26 @@ func (fe *Frontend) handleGetLogsInRange(w http.ResponseWriter, r *http.Request)
httpErr = httpreply.Resource(w, msgs, cmapErr)
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
// handleGetLogSeries is the handler to /api/logging/series/{masid}/{agentid}
func (fe *Frontend) handleGetLogSeries(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
masID, agentID, cmapErr := getAgentID(r)
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
var series []schemas.LogSeries
series, _, cmapErr = fe.logClient.GetLogSeries(masID, agentID)
if cmapErr != nil {
httpErr = httpreply.CMAPError(w, cmapErr.Error())
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
httpErr = httpreply.Resource(w, series, cmapErr)
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
......@@ -71,6 +71,7 @@ type cassStorage struct {
logDebugIn chan schemas.LogMessage // logging inbox
logMsgIn chan schemas.LogMessage // logging inbox
stateIn chan schemas.State // state inbox
logSeriesIn chan schemas.LogSeries // logging inbox
}
// addAgentLogMessage adds an entry to specified logging entry
......@@ -182,6 +183,17 @@ func (stor *cassStorage) getAgentLogMessagesInRange(masID int, agentID int, topi
return
}
// addAgentLogSeries add log series
func (stor *cassStorage) addAgentLogSeries(series schemas.LogSeries) {
stor.logSeriesIn <- series
return
}
// getAgentLogSeries get log series
func (stor *cassStorage) getAgentLogSeries(masID int, agentID int) (series []schemas.LogSeries, err error) {
return
}
// deleteAgentLogMessages deletes all log messages og an agent
func (stor *cassStorage) deleteAgentLogMessages(masID int, agentID int) (err error) {
......@@ -361,6 +373,7 @@ func newCassandraStorage(ip []string, user string, pass string) (stor storage, e
temp.logErrorIn = make(chan schemas.LogMessage, 10000)
temp.logMsgIn = make(chan schemas.LogMessage, 10000)
temp.stateIn = make(chan schemas.State, 10000)
temp.logSeriesIn = make(chan schemas.LogSeries, 10000)
for i := 0; i < 3; i++ {
go temp.storeLogs("status")
......
......@@ -163,6 +163,29 @@ func (logger *Logger) handlePostLogMsgList(w http.ResponseWriter, r *http.Reques
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handlePostLogSeries is the handler for post requests to path /api/series/{masid}/
func (logger *Logger) handlePostLogSeries(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
// create new log message entry
var body []byte
body, cmapErr = ioutil.ReadAll(r.Body)
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
var logSeries []schemas.LogSeries
cmapErr = json.Unmarshal(body, &logSeries)
if cmapErr != nil {
httpErr = httpreply.JSONUnmarshalError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
go logger.addAgentLogSeries(logSeries)
httpErr = httpreply.Created(w, nil, "text/plain", []byte("Ressource Created"))
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handleGetLogsLatest is the handler for requests to path
// /api/logging/{masid}/{agentid}/{topic}/latest/{num}
func (logger *Logger) handleGetLogsLatest(w http.ResponseWriter, r *http.Request) {
......@@ -228,6 +251,26 @@ func (logger *Logger) handleGetLogsTime(w http.ResponseWriter, r *http.Request)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handleGetLogSeries is the handler for get requests to path /api/series/{masid}/{agentid}/{name}
func (logger *Logger) handleGetLogSeries(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
masID, agentID, cmapErr := getAgentID(r)
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
var logSeries []schemas.LogSeries
logSeries, cmapErr = logger.getAgentLogSeries(masID, agentID)
if cmapErr != nil {
httpErr = httpreply.CMAPError(w, cmapErr.Error())
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
httpErr = httpreply.Resource(w, logSeries, cmapErr)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handleGetState is the handler for get requests to path /api/state/{masid}/{agentid}
func (logger *Logger) handleGetState(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
......@@ -378,6 +421,10 @@ func (logger *Logger) server(port int) (serv *http.Server) {
HandlerFunc(logger.handleGetLogsTime)
s.Path("/logging/{masid}/{agentid}/{topic}/time/{start}/{end}").
Methods("POST", "PUT", "DELETE").HandlerFunc(logger.methodNotAllowed)
s.Path("/series/{masid}/{agentid}").Methods("GET").HandlerFunc(logger.handleGetLogSeries)
s.Path("/series/{masid}").Methods("POST").HandlerFunc(logger.handlePostLogSeries)
s.Path("/state/{masid}/{agentid}").Methods("GET").HandlerFunc(logger.handleGetState)
s.Path("/state/{masid}/{agentid}").Methods("PUT").HandlerFunc(logger.handlePutState)
s.Path("/state/{masid}/{agentid}").Methods("POST", "DELETE").
......
......@@ -129,6 +129,13 @@ func (logger *Logger) addAgentLogMessageList(logmsg []schemas.LogMessage) (err e
return
}
// addAgentLogSeries
func (logger *Logger) addAgentLogSeries(logseries []schemas.LogSeries) {
for i := 0; i < len(logseries); i++ {
logger.stor.addAgentLogSeries(logseries[i])
}
}
// getLatestAgentLogMessages return the latest num log messages
func (logger *Logger) getLatestAgentLogMessages(masID int, agentID int, topic string,
num int) (logs []schemas.LogMessage, err error) {
......@@ -143,6 +150,12 @@ func (logger *Logger) getAgentLogMessagesInRange(masID int, agentID int, topic s
return
}
// getLogSeries return all the log series of a specific name
func (logger *Logger) getAgentLogSeries(masID int, agentID int) (series []schemas.LogSeries, err error) {
series, err = logger.stor.getAgentLogSeries(masID, agentID)
return
}
// updateCommunication updates communication data of agent
func (logger *Logger) updateCommunication(masID int, agentID int,
comm []schemas.Communication) (err error) {
......
......@@ -66,6 +66,12 @@ type storage interface {
getAgentLogMessagesInRange(masID int, agentID int, topic string, start time.Time,
end time.Time) (logs []schemas.LogMessage, err error)
// addAgentLogSeries add the log series
addAgentLogSeries(series schemas.LogSeries)
// getAgentLogSeries get the log series
getAgentLogSeries(masID int, agentID int) (series []schemas.LogSeries, err error)
// deleteAgentLogMessages deletes all log messages og an agent
deleteAgentLogMessages(masID int, agentID int) (err error)
......@@ -103,13 +109,14 @@ type masStorage struct {
}
type agentStorage struct {
errLogs []schemas.LogMessage
dbgLogs []schemas.LogMessage
msgLogs []schemas.LogMessage
statLogs []schemas.LogMessage
appLogs []schemas.LogMessage
state schemas.State
commData []schemas.Communication
errLogs []schemas.LogMessage
dbgLogs []schemas.LogMessage
msgLogs []schemas.LogMessage
statLogs []schemas.LogMessage
appLogs []schemas.LogMessage
logSeries []schemas.LogSeries
state schemas.State
commData []schemas.Communication
}
// addAgentLogMessage adds an entry to specified logging entry
......@@ -298,6 +305,40 @@ func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, top
return
}
// addAgentLogSeries add the log series
func (stor *localStorage) addAgentLogSeries(series schemas.LogSeries) {
stor.mutex.Lock()
numMAS := len(stor.mas)
if numMAS <= series.MASID {
for i := 0; i < series.MASID-numMAS+1; i++ {
stor.mas = append(stor.mas, masStorage{})
}
}
numAgents := len(stor.mas[series.MASID].agents)
if numAgents <= series.AgentID {
for i := 0; i < series.AgentID-numAgents+1; i++ {
stor.mas[series.MASID].agents = append(stor.mas[series.MASID].agents, agentStorage{})
}
}
stor.mas[series.MASID].agents[series.AgentID].logSeries = append(stor.mas[series.MASID].agents[series.AgentID].logSeries, series)
stor.mutex.Unlock()
return
}
// getAgentLogSeries return the log series
func (stor *localStorage) getAgentLogSeries(masID int, agentID int) (series []schemas.LogSeries, err error) {
stor.mutex.Lock()
if masID < len(stor.mas) {
if agentID < len(stor.mas[masID].agents) {
series = make([]schemas.LogSeries, len(stor.mas[masID].agents[agentID].logSeries))
copy(series, stor.mas[masID].agents[agentID].logSeries)
}
}
stor.mutex.Unlock()
return
}
// deleteAgentLogMessages deletes all log messages og an agent
func (stor *localStorage) deleteAgentLogMessages(masID int, agentID int) (err error) {
stor.mutex.Lock()
......
......@@ -69,6 +69,15 @@ type LogMessage struct {
AdditionalData string `json:"data,omitempty"` // additional information e.g in json
}
// LogSeries contains series data of a single agent
type LogSeries struct {
MASID int `json:"masid"` // ID of MAS agent runs in
AgentID int `json:"agentid"` // ID of agent
Timestamp time.Time `json:"timestamp"` // start time of the logSeries
Name string `json:"name"` // name of the logSeries
Value int `json:"value"` // data of the logSeries
}
// State contains the state of an agent as byte array (json)
type State struct {
MASID int `json:"masid"` // ID of MAS agent runs in
......
#docker build -f build/docker/logger/Dockerfile -t qianwen12/logger .
docker build -f build/docker/frontend/Dockerfile -t qianwen12/frontend .
#docker build -f build/docker/agency/Dockerfile -t qianwen12/agency .
......@@ -1820,6 +1820,25 @@
"semver-intersect": "1.4.0"
}
},
"@swimlane/ngx-charts": {
"version": "17.0.1",
"resolved": "https://registry.npmjs.org/@swimlane/ngx-charts/-/ngx-charts-17.0.1.tgz",
"integrity": "sha512-4pvSznkFo/vM59YUnXH0Y/f8n9cUBBelHuh7UoNlMchl1jI083eFk0zK5fEL2sF3c+vvEpBeYB523GxWvWoifw==",
"requires": {
"d3-array": "^2.9.1",
"d3-brush": "^2.1.0",
"d3-color": "^2.0.0",
"d3-format": "^2.0.0",
"d3-hierarchy": "^2.0.0",
"d3-interpolate": "^2.0.1",
"d3-scale": "^3.2.3",
"d3-selection": "^2.0.0",
"d3-shape": "^2.0.0",
"d3-time-format": "^3.0.0",
"d3-transition": "^2.0.0",
"tslib": "^2.0.0"
}
},
"@types/cytoscape": {
"version": "3.14.13",
"resolved": "https://registry.npmjs.org/@types/cytoscape/-/cytoscape-3.14.13.tgz",
......@@ -4060,6 +4079,131 @@
"type": "^1.0.1"
}
},
"d3-array": {
"version": "2.12.1",
"resolved": "https://registry.npmjs.org/d3-array/-/d3-array-2.12.1.tgz",
"integrity": "sha512-B0ErZK/66mHtEsR1TkPEEkwdy+WDesimkM5gpZr5Dsg54BiTA5RXtYW5qTLIAcekaS9xfZrzBLF/OAkB3Qn1YQ==",
"requires": {
"internmap": "^1.0.0"
}
},
"d3-brush": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/d3-brush/-/d3-brush-2.1.0.tgz",
"integrity": "sha512-cHLLAFatBATyIKqZOkk/mDHUbzne2B3ZwxkzMHvFTCZCmLaXDpZRihQSn8UNXTkGD/3lb/W2sQz0etAftmHMJQ==",
"requires": {
"d3-dispatch": "1 - 2",
"d3-drag": "2",
"d3-interpolate": "1 - 2",
"d3-selection": "2",
"d3-transition": "2"
}
},
"d3-color": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/d3-color/-/d3-color-2.0.0.tgz",
"integrity": "sha512-SPXi0TSKPD4g9tw0NMZFnR95XVgUZiBH+uUTqQuDu1OsE2zomHU7ho0FISciaPvosimixwHFl3WHLGabv6dDgQ=="
},
"d3-dispatch": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/d3-dispatch/-/d3-dispatch-2.0.0.tgz",
"integrity": "sha512-S/m2VsXI7gAti2pBoLClFFTMOO1HTtT0j99AuXLoGFKO6deHDdnv6ZGTxSTTUTgO1zVcv82fCOtDjYK4EECmWA=="
},
"d3-drag": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/d3-drag/-/d3-drag-2.0.0.tgz",
"integrity": "sha512-g9y9WbMnF5uqB9qKqwIIa/921RYWzlUDv9Jl1/yONQwxbOfszAWTCm8u7HOTgJgRDXiRZN56cHT9pd24dmXs8w==",
"requires": {
"d3-dispatch": "1 - 2",
"d3-selection": "2"
}
},
"d3-ease": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/d3-ease/-/d3-ease-2.0.0.tgz",
"integrity": "sha512-68/n9JWarxXkOWMshcT5IcjbB+agblQUaIsbnXmrzejn2O82n3p2A9R2zEB9HIEFWKFwPAEDDN8gR0VdSAyyAQ=="
},
"d3-format": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/d3-format/-/d3-format-2.0.0.tgz",
"integrity": "sha512-Ab3S6XuE/Q+flY96HXT0jOXcM4EAClYFnRGY5zsjRGNy6qCYrQsMffs7cV5Q9xejb35zxW5hf/guKw34kvIKsA=="
},