Skip to content
Snippets Groups Projects
Commit 2520ff39 authored by Anil Riza Bektas's avatar Anil Riza Bektas
Browse files

fixing bugs when sending events, connecting to exchange with corrent name,...

fixing bugs when sending events, connecting to exchange with corrent name, adding timestamp to eventMessage json
parent 6f02f06d
Branches version0.6
No related tags found
No related merge requests found
......@@ -19,8 +19,8 @@ class EventSystemConnector {
/// The [brokerInterface] is optional because it's only used for custom
/// events.
EventSystemConnector(this.s3iCore, {this.brokerInterface}) {
_eventBrokerConnector = getActiveBrokerEventConnector(s3iCore.authManager, args: {'exchangeType': ExchangeType.TOPIC} as Map<String,ExchangeType> );
_eventBrokerConnector
eventBrokerConnector = getActiveBrokerEventConnector(this.s3iCore.authManager, args: {'exchangeType': ExchangeType.TOPIC, "exchangeName": "eventExchange"});
eventBrokerConnector
..subscribeEventMessageReceived((EventMessage event) {
if (_eventCallbacks.containsKey(event.topic)) {
for (final Function(EventMessage event) callbackF
......@@ -60,13 +60,13 @@ class EventSystemConnector {
(S3IException exception) {};
/// The broker interface for communication with the eventExchange.
late final ActiveBrokerInterface _eventBrokerConnector;
late final ActiveBrokerInterface eventBrokerConnector;
/// Stores the topics of the subscribed events and the matching callbacks.
Map<String, List<Function(EventMessage event)>> _eventCallbacks =
<String, List<Function(EventMessage event)>>{};
/// The endpoint to which the [_eventBrokerConnector] is connected.
/// The endpoint to which the [eventBrokerConnector] is connected.
Endpoint? _endpoint;
/// Handles the complete process to receive custom events from an other thing.
......@@ -82,7 +82,7 @@ class EventSystemConnector {
/// [attributePaths]
/// - add the callback to [_eventCallbacks]
/// - create/bind queue to this topic and the event exchange via the REST API
/// - connect to the queue with the [_eventBrokerConnector]
/// - connect to the queue with the [eventBrokerConnector]
/// - send a [EventSubscriptionRequest] to the thing
Future<String> subscribeCustomEvent(String publisherThingId,
{required String publisherNormalQueue,
......@@ -120,7 +120,7 @@ class EventSystemConnector {
_eventCallbacks = events;
_endpoint = await s3iCore.createEventQueueBinding(
thisThingId, events.keys.toList());
await _eventBrokerConnector.startConsuming(_endpoint!.endpoint);
await eventBrokerConnector.startConsuming(_endpoint!.endpoint);
}
// TODO(poq): add method for unsubscribe specific events
......@@ -129,8 +129,14 @@ class EventSystemConnector {
/// deleted too (the REST-API creates queues with `auto_delete = true`).
void stopConsumingEvents() {
if (_endpoint != null) {
_eventBrokerConnector.stopConsuming(_endpoint!.endpoint);
eventBrokerConnector.stopConsuming(_endpoint!.endpoint);
_eventCallbacks.clear();
}
}
/// Publish event message via EventExchange
void publishEventMessage({required EventMessage eventMessage}) {
eventBrokerConnector.sendMessage(eventMessage, {eventMessage.topic});
}
}
......@@ -213,6 +213,7 @@ class EventMessage extends Message {
newJson[BrokerKeys.messageType] = BrokerKeys.eventMessage;
newJson[BrokerKeys.topic] = topic;
newJson[BrokerKeys.content] = content;
newJson[BrokerKeys.timestamp] = timestamp.millisecondsSinceEpoch;
return newJson;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment