ic_amqpclient.go 11.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** AMQP package, client.
*
* @author Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASweb-backend-go
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
22
package infrastructure_component
23
24
25
26

import (
	"encoding/json"
	"fmt"
27
	"github.com/gin-gonic/gin"
28
	"github.com/google/uuid"
29
	"github.com/jinzhu/gorm"
30
	"github.com/streadway/amqp"
31
	"log"
32
33
34
35
36
37
38
39
40
41
42
43
	"time"
)

// VILLAS_EXCHANGE is the name of the AMQP exchange that this client declares,
// binds its queue to, and publishes actions on.
const VILLAS_EXCHANGE = "villas"

// AMQPclient bundles the broker connection, the channel opened on it, and the
// stream of deliveries consumed from the IC queue.
type AMQPclient struct {
	connection *amqp.Connection     // assigned from amqp.Dial in ConnectAMQP
	channel    *amqp.Channel        // opened on connection in ConnectAMQP
	replies    <-chan amqp.Delivery // assigned from channel.Consume in ConnectAMQP
}

// Action is the JSON message published on the exchange by sendActionAMQP.
//
// All Properties fields are optional pointers; a nil pointer is encoded as
// JSON null. sendActionAMQP additionally copies UUID, Type and Category into
// the AMQP message headers when they are set.
type Action struct {
	Act        string `json:"action"`
	When       int64  `json:"when"`
	Properties struct {
		UUID        *string `json:"uuid"`
		Name        *string `json:"name"`
		Category    *string `json:"category"`
		Type        *string `json:"type"`
		Location    *string `json:"location"`
		WS_url      *string `json:"ws_url"`
		API_url     *string `json:"api_url"`
		Description *string `json:"description"`
	} `json:"properties"`
}

Sonja Happ's avatar
Sonja Happ committed
58
// ICStatus is the "status" object of an incoming IC update message.
//
// Every field is a pointer so that a field missing from the JSON body (nil)
// can be told apart from one that was sent with a zero value. processMessage
// overwrites UUID with the value of the "uuid" AMQP message header.
type ICStatus struct {
	UUID        *string  `json:"uuid"`
	State       *string  `json:"state"`
	Name        *string  `json:"name"`
	Category    *string  `json:"category"`
	Type        *string  `json:"type"`
	Location    *string  `json:"location"`
	WS_url      *string  `json:"ws_url"`
	API_url     *string  `json:"api_url"`
	Description *string  `json:"description"`
	Uptime      *float64 `json:"uptime"` // TODO check if data type of uptime is float64 or int
}

71
type ICUpdate struct {
Sonja Happ's avatar
Sonja Happ committed
72
	Status *ICStatus `json:"status"`
Sonja Happ's avatar
Sonja Happ committed
73
	// TODO add JSON start parameter scheme
74
75
}

76
77
78
79
80
81
82
83
84
// client is the package-level AMQP client state; ConnectAMQP fills in its
// fields and sendActionAMQP/CheckConnection read them.
var client AMQPclient

func ConnectAMQP(uri string) error {

	var err error

	// connect to broker
	client.connection, err = amqp.Dial(uri)
	if err != nil {
85
		return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
86
87
88
89
90
	}

	// create channel
	client.channel, err = client.connection.Channel()
	if err != nil {
91
		return fmt.Errorf("AMQP: failed to open a channel, error: %v", err)
92
93
94
95
96
97
98
99
100
101
	}
	// declare exchange
	err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
		"headers",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
102
		return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
103
104
	}

105
106
	// add a queue for the ICs
	ICQueue, err := client.channel.QueueDeclare("infrastructure_components",
107
108
109
110
111
112
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
113
		return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
114
115
	}

116
	err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
117
	if err != nil {
118
		return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
119
120
121
	}

	// consume deliveries
122
	client.replies, err = client.channel.Consume(ICQueue.Name,
123
		"",
124
		true,
125
126
127
128
129
		false,
		false,
		false,
		nil)
	if err != nil {
130
		return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
131
132
133
134
	}

	// consuming queue
	go func() {
135
136
		for {
			for message := range client.replies {
137
138
139
140
				err = processMessage(message)
				if err != nil {
					log.Println(err.Error())
				}
141
			}
142
			time.Sleep(2) // sleep for 2 sek
143
144
145
		}
	}()

146
147
	log.Printf(" AMQP: Waiting for messages... ")

148
149
150
	return nil
}

151
func sendActionAMQP(action Action) error {
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166

	payload, err := json.Marshal(action)
	if err != nil {
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    2,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Priority:        0,
		Body:            payload,
	}

167
168
169
170
171
172
173
174
175
176
177
178
179
180
	// set message headers
	var headers map[string]interface{}
	headers = make(map[string]interface{}) // empty map
	if action.Properties.UUID != nil {
		headers["uuid"] = *action.Properties.UUID
	}
	if action.Properties.Type != nil {
		headers["type"] = *action.Properties.Type
	}
	if action.Properties.Category != nil {
		headers["category"] = *action.Properties.Category
	}
	msg.Headers = headers

Sonja Happ's avatar
Sonja Happ committed
181
182
183
	err = CheckConnection()
	if err != nil {
		return err
184
185
	}

186
	//log.Println("AMQP: Sending message", string(msg.Body))
187
188
189
190
191
192
193
194
195
	err = client.channel.Publish(VILLAS_EXCHANGE,
		"",
		false,
		false,
		msg)
	return err

}

196
197
198
199
200
201
202
203
204
205
//func PingAMQP() error {
//	log.Println("AMQP: sending ping command to all ICs")
//
//	var a Action
//	a.Act = "ping"
//	*a.Properties.UUID = ""
//
//	err := sendActionAMQP(a)
//	return err
//}
206
207
208

func CheckConnection() error {

209
210
211
212
213
214
	if client.connection != nil {
		if client.connection.IsClosed() {
			return fmt.Errorf("connection to broker is closed")
		}
	} else {
		return fmt.Errorf("connection is nil")
215
216
217
218
	}

	return nil
}
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238

// StartAMQP starts the AMQP client when AMQPurl is non-empty and does nothing
// otherwise.
//
// On start it connects via ConnectAMQP, registers the IC action endpoint
// under the "/ic" group of api, and launches a goroutine driven by a
// 10-second ticker that currently performs no work (see TODO). The error of
// ConnectAMQP is returned unchanged.
func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
	if AMQPurl == "" {
		// AMQP is not configured
		return nil
	}

	log.Println("Starting AMQP client")

	if err := ConnectAMQP(AMQPurl); err != nil {
		return err
	}

	// register IC action endpoint only if AMQP client is used
	RegisterAMQPEndpoint(api.Group("/ic"))

	// Periodically call the Ping function to check which ICs are still there
	ticker := time.NewTicker(10 * time.Second)
	go func() {
		for range ticker.C {
			//TODO Add a useful regular event here
			/*
				err = PingAMQP()
				if err != nil {
					log.Println("AMQP Error: ", err.Error())
				}
			*/
		}
	}()

	log.Printf("Connected AMQP client to %s", AMQPurl)
	return nil
}
256

257
func processMessage(message amqp.Delivery) error {
258
259
260
261

	var payload ICUpdate
	err := json.Unmarshal(message.Body, &payload)
	if err != nil {
262
		return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
263
264
	}

265
	//payload.Status.UUID = new(string)
266
267
	headers := amqp.Table(message.Headers)
	*payload.Status.UUID = fmt.Sprintf("%v", headers["uuid"])
268

269
	if payload.Status != nil {
270
		//log.Println("Processing AMQP message: ", string(message.Body))
271
		// if a message contains a "state" field, it is an update for an IC
Sonja Happ's avatar
Sonja Happ committed
272
		ICUUID := *payload.Status.UUID
273
		_, err = uuid.Parse(ICUUID)
274

275
276
277
278
		if err != nil {
			return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
		}
		var sToBeUpdated InfrastructureComponent
279
		err = sToBeUpdated.byUUID(ICUUID)
280
281
282

		if err == gorm.ErrRecordNotFound {
			// create new record
283
			err = createExternalIC(payload)
284
285
286
287
288
		} else if err != nil {
			// database error
			err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
		} else {
			// update record based on payload
289
290
291
292
293
294
295
296
297
			err = sToBeUpdated.updateExternalIC(payload)
		}
	}
	return err
}

func createExternalIC(payload ICUpdate) error {

	var newICReq AddICRequest
Sonja Happ's avatar
Sonja Happ committed
298
	newICReq.InfrastructureComponent.UUID = *payload.Status.UUID
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
	if payload.Status.Name == nil ||
		payload.Status.Category == nil ||
		payload.Status.Type == nil {
		// cannot create new IC because required information (name, type, and/or category missing)
		return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
	}
	newICReq.InfrastructureComponent.Name = *payload.Status.Name
	newICReq.InfrastructureComponent.Category = *payload.Status.Category
	newICReq.InfrastructureComponent.Type = *payload.Status.Type

	// add optional params
	if payload.Status.State != nil {
		newICReq.InfrastructureComponent.State = *payload.Status.State
	} else {
		newICReq.InfrastructureComponent.State = "unknown"
	}
	if newICReq.InfrastructureComponent.State == "gone" {
		// Check if state is "gone" and abort creation of IC in this case
		log.Println("AMQP: Aborting creation of IC with state gone")
		return nil
	}

	if payload.Status.WS_url != nil {
		newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
	}
	if payload.Status.API_url != nil {
		newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
	}
	if payload.Status.Location != nil {
		newICReq.InfrastructureComponent.Location = *payload.Status.Location
	}
	if payload.Status.Description != nil {
		newICReq.InfrastructureComponent.Description = *payload.Status.Description
	}
	if payload.Status.Uptime != nil {
		newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
	}
	// TODO add JSON start parameter scheme

	// set managed externally to true because this IC is created via AMQP
	newICReq.InfrastructureComponent.ManagedExternally = newTrue()

	// Validate the new IC
	err := newICReq.validate()
	if err != nil {
		return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
	}

	// Create the new IC
	newIC, err := newICReq.createIC(true)
	if err != nil {
		return fmt.Errorf("AMQP: Creating new IC failed: %v", err)
	}

	// save IC
	err = newIC.save()
	if err != nil {
		return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err)
	}

	log.Println("AMQP: Created IC with UUID ", newIC.UUID)
	return nil
}

// updateExternalIC applies the status fields present in payload to the IC s
// and writes the result to the DB.
//
// payload.Status must be non-nil. When the reported state is "gone" a
// deletion of s is attempted first; a deletion error is only logged and the
// update continues. Errors from validating the request or updating the DB are
// returned with context.
func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error {
	status := payload.Status

	var updatedICReq UpdateICRequest
	ic := &updatedICReq.InfrastructureComponent

	if status.State != nil {
		ic.State = *status.State

		if *status.State == "gone" {
			// remove IC from DB
			log.Println("AMQP: Deleting IC with state gone")
			if err := s.delete(true); err != nil {
				// if component could not be deleted there are still configurations using it in the DB
				// continue with the update to save the new state of the component and get back to the deletion later
				log.Println("AMQP: Deletion of IC postponed (config(s) associated to it)")
			}
			// NOTE(review): the update below also runs when the deletion
			// succeeded — confirm that this is intended.
		}
	}

	// copy every other field that was sent with the status
	if status.Type != nil {
		ic.Type = *status.Type
	}
	if status.Category != nil {
		ic.Category = *status.Category
	}
	if status.Name != nil {
		ic.Name = *status.Name
	}
	if status.WS_url != nil {
		ic.WebsocketURL = *status.WS_url
	}
	if status.API_url != nil {
		ic.APIURL = *status.API_url
	}
	if status.Location != nil {
		//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)}
		ic.Location = *status.Location
	}
	if status.Description != nil {
		ic.Description = *status.Description
	}
	if status.Uptime != nil {
		ic.Uptime = *status.Uptime
	}
	// TODO add JSON start parameter scheme

	// validate the request before deriving the updated IC from it
	if err := updatedICReq.validate(); err != nil {
		return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
	}

	// build the updated IC from the old one and store it
	updatedIC := updatedICReq.updatedIC(*s)
	if err := s.update(updatedIC); err != nil {
		return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err)
	}

	log.Println("AMQP: Updated IC with UUID ", s.UUID)
	return nil
}
426
427
428
429
430
431
432
433
434
435

// newTrue returns a pointer to a freshly allocated bool set to true.
func newTrue() *bool {
	v := new(bool)
	*v = true
	return v
}

// newFalse returns a pointer to a freshly allocated bool set to false.
func newFalse() *bool {
	// new(bool) already points at the zero value, which is false
	return new(bool)
}