ic_amqpclient.go 10.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** AMQP package, client.
*
* @author Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASweb-backend-go
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
22
package infrastructure_component
23
24
25
26

import (
	"encoding/json"
	"fmt"
27
28
29
	"log"
	"time"

30
	"github.com/gin-gonic/gin"
31
	"github.com/google/uuid"
32
	"github.com/jinzhu/gorm"
33
	"github.com/jinzhu/gorm/dialects/postgres"
34
35
36
37
38
39
40
	"github.com/streadway/amqp"
)

// VILLAS_EXCHANGE is the name of the AMQP exchange that ConnectAMQP declares
// and binds the IC queue to, and that sendActionAMQP publishes actions on.
const VILLAS_EXCHANGE = "villas"

type AMQPclient struct {
	connection *amqp.Connection
41
42
	sendCh     *amqp.Channel
	recvCh     *amqp.Channel
43
44
45
}

// Action is the JSON payload published by sendActionAMQP to request an action
// from an infrastructure component (IC).
type Action struct {
	// Act is the name of the requested action.
	Act string `json:"action"`
	// When is the point in time the action refers to.
	// NOTE(review): presumably a Unix timestamp; the unit is not visible here.
	When int64 `json:"when"`
	// Parameters describes the IC the action is addressed to. The UUID, Type
	// and Category values are additionally copied into the AMQP message
	// headers by sendActionAMQP.
	Parameters struct {
		UUID        string `json:"uuid,omitempty"`
		Name        string `json:"name,omitempty"`
		Category    string `json:"category,omitempty"`
		Type        string `json:"type,omitempty"`
		Location    string `json:"location,omitempty"`
		WS_url      string `json:"ws_url,omitempty"`
		API_url     string `json:"api_url,omitempty"`
		Description string `json:"description,omitempty"`
	} `json:"parameters,omitempty"`
}

Sonja Happ's avatar
Sonja Happ committed
60
// ICStatus is the "status" part of a status update received from an IC.
type ICStatus struct {
	// State is the reported state; the handlers treat an empty value as
	// "unknown" and the value "gone" as a request to drop the IC.
	State   string `json:"state"`
	Version string `json:"version"`
	// Uptime is copied unchanged into the stored IC.
	Uptime float64 `json:"uptime"`
	Result string  `json:"result"`
	Error  string  `json:"error"`
}

// ICProperties is the "properties" part of a status update received from an
// IC. Apart from Owner, the values are copied into the stored IC by the
// create/update handlers in this file.
type ICProperties struct {
	Name        string `json:"name"`
	Description string `json:"description"`
	Location    string `json:"location"`
	Owner       string `json:"owner"`
	WS_url      string `json:"ws_url"`
	API_url     string `json:"api_url"`
	Category    string `json:"category"`
	Type        string `json:"type"`
}

79
type ICUpdate struct {
Sonja Happ's avatar
Sonja Happ committed
80
81
82
83
	Status     ICStatus     `json:"status"`
	Properties ICProperties `json:"properties"`
	When       float64      `json:"when"`
	Action     string       `json:"action"`
Sonja Happ's avatar
Sonja Happ committed
84
	// TODO add JSON start parameter scheme
85
86
}

87
88
89
90
91
92
93
94
95
// client is the package-level AMQP client state. ConnectAMQP fills it in;
// CheckConnection and sendActionAMQP read it.
var client AMQPclient

func ConnectAMQP(uri string) error {

	var err error

	// connect to broker
	client.connection, err = amqp.Dial(uri)
	if err != nil {
96
		return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
97
98
	}

99
100
	// create sendCh
	client.sendCh, err = client.connection.Channel()
101
	if err != nil {
102
		return fmt.Errorf("AMQP: failed to open a sendCh, error: %v", err)
103
104
	}
	// declare exchange
105
	err = client.sendCh.ExchangeDeclare(VILLAS_EXCHANGE,
106
107
108
109
110
111
112
		"headers",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
113
		return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
114
115
	}

116
	// add a queue for the ICs
117
	ICQueue, err := client.sendCh.QueueDeclare("infrastructure_components",
118
119
120
121
122
123
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
124
		return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
125
126
	}

127
	err = client.sendCh.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
128
	if err != nil {
129
		return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
130
131
	}

132
133
134
135
136
137
138
139
	// create receive channel
	client.recvCh, err = client.connection.Channel()
	if err != nil {
		return fmt.Errorf("AMQP: failed to open a recvCh, error: %v", err)
	}

	// start deliveries
	messages, err := client.recvCh.Consume(ICQueue.Name,
140
		"",
141
		true,
142
143
144
145
146
		false,
		false,
		false,
		nil)
	if err != nil {
147
		return fmt.Errorf("AMQP: failed to start deliveries: %v", err)
148
149
	}

150
	// consume deliveries
151
	go func() {
152
		for {
153
			for message := range messages {
154
155
				err = processMessage(message)
				if err != nil {
Sonja Happ's avatar
Sonja Happ committed
156
					log.Println("AMQP: Error processing message: ", err.Error())
157
				}
158
159
160
161
			}
		}
	}()

162
163
	log.Printf(" AMQP: Waiting for messages... ")

164
165
166
	return nil
}

167
func sendActionAMQP(action Action) error {
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182

	payload, err := json.Marshal(action)
	if err != nil {
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    2,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Priority:        0,
		Body:            payload,
	}

183
184
185
	// set message headers
	var headers map[string]interface{}
	headers = make(map[string]interface{}) // empty map
Sonja Happ's avatar
Sonja Happ committed
186
187
188
189
	headers["uuid"] = action.Parameters.UUID
	headers["type"] = action.Parameters.Type
	headers["category"] = action.Parameters.Category

190
191
	msg.Headers = headers

Sonja Happ's avatar
Sonja Happ committed
192
193
194
	err = CheckConnection()
	if err != nil {
		return err
195
196
	}

197
	//log.Println("AMQP: Sending message", string(msg.Body))
198
	err = client.sendCh.Publish(VILLAS_EXCHANGE,
199
200
201
202
203
204
205
206
		"",
		false,
		false,
		msg)
	return err

}

207
208
209
210
211
//func PingAMQP() error {
//	log.Println("AMQP: sending ping command to all ICs")
//
//	var a Action
//	a.Act = "ping"
Sonja Happ's avatar
Sonja Happ committed
212
//	*a.Parameters.UUID = ""
213
214
215
216
//
//	err := sendActionAMQP(a)
//	return err
//}
217
218
219

func CheckConnection() error {

220
221
222
223
224
225
	if client.connection != nil {
		if client.connection.IsClosed() {
			return fmt.Errorf("connection to broker is closed")
		}
	} else {
		return fmt.Errorf("connection is nil")
226
227
228
229
	}

	return nil
}
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247

// StartAMQP connects the AMQP client to AMQPurl and registers the IC action
// endpoint under the "/ic" group of api. With an empty AMQPurl it does
// nothing and returns nil. A connection error is returned unchanged, in
// which case the endpoint is not registered.
func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
	// AMQP is optional: nothing to do without a broker URL
	if AMQPurl == "" {
		return nil
	}

	log.Println("Starting AMQP client")
	if err := ConnectAMQP(AMQPurl); err != nil {
		return err
	}

	// register IC action endpoint only if AMQP client is used
	icGroup := api.Group("/ic")
	RegisterAMQPEndpoint(icGroup)

	log.Printf("Connected AMQP client to %s", AMQPurl)
	return nil
}
248

249
func processMessage(message amqp.Delivery) error {
250
251
252
253

	var payload ICUpdate
	err := json.Unmarshal(message.Body, &payload)
	if err != nil {
254
		return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
255
256
	}

Sonja Happ's avatar
Sonja Happ committed
257
258
259
260
261
262
263
264
265
266
267
268
	headers := amqp.Table(message.Headers)
	ICUUID := fmt.Sprintf("%v", headers["uuid"])
	_, err = uuid.Parse(ICUUID)
	if err != nil {
		return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
	}

	if payload.Action != "" {
		// if a message contains an action, it is not intended for the backend
		log.Println("AMQP: Ignoring action message for action", payload.Action, ICUUID)
		return nil
	}
269

Sonja Happ's avatar
Sonja Happ committed
270
271
	var sToBeUpdated InfrastructureComponent
	err = sToBeUpdated.byUUID(ICUUID)
272

Sonja Happ's avatar
Sonja Happ committed
273
274
275
276
277
278
	if err == gorm.ErrRecordNotFound {
		// create new record
		err = createExternalIC(payload, ICUUID, message.Body)
	} else if err != nil {
		// database error
		err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
279
	} else {
Sonja Happ's avatar
Sonja Happ committed
280
281
		// update record based on payload
		err = sToBeUpdated.updateExternalIC(payload, message.Body)
282
	}
283

284
285
286
	return err
}

Sonja Happ's avatar
Sonja Happ committed
287
func createExternalIC(payload ICUpdate, ICUUID string, body []byte) error {
Sonja Happ's avatar
Sonja Happ committed
288

289
	var newICReq AddICRequest
290
	newICReq.InfrastructureComponent.UUID = ICUUID
Sonja Happ's avatar
Sonja Happ committed
291
292
293
	newICReq.InfrastructureComponent.Name = payload.Properties.Name
	newICReq.InfrastructureComponent.Category = payload.Properties.Category
	newICReq.InfrastructureComponent.Type = payload.Properties.Type
294
295

	// add optional params
Sonja Happ's avatar
Sonja Happ committed
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
	if payload.Status.State != "" {
		newICReq.InfrastructureComponent.State = payload.Status.State
	} else {
		newICReq.InfrastructureComponent.State = "unknown"
	}
	if newICReq.InfrastructureComponent.State == "gone" {
		// Check if state is "gone" and abort creation of IC in this case
		log.Println("AMQP: Aborting creation of IC with state gone")
		return nil
	}
	newICReq.InfrastructureComponent.Uptime = payload.Status.Uptime
	newICReq.InfrastructureComponent.WebsocketURL = payload.Properties.WS_url
	newICReq.InfrastructureComponent.APIURL = payload.Properties.API_url
	newICReq.InfrastructureComponent.Location = payload.Properties.Location
	newICReq.InfrastructureComponent.Description = payload.Properties.Description
311
312
	// set managed externally to true because this IC is created via AMQP
	newICReq.InfrastructureComponent.ManagedExternally = newTrue()
313
	// set raw status update if IC
Sonja Happ's avatar
Sonja Happ committed
314
315
316
	newICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: body}

	// TODO add JSON start parameter scheme
317

318
	// Validate the new IC
Sonja Happ's avatar
Sonja Happ committed
319
	err := newICReq.validate()
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
	if err != nil {
		return fmt.Errorf("AMQP: Validation of new IC failed: %v", err)
	}

	// Create the new IC
	newIC, err := newICReq.createIC(true)
	if err != nil {
		return fmt.Errorf("AMQP: Creating new IC failed: %v", err)
	}

	// save IC
	err = newIC.save()
	if err != nil {
		return fmt.Errorf("AMQP: Saving new IC to DB failed: %v", err)
	}

	log.Println("AMQP: Created IC with UUID ", newIC.UUID)
	return nil
}

Sonja Happ's avatar
Sonja Happ committed
340
func (s *InfrastructureComponent) updateExternalIC(payload ICUpdate, body []byte) error {
341
342
343

	var updatedICReq UpdateICRequest

Sonja Happ's avatar
Sonja Happ committed
344
345
346
347
348
349
350
351
352
353
354
	if payload.Status.State != "" {
		updatedICReq.InfrastructureComponent.State = payload.Status.State

		if updatedICReq.InfrastructureComponent.State == "gone" {
			// remove IC from DB
			log.Println("AMQP: Deleting IC with state gone", s.UUID)
			err := s.delete(true)
			if err != nil {
				// if component could not be deleted there are still configurations using it in the DB
				// continue with the update to save the new state of the component and get back to the deletion later
				log.Println(err)
Sonja Happ's avatar
Sonja Happ committed
355
			}
356
		}
Sonja Happ's avatar
Sonja Happ committed
357
358
359
360
361
362
363
364
365
366
367
	} else {
		updatedICReq.InfrastructureComponent.State = "unknown"
	}
	updatedICReq.InfrastructureComponent.Uptime = payload.Status.Uptime
	updatedICReq.InfrastructureComponent.Type = payload.Properties.Type
	updatedICReq.InfrastructureComponent.Category = payload.Properties.Category
	updatedICReq.InfrastructureComponent.Name = payload.Properties.Name
	updatedICReq.InfrastructureComponent.WebsocketURL = payload.Properties.WS_url
	updatedICReq.InfrastructureComponent.APIURL = payload.Properties.API_url
	updatedICReq.InfrastructureComponent.Location = payload.Properties.Location
	updatedICReq.InfrastructureComponent.Description = payload.Properties.Description
368
	// set raw status update if IC
Sonja Happ's avatar
Sonja Happ committed
369
	updatedICReq.InfrastructureComponent.StatusUpdateRaw = postgres.Jsonb{RawMessage: body}
370

371
372
373
	// TODO add JSON start parameter scheme

	// Validate the updated IC
Sonja Happ's avatar
Sonja Happ committed
374
	err := updatedICReq.validate()
375
376
377
378
379
380
381
382
383
384
385
386
387
388
	if err != nil {
		return fmt.Errorf("AMQP: Validation of updated IC failed: %v", err)
	}

	// Create the updated IC from old IC
	updatedIC := updatedICReq.updatedIC(*s)

	// Finally update the IC in the DB
	err = s.update(updatedIC)
	if err != nil {
		return fmt.Errorf("AMQP: Unable to update IC %v in DB: %v", s.Name, err)
	}

	log.Println("AMQP: Updated IC with UUID ", s.UUID)
389
390
	return err
}
391
392
393
394
395
396
397
398
399
400

// newTrue returns a pointer to a freshly allocated bool set to true. Every
// call yields a distinct pointer.
func newTrue() *bool {
	flag := new(bool)
	*flag = true
	return flag
}

// newFalse returns a pointer to a freshly allocated bool set to false. Every
// call yields a distinct pointer.
func newFalse() *bool {
	// new(bool) already points at the zero value, which is false
	return new(bool)
}