Commit 0dedd16b authored by Stefan Dähling's avatar Stefan Dähling
Browse files

using batch operations

parent b74dbcfe
Pipeline #344402 passed with stages
in 1 minute and 2 seconds
......@@ -47,39 +47,58 @@ package logger
import (
"encoding/json"
"errors"
"fmt"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"github.com/gocql/gocql"
)
// logEnvelope contains log and agentid masid
// type logEnvelope struct {
// masID int
// agentID int
// log schemas.LogMessage
// }
// cassStorage stores information regarding connection to cassandra cluster
type cassStorage struct {
cluster *gocql.ClusterConfig
session *gocql.Session
cluster *gocql.ClusterConfig
session *gocql.Session
logStatusIn chan schemas.LogMessage // logging inbox
logAppIn chan schemas.LogMessage // logging inbox
logErrorIn chan schemas.LogMessage // logging inbox
logDebugIn chan schemas.LogMessage // logging inbox
logMsgIn chan schemas.LogMessage // logging inbox
stateIn chan schemas.State // state inbox
}
// addAgentLogMessage adds an entry to specified logging entry
func (stor *cassStorage) addAgentLogMessage(masID int, agentID int, logType string,
log LogMessage) (err error) {
var js []byte
js, err = json.Marshal(log)
switch logType {
func (stor *cassStorage) addAgentLogMessage(log schemas.LogMessage) (err error) {
// var js []byte
// js, err = json.Marshal(log)
switch log.LogType {
case "error":
err = stor.session.Query("INSERT INTO logging_error (masid, agentid, t, log) "+
"VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
stor.logErrorIn <- log
// err = stor.session.Query("INSERT INTO logging_error (masid, agentid, t, log) "+
// "VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
case "debug":
err = stor.session.Query("INSERT INTO logging_debug (masid, agentid, t, log) "+
"VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
stor.logDebugIn <- log
// err = stor.session.Query("INSERT INTO logging_debug (masid, agentid, t, log) "+
// "VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
case "msg":
err = stor.session.Query("INSERT INTO logging_msg (masid, agentid, t, log) "+
"VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
stor.logMsgIn <- log
// err = stor.session.Query("INSERT INTO logging_msg (masid, agentid, t, log) "+
// "VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
case "status":
err = stor.session.Query("INSERT INTO logging_status (masid, agentid, t, log) "+
"VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
stor.logStatusIn <- log
// err = stor.session.Query("INSERT INTO logging_status (masid, agentid, t, log) "+
// "VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
case "app":
err = stor.session.Query("INSERT INTO logging_app (masid, agentid, t, log) "+
"VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
stor.logAppIn <- log
// err = stor.session.Query("INSERT INTO logging_app (masid, agentid, t, log) "+
// "VALUES (?, ?, ?, ?)", masID, agentID, log.Timestamp, js).Exec()
default:
err = errors.New("WrongLogType")
}
......@@ -88,7 +107,7 @@ func (stor *cassStorage) addAgentLogMessage(masID int, agentID int, logType stri
// getLatestAgentLogMessages return the latest num log messages
func (stor *cassStorage) getLatestAgentLogMessages(masID int, agentID int, logtype string,
num int) (logs []LogMessage, err error) {
num int) (logs []schemas.LogMessage, err error) {
var iter *gocql.Iter
switch logtype {
case "error":
......@@ -114,7 +133,7 @@ func (stor *cassStorage) getLatestAgentLogMessages(masID int, agentID int, logty
}
var js []byte
for iter.Scan(&js) {
var logmsg LogMessage
var logmsg schemas.LogMessage
err = json.Unmarshal(js, &logmsg)
if err != nil {
return
......@@ -126,7 +145,7 @@ func (stor *cassStorage) getLatestAgentLogMessages(masID int, agentID int, logty
// getAgentLogMessagesInRange return the log messages in the specified time range
func (stor *cassStorage) getAgentLogMessagesInRange(masID int, agentID int, logtype string,
start time.Time, end time.Time) (logs []LogMessage, err error) {
start time.Time, end time.Time) (logs []schemas.LogMessage, err error) {
var iter *gocql.Iter
switch logtype {
case "error":
......@@ -152,7 +171,7 @@ func (stor *cassStorage) getAgentLogMessagesInRange(masID int, agentID int, logt
}
var js []byte
for iter.Scan(&js) {
var logmsg LogMessage
var logmsg schemas.LogMessage
err = json.Unmarshal(js, &logmsg)
if err != nil {
return
......@@ -198,22 +217,26 @@ func (stor *cassStorage) getCommunication(masID int,
}
// updateAgentState updates the agent status
func (stor *cassStorage) updateAgentState(masID int, agentID int, state []byte) (err error) {
err = stor.session.Query("INSERT INTO state (masid, agentid, state) "+
"VALUES (?, ?, ?)", masID, agentID, state).Exec()
func (stor *cassStorage) updateAgentState(masID int, agentID int, state schemas.State) (err error) {
stor.stateIn <- state
// err = stor.session.Query("INSERT INTO state (masid, agentid, state) "+
// "VALUES (?, ?, ?)", masID, agentID, state).Exec()
return
}
// getAgentState return the latest agent status
func (stor *cassStorage) getAgentState(masID int, agentID int) (state []byte, err error) {
func (stor *cassStorage) getAgentState(masID int, agentID int) (state schemas.State, err error) {
var iter *gocql.Iter
var js []byte
iter = stor.session.Query("SELECT state FROM state WHERE masid = ? AND agentid = ?", masID,
agentID).Iter()
if iter.NumRows() == 1 {
iter.Scan(&state)
iter.Scan(&js)
err = json.Unmarshal(js, &state)
}
iter.Close()
return
}
// deleteAgentState deletes the status of an agent
......@@ -222,6 +245,98 @@ func (stor *cassStorage) deleteAgentState(masID int, agentID int) (err error) {
return
}
// storeLogs stores the logs in a batch operation
func (stor *cassStorage) storeLogs(topic string) {
var logIn chan schemas.LogMessage
var err error
stmt := "INSERT INTO logging_" + topic + " (masid, agentid, t, log) VALUES (?, ?, ?, ?)"
if topic == "status" {
logIn = stor.logStatusIn
} else if topic == "app" {
logIn = stor.logAppIn
} else if topic == "error" {
logIn = stor.logErrorIn
} else if topic == "debug" {
logIn = stor.logDebugIn
} else if topic == "msg" {
logIn = stor.logMsgIn
} else {
return
}
for {
batch := gocql.NewBatch(gocql.UnloggedBatch)
log := <-logIn
var js []byte
js, err = json.Marshal(log)
if err != nil {
fmt.Println(err)
}
batch.Query(stmt, log.MASID, log.AgentID, log.Timestamp, js)
size := len(js)
for i := 0; i < 9; i++ {
// maximum of 10 operations in batch
if size > 25000 {
break
}
select {
case log = <-logIn:
js, err = json.Marshal(log)
if err != nil {
fmt.Println(err)
}
batch.Query(stmt, log.MASID, log.AgentID, log.Timestamp, js)
size += len(js)
default:
break
}
}
err = stor.session.ExecuteBatch(batch)
if err != nil {
fmt.Println(err)
}
}
}
// storeState stores the state in a batch operation
func (stor *cassStorage) storeState() {
var err error
stmt := "INSERT INTO state (masid, agentid, state) VALUES (?, ?, ?)"
for {
batch := gocql.NewBatch(gocql.UnloggedBatch)
state := <-stor.stateIn
var js []byte
js, err = json.Marshal(state)
if err != nil {
fmt.Println(err)
}
batch.Query(stmt, state.MASID, state.AgentID, js)
size := len(js)
for i := 0; i < 9; i++ {
// maximum of 10 operations in batch
if size > 25000 {
break
}
select {
case state = <-stor.stateIn:
js, err = json.Marshal(state)
if err != nil {
fmt.Println(err)
}
batch.Query(stmt, state.MASID, state.AgentID, js)
size += len(js)
default:
break
}
}
err = stor.session.ExecuteBatch(batch)
if err != nil {
fmt.Println(err)
}
}
}
func (stor *cassStorage) disconnect() {
stor.session.Close()
}
......@@ -241,6 +356,21 @@ func newCassandraStorage(ip []string, user string, pass string) (stor storage, e
if err != nil {
return
}
temp.logStatusIn = make(chan schemas.LogMessage, 10000)
temp.logAppIn = make(chan schemas.LogMessage, 10000)
temp.logDebugIn = make(chan schemas.LogMessage, 10000)
temp.logErrorIn = make(chan schemas.LogMessage, 10000)
temp.logMsgIn = make(chan schemas.LogMessage, 10000)
temp.stateIn = make(chan schemas.State, 10000)
for i := 0; i < 3; i++ {
go temp.storeLogs("status")
go temp.storeLogs("app")
go temp.storeLogs("error")
go temp.storeLogs("debug")
go temp.storeLogs("msg")
go temp.storeState()
}
stor = &temp
return
}
......@@ -298,17 +298,21 @@ func (logger *Logger) handleLogsTime(masID int, agentid int, logType string, sta
func (logger *Logger) handleState(masID int, agentid int, w http.ResponseWriter,
r *http.Request) (cmapErr, httpErr error) {
if r.Method == "GET" {
var state []byte
var state schemas.State
state, cmapErr = logger.getAgentState(masID, agentid)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, httpErr = w.Write(state)
httpErr = httpreply.Resource(w, state, cmapErr)
} else if r.Method == "PUT" {
var body []byte
body, cmapErr = ioutil.ReadAll(r.Body)
if cmapErr == nil {
go logger.updateAgentState(masID, agentid, body)
httpErr = httpreply.Updated(w, nil)
var state schemas.State
cmapErr = json.Unmarshal(body, &state)
if cmapErr == nil {
go logger.updateAgentState(masID, agentid, state)
httpErr = httpreply.Updated(w, nil)
} else {
httpErr = httpreply.JSONUnmarshalError(w)
}
} else {
httpErr = httpreply.InvalidBodyError(w)
}
......@@ -317,6 +321,7 @@ func (logger *Logger) handleState(masID int, agentid int, w http.ResponseWriter,
cmapErr = errors.New("Error: Method not allowed on path /api/state/{mas-id}/{agent-id}")
}
return
}
// listen opens a http server listening and serving request
......
......@@ -110,11 +110,7 @@ func (logger *Logger) init() (err error) {
// addAgentLogMessage adds an entry to specified logging entry
func (logger *Logger) addAgentLogMessage(logmsg schemas.LogMessage) (err error) {
log := LogMessage{
Timestamp: logmsg.Timestamp,
Message: logmsg.Message,
Data: logmsg.AdditionalData}
err = logger.stor.addAgentLogMessage(logmsg.MASID, logmsg.AgentID, logmsg.LogType, log)
err = logger.stor.addAgentLogMessage(logmsg)
return
}
......@@ -132,42 +128,14 @@ func (logger *Logger) addAgentLogMessageList(logmsg []schemas.LogMessage) (err e
// getLatestAgentLogMessages return the latest num log messages
func (logger *Logger) getLatestAgentLogMessages(masID int, agentID int, logtype string,
num int) (logs []schemas.LogMessage, err error) {
var storLogs []LogMessage
storLogs, err = logger.stor.getLatestAgentLogMessages(masID, agentID, logtype, num)
if err != nil {
return
}
for i := range storLogs {
logtemp := schemas.LogMessage{
MASID: masID,
AgentID: agentID,
Timestamp: storLogs[i].Timestamp,
LogType: logtype,
Message: storLogs[i].Message,
AdditionalData: storLogs[i].Data}
logs = append(logs, logtemp)
}
logs, err = logger.stor.getLatestAgentLogMessages(masID, agentID, logtype, num)
return
}
// getAgentLogMessagesInRange return the log messages in the specified time range
func (logger *Logger) getAgentLogMessagesInRange(masID int, agentID int, logtype string,
start time.Time, end time.Time) (logs []schemas.LogMessage, err error) {
var storLogs []LogMessage
storLogs, err = logger.stor.getAgentLogMessagesInRange(masID, agentID, logtype, start, end)
if err != nil {
return
}
for i := range storLogs {
logtemp := schemas.LogMessage{
MASID: masID,
AgentID: agentID,
Timestamp: storLogs[i].Timestamp,
LogType: logtype,
Message: storLogs[i].Message,
AdditionalData: storLogs[i].Data}
logs = append(logs, logtemp)
}
logs, err = logger.stor.getAgentLogMessagesInRange(masID, agentID, logtype, start, end)
return
}
......@@ -185,13 +153,13 @@ func (logger *Logger) getCommunication(masID int, agentID int) (comm []schemas.C
}
// getAgentState returns the latest agent state
func (logger *Logger) getAgentState(masID int, agentID int) (agState []byte, err error) {
func (logger *Logger) getAgentState(masID int, agentID int) (agState schemas.State, err error) {
agState, err = logger.stor.getAgentState(masID, agentID)
return
}
// updateAgentState updates agent state
func (logger *Logger) updateAgentState(masID int, agentID int, agState []byte) (err error) {
func (logger *Logger) updateAgentState(masID int, agentID int, agState schemas.State) (err error) {
err = logger.stor.updateAgentState(masID, agentID, agState)
return
}
......@@ -56,15 +56,15 @@ import (
// storage is the interface for logging and state storage (either local or in db)
type storage interface {
// addAgentLogMessage adds an entry to specified logging entry
addAgentLogMessage(masID int, agentID int, logType string, log LogMessage) (err error)
addAgentLogMessage(log schemas.LogMessage) (err error)
// getLatestAgentLogMessages return the latest num log messages
getLatestAgentLogMessages(masID int, agentID int, logtype string,
num int) (logs []LogMessage, err error)
num int) (logs []schemas.LogMessage, err error)
// getAgentLogMessagesInRange return the log messages in the specified time range
getAgentLogMessagesInRange(masID int, agentID int, logtype string, start time.Time,
end time.Time) (logs []LogMessage, err error)
end time.Time) (logs []schemas.LogMessage, err error)
// deleteAgentLogMessages deletes all log messages og an agent
deleteAgentLogMessages(masID int, agentID int) (err error)
......@@ -76,21 +76,21 @@ type storage interface {
getCommunication(masID int, agentID int) (commData []schemas.Communication, err error)
// updateAgentState updates the agent status
updateAgentState(masID int, agentID int, satte []byte) (err error)
updateAgentState(masID int, agentID int, state schemas.State) (err error)
// getAgentState return the latest agent status
getAgentState(masID int, agentID int) (state []byte, err error)
getAgentState(masID int, agentID int) (state schemas.State, err error)
// deleteAgentState deletes the status of an agent
deleteAgentState(masID int, agentID int) (err error)
}
// LogMessage contains the content of a single log message
type LogMessage struct {
Timestamp time.Time
Message string
Data string
}
// type LogMessage struct {
// Timestamp time.Time
// Message string
// Data string
// }
// IDs of agents and mas correspond to index in slices!
type localStorage struct {
......@@ -103,46 +103,45 @@ type masStorage struct {
}
type agentStorage struct {
errLogs []LogMessage
dbgLogs []LogMessage
msgLogs []LogMessage
statLogs []LogMessage
appLogs []LogMessage
state []byte
errLogs []schemas.LogMessage
dbgLogs []schemas.LogMessage
msgLogs []schemas.LogMessage
statLogs []schemas.LogMessage
appLogs []schemas.LogMessage
state schemas.State
commData []schemas.Communication
}
// addAgentLogMessage adds an entry to specified logging entry
func (stor *localStorage) addAgentLogMessage(masID int, agentID int, logType string,
log LogMessage) (err error) {
func (stor *localStorage) addAgentLogMessage(log schemas.LogMessage) (err error) {
stor.mutex.Lock()
numMAS := len(stor.mas)
if numMAS <= masID {
for i := 0; i < masID-numMAS+1; i++ {
if numMAS <= log.MASID {
for i := 0; i < log.MASID-numMAS+1; i++ {
stor.mas = append(stor.mas, masStorage{})
}
}
numAgents := len(stor.mas[masID].agents)
if numAgents <= agentID {
for i := 0; i < agentID-numAgents+1; i++ {
stor.mas[masID].agents = append(stor.mas[masID].agents, agentStorage{})
numAgents := len(stor.mas[log.MASID].agents)
if numAgents <= log.AgentID {
for i := 0; i < log.AgentID-numAgents+1; i++ {
stor.mas[log.MASID].agents = append(stor.mas[log.MASID].agents, agentStorage{})
}
}
switch logType {
switch log.LogType {
case "error":
stor.mas[masID].agents[agentID].errLogs = append(stor.mas[masID].agents[agentID].errLogs,
stor.mas[log.MASID].agents[log.AgentID].errLogs = append(stor.mas[log.MASID].agents[log.AgentID].errLogs,
log)
case "debug":
stor.mas[masID].agents[agentID].dbgLogs = append(stor.mas[masID].agents[agentID].dbgLogs,
stor.mas[log.MASID].agents[log.AgentID].dbgLogs = append(stor.mas[log.MASID].agents[log.AgentID].dbgLogs,
log)
case "msg":
stor.mas[masID].agents[agentID].msgLogs = append(stor.mas[masID].agents[agentID].msgLogs,
stor.mas[log.MASID].agents[log.AgentID].msgLogs = append(stor.mas[log.MASID].agents[log.AgentID].msgLogs,
log)
case "status":
stor.mas[masID].agents[agentID].statLogs = append(stor.mas[masID].agents[agentID].statLogs,
stor.mas[log.MASID].agents[log.AgentID].statLogs = append(stor.mas[log.MASID].agents[log.AgentID].statLogs,
log)
case "app":
stor.mas[masID].agents[agentID].appLogs = append(stor.mas[masID].agents[agentID].appLogs,
stor.mas[log.MASID].agents[log.AgentID].appLogs = append(stor.mas[log.MASID].agents[log.AgentID].appLogs,
log)
default:
err = errors.New("WrongLogType")
......@@ -153,7 +152,7 @@ func (stor *localStorage) addAgentLogMessage(masID int, agentID int, logType str
// getLatestAgentLogMessages return the latest num log messages
func (stor *localStorage) getLatestAgentLogMessages(masID int, agentID int, logtype string,
num int) (logs []LogMessage, err error) {
num int) (logs []schemas.LogMessage, err error) {
stor.mutex.Lock()
if masID < len(stor.mas) {
if agentID < len(stor.mas[masID].agents) {
......@@ -163,35 +162,35 @@ func (stor *localStorage) getLatestAgentLogMessages(masID int, agentID int, logt
if length < num {
num = length
}
logs = make([]LogMessage, num, num)
logs = make([]schemas.LogMessage, num, num)
copy(logs, stor.mas[masID].agents[agentID].errLogs[length-num:length])
case "debug":
length := len(stor.mas[masID].agents[agentID].dbgLogs)
if length < num {
num = length
}
logs = make([]LogMessage, num, num)
logs = make([]schemas.LogMessage, num, num)
copy(logs, stor.mas[masID].agents[agentID].dbgLogs[length-num:length])
case "msg":
length := len(stor.mas[masID].agents[agentID].msgLogs)
if length < num {
num = length
}
logs = make([]LogMessage, num, num)
logs = make([]schemas.LogMessage, num, num)
copy(logs, stor.mas[masID].agents[agentID].msgLogs[length-num:length])
case "status":
length := len(stor.mas[masID].agents[agentID].statLogs)
if length < num {
num = length
}
logs = make([]LogMessage, num, num)
logs = make([]schemas.LogMessage, num, num)
copy(logs, stor.mas[masID].agents[agentID].statLogs[length-num:length])
case "app":
length := len(stor.mas[masID].agents[agentID].appLogs)
if length < num {
num = length
}
logs = make([]LogMessage, num, num)
logs = make([]schemas.LogMessage, num, num)
copy(logs, stor.mas[masID].agents[agentID].appLogs[length-num:length])
default:
err = errors.New("WrongLogType")
......@@ -204,7 +203,7 @@ func (stor *localStorage) getLatestAgentLogMessages(masID int, agentID int, logt
// getAgentLogMessagesInRange return the log messages in the specified time range
func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, logtype string,
start time.Time, end time.Time) (logs []LogMessage, err error) {
start time.Time, end time.Time) (logs []schemas.LogMessage, err error) {
stor.mutex.Lock()
if masID < len(stor.mas) {
if agentID < len(stor.mas[masID].agents) {
......@@ -221,7 +220,7 @@ func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, log
return end.After(stor.mas[masID].agents[agentID].errLogs[i].Timestamp)
})
if endIndex >= 0 {
logs = make([]LogMessage, endIndex-startIndex, endIndex-startIndex)
logs = make([]schemas.LogMessage, endIndex-startIndex, endIndex-startIndex)
copy(logs, stor.mas[masID].agents[agentID].errLogs[startIndex:endIndex])
}
}
......@@ -237,7 +236,7 @@ func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, log
return end.After(stor.mas[masID].agents[agentID].dbgLogs[i].Timestamp)
})
if endIndex >= 0 {
logs = make([]LogMessage, endIndex-startIndex, endIndex-startIndex)
logs = make([]schemas.LogMessage, endIndex-startIndex, endIndex-startIndex)
copy(logs, stor.mas[masID].agents[agentID].dbgLogs[startIndex:endIndex])
}
}
......@@ -253,7 +252,7 @@ func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, log
return end.After(stor.mas[masID].agents[agentID].msgLogs[i].Timestamp)
})
if endIndex >= 0 {
logs = make([]LogMessage, endIndex-startIndex, endIndex-startIndex)
logs = make([]schemas.LogMessage, endIndex-startIndex, endIndex-startIndex)
copy(logs, stor.mas[masID].agents[agentID].msgLogs[startIndex:endIndex])
}
}
......@@ -270,7 +269,7 @@ func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, log
return end.After(stor.mas[masID].agents[agentID].statLogs[i].Timestamp)
})
if endIndex >= 0 {
logs = make([]LogMessage, endIndex-startIndex, endIndex-startIndex)
logs = make([]schemas.LogMessage, endIndex-startIndex, endIndex-startIndex)
copy(logs, stor.mas[masID].agents[agentID].statLogs[startIndex:endIndex])
}
}
......@@ -286,7 +285,7 @@ func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, log
return stor.mas[masID].agents[agentID].appLogs[i].Timestamp.After(end)
})
if endIndex >= 0 {
logs = make([]LogMessage, endIndex-startIndex, endIndex-startIndex)
logs = make([]schemas.LogMessage, endIndex-startIndex, endIndex-startIndex)
copy(logs, stor.mas[masID].agents[agentID].appLogs[startIndex:endIndex])
}
}
......@@ -350,7 +349,7 @@ func (stor *localStorage) getCommunication(masID int,
}
// updateAgentState updates the agent status
func (stor *localStorage) updateAgentState(masID int, agentID int, state []byte) (err error) {
func (stor *localStorage) updateAgentState(masID int, agentID int, state schemas.State) (err error) {