Commit 15bd3bce authored by Stefan Dähling's avatar Stefan Dähling
Browse files

mqtt unsubscribe

parent db048686
Pipeline #290148 passed with stages
in 1 minute and 26 seconds
......@@ -108,26 +108,34 @@ func (agent *Agent) startAgent(task func(*Agent) error) (err error) {
// GetAgentID returns the agent ID
func (agent *Agent) GetAgentID() (ret int) {
agent.mutex.Lock()
ret = agent.id
agent.mutex.Unlock()
return
}
// GetAgentType returns the agent type and subtype
func (agent *Agent) GetAgentType() (aType string, aSubtype string) {
agent.mutex.Lock()
aType = agent.aType
aSubtype = agent.aSubtype
agent.mutex.Unlock()
return
}
// GetAgentName returns the agent name
func (agent *Agent) GetAgentName() (ret string) {
agent.mutex.Lock()
ret = agent.name
agent.mutex.Unlock()
return
}
// GetCustomData returns custom data
func (agent *Agent) GetCustomData() (ret string) {
agent.mutex.Lock()
ret = agent.custom
agent.mutex.Unlock()
return
}
......
......@@ -107,6 +107,19 @@ func (mq *MQTT) Subscribe(topic string, qos int) (err error) {
return
}
// Unsubscribe unsubscribes a topic
func (mq *MQTT) Unsubscribe(topic string) (err error) {
mq.mutex.Lock()
if !mq.active {
mq.mutex.Unlock()
err = errors.New("mqtt not active")
return
}
mq.mutex.Unlock()
err = mq.client.unsubscribe(mq, topic)
return
}
// SendMessage sends a message
func (mq *MQTT) SendMessage(msg schemas.MQTTMessage, qos int) (err error) {
mq.mutex.Lock()
......@@ -297,47 +310,90 @@ func (cli *mqttClient) newIncomingMQTTMessage(client mqtt.Client, msg mqtt.Messa
// subscribe subscribes to specified topics
func (cli *mqttClient) subscribe(mq *MQTT, topic string, qos int) (err error) {
if cli.active {
cli.mutex.Lock()
ag, ok := cli.subscription[topic]
cli.mutex.Unlock()
if ok {
subscribed := false
for i := range ag {
if ag[i].agentID == mq.agentID {
subscribed = true
break
}
}
if !subscribed {
ag = append(ag, mq)
cli.mutex.Lock()
cli.subscription[topic] = ag
cli.mutex.Unlock()
}
} else {
cli.mutex.Lock()
token := cli.client.Subscribe(topic, byte(qos), nil)
cli.mutex.Unlock()
if token.Wait() && token.Error() != nil {
err = token.Error()
return
if !cli.active {
return
}
cli.mutex.Lock()
ag, ok := cli.subscription[topic]
cli.mutex.Unlock()
if ok {
subscribed := false
for i := range ag {
if ag[i].agentID == mq.agentID {
subscribed = true
break
}
ag = make([]*MQTT, 0)
}
if !subscribed {
ag = append(ag, mq)
cli.mutex.Lock()
cli.subscription[topic] = ag
cli.mutex.Unlock()
}
// cli.mutex.Lock()
// numDel := len(cli.subscription) / 25
// if numDel > cli.numDeliverer {
// for i := 0; i < numDel-cli.numDeliverer; i++ {
// go cli.deliverMsgs()
// }
// cli.numDeliverer = numDel
// }
// cli.mutex.Unlock()
} else {
cli.mutex.Lock()
token := cli.client.Subscribe(topic, byte(qos), nil)
cli.mutex.Unlock()
if token.Wait() && token.Error() != nil {
err = token.Error()
return
}
ag = make([]*MQTT, 0)
ag = append(ag, mq)
cli.mutex.Lock()
cli.subscription[topic] = ag
cli.mutex.Unlock()
}
// cli.mutex.Lock()
// numDel := len(cli.subscription) / 25
// if numDel > cli.numDeliverer {
// for i := 0; i < numDel-cli.numDeliverer; i++ {
// go cli.deliverMsgs()
// }
// cli.numDeliverer = numDel
// }
// cli.mutex.Unlock()
return
}
// unsubscribe to a topic
func (cli *mqttClient) unsubscribe(mq *MQTT, topic string) (err error) {
if !cli.active {
return
}
cli.mutex.Lock()
ag, ok := cli.subscription[topic]
cli.mutex.Unlock()
if !ok {
return
}
index := -1
for i := range ag {
if mq.agentID == ag[i].agentID {
index = i
break
}
}
if index == -1 {
return
}
if index == 0 && len(ag) == 1 {
// agent is the only one who has subscribed -> unsubscribe
delete(cli.subscription, topic)
token := cli.client.Unsubscribe(topic)
if token.Wait() && token.Error() != nil {
err = token.Error()
return
}
} else {
// remove agent from list of subscribed agents
ag[index] = ag[len(ag)-1]
ag[len(ag)-1] = nil
ag = ag[:len(ag)-1]
cli.mutex.Lock()
cli.subscription[topic] = ag
cli.mutex.Unlock()
}
return
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment