Skip to content
Snippets Groups Projects

Add event handling

Merged Pascal Palenda requested to merge feature/event-handling into feature/pure-python
Files
3
import asyncio
import warnings
from typing import List, Optional, Tuple, Union
from threading import Thread
from typing import Callable, List, Optional, Tuple, Union
from grpclib.client import Channel
@@ -21,6 +22,9 @@ class VAInterface:
"""Initializes the VAInterface object"""
self._service: Optional[vanet.VaStub] = None
self._channel: Optional[Channel] = None
self._event_callbacks: List[Callable[[vanet.Event], None]] = []
self._event_thread: Optional[Thread] = None
self.__loop: Optional[asyncio.AbstractEventLoop] = None
def __del__(self):
"""Destructor for the VAInterface object
@@ -31,7 +35,7 @@ class VAInterface:
if self._channel:
self._channel.close()
def connect(self, host: str = "localhost", port: int = 12340, channel: Union[Channel, None] = None) -> None:
def connect(self, host: str = "localhost", port: int = 12340, channel: Union[Channel, None] = None, *, add_event_handling: bool = True) -> None:
"""Connects to the VA server
Args:
@@ -39,19 +43,37 @@ class VAInterface:
port: The port number of the VA server.
channel: The channel to use for the connection.
If not provided, a new channel will be created, this is the default and should be used in most cases.
add_event_handling: If True, VAPython will receive events from the VAServer.
If False, no events will be received and the server will not be able to determine that a client is connected.
However, not sending events can reduce network traffic.
"""
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
if channel:
self._channel = channel
self.loop = asyncio.get_running_loop()
self.__loop = asyncio.get_running_loop()
else:
self.loop = asyncio.get_event_loop()
self._channel = Channel(host=host, port=port, loop=self.loop)
self.__loop = asyncio.get_event_loop()
self._channel = Channel(host=host, port=port, loop=self._loop)
self._service = vanet.VaStub(self._channel)
#todo setup event handling
if add_event_handling:
# Event loop in separate thread, based on: https://gist.github.com/dmfigol/3e7d5b84a16d076df02baa9f53271058
async def handle_events():
async for event in self.service.attach_event_handler(vanet.betterproto_lib_google_protobuf.Empty()):
for event_callback in self._event_callbacks:
event_callback(event)
def background_event_tread(loop: asyncio.AbstractEventLoop) -> None:
asyncio.set_event_loop(loop)
loop.run_forever()
self._event_thread = Thread(target=background_event_tread, args=(self._loop,) , daemon=True)
self._event_thread.start()
asyncio.run_coroutine_threadsafe(handle_events(), self._loop)
def disconnect(self):
"""Disconnects from the VA server"""
@@ -91,7 +113,7 @@ class VAInterface:
"""
{% endif %}
return_value = self.loop.run_until_complete(
return_value = self._loop.run_until_complete(
self.service.{{ method.org_name }}(
vanet.{{ method.message_type }}(
{% for arg in method.args %}
@@ -138,4 +160,33 @@ class VAInterface:
msg = "Not connected to a server"
raise ValueError(msg)
return self._service
\ No newline at end of file
return self._service
@property
def _loop(self) -> asyncio.AbstractEventLoop:
"""The event loop
Returns:
The event loop.
"""
if not self.__loop:
msg = "Not connected to a server"
raise ValueError(msg)
return self.__loop
def attach_event_handler(self, callback: Callable[[vanet.Event], None]) -> None:
"""Attaches a callback function to the event handler
Args:
callback: The callback function to attach.
"""
self._event_callbacks.append(callback)
def detach_event_handler(self, callback: Callable[[vanet.Event], None]) -> None:
"""Detaches a callback function from the event handler
Args:
callback: The callback function to detach.
"""
self._event_callbacks.remove(callback)
\ No newline at end of file
Loading