amqpclient.go 5.96 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** AMQP package, client.
*
* @author Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASweb-backend-go
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
22
package infrastructure_component
23
24
25
26

import (
	"encoding/json"
	"fmt"
27
	"github.com/gin-gonic/gin"
28
	"github.com/google/uuid"
29
	"github.com/jinzhu/gorm"
30
	"github.com/streadway/amqp"
31
	"log"
32
33
34
35
36
37
38
39
40
41
42
43
	"time"
)

// VILLAS_EXCHANGE is the name of the AMQP exchange that ConnectAMQP declares
// and binds the IC queue to, and that SendActionAMQP publishes actions to.
const VILLAS_EXCHANGE = "villas"

// AMQPclient holds the broker state used by this package.
//
// connection is the broker connection obtained from amqp.Dial in ConnectAMQP,
// channel is the channel opened on that connection (used to declare, consume
// and publish), and replies is the delivery channel returned by
// channel.Consume for the IC queue.
type AMQPclient struct {
	connection *amqp.Connection
	channel    *amqp.Channel
	replies    <-chan amqp.Delivery
}

// Action is the JSON message that SendActionAMQP publishes to the VILLAS
// exchange.
//
// Fields:
//   - Act: name of the action, e.g. "ping" (see PingAMQP).
//   - When: presumably the point in time at which the action should take
//     effect; the unit is not visible in this file — TODO confirm.
//   - Parameters: an empty struct, so it always serializes as {}.
//   - UUID: presumably the UUID of the addressed IC — TODO confirm. It is a
//     pointer, is nil in the zero value and then serializes as null.
type Action struct {
	Act        string   `json:"action"`
	When       float32  `json:"when"`
	Parameters struct{} `json:"parameters"`
	UUID       *string  `json:"uuid"`
	//Model      struct{} `json:"model"`
	//Results    struct{} `json:"results"`
}

52
// ICUpdate is the JSON message received from the IC queue and decoded by
// processMessage.
//
// State and all Properties fields except UUID are pointers, so a key that is
// absent from the message is left nil after decoding. Properties.UUID is a
// plain string; processMessage rejects the message when it is not a valid
// UUID.
type ICUpdate struct {
	State      *string `json:"state"`
	Properties struct {
		UUID        string  `json:"uuid"`
		Name        *string `json:"name"`
		Category    *string `json:"category"`
		Type        *string `json:"type"`
		Location    *string `json:"location"`
		WS_url      *string `json:"ws_url"`
		API_url     *string `json:"api_url"`
		Description *string `json:"description"`
	} `json:"properties"`
	// TODO add JSON start parameter scheme
}

67
68
69
70
71
72
73
74
75
// client is the package-level AMQP client state. ConnectAMQP fills it in;
// SendActionAMQP and CheckConnection read it.
var client AMQPclient

func ConnectAMQP(uri string) error {

	var err error

	// connect to broker
	client.connection, err = amqp.Dial(uri)
	if err != nil {
76
		return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
77
78
79
80
81
	}

	// create channel
	client.channel, err = client.connection.Channel()
	if err != nil {
82
		return fmt.Errorf("AMQP: failed to open a channel, error: %v", err)
83
84
85
86
87
88
89
90
91
92
	}
	// declare exchange
	err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
		"headers",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
93
		return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
94
95
	}

96
97
	// add a queue for the ICs
	ICQueue, err := client.channel.QueueDeclare("infrastructure_components",
98
99
100
101
102
103
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
104
		return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
105
106
	}

107
	err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
108
	if err != nil {
109
		return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
110
111
112
	}

	// consume deliveries
113
	client.replies, err = client.channel.Consume(ICQueue.Name,
114
		"",
115
		true,
116
117
118
119
120
		false,
		false,
		false,
		nil)
	if err != nil {
121
		return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
122
123
124
125
	}

	// consuming queue
	go func() {
126
127
		for {
			for message := range client.replies {
128
129
130
131
				err = processMessage(message)
				if err != nil {
					log.Println(err.Error())
				}
132
			}
133
			time.Sleep(2) // sleep for 2 sek
134
135
136
		}
	}()

137
138
	log.Printf(" AMQP: Waiting for messages... ")

139
140
141
	return nil
}

Sonja Happ's avatar
Sonja Happ committed
142
func SendActionAMQP(action Action) error {
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157

	payload, err := json.Marshal(action)
	if err != nil {
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    2,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Priority:        0,
		Body:            payload,
	}

Sonja Happ's avatar
Sonja Happ committed
158
159
160
	err = CheckConnection()
	if err != nil {
		return err
161
162
	}

Sonja Happ's avatar
Sonja Happ committed
163
	log.Println("AMQP: Sending message", string(msg.Body))
164
165
166
167
168
169
170
171
172
173
	err = client.channel.Publish(VILLAS_EXCHANGE,
		"",
		false,
		false,
		msg)
	return err

}

func PingAMQP() error {
174
	log.Println("AMQP: sending ping command to all ICs")
175
176
177

	var a Action
	a.Act = "ping"
Sonja Happ's avatar
Sonja Happ committed
178
	*a.UUID = ""
179

Sonja Happ's avatar
Sonja Happ committed
180
	err := SendActionAMQP(a)
181
182
	return err
}
183
184
185

func CheckConnection() error {

186
187
188
189
190
191
	if client.connection != nil {
		if client.connection.IsClosed() {
			return fmt.Errorf("connection to broker is closed")
		}
	} else {
		return fmt.Errorf("connection is nil")
192
193
194
195
	}

	return nil
}
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215

// StartAMQP starts the AMQP client when AMQPurl is non-empty.
//
// With an empty AMQPurl nothing is started and nil is returned. Otherwise it
// connects via ConnectAMQP (returning its error on failure), registers the IC
// action endpoint under the "/ic" group of api and starts a 10-second ticker
// whose handler is currently a placeholder.
func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
	if AMQPurl == "" {
		return nil
	}

	log.Println("Starting AMQP client")

	if err := ConnectAMQP(AMQPurl); err != nil {
		return err
	}

	// The IC action endpoint is only registered when the AMQP client is used.
	RegisterAMQPEndpoint(api.Group("/ic"))

	// Periodic hook, intended for calling the ping function to check which
	// ICs are still there.
	ticker := time.NewTicker(10 * time.Second)
	go func() {
		for range ticker.C {
			//TODO Add a useful regular event here
			/*
				err = PingAMQP()
				if err != nil {
					log.Println("AMQP Error: ", err.Error())
				}
			*/
		}
	}()

	log.Printf("Connected AMQP client to %s", AMQPurl)

	return nil
}
233

234
func processMessage(message amqp.Delivery) error {
235

Sonja Happ's avatar
Sonja Happ committed
236
	log.Println("Processing AMQP message: ", string(message.Body))
Sonja Happ's avatar
Sonja Happ committed
237

238
239
240
	var payload ICUpdate
	err := json.Unmarshal(message.Body, &payload)
	if err != nil {
241
		return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
242
243
244
245
246
247
	}

	ICUUID := payload.Properties.UUID
	_, err = uuid.Parse(ICUUID)

	if err != nil {
248
		return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
249
250
	}

251
	var sToBeUpdated InfrastructureComponent
252
253
254
255
	err = sToBeUpdated.ByUUID(ICUUID)

	if err == gorm.ErrRecordNotFound {
		// create new record
256
		err = createNewICviaAMQP(payload)
257
258
259
260
	} else if err != nil {
		// database error
		err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
	} else {
261
262
		// update record based on payload
		err = sToBeUpdated.updateICviaAMQP(payload)
263
	}
264

265
266
	return err
}