Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
ACS
Public
Cloud
MAS
clonemap
Commits
8cd5651d
Commit
8cd5651d
authored
Apr 01, 2021
by
Stefan Dähling
Browse files
move agent logger to client; cleanup
parent
d2fc848b
Pipeline
#439392
failed with stages
in 5 minutes and 54 seconds
Changes
36
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
cmd/benchmark/main.go
View file @
8cd5651d
...
...
@@ -56,5 +56,4 @@ func main() {
if
err
!=
nil
{
fmt
.
Println
(
err
)
}
return
}
cmd/frontend/main.go
View file @
8cd5651d
...
...
@@ -48,5 +48,4 @@ import "git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/frontend"
func
main
()
{
frontend
.
StartFrontend
()
return
}
docs/example.json
View file @
8cd5651d
...
...
@@ -23,10 +23,12 @@
},
"agents"
:[
{
"nodeid"
:
0
"nodeid"
:
0
,
"name"
:
"test"
},
{
"nodeid"
:
0
"nodeid"
:
0
,
"name"
:
"test"
}
]
}
...
...
examples/benchmark/benchmark.go
View file @
8cd5651d
...
...
@@ -335,9 +335,9 @@ func cpuLoad(ag *agency.Agent, config CustomAgentData) (err error) {
}
T
:=
float32
(
config
.
T
)
Tr
:=
config
.
Tr
a
:=
make
([]
float64
,
100
,
100
)
b
:=
make
([]
float64
,
100
,
100
)
c
:=
make
([]
float64
,
100
,
100
)
a
:=
make
([]
float64
,
100
)
b
:=
make
([]
float64
,
100
)
c
:=
make
([]
float64
,
100
)
for
i
:=
0
;
i
<
100
;
i
++
{
a
[
i
]
=
rand
.
Float64
()
b
[
i
]
=
rand
.
Float64
()
...
...
pkg/agency/acc.go
View file @
8cd5651d
...
...
@@ -99,7 +99,7 @@ func (agency *Agency) aclLookup(agentID int) (acl *ACL, err error) {
return
}
if
address
.
Agency
==
""
{
err
=
errors
.
New
(
"
R
eceiver is not active"
)
err
=
errors
.
New
(
"
r
eceiver is not active"
)
return
}
var
remAgency
*
remoteAgency
...
...
@@ -167,7 +167,7 @@ func (remAgency *remoteAgency) sendMsgs(remName string, localName string, logErr
if
num
>
99
{
num
=
99
}
msgs
:=
make
([]
schemas
.
ACLMessage
,
num
+
1
,
num
+
1
)
msgs
:=
make
([]
schemas
.
ACLMessage
,
num
+
1
)
msgs
[
0
]
=
msg
msgs
[
0
]
.
AgencySender
=
localName
msgs
[
0
]
.
AgencyReceiver
=
remName
...
...
pkg/agency/acl.go
View file @
8cd5651d
...
...
@@ -53,6 +53,7 @@ import (
"sync"
"time"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
...
...
@@ -64,37 +65,37 @@ type ACL struct {
mutex
*
sync
.
Mutex
// mutex for address book
// commIn chan int // ID of agents that have sent messages
// commOut chan int // ID of agents that messages have been sent to
agentID
int
active
bool
aclLookup
func
(
int
)
(
*
ACL
,
error
)
cmapL
ogger
*
Logger
logError
*
log
.
Logger
logInfo
*
log
.
Logger
agentID
int
active
bool
aclLookup
func
(
int
)
(
*
ACL
,
error
)
l
ogger
*
client
.
Agent
Logger
logError
*
log
.
Logger
logInfo
*
log
.
Logger
}
// commData stores data about communication with other agent
type
commData
struct
{
numMsgSent
int
// number of messages sent to this agent
numMsgRecv
int
// number of messages received from this agent
}
//
type commData struct {
//
numMsgSent int // number of messages sent to this agent
//
numMsgRecv int // number of messages received from this agent
//
}
// newACL creates a new ACL object
func
newACL
(
agentID
int
,
msgIn
chan
schemas
.
ACLMessage
,
aclLookup
func
(
int
)
(
*
ACL
,
error
),
cmaplog
*
Logger
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
acl
*
ACL
)
{
aclLookup
func
(
int
)
(
*
ACL
,
error
),
cmaplog
*
client
.
Agent
Logger
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
acl
*
ACL
)
{
acl
=
&
ACL
{
mutex
:
&
sync
.
Mutex
{},
msgIn
:
msgIn
,
msgInProtocol
:
make
(
map
[
int
]
chan
schemas
.
ACLMessage
),
// commIn: make(chan int, 5000),
// commOut: make(chan int, 5000),
addrBook
:
make
(
map
[
int
]
*
ACL
),
agentID
:
agentID
,
active
:
true
,
aclLookup
:
aclLookup
,
cmapL
ogger
:
cmaplog
,
logError
:
logErr
,
logInfo
:
logInf
,
addrBook
:
make
(
map
[
int
]
*
ACL
),
agentID
:
agentID
,
active
:
true
,
aclLookup
:
aclLookup
,
l
ogger
:
cmaplog
,
logError
:
logErr
,
logInfo
:
logInf
,
}
return
}
...
...
@@ -112,7 +113,6 @@ func (acl *ACL) close() {
acl
.
logInfo
.
Println
(
"Closing ACL of agent "
,
acl
.
agentID
)
acl
.
active
=
false
acl
.
mutex
.
Unlock
()
return
}
// NewMessage returns a new initiaized message
...
...
@@ -209,7 +209,7 @@ func (acl *ACL) SendMessage(msg schemas.ACLMessage) (err error) {
if
err
!=
nil
{
return
}
err
=
acl
.
cmapL
ogger
.
NewLog
(
"msg"
,
"ACL send"
,
msg
.
String
())
err
=
acl
.
l
ogger
.
NewLog
(
"msg"
,
"ACL send"
,
msg
.
String
())
// acl.mutex.Lock()
// if acl.analysis {
// acl.commOut <- msg.Receiver
...
...
@@ -235,13 +235,12 @@ func (acl *ACL) newIncomingMessage(msg schemas.ACLMessage) (err error) {
}
else
{
acl
.
msgIn
<-
msg
}
err
=
acl
.
cmapL
ogger
.
NewLog
(
"msg"
,
"ACL receive"
,
msg
.
String
())
err
=
acl
.
l
ogger
.
NewLog
(
"msg"
,
"ACL receive"
,
msg
.
String
())
// acl.mutex.Lock()
// if acl.analysis {
// acl.commIn <- msg.Sender
// }
// acl.mutex.Unlock()
err
=
nil
return
}
...
...
pkg/agency/agency.go
View file @
8cd5651d
...
...
@@ -60,6 +60,7 @@ import (
agencyclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/agency/client"
amsclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/ams/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/client"
dfclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/status"
...
...
@@ -75,7 +76,7 @@ type Agency struct {
mutex
*
sync
.
Mutex
// mutex to protect agents from concurrent reads and writes
agentTask
func
(
*
Agent
)
error
msgIn
chan
[]
schemas
.
ACLMessage
log
Handler
*
logHandle
r
log
Collector
*
client
.
LogCollecto
r
mqttClient
*
mqttClient
dfClient
*
dfclient
.
Client
amsClient
*
amsclient
.
Client
...
...
@@ -107,7 +108,7 @@ func StartAgency(task func(*Agent) error) (err error) {
go
agency
.
startAgents
()
// catch kill signal in order to terminate agency and agents before exiting
var
gracefulStop
=
make
(
chan
os
.
Signal
)
var
gracefulStop
=
make
(
chan
os
.
Signal
,
10
)
signal
.
Notify
(
gracefulStop
,
syscall
.
SIGTERM
)
signal
.
Notify
(
gracefulStop
,
syscall
.
SIGINT
)
go
agency
.
terminate
(
gracefulStop
)
...
...
@@ -151,11 +152,22 @@ func (agency *Agency) init() (err error) {
return
}
agency
.
info
.
MASID
,
err
=
strconv
.
Atoi
(
hostname
[
1
])
if
err
!=
nil
{
return
}
agency
.
info
.
ImageGroupID
,
err
=
strconv
.
Atoi
(
hostname
[
3
])
if
err
!=
nil
{
return
}
agency
.
info
.
ID
,
err
=
strconv
.
Atoi
(
hostname
[
5
])
if
err
!=
nil
{
return
}
agency
.
info
.
Name
=
temp
+
".mas"
+
hostname
[
1
]
+
"agencies"
agency
.
logHandler
=
newLogHandler
(
agency
.
info
.
MASID
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
mqttClient
=
newMQTTClient
(
"mqtt"
,
1883
,
agency
.
info
.
Name
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
logCollector
,
err
=
client
.
NewLogCollector
(
agency
.
info
.
MASID
,
agency
.
info
.
Logger
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
mqttClient
=
newMQTTClient
(
"mqtt"
,
1883
,
agency
.
info
.
Name
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
mutex
.
Unlock
()
return
}
...
...
@@ -164,7 +176,7 @@ func (agency *Agency) init() (err error) {
// goroutine and waits until an OS signal is inserted into the channel gracefulStop
func
(
agency
*
Agency
)
terminate
(
gracefulStop
chan
os
.
Signal
)
{
// var err error
_
=
<-
gracefulStop
<-
gracefulStop
agency
.
logInfo
.
Println
(
"Terminating agency"
)
agency
.
mutex
.
Lock
()
for
i
:=
range
agency
.
localAgents
{
...
...
@@ -180,8 +192,8 @@ func (agency *Agency) terminate(gracefulStop chan os.Signal) {
func
(
agency
*
Agency
)
startAgents
()
(
err
error
)
{
// request configuration
var
agencyInfoFull
schemas
.
AgencyInfoFull
agencyInfoFull
,
_
,
err
=
agency
.
amsClient
.
GetAgencyInfo
(
agency
.
info
.
MASID
,
agency
.
info
.
ImageGroupID
,
agency
.
info
.
ID
)
agencyInfoFull
,
_
,
err
=
agency
.
amsClient
.
GetAgencyInfo
(
agency
.
info
.
MASID
,
agency
.
info
.
ImageGroupID
,
agency
.
info
.
ID
)
agency
.
mutex
.
Lock
()
agency
.
info
.
ID
=
agencyInfoFull
.
ID
agency
.
info
.
Logger
=
agencyInfoFull
.
Logger
...
...
@@ -223,7 +235,7 @@ func (agency *Agency) createAgent(agentInfo schemas.AgentInfo) (err error) {
agentInfo
.
Status
.
Code
=
status
.
Starting
msgIn
:=
make
(
chan
schemas
.
ACLMessage
,
1000
)
agency
.
mutex
.
Lock
()
ag
:=
newAgent
(
agentInfo
,
msgIn
,
agency
.
aclLookup
,
agency
.
log
Handle
r
,
agency
.
info
.
Logger
,
ag
:=
newAgent
(
agentInfo
,
msgIn
,
agency
.
aclLookup
,
agency
.
log
Collecto
r
,
agency
.
info
.
Logger
,
agency
.
mqttClient
,
agency
.
dfClient
,
agency
.
logError
,
agency
.
logInfo
)
agency
.
localAgents
[
agentInfo
.
ID
]
=
ag
agency
.
mutex
.
Unlock
()
...
...
pkg/agency/agent.go
View file @
8cd5651d
...
...
@@ -52,6 +52,7 @@ import (
"log"
"sync"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/client"
dfclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/df/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/status"
...
...
@@ -62,16 +63,16 @@ type Agent struct {
mutex
*
sync
.
Mutex
id
int
// unique id of agent
nodeID
int
name
string
// Name of agent
aType
string
// Type of agent
aSubtype
string
// Subtype of agent
custom
string
// custom data
customChan
chan
string
// channel for custom update behavior
masID
int
// ID of MAS agent is belongs to
status
int
// Status of agent
ACL
*
ACL
// agent communication
Logger
*
Logger
// logger object
MQTT
*
MQTT
// mqtt object
name
string
// Name of agent
aType
string
// Type of agent
aSubtype
string
// Subtype of agent
custom
string
// custom data
customChan
chan
string
// channel for custom update behavior
masID
int
// ID of MAS agent is belongs to
status
int
// Status of agent
ACL
*
ACL
// agent communication
Logger
*
client
.
Agent
Logger
// logger object
MQTT
*
MQTT
// mqtt object
DF
*
DF
logError
*
log
.
Logger
logInfo
*
log
.
Logger
...
...
@@ -80,7 +81,7 @@ type Agent struct {
// newAgent creates a new agent
func
newAgent
(
info
schemas
.
AgentInfo
,
msgIn
chan
schemas
.
ACLMessage
,
aclLookup
func
(
int
)
(
*
ACL
,
error
),
log
*
logHandle
r
,
logConfig
schemas
.
LoggerConfig
,
aclLookup
func
(
int
)
(
*
ACL
,
error
),
log
Col
*
client
.
LogCollecto
r
,
logConfig
schemas
.
LoggerConfig
,
mqtt
*
mqttClient
,
dfClient
*
dfclient
.
Client
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
ag
*
Agent
)
{
ag
=
&
Agent
{
id
:
info
.
ID
,
...
...
@@ -97,7 +98,7 @@ func newAgent(info schemas.AgentInfo, msgIn chan schemas.ACLMessage,
active
:
true
,
}
// in, out := ag.ACL.getCommDataChannels()
ag
.
Logger
=
new
Logger
(
ag
.
id
,
log
,
logConfig
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
Logger
=
logCol
.
NewAgent
Logger
(
ag
.
id
,
logConfig
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
ACL
=
newACL
(
info
.
ID
,
msgIn
,
aclLookup
,
ag
.
Logger
,
logErr
,
logInf
)
ag
.
MQTT
=
newMQTT
(
ag
.
id
,
mqtt
,
ag
.
Logger
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
DF
=
newDF
(
ag
.
masID
,
ag
.
id
,
ag
.
nodeID
,
dfClient
,
ag
.
logError
,
ag
.
logInfo
)
...
...
@@ -173,7 +174,6 @@ func (agent *Agent) updateCustomData(custom string) {
agent
.
mutex
.
Unlock
()
}
agent
.
logInfo
.
Println
(
"Updated config of agent "
,
agent
.
GetAgentID
())
return
}
// deregisterCustomUpdateChannel deletes the channel for a custom config update behavior
...
...
@@ -191,7 +191,7 @@ func (agent *Agent) Terminate() {
agent
.
active
=
false
agent
.
mutex
.
Unlock
()
agent
.
ACL
.
close
()
agent
.
Logger
.
c
lose
()
agent
.
Logger
.
C
lose
()
agent
.
MQTT
.
close
()
agent
.
DF
.
close
()
}
pkg/agency/df.go
View file @
8cd5651d
...
...
@@ -78,14 +78,14 @@ func (df *DF) RegisterService(svc schemas.Service) (id string, err error) {
df
.
mutex
.
Unlock
()
id
=
"-1"
if
svc
.
Desc
==
""
{
err
=
errors
.
New
(
"
E
mpty description not allowed"
)
err
=
errors
.
New
(
"
e
mpty description not allowed"
)
return
}
df
.
mutex
.
Lock
()
_
,
ok
:=
df
.
registeredServices
[
svc
.
Desc
]
df
.
mutex
.
Unlock
()
if
ok
{
err
=
errors
.
New
(
"
S
ervice already registered"
)
err
=
errors
.
New
(
"
s
ervice already registered"
)
return
}
df
.
mutex
.
Lock
()
...
...
@@ -173,7 +173,7 @@ func (df *DF) DeregisterService(svcID string) (err error) {
}
df
.
mutex
.
Unlock
()
if
desc
==
""
{
err
=
errors
.
New
(
"
N
o such service"
)
err
=
errors
.
New
(
"
n
o such service"
)
return
}
df
.
mutex
.
Lock
()
...
...
@@ -213,5 +213,4 @@ func (df *DF) close() {
df
.
logInfo
.
Println
(
"Closing DF of agent "
,
df
.
agentID
)
df
.
active
=
false
df
.
mutex
.
Unlock
()
return
}
pkg/agency/handler.go
View file @
8cd5651d
...
...
@@ -71,7 +71,6 @@ func (agency *Agency) handleGetAgency(w http.ResponseWriter, r *http.Request) {
}
httpErr
=
httpreply
.
Resource
(
w
,
agencyInfo
,
cmapErr
)
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// handlePostAgent is the handler for post requests to path /api/agency/agents
...
...
@@ -95,7 +94,6 @@ func (agency *Agency) handlePostAgent(w http.ResponseWriter, r *http.Request) {
go
agency
.
createAgent
(
agentInfo
)
httpErr
=
httpreply
.
Created
(
w
,
nil
,
"text/plain"
,
[]
byte
(
"Ressource Created"
))
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// handlePostMsgs is the handler for post requests to path /api/agency/msgs
...
...
@@ -118,7 +116,6 @@ func (agency *Agency) handlePostMsgs(w http.ResponseWriter, r *http.Request) {
agency
.
msgIn
<-
msgs
httpErr
=
httpreply
.
Created
(
w
,
cmapErr
,
"text/plain"
,
[]
byte
(
"Ressource Created"
))
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// handlePostUndeliverableMsg is the handler for post requests to path /api/agency/msgundeliv
...
...
@@ -141,7 +138,6 @@ func (agency *Agency) handlePostUndeliverableMsg(w http.ResponseWriter, r *http.
go
agency
.
resendUndeliverableMsg
(
msg
)
httpErr
=
httpreply
.
Created
(
w
,
cmapErr
,
"text/plain"
,
[]
byte
(
"Ressource Created"
))
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// handleDeleteAgentID is the handler for delete requests to path /api/agency/agents/{agentid}
...
...
@@ -158,7 +154,6 @@ func (agency *Agency) handleDeleteAgentID(w http.ResponseWriter, r *http.Request
cmapErr
=
agency
.
removeAgent
(
agentID
)
httpErr
=
httpreply
.
Deleted
(
w
,
cmapErr
)
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// handleGetAgentStatus is the handler for get requests to path /api/agency/agents/{agentid}/status
...
...
@@ -181,7 +176,6 @@ func (agency *Agency) handleGetAgentStatus(w http.ResponseWriter, r *http.Reques
}
httpErr
=
httpreply
.
Resource
(
w
,
agentStatus
,
cmapErr
)
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// handlePutAgentCustom is the handler for put requests to path /api/agency/agents/{agentid}/custom
...
...
@@ -211,7 +205,6 @@ func (agency *Agency) handlePutAgentCustom(w http.ResponseWriter, r *http.Reques
}
httpErr
=
httpreply
.
Updated
(
w
,
cmapErr
)
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// methodNotAllowed is the default handler for valid paths but invalid methods
...
...
@@ -219,15 +212,13 @@ func (agency *Agency) methodNotAllowed(w http.ResponseWriter, r *http.Request) {
httpErr
:=
httpreply
.
MethodNotAllowed
(
w
)
cmapErr
:=
errors
.
New
(
"Error: Method not allowed on path "
+
r
.
URL
.
Path
)
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// resourceNotFound is the default handler for invalid paths
func
(
agency
*
Agency
)
resourceNotFound
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
httpErr
:=
httpreply
.
NotFoundError
(
w
)
cmapErr
:=
errors
.
New
(
"
R
esource not found"
)
cmapErr
:=
errors
.
New
(
"
r
esource not found"
)
agency
.
logErrors
(
r
.
URL
.
Path
,
cmapErr
,
httpErr
)
return
}
// logErrors logs errors if any
...
...
@@ -238,7 +229,6 @@ func (agency *Agency) logErrors(path string, cmapErr error, httpErr error) {
if
httpErr
!=
nil
{
agency
.
logError
.
Println
(
path
,
httpErr
)
}
return
}
// loggingMiddleware logs request before calling final handler
...
...
pkg/agency/logging.go
View file @
8cd5651d
...
...
@@ -51,7 +51,7 @@ import (
"sync"
"time"
logclient
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/
logger/
client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
)
...
...
@@ -76,7 +76,7 @@ func (log *Logger) NewLog(topic string, message string, data string) (err error)
log
.
mutex
.
Unlock
()
if
topic
!=
"error"
&&
topic
!=
"debug"
&&
topic
!=
"status"
&&
topic
!=
"msg"
&&
topic
!=
"app"
{
err
=
errors
.
New
(
"
U
nknown topic"
)
err
=
errors
.
New
(
"
u
nknown topic"
)
return
}
log
.
mutex
.
Lock
()
...
...
@@ -163,7 +163,7 @@ type logHandler struct {
logIn
chan
schemas
.
LogMessage
// logging inbox
stateIn
chan
schemas
.
State
active
bool
// indicates if logging is active (switch via env)
client
*
log
client
.
LoggerClient
client
*
client
.
LoggerClient
logError
*
log
.
Logger
logInfo
*
log
.
Logger
}
...
...
@@ -242,7 +242,7 @@ func newLogHandler(masID int, logErr *log.Logger, logInf *log.Logger) (log *logH
logError
:
logErr
,
logInfo
:
logInf
,
}
log
.
client
,
_
=
log
client
.
NewLoggerClient
(
"logger"
,
11000
,
time
.
Second
*
60
,
time
.
Second
*
1
,
4
)
log
.
client
,
_
=
client
.
NewLoggerClient
(
"logger"
,
11000
,
time
.
Second
*
60
,
time
.
Second
*
1
,
4
)
temp
:=
os
.
Getenv
(
"CLONEMAP_LOGGING"
)
if
temp
==
"ON"
{
log
.
active
=
true
...
...
pkg/agency/mqtt.go
View file @
8cd5651d
...
...
@@ -51,6 +51,7 @@ import (
"strconv"
"sync"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/client"
"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
mqtt
"github.com/eclipse/paho.mqtt.golang"
)
...
...
@@ -63,23 +64,23 @@ type MQTT struct {
msgInTopic
map
[
string
]
chan
schemas
.
MQTTMessage
// message inbox for messages with specified topic
msgIn
chan
schemas
.
MQTTMessage
// mqtt message inbox
agentID
int
cmapL
ogger
*
Logger
l
ogger
*
client
.
Agent
Logger
logError
*
log
.
Logger
logInfo
*
log
.
Logger
active
bool
}
// newMQTT returns a new pubsub connector of type mqtt
func
newMQTT
(
agentID
int
,
cli
*
mqttClient
,
cmaplog
*
Logger
,
logErr
*
log
.
Logger
,
func
newMQTT
(
agentID
int
,
cli
*
mqttClient
,
cmaplog
*
client
.
Agent
Logger
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
mq
*
MQTT
)
{
mq
=
&
MQTT
{
client
:
cli
,
mutex
:
&
sync
.
Mutex
{},
agentID
:
agentID
,
cmapL
ogger
:
cmaplog
,
logError
:
logErr
,
logInfo
:
logInf
,
active
:
true
,
client
:
cli
,
mutex
:
&
sync
.
Mutex
{},
agentID
:
agentID
,
l
ogger
:
cmaplog
,
logError
:
logErr
,
logInfo
:
logInf
,
active
:
true
,
}
mq
.
subTopic
=
make
(
map
[
string
]
interface
{})
mq
.
msgInTopic
=
make
(
map
[
string
]
chan
schemas
.
MQTTMessage
)
...
...
@@ -96,7 +97,6 @@ func (mq *MQTT) close() {
mq
.
logInfo
.
Println
(
"Closing MQTT of agent "
,
mq
.
agentID
)
mq
.
active
=
false
mq
.
mutex
.
Unlock
()
return
}
// Subscribe subscribes to a topic
...
...
@@ -152,7 +152,7 @@ func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
if
err
!=
nil
{
return
}
err
=
mq
.
cmapL
ogger
.
NewLog
(
"msg"
,
"MQTT publish"
,
msg
.
String
())
err
=
mq
.
l
ogger
.
NewLog
(
"msg"
,
"MQTT publish"
,
msg
.
String
())
return
}
...
...
@@ -208,7 +208,7 @@ func (mq *MQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
return
}
mq
.
mutex
.
Unlock
()
mq
.
cmapL
ogger
.
NewLog
(
"msg"
,
"MQTT receive"
,
msg
.
String
())
mq
.
l
ogger
.
NewLog
(
"msg"
,
"MQTT receive"
,
msg
.
String
())
mq
.
mutex
.
Lock
()
inbox
,
ok
:=
mq
.
msgInTopic
[
msg
.
Topic
]
mq
.
mutex
.
Unlock
()
...
...
@@ -217,7 +217,6 @@ func (mq *MQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
}
else
{
mq
.
msgIn
<-
msg
}
return
}
func
(
mq
*
MQTT
)
registerTopicChannel
(
topic
string
,
topicChan
chan
schemas
.
MQTTMessage
)
(
err
error
)
{
...
...
@@ -319,7 +318,6 @@ func (cli *mqttClient) newIncomingMQTTMessage(client mqtt.Client, msg mqtt.Messa
mqttMsg
.
Content
=
msg
.
Payload
()
mqttMsg
.
Topic
=
msg
.
Topic
()
cli
.
msgIn
<-
mqttMsg
return
}
// subscribe subscribes to specified topics
...
...
pkg/ams/ams.go
View file @
8cd5651d
...
...
@@ -341,20 +341,18 @@ func (ams *AMS) configureMAS(masSpec schemas.MASSpec) (masInfo schemas.MASInfo,
// total number of agents and total number of agencies
masInfo
.
Agents
.
Counter
=
0
numAgencies
=
make
([]
int
,
masInfo
.
ImageGroups
.
Counter
,
masInfo
.
ImageGroups
.
Counter
)
numAgencies
=
make
([]
int
,
masInfo
.
ImageGroups
.
Counter
)
for
i
:=
range
masSpec
.
ImageGroups
{
masInfo
.
Agents
.
Counter
+=
len
(
masSpec
.
ImageGroups
[
i
]
.
Agents
)
num
:=
len
(
masSpec
.
ImageGroups
[
i
]
.
Agents
)
/
masSpec
.
Config
.
NumAgentsPerAgency
if
len
(
masSpec
.
ImageGroups
[
i
]
.
Agents
)
%
masSpec
.
Config
.
NumAgentsPerAgency
>
0
{
num
++
}
masInfo
.
ImageGroups
.
Inst
[
i
]
.
Agencies
.
Inst
=
make
([]
schemas
.
AgencyInfo
,
num
,
num
)
masInfo
.
ImageGroups
.
Inst
[
i
]
.
Agencies
.
Inst
=
make
([]
schemas
.
AgencyInfo
,
num
)
masInfo
.
ImageGroups
.
Inst
[
i
]
.
Agencies
.
Counter
=
num
numAgencies
[
i
]
=
num
}
masInfo
.
Agents
.
Inst
=
make
([]
schemas
.
AgentInfo
,
masInfo
.
Agents
.
Counter
,
masInfo
.
Agents
.
Counter
)
masInfo
.
Agents
.
Inst
=
make
([]
schemas
.
AgentInfo
,
masInfo
.
Agents
.
Counter
)
// empty graph?
if
len
(
masInfo
.
Graph
.
Node
)
==
0
{
...
...
@@ -490,11 +488,6 @@ func (ams *AMS) createAgents(masID int, groupSpecs []schemas.ImageGroupSpec) (er
return
}
// createAgent creates a new agent and adds it to an existing mas
func
(
ams
*
AMS
)
createAgent
(
masID
int
,
agentSpec
schemas
.
AgentSpec
)
(
err
error
)
{
return
}
// removeAgent removes an agent from the MAS
func
(
ams
*
AMS
)
removeAgent
(
masID
int
,
agentID
int
)
(
err
error
)
{
var
addr
schemas
.
Address
...
...
pkg/ams/ams_test.go
View file @
8cd5651d
...
...
@@ -122,7 +122,6 @@ func stubListen() (err error) {
// stubHandler answers with created
func
stubHandler
(
w
http
.
ResponseWriter
,
r
*
http
.
Request
)
{
httpreply
.
Created
(
w
,
nil
,
"text/plain"
,
[]
byte
(
"Ressource Created"
))
return
}
// dummyClient makes requests to ams and terminates ams server at end
...
...
@@ -241,5 +240,4 @@ func dummyClient(s *http.Server, t *testing.T) {
ctx
,
cancel
:=
context
.
WithTimeout
(
context
.
Background
(),
30
*
time
.
Second
)
defer
cancel
()
s
.
Shutdown
(
ctx
)
return
}
pkg/ams/deployment.go
View file @
8cd5651d
...
...
@@ -133,7 +133,7 @@ func (localdepl *localDeployment) newImageGroup(masID int,
":8000/api/container"
,
" "
,
js
,
time
.
Second
*
2
,
2
)
if
err
==
nil
{
if
statusCode
!=
http
.
StatusCreated
{
err
=
errors
.
New
(
"
C
annot create agency"
)
err
=
errors
.
New
(
"
c
annot create agency"
)
return
}
}
...
...
@@ -156,7 +156,7 @@ func (localdepl *localDeployment) scaleImageGroup(masID int, imID int,
":8000/api/container"
,
" "
,
js
,
time
.
Second
*
2
,
2
)
if
err
==
nil
{
if
statusCode
!=
http
.
StatusCreated
{
err
=
errors
.
New
(
"
C
annot create agency"
)
err
=
errors
.
New
(
"
c
annot create agency"
)
return
}
}
...
...
pkg/ams/etcd.go
View file @
8cd5651d
...
...
@@ -223,7 +223,7 @@ func (stor *etcdStorage) uploadAgentInfo(newMAS schemas.MASInfo) (err error) {
if
newMAS
.
Agents
.
Counter
-
agentIndex
<
numAgInTrans
{
numAgInTrans
=
newMAS
.
Agents
.
Counter
-
agentIndex
}
Ops
:=
make
([]
clientv3
.
Op
,
numAgInTrans
,
numAgInTrans
)
Ops
:=
make
([]
clientv3
.
Op
,
numAgInTrans
)
// put all agent structs together
for
i
:=
0
;
i
<
numAgInTrans
;
i
++
{
var
res
[]
byte
...
...
@@ -270,7 +270,7 @@ func (stor *etcdStorage) uploadImGroupInfo(newMAS schemas.MASInfo) (err error) {
if
newMAS
.
ImageGroups
.
Inst
[
i
]
.
Agencies
.
Counter
-
agencyIndex
<
numAgInTrans
{
numAgInTrans
=
newMAS
.
ImageGroups
.
Inst
[
i
]
.
Agencies
.
Counter
-
agencyIndex
}