diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 513dded8..89630722 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -25,6 +25,7 @@ from dataclasses import dataclass, field import trio import tractor +from tractor._broadcast import broadcast_receiver from ..data._source import Symbol from ..log import get_logger @@ -123,10 +124,15 @@ def get_orders( global _orders if _orders is None: + size = 100 + tx, rx = trio.open_memory_channel(size) + brx = broadcast_receiver(rx, size) + # setup local ui event streaming channels for request/resp # streamging with EMS daemon _orders = OrderBook( - *trio.open_memory_channel(100), + _to_ems=tx, + _from_order_book=brx, ) return _orders @@ -157,23 +163,12 @@ async def relay_order_cmds_from_sync_code( """ book = get_orders() - orders_stream = book._from_order_book - - async for cmd in orders_stream: - - print(cmd) - if cmd['symbol'] == symbol_key: - - # send msg over IPC / wire - log.info(f'Send order cmd:\n{pformat(cmd)}') - await to_ems_stream.send(cmd) - - else: - # XXX BRUTAL HACKZORZES !!! - # re-insert for another consumer - # we need broadcast channelz...asap - # https://github.com/goodboy/tractor/issues/204 - book._to_ems.send_nowait(cmd) + async with book._from_order_book.subscribe() as orders_stream: + async for cmd in orders_stream: + if cmd['symbol'] == symbol_key: + log.info(f'Send order cmd:\n{pformat(cmd)}') + # send msg over IPC / wire + await to_ems_stream.send(cmd) @asynccontextmanager