diff --git a/tests/test_live_view.py b/tests/test_live_view.py index ab10cd1437626e21cc3f6253888f4229ebfb2400..e5245e5efe443b36a3af3a1970b5d23c5e9dd612 100644 --- a/tests/test_live_view.py +++ b/tests/test_live_view.py @@ -35,17 +35,45 @@ def advance_frames(view: LiveViewT, n_frames: int): view.fig.canvas.flush_events() -def stop_view(view): +def advance_until_stopped(*views): + to_advance = list(views) + while to_advance: + for view in to_advance: + try: + advance_frames(view, 1) + except AttributeError: + # event_source has been removed, meaning the animation was stopped + to_advance.remove(view) + except RuntimeError: + # thread did not join + continue + + +def stop_view(*views, in_process): + if len(views) != 1: + # need to hack a bit to get the linked views to stop + views[0].stop_event.set() + if not in_process and not is_using_mpl_gui_backend(views[0].fig): + advance_until_stopped(*views) + else: + time.sleep(100e-3) + try: - view.stop() - except RuntimeError: - # Thread did not join. Try to trigger event loop once. - advance_frames(view, 1) - view.stop() + views[0].stop() + except AttributeError: + # ??? + views[0].stop() -def close_figure(fig): - if not is_using_mpl_gui_backend(fig): +def close_view(*views, in_process=False): + if in_process: + views[0].close_event.set() + time.sleep(100e-3) + return + + stop_view(*views, in_process=in_process) + + if not is_using_mpl_gui_backend(fig := views[0].fig): # Need to manually trigger the close event because the event loop isn't running fig.canvas.callbacks.process('close_event', CloseEvent('close_event', fig.canvas)) @@ -53,11 +81,14 @@ def close_figure(fig): plt.close(fig) -def statement_timed_out(stmt: Callable[[], bool], timeout) -> bool: +def statement_timed_out(stmt: Callable[[], bool], obj, timeout) -> bool: try: with misc.timeout(timeout, raise_exc=True) as exceeded: while not stmt() and not exceeded: - time.sleep(10e-3) + if hasattr(obj, 'fig'): + obj.fig.canvas.start_event_loop(10e-3) + else: + time.sleep(10e-3) except TimeoutError: return True else: @@ -89,7 +120,7 @@ def started(spectrometer, plot_timetrace, in_process, in_gitlab_ci): if not in_process: for view in views: start_animation(view.fig) - advance_frames(view, random.randint(1, 10)) + advance_frames(view, random.randint(10, 20)) with misc.timeout(30, raise_exc=True) as exceeded: while not all(view.is_running() for view in views) and not exceeded: @@ -97,43 +128,25 @@ def started(spectrometer, plot_timetrace, in_process, in_gitlab_ci): yield views - for view in views: - view.stop() - if in_process: + stop_view(*views, in_process=in_process) + close_view(*views, in_process=in_process) + if in_process: + for view in views: view.process.terminate() - else: - close_figure(view.fig) @pytest.fixture def stopped(started: list[LiveViewT], in_process: bool): - view = random.choice(started) - stop_view(view) - - if not in_process and len(started) > 1: - advance_frames(started[1 - started.index(view)], 1) - - stopped = started - return stopped + random.shuffle(views := started) + stop_view(*views, in_process=in_process) + return views @pytest.fixture def closed(started: list[LiveViewT], in_process: bool): - view = random.choice(started) - - if in_process: - view.close_event.set() - else: - close_figure(view.fig) - - if not in_process and len(started) > 1: - advance_frames(started[1 - started.index(view)], 1) - else: - # Give the process time to process all events - time.sleep(250e-3) - - closed = started - yield closed + random.shuffle(views := started) + close_view(*views, in_process=in_process) + yield views def test_started(spectrometer, started, plot_timetrace, in_process): @@ -157,9 +170,9 @@ def test_started(spectrometer, started, plot_timetrace, in_process): assert isinstance(views[0], LiveViewBase.LiveViewProxy) for view in views: - assert not statement_timed_out(lambda: view.is_running() is True, timeout=1) - assert not statement_timed_out(lambda: view.data_thread.is_alive(), timeout=1) - assert not statement_timed_out(lambda: not view.stop_event.is_set(), timeout=1) + assert not statement_timed_out(lambda: view.is_running() is True, view, timeout=1) + assert not statement_timed_out(lambda: view.data_thread.is_alive(), view, timeout=1) + assert not statement_timed_out(lambda: not view.stop_event.is_set(), view, timeout=1) @pytest.mark.parametrize("view_state", ["stopped", "closed"]) @@ -167,9 +180,9 @@ def test_finished(spectrometer, view_state, request, plot_timetrace, in_process, """Test the state of view(s) and spectrometer after stopping and closing.""" views = request.getfixturevalue(view_state) - assert not spectrometer.acquiring + assert not statement_timed_out(lambda: not spectrometer.acquiring, spectrometer, timeout=1) for view in views: - assert not statement_timed_out(lambda: view.is_running() is None, timeout=10) - assert not statement_timed_out(lambda: not view.data_thread.is_alive(), timeout=10) - assert not statement_timed_out(lambda: view.stop_event.is_set(), timeout=10) + assert not statement_timed_out(lambda: view.is_running() is None, view, timeout=10) + assert not statement_timed_out(lambda: not view.data_thread.is_alive(), view, timeout=10) + assert not statement_timed_out(lambda: view.stop_event.is_set(), view, timeout=10)