Commit c5eace60 authored by Sonja Happ's avatar Sonja Happ
Browse files

AMQP: improve message handling

parent 60bb5790
......@@ -24,6 +24,7 @@ package component_configuration
import (
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario"
"log"
)
type ComponentConfiguration struct {
......@@ -177,6 +178,7 @@ func (m *ComponentConfiguration) delete() error {
// if IC has state gone and there is no component configuration associated with it: delete IC
no_configs := db.Model(ic).Association("ComponentConfigurations").Count()
if no_configs == 0 && ic.State == "gone" {
log.Println("Deleting IC with state gone, last component config deleted", ic.UUID)
err = db.Delete(ic).Error
return err
}
......
......@@ -44,41 +44,42 @@ type AMQPclient struct {
type Action struct {
Act string `json:"action"`
When int64 `json:"when"`
Properties struct {
UUID *string `json:"uuid"`
Name *string `json:"name"`
Category *string `json:"category"`
Type *string `json:"type"`
Location *string `json:"location"`
WS_url *string `json:"ws_url"`
API_url *string `json:"api_url"`
Description *string `json:"description"`
} `json:"properties"`
Parameters struct {
UUID string `json:"uuid,omitempty"`
Name string `json:"name,omitempty"`
Category string `json:"category,omitempty"`
Type string `json:"type,omitempty"`
Location string `json:"location,omitempty"`
WS_url string `json:"ws_url,omitempty"`
API_url string `json:"api_url,omitempty"`
Description string `json:"description,omitempty"`
} `json:"parameters,omitempty"`
}
type ICStatus struct {
State *string `json:"state"`
Version *string `json:"version"`
Uptime *float64 `json:"uptime"`
Result *string `json:"result"`
Error *string `json:"error"`
State string `json:"state"`
Version string `json:"version"`
Uptime float64 `json:"uptime"`
Result string `json:"result"`
Error string `json:"error"`
}
type ICProperties struct {
Name *string `json:"name"`
Description *string `json:"description"`
Location *string `json:"location"`
Owner *string `json:"owner"`
WS_url *string `json:"ws_url"`
API_url *string `json:"api_url"`
Category *string `json:"category"`
Type *string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Location string `json:"location"`
Owner string `json:"owner"`
WS_url string `json:"ws_url"`
API_url string `json:"api_url"`
Category string `json:"category"`
Type string `json:"type"`
}
type ICUpdate struct {
Status *ICStatus `json:"status"`
Properties *ICProperties `json:"properties"`
When *float64 `json:"when"`
Status ICStatus `json:"status"`
Properties ICProperties `json:"properties"`
When float64 `json:"when"`
Action string `json:"action"`
// TODO add JSON start parameter scheme
}
......@@ -151,7 +152,7 @@ func ConnectAMQP(uri string) error {
for message := range messages {
err = processMessage(message)
if err != nil {
log.Println("AMQP: Error processing message: ", message, err.Error())
log.Println("AMQP: Error processing message: ", err.Error())
}
}
}
......@@ -181,15 +182,10 @@ func sendActionAMQP(action Action) error {
// set message headers
var headers map[string]interface{}
headers = make(map[string]interface{}) // empty map
if action.Properties.UUID != nil {
headers["uuid"] = *action.Properties.UUID
}
if action.Properties.Type != nil {
headers["type"] = *action.Properties.Type
}
if action.Properties.Category != nil {
headers["category"] = *action.Properties.Category
}
headers["uuid"] = action.Parameters.UUID
headers["type"] = action.Parameters.Type
headers["category"] = action.Parameters.Category
msg.Headers = headers
err = CheckConnection()
......@@ -212,7 +208,7 @@ func sendActionAMQP(action Action) error {
//
// var a Action
// a.Act = "ping"
// *a.Properties.UUID = ""
// *a.Parameters.UUID = ""
//
// err := sendActionAMQP(a)
// return err
......@@ -276,100 +272,69 @@ func processMessage(message amqp.Delivery) error {
return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
}
if payload.Status != nil || payload.Properties != nil {
//log.Println("Processing AMQP message: ", string(message.Body))
// if a message contains a "state" field, it is an update for an IC
headers := amqp.Table(message.Headers)
ICUUID := fmt.Sprintf("%v", headers["uuid"])
_, err = uuid.Parse(ICUUID)
if err != nil {
return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
}
if payload.Action != "" {
// if a message contains an action, it is not intended for the backend
log.Println("AMQP: Ignoring action message for action", payload.Action, ICUUID)
return nil
}
headers := amqp.Table(message.Headers)
ICUUID := fmt.Sprintf("%v", headers["uuid"])
_, err = uuid.Parse(ICUUID)
var sToBeUpdated InfrastructureComponent
err = sToBeUpdated.byUUID(ICUUID)
if err != nil {
return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
}
var sToBeUpdated InfrastructureComponent
err = sToBeUpdated.byUUID(ICUUID)
if err == gorm.ErrRecordNotFound {
// create new record
err = createExternalIC(payload, ICUUID)
} else if err != nil {
// database error
err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
} else {
// update record based on payload
err = sToBeUpdated.updateExternalIC(payload)
}
if err == gorm.ErrRecordNotFound {
// create new record
err = createExternalIC(payload, ICUUID, message.Body)
} else if err != nil {
// database error
err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
} else {
log.Println("INFO: ignoring message, payload neither contains status nor properties", message)
// update record based on payload
err = sToBeUpdated.updateExternalIC(payload, message.Body)
}
return err
}
func createExternalIC(payload ICUpdate, ICUUID string) error {
if payload.Properties == nil {
return fmt.Errorf("AMQP: Cannot create new IC, Propertie field missing")
}
func createExternalIC(payload ICUpdate, ICUUID string, body []byte) error {
var newICReq AddICRequest
newICReq.InfrastructureComponent.UUID = ICUUID
if payload.Properties.Name == nil ||
payload.Properties.Category == nil ||
payload.Properties.Type == nil {
// cannot create new IC because required information (name, type, and/or category missing)
return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
}
newICReq.InfrastructureComponent.Name = *payload.Properties.Name
newICReq.InfrastructureComponent.Category = *payload.Properties.Category
newICReq.InfrastructureComponent.Type = *payload.Properties.Type
newICReq.InfrastructureComponent.Name = payload.Properties.Name
newICReq.InfrastructureComponent.Category = payload.Properties.Category
newICReq.InfrastructureComponent.Type = payload.Properties.Type
// add optional params
if payload.Status != nil {
if payload.Status.State != nil {
newICReq.InfrastructureComponent.State = *payload.Status.State
} else {
newICReq.InfrastructureComponent.State = "unknown"
}
if newICReq.InfrastructureComponent.State == "gone" {
// Check if state is "gone" and abort creation of IC in this case
log.Println("AMQP: Aborting creation of IC with state gone")
return nil
}
if payload.Status.Uptime != nil {
newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
}
}
if payload.Properties.WS_url != nil {
newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
}
if payload.Properties.API_url != nil {
newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
}
if payload.Properties.Location != nil {
newICReq.InfrastructureComponent.Location = *payload.Properties.Location
}
if payload.Properties.Description != nil {
newICReq.InfrastructureComponent.Description = *payload.Properties.Description
}
// TODO add JSON start parameter scheme
if payload.Status.State != "" {
newICReq.InfrastructureComponent.State = payload.Status.State
} else {
newICReq.InfrastructureComponent.State = "unknown"
}
if newICReq.InfrastructureComponent.State == "gone" {
// Check if state is "gone" and abort creation of IC in this case
log.Println("AMQP: Aborting creation of IC with state gone")
return nil
}
newICReq.InfrastructureComponent.Uptime = payload.Status.Uptime
newICReq.InfrastructureComponent.WebsocketURL = payload.Properties.WS_url
newICReq.InfrastructureComponent.APIURL = payload.Properties.API_url
newICReq.InfrastructureComponent.Location = payload.Properties.Location
newICReq.InfrastructureComponent.Description = payload.Properties.Description
// set managed externally to true because this IC is created via AMQP
newICReq.InfrastructureComponent.ManagedExternally = newTrue()
// set raw status update if IC
payloadRaw, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err)
}
newICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw}
newICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: body}
// TODO add JSON start parameter scheme
// Validate the new IC
err = newICReq.validate()
err := newICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
}
......@@ -390,67 +355,41 @@ func createExternalIC(payload ICUpdate, ICUUID string) error {
return nil
}
func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error {
func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate, body []byte) error {
var updatedICReq UpdateICRequest
if payload.Status != nil {
if payload.Status.State != nil {
updatedICReq.InfrastructureComponent.State = *payload.Status.State
if *payload.Status.State == "gone" {
// remove IC from DB
log.Println("AMQP: Deleting IC with state gone")
err := s.delete(true)
if err != nil {
// if component could not be deleted there are still configurations using it in the DB
// continue with the update to save the new state of the component and get back to the deletion later
log.Println("AMQP: Deletion of IC postponed (config(s) associated to it)")
}
if payload.Status.State != "" {
updatedICReq.InfrastructureComponent.State = payload.Status.State
if updatedICReq.InfrastructureComponent.State == "gone" {
// remove IC from DB
log.Println("AMQP: Deleting IC with state gone", s.UUID)
err := s.delete(true)
if err != nil {
// if component could not be deleted there are still configurations using it in the DB
// continue with the update to save the new state of the component and get back to the deletion later
log.Println(err)
}
}
if payload.Status.Uptime != nil {
updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
}
}
if payload.Properties != nil {
if payload.Properties.Type != nil {
updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
}
if payload.Properties.Category != nil {
updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
}
if payload.Properties.Name != nil {
updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
}
if payload.Properties.WS_url != nil {
updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
}
if payload.Properties.API_url != nil {
updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
}
if payload.Properties.Location != nil {
updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location
}
if payload.Properties.Description != nil {
updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description
}
}
} else {
updatedICReq.InfrastructureComponent.State = "unknown"
}
updatedICReq.InfrastructureComponent.Uptime = payload.Status.Uptime
updatedICReq.InfrastructureComponent.Type = payload.Properties.Type
updatedICReq.InfrastructureComponent.Category = payload.Properties.Category
updatedICReq.InfrastructureComponent.Name = payload.Properties.Name
updatedICReq.InfrastructureComponent.WebsocketURL = payload.Properties.WS_url
updatedICReq.InfrastructureComponent.APIURL = payload.Properties.API_url
updatedICReq.InfrastructureComponent.Location = payload.Properties.Location
updatedICReq.InfrastructureComponent.Description = payload.Properties.Description
// set raw status update if IC
payloadRaw, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err)
}
updatedICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw}
updatedICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: body}
// TODO add JSON start parameter scheme
// Validate the updated IC
err = updatedICReq.validate()
err := updatedICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
}
......
......@@ -284,26 +284,11 @@ func sendActionToIC(c *gin.Context) {
//now := time.Now()
for _, action := range actions {
/*if action.When == 0 {
action.When = float32(now.Unix())
}*/
// make sure that the important properties are set correctly so that the message can be identified by the receiver
if action.Properties.UUID == nil {
action.Properties.UUID = new(string)
*action.Properties.UUID = s.UUID
}
if action.Properties.Type == nil {
action.Properties.Type = new(string)
*action.Properties.Type = s.Type
}
if action.Properties.Category == nil {
action.Properties.Category = new(string)
*action.Properties.Category = s.Category
}
if action.Properties.Name == nil {
action.Properties.Name = new(string)
*action.Properties.Name = s.Name
}
action.Parameters.UUID = s.UUID
action.Parameters.Type = s.Type
action.Parameters.Category = s.Category
action.Parameters.Name = s.Name
err = sendActionAMQP(action)
if err != nil {
......
......@@ -63,8 +63,7 @@ func (s *InfrastructureComponent) delete(receivedViaAMQP bool) error {
var action Action
action.Act = "delete"
action.When = time.Now().Unix()
action.Properties.UUID = new(string)
*action.Properties.UUID = s.UUID
action.Parameters.UUID = s.UUID
log.Println("AMQP: Sending request to delete IC with UUID", s.UUID)
err := sendActionAMQP(action)
......@@ -72,11 +71,10 @@ func (s *InfrastructureComponent) delete(receivedViaAMQP bool) error {
}
db := database.GetDB()
noConfigs := db.Model(s).Association("ComponentConfigurations").Count()
no_configs := db.Model(s).Association("ComponentConfigurations").Count()
if no_configs > 0 {
return fmt.Errorf("Infrastructure Component cannot be deleted as it is still used in configurations (active or dangling)")
if noConfigs > 0 {
return fmt.Errorf("deletion of IC postponed, %v config(s) associated to it", noConfigs)
}
// delete InfrastructureComponent from DB (does NOT remain as dangling)
......
......@@ -45,7 +45,7 @@ import (
var router *gin.Engine
var api *gin.RouterGroup
var waitingTime time.Duration = 2
var waitingTime time.Duration = 1
type ICRequest struct {
UUID string `json:"uuid,omitempty"`
......@@ -75,7 +75,7 @@ type ConfigRequest struct {
FileIDs []int64 `json:"fileIDs,omitempty"`
}
type ICAction struct {
/*type ICAction struct {
Act string `json:"action,omitempty"`
When int64 `json:"when,omitempty"`
Properties struct {
......@@ -88,7 +88,7 @@ type ICAction struct {
API_url *string `json:"api_url,omitempty"`
Description *string `json:"description,omitempty"`
} `json:"properties,omitempty"`
}
}*/
var newIC1 = ICRequest{
UUID: "7be0322d-354e-431e-84bd-ae4c9633138b",
......@@ -314,16 +314,10 @@ func TestUpdateICAsAdmin(t *testing.T) {
// fake an IC update (create) message
var update ICUpdate
update.Status = new(ICStatus)
update.Properties = new(ICProperties)
update.Status.State = new(string)
*update.Status.State = "idle"
update.Properties.Name = new(string)
*update.Properties.Name = newIC2.Name
update.Properties.Category = new(string)
*update.Properties.Category = newIC2.Category
update.Properties.Type = new(string)
*update.Properties.Type = newIC2.Type
update.Status.State = "idle"
update.Properties.Name = newIC2.Name
update.Properties.Category = newIC2.Category
update.Properties.Type = newIC2.Type
payload, err := json.Marshal(update)
assert.NoError(t, err)
......@@ -445,16 +439,10 @@ func TestDeleteICAsAdmin(t *testing.T) {
// fake an IC update (create) message
var update ICUpdate
update.Status = new(ICStatus)
update.Properties = new(ICProperties)
update.Status.State = new(string)
*update.Status.State = "idle"
update.Properties.Name = new(string)
*update.Properties.Name = newIC2.Name
update.Properties.Category = new(string)
*update.Properties.Category = newIC2.Category
update.Properties.Type = new(string)
*update.Properties.Type = newIC2.Type
update.Status.State = "idle"
update.Properties.Name = newIC2.Name
update.Properties.Category = newIC2.Category
update.Properties.Type = newIC2.Type
payload, err := json.Marshal(update)
assert.NoError(t, err)
......@@ -492,6 +480,9 @@ func TestDeleteICAsAdmin(t *testing.T) {
assert.NoError(t, err)
assert.Equalf(t, 200, code, "Response body: \n%v\n", resp)
// Wait until externally managed IC is deleted (happens async)
time.Sleep(waitingTime * time.Second)
// Again count the number of all the ICs returned
finalNumberAfterExtneralDelete, err := helper.LengthOfResponse(router, token,
"/api/ic", "GET", nil)
......@@ -653,13 +644,13 @@ func TestSendActionToIC(t *testing.T) {
assert.NoError(t, err)
// create action to be sent to IC
action1 := ICAction{
action1 := Action{
Act: "start",
When: time.Now().Unix(),
}
action1.Properties.UUID = new(string)
*action1.Properties.UUID = newIC1.UUID
actions := [1]ICAction{action1}
action1.Parameters.UUID = newIC1.UUID
actions := [1]Action{action1}
// Send action to IC
code, resp, err = helper.TestEndpoint(router, token,
......@@ -687,10 +678,7 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) {
// fake an IC update message
var update ICUpdate
update.Status = new(ICStatus)
update.Properties = new(ICProperties)
update.Status.State = new(string)
*update.Status.State = "idle"
update.Status.State = "idle"
payload, err := json.Marshal(update)
assert.NoError(t, err)
......@@ -728,22 +716,14 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) {
assert.Equal(t, 0, number)
// complete the (required) data of an IC
update.Properties.Name = new(string)
*update.Properties.Name = newIC1.Name
update.Properties.Category = new(string)
*update.Properties.Category = newIC1.Category
update.Properties.Type = new(string)
*update.Properties.Type = newIC1.Type
update.Status.Uptime = new(float64)
*update.Status.Uptime = -1.0
update.Properties.WS_url = new(string)
*update.Properties.WS_url = newIC1.WebsocketURL
update.Properties.API_url = new(string)
*update.Properties.API_url = newIC1.APIURL
update.Properties.Description = new(string)
*update.Properties.Description = newIC1.Description
update.Properties.Location = new(string)
*update.Properties.Location = newIC1.Location
update.Properties.Name = newIC1.Name
update.Properties.Category = newIC1.Category
update.Properties.Type = newIC1.Type
update.Status.Uptime = 1000.1
update.Properties.WS_url = newIC1.WebsocketURL
update.Properties.API_url = newIC1.APIURL
update.Properties.Description = newIC1.Description
update.Properties.Location = newIC1.Location
payload, err = json.Marshal(update)
assert.NoError(t, err)
......@@ -778,7 +758,7 @@ func TestCreateUpdateViaAMQPRecv(t *testing.T) {
assert.Equal(t, 1, number)
// modify status update
*update.Properties.Name = "This is the new name"
update.Properties.Name = "This is the new name"
payload, err = json.Marshal(update)
assert.NoError(t, err)
......@@ -821,27 +801,16 @@ func TestDeleteICViaAMQPRecv(t *testing.T) {
// fake an IC update message
var update ICUpdate
update.Status = new(ICStatus)
update.Properties = new(ICProperties)
update.Status.State = new(string)
*update.Status.State = "idle"
update.Status.State = "idle"
// complete the (required) data of an IC
update.Properties.Name = new(string)
*update.Properties.Name = newIC1.Name
update.Properties.Category = new(string)
*update.Properties.Category = newIC1.Category
update.Properties.Type = new(string)
*update.Properties.Type = newIC1.Type
update.Status.Uptime = new(float64)
*update.Status.Uptime = -1.0
update.Properties.WS_url = new(string)
*update.Properties.WS_url = newIC1.WebsocketURL
update.Properties.API_url = new(string)
*update.Properties.API_url = newIC1.APIURL
update.Properties.Description = new(string)
*update.Properties.Description = newIC1.Description
update.Properties.Location = new(string)
*update.Properties.Location = newIC1.Location
update.Properties.Name = newIC1.Name