amqpclient.go 6.55 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** AMQP package, client.
*
* @author Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
* @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
* @license GNU General Public License (version 3)
*
* VILLASweb-backend-go
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
22
package infrastructure_component
23
24
25
26

import (
	"encoding/json"
	"fmt"
27
	"github.com/gin-gonic/gin"
28
	"github.com/google/uuid"
29
	"github.com/jinzhu/gorm"
30
	"github.com/streadway/amqp"
31
	"log"
32
33
34
35
36
37
38
39
40
41
42
43
	"time"
)

// VILLAS_EXCHANGE is the name of the AMQP exchange that ConnectAMQP declares
// and binds the IC queue to, and that sendActionAMQP publishes actions to.
const VILLAS_EXCHANGE = "villas"

// AMQPclient bundles the broker state that ConnectAMQP sets up.
type AMQPclient struct {
	connection *amqp.Connection     // connection to the broker, set by amqp.Dial in ConnectAMQP
	channel    *amqp.Channel        // channel opened on connection; used for declaring, consuming and publishing
	replies    <-chan amqp.Delivery // deliveries returned by Consume on the IC queue
}

// Action is the JSON message that sendActionAMQP publishes to the exchange.
//
// All Properties fields are optional pointers; a nil pointer is encoded as
// JSON null. sendActionAMQP additionally copies UUID, Type and Category into
// the AMQP message headers when they are non-nil.
type Action struct {
	// Act is the name of the action, e.g. "ping" (see PingAMQP).
	Act string `json:"action"`
	// When is encoded under the key "when".
	// NOTE(review): presumably a Unix timestamp for delayed execution — confirm.
	When int64 `json:"when"`
	// Properties describes the infrastructure component(s) the action targets.
	Properties struct {
		UUID        *string `json:"uuid"`
		Name        *string `json:"name"`
		Category    *string `json:"category"`
		Type        *string `json:"type"`
		Location    *string `json:"location"`
		WS_url      *string `json:"ws_url"`
		API_url     *string `json:"api_url"`
		Description *string `json:"description"`
	} `json:"properties"`
}

58
// ICUpdate is the JSON message that processMessage decodes from an incoming
// AMQP delivery.
//
// Properties.UUID is mandatory: processMessage rejects the message when it
// does not parse as a UUID. The remaining fields are optional pointers and
// stay nil when the key is absent from the JSON.
type ICUpdate struct {
	// State is decoded from the key "state"; nil when not provided.
	State *string `json:"state"`
	// Properties carries the attributes of the infrastructure component.
	Properties struct {
		UUID        string  `json:"uuid"`
		Name        *string `json:"name"`
		Category    *string `json:"category"`
		Type        *string `json:"type"`
		Location    *string `json:"location"`
		WS_url      *string `json:"ws_url"`
		API_url     *string `json:"api_url"`
		Description *string `json:"description"`
	} `json:"properties"`
	// TODO add JSON start parameter scheme
}

73
74
75
76
77
78
79
80
81
// client is the package-wide AMQP client state. It is populated by
// ConnectAMQP and read by sendActionAMQP and CheckConnection.
var client AMQPclient

func ConnectAMQP(uri string) error {

	var err error

	// connect to broker
	client.connection, err = amqp.Dial(uri)
	if err != nil {
82
		return fmt.Errorf("AMQP: failed to connect to RabbitMQ broker %v, error: %v", uri, err)
83
84
85
86
87
	}

	// create channel
	client.channel, err = client.connection.Channel()
	if err != nil {
88
		return fmt.Errorf("AMQP: failed to open a channel, error: %v", err)
89
90
91
92
93
94
95
96
97
98
	}
	// declare exchange
	err = client.channel.ExchangeDeclare(VILLAS_EXCHANGE,
		"headers",
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
99
		return fmt.Errorf("AMQP: failed to declare the exchange, error: %v", err)
100
101
	}

102
103
	// add a queue for the ICs
	ICQueue, err := client.channel.QueueDeclare("infrastructure_components",
104
105
106
107
108
109
		true,
		false,
		false,
		false,
		nil)
	if err != nil {
110
		return fmt.Errorf("AMQP: failed to declare the queue, error: %v", err)
111
112
	}

113
	err = client.channel.QueueBind(ICQueue.Name, "", VILLAS_EXCHANGE, false, nil)
114
	if err != nil {
115
		return fmt.Errorf("AMQP: failed to bind the queue, error: %v", err)
116
117
118
	}

	// consume deliveries
119
	client.replies, err = client.channel.Consume(ICQueue.Name,
120
		"",
121
		true,
122
123
124
125
126
		false,
		false,
		false,
		nil)
	if err != nil {
127
		return fmt.Errorf("AMQP: failed to consume deliveries, error: %v", err)
128
129
130
131
	}

	// consuming queue
	go func() {
132
133
		for {
			for message := range client.replies {
134
135
136
137
				err = processMessage(message)
				if err != nil {
					log.Println(err.Error())
				}
138
			}
139
			time.Sleep(2) // sleep for 2 sek
140
141
142
		}
	}()

143
144
	log.Printf(" AMQP: Waiting for messages... ")

145
146
147
	return nil
}

148
func sendActionAMQP(action Action) error {
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163

	payload, err := json.Marshal(action)
	if err != nil {
		return err
	}

	msg := amqp.Publishing{
		DeliveryMode:    2,
		Timestamp:       time.Now(),
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Priority:        0,
		Body:            payload,
	}

164
165
166
167
168
169
170
171
172
173
174
175
176
177
	// set message headers
	var headers map[string]interface{}
	headers = make(map[string]interface{}) // empty map
	if action.Properties.UUID != nil {
		headers["uuid"] = *action.Properties.UUID
	}
	if action.Properties.Type != nil {
		headers["type"] = *action.Properties.Type
	}
	if action.Properties.Category != nil {
		headers["category"] = *action.Properties.Category
	}
	msg.Headers = headers

Sonja Happ's avatar
Sonja Happ committed
178
179
180
	err = CheckConnection()
	if err != nil {
		return err
181
182
	}

Sonja Happ's avatar
Sonja Happ committed
183
	log.Println("AMQP: Sending message", string(msg.Body))
184
185
186
187
188
189
190
191
192
193
	err = client.channel.Publish(VILLAS_EXCHANGE,
		"",
		false,
		false,
		msg)
	return err

}

func PingAMQP() error {
194
	log.Println("AMQP: sending ping command to all ICs")
195
196
197

	var a Action
	a.Act = "ping"
198
	*a.Properties.UUID = ""
199

200
	err := sendActionAMQP(a)
201
202
	return err
}
203
204
205

func CheckConnection() error {

206
207
208
209
210
211
	if client.connection != nil {
		if client.connection.IsClosed() {
			return fmt.Errorf("connection to broker is closed")
		}
	} else {
		return fmt.Errorf("connection is nil")
212
213
214
215
	}

	return nil
}
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235

// StartAMQP brings up the AMQP client when AMQPurl is non-empty.
//
// With an empty AMQPurl it does nothing and returns nil. Otherwise it connects
// via ConnectAMQP (returning its error on failure), registers the IC action
// endpoint under api's "/ic" group and starts a goroutine driven by a
// 10-second ticker that currently performs no work on each tick.
func StartAMQP(AMQPurl string, api *gin.RouterGroup) error {
	if AMQPurl == "" {
		return nil
	}

	log.Println("Starting AMQP client")

	if err := ConnectAMQP(AMQPurl); err != nil {
		return err
	}

	// register IC action endpoint only if AMQP client is used
	RegisterAMQPEndpoint(api.Group("/ic"))

	// Periodic hook, intended for checking which ICs are still there.
	ticker := time.NewTicker(10 * time.Second)
	go func() {
		for range ticker.C {
			//TODO Add a useful regular event here
			/*
				if err := PingAMQP(); err != nil {
					log.Println("AMQP Error: ", err.Error())
				}
			*/
		}
	}()

	log.Printf("Connected AMQP client to %s", AMQPurl)

	return nil
}
253

254
func processMessage(message amqp.Delivery) error {
255

Sonja Happ's avatar
Sonja Happ committed
256
	log.Println("Processing AMQP message: ", string(message.Body))
Sonja Happ's avatar
Sonja Happ committed
257

258
259
260
	var payload ICUpdate
	err := json.Unmarshal(message.Body, &payload)
	if err != nil {
261
		return fmt.Errorf("AMQP: Could not unmarshal message to JSON: %v err: %v", string(message.Body), err)
262
263
264
265
266
267
	}

	ICUUID := payload.Properties.UUID
	_, err = uuid.Parse(ICUUID)

	if err != nil {
268
		return fmt.Errorf("AMQP: UUID not valid: %v, message ignored: %v \n", ICUUID, string(message.Body))
269
270
	}

271
	var sToBeUpdated InfrastructureComponent
272
273
274
275
	err = sToBeUpdated.ByUUID(ICUUID)

	if err == gorm.ErrRecordNotFound {
		// create new record
276
		err = createNewICviaAMQP(payload)
277
278
279
280
	} else if err != nil {
		// database error
		err = fmt.Errorf("AMQP: Database error for IC %v DB error message: %v", ICUUID, err)
	} else {
281
282
		// update record based on payload
		err = sToBeUpdated.updateICviaAMQP(payload)
283
	}
284

285
286
	return err
}