Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
ACS
Public
Cloud
MAS
clonemap
Commits
175b120d
Commit
175b120d
authored
Mar 30, 2021
by
Stefan Dähling
Browse files
log to msg when message is sent or received
parent
c6b73d57
Pipeline
#437917
passed with stages
in 13 minutes and 34 seconds
Changes
6
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
cmd/agency/main.go
View file @
175b120d
...
...
@@ -61,15 +61,11 @@ func main() {
}
func
task
(
ag
*
agency
.
Agent
)
(
err
error
)
{
beh
,
err
:=
ag
.
NewCustomUpdateBehavior
(
customUpdateHandler
)
beh
.
Start
()
time
.
Sleep
(
2
*
time
.
Second
)
id
:=
ag
.
GetAgentID
()
recv
:=
(
id
+
1
)
%
2
msg
,
_
:=
ag
.
ACL
.
NewMessage
(
recv
,
0
,
0
,
"test message"
)
ag
.
ACL
.
SendMessage
(
msg
)
ag
.
Logger
.
NewLog
(
"app"
,
"This is agent "
+
strconv
.
Itoa
(
id
),
""
)
return
}
func
customUpdateHandler
(
custom
string
)
(
err
error
)
{
fmt
.
Println
(
custom
)
return
}
docs/example.json
View file @
175b120d
...
...
@@ -23,10 +23,10 @@
},
"agents"
:[
{
"nodeid"
:
0
,
"name"
:
"agentnotask"
,
"type"
:
"NoTask"
,
"
custom"
:
""
"nodeid"
:
0
}
,
{
"
nodeid"
:
0
}
]
}
...
...
pkg/agency/acl.go
View file @
175b120d
...
...
@@ -64,11 +64,12 @@ type ACL struct {
mutex
*
sync
.
Mutex
// mutex for address book
// commIn chan int // ID of agents that have sent messages
// commOut chan int // ID of agents that messages have been sent to
agentID
int
active
bool
aclLookup
func
(
int
)
(
*
ACL
,
error
)
logError
*
log
.
Logger
logInfo
*
log
.
Logger
agentID
int
active
bool
aclLookup
func
(
int
)
(
*
ACL
,
error
)
cmapLogger
*
Logger
logError
*
log
.
Logger
logInfo
*
log
.
Logger
}
// commData stores data about communication with other agent
...
...
@@ -79,19 +80,21 @@ type commData struct {
// newACL creates a new ACL object
func
newACL
(
agentID
int
,
msgIn
chan
schemas
.
ACLMessage
,
aclLookup
func
(
int
)
(
*
ACL
,
error
),
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
acl
*
ACL
)
{
aclLookup
func
(
int
)
(
*
ACL
,
error
),
cmaplog
*
Logger
,
logErr
*
log
.
Logger
,
logInf
*
log
.
Logger
)
(
acl
*
ACL
)
{
acl
=
&
ACL
{
mutex
:
&
sync
.
Mutex
{},
msgIn
:
msgIn
,
msgInProtocol
:
make
(
map
[
int
]
chan
schemas
.
ACLMessage
),
// commIn: make(chan int, 5000),
// commOut: make(chan int, 5000),
addrBook
:
make
(
map
[
int
]
*
ACL
),
agentID
:
agentID
,
active
:
true
,
aclLookup
:
aclLookup
,
logError
:
logErr
,
logInfo
:
logInf
,
addrBook
:
make
(
map
[
int
]
*
ACL
),
agentID
:
agentID
,
active
:
true
,
aclLookup
:
aclLookup
,
cmapLogger
:
cmaplog
,
logError
:
logErr
,
logInfo
:
logInf
,
}
return
}
...
...
@@ -203,6 +206,10 @@ func (acl *ACL) SendMessage(msg schemas.ACLMessage) (err error) {
acl
.
mutex
.
Unlock
()
err
=
aclRecv
.
newIncomingMessage
(
msg
)
}
if
err
!=
nil
{
return
}
err
=
acl
.
cmapLogger
.
NewLog
(
"msg"
,
"ACL send"
,
msg
.
String
())
// acl.mutex.Lock()
// if acl.analysis {
// acl.commOut <- msg.Receiver
...
...
@@ -228,6 +235,7 @@ func (acl *ACL) newIncomingMessage(msg schemas.ACLMessage) (err error) {
}
else
{
acl
.
msgIn
<-
msg
}
err
=
acl
.
cmapLogger
.
NewLog
(
"msg"
,
"ACL receive"
,
msg
.
String
())
// acl.mutex.Lock()
// if acl.analysis {
// acl.commIn <- msg.Sender
...
...
pkg/agency/agent.go
View file @
175b120d
...
...
@@ -92,13 +92,13 @@ func newAgent(info schemas.AgentInfo, msgIn chan schemas.ACLMessage,
custom
:
info
.
Spec
.
Custom
,
customChan
:
nil
,
mutex
:
&
sync
.
Mutex
{},
ACL
:
newACL
(
info
.
ID
,
msgIn
,
aclLookup
,
logErr
,
logInf
),
logError
:
logErr
,
logInfo
:
logInf
,
active
:
true
,
}
// in, out := ag.ACL.getCommDataChannels()
ag
.
Logger
=
newLogger
(
ag
.
id
,
log
,
logConfig
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
ACL
=
newACL
(
info
.
ID
,
msgIn
,
aclLookup
,
ag
.
Logger
,
logErr
,
logInf
)
ag
.
MQTT
=
newMQTT
(
ag
.
id
,
mqtt
,
ag
.
Logger
,
ag
.
logError
,
ag
.
logInfo
)
ag
.
DF
=
newDF
(
ag
.
masID
,
ag
.
id
,
ag
.
nodeID
,
dfClient
,
ag
.
logError
,
ag
.
logInfo
)
return
...
...
pkg/agency/mqtt.go
View file @
175b120d
...
...
@@ -152,7 +152,7 @@ func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
if
err
!=
nil
{
return
}
err
=
mq
.
cmapLogger
.
NewLog
(
"msg"
,
"
Sent MQTT message: "
+
string
(
msg
.
Content
),
string
(
msg
.
Content
))
err
=
mq
.
cmapLogger
.
NewLog
(
"msg"
,
"
MQTT publish"
,
msg
.
String
(
))
return
}
...
...
@@ -208,7 +208,7 @@ func (mq *MQTT) newIncomingMQTTMessage(msg schemas.MQTTMessage) {
return
}
mq
.
mutex
.
Unlock
()
mq
.
cmapLogger
.
NewLog
(
"msg"
,
"
Received MQTT message: "
+
string
(
msg
.
Content
),
string
(
msg
.
Content
))
mq
.
cmapLogger
.
NewLog
(
"msg"
,
"
MQTT receive"
,
msg
.
String
(
))
mq
.
mutex
.
Lock
()
inbox
,
ok
:=
mq
.
msgInTopic
[
msg
.
Topic
]
mq
.
mutex
.
Unlock
()
...
...
pkg/schemas/iot.go
View file @
175b120d
...
...
@@ -56,3 +56,9 @@ type MQTTMessage struct {
Topic
string
// Topic of message
Content
[]
byte
// Denotes the content of the message
}
// String outputs message
func
(
msg
MQTTMessage
)
String
()
(
ret
string
)
{
ret
=
"Topic: "
+
msg
.
Topic
+
"; Content: "
+
string
(
msg
.
Content
)
return
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment