storage.go 12.8 KB
Newer Older
Stefan Dähling's avatar
Stefan Dähling committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
Copyright 2020 Institute for Automation of Complex Power Systems,
E.ON Energy Research Center, RWTH Aachen University

This project is licensed under either of
- Apache License, Version 2.0
- MIT License
at your option.

Apache License, Version 2.0:

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

MIT License:

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

package logger

import (
	"errors"
	"sort"
	"sync"
	"time"
52
53

	"git.rwth-aachen.de/acs/public/cloud/mas/clonemap/pkg/schemas"
Stefan Dähling's avatar
Stefan Dähling committed
54
55
56
57
58
)

// storage is the interface for logging and state storage (either local or in db)
type storage interface {
	// addAgentLogMessage adds an entry to specified logging entry
Stefan Dähling's avatar
Stefan Dähling committed
59
	addAgentLogMessage(log schemas.LogMessage) (err error)
Stefan Dähling's avatar
Stefan Dähling committed
60
61

	// getLatestAgentLogMessages return the latest num log messages
Stefan Dähling's avatar
Stefan Dähling committed
62
	getLatestAgentLogMessages(masID int, agentID int, topic string,
Stefan Dähling's avatar
Stefan Dähling committed
63
		num int) (logs []schemas.LogMessage, err error)
Stefan Dähling's avatar
Stefan Dähling committed
64
65

	// getAgentLogMessagesInRange return the log messages in the specified time range
Stefan Dähling's avatar
Stefan Dähling committed
66
	getAgentLogMessagesInRange(masID int, agentID int, topic string, start time.Time,
Stefan Dähling's avatar
Stefan Dähling committed
67
		end time.Time) (logs []schemas.LogMessage, err error)
Stefan Dähling's avatar
Stefan Dähling committed
68
69
70
71
72
73
74
75
76
77
78

	// deleteAgentLogMessages deletes all log messages og an agent
	deleteAgentLogMessages(masID int, agentID int) (err error)

	// updateCommunication updates communication data
	updateCommunication(masID int, agentID int, commData []schemas.Communication) (err error)

	// getCommunication returns communication data
	getCommunication(masID int, agentID int) (commData []schemas.Communication, err error)

	// updateAgentState updates the agent status
Stefan Dähling's avatar
Stefan Dähling committed
79
	updateAgentState(masID int, agentID int, state schemas.State) (err error)
Stefan Dähling's avatar
Stefan Dähling committed
80
81

	// getAgentState return the latest agent status
Stefan Dähling's avatar
Stefan Dähling committed
82
	getAgentState(masID int, agentID int) (state schemas.State, err error)
Stefan Dähling's avatar
Stefan Dähling committed
83
84
85
86
87
88

	// deleteAgentState deletes the status of an agent
	deleteAgentState(masID int, agentID int) (err error)
}

// LogMessage contains the content of a single log message
// (definition commented out; schemas.LogMessage is used in this file instead)
// type LogMessage struct {
// 	Timestamp time.Time
// 	Message   string
// 	Data      string
// }

// localStorage keeps logs, states and communication data in memory.
// IDs are used directly as slice indices: a MAS ID is the index into mas and
// an agent ID is the index into the agents slice of its MAS.
type localStorage struct {
	mutex *sync.Mutex  // guards mas; locked by the localStorage methods
	mas   []masStorage // one entry per MAS, indexed by MAS ID
}

// masStorage holds the storage of all agents of one MAS.
type masStorage struct {
	// agents is indexed by agent ID
	agents []agentStorage
}

type agentStorage struct {
Stefan Dähling's avatar
Stefan Dähling committed
106
107
108
109
110
111
	errLogs  []schemas.LogMessage
	dbgLogs  []schemas.LogMessage
	msgLogs  []schemas.LogMessage
	statLogs []schemas.LogMessage
	appLogs  []schemas.LogMessage
	state    schemas.State
Stefan Dähling's avatar
Stefan Dähling committed
112
113
114
115
	commData []schemas.Communication
}

// addAgentLogMessage adds an entry to specified logging entry
Stefan Dähling's avatar
Stefan Dähling committed
116
func (stor *localStorage) addAgentLogMessage(log schemas.LogMessage) (err error) {
Stefan Dähling's avatar
Stefan Dähling committed
117
118
	stor.mutex.Lock()
	numMAS := len(stor.mas)
Stefan Dähling's avatar
Stefan Dähling committed
119
120
	if numMAS <= log.MASID {
		for i := 0; i < log.MASID-numMAS+1; i++ {
Stefan Dähling's avatar
Stefan Dähling committed
121
122
123
			stor.mas = append(stor.mas, masStorage{})
		}
	}
Stefan Dähling's avatar
Stefan Dähling committed
124
125
126
127
	numAgents := len(stor.mas[log.MASID].agents)
	if numAgents <= log.AgentID {
		for i := 0; i < log.AgentID-numAgents+1; i++ {
			stor.mas[log.MASID].agents = append(stor.mas[log.MASID].agents, agentStorage{})
Stefan Dähling's avatar
Stefan Dähling committed
128
129
		}
	}
Stefan Dähling's avatar
Stefan Dähling committed
130
	switch log.Topic {
Stefan Dähling's avatar
Stefan Dähling committed
131
	case "error":
Stefan Dähling's avatar
Stefan Dähling committed
132
		stor.mas[log.MASID].agents[log.AgentID].errLogs = append(stor.mas[log.MASID].agents[log.AgentID].errLogs,
Stefan Dähling's avatar
Stefan Dähling committed
133
134
			log)
	case "debug":
Stefan Dähling's avatar
Stefan Dähling committed
135
		stor.mas[log.MASID].agents[log.AgentID].dbgLogs = append(stor.mas[log.MASID].agents[log.AgentID].dbgLogs,
Stefan Dähling's avatar
Stefan Dähling committed
136
137
			log)
	case "msg":
Stefan Dähling's avatar
Stefan Dähling committed
138
		stor.mas[log.MASID].agents[log.AgentID].msgLogs = append(stor.mas[log.MASID].agents[log.AgentID].msgLogs,
Stefan Dähling's avatar
Stefan Dähling committed
139
140
			log)
	case "status":
Stefan Dähling's avatar
Stefan Dähling committed
141
		stor.mas[log.MASID].agents[log.AgentID].statLogs = append(stor.mas[log.MASID].agents[log.AgentID].statLogs,
Stefan Dähling's avatar
Stefan Dähling committed
142
143
			log)
	case "app":
Stefan Dähling's avatar
Stefan Dähling committed
144
		stor.mas[log.MASID].agents[log.AgentID].appLogs = append(stor.mas[log.MASID].agents[log.AgentID].appLogs,
Stefan Dähling's avatar
Stefan Dähling committed
145
146
			log)
	default:
147
		err = errors.New("wrong topic")
Stefan Dähling's avatar
Stefan Dähling committed
148
149
150
151
152
153
	}
	stor.mutex.Unlock()
	return
}

// getLatestAgentLogMessages return the latest num log messages
Stefan Dähling's avatar
Stefan Dähling committed
154
func (stor *localStorage) getLatestAgentLogMessages(masID int, agentID int, topic string,
Stefan Dähling's avatar
Stefan Dähling committed
155
	num int) (logs []schemas.LogMessage, err error) {
Stefan Dähling's avatar
Stefan Dähling committed
156
157
158
	stor.mutex.Lock()
	if masID < len(stor.mas) {
		if agentID < len(stor.mas[masID].agents) {
Stefan Dähling's avatar
Stefan Dähling committed
159
			switch topic {
Stefan Dähling's avatar
Stefan Dähling committed
160
161
162
163
164
			case "error":
				length := len(stor.mas[masID].agents[agentID].errLogs)
				if length < num {
					num = length
				}
165
				logs = make([]schemas.LogMessage, num)
Stefan Dähling's avatar
Stefan Dähling committed
166
167
168
169
170
171
				copy(logs, stor.mas[masID].agents[agentID].errLogs[length-num:length])
			case "debug":
				length := len(stor.mas[masID].agents[agentID].dbgLogs)
				if length < num {
					num = length
				}
172
				logs = make([]schemas.LogMessage, num)
Stefan Dähling's avatar
Stefan Dähling committed
173
174
175
176
177
178
				copy(logs, stor.mas[masID].agents[agentID].dbgLogs[length-num:length])
			case "msg":
				length := len(stor.mas[masID].agents[agentID].msgLogs)
				if length < num {
					num = length
				}
179
				logs = make([]schemas.LogMessage, num)
Stefan Dähling's avatar
Stefan Dähling committed
180
181
182
183
184
185
				copy(logs, stor.mas[masID].agents[agentID].msgLogs[length-num:length])
			case "status":
				length := len(stor.mas[masID].agents[agentID].statLogs)
				if length < num {
					num = length
				}
186
				logs = make([]schemas.LogMessage, num)
Stefan Dähling's avatar
Stefan Dähling committed
187
188
189
190
191
192
				copy(logs, stor.mas[masID].agents[agentID].statLogs[length-num:length])
			case "app":
				length := len(stor.mas[masID].agents[agentID].appLogs)
				if length < num {
					num = length
				}
193
				logs = make([]schemas.LogMessage, num)
Stefan Dähling's avatar
Stefan Dähling committed
194
195
				copy(logs, stor.mas[masID].agents[agentID].appLogs[length-num:length])
			default:
196
				err = errors.New("wrong topic")
Stefan Dähling's avatar
Stefan Dähling committed
197
198
199
200
201
202
203
204
			}
		}
	}
	stor.mutex.Unlock()
	return
}

// getAgentLogMessagesInRange return the log messages in the specified time range
Stefan Dähling's avatar
Stefan Dähling committed
205
func (stor *localStorage) getAgentLogMessagesInRange(masID int, agentID int, topic string,
Stefan Dähling's avatar
Stefan Dähling committed
206
	start time.Time, end time.Time) (logs []schemas.LogMessage, err error) {
Stefan Dähling's avatar
Stefan Dähling committed
207
208
209
	stor.mutex.Lock()
	if masID < len(stor.mas) {
		if agentID < len(stor.mas[masID].agents) {
Stefan Dähling's avatar
Stefan Dähling committed
210
			switch topic {
Stefan Dähling's avatar
Stefan Dähling committed
211
212
213
214
215
			case "error":
				length := len(stor.mas[masID].agents[agentID].errLogs)
				if length > 0 {
					startIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
216
							return stor.mas[masID].agents[agentID].errLogs[i].Timestamp.After(start)
Stefan Dähling's avatar
Stefan Dähling committed
217
218
219
						})
					endIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
220
							return stor.mas[masID].agents[agentID].errLogs[i].Timestamp.After(end)
Stefan Dähling's avatar
Stefan Dähling committed
221
						})
Stefan Dähling's avatar
Stefan Dähling committed
222
					if endIndex-startIndex >= 0 {
223
						logs = make([]schemas.LogMessage, endIndex-startIndex)
Stefan Dähling's avatar
Stefan Dähling committed
224
225
226
227
228
229
230
231
						copy(logs, stor.mas[masID].agents[agentID].errLogs[startIndex:endIndex])
					}
				}
			case "debug":
				length := len(stor.mas[masID].agents[agentID].dbgLogs)
				if length > 0 {
					startIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
232
							return stor.mas[masID].agents[agentID].dbgLogs[i].Timestamp.After(start)
Stefan Dähling's avatar
Stefan Dähling committed
233
234
235
						})
					endIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
236
							return stor.mas[masID].agents[agentID].dbgLogs[i].Timestamp.After(end)
Stefan Dähling's avatar
Stefan Dähling committed
237
						})
Stefan Dähling's avatar
Stefan Dähling committed
238
					if endIndex-startIndex >= 0 {
239
						logs = make([]schemas.LogMessage, endIndex-startIndex)
Stefan Dähling's avatar
Stefan Dähling committed
240
241
242
243
244
245
246
247
						copy(logs, stor.mas[masID].agents[agentID].dbgLogs[startIndex:endIndex])
					}
				}
			case "msg":
				length := len(stor.mas[masID].agents[agentID].msgLogs)
				if length > 0 {
					startIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
248
							return stor.mas[masID].agents[agentID].msgLogs[i].Timestamp.After(start)
Stefan Dähling's avatar
Stefan Dähling committed
249
250
251
						})
					endIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
252
							return stor.mas[masID].agents[agentID].msgLogs[i].Timestamp.After(end)
Stefan Dähling's avatar
Stefan Dähling committed
253
						})
Stefan Dähling's avatar
Stefan Dähling committed
254
					if endIndex-startIndex >= 0 {
255
						logs = make([]schemas.LogMessage, endIndex-startIndex)
Stefan Dähling's avatar
Stefan Dähling committed
256
257
258
259
260
261
262
263
						copy(logs, stor.mas[masID].agents[agentID].msgLogs[startIndex:endIndex])
					}
				}
			case "status":
				length := len(stor.mas[masID].agents[agentID].statLogs)
				if length > 0 {
					startIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
264
265
							return stor.mas[masID].agents[agentID].statLogs[i].Timestamp.
								After(start)
Stefan Dähling's avatar
Stefan Dähling committed
266
267
268
						})
					endIndex := sort.Search(length,
						func(i int) bool {
Stefan Dähling's avatar
Stefan Dähling committed
269
							return stor.mas[masID].agents[agentID].statLogs[i].Timestamp.After(end)
Stefan Dähling's avatar
Stefan Dähling committed
270
						})
Stefan Dähling's avatar
Stefan Dähling committed
271
					if endIndex-startIndex >= 0 {
272
						logs = make([]schemas.LogMessage, endIndex-startIndex)
Stefan Dähling's avatar
Stefan Dähling committed
273
274
275
276
277
278
279
280
281
282
283
284
285
286
						copy(logs, stor.mas[masID].agents[agentID].statLogs[startIndex:endIndex])
					}
				}
			case "app":
				length := len(stor.mas[masID].agents[agentID].appLogs)
				if length > 0 {
					startIndex := sort.Search(length,
						func(i int) bool {
							return stor.mas[masID].agents[agentID].appLogs[i].Timestamp.After(start)
						})
					endIndex := sort.Search(length,
						func(i int) bool {
							return stor.mas[masID].agents[agentID].appLogs[i].Timestamp.After(end)
						})
Stefan Dähling's avatar
Stefan Dähling committed
287
					if endIndex-startIndex >= 0 {
288
						logs = make([]schemas.LogMessage, endIndex-startIndex)
Stefan Dähling's avatar
Stefan Dähling committed
289
290
291
292
						copy(logs, stor.mas[masID].agents[agentID].appLogs[startIndex:endIndex])
					}
				}
			default:
293
				err = errors.New("wrong topic")
Stefan Dähling's avatar
Stefan Dähling committed
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
			}
		}
	}
	stor.mutex.Unlock()
	return
}

// deleteAgentLogMessages deletes all log messages of an agent for every topic.
// Unknown MAS or agent IDs (including negative ones) are ignored; err is always nil.
func (stor *localStorage) deleteAgentLogMessages(masID int, agentID int) (err error) {
	stor.mutex.Lock()
	defer stor.mutex.Unlock()

	if masID < 0 || masID >= len(stor.mas) {
		return nil
	}
	if agentID < 0 || agentID >= len(stor.mas[masID].agents) {
		return nil
	}
	agent := &stor.mas[masID].agents[agentID]
	agent.errLogs = nil
	agent.dbgLogs = nil
	agent.msgLogs = nil
	agent.statLogs = nil
	agent.appLogs = nil
	return nil
}

// updateCommunication replaces the communication data of an agent with commData, creating
// the storage entries for the MAS and the agent if they do not exist yet.
// The slice is stored as passed in (not copied).
// A negative masID or agentID is rejected with an error.
func (stor *localStorage) updateCommunication(masID int, agentID int,
	commData []schemas.Communication) (err error) {
	// negative IDs would index out of range below
	if masID < 0 || agentID < 0 {
		return errors.New("negative mas or agent id")
	}
	stor.mutex.Lock()
	defer stor.mutex.Unlock()

	// grow the slices until the IDs are valid indices
	for len(stor.mas) <= masID {
		stor.mas = append(stor.mas, masStorage{})
	}
	// the agents must be added to the MAS addressed by masID, not to the
	// MAS at the loop counter's position
	mas := &stor.mas[masID]
	for len(mas.agents) <= agentID {
		mas.agents = append(mas.agents, agentStorage{})
	}
	mas.agents[agentID].commData = commData
	return nil
}

// getCommunication returns the communication data stored for an agent.
// The returned slice is the stored one (not a copy). If the MAS or agent is unknown
// (including negative IDs) commData is nil; err is always nil.
func (stor *localStorage) getCommunication(masID int,
	agentID int) (commData []schemas.Communication, err error) {
	stor.mutex.Lock()
	defer stor.mutex.Unlock()

	if masID < 0 || masID >= len(stor.mas) {
		return nil, nil
	}
	if agentID < 0 || agentID >= len(stor.mas[masID].agents) {
		return nil, nil
	}
	return stor.mas[masID].agents[agentID].commData, nil
}

// updateAgentState updates the agent status
Stefan Dähling's avatar
Stefan Dähling committed
352
func (stor *localStorage) updateAgentState(masID int, agentID int, state schemas.State) (err error) {
Stefan Dähling's avatar
Stefan Dähling committed
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
	stor.mutex.Lock()
	numMAS := len(stor.mas)
	if numMAS <= masID {
		for i := 0; i < masID-numMAS+1; i++ {
			stor.mas = append(stor.mas, masStorage{})
		}
	}
	numAgents := len(stor.mas[masID].agents)
	if numAgents <= agentID {
		for i := 0; i < agentID-numAgents+1; i++ {
			stor.mas[i].agents = append(stor.mas[i].agents, agentStorage{})
		}
	}
	stor.mas[masID].agents[agentID].state = state
	stor.mutex.Unlock()
	return
}

// getAgentState return the latest agent status
Stefan Dähling's avatar
Stefan Dähling committed
372
func (stor *localStorage) getAgentState(masID int, agentID int) (state schemas.State, err error) {
Stefan Dähling's avatar
Stefan Dähling committed
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
	stor.mutex.Lock()
	if masID < len(stor.mas) {
		if agentID < len(stor.mas[masID].agents) {
			state = stor.mas[masID].agents[agentID].state
		}
	}
	stor.mutex.Unlock()
	return
}

// deleteAgentState deletes the status of an agent
func (stor *localStorage) deleteAgentState(masID int, agentID int) (err error) {
	stor.mutex.Lock()
	if masID < len(stor.mas) {
		if agentID < len(stor.mas[masID].agents) {
Stefan Dähling's avatar
Stefan Dähling committed
388
			stor.mas[masID].agents[agentID].state = schemas.State{}
Stefan Dähling's avatar
Stefan Dähling committed
389
390
391
392
393
394
395
396
397
398
399
400
		}
	}
	stor.mutex.Unlock()
	return
}

// newLocalStorage creates an empty localStorage with an initialized mutex and
// returns it as storage.
func newLocalStorage() storage {
	return &localStorage{mutex: &sync.Mutex{}}
}