Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
ACS
Public
Cloud
MAS
clonemap
Commits
52650d90
Commit
52650d90
authored
Feb 05, 2021
by
Stefan Dähling
Browse files
client refactoring
parent
2599bbaf
Pipeline
#405092
passed with stages
in 5 minutes and 5 seconds
Changes
11
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
pkg/agency/acc.go
View file @
52650d90
...
...
@@ -51,14 +51,14 @@ import (
"net/http"
"strconv"
agencycli
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
agencycli
ent
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
// remoteAgency holds the channel used for sending messages to remot agency
type
remoteAgency
struct
{
msgIn
chan
schemas
.
ACLMessage
// ACL message inbox
agencyClient
*
agencycli
.
Client
agencyClient
*
agencycli
ent
.
Client
// agents map[int]*agent.Agent
}
...
...
@@ -109,7 +109,8 @@ func (agency *Agency) aclLookup(agentID int) (acl *ACL, err error) {
// check if remote agency is already known
if
ok
{
agency
.
logInfo
.
Println
(
"New remote agent "
,
agentID
,
" in known agency "
,
address
.
Agency
)
ag
=
newAgent
(
agentInfo
,
remAgency
.
msgIn
,
nil
,
nil
,
schemas
.
LogConfig
{},
nil
,
agency
.
logError
,
agency
.
logInfo
)
ag
=
newAgent
(
agentInfo
,
remAgency
.
msgIn
,
nil
,
nil
,
schemas
.
LogConfig
{},
nil
,
nil
,
agency
.
logError
,
agency
.
logInfo
)
}
else
{
agency
.
logInfo
.
Println
(
"New remote agent "
,
agentID
,
" in unknown agency "
,
address
.
Agency
)
// create new remote agency
...
...
@@ -130,7 +131,8 @@ func (agency *Agency) aclLookup(agentID int) (acl *ACL, err error) {
numRemAgencies
<
numLocalAgs
{
go
agency
.
receiveMsgs
()
}
ag
=
newAgent
(
agentInfo
,
remAgency
.
msgIn
,
nil
,
nil
,
schemas
.
LogConfig
{},
nil
,
agency
.
logError
,
agency
.
logInfo
)
ag
=
newAgent
(
agentInfo
,
remAgency
.
msgIn
,
nil
,
nil
,
schemas
.
LogConfig
{},
nil
,
nil
,
agency
.
logError
,
agency
.
logInfo
)
}
agency
.
mutex
.
Lock
()
agency
.
remoteAgents
[
agentID
]
=
ag
...
...
pkg/agency/agency.go
View file @
52650d90
...
...
@@ -58,8 +58,9 @@ import (
"syscall"
"time"
agencycli
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
amscli
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/ams/client"
agencyclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
amsclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/ams/client"
dfclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/status"
)
...
...
@@ -74,10 +75,11 @@ type Agency struct {
mutex
*
sync
.
Mutex
// mutex to protect agents from concurrent reads and writes
agentTask
func
(
*
Agent
)
error
msgIn
chan
[]
schemas
.
ACLMessage
logger
*
loggerClient
mqtt
*
mqttClient
amsClient
*
amscli
.
Client
agencyClient
*
agencycli
.
Client
logHandler
*
logHandler
mqttClient
*
mqttClient
dfClient
*
dfclient
.
Client
amsClient
*
amsclient
.
Client
agencyClient
*
agencyclient
.
Client
logInfo
*
log
.
Logger
// logger for info logging
logError
*
log
.
Logger
// logger for error logging
}
...
...
@@ -91,8 +93,9 @@ func StartAgency(task func(*Agent) error) (err error) {
remoteAgents
:
make
(
map
[
int
]
*
Agent
),
remoteAgencies
:
make
(
map
[
string
]
*
remoteAgency
),
msgIn
:
make
(
chan
[]
schemas
.
ACLMessage
,
1000
),
amsClient
:
amscli
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
agencyClient
:
agencycli
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
dfClient
:
dfclient
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
amsClient
:
amsclient
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
agencyClient
:
agencyclient
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
logError
:
log
.
New
(
os
.
Stderr
,
"[ERROR] "
,
log
.
LstdFlags
),
}
err
=
agency
.
init
()
...
...
@@ -146,9 +149,8 @@ func (agency *Agency) init() (err error) {
agency
.
info
.
ImageGroupID
,
err
=
strconv
.
Atoi
(
hostname
[
3
])
agency
.
info
.
ID
,
err
=
strconv
.
Atoi
(
hostname
[
5
])
agency
.
info
.
Name
=
temp
+
".mas"
+
hostname
[
1
]
+
"agencies"
agency
.
logger
=
newLoggerClient
(
agency
.
info
.
MASID
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
mqtt
=
newMQTTClient
(
"mqtt"
,
1883
,
agency
.
info
.
Name
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
mqtt
.
init
()
agency
.
logHandler
=
newLogHandler
(
agency
.
info
.
MASID
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
mqttClient
=
newMQTTClient
(
"mqtt"
,
1883
,
agency
.
info
.
Name
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
mutex
.
Unlock
()
return
}
...
...
@@ -164,7 +166,7 @@ func (agency *Agency) terminate(gracefulStop chan os.Signal) {
agency
.
localAgents
[
i
]
.
Terminate
()
}
agency
.
mutex
.
Unlock
()
agency
.
mqtt
.
close
()
agency
.
mqtt
Client
.
close
()
time
.
Sleep
(
time
.
Second
*
2
)
os
.
Exit
(
0
)
}
...
...
@@ -216,8 +218,8 @@ func (agency *Agency) createAgent(agentInfo schemas.AgentInfo) (err error) {
agentInfo
.
Status
.
Code
=
status
.
Starting
msgIn
:=
make
(
chan
schemas
.
ACLMessage
,
1000
)
agency
.
mutex
.
Lock
()
ag
:=
newAgent
(
agentInfo
,
msgIn
,
agency
.
aclLookup
,
agency
.
log
g
er
,
agency
.
info
.
Logger
,
agency
.
mqtt
,
agency
.
logError
,
agency
.
logInfo
)
ag
:=
newAgent
(
agentInfo
,
msgIn
,
agency
.
aclLookup
,
agency
.
log
Handl
er
,
agency
.
info
.
Logger
,
agency
.
mqtt
Client
,
agency
.
dfClient
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
localAgents
[
agentInfo
.
ID
]
=
ag
agency
.
mutex
.
Unlock
()
ag
.
startAgent
(
agency
.
agentTask
)
...
...
pkg/agency/agent.go
View file @
52650d90
...
...
@@ -51,6 +51,7 @@ import (
"log"
"sync"
dfclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/status"
)
...
...
@@ -77,8 +78,8 @@ type Agent struct {
// newAgent creates a new agent
func
newAgent
(
info
schemas
.
AgentInfo
,
msgIn
chan
schemas
.
ACLMessage
,
aclLookup
func
(
int
)
(
*
ACL
,
error
),
log
*
log
gerClient
,
logConfig
schemas
.
LogConfig
,
mqtt
*
mqttClient
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
ag
*
Agent
)
{
aclLookup
func
(
int
)
(
*
ACL
,
error
),
log
*
log
Handler
,
logConfig
schemas
.
LogConfig
,
mqtt
*
mqttClient
,
dfClient
*
dfclient
.
Client
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
ag
*
Agent
)
{
ag
=
&
Agent
{
id
:
info
.
ID
,
nodeID
:
info
.
Spec
.
NodeID
,
...
...
@@ -96,7 +97,7 @@ func newAgent(info schemas.AgentInfo, msgIn chan schemas.ACLMessage,
// in, out := ag.ACL.getCommDataChannels()
ag
.
Logger
=
newLogger
(
ag
.
id
,
log
,
logConfig
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
MQTT
=
newMQTT
(
ag
.
id
,
mqtt
,
ag
.
Logger
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
DF
=
newDF
(
ag
.
masID
,
ag
.
id
,
ag
.
nodeID
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
DF
=
newDF
(
ag
.
masID
,
ag
.
id
,
ag
.
nodeID
,
dfClient
,
ag
.
logError
,
ag
.
logInfo
)
return
}
...
...
pkg/agency/df.go
View file @
52650d90
...
...
@@ -51,7 +51,7 @@ import (
"sync"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
dfclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
...
...
@@ -63,6 +63,7 @@ type DF struct {
mutex
*
sync
.
Mutex
registeredServices
map
[
string
]
schemas
.
Service
active
bool
// indicates if df is active (switch via env)
dfClient
*
dfclient
.
Client
logError
*
log
.
Logger
logInfo
*
log
.
Logger
}
...
...
@@ -97,7 +98,7 @@ func (df *DF) RegisterService(svc schemas.Service) (id string, err error) {
svc
.
NodeID
=
nodeID
svc
.
CreatedAt
=
time
.
Now
()
svc
.
ChangedAt
=
svc
.
CreatedAt
svc
,
_
,
err
=
c
lient
.
PostSvc
(
masID
,
svc
)
svc
,
_
,
err
=
df
.
dfC
lient
.
PostSvc
(
masID
,
svc
)
id
=
svc
.
GUID
if
err
!=
nil
{
return
...
...
@@ -118,7 +119,7 @@ func (df *DF) SearchForService(desc string) (svc []schemas.Service, err error) {
masID
:=
df
.
masID
df
.
mutex
.
Unlock
()
var
temp
[]
schemas
.
Service
temp
,
_
,
err
=
c
lient
.
GetSvc
(
masID
,
desc
)
temp
,
_
,
err
=
df
.
dfC
lient
.
GetSvc
(
masID
,
desc
)
if
err
!=
nil
{
return
}
...
...
@@ -141,7 +142,7 @@ func (df *DF) SearchForLocalService(desc string, dist float64) (svc []schemas.Se
nodeID
:=
df
.
nodeID
df
.
mutex
.
Unlock
()
var
temp
[]
schemas
.
Service
temp
,
_
,
err
=
c
lient
.
GetLocalSvc
(
masID
,
desc
,
nodeID
,
dist
)
temp
,
_
,
err
=
df
.
dfC
lient
.
GetLocalSvc
(
masID
,
desc
,
nodeID
,
dist
)
if
err
!=
nil
{
return
}
...
...
@@ -178,12 +179,13 @@ func (df *DF) DeregisterService(svcID string) (err error) {
df
.
mutex
.
Lock
()
delete
(
df
.
registeredServices
,
desc
)
df
.
mutex
.
Unlock
()
_
,
err
=
c
lient
.
DeleteSvc
(
masID
,
svcID
)
_
,
err
=
df
.
dfC
lient
.
DeleteSvc
(
masID
,
svcID
)
return
}
// newDF creates a new DF object
func
newDF
(
masID
int
,
agentID
int
,
nodeID
int
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
df
*
DF
)
{
func
newDF
(
masID
int
,
agentID
int
,
nodeID
int
,
dfCli
*
dfclient
.
Client
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
df
*
DF
)
{
df
=
&
DF
{
agentID
:
agentID
,
masID
:
masID
,
...
...
pkg/agency/logging.go
View file @
52650d90
...
...
@@ -52,14 +52,14 @@ import (
"sync"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/logger/client"
logclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/logger/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
// Logger logs data to logging service
type
Logger
struct
{
agentID
int
client
*
loggerClient
handler
*
logHandler
mutex
*
sync
.
Mutex
config
schemas
.
LogConfig
logError
*
log
.
Logger
...
...
@@ -95,7 +95,7 @@ func (log *Logger) NewLog(logType string, message string, data string) (err erro
LogType
:
logType
,
Message
:
message
,
AdditionalData
:
data
}
log
.
client
.
logIn
<-
msg
log
.
handler
.
logIn
<-
msg
return
}
...
...
@@ -109,12 +109,12 @@ func (log *Logger) UpdateState(state string) (err error) {
}
log
.
mutex
.
Unlock
()
agState
:=
schemas
.
State
{
MASID
:
log
.
client
.
masID
,
MASID
:
log
.
handler
.
masID
,
AgentID
:
log
.
agentID
,
Timestamp
:
time
.
Now
(),
State
:
state
}
// _, err = client.PutState(agState)
log
.
client
.
stateIn
<-
agState
log
.
handler
.
stateIn
<-
agState
return
}
...
...
@@ -128,17 +128,17 @@ func (log *Logger) RestoreState() (state string, err error) {
}
log
.
mutex
.
Unlock
()
var
agState
schemas
.
State
agState
,
_
,
err
=
client
.
GetState
(
log
.
client
.
masID
,
log
.
agentID
)
agState
,
_
,
err
=
log
.
handler
.
client
.
GetState
(
log
.
handler
.
masID
,
log
.
agentID
)
state
=
agState
.
State
return
}
// newLogger craetes a new object of type Logger
func
newLogger
(
agentID
int
,
client
*
loggerClient
,
config
schemas
.
LogConfig
,
logErr
*
log
.
Logger
,
func
newLogger
(
agentID
int
,
handler
*
logHandler
,
config
schemas
.
LogConfig
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
log
*
Logger
)
{
log
=
&
Logger
{
agentID
:
agentID
,
client
:
client
,
handler
:
handler
,
mutex
:
&
sync
.
Mutex
{},
config
:
config
,
logError
:
logErr
,
...
...
@@ -157,18 +157,19 @@ func (log *Logger) close() {
return
}
// log
gerClient
is the agency client for the logger
type
log
gerClient
struct
{
// log
Handler
is the agency client for the logger
type
log
Handler
struct
{
masID
int
logIn
chan
schemas
.
LogMessage
// logging inbox
stateIn
chan
schemas
.
State
active
bool
// indicates if logging is active (switch via env)
client
*
logclient
.
Client
logError
*
log
.
Logger
logInfo
*
log
.
Logger
}
// storeLogs periodically requests the logging service to store log messages
func
(
log
*
log
gerClient
)
storeLogs
()
(
err
error
)
{
func
(
log
*
log
Handler
)
storeLogs
()
(
err
error
)
{
if
log
.
active
{
for
{
if
len
(
log
.
logIn
)
>
0
{
...
...
@@ -178,7 +179,7 @@ func (log *loggerClient) storeLogs() (err error) {
logMsgs
[
i
]
=
<-
log
.
logIn
logMsgs
[
i
]
.
MASID
=
log
.
masID
}
_
,
err
=
client
.
PostLogs
(
log
.
masID
,
logMsgs
)
_
,
err
=
log
.
client
.
PostLogs
(
log
.
masID
,
logMsgs
)
if
err
!=
nil
{
log
.
logError
.
Println
(
err
)
for
i
:=
range
logMsgs
{
...
...
@@ -205,7 +206,7 @@ func (log *loggerClient) storeLogs() (err error) {
}
// storeState requests the logging service to store state
func
(
log
*
log
gerClient
)
storeState
()
(
err
error
)
{
func
(
log
*
log
Handler
)
storeState
()
(
err
error
)
{
if
log
.
active
{
for
{
var
states
[]
schemas
.
State
...
...
@@ -220,7 +221,7 @@ func (log *loggerClient) storeState() (err error) {
break
}
}
_
,
err
=
client
.
UpdateStates
(
states
[
0
]
.
MASID
,
states
)
_
,
err
=
log
.
client
.
UpdateStates
(
states
[
0
]
.
MASID
,
states
)
if
err
!=
nil
{
log
.
logError
.
Println
(
err
)
for
i
:=
range
states
{
...
...
@@ -233,13 +234,14 @@ func (log *loggerClient) storeState() (err error) {
return
}
// newLog
gerClient
creates an agency logger client
func
newLog
gerClient
(
masID
int
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
log
*
log
gerClient
)
{
log
=
&
log
gerClient
{
// newLog
Handler
creates an agency logger client
func
newLog
Handler
(
masID
int
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
log
*
log
Handler
)
{
log
=
&
log
Handler
{
masID
:
masID
,
active
:
false
,
logError
:
logErr
,
logInfo
:
logInf
,
client
:
logclient
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
}
temp
:=
os
.
Getenv
(
"CLONEMAP_LOGGING"
)
if
temp
==
"ON"
{
...
...
pkg/agency/mqtt.go
View file @
52650d90
...
...
@@ -282,6 +282,7 @@ func newMQTTClient(svc string, port int, name string, logErr *log.Logger,
cli
.
msgIn
=
make
(
chan
schemas
.
MQTTMessage
,
1000
)
cli
.
subscription
=
make
(
map
[
string
][]
*
MQTT
)
cli
.
logInfo
.
Println
(
"Created new MQTT client; status: "
,
cli
.
active
)
cli
.
init
()
return
}
...
...
pkg/ams/ams.go
View file @
52650d90
...
...
@@ -63,19 +63,21 @@ import (
// AMS contains storage and deployment object
type
AMS
struct
{
stor
storage
// interface for local or distributed storage
depl
deployment
// interface for local or cloud deployment
logInfo
*
log
.
Logger
// logger for info logging
logError
*
log
.
Logger
// logger for error logging
agencyClient
*
agcli
.
Client
stor
storage
// interface for local or distributed storage
depl
deployment
// interface for local or cloud deployment
logInfo
*
log
.
Logger
// logger for info logging
logError
*
log
.
Logger
// logger for error logging
agencyCli
*
agcli
.
Client
dfCli
*
dfcli
.
Client
}
// StartAMS starts an AMS instance. It initializes the cluster and storage object and starts API
// server.
func
StartAMS
()
(
err
error
)
{
ams
:=
&
AMS
{
logError
:
log
.
New
(
os
.
Stderr
,
"[ERROR] "
,
log
.
LstdFlags
),
agencyClient
:
agcli
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
logError
:
log
.
New
(
os
.
Stderr
,
"[ERROR] "
,
log
.
LstdFlags
),
agencyCli
:
agcli
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
dfCli
:
dfcli
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
}
// create storage and deployment object according to specified deployment type
err
=
ams
.
init
()
...
...
@@ -248,7 +250,7 @@ func (ams *AMS) startMAS(masID int, masInfo schemas.MASInfo, numAgencies []int)
}
ams
.
logInfo
.
Println
(
"Stored MAS data"
)
if
os
.
Getenv
(
"CLONEMAP_DEPLOYMENT_TYPE"
)
==
"local"
{
_
,
err
=
df
c
li
.
PostGraph
(
masID
,
masInfo
.
Graph
)
_
,
err
=
ams
.
df
C
li
.
PostGraph
(
masID
,
masInfo
.
Graph
)
if
err
!=
nil
{
ams
.
logInfo
.
Println
(
err
.
Error
())
// return
...
...
@@ -459,7 +461,7 @@ func (ams *AMS) removeAgent(masID int, agentID int) (err error) {
if
err
!=
nil
{
return
}
_
,
err
=
ams
.
agencyCli
ent
.
DeleteAgent
(
addr
.
Agency
,
agentID
)
_
,
err
=
ams
.
agencyCli
.
DeleteAgent
(
addr
.
Agency
,
agentID
)
return
}
...
...
@@ -467,7 +469,7 @@ func (ams *AMS) removeAgent(masID int, agentID int) (err error) {
// postAgentToAgency sends a post request to agency with info about agent to start
func
(
ams
*
AMS
)
postAgentToAgency
(
agentInfo
schemas
.
AgentInfo
)
(
err
error
)
{
var
httpStatus
int
httpStatus
,
err
=
ams
.
agencyCli
ent
.
PostAgent
(
agentInfo
.
Address
.
Agency
,
agentInfo
)
httpStatus
,
err
=
ams
.
agencyCli
.
PostAgent
(
agentInfo
.
Address
.
Agency
,
agentInfo
)
if
err
!=
nil
{
return
}
...
...
pkg/df/client/client.go
View file @
52650d90
...
...
@@ -59,21 +59,19 @@ import (
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
// Host contains the host name of df (IP or k8s service name)
var
Host
=
"df"
// Port contains the port on which ams is listening
var
Port
=
12000
var
httpClient
=
&
http
.
Client
{
Timeout
:
time
.
Second
*
60
}
var
delay
=
time
.
Second
*
1
var
numRetries
=
4
// Client is the ams client
type
Client
struct
{
httpClient
*
http
.
Client
// http client
Host
string
// ams host name
Port
int
// ams port
delay
time
.
Duration
// delay between two retries
numRetries
int
// number of retries
}
// Alive tests if alive
func
Alive
()
(
alive
bool
)
{
func
(
cli
*
Client
)
Alive
()
(
alive
bool
)
{
alive
=
false
_
,
httpStatus
,
err
:=
httpretry
.
Get
(
httpClient
,
"http://"
+
Host
+
":"
+
strconv
.
Itoa
(
Port
)
+
"/api/alive"
,
time
.
Second
*
2
,
2
)
_
,
httpStatus
,
err
:=
httpretry
.
Get
(
cli
.
httpClient
,
cli
.
prefix
()
+
"/api/alive"
,
time
.
Second
*
2
,
2
)
if
err
==
nil
&&
httpStatus
==
http
.
StatusOK
{
alive
=
true
}
...
...
@@ -81,11 +79,12 @@ func Alive() (alive bool) {
}
// PostSvc post an mas
func
PostSvc
(
masID
int
,
svc
schemas
.
Service
)
(
retSvc
schemas
.
Service
,
httpStatus
int
,
err
error
)
{
func
(
cli
*
Client
)
PostSvc
(
masID
int
,
svc
schemas
.
Service
)
(
retSvc
schemas
.
Service
,
httpStatus
int
,
err
error
)
{
var
body
[]
byte
js
,
_
:=
json
.
Marshal
(
svc
)
body
,
httpStatus
,
err
=
httpretry
.
Post
(
httpClient
,
"http://"
+
Host
+
":"
+
strconv
.
Itoa
(
Port
)
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/svc"
,
"application/json"
,
js
,
time
.
Second
*
2
,
2
)
body
,
httpStatus
,
err
=
httpretry
.
Post
(
cli
.
httpClient
,
cli
.
prefix
()
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/svc"
,
"application/json"
,
js
,
time
.
Second
*
2
,
2
)
if
err
!=
nil
{
return
}
...
...
@@ -94,10 +93,11 @@ func PostSvc(masID int, svc schemas.Service) (retSvc schemas.Service, httpStatus
}
// GetSvc requests mas information
func
GetSvc
(
masID
int
,
desc
string
)
(
svc
[]
schemas
.
Service
,
httpStatus
int
,
err
error
)
{
func
(
cli
*
Client
)
GetSvc
(
masID
int
,
desc
string
)
(
svc
[]
schemas
.
Service
,
httpStatus
int
,
err
error
)
{
var
body
[]
byte
body
,
httpStatus
,
err
=
httpretry
.
Get
(
httpClient
,
"http://"
+
Host
+
":"
+
strconv
.
Itoa
(
Port
)
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/svc/desc/"
+
desc
,
time
.
Second
*
2
,
2
)
body
,
httpStatus
,
err
=
httpretry
.
Get
(
cli
.
httpClient
,
cli
.
prefix
()
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/svc/desc/"
+
desc
,
time
.
Second
*
2
,
2
)
if
err
!=
nil
{
return
}
...
...
@@ -106,11 +106,11 @@ func GetSvc(masID int, desc string) (svc []schemas.Service, httpStatus int, err
}
// GetLocalSvc requests mas information
func
GetLocalSvc
(
masID
int
,
desc
string
,
nodeID
int
,
dist
float64
)
(
svc
[]
schemas
.
Service
,
httpStatus
int
,
err
error
)
{
func
(
cli
*
Client
)
GetLocalSvc
(
masID
int
,
desc
string
,
nodeID
int
,
dist
float64
)
(
svc
[]
schemas
.
Service
,
httpStatus
int
,
err
error
)
{
var
body
[]
byte
body
,
httpStatus
,
err
=
httpretry
.
Get
(
httpClient
,
"http://"
+
Host
+
":"
+
strconv
.
Itoa
(
Port
)
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/svc/desc/"
+
desc
+
"/node/"
+
strconv
.
Itoa
(
nodeID
)
+
"/dist/"
+
body
,
httpStatus
,
err
=
httpretry
.
Get
(
cli
.
httpClient
,
cli
.
prefix
()
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/svc/desc/"
+
desc
+
"/node/"
+
strconv
.
Itoa
(
nodeID
)
+
"/dist/"
+
fmt
.
Sprintf
(
"%f"
,
dist
),
time
.
Second
*
2
,
2
)
if
err
!=
nil
{
return
...
...
@@ -120,25 +120,25 @@ func GetLocalSvc(masID int, desc string, nodeID int, dist float64) (svc []schema
}
// DeleteSvc removes service from df
func
DeleteSvc
(
masID
int
,
svcID
string
)
(
httpStatus
int
,
err
error
)
{
httpStatus
,
err
=
httpretry
.
Delete
(
httpClient
,
"http://"
+
Host
+
":
"
+
strconv
.
Itoa
(
Port
)
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/svc/id/"
+
svcID
,
nil
,
time
.
Second
*
2
,
2
)
func
(
cli
*
Client
)
DeleteSvc
(
masID
int
,
svcID
string
)
(
httpStatus
int
,
err
error
)
{
httpStatus
,
err
=
httpretry
.
Delete
(
cli
.
httpClient
,
cli
.
prefix
()
+
"/api/df/
"
+
strconv
.
Itoa
(
masID
)
+
"/svc/id/"
+
svcID
,
nil
,
time
.
Second
*
2
,
2
)
return
}
// PostGraph post the graph of a mas
func
PostGraph
(
masID
int
,
gr
schemas
.
Graph
)
(
httpStatus
int
,
err
error
)
{
func
(
cli
*
Client
)
PostGraph
(
masID
int
,
gr
schemas
.
Graph
)
(
httpStatus
int
,
err
error
)
{
js
,
_
:=
json
.
Marshal
(
gr
)
_
,
httpStatus
,
err
=
httpretry
.
Post
(
httpClient
,
"http://"
+
Host
+
":"
+
strconv
.
Itoa
(
Port
)
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/graph"
,
"application/json"
,
js
,
time
.
Second
*
2
,
2
)
_
,
httpStatus
,
err
=
httpretry
.
Post
(
cli
.
httpClient
,
cli
.
prefix
()
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/graph"
,
"application/json"
,
js
,
time
.
Second
*
2
,
2
)
return
}
// GetGraph returns graph of mas
func
GetGraph
(
masID
int
)
(
graph
schemas
.
Graph
,
httpStatus
int
,
err
error
)
{
func
(
cli
*
Client
)
GetGraph
(
masID
int
)
(
graph
schemas
.
Graph
,
httpStatus
int
,
err
error
)
{
var
body
[]
byte
body
,
httpStatus
,
err
=
httpretry
.
Get
(
httpClient
,
"http://"
+
Host
+
":"
+
strconv
.
Itoa
(
Port
)
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/graph"
,
time
.
Second
*
2
,
2
)
body
,
httpStatus
,
err
=
httpretry
.
Get
(
cli
.
httpClient
,
cli
.
prefix
()
+
"/api/df/"
+
strconv
.
Itoa
(
masID
)
+
"/graph"
,
time
.
Second
*
2
,
2
)
if
err
!=
nil
{
return
}
...
...
@@ -146,16 +146,9 @@ func GetGraph(masID int) (graph schemas.Graph, httpStatus int, err error) {
return
}
// Init initializes the client
func
Init
(
timeout
time
.
Duration
,
del
time
.
Duration
,
numRet
int
)
{
httpClient
.
Timeout
=
timeout
delay
=
del
numRetries
=
numRet
}
func
getIP
()
(
ret
string
)
{
func
(
cli
*
Client
)
getIP
()
(
ret
string
)
{
for
{
ips
,
err
:=
net
.
LookupHost
(
Host
)
ips
,
err
:=
net
.
LookupHost
(
cli
.
Host
)
if
len
(
ips
)
>
0
&&
err
==
nil
{
ret
=
ips
[
0
]
break
...
...
@@ -163,3 +156,20 @@ func getIP() (ret string) {
}
return
}
func
(
cli
*
Client
)
prefix
()
(
ret
string
)
{
ret
=
"http://"
+
cli
.
Host
+
":"
+
strconv
.
Itoa
(
cli
.
Port
)
return
}
// New creates a new AMS client
func
New
(
timeout
time
.
Duration
,
del
time
.
Duration
,
numRet
int
)
(
cli
*
Client
)
{
cli
=
&
Client
{
httpClient
:
&
http
.
Client
{
Timeout
:
timeout
},
Host
:
"df"
,
Port
:
12000
,
delay
:
del
,
numRetries
:
numRet
,
}
return
}
pkg/frontend/frontend.go
View file @
52650d90
...
...
@@ -47,18 +47,24 @@ package frontend
import
(
"time"
amscli
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/ams/client"
amsclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/ams/client"
dfclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
logclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/logger/client"
)
// Frontend frontend
type
Frontend
struct
{
amsClient
*
amscli
.
Client
amsClient
*
amsclient
.
Client
dfClient
*
dfclient
.
Client
logClient
*
logclient
.
Client
}
// StartFrontend start
func
StartFrontend
()
(
err
error
)
{
fe
:=
&
Frontend
{
amsClient
:
amscli
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
amsClient
:
amsclient
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
dfClient
:
dfclient
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
logClient
:
logclient
.
New
(
time
.
Second
*
60
,
time
.
Second
*
1
,
4
),
}
fe
.
listen
()
return
...
...
pkg/frontend/platform.go
View file @
52650d90
...
...
@@ -49,8 +49,6 @@ import (
"net/http"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/common/httpreply"
dfclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
logclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/logger/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
...
...
@@ -90,8 +88,8 @@ func (fe *Frontend) handleModules(w http.ResponseWriter, r *http.Request) (cmapE
// getModuleStatus returns the on/off status of all modules
func
(
fe
*
Frontend
)
getModuleStatus
()
(
mods
schemas
.
ModuleStatus
,
err
error
)
{
mods
.
Logging
=
log
c
lient
.
Alive
()
mods
.
Logging
=
fe
.
log
C
lient
.
Alive
()
mods
.
Core
=
fe
.
amsClient
.
Alive
()
mods
.
DF
=
df
c
lient
.
Alive
()
mods
.
DF
=
fe
.
df
C
lient
.
Alive
()
return
}
pkg/logger/client/client.go
View file @
52650d90
...
...
@@ -55,21 +55,19 @@ import (
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
// Host contains the host name of logger (IP or k8s service name)
var
Host
=
"logger"
// Port contains the port on which logger is listening
var
Port
=
11000