Commit a903bfeb authored by Qianwen's avatar Qianwen
Browse files

add log series

parent 1fa45805
Pipeline #465037 passed with stages
in 3 minutes and 20 seconds
......@@ -46,6 +46,7 @@ package main
import (
"fmt"
"math/rand"
"strconv"
"time"
......@@ -75,5 +76,10 @@ func task(ag *agency.Agent) (err error) {
if err != nil {
fmt.Println(err)
}
for i := 0; i < 20; i++ {
time.Sleep(2 * time.Second)
idx := rand.Intn(4) + 1
ag.Logger.NewLogSeries("type"+strconv.Itoa(idx), rand.Intn(100))
}
return
}
......@@ -12,8 +12,7 @@ services:
- clonemap-net
logger:
#image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/logger
image: qianwen12/logger
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/logger
environment:
CLONEMAP_DEPLOYMENT_TYPE: ${CLONEMAP_DEPLOYMENT_TYPE}
CLONEMAP_LOG_LEVEL: ${CLONEMAP_LOG_LEVEL}
......@@ -51,8 +50,7 @@ services:
- clonemap-net
fe:
# image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/frontend
image: qianwen12/frontend
image: registry.git.rwth-aachen.de/acs/public/cloud/mas/clonemap/frontend
environment:
CLONEMAP_DEPLOYMENT_TYPE: ${CLONEMAP_DEPLOYMENT_TYPE}
CLONEMAP_LOG_LEVEL: ${CLONEMAP_LOG_LEVEL}
......
......@@ -390,6 +390,7 @@ data:
CREATE TABLE clonemap.logging_msg ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_status ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_debug ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.logging_series ( masid int, agentid int, t timestamp, log varchar, PRIMARY KEY ((masid, agentid), t)) WITH CLUSTERING ORDER BY (t ASC);
CREATE TABLE clonemap.state ( masid int, agentid int, state varchar, PRIMARY KEY (masid, agentid));
EOF
......@@ -566,7 +567,7 @@ spec:
- name: CLONEMAP_DEPLOYMENT_TYPE
value: "production"
- name: CLONEMAP_LOG_LEVEL
value: "error"
value: "info"
resources:
requests:
memory: "128Mi"
......@@ -662,7 +663,7 @@ metadata:
labels:
app: fe
spec:
type: NodePort
type: LoadBalancer
ports:
- port: 13000
protocol: TCP
......
{
"id": "0",
"agentid": 0,
"nodeid": 0,
"masid": 0,
"createdat": "2012-04-23T18:25:43.511Z" ,
"changedat": "2013-04-23T18:25:43.511Z" ,
"desc":"This is the description of the service",
"dist": 1.45
}
\ No newline at end of file
......@@ -19,7 +19,7 @@
"imagegroups":[
{
"config":{
"image":"agency"
"image":"qianwen12/agency"
},
"agents":[
{
......
[{"masid": 0, "agentid": 3, "timestamp": "2021-04-09T00:29:33Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-04-08T17:05:58Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-04-08T15:55:09Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-04-07T22:01:46Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-04-07T15:40:50Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-04-07T12:12:05Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-04-07T11:54:12Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-04-06T08:44:20Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 0;Receiver: 2;Timestamp: 2021-0406T08:44:20Z"}, {"masid": 0, "agentid": 2, "timestamp": "2021-04-06T08:44:20Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 0;Receiver: 2;Timestamp: 2021-04-06T08:44:20Z"}, {"masid": 0, "agentid": 4, "timestamp": "2021-04-06T08:25:30Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 4, "timestamp": "2021-04-06T01:35:38Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 3, "timestamp": "2021-04-05T17:56:59Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 3, "timestamp": "2021-04-04T23:06:28Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-04-02T07:13:27Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-31T07:08:06Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 1;Receiver: 4;Timestamp: 2021-0331T07:08:06Z"}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-31T07:08:06Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 1;Receiver: 4;Timestamp: 2021-03-31T07:08:06Z"}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-30T17:37:29Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-30T09:44:29Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-29T05:44:12Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-27T13:54:29Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-27T12:08:32Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 3, "timestamp": "2021-03-27T11:26:55Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 3;Receiver: 0;Timestamp: 2021-0327T11:26:55Z"}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-27T11:26:55Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 3;Receiver: 0;Timestamp: 2021-03-27T11:26:55Z"}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-27T06:39:14Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-26T15:41:50Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-26T08:06:05Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-25T21:23:48Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-25T21:22:33Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 3, "timestamp": "2021-03-25T17:57:28Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-25T08:49:05Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 4;Receiver: 2;Timestamp: 2021-0325T08:49:05Z"}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-25T08:49:05Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 4;Receiver: 2;Timestamp: 2021-03-25T08:49:05Z"}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-25T03:09:24Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-24T22:22:08Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 3, "timestamp": "2021-03-24T18:19:09Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-23T21:01:36Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-23T20:02:50Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 2;Receiver: 0;Timestamp: 2021-0323T20:02:50Z"}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-23T20:02:50Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 2;Receiver: 0;Timestamp: 2021-03-23T20:02:50Z"}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-23T05:27:32Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-23T01:59:00Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-21T20:10:01Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 0;Receiver: 1;Timestamp: 2021-0321T20:10:01Z"}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-21T20:10:01Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 0;Receiver: 1;Timestamp: 2021-03-21T20:10:01Z"}, {"masid": 0, "agentid": 3, "timestamp": "2021-03-21T15:40:07Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 3;Receiver: 1;Timestamp: 2021-0321T15:40:07Z"}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-21T15:40:07Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 3;Receiver: 1;Timestamp: 2021-03-21T15:40:07Z"}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-21T00:42:36Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-20T18:25:31Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-20T11:11:00Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-18T21:05:32Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-17T12:00:46Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-16T20:17:55Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-14T20:25:32Z", "topic": "app", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-14T18:54:56Z", "topic": "error", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-14T09:17:39Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 2;Receiver: 4;Timestamp: 2021-0314T09:17:39Z"}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-14T09:17:39Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 2;Receiver: 4;Timestamp: 2021-03-14T09:17:39Z"}, {"masid": 0, "agentid": 0, "timestamp": "2021-03-14T01:06:23Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 0;Receiver: 4;Timestamp: 2021-0314T01:06:23Z"}, {"masid": 0, "agentid": 4, "timestamp": "2021-03-14T01:06:23Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 0;Receiver: 4;Timestamp: 2021-03-14T01:06:23Z"}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-13T19:01:00Z", "topic": "debug", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 2, "timestamp": "2021-03-13T17:44:38Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 3, "timestamp": "2021-03-12T19:27:50Z", "topic": "status", "msg": "msg", "data": ""}, {"masid": 0, "agentid": 1, "timestamp": "2021-03-12T09:07:13Z", "topic": "msg", "msg": "ACL send", "data": "Sender: 1;Receiver: 3;Timestamp: 2021-0312T09:07:13Z"}, {"masid": 0, "agentid": 3, "timestamp": "2021-03-12T09:07:13Z", "topic": "msg", "msg": "ACL receive", "data": "Sender: 1;Receiver: 3;Timestamp: 2021-03-12T09:07:13Z"}]
\ No newline at end of file
......@@ -117,6 +117,29 @@ func (cli *LoggerClient) GetLogsInRange(masID int, agentID int, topic string, st
return
}
// PostLogSeries posts new log series to logger
func (cli *LoggerClient) PostLogSeries(masID int, logs []schemas.LogSeries) (httpStatus int, err error) {
js, _ := json.Marshal(logs)
_, httpStatus, err = httpretry.Post(cli.httpClient, cli.prefix()+"/api/series/"+
strconv.Itoa(masID), "application/json", js, time.Second*2, 4)
return
}
// GetLogSeries gets log series with its name
func (cli *LoggerClient) GetLogSeries(masID int, agentID int) (series []schemas.LogSeries, httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/series/"+
strconv.Itoa(masID)+"/"+strconv.Itoa(agentID), time.Second*2, 4)
if err != nil {
return
}
err = json.Unmarshal(body, &series)
if err != nil {
series = []schemas.LogSeries{}
}
return
}
// PutState updates the state
func (cli *LoggerClient) PutState(state schemas.State) (httpStatus int, err error) {
js, _ := json.Marshal(state)
......@@ -171,13 +194,52 @@ func NewLoggerClient(host string, port int, timeout time.Duration, del time.Dura
// LogCollector collects logs and states and sends them to the Logger service
// one LogCollector per agency is used; each agent obtains one AgentLogger
type LogCollector struct {
masID int
logIn chan schemas.LogMessage // logging inbox
stateIn chan schemas.State
client *LoggerClient
config schemas.LoggerConfig
logError *log.Logger
logInfo *log.Logger
masID int
logIn chan schemas.LogMessage // logging inbox
logSeriesIn chan schemas.LogSeries
stateIn chan schemas.State
client *LoggerClient
config schemas.LoggerConfig
logError *log.Logger
logInfo *log.Logger
}
// storeLogSeries periodically requests the logging service to store log series
func (logCol *LogCollector) storeLogSeries() (err error) {
if logCol.config.Active {
for {
if len(logCol.logSeriesIn) > 0 {
numSeries := len(logCol.logSeriesIn)
logSeries := make([]schemas.LogSeries, numSeries)
for i := 0; i < numSeries; i++ {
logMsg := <-logCol.logSeriesIn
logSeries[i] = logMsg
logSeries[i].MASID = logCol.masID
}
_, err = logCol.client.PostLogSeries(logCol.masID, logSeries)
if err != nil {
logCol.logError.Println(err)
for i := range logSeries {
logCol.logSeriesIn <- logSeries[i]
}
continue
}
}
tempTime := time.Now()
for {
time.Sleep(100 * time.Millisecond)
if time.Since(tempTime).Seconds() > 15 || len(logCol.logSeriesIn) > 50 {
break
}
}
}
} else {
for {
// print messages to stdout if logger is turned off
logMsg := <-logCol.logSeriesIn
logCol.logInfo.Println(logMsg)
}
}
}
// storeLogs periodically requests the logging service to store log messages
......@@ -275,8 +337,10 @@ func NewLogCollector(masID int, config schemas.LoggerConfig, logErr *log.Logger,
logCol.client = NewLoggerClient(config.Host, config.Port, time.Second*60, time.Second*1, 4)
}
logCol.logIn = make(chan schemas.LogMessage, 10000)
logCol.logSeriesIn = make(chan schemas.LogSeries, 10000)
logCol.stateIn = make(chan schemas.State, 10000)
go logCol.storeLogs()
go logCol.storeLogSeries()
go logCol.storeState()
logCol.logInfo.Println("Created new logger client; status: ", logCol.config.Active)
return
......@@ -284,15 +348,16 @@ func NewLogCollector(masID int, config schemas.LoggerConfig, logErr *log.Logger,
// AgentLogger is the endpoint for agents
type AgentLogger struct {
agentID int
masID int
client *LoggerClient
logOut chan schemas.LogMessage // logging inbox
stateOut chan schemas.State
mutex *sync.Mutex
logError *log.Logger
logInfo *log.Logger
active bool
agentID int
masID int
client *LoggerClient
logOut chan schemas.LogMessage // logging inbox
logSeriesOut chan schemas.LogSeries
stateOut chan schemas.State
mutex *sync.Mutex
logError *log.Logger
logInfo *log.Logger
active bool
}
// NewLog sends a new logging message to the logging service
......@@ -325,6 +390,29 @@ func (agLog *AgentLogger) NewLog(topic string, message string, data string) (err
return
}
// NewLogSeries sends a new logging series to
func (agLog *AgentLogger) NewLogSeries(name string, value int) (err error) {
if agLog == nil {
return
}
agLog.mutex.Lock()
if !agLog.active {
agLog.mutex.Unlock()
return errors.New("logger not active")
}
time.Sleep(time.Millisecond * 5)
tStamp := time.Now()
agLog.mutex.Unlock()
logSeries := schemas.LogSeries{
AgentID: agLog.agentID,
Timestamp: tStamp,
Name: name,
Value: value,
}
agLog.logSeriesOut <- logSeries
return
}
// UpdateState overrides the state stored in database
func (agLog *AgentLogger) UpdateState(state string) (err error) {
agLog.mutex.Lock()
......@@ -357,19 +445,20 @@ func (agLog *AgentLogger) RestoreState() (state string, err error) {
return
}
// NewAgentLogger craetes a new object of type AgentLogger
// NewAgentLogger creates a new object of type AgentLogger
func (logCol *LogCollector) NewAgentLogger(agentID int, logErr *log.Logger,
logInf *log.Logger) (agLog *AgentLogger) {
agLog = &AgentLogger{
agentID: agentID,
masID: logCol.masID,
client: logCol.client,
logOut: logCol.logIn,
stateOut: logCol.stateIn,
mutex: &sync.Mutex{},
logError: logErr,
logInfo: logInf,
active: logCol.config.Active,
agentID: agentID,
masID: logCol.masID,
client: logCol.client,
logOut: logCol.logIn,
logSeriesOut: logCol.logSeriesIn,
stateOut: logCol.stateIn,
mutex: &sync.Mutex{},
logError: logErr,
logInfo: logInf,
active: logCol.config.Active,
}
return
}
......
......@@ -221,6 +221,7 @@ func (fe *Frontend) server(port int) (serv *http.Server) {
s.Path("/df/{masid}/svc/desc/{desc}/node/{nodeid}/dist/{dist}").Methods("Get").HandlerFunc(fe.handleSvcWithDist)
// api for logger
s.Path("/logging/series/{masid}/{agentid}").Methods("GET").HandlerFunc(fe.handleGetLogSeries)
s.Path("/logging/{masid}/{agentid}/{topic}/latest/{num}").Methods("GET").HandlerFunc(fe.handleGetNLatestLogs)
s.Path("/logging/{masid}/list").Methods("POST").HandlerFunc(fe.handlePostLogs)
s.Path("/logging/{masid}/{agentid}/{topic}/time/{start}/{end}").Methods("GET").HandlerFunc(fe.handleGetLogsInRange)
......
......@@ -159,5 +159,26 @@ func (fe *Frontend) handleGetLogsInRange(w http.ResponseWriter, r *http.Request)
httpErr = httpreply.Resource(w, msgs, cmapErr)
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
// handleGetLogSeries is the handler to /api/logging/series/{masid}/{agentid}
func (fe *Frontend) handleGetLogSeries(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
masID, agentID, cmapErr := getAgentID(r)
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
var series []schemas.LogSeries
series, _, cmapErr = fe.logClient.GetLogSeries(masID, agentID)
if cmapErr != nil {
httpErr = httpreply.CMAPError(w, cmapErr.Error())
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
httpErr = httpreply.Resource(w, series, cmapErr)
fe.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
......@@ -71,6 +71,7 @@ type cassStorage struct {
logDebugIn chan schemas.LogMessage // logging inbox
logMsgIn chan schemas.LogMessage // logging inbox
stateIn chan schemas.State // state inbox
logSeriesIn chan schemas.LogSeries // logging inbox
}
// addAgentLogMessage adds an entry to specified logging entry
......@@ -182,6 +183,30 @@ func (stor *cassStorage) getAgentLogMessagesInRange(masID int, agentID int, topi
return
}
// addAgentLogSeries add log series
func (stor *cassStorage) addAgentLogSeries(series schemas.LogSeries) {
stor.logSeriesIn <- series
return
}
// getAgentLogSeries get log series
func (stor *cassStorage) getAgentLogSeries(masID int, agentID int) (series []schemas.LogSeries, err error) {
var iter *gocql.Iter
iter = stor.session.Query("SELECT state FROM logging_series WHERE masid = ? AND agentid = ?", masID,
agentID).Iter()
var js []byte
for iter.Scan(&js) {
var logSeries schemas.LogSeries
err = json.Unmarshal(js, &logSeries)
if err != nil {
return
}
series = append(series, logSeries)
}
iter.Close()
return
}
// deleteAgentLogMessages deletes all log messages og an agent
func (stor *cassStorage) deleteAgentLogMessages(masID int, agentID int) (err error) {
......@@ -297,6 +322,45 @@ func (stor *cassStorage) storeLogs(topic string) {
}
}
// storeSeries stores the log series in a batch operation
func (stor *cassStorage) storeSeries() {
var err error
stmt := "INSERT INTO logging_series (masid, agentid, t, log) VALUES (?, ?, ?, ?)"
for {
batch := gocql.NewBatch(gocql.UnloggedBatch)
series := <-stor.logSeriesIn
var js []byte
js, err = json.Marshal(series)
if err != nil {
fmt.Println(err)
}
batch.Query(stmt, series.MASID, series.AgentID, series.Timestamp, js)
size := len(js)
for i := 0; i < 9; i++ {
// maximum of 10 operations in batch
if size > 25000 {
break
}
select {
case series = <-stor.logSeriesIn:
js, err = json.Marshal(series)
if err != nil {
fmt.Println(err)
}
batch.Query(stmt, series.MASID, series.AgentID, series.Timestamp, js)
size += len(js)
default:
break
}
}
err = stor.session.ExecuteBatch(batch)
if err != nil {
fmt.Println(err)
}
}
}
// storeState stores the state in a batch operation
func (stor *cassStorage) storeState() {
var err error
......@@ -361,6 +425,7 @@ func newCassandraStorage(ip []string, user string, pass string) (stor storage, e
temp.logErrorIn = make(chan schemas.LogMessage, 10000)
temp.logMsgIn = make(chan schemas.LogMessage, 10000)
temp.stateIn = make(chan schemas.State, 10000)
temp.logSeriesIn = make(chan schemas.LogSeries, 10000)
for i := 0; i < 3; i++ {
go temp.storeLogs("status")
......@@ -368,6 +433,7 @@ func newCassandraStorage(ip []string, user string, pass string) (stor storage, e
go temp.storeLogs("error")
go temp.storeLogs("debug")
go temp.storeLogs("msg")
go temp.storeSeries()
go temp.storeState()
}
stor = &temp
......
......@@ -163,6 +163,29 @@ func (logger *Logger) handlePostLogMsgList(w http.ResponseWriter, r *http.Reques
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handlePostLogSeries is the handler for post requests to path /api/series/{masid}/
func (logger *Logger) handlePostLogSeries(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
// create new log message entry
var body []byte
body, cmapErr = ioutil.ReadAll(r.Body)
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
var logSeries []schemas.LogSeries
cmapErr = json.Unmarshal(body, &logSeries)
if cmapErr != nil {
httpErr = httpreply.JSONUnmarshalError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
go logger.addAgentLogSeries(logSeries)
httpErr = httpreply.Created(w, nil, "text/plain", []byte("Ressource Created"))
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handleGetLogsLatest is the handler for requests to path
// /api/logging/{masid}/{agentid}/{topic}/latest/{num}
func (logger *Logger) handleGetLogsLatest(w http.ResponseWriter, r *http.Request) {
......@@ -228,6 +251,26 @@ func (logger *Logger) handleGetLogsTime(w http.ResponseWriter, r *http.Request)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handleGetLogSeries is the handler for get requests to path /api/series/{masid}/{agentid}/{name}
func (logger *Logger) handleGetLogSeries(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
masID, agentID, cmapErr := getAgentID(r)
if cmapErr != nil {
httpErr = httpreply.NotFoundError(w)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
var logSeries []schemas.LogSeries
logSeries, cmapErr = logger.getAgentLogSeries(masID, agentID)
if cmapErr != nil {
httpErr = httpreply.CMAPError(w, cmapErr.Error())
logger.logErrors(r.URL.Path, cmapErr, httpErr)
return
}
httpErr = httpreply.Resource(w, logSeries, cmapErr)
logger.logErrors(r.URL.Path, cmapErr, httpErr)
}
// handleGetState is the handler for get requests to path /api/state/{masid}/{agentid}
func (logger *Logger) handleGetState(w http.ResponseWriter, r *http.Request) {
var cmapErr, httpErr error
......@@ -378,6 +421,10 @@ func (logger *Logger) server(port int) (serv *http.Server) {
HandlerFunc(logger.handleGetLogsTime)
s.Path("/logging/{masid}/{agentid}/{topic}/time/{start}/{end}").
Methods("POST", "PUT", "DELETE").HandlerFunc(logger.methodNotAllowed)
s.Path("/series/{masid}/{agentid}").Methods("GET").HandlerFunc(logger.handleGetLogSeries)
s.Path("/series/{masid}").Methods("POST").HandlerFunc(logger.handlePostLogSeries)
s.Path("/state/{masid}/{agentid}").Methods("GET").HandlerFunc(logger.handleGetState)
s.Path("/state/{masid}/{agentid}").Methods("PUT").HandlerFunc(logger.handlePutState)
s.Path("/state/{masid}/{agentid}").Methods("POST", "DELETE").
......
......@@ -129,6 +129,13 @@ func (logger *Logger) addAgentLogMessageList(logmsg []schemas.LogMessage) (err e
return
}
// addAgentLogSeries
func (logger *Logger) addAgentLogSeries(logseries []schemas.LogSeries) {
for i := 0; i < len(logseries); i++ {
logger.stor.addAgentLogSeries(logseries[i])
}
}
// getLatestAgentLogMessages return the latest num log messages
func (logger *Logger) getLatestAgentLogMessages(masID int, agentID int, topic string,
num int) (logs []schemas.LogMessage, err error) {
......@@ -143,6 +150,12 @@ func (logger *Logger) getAgentLogMessagesInRange(masID int, agentID int, topic s
return
}
// getLogSeries return all the log series of a specific name
func (logger *Logger) getAgentLogSeries(masID int, agentID int) (series []schemas.LogSeries, err error) {
series, err = logger.stor.getAgentLogSeries(masID, agentID)
return
}
// updateCommunication updates communication data of agent
func (logger *Logger) updateCommunication(masID int, agentID int,
comm []schemas.Communication) (err error) {
......
......@@ -66,6 +66,12 @@ type storage interface {
getAgentLogMessagesInRange(masID int, agentID int, topic string, start time.Time,
end time.Time) (logs []schemas.LogMessage, err error)
// addAgentLogSeries add the log series
addAgentLogSeries(series schemas.LogSeries)
// getAgentLogSeries get the log series
getAgentLogSeries(masID int, agentID int) (series []schemas.LogSeries, err error)
// deleteAgentLogMessages deletes all log messages og an agent
deleteAgentLogMessages(masID int, agentID int) (err error)
......@@ -103,13 +109,14 @@ type masStorage struct {
}
type agentStorage struct {
errLogs []schemas.LogMessage
dbgLogs []schemas.LogMessage
msgLogs []schemas.LogMessage
statLogs []schemas.LogMessage
appLogs []schemas.LogMessage
state schemas.State
commData []schemas.Communication
errLogs []schemas.LogMessage
dbgLogs []schemas.LogMessage
msgLogs []schemas.LogMessage
statLogs []schemas.LogMessage
appLogs []schemas.LogMessage
logSeries []schemas.LogSeries
state schemas.State
commData []schemas.Communication
}
// addAgentLogMessage adds an entry to specified logging entry
......@@ -298,6 +305,40 @@ func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, top
return
}
// addAgentLogSeries add the log series
func (stor *localStorage) addAgentLogSeries(series schemas.LogSeries) {
stor.mutex.Lock()
numMAS := len(stor.mas)
if numMAS <= series.MASID {
for i := 0; i < series.MASID-numMAS+1; i++ {
stor.mas = append(stor.mas, masStorage{})
}
}
numAgents := len(stor.mas[series.MASID].agents)
if numAgents <= series.AgentID {
for i := 0; i < series.AgentID-numAgents+1; i++ {
stor.mas[series.MASID].agents = append(stor.mas[series.MASID].agents, agentStorage{})
}
}
stor.mas[series.MASID].agents[series.AgentID].logSeries = append(stor.mas[series.MASID].agents[series.AgentID].logSeries, series)
stor.mutex.Unlock()
return
}
// getAgentLogSeries return the log series
func (stor *localStorage) getAgentLogSeries(masID int, agentID int)