Don't short circuit relay loop when headless
If no clients are connected we now process as normal and try to fire a desktop notification on linux.offline_dark_clearing
parent
d3abfce540
commit
b65c02336d
|
@ -44,8 +44,8 @@ from ..data.feed import (
|
||||||
Feed,
|
Feed,
|
||||||
maybe_open_feed,
|
maybe_open_feed,
|
||||||
)
|
)
|
||||||
|
from ..ui._notify import notify_from_ems_status_msg
|
||||||
from ..data.types import Struct
|
from ..data.types import Struct
|
||||||
# from .._daemon import maybe_spawn_brokerd
|
|
||||||
from . import _paper_engine as paper
|
from . import _paper_engine as paper
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
Order,
|
Order,
|
||||||
|
@ -557,7 +557,7 @@ class Router(Struct):
|
||||||
sub_key: str,
|
sub_key: str,
|
||||||
msg: dict,
|
msg: dict,
|
||||||
|
|
||||||
) -> None:
|
) -> bool:
|
||||||
to_remove: set[tractor.MsgStream] = set()
|
to_remove: set[tractor.MsgStream] = set()
|
||||||
|
|
||||||
if sub_key == 'all':
|
if sub_key == 'all':
|
||||||
|
@ -567,9 +567,11 @@ class Router(Struct):
|
||||||
else:
|
else:
|
||||||
subs = self.subscribers[sub_key]
|
subs = self.subscribers[sub_key]
|
||||||
|
|
||||||
|
sent_some: bool = False
|
||||||
for client_stream in subs:
|
for client_stream in subs:
|
||||||
try:
|
try:
|
||||||
await client_stream.send(msg)
|
await client_stream.send(msg)
|
||||||
|
sent_some = True
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
@ -582,6 +584,8 @@ class Router(Struct):
|
||||||
if to_remove:
|
if to_remove:
|
||||||
subs.difference_update(to_remove)
|
subs.difference_update(to_remove)
|
||||||
|
|
||||||
|
return sent_some
|
||||||
|
|
||||||
|
|
||||||
_router: Router = None
|
_router: Router = None
|
||||||
|
|
||||||
|
@ -760,13 +764,8 @@ async def translate_and_relay_brokerd_events(
|
||||||
# TODO: maybe pack this into a composite type that
|
# TODO: maybe pack this into a composite type that
|
||||||
# contains both the IPC stream as well the
|
# contains both the IPC stream as well the
|
||||||
# msg-chain/dialog.
|
# msg-chain/dialog.
|
||||||
ems_client_order_streams = router.get_subs(oid)
|
|
||||||
status_msg = book._active.get(oid)
|
status_msg = book._active.get(oid)
|
||||||
|
if not status_msg:
|
||||||
if (
|
|
||||||
not ems_client_order_streams
|
|
||||||
or not status_msg
|
|
||||||
):
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Received status for untracked dialog {oid}:\n'
|
f'Received status for untracked dialog {oid}:\n'
|
||||||
f'{fmsg}'
|
f'{fmsg}'
|
||||||
|
@ -788,10 +787,20 @@ async def translate_and_relay_brokerd_events(
|
||||||
status_msg.brokerd_msg = msg
|
status_msg.brokerd_msg = msg
|
||||||
status_msg.src = msg.broker_details['name']
|
status_msg.src = msg.broker_details['name']
|
||||||
|
|
||||||
await router.client_broadcast(
|
sent_some = await router.client_broadcast(
|
||||||
status_msg.req.symbol,
|
status_msg.req.symbol,
|
||||||
status_msg,
|
status_msg,
|
||||||
)
|
)
|
||||||
|
if not sent_some:
|
||||||
|
log.info(
|
||||||
|
'No clients attached, firing notification for msg:\n'
|
||||||
|
f'{fmsg}'
|
||||||
|
)
|
||||||
|
await notify_from_ems_status_msg(
|
||||||
|
oid,
|
||||||
|
status_msg,
|
||||||
|
is_subproc=True,
|
||||||
|
)
|
||||||
|
|
||||||
if status == 'closed':
|
if status == 'closed':
|
||||||
log.info(f'Execution for {oid} is complete!')
|
log.info(f'Execution for {oid} is complete!')
|
||||||
|
|
Loading…
Reference in New Issue