ic_amqpclient.go 12.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** AMQP package, client.
*
* @author Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASweb-backend-go
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
22
package infrastructure_component
23
24
25
26

import (
	"encoding/json"
	"fmt"
27
	"github.com/gin-gonic/gin"
28
	"github.com/google/uuid"
29
	"github.com/jinzhu/gorm"
30
	"github.com/jinzhu/gorm/dialects/postgres"
31
	"github.com/streadway/amqp"
32
	"log"
33
34
35
36
37
38
39
40
41
42
43
44
	"time"
)

const VILLAS_EXCHANGE = "villas"

type AMQPclient struct {
	connection *amqp.Connection
	channel    *amqp.Channel
	replies    <-chan amqp.Delivery
}

type Action struct {
45
46
47
48
49
50
51
52
53
54
55
56
	Act        string `json:"action"`
	When       int64  `json:"when"`
	Properties struct {
		UUID        *string `json:"uuid"`
		Name        *string `json:"name"`
		Category    *string `json:"category"`
		Type        *string `json:"type"`
		Location    *string `json:"location"`
		WS_url      *string `json:"ws_url"`
		API_url     *string `json:"api_url"`
		Description *string `json:"description"`
	} `json:"properties"`
57
58
}

Sonja Happ's avatar
Sonja Happ committed
59
type ICStatus struct {
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
	State   *string  `json:"state"`
	Version *string  `json:"version"`
	Uptime  *float64 `json:"uptime"`
	Result  *string  `json:"result"`
	Error   *string  `json:"error"`
}

type ICProperties struct {
	Name        *string `json:"name"`
	Description *string `json:"description"`
	Location    *string `json:"location"`
	Owner       *string `json:"owner"`
	WS_url      *string `json:"ws_url"`
	API_url     *string `json:"api_url"`
	Category    *string `json:"category"`
	Type        *string `json:"type"`
Sonja Happ's avatar
Sonja Happ committed
76
77
}

78
type ICUpdate struct {
79
80
81
	Status     *ICStatus     `json:"status"`
	Properties *ICProperties `json:"properties"`
	When       *float64      `json:"when"`
Sonja Happ's avatar
Sonja Happ committed
82
	// TODO add JSON start parameter scheme
83
84
}

85
86
87
88
89
90
91
92
93
var client AMQPclient

func ConnectAMQP(uri string) error {

	var err error

	// connect to broker
	client.connection, err = amqp.Dial(uri)
	if err != nil {
94
		return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
95
96
97
98
99
	}

	// create channel
	client.channel, err = client.connection.Channel()
	if err != nil {
100
		return fmt.Errorf("AMQP: failed to open a channel, error: %v", err)
101
102
103
104
105
106
107
108
109
110
	}
	// declare exchange
	err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
		"headers",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
111
		return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
112
113
	}

114
115
	// add a queue for the ICs
	ICQueue, err := client.channel.QueueDeclare("infrastructure_components",
116
117
118
119
120
121
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
122
		return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
123
124
	}

125
	err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
126
	if err != nil {
127
		return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
128
129
130
	}

	// consume deliveries
131
	client.replies, err = client.channel.Consume(ICQueue.Name,
132
		"",
133
		true,
134
135
136
137
138
		false,
		false,
		false,
		nil)
	if err != nil {
139
		return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
140
141
142
143
	}

	// consuming queue
	go func() {
144
145
		for {
			for message := range client.replies {
146
147
148
149
				err = processMessage(message)
				if err != nil {
					log.Println(err.Error())
				}
150
			}
151
			time.Sleep(2) // sleep for 2 sek
152
153
154
		}
	}()

155
156
	log.Printf(" AMQP: Waiting for messages... ")

157
158
159
	return nil
}

160
func sendActionAMQP(action Action) error {
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175

	payload, err := json.Marshal(action)
	if err != nil {
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    2,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Priority:        0,
		Body:            payload,
	}

176
177
178
179
180
181
182
183
184
185
186
187
188
189
	// set message headers
	var headers map[string]interface{}
	headers = make(map[string]interface{}) // empty map
	if action.Properties.UUID != nil {
		headers["uuid"] = *action.Properties.UUID
	}
	if action.Properties.Type != nil {
		headers["type"] = *action.Properties.Type
	}
	if action.Properties.Category != nil {
		headers["category"] = *action.Properties.Category
	}
	msg.Headers = headers

Sonja Happ's avatar
Sonja Happ committed
190
191
192
	err = CheckConnection()
	if err != nil {
		return err
193
194
	}

195
	//log.Println("AMQP: Sending message", string(msg.Body))
196
197
198
199
200
201
202
203
204
	err = client.channel.Publish(VILLAS_EXCHANGE,
		"",
		false,
		false,
		msg)
	return err

}

205
206
207
208
209
210
211
212
213
214
//func PingAMQP() error {
//	log.Println("AMQP: sending ping command to all ICs")
//
//	var a Action
//	a.Act = "ping"
//	*a.Properties.UUID = ""
//
//	err := sendActionAMQP(a)
//	return err
//}
215
216
217

func CheckConnection() error {

218
219
220
221
222
223
	if client.connection != nil {
		if client.connection.IsClosed() {
			return fmt.Errorf("connection to broker is closed")
		}
	} else {
		return fmt.Errorf("connection is nil")
224
225
226
227
	}

	return nil
}
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247

func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
	if AMQPurl != "" {
		log.Println("Starting AMQP client")

		err := ConnectAMQP(AMQPurl)
		if err != nil {
			return err
		}

		// register IC action endpoint only if AMQP client is used
		RegisterAMQPEndpoint(api.Group("/ic"))

		// Periodically call the Ping function to check which ICs are still there
		ticker := time.NewTicker(10 * time.Second)
		go func() {

			for {
				select {
				case <-ticker.C:
248
249
250
251
252
253
254
					//TODO Add a useful regular event here
					/*
						err = PingAMQP()
						if err != nil {
							log.Println("AMQP Error: ", err.Error())
						}
					*/
255
256
257
258
259
260
261
262
263
264
				}
			}

		}()

		log.Printf("Connected AMQP client to %s", AMQPurl)
	}

	return nil
}
265

266
func processMessage(message amqp.Delivery) error {
267
268
269
270

	var payload ICUpdate
	err := json.Unmarshal(message.Body, &payload)
	if err != nil {
271
		return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
272
273
	}

274
	if payload.Status != nil || payload.Properties != nil {
275
		//log.Println("Processing AMQP message: ", string(message.Body))
276
		// if a message contains a "state" field, it is an update for an IC
277
278
279

		headers := amqp.Table(message.Headers)
		ICUUID := fmt.Sprintf("%v", headers["uuid"])
280
		_, err = uuid.Parse(ICUUID)
281

282
283
284
285
		if err != nil {
			return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
		}
		var sToBeUpdated InfrastructureComponent
286
		err = sToBeUpdated.byUUID(ICUUID)
287
288
289

		if err == gorm.ErrRecordNotFound {
			// create new record
290
			err = createExternalIC(payload, ICUUID)
291
292
293
294
295
		} else if err != nil {
			// database error
			err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
		} else {
			// update record based on payload
296
297
			err = sToBeUpdated.updateExternalIC(payload)
		}
298
299
	} else {
		log.Println("INFO: ignoring message, payload neither contains status nor properties", message)
300
	}
301

302
303
304
	return err
}

305
func createExternalIC(payload ICUpdate, ICUUID string) error {
306
307

	var newICReq AddICRequest
308
	newICReq.InfrastructureComponent.UUID = ICUUID
309
310
311
	if payload.Properties.Name == nil ||
		payload.Properties.Category == nil ||
		payload.Properties.Type == nil {
312
313
314
		// cannot create new IC because required information (name, type, and/or category missing)
		return fmt.Errorf("AMQP: Cannot create new IC, required field(s) is/are missing: name, type, category")
	}
315
316
317
	newICReq.InfrastructureComponent.Name = *payload.Properties.Name
	newICReq.InfrastructureComponent.Category = *payload.Properties.Category
	newICReq.InfrastructureComponent.Type = *payload.Properties.Type
318
319
320
321
322
323
324
325
326
327
328
329
330

	// add optional params
	if payload.Status.State != nil {
		newICReq.InfrastructureComponent.State = *payload.Status.State
	} else {
		newICReq.InfrastructureComponent.State = "unknown"
	}
	if newICReq.InfrastructureComponent.State == "gone" {
		// Check if state is "gone" and abort creation of IC in this case
		log.Println("AMQP: Aborting creation of IC with state gone")
		return nil
	}

331
332
	if payload.Properties.WS_url != nil {
		newICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
333
	}
334
335
	if payload.Properties.API_url != nil {
		newICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
336
	}
337
338
	if payload.Properties.Location != nil {
		newICReq.InfrastructureComponent.Location = *payload.Properties.Location
339
	}
340
341
	if payload.Properties.Description != nil {
		newICReq.InfrastructureComponent.Description = *payload.Properties.Description
342
343
344
345
346
347
348
349
350
	}
	if payload.Status.Uptime != nil {
		newICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
	}
	// TODO add JSON start parameter scheme

	// set managed externally to true because this IC is created via AMQP
	newICReq.InfrastructureComponent.ManagedExternally = newTrue()

351
352
353
354
355
356
357
	// set raw status update if IC
	payloadRaw, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err)
	}
	newICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw}

358
	// Validate the new IC
359
	err = newICReq.validate()
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
	if err != nil {
		return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
	}

	// Create the new IC
	newIC, err := newICReq.createIC(true)
	if err != nil {
		return fmt.Errorf("AMQP: Creating new IC failed: %v", err)
	}

	// save IC
	err = newIC.save()
	if err != nil {
		return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err)
	}

	log.Println("AMQP: Created IC with UUID ", newIC.UUID)
	return nil
}

func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate) error {

	var updatedICReq UpdateICRequest
	if payload.Status.State != nil {
		updatedICReq.InfrastructureComponent.State = *payload.Status.State

		if *payload.Status.State == "gone" {
			// remove IC from DB
			log.Println("AMQP: Deleting IC with state gone")
			err := s.delete(true)
			if err != nil {
				// if component could not be deleted there are still configurations using it in the DB
				// continue with the update to save the new state of the component and get back to the deletion later
				log.Println("AMQP: Deletion of IC postponed (config(s) associated to it)")
			}

396
		}
397
	}
398
399
400
401
402
403
404

	if payload.Status.Uptime != nil {
		updatedICReq.InfrastructureComponent.Uptime = *payload.Status.Uptime
	}

	if payload.Properties.Type != nil {
		updatedICReq.InfrastructureComponent.Type = *payload.Properties.Type
405
	}
406
407
	if payload.Properties.Category != nil {
		updatedICReq.InfrastructureComponent.Category = *payload.Properties.Category
408
	}
409
410
	if payload.Properties.Name != nil {
		updatedICReq.InfrastructureComponent.Name = *payload.Properties.Name
411
	}
412
413
	if payload.Properties.WS_url != nil {
		updatedICReq.InfrastructureComponent.WebsocketURL = *payload.Properties.WS_url
414
	}
415
416
	if payload.Properties.API_url != nil {
		updatedICReq.InfrastructureComponent.APIURL = *payload.Properties.API_url
417
	}
418
	if payload.Properties.Location != nil {
419
		//postgres.Jsonb{json.RawMessage(`{"location" : " ` + *payload.Status.Location + `"}`)}
420
		updatedICReq.InfrastructureComponent.Location = *payload.Properties.Location
421
	}
422
423
	if payload.Properties.Description != nil {
		updatedICReq.InfrastructureComponent.Description = *payload.Properties.Description
424
	}
425
426
427
428
429

	// set raw status update if IC
	payloadRaw, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("AMQP: failed to marshal raw payload: %v", err)
430
	}
431
432
	updatedICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: payloadRaw}

433
434
435
	// TODO add JSON start parameter scheme

	// Validate the updated IC
436
	err = updatedICReq.validate()
437
438
439
440
441
442
443
444
445
446
447
448
449
450
	if err != nil {
		return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
	}

	// Create the updated IC from old IC
	updatedIC := updatedICReq.updatedIC(*s)

	// Finally update the IC in the DB
	err = s.update(updatedIC)
	if err != nil {
		return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err)
	}

	log.Println("AMQP: Updated IC with UUID ", s.UUID)
451
452
	return err
}
453
454
455
456
457
458
459
460
461
462

func newTrue() *bool {
	b := true
	return &b
}

func newFalse() *bool {
	b := false
	return &b
}