diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 598f4f54..6138086c 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -137,8 +137,9 @@ def get_orders( # TODO: make this a ``tractor.msg.pub`` -async def send_order_cmds(): - """Order streaming task: deliver orders transmitted from UI +async def send_order_cmds(symbol_key: str): + """ + Order streaming task: deliver orders transmitted from UI to downstream consumers. This is run in the UI actor (usually the one running Qt but could be @@ -160,10 +161,18 @@ async def send_order_cmds(): book._ready_to_receive.set() async for cmd in orders_stream: + print(cmd) + if cmd['symbol'] == symbol_key: - # send msg over IPC / wire - log.info(f'Send order cmd:\n{pformat(cmd)}') - yield cmd + # send msg over IPC / wire + log.info(f'Send order cmd:\n{pformat(cmd)}') + yield cmd + else: + # XXX BRUTAL HACKZORZES !!! + # re-insert for another consumer + # we need broadcast channelz...asap + # https://github.com/goodboy/tractor/issues/204 + book._to_ems.send_nowait(cmd) @asynccontextmanager diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 7cfd3a60..52fab921 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -675,7 +675,10 @@ async def _emsd_main( # acting as an EMS client and will submit orders) to # receive requests pushed over a tractor stream # using (for now) an async generator. - order_stream = await portal.run(send_order_cmds) + order_stream = await portal.run( + send_order_cmds, + symbol_key=symbol, + ) # start inbound order request processing await process_order_cmds( diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 8b1d1a14..29efcd0f 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -317,6 +317,7 @@ async def start_order_mode( symbol: Symbol, brokername: str, ) -> None: + # spawn EMS actor-service async with open_ems( brokername,