Commit 555479ab authored by Stefan Dähling's avatar Stefan Dähling
Browse files

handle state by agency

parent 0dedd16b
Pipeline #344543 passed with stages
in 1 minute and 46 seconds
......@@ -113,7 +113,8 @@ func (log *Logger) UpdateState(state string) (err error) {
AgentID: log.agentID,
Timestamp: time.Now(),
State: state}
_, err = client.PutState(agState)
// _, err = client.PutState(agState)
log.client.stateIn <- agState
return
}
......@@ -160,7 +161,8 @@ func (log *Logger) close() {
type loggerClient struct {
masID int
logIn chan schemas.LogMessage // logging inbox
active bool // indicates if logging is active (switch via env)
stateIn chan schemas.State
active bool // indicates if logging is active (switch via env)
logError *log.Logger
logInfo *log.Logger
}
......@@ -202,6 +204,35 @@ func (log *loggerClient) storeLogs() (err error) {
}
}
// storeState requests the logging service to store state
func (log *loggerClient) storeState() (err error) {
if log.active {
for {
var states []schemas.State
state := <-log.stateIn
states = append(states, state)
for i := 0; i < 24; i++ {
// maximum of 25 states
select {
case state = <-log.stateIn:
states = append(states, state)
default:
break
}
}
_, err = client.UpdateStates(states[0].MASID, states)
if err != nil {
log.logError.Println(err)
for i := range states {
log.stateIn <- states[i]
}
continue
}
}
}
return
}
// newLoggerClient creates an agency logger client
func newLoggerClient(masID int, logErr *log.Logger, logInf *log.Logger) (log *loggerClient) {
log = &loggerClient{
......@@ -215,7 +246,9 @@ func newLoggerClient(masID int, logErr *log.Logger, logInf *log.Logger) (log *lo
log.active = true
}
log.logIn = make(chan schemas.LogMessage, 10000)
log.stateIn = make(chan schemas.State, 10000)
go log.storeLogs()
go log.storeState()
log.logInfo.Println("Created new logger client; status: ", log.active)
return
}
......@@ -110,6 +110,14 @@ func PutState(state schemas.State) (httpStatus int, err error) {
return
}
// UpdateStates updates the state
func UpdateStates(masID int, states []schemas.State) (httpStatus int, err error) {
js, _ := json.Marshal(states)
_, httpStatus, err = httpretry.Put(httpClient, "http://"+Host+":"+strconv.Itoa(Port)+
"/api/state/"+strconv.Itoa(masID)+"/list", js, time.Second*2, 4)
return
}
// GetState requests state from logger
func GetState(masID int, agentID int) (state schemas.State, httpStatus int, err error) {
var body []byte
......
......@@ -87,10 +87,15 @@ func (logger *Logger) handleAPI(w http.ResponseWriter, r *http.Request) {
var masID, agentID int
masID, cmapErr = strconv.Atoi(respath[3])
if cmapErr == nil {
agentID, cmapErr = strconv.Atoi(respath[4])
if cmapErr == nil {
cmapErr, httpErr = logger.handleState(masID, agentID, w, r)
if respath[4] == "list" {
cmapErr, httpErr = logger.handleStateList(masID, w, r)
resvalid = true
} else {
agentID, cmapErr = strconv.Atoi(respath[4])
if cmapErr == nil {
cmapErr, httpErr = logger.handleState(masID, agentID, w, r)
resvalid = true
}
}
}
}
......@@ -321,7 +326,31 @@ func (logger *Logger) handleState(masID int, agentid int, w http.ResponseWriter,
cmapErr = errors.New("Error: Method not allowed on path /api/state/{mas-id}/{agent-id}")
}
return
}
// handleStateList is the handler for requests to path /api/state/{mas-id}/list
func (logger *Logger) handleStateList(masID int, w http.ResponseWriter,
r *http.Request) (cmapErr, httpErr error) {
if r.Method == "PUT" {
var body []byte
body, cmapErr = ioutil.ReadAll(r.Body)
if cmapErr == nil {
var states []schemas.State
cmapErr = json.Unmarshal(body, &states)
if cmapErr == nil {
go logger.updateAgentStatesList(masID, states)
httpErr = httpreply.Updated(w, nil)
} else {
httpErr = httpreply.JSONUnmarshalError(w)
}
} else {
httpErr = httpreply.InvalidBodyError(w)
}
} else {
httpErr = httpreply.MethodNotAllowed(w)
cmapErr = errors.New("Error: Method not allowed on path /api/state/{mas-id}/list")
}
return
}
// listen opens a http server listening and serving request
......
......@@ -101,7 +101,7 @@ func (logger *Logger) init() (err error) {
logger.stor = newLocalStorage()
case "production":
logger.logInfo.Println("Cassandra storage")
logger.stor, err = newCassandraStorage([]string{"cass-ssset-0.cassandra", "cass-ssset-1.cassandra", "cass-ssset-2.cassandra"}, "cassandra", "cassandra")
logger.stor, err = newCassandraStorage([]string{"cassandra", "cass-ssset-1.cassandra", "cass-ssset-2.cassandra"}, "cassandra", "cassandra")
default:
err = errors.New("Wrong deployment type: " + deplType)
}
......@@ -163,3 +163,14 @@ func (logger *Logger) updateAgentState(masID int, agentID int, agState schemas.S
err = logger.stor.updateAgentState(masID, agentID, agState)
return
}
// addAgentLogMessageList
func (logger *Logger) updateAgentStatesList(masID int, states []schemas.State) (err error) {
for i := 0; i < len(states); i++ {
err = logger.updateAgentState(masID, states[i].AgentID, states[i])
if err != nil {
return
}
}
return
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment