amqpclient.go 6.76 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** AMQP package, client.
*
* @author Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASweb-backend-go
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
22
package infrastructure_component
23
24
25
26

import (
	"encoding/json"
	"fmt"
27
	"github.com/gin-gonic/gin"
28
	"github.com/google/uuid"
29
	"github.com/jinzhu/gorm"
30
	"github.com/streadway/amqp"
31
	"log"
32
33
34
35
36
37
38
39
40
41
42
43
	"time"
)

// VILLAS_EXCHANGE is the name of the AMQP exchange that this client declares,
// binds its queue to, and publishes actions on.
const VILLAS_EXCHANGE = "villas"

// AMQPclient bundles the state of one AMQP session: the broker connection,
// the channel opened on that connection, and the receive-only stream of
// deliveries returned by the channel's consumer.
type AMQPclient struct {
	connection *amqp.Connection
	channel    *amqp.Channel
	replies    <-chan amqp.Delivery
}

// Action is the JSON message published to the infrastructure components.
//
// All members of Properties are pointers: a nil member is encoded as JSON
// null. sendActionAMQP additionally copies the uuid, type and category
// properties into the AMQP message headers when they are set.
type Action struct {
	// Act is the name of the action, e.g. "ping".
	Act string `json:"action"`
	// When is encoded as the "when" field of the message.
	// NOTE(review): presumably a timestamp for the action; unit not visible here.
	When int64 `json:"when"`
	// Properties describes the infrastructure component the action refers to.
	Properties struct {
		UUID        *string `json:"uuid"`
		Name        *string `json:"name"`
		Category    *string `json:"category"`
		Type        *string `json:"type"`
		Location    *string `json:"location"`
		WS_url      *string `json:"ws_url"`
		API_url     *string `json:"api_url"`
		Description *string `json:"description"`
	} `json:"properties"`
}

Sonja Happ's avatar
Sonja Happ committed
58
59
60
61
62
63
64
65
66
67
68
69
70
// ICStatus is the "status" object of an IC update message (see ICUpdate).
// Only UUID is a plain string; the remaining fields are pointers, so a field
// that is absent from the JSON stays nil after decoding.
type ICStatus struct {
	UUID        string   `json:"uuid"`
	State       *string  `json:"state"`
	Name        *string  `json:"name"`
	Category    *string  `json:"category"`
	Type        *string  `json:"type"`
	Location    *string  `json:"location"`
	WS_url      *string  `json:"ws_url"`
	API_url     *string  `json:"api_url"`
	Description *string  `json:"description"`
	Uptime      *float64 `json:"uptime"` // TODO check if data type of uptime is float64 or int
}

71
type ICUpdate struct {
Sonja Happ's avatar
Sonja Happ committed
72
	Status *ICStatus `json:"status"`
Sonja Happ's avatar
Sonja Happ committed
73
	// TODO add JSON start parameter scheme
74
75
}

76
77
78
79
80
81
82
83
84
// client is the package-wide AMQP client. ConnectAMQP fills in its fields;
// sendActionAMQP and CheckConnection read them.
var client AMQPclient

func ConnectAMQP(uri string) error {

	var err error

	// connect to broker
	client.connection, err = amqp.Dial(uri)
	if err != nil {
85
		return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
86
87
88
89
90
	}

	// create channel
	client.channel, err = client.connection.Channel()
	if err != nil {
91
		return fmt.Errorf("AMQP: failed to open a channel, error: %v", err)
92
93
94
95
96
97
98
99
100
101
	}
	// declare exchange
	err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
		"headers",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
102
		return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
103
104
	}

105
106
	// add a queue for the ICs
	ICQueue, err := client.channel.QueueDeclare("infrastructure_components",
107
108
109
110
111
112
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
113
		return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
114
115
	}

116
	err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
117
	if err != nil {
118
		return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
119
120
121
	}

	// consume deliveries
122
	client.replies, err = client.channel.Consume(ICQueue.Name,
123
		"",
124
		true,
125
126
127
128
129
		false,
		false,
		false,
		nil)
	if err != nil {
130
		return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
131
132
133
134
	}

	// consuming queue
	go func() {
135
136
		for {
			for message := range client.replies {
137
138
139
140
				err = processMessage(message)
				if err != nil {
					log.Println(err.Error())
				}
141
			}
142
			time.Sleep(2) // sleep for 2 sek
143
144
145
		}
	}()

146
147
	log.Printf(" AMQP: Waiting for messages... ")

148
149
150
	return nil
}

151
func sendActionAMQP(action Action) error {
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166

	payload, err := json.Marshal(action)
	if err != nil {
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    2,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Priority:        0,
		Body:            payload,
	}

167
168
169
170
171
172
173
174
175
176
177
178
179
180
	// set message headers
	var headers map[string]interface{}
	headers = make(map[string]interface{}) // empty map
	if action.Properties.UUID != nil {
		headers["uuid"] = *action.Properties.UUID
	}
	if action.Properties.Type != nil {
		headers["type"] = *action.Properties.Type
	}
	if action.Properties.Category != nil {
		headers["category"] = *action.Properties.Category
	}
	msg.Headers = headers

Sonja Happ's avatar
Sonja Happ committed
181
182
183
	err = CheckConnection()
	if err != nil {
		return err
184
185
	}

Sonja Happ's avatar
Sonja Happ committed
186
	log.Println("AMQP: Sending message", string(msg.Body))
187
188
189
190
191
192
193
194
195
196
	err = client.channel.Publish(VILLAS_EXCHANGE,
		"",
		false,
		false,
		msg)
	return err

}

func PingAMQP() error {
197
	log.Println("AMQP: sending ping command to all ICs")
198
199
200

	var a Action
	a.Act = "ping"
201
	*a.Properties.UUID = ""
202

203
	err := sendActionAMQP(a)
204
205
	return err
}
206
207
208

func CheckConnection() error {

209
210
211
212
213
214
	if client.connection != nil {
		if client.connection.IsClosed() {
			return fmt.Errorf("connection to broker is closed")
		}
	} else {
		return fmt.Errorf("connection is nil")
215
216
217
218
	}

	return nil
}
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238

// StartAMQP starts the AMQP client when AMQPurl is non-empty.
//
// It connects via ConnectAMQP, registers the IC action endpoint on the "/ic"
// sub-group of api and starts a 10-second ticker goroutine that is currently
// a placeholder for a periodic task. With an empty AMQPurl it does nothing
// and returns nil. The only error returned is the one from ConnectAMQP.
func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
	if AMQPurl == "" {
		return nil
	}

	log.Println("Starting AMQP client")

	if err := ConnectAMQP(AMQPurl); err != nil {
		return err
	}

	// register IC action endpoint only if AMQP client is used
	RegisterAMQPEndpoint(api.Group("/ic"))

	// Periodically call the Ping function to check which ICs are still there
	ticker := time.NewTicker(10 * time.Second)
	go func() {
		// The ticker is never stopped, so this goroutine runs for as long as
		// the process does.
		for range ticker.C {
			//TODO Add a useful regular event here
			/*
				err := PingAMQP()
				if err != nil {
					log.Println("AMQP Error: ", err.Error())
				}
			*/
		}
	}()

	// NOTE(review): AMQPurl is logged verbatim; if it embeds credentials they
	// end up in the log.
	log.Printf("Connected AMQP client to %s", AMQPurl)

	return nil
}
256

257
func processMessage(message amqp.Delivery) error {
258

Sonja Happ's avatar
Sonja Happ committed
259
	log.Println("Processing AMQP message: ", string(message.Body))
Sonja Happ's avatar
Sonja Happ committed
260

261
262
263
	var payload ICUpdate
	err := json.Unmarshal(message.Body, &payload)
	if err != nil {
264
		return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
265
266
	}

267
	if payload.Status != nil {
268
		// if a message contains a "state" field, it is an update for an IC
269
		ICUUID := payload.Status.UUID
270
		_, err = uuid.Parse(ICUUID)
271

272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
		if err != nil {
			return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
		}
		var sToBeUpdated InfrastructureComponent
		err = sToBeUpdated.ByUUID(ICUUID)

		if err == gorm.ErrRecordNotFound {
			// create new record
			err = createNewICviaAMQP(payload)
		} else if err != nil {
			// database error
			err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
		} else {
			// update record based on payload
			err = sToBeUpdated.updateICviaAMQP(payload)
		}
288
289
290
	}
	return err
}