ic_amqpclient.go 11.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** AMQP package, client.
*
* @author Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASweb-backend-go
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
22
package infrastructure_component
23
24
25
26

import (
	"encoding/json"
	"fmt"
27
	"github.com/gin-gonic/gin"
28
	"github.com/google/uuid"
29
	"github.com/jinzhu/gorm"
30
	"github.com/streadway/amqp"
31
	"log"
32
33
34
35
36
37
38
39
40
41
42
43
	"time"
)

const VILLAS_EXCHANGE = "villas"

type AMQPclient struct {
	connection *amqp.Connection
	channel    *amqp.Channel
	replies    <-chan amqp.Delivery
}

type Action struct {
44
45
46
47
48
49
50
51
52
53
54
55
	Act        string `json:"action"`
	When       int64  `json:"when"`
	Properties struct {
		UUID        *string `json:"uuid"`
		Name        *string `json:"name"`
		Category    *string `json:"category"`
		Type        *string `json:"type"`
		Location    *string `json:"location"`
		WS_url      *string `json:"ws_url"`
		API_url     *string `json:"api_url"`
		Description *string `json:"description"`
	} `json:"properties"`
56
57
}

Sonja Happ's avatar
Sonja Happ committed
58
59
60
61
62
63
64
65
66
67
68
69
type ICStatus struct {
	State       *string  `json:"state"`
	Name        *string  `json:"name"`
	Category    *string  `json:"category"`
	Type        *string  `json:"type"`
	Location    *string  `json:"location"`
	WS_url      *string  `json:"ws_url"`
	API_url     *string  `json:"api_url"`
	Description *string  `json:"description"`
	Uptime      *float64 `json:"uptime"` // TODO check if data type of uptime is float64 or int
}

70
type ICUpdate struct {
Sonja Happ's avatar
Sonja Happ committed
71
	Status *ICStatus `json:"status"`
Sonja Happ's avatar
Sonja Happ committed
72
	// TODO add JSON start parameter scheme
73
74
}

75
76
77
78
79
80
81
82
83
var client AMQPclient

func ConnectAMQP(uri string) error {

	var err error

	// connect to broker
	client.connection, err = amqp.Dial(uri)
	if err != nil {
84
		return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
85
86
87
88
89
	}

	// create channel
	client.channel, err = client.connection.Channel()
	if err != nil {
90
		return fmt.Errorf("AMQP: failed to open a channel, error: %v", err)
91
92
93
94
95
96
97
98
99
100
	}
	// declare exchange
	err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
		"headers",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
101
		return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
102
103
	}

104
105
	// add a queue for the ICs
	ICQueue, err := client.channel.QueueDeclare("infrastructure_components",
106
107
108
109
110
111
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
112
		return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
113
114
	}

115
	err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
116
	if err != nil {
117
		return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
118
119
120
	}

	// consume deliveries
121
	client.replies, err = client.channel.Consume(ICQueue.Name,
122
		"",
123
		true,
124
125
126
127
128
		false,
		false,
		false,
		nil)
	if err != nil {
129
		return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
130
131
132
133
	}

	// consuming queue
	go func() {
134
135
		for {
			for message := range client.replies {
136
137
138
139
				err = processMessage(message)
				if err != nil {
					log.Println(err.Error())
				}
140
			}
141
			time.Sleep(2) // sleep for 2 sek
142
143
144
		}
	}()

145
146
	log.Printf(" AMQP: Waiting for messages... ")

147
148
149
	return nil
}

150
func sendActionAMQP(action Action) error {
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165

	payload, err := json.Marshal(action)
	if err != nil {
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    2,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Priority:        0,
		Body:            payload,
	}

166
167
168
169
170
171
172
173
174
175
176
177
178
179
	// set message headers
	var headers map[string]interface{}
	headers = make(map[string]interface{}) // empty map
	if action.Properties.UUID != nil {
		headers["uuid"] = *action.Properties.UUID
	}
	if action.Properties.Type != nil {
		headers["type"] = *action.Properties.Type
	}
	if action.Properties.Category != nil {
		headers["category"] = *action.Properties.Category
	}
	msg.Headers = headers

Sonja Happ's avatar
Sonja Happ committed
180
181
182
	err = CheckConnection()
	if err != nil {
		return err
183
184
	}

185
	//log.Println("AMQP: Sending message", string(msg.Body))
186
187
188
189
190
191
192
193
194
	err = client.channel.Publish(VILLAS_EXCHANGE,
		"",
		false,
		false,
		msg)
	return err

}

195
196
197
198
199
200
201
202
203
204
//func PingAMQP() error {
//	log.Println("AMQP: sending ping command to all ICs")
//
//	var a Action
//	a.Act = "ping"
//	*a.Properties.UUID = ""
//
//	err := sendActionAMQP(a)
//	return err
//}
205
206
207

func CheckConnection() error {

208
209
210
211
212
213
	if client.connection != nil {
		if client.connection.IsClosed() {
			return fmt.Errorf("connection to broker is closed")
		}
	} else {
		return fmt.Errorf("connection is nil")
214
215
216
217
	}

	return nil
}
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237

func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
	if AMQPurl != "" {
		log.Println("Starting AMQP client")

		err := ConnectAMQP(AMQPurl)
		if err != nil {
			return err
		}

		// register IC action endpoint only if AMQP client is used
		RegisterAMQPEndpoint(api.Group("/ic"))

		// Periodically call the Ping function to check which ICs are still there
		ticker := time.NewTicker(10 * time.Second)
		go func() {

			for {
				select {
				case <-ticker.C:
238
239
240
241
242
243
244
					//TODO Add a useful regular event here
					/*
						err = PingAMQP()
						if err != nil {
							log.Println("AMQP Error: ", err.Error())
						}
					*/
245
246
247
248
249
250
251
252
253
254
				}
			}

		}()

		log.Printf("Connected AMQP client to %s", AMQPurl)
	}

	return nil
}
255

256
func processMessage(message amqp.Delivery) error {
257
258
259
260

	var payload ICUpdate
	err := json.Unmarshal(message.Body, &payload)
	if err != nil {
261
		return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
262
263
	}

264
	if payload.Status != nil {
265
		//log.Println("Processing AMQP message: ", string(message.Body))
266
		// if a message contains a "state" field, it is an update for an IC
267
268
269

		headers := amqp.Table(message.Headers)
		ICUUID := fmt.Sprintf("%v", headers["uuid"])
270
		_, err = uuid.Parse(ICUUID)
271

272
273
274
275
		if err != nil {
			return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
		}
		var sToBeUpdated InfrastructureComponent
276
		err = sToBeUpdated.byUUID(ICUUID)
277
278
279

		if err == gorm.ErrRecordNotFound {
			// create new record
280
			err = createExternalIC(payload, ICUUID)
281
282
283
284
285
		} else if err != nil {
			// database error
			err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
		} else {
			// update record based on payload
286
287
288
289
290
291
			err = sToBeUpdated.updateExternalIC(payload)
		}
	}
	return err
}

292
func createExternalIC(payload ICUpdate, ICUUID string) error {
293
294

	var newICReq AddICRequest
295
	newICReq.InfrastructureComponent.UUID = ICUUID
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
	if payload.Status.Name == nil ||
		payload.Status.Category == nil ||
		payload.Status.Type == nil {
		// cannot create new IC because required information (name, type, and/or category missing)
		return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
	}
	newICReq.InfrastructureComponent.Name = *payload.Status.Name
	newICReq.InfrastructureComponent.Category = *payload.Status.Category
	newICReq.InfrastructureComponent.Type = *payload.Status.Type

	// add optional params
	if payload.Status.State != nil {
		newICReq.InfrastructureComponent.State = *payload.Status.State
	} else {
		newICReq.InfrastructureComponent.State = "unknown"
	}
	if newICReq.InfrastructureComponent.State == "gone" {
		// Check if state is "gone" and abort creation of IC in this case
		log.Println("AMQP: Aborting creation of IC with state gone")
		return nil
	}

	if payload.Status.WS_url != nil {
		newICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
	}
	if payload.Status.API_url != nil {
		newICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
	}
	if payload.Status.Location != nil {
		newICReq.InfrastructureComponent.Location = *payload.Status.Location
	}
	if payload.Status.Description != nil {
		newICReq.InfrastructureComponent.Description = *payload.Status.Description
	}
	if payload.Status.Uptime != nil {
		newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
	}
	// TODO add JSON start parameter scheme

	// set managed externally to true because this IC is created via AMQP
	newICReq.InfrastructureComponent.ManagedExternally = newTrue()

	// Validate the new IC
	err := newICReq.validate()
	if err != nil {
		return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
	}

	// Create the new IC
	newIC, err := newICReq.createIC(true)
	if err != nil {
		return fmt.Errorf("AMQP: Creating new IC failed: %v", err)
	}

	// save IC
	err = newIC.save()
	if err != nil {
		return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err)
	}

	log.Println("AMQP: Created IC with UUID ", newIC.UUID)
	return nil
}

func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error {

	var updatedICReq UpdateICRequest
	if payload.Status.State != nil {
		updatedICReq.InfrastructureComponent.State = *payload.Status.State

		if *payload.Status.State == "gone" {
			// remove IC from DB
			log.Println("AMQP: Deleting IC with state gone")
			err := s.delete(true)
			if err != nil {
				// if component could not be deleted there are still configurations using it in the DB
				// continue with the update to save the new state of the component and get back to the deletion later
				log.Println("AMQP: Deletion of IC postponed (config(s) associated to it)")
			}

376
		}
377
	}
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
	if payload.Status.Type != nil {
		updatedICReq.InfrastructureComponent.Type = *payload.Status.Type
	}
	if payload.Status.Category != nil {
		updatedICReq.InfrastructureComponent.Category = *payload.Status.Category
	}
	if payload.Status.Name != nil {
		updatedICReq.InfrastructureComponent.Name = *payload.Status.Name
	}
	if payload.Status.WS_url != nil {
		updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Status.WS_url
	}
	if payload.Status.API_url != nil {
		updatedICReq.InfrastructureComponent.APIURL = *payload.Status.API_url
	}
	if payload.Status.Location != nil {
		//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)}
		updatedICReq.InfrastructureComponent.Location = *payload.Status.Location
	}
	if payload.Status.Description != nil {
		updatedICReq.InfrastructureComponent.Description = *payload.Status.Description
	}
	if payload.Status.Uptime != nil {
		updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
	}
	// TODO add JSON start parameter scheme

	// Validate the updated IC
	err := updatedICReq.validate()
	if err != nil {
		return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
	}

	// Create the updated IC from old IC
	updatedIC := updatedICReq.updatedIC(*s)

	// Finally update the IC in the DB
	err = s.update(updatedIC)
	if err != nil {
		return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err)
	}

	log.Println("AMQP: Updated IC with UUID ", s.UUID)
421
422
	return err
}
423
424
425
426
427
428
429
430
431
432

func newTrue() *bool {
	b := true
	return &b
}

func newFalse() *bool {
	b := false
	return &b
}