diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index c0b06efe..1906d1e5 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -44,8 +44,8 @@ from ..data.feed import ( Feed, maybe_open_feed, ) +from ..ui._notify import notify_from_ems_status_msg from ..data.types import Struct -# from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( Order, @@ -557,7 +557,7 @@ class Router(Struct): sub_key: str, msg: dict, - ) -> None: + ) -> bool: to_remove: set[tractor.MsgStream] = set() if sub_key == 'all': @@ -567,9 +567,11 @@ class Router(Struct): else: subs = self.subscribers[sub_key] + sent_some: bool = False for client_stream in subs: try: await client_stream.send(msg) + sent_some = True except ( trio.ClosedResourceError, trio.BrokenResourceError, @@ -582,6 +584,8 @@ class Router(Struct): if to_remove: subs.difference_update(to_remove) + return sent_some + _router: Router = None @@ -760,13 +764,8 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_streams = router.get_subs(oid) status_msg = book._active.get(oid) - - if ( - not ems_client_order_streams - or not status_msg - ): + if not status_msg: log.warning( f'Received status for untracked dialog {oid}:\n' f'{fmsg}' @@ -788,10 +787,20 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await router.client_broadcast( + sent_some = await router.client_broadcast( status_msg.req.symbol, status_msg, ) + if not sent_some: + log.info( + 'No clients attached, firing notification for msg:\n' + f'{fmsg}' + ) + await notify_from_ems_status_msg( + oid, + status_msg, + is_subproc=True, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!')