Commit 931119dc authored by Stefan Dähling's avatar Stefan Dähling
Browse files

move agent df to client

parent 14025b2a
Pipeline #439687 passed with stages
in 22 minutes and 31 seconds
......@@ -50,6 +50,7 @@ import (
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
// "git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
......@@ -67,5 +68,9 @@ func task(ag *agency.Agent) (err error) {
msg, _ := ag.ACL.NewMessage(recv, 0, 0, "test message")
ag.ACL.SendMessage(msg)
ag.Logger.NewLog("app", "This is agent "+strconv.Itoa(id), "")
svc := schemas.Service{
Desc: "agent" + strconv.Itoa(id),
}
ag.DF.RegisterService(svc)
return
}
......@@ -91,7 +91,6 @@ func StartAgency(task func(*Agent) error) (err error) {
remoteAgents: make(map[int]*Agent),
remoteAgencies: make(map[string]*remoteAgency),
msgIn: make(chan []schemas.ACLMessage, 1000),
dfClient: client.NewDFClient(time.Second*60, time.Second*1, 4),
amsClient: client.NewAMSClient(time.Second*60, time.Second*1, 4),
agencyClient: client.NewAgencyClient(time.Second*60, time.Second*1, 4),
logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags),
......@@ -191,6 +190,8 @@ func (agency *Agency) init() (err error) {
agency.mutex.Unlock()
return
}
agency.dfClient = client.NewDFClient(agency.info.DF.Host, agency.info.DF.Port,
time.Second*60, time.Second*1, 4)
agency.mqttClient = newMQTTClient("mqtt", 1883, agency.info.Name, agency.logError,
agency.logInfo)
agency.mutex.Unlock()
......
......@@ -72,7 +72,7 @@ type Agent struct {
ACL *ACL // agent communication
Logger *client.AgentLogger // logger object
MQTT *MQTT // mqtt object
DF *DF
DF *client.AgentDF
logError *log.Logger
logInfo *log.Logger
active bool
......@@ -102,7 +102,7 @@ func newAgent(info schemas.AgentInfo, msgIn chan schemas.ACLMessage,
}
ag.ACL = newACL(info.ID, msgIn, aclLookup, ag.Logger, logErr, logInf)
ag.MQTT = newMQTT(ag.id, mqtt, ag.Logger, ag.logError, ag.logInfo)
ag.DF = newDF(ag.masID, ag.id, ag.nodeID, dfClient, ag.logError, ag.logInfo)
ag.DF = client.NewAgentDF(ag.masID, ag.id, ag.nodeID, dfClient, ag.logError, ag.logInfo)
return
}
......@@ -194,5 +194,5 @@ func (agent *Agent) Terminate() {
agent.ACL.close()
agent.Logger.Close()
agent.MQTT.close()
agent.DF.close()
agent.DF.Close()
}
......@@ -76,7 +76,7 @@ func StartAMS() (err error) {
ams := &AMS{
logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags),
agencyClient: client.NewAgencyClient(time.Second*60, time.Second*1, 4),
dfClient: client.NewDFClient(time.Second*60, time.Second*1, 4),
dfClient: client.NewDFClient("df", 12000, time.Second*60, time.Second*1, 4),
}
// create storage and deployment object according to specified deployment type
err = ams.init()
......@@ -335,7 +335,7 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
}
// MAS configuration
masInfo.Config = masSpec.Config
masInfo.Config = ams.checkModules(masSpec.Config)
masInfo.Graph = masSpec.Graph
// total number of agents and total number of agencies
......@@ -387,7 +387,9 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
agencyInfo := schemas.AgencyInfo{
ImageGroupID: i,
ID: j,
Logger: masSpec.Config.Logger,
Logger: masInfo.Config.Logger,
DF: masInfo.Config.DF,
MQTT: masInfo.Config.MQTT,
Name: "-im-" + strconv.Itoa(i) + "-agency-" + strconv.Itoa(j),
}
for k := 0; k < masSpec.Config.NumAgentsPerAgency; k++ {
......@@ -404,6 +406,34 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
return
}
// checkModules checks if used modules are active
func (ams *AMS) checkModules(configIn schemas.MASConfig) (configOut schemas.MASConfig) {
configOut = configIn
if configOut.DF.Active {
if configOut.DF.Host == "" {
configOut.DF.Host = "df"
}
if configOut.DF.Port == 0 {
configOut.DF.Port = 12000
}
dfClient := client.NewDFClient(configOut.DF.Host, configOut.DF.Port, time.Second,
time.Second, 3)
configOut.DF.Active = dfClient.Alive()
}
if configOut.Logger.Active {
if configOut.Logger.Host == "" {
configOut.Logger.Host = "logger"
}
if configOut.Logger.Port == 0 {
configOut.Logger.Port = 11000
}
logClient := client.NewLoggerClient(configOut.Logger.Host, configOut.Logger.Port,
time.Second, time.Second, 3)
configOut.Logger.Active = logClient.Alive()
}
return
}
// removeMAS removes specified mas if it exists
func (ams *AMS) removeMAS(masID int) (err error) {
err = ams.depl.deleteMAS(masID)
......
......@@ -70,7 +70,7 @@ func TestAMS(t *testing.T) {
logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags),
logInfo: log.New(os.Stdout, "[INFO] ", log.LstdFlags),
agencyClient: client.NewAgencyClient(time.Second*60, time.Second*1, 4),
dfClient: client.NewDFClient(time.Second*60, time.Second*1, 4),
dfClient: client.NewDFClient("df", 12000, time.Second*60, time.Second*1, 4),
}
// create storage and deployment object according to specified deployment type
err := ams.init()
......
......@@ -48,10 +48,14 @@ import (
//"fmt"
"encoding/json"
"errors"
"fmt"
"log"
"net"
"net/http"
"os"
"strconv"
"sync"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/common/httpretry"
......@@ -61,8 +65,8 @@ import (
// DFClient is the ams client
type DFClient struct {
httpClient *http.Client // http client
Host string // ams host name
Port int // ams port
host string // ams host name
port int // ams port
delay time.Duration // delay between two retries
numRetries int // number of retries
}
......@@ -147,7 +151,7 @@ func (cli *DFClient) GetGraph(masID int) (graph schemas.Graph, httpStatus int, e
func (cli *DFClient) getIP() (ret string) {
for {
ips, err := net.LookupHost(cli.Host)
ips, err := net.LookupHost(cli.host)
if len(ips) > 0 && err == nil {
ret = ips[0]
break
......@@ -157,18 +161,179 @@ func (cli *DFClient) getIP() (ret string) {
}
func (cli *DFClient) prefix() (ret string) {
ret = "http://" + cli.Host + ":" + strconv.Itoa(cli.Port)
ret = "http://" + cli.host + ":" + strconv.Itoa(cli.port)
return
}
// NewDFClient creates a new AMS client
func NewDFClient(timeout time.Duration, del time.Duration, numRet int) (cli *DFClient) {
func NewDFClient(host string, port int, timeout time.Duration, del time.Duration,
numRet int) (cli *DFClient) {
cli = &DFClient{
httpClient: &http.Client{Timeout: timeout},
Host: "df",
Port: 12000,
host: host,
port: port,
delay: del,
numRetries: numRet,
}
return
}
// AgentDF provides access to the functionality of the DF
type AgentDF struct {
agentID int
masID int
nodeID int
mutex *sync.Mutex
registeredServices map[string]schemas.Service
active bool // indicates if df is active (switch via env)
dfClient *DFClient
logError *log.Logger
logInfo *log.Logger
}
// RegisterService registers a new service with the DF
func (df *AgentDF) RegisterService(svc schemas.Service) (id string, err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
df.mutex.Unlock()
id = "-1"
if svc.Desc == "" {
err = errors.New("empty description not allowed")
return
}
df.mutex.Lock()
_, ok := df.registeredServices[svc.Desc]
df.mutex.Unlock()
if ok {
err = errors.New("service already registered")
return
}
df.mutex.Lock()
masID := df.masID
agentID := df.agentID
nodeID := df.nodeID
df.mutex.Unlock()
svc.MASID = masID
svc.AgentID = agentID
svc.NodeID = nodeID
svc.CreatedAt = time.Now()
svc.ChangedAt = svc.CreatedAt
svc, _, err = df.dfClient.PostSvc(masID, svc)
id = svc.GUID
if err != nil {
return
}
df.mutex.Lock()
df.registeredServices[svc.Desc] = svc
df.mutex.Unlock()
return
}
// SearchForService search for a service with given description
func (df *AgentDF) SearchForService(desc string) (svc []schemas.Service, err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
masID := df.masID
df.mutex.Unlock()
var temp []schemas.Service
temp, _, err = df.dfClient.GetSvc(masID, desc)
if err != nil {
return
}
for i := range temp {
if temp[i].AgentID != df.agentID {
svc = append(svc, temp[i])
}
}
return
}
// SearchForLocalService search for a service with given description
func (df *AgentDF) SearchForLocalService(desc string, dist float64) (svc []schemas.Service, err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
masID := df.masID
nodeID := df.nodeID
df.mutex.Unlock()
var temp []schemas.Service
temp, _, err = df.dfClient.GetLocalSvc(masID, desc, nodeID, dist)
if err != nil {
return
}
for i := range temp {
if temp[i].AgentID != df.agentID {
svc = append(svc, temp[i])
}
}
return
}
// DeregisterService registers a new service with the DF
func (df *AgentDF) DeregisterService(svcID string) (err error) {
df.mutex.Lock()
if !df.active {
df.mutex.Unlock()
return
}
df.mutex.Unlock()
desc := ""
df.mutex.Lock()
masID := df.masID
for i := range df.registeredServices {
if df.registeredServices[i].GUID == svcID {
desc = i
break
}
}
df.mutex.Unlock()
if desc == "" {
err = errors.New("no such service")
return
}
df.mutex.Lock()
delete(df.registeredServices, desc)
df.mutex.Unlock()
_, err = df.dfClient.DeleteSvc(masID, svcID)
return
}
// NewAgentDF creates a new DF object
func NewAgentDF(masID int, agentID int, nodeID int, dfCli *DFClient, logErr *log.Logger,
logInf *log.Logger) (df *AgentDF) {
df = &AgentDF{
agentID: agentID,
masID: masID,
nodeID: nodeID,
mutex: &sync.Mutex{},
active: false,
logError: logErr,
logInfo: logInf,
}
act := os.Getenv("CLONEMAP_DF")
if act == "ON" {
df.active = true
}
df.registeredServices = make(map[string]schemas.Service)
return
}
// close closes the DF module
func (df *AgentDF) Close() {
for d := range df.registeredServices {
svc := df.registeredServices[d]
df.DeregisterService(svc.GUID)
}
df.mutex.Lock()
df.logInfo.Println("Closing DF of agent ", df.agentID)
df.active = false
df.mutex.Unlock()
}
......@@ -141,7 +141,7 @@ func (cli *LoggerClient) prefix() (ret string) {
// NewLoggerClient creates a new Logger client
func NewLoggerClient(host string, port int, timeout time.Duration, del time.Duration,
numRet int) (cli *LoggerClient, err error) {
numRet int) (cli *LoggerClient) {
cli = &LoggerClient{
httpClient: &http.Client{Timeout: timeout},
host: host,
......@@ -149,9 +149,6 @@ func NewLoggerClient(host string, port int, timeout time.Duration, del time.Dura
delay: del,
numRetries: numRet,
}
if !cli.Alive() {
err = errors.New("Logger Module is not running on " + host + ":" + strconv.Itoa(port))
}
return
}
......@@ -259,12 +256,7 @@ func NewLogCollector(masID int, config schemas.LoggerConfig, logErr *log.Logger,
config: config,
}
if logCol.config.Active {
logCol.client, err = NewLoggerClient(config.Host, config.Port, time.Second*60,
time.Second*1, 4)
if err != nil {
logCol.config.Active = false
return
}
logCol.client = NewLoggerClient(config.Host, config.Port, time.Second*60, time.Second*1, 4)
}
logCol.logIn = make(chan schemas.LogMessage, 10000)
logCol.stateIn = make(chan schemas.State, 10000)
......
......@@ -68,10 +68,10 @@ type Frontend struct {
func StartFrontend() (err error) {
fe := &Frontend{
amsClient: client.NewAMSClient(time.Second*60, time.Second*1, 4),
dfClient: client.NewDFClient(time.Second*60, time.Second*1, 4),
dfClient: client.NewDFClient("df", 12000, time.Second*60, time.Second*1, 4),
logError: log.New(os.Stderr, "[ERROR] ", log.LstdFlags),
}
fe.logClient, _ = client.NewLoggerClient("logger", 11000, time.Second*60, time.Second*1, 4)
fe.logClient = client.NewLoggerClient("logger", 11000, time.Second*60, time.Second*1, 4)
logType := os.Getenv("CLONEMAP_LOG_LEVEL")
switch logType {
case "info":
......
......@@ -171,6 +171,8 @@ type AgencyInfo struct {
ID int `json:"id"` // within image group unique ID (contained in name)
ImageGroupID int `json:"imid"` // ID of agency image group
Logger LoggerConfig `json:"logger"` // logger configuration
MQTT MQTTConfig `json:"mqtt"` // MQTT configuration
DF DFConfig `json:"df"` // DF configuration
Agents []int `json:"agents"`
Status Status `json:"status"`
}
......@@ -182,6 +184,8 @@ type AgencyInfoFull struct {
ID int `json:"id"` // within image group unique ID (contained in name)
ImageGroupID int `json:"imid"` // ID of agency image group
Logger LoggerConfig `json:"logger"` // logger configuration
MQTT MQTTConfig `json:"mqtt"` // MQTT configuration
DF DFConfig `json:"df"` // DF configuration
Agents []AgentInfo `json:"agents"`
Status Status `json:"status"`
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment