Use broadcast chan for order client and avoid chan repacking

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-12 14:50:18 -04:00
parent 954dc6a8b0
commit 71b50fdae8
1 changed files with 13 additions and 18 deletions

View File

@ -25,6 +25,7 @@ from dataclasses import dataclass, field
import trio import trio
import tractor import tractor
from tractor._broadcast import broadcast_receiver
from ..data._source import Symbol from ..data._source import Symbol
from ..log import get_logger from ..log import get_logger
@ -123,10 +124,15 @@ def get_orders(
global _orders global _orders
if _orders is None: if _orders is None:
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
# setup local ui event streaming channels for request/resp # setup local ui event streaming channels for request/resp
# streamging with EMS daemon # streamging with EMS daemon
_orders = OrderBook( _orders = OrderBook(
*trio.open_memory_channel(100), _to_ems=tx,
_from_order_book=brx,
) )
return _orders return _orders
@ -157,24 +163,13 @@ async def relay_order_cmds_from_sync_code(
""" """
book = get_orders() book = get_orders()
orders_stream = book._from_order_book async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
print(cmd)
if cmd['symbol'] == symbol_key: if cmd['symbol'] == symbol_key:
# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)
else:
# XXX BRUTAL HACKZORZES !!!
# re-insert for another consumer
# we need broadcast channelz...asap
# https://github.com/goodboy/tractor/issues/204
book._to_ems.send_nowait(cmd)
@asynccontextmanager @asynccontextmanager
async def open_ems( async def open_ems(