Commit cc519131 authored by Sonja Happ's avatar Sonja Happ
Browse files

Add more test for AMQP functions, improve code structure

parent 2e649468
......@@ -23,7 +23,6 @@ package component_configuration
import (
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/infrastructure-component"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/routes/scenario"
)
......@@ -61,8 +60,11 @@ func (m *ComponentConfiguration) addToScenario() error {
}
// associate IC with component configuration
var ic infrastructure_component.InfrastructureComponent
err = ic.ByID(m.ICID)
var ic database.InfrastructureComponent
err = db.Find(&ic, m.ICID).Error
if err != nil {
return err
}
err = db.Model(&ic).Association("ComponentConfigurations").Append(m).Error
if err != nil {
return err
......@@ -80,23 +82,24 @@ func (m *ComponentConfiguration) Update(modifiedConfig ComponentConfiguration) e
// check if IC has been updated
if m.ICID != modifiedConfig.ICID {
// update IC
var s infrastructure_component.InfrastructureComponent
var s_old infrastructure_component.InfrastructureComponent
err := s.ByID(modifiedConfig.ICID)
var ic database.InfrastructureComponent
var ic_old database.InfrastructureComponent
err := db.Find(&ic, modifiedConfig.ICID).Error
if err != nil {
return err
}
err = s_old.ByID(m.ICID)
err = db.Find(&ic_old, m.ICID).Error
if err != nil {
return err
}
// remove component configuration from old IC
err = db.Model(&s_old).Association("ComponentConfigurations").Delete(m).Error
err = db.Model(&ic_old).Association("ComponentConfigurations").Delete(m).Error
if err != nil {
return err
}
// add component configuration to new IC
err = db.Model(&s).Association("ComponentConfigurations").Append(m).Error
err = db.Model(&ic).Association("ComponentConfigurations").Append(m).Error
if err != nil {
return err
}
......@@ -121,8 +124,8 @@ func (m *ComponentConfiguration) delete() error {
return err
}
var ic infrastructure_component.InfrastructureComponent
err = ic.ByID(m.ICID)
var ic database.InfrastructureComponent
err = db.Find(&ic, m.ICID).Error
if err != nil {
return err
}
......
......@@ -183,7 +183,7 @@ func sendActionAMQP(action Action) error {
return err
}
log.Println("AMQP: Sending message", string(msg.Body))
//log.Println("AMQP: Sending message", string(msg.Body))
err = client.channel.Publish(VILLAS_EXCHANGE,
"",
false,
......@@ -193,16 +193,16 @@ func sendActionAMQP(action Action) error {
}
func PingAMQP() error {
log.Println("AMQP: sending ping command to all ICs")
var a Action
a.Act = "ping"
*a.Properties.UUID = ""
err := sendActionAMQP(a)
return err
}
//func PingAMQP() error {
// log.Println("AMQP: sending ping command to all ICs")
//
// var a Action
// a.Act = "ping"
// *a.Properties.UUID = ""
//
// err := sendActionAMQP(a)
// return err
//}
func CheckConnection() error {
......@@ -256,8 +256,6 @@ func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
func processMessage(message amqp.Delivery) error {
log.Println("Processing AMQP message: ", string(message.Body))
var payload ICUpdate
err := json.Unmarshal(message.Body, &payload)
if err != nil {
......@@ -265,6 +263,7 @@ func processMessage(message amqp.Delivery) error {
}
if payload.Status != nil {
//log.Println("Processing AMQP message: ", string(message.Body))
// if a message contains a "state" field, it is an update for an IC
ICUUID := payload.Status.UUID
_, err = uuid.Parse(ICUUID)
......@@ -273,18 +272,160 @@ func processMessage(message amqp.Delivery) error {
return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
}
var sToBeUpdated InfrastructureComponent
err = sToBeUpdated.ByUUID(ICUUID)
err = sToBeUpdated.byUUID(ICUUID)
if err == gorm.ErrRecordNotFound {
// create new record
err = createNewICviaAMQP(payload)
err = createExternalIC(payload)
} else if err != nil {
// database error
err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
} else {
// update record based on payload
err = sToBeUpdated.updateICviaAMQP(payload)
err = sToBeUpdated.updateExternalIC(payload)
}
}
return err
}
func createExternalIC(payload ICUpdate) error {
var newICReq AddICRequest
newICReq.InfrastructureComponent.UUID = payload.Status.UUID
if payload.Status.Name == nil ||
payload.Status.Category == nil ||
payload.Status.Type == nil {
// cannot create new IC because required information (name, type, and/or category missing)
return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
}
newICReq.InfrastructureComponent.Name = *payload.Status.Name
newICReq.InfrastructureComponent.Category = *payload.Status.Category
newICReq.InfrastructureComponent.Type = *payload.Status.Type
// add optional params
if payload.Status.State != nil {
newICReq.InfrastructureComponent.State = *payload.Status.State
} else {
newICReq.InfrastructureComponent.State = "unknown"
}
if newICReq.InfrastructureComponent.State == "gone" {
// Check if state is "gone" and abort creation of IC in this case
log.Println("AMQP: Aborting creation of IC with state gone")
return nil
}
if payload.Status.WS_url != nil {
newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
}
if payload.Status.API_url != nil {
newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
}
if payload.Status.Location != nil {
newICReq.InfrastructureComponent.Location = *payload.Status.Location
}
if payload.Status.Description != nil {
newICReq.InfrastructureComponent.Description = *payload.Status.Description
}
if payload.Status.Uptime != nil {
newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
}
// TODO add JSON start parameter scheme
// set managed externally to true because this IC is created via AMQP
newICReq.InfrastructureComponent.ManagedExternally = newTrue()
// Validate the new IC
err := newICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
}
// Create the new IC
newIC, err := newICReq.createIC(true)
if err != nil {
return fmt.Errorf("AMQP: Creating new IC failed: %v", err)
}
// save IC
err = newIC.save()
if err != nil {
return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err)
}
log.Println("AMQP: Created IC with UUID ", newIC.UUID)
return nil
}
func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error {
var updatedICReq UpdateICRequest
if payload.Status.State != nil {
updatedICReq.InfrastructureComponent.State = *payload.Status.State
if *payload.Status.State == "gone" {
// remove IC from DB
log.Println("AMQP: Deleting IC with state gone")
err := s.delete(true)
if err != nil {
// if component could not be deleted there are still configurations using it in the DB
// continue with the update to save the new state of the component and get back to the deletion later
log.Println("AMQP: Deletion of IC postponed (config(s) associated to it)")
}
}
}
if payload.Status.Type != nil {
updatedICReq.InfrastructureComponent.Type = *payload.Status.Type
}
if payload.Status.Category != nil {
updatedICReq.InfrastructureComponent.Category = *payload.Status.Category
}
if payload.Status.Name != nil {
updatedICReq.InfrastructureComponent.Name = *payload.Status.Name
}
if payload.Status.WS_url != nil {
updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
}
if payload.Status.API_url != nil {
updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
}
if payload.Status.Location != nil {
//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)}
updatedICReq.InfrastructureComponent.Location = *payload.Status.Location
}
if payload.Status.Description != nil {
updatedICReq.InfrastructureComponent.Description = *payload.Status.Description
}
if payload.Status.Uptime != nil {
updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
}
// TODO add JSON start parameter scheme
// Validate the updated IC
err := updatedICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
}
// Create the updated IC from old IC
updatedIC := updatedICReq.updatedIC(*s)
// Finally update the IC in the DB
err = s.update(updatedIC)
if err != nil {
return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err)
}
log.Println("AMQP: Updated IC with UUID ", s.UUID)
return err
}
func newTrue() *bool {
b := true
return &b
}
func newFalse() *bool {
b := false
return &b
}
......@@ -22,7 +22,6 @@
package infrastructure_component
import (
"fmt"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/database"
"git.rwth-aachen.de/acs/public/villas/web-backend-go/helper"
"github.com/gin-gonic/gin"
......@@ -108,9 +107,9 @@ func addIC(c *gin.Context) {
return
}
if !newIC.ManagedExternally {
if !(newIC.ManagedExternally) {
// Save new IC to DB if not managed externally
err = newIC.Save()
err = newIC.save()
if helper.DBError(c, err) {
return
......@@ -142,6 +141,11 @@ func updateIC(c *gin.Context) {
return
}
if oldIC.ManagedExternally {
helper.ForbiddenError(c, "Cannot update externally managed component via API")
return
}
var req UpdateICRequest
err := c.BindJSON(&req)
if err != nil {
......@@ -156,17 +160,10 @@ func updateIC(c *gin.Context) {
}
// Create the updatedIC from oldIC
updatedIC, err := req.updatedIC(oldIC, false)
if err != nil {
c.JSON(http.StatusForbidden, gin.H{
"success": false,
"message": fmt.Sprintf("%v", err),
})
return
}
updatedIC := req.updatedIC(oldIC)
// Finally update the IC in the DB
err = oldIC.Update(updatedIC)
err = oldIC.update(updatedIC)
if !helper.DBError(c, err) {
c.JSON(http.StatusOK, gin.H{"ic": updatedIC.InfrastructureComponent})
}
......@@ -285,7 +282,7 @@ func sendActionToIC(c *gin.Context) {
}
//now := time.Now()
log.Println("AMQP: Will attempt to send the following actions:", actions)
log.Println("AMQP: Sending actions:", actions)
for _, action := range actions {
/*if action.When == 0 {
......
......@@ -32,25 +32,25 @@ type InfrastructureComponent struct {
database.InfrastructureComponent
}
func (s *InfrastructureComponent) Save() error {
func (s *InfrastructureComponent) save() error {
db := database.GetDB()
err := db.Create(s).Error
return err
}
func (s *InfrastructureComponent) ByID(id uint) error {
func (s *InfrastructureComponent) byID(id uint) error {
db := database.GetDB()
err := db.Find(s, id).Error
return err
}
func (s *InfrastructureComponent) ByUUID(uuid string) error {
func (s *InfrastructureComponent) byUUID(uuid string) error {
db := database.GetDB()
err := db.Find(s, "UUID = ?", uuid).Error
return err
}
func (s *InfrastructureComponent) Update(updatedIC InfrastructureComponent) error {
func (s *InfrastructureComponent) update(updatedIC InfrastructureComponent) error {
db := database.GetDB()
err := db.Model(s).Updates(updatedIC).Error
......@@ -66,6 +66,7 @@ func (s *InfrastructureComponent) delete(receivedViaAMQP bool) error {
action.Properties.UUID = new(string)
*action.Properties.UUID = s.UUID
log.Println("AMQP: Sending request to delete IC with UUID", s.UUID)
err := sendActionAMQP(action)
return err
}
......@@ -89,144 +90,3 @@ func (s *InfrastructureComponent) getConfigs() ([]database.ComponentConfiguratio
err := db.Order("ID asc").Model(s).Related(&configs, "ComponentConfigurations").Error
return configs, len(configs), err
}
func createNewICviaAMQP(payload ICUpdate) error {
var newICReq AddICRequest
newICReq.InfrastructureComponent.UUID = payload.Status.UUID
if payload.Status.Name == nil ||
payload.Status.Category == nil ||
payload.Status.Type == nil {
// cannot create new IC because required information (name, type, and/or category missing)
return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
}
newICReq.InfrastructureComponent.Name = *payload.Status.Name
newICReq.InfrastructureComponent.Category = *payload.Status.Category
newICReq.InfrastructureComponent.Type = *payload.Status.Type
// add optional params
if payload.Status.State != nil {
newICReq.InfrastructureComponent.State = *payload.Status.State
} else {
newICReq.InfrastructureComponent.State = "unknown"
}
if newICReq.InfrastructureComponent.State == "gone" {
// Check if state is "gone" and abort creation of IC in this case
log.Println("########## AMQP: Aborting creation of IC with state gone")
return nil
}
if payload.Status.WS_url != nil {
newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
}
if payload.Status.API_url != nil {
newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
}
if payload.Status.Location != nil {
newICReq.InfrastructureComponent.Location = *payload.Status.Location
}
if payload.Status.Description != nil {
newICReq.InfrastructureComponent.Description = *payload.Status.Description
}
if payload.Status.Uptime != nil {
newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
}
// TODO add JSON start parameter scheme
// set managed externally to true because this IC is created via AMQP
newICReq.InfrastructureComponent.ManagedExternally = newTrue()
// Validate the new IC
err := newICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
}
// Create the new IC
newIC, err := newICReq.createIC(true)
if err != nil {
return fmt.Errorf("AMQP: Creating new IC failed: %v", err)
}
// save IC
err = newIC.Save()
if err != nil {
return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err)
}
return nil
}
func (s *InfrastructureComponent) updateICviaAMQP(payload ICUpdate) error {
var updatedICReq UpdateICRequest
if payload.Status.State != nil {
updatedICReq.InfrastructureComponent.State = *payload.Status.State
if *payload.Status.State == "gone" {
// remove IC from DB
log.Println("########## AMQP: Deleting IC with state gone")
err := s.delete(true)
if err != nil {
// if component could not be deleted there are still configurations using it in the DB
// continue with the update to save the new state of the component and get back to the deletion later
log.Println("########## AMQP: Deletion of IC postponed (config(s) associated to it)")
}
}
}
if payload.Status.Type != nil {
updatedICReq.InfrastructureComponent.Type = *payload.Status.Type
}
if payload.Status.Category != nil {
updatedICReq.InfrastructureComponent.Category = *payload.Status.Category
}
if payload.Status.Name != nil {
updatedICReq.InfrastructureComponent.Name = *payload.Status.Name
}
if payload.Status.WS_url != nil {
updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
}
if payload.Status.API_url != nil {
updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
}
if payload.Status.Location != nil {
//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)}
updatedICReq.InfrastructureComponent.Location = *payload.Status.Location
}
if payload.Status.Description != nil {
updatedICReq.InfrastructureComponent.Description = *payload.Status.Description
}
if payload.Status.Uptime != nil {
updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
}
// TODO add JSON start parameter scheme
// set managed externally to true because this IC is updated via AMQP
updatedICReq.InfrastructureComponent.ManagedExternally = newTrue()
// Validate the updated IC
err := updatedICReq.validate()
if err != nil {
return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
}
// Create the updated IC from old IC
updatedIC, err := updatedICReq.updatedIC(*s, true)
if err != nil {
return fmt.Errorf("AMQP: Unable to update IC %v : %v", s.Name, err)
}
// Finally update the IC in the DB
err = s.Update(updatedIC)
if err != nil {
return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err)
}
return err
}
func newTrue() *bool {
b := true
return &b
}
......@@ -45,7 +45,7 @@ func CheckPermissions(c *gin.Context, modeltype database.ModelName, operation da
return false, s
}
err = s.ByID(uint(ICID))
err = s.byID(uint(ICID))
if helper.DBError(c, err) {
return false, s
}
......
This diff is collapsed.
......@@ -23,7 +23,6 @@ package infrastructure_component
import (
"encoding/json"
"fmt"
"github.com/google/uuid"
"github.com/jinzhu/gorm/dialects/postgres"
"github.com/nsf/jsondiff"
......@@ -60,7 +59,6 @@ type validUpdatedIC struct {
Location string `form:"Location" validate:"omitempty"`
Description string `form:"Description" validate:"omitempty"`
StartParameterScheme postgres.Jsonb `form:"StartParameterScheme" validate:"omitempty"`
ManagedExternally *bool `form:"ManagedExternally" validate:"required"`
Uptime float64 `form:"Uptime" validate:"omitempty"`
}
......@@ -134,43 +132,36 @@ func (r *AddICRequest) createIC(receivedViaAMQP bool) (InfrastructureComponent,
*action.Properties.UUID = r.InfrastructureComponent.UUID
}
log.Println("########## AMQP: Sending request to create new IC")
log.Println("AMQP: Sending request to create new IC")
err = sendActionAMQP(action)
}
// s remains empty
s.UUID = r.InfrastructureComponent.UUID
s.WebsocketURL = r.InfrastructureComponent.WebsocketURL
s.APIURL = r.InfrastructureComponent.APIURL
s.Type = r.InfrastructureComponent.Type
s.Name = r.InfrastructureComponent.Name
s.Category = r.InfrastructureComponent.Category
s.Location = r.InfrastructureComponent.Location
s.Description = r.InfrastructureComponent.Description
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
s.Uptime = -1.0 // no uptime available
if r.InfrastructureComponent.State != "" {
s.State = r.InfrastructureComponent.State
} else {
s.UUID = r.InfrastructureComponent.UUID
s.WebsocketURL = r.InfrastructureComponent.WebsocketURL
s.APIURL = r.InfrastructureComponent.APIURL
s.Type = r.InfrastructureComponent.Type
s.Name = r.InfrastructureComponent.Name
s.Category = r.InfrastructureComponent.Category
s.Location = r.InfrastructureComponent.Location
s.Description = r.InfrastructureComponent.Description
s.StartParameterScheme = r.InfrastructureComponent.StartParameterScheme
s.ManagedExternally = *r.InfrastructureComponent.ManagedExternally
s.Uptime = -1.0 // no uptime available
if r.InfrastructureComponent.State != "" {
s.State = r.InfrastructureComponent.State
} else {
s.State = "unknown"
}
// set last update to creation time of IC
s.StateUpdateAt = time.Now().Format(time.RFC1123)
s.State = "unknown"
}
// set last update to creation time of IC