Hack broadcast chan for order submissions, smh
parent
4774881812
commit
9622254cdb
|
@ -137,8 +137,9 @@ def get_orders(
|
||||||
|
|
||||||
|
|
||||||
# TODO: make this a ``tractor.msg.pub``
|
# TODO: make this a ``tractor.msg.pub``
|
||||||
async def send_order_cmds():
|
async def send_order_cmds(symbol_key: str):
|
||||||
"""Order streaming task: deliver orders transmitted from UI
|
"""
|
||||||
|
Order streaming task: deliver orders transmitted from UI
|
||||||
to downstream consumers.
|
to downstream consumers.
|
||||||
|
|
||||||
This is run in the UI actor (usually the one running Qt but could be
|
This is run in the UI actor (usually the one running Qt but could be
|
||||||
|
@ -160,10 +161,18 @@ async def send_order_cmds():
|
||||||
book._ready_to_receive.set()
|
book._ready_to_receive.set()
|
||||||
|
|
||||||
async for cmd in orders_stream:
|
async for cmd in orders_stream:
|
||||||
|
print(cmd)
|
||||||
|
if cmd['symbol'] == symbol_key:
|
||||||
|
|
||||||
# send msg over IPC / wire
|
# send msg over IPC / wire
|
||||||
log.info(f'Send order cmd:\n{pformat(cmd)}')
|
log.info(f'Send order cmd:\n{pformat(cmd)}')
|
||||||
yield cmd
|
yield cmd
|
||||||
|
else:
|
||||||
|
# XXX BRUTAL HACKZORZES !!!
|
||||||
|
# re-insert for another consumer
|
||||||
|
# we need broadcast channelz...asap
|
||||||
|
# https://github.com/goodboy/tractor/issues/204
|
||||||
|
book._to_ems.send_nowait(cmd)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
|
|
|
@ -675,7 +675,10 @@ async def _emsd_main(
|
||||||
# acting as an EMS client and will submit orders) to
|
# acting as an EMS client and will submit orders) to
|
||||||
# receive requests pushed over a tractor stream
|
# receive requests pushed over a tractor stream
|
||||||
# using (for now) an async generator.
|
# using (for now) an async generator.
|
||||||
order_stream = await portal.run(send_order_cmds)
|
order_stream = await portal.run(
|
||||||
|
send_order_cmds,
|
||||||
|
symbol_key=symbol,
|
||||||
|
)
|
||||||
|
|
||||||
# start inbound order request processing
|
# start inbound order request processing
|
||||||
await process_order_cmds(
|
await process_order_cmds(
|
||||||
|
|
|
@ -317,6 +317,7 @@ async def start_order_mode(
|
||||||
symbol: Symbol,
|
symbol: Symbol,
|
||||||
brokername: str,
|
brokername: str,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# spawn EMS actor-service
|
# spawn EMS actor-service
|
||||||
async with open_ems(
|
async with open_ems(
|
||||||
brokername,
|
brokername,
|
||||||
|
|
Loading…
Reference in New Issue