Commit 0b65e036 authored by Stefan Dähling's avatar Stefan Dähling
Browse files

started to refactor logger client code

parent ce724983
Pipeline #417728 passed with stages
in 5 minutes and 38 seconds
......@@ -19,7 +19,7 @@
"imagegroups":[
{
"config":{
"image":"cmapytest"
"image":"agency"
},
"agents":[
{
......
......@@ -163,7 +163,7 @@ type logHandler struct {
logIn chan schemas.LogMessage // logging inbox
stateIn chan schemas.State
active bool // indicates if logging is active (switch via env)
client *logclient.Client
client *logclient.LoggerClient
logError *log.Logger
logInfo *log.Logger
}
......@@ -241,8 +241,8 @@ func newLogHandler(masID int, logErr *log.Logger, logInf *log.Logger) (log *logH
active: false,
logError: logErr,
logInfo: logInf,
client: logclient.New(time.Second*60, time.Second*1, 4),
}
log.client, _ = logclient.NewLoggerClient("logger", 11000, time.Second*60, time.Second*1, 4)
temp := os.Getenv("CLONEMAP_LOGGING")
if temp == "ON" {
log.active = true
......
......@@ -61,7 +61,7 @@ import (
type Frontend struct {
amsClient *amsclient.Client
dfClient *dfclient.Client
logClient *logclient.Client
logClient *logclient.LoggerClient
logInfo *log.Logger // logger for info logging
logError *log.Logger // logger for error logging
}
......@@ -71,9 +71,9 @@ func StartFrontend() (err error) {
fe := &Frontend{
amsClient: amsclient.New(time.Second*60, time.Second*1, 4),
dfClient: dfclient.New(time.Second*60, time.Second*1, 4),
logClient: logclient.New(time.Second*60, time.Second*1, 4),
logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags),
}
fe.logClient, _ = logclient.NewLoggerClient("logger", 11000, time.Second*60, time.Second*1, 4)
logType := os.Getenv("CLONEMAP_LOG_LEVEL")
switch logType {
case "info":
......
......@@ -47,25 +47,28 @@ package client
import (
"encoding/json"
"errors"
"log"
"net/http"
"strconv"
"sync"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/common/httpretry"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
// Client is the ams client
type Client struct {
// LoggerClient is the ams client
type LoggerClient struct {
httpClient *http.Client // http client
Host string // ams host name
Port int // ams port
host string // ams host name
port int // ams port
delay time.Duration // delay between two retries
numRetries int // number of retries
}
// Alive tests if alive
func (cli *Client) Alive() (alive bool) {
func (cli *LoggerClient) Alive() (alive bool) {
alive = false
_, httpStatus, err := httpretry.Get(cli.httpClient, cli.prefix()+"/api/alive", time.Second*2, 2)
if err == nil && httpStatus == http.StatusOK {
......@@ -75,7 +78,7 @@ func (cli *Client) Alive() (alive bool) {
}
// PostLogs posts new log messages to logger
func (cli *Client) PostLogs(masID int, logs []schemas.LogMessage) (httpStatus int, err error) {
func (cli *LoggerClient) PostLogs(masID int, logs []schemas.LogMessage) (httpStatus int, err error) {
js, _ := json.Marshal(logs)
_, httpStatus, err = httpretry.Post(cli.httpClient, cli.prefix()+"/api/logging/"+
strconv.Itoa(masID)+"/list", "application/json", js, time.Second*2, 4)
......@@ -83,7 +86,7 @@ func (cli *Client) PostLogs(masID int, logs []schemas.LogMessage) (httpStatus in
}
// GetLatestLogs gets log messages
func (cli *Client) GetLatestLogs(masID int, agentID int, topic string,
func (cli *LoggerClient) GetLatestLogs(masID int, agentID int, topic string,
num int) (msgs []schemas.LogMessage, httpStatus int, err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/logging/"+
......@@ -100,7 +103,7 @@ func (cli *Client) GetLatestLogs(masID int, agentID int, topic string,
}
// PutState updates the state
func (cli *Client) PutState(state schemas.State) (httpStatus int, err error) {
func (cli *LoggerClient) PutState(state schemas.State) (httpStatus int, err error) {
js, _ := json.Marshal(state)
_, httpStatus, err = httpretry.Put(cli.httpClient, cli.prefix()+"/api/state/"+
strconv.Itoa(state.MASID)+"/"+strconv.Itoa(state.AgentID), js,
......@@ -109,7 +112,7 @@ func (cli *Client) PutState(state schemas.State) (httpStatus int, err error) {
}
// UpdateStates updates the state
func (cli *Client) UpdateStates(masID int, states []schemas.State) (httpStatus int, err error) {
func (cli *LoggerClient) UpdateStates(masID int, states []schemas.State) (httpStatus int, err error) {
js, _ := json.Marshal(states)
_, httpStatus, err = httpretry.Put(cli.httpClient, cli.prefix()+"/api/state/"+
strconv.Itoa(masID)+"/list", js, time.Second*2, 4)
......@@ -117,7 +120,7 @@ func (cli *Client) UpdateStates(masID int, states []schemas.State) (httpStatus i
}
// GetState requests state from logger
func (cli *Client) GetState(masID int, agentID int) (state schemas.State, httpStatus int,
func (cli *LoggerClient) GetState(masID int, agentID int) (state schemas.State, httpStatus int,
err error) {
var body []byte
body, httpStatus, err = httpretry.Get(cli.httpClient, cli.prefix()+"/api/state/"+
......@@ -132,19 +135,239 @@ func (cli *Client) GetState(masID int, agentID int) (state schemas.State, httpSt
return
}
func (cli *Client) prefix() (ret string) {
ret = "http://" + cli.Host + ":" + strconv.Itoa(cli.Port)
func (cli *LoggerClient) prefix() (ret string) {
ret = "http://" + cli.host + ":" + strconv.Itoa(cli.port)
return
}
// New creates a new AMS client
func New(timeout time.Duration, del time.Duration, numRet int) (cli *Client) {
cli = &Client{
// NewLoggerClient creates a new Logger client
func NewLoggerClient(host string, port int, timeout time.Duration, del time.Duration,
numRet int) (cli *LoggerClient, err error) {
cli = &LoggerClient{
httpClient: &http.Client{Timeout: timeout},
Host: "logger",
Port: 11000,
host: host,
port: port,
delay: del,
numRetries: numRet,
}
if !cli.Alive() {
err = errors.New("Logger Module is not running on " + host + ":" + strconv.Itoa(port))
}
return
}
// LogCollector collects logs and states and sends them to the Logger service
// one LogCollector per agency is used; each agent obtains one AgentLogger
type LogCollector struct {
masID int
logIn chan schemas.LogMessage // logging inbox
stateIn chan schemas.State
client *LoggerClient
config schemas.LoggerConfig
logError *log.Logger
logInfo *log.Logger
}
// storeLogs periodically requests the logging service to store log messages
func (logCol *LogCollector) storeLogs() (err error) {
if logCol.config.Active {
for {
if len(logCol.logIn) > 0 {
numMsg := len(logCol.logIn)
logMsgs := make([]schemas.LogMessage, numMsg, numMsg)
index := 0
for i := 0; i < numMsg; i++ {
logMsg := <-logCol.logIn
if (logMsg.Topic == "msg" && !logCol.config.TopicMsg) ||
(logMsg.Topic == "app" && !logCol.config.TopicApp) ||
(logMsg.Topic == "debug" && !logCol.config.TopicDebug) ||
(logMsg.Topic == "status" && !logCol.config.TopicStatus) {
continue
}
logMsgs[index] = logMsg
logMsgs[index].MASID = logCol.masID
index++
}
logMsgs = logMsgs[:index]
_, err = logCol.client.PostLogs(logCol.masID, logMsgs)
if err != nil {
logCol.logError.Println(err)
for i := range logMsgs {
logCol.logIn <- logMsgs[i]
}
continue
}
}
tempTime := time.Now()
for {
time.Sleep(100 * time.Millisecond)
if time.Since(tempTime).Seconds() > 15 || len(logCol.logIn) > 50 {
break
}
}
}
} else {
for {
// print messages to stdout if logger is turned off
logMsg := <-logCol.logIn
if (logMsg.Topic == "msg" && !logCol.config.TopicMsg) ||
(logMsg.Topic == "app" && !logCol.config.TopicApp) ||
(logMsg.Topic == "debug" && !logCol.config.TopicDebug) ||
(logMsg.Topic == "status" && !logCol.config.TopicStatus) {
continue
}
logCol.logInfo.Println(logMsg)
}
}
}
// storeState requests the logging service to store state
func (logCol *LogCollector) storeState() (err error) {
if logCol.config.Active {
for {
var states []schemas.State
state := <-logCol.stateIn
states = append(states, state)
for i := 0; i < 24; i++ {
// maximum of 25 states
select {
case state = <-logCol.stateIn:
states = append(states, state)
default:
break
}
}
_, err = logCol.client.UpdateStates(states[0].MASID, states)
if err != nil {
logCol.logError.Println(err)
for i := range states {
logCol.stateIn <- states[i]
}
continue
}
}
}
return
}
// NewLogCollector creates an agency logger client
func NewLogCollector(masID int, config schemas.LoggerConfig, logErr *log.Logger,
logInf *log.Logger) (logCol *LogCollector, err error) {
logCol = &LogCollector{
masID: masID,
logError: logErr,
logInfo: logInf,
config: config,
}
if logCol.config.Active {
logCol.client, err = NewLoggerClient(config.Host, config.Port, time.Second*60,
time.Second*1, 4)
if err != nil {
logCol.config.Active = false
}
}
logCol.logIn = make(chan schemas.LogMessage, 10000)
logCol.stateIn = make(chan schemas.State, 10000)
go logCol.storeLogs()
go logCol.storeState()
logCol.logInfo.Println("Created new logger client; status: ", logCol.config.Active)
return
}
// AgentLogger is the endpoint for agents
type AgentLogger struct {
agentID int
masID int
client *LoggerClient
logOut chan schemas.LogMessage // logging inbox
stateOut chan schemas.State
mutex *sync.Mutex
logError *log.Logger
logInfo *log.Logger
active bool
}
// NewLog sends a new logging message to the logging service
func (agLog *AgentLogger) NewLog(topic string, message string, data string) (err error) {
agLog.mutex.Lock()
if !agLog.active {
agLog.mutex.Unlock()
return errors.New("Logger not active")
}
agLog.mutex.Unlock()
if topic != "error" && topic != "debug" && topic != "status" && topic != "msg" &&
topic != "app" {
err = errors.New("Unknown topic")
return
}
agLog.mutex.Lock()
time.Sleep(time.Millisecond * 5)
tStamp := time.Now()
agLog.mutex.Unlock()
msg := schemas.LogMessage{
AgentID: agLog.agentID,
Timestamp: tStamp,
Topic: topic,
Message: message,
AdditionalData: data}
agLog.logOut <- msg
return
}
// UpdateState overrides the state stored in database
func (agLog *AgentLogger) UpdateState(state string) (err error) {
agLog.mutex.Lock()
if !agLog.active {
agLog.mutex.Unlock()
return errors.New("agLog not active")
}
agLog.mutex.Unlock()
agState := schemas.State{
MASID: agLog.masID,
AgentID: agLog.agentID,
Timestamp: time.Now(),
State: state}
agLog.stateOut <- agState
return
}
// RestoreState loads state saved in database and return it
func (agLog *AgentLogger) RestoreState() (state string, err error) {
agLog.mutex.Lock()
if !agLog.active {
agLog.mutex.Unlock()
err = errors.New("agLog not active")
return
}
agLog.mutex.Unlock()
var agState schemas.State
agState, _, err = agLog.client.GetState(agLog.masID, agLog.agentID)
state = agState.State
return
}
// NewAgentLogger craetes a new object of type AgentLogger
func (logCol *LogCollector) NewAgentLogger(agentID int, config schemas.LoggerConfig,
logErr *log.Logger, logInf *log.Logger) (agLog *AgentLogger) {
agLog = &AgentLogger{
agentID: agentID,
masID: logCol.masID,
client: logCol.client,
logOut: logCol.logIn,
stateOut: logCol.stateIn,
mutex: &sync.Mutex{},
logError: logErr,
logInfo: logInf,
active: true,
}
return
}
// close closes the logger
func (agLog *AgentLogger) close() {
agLog.mutex.Lock()
agLog.logInfo.Println("Closing Logger of agent ", agLog.agentID)
agLog.active = false
agLog.mutex.Unlock()
return
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment