Attach to order client *after* feed connection to speed up the startup time
parent
319eacd8c1
commit
f51e12819a
|
@ -242,19 +242,20 @@ async def execute_triggers(
|
||||||
|
|
||||||
|
|
||||||
async def exec_loop(
|
async def exec_loop(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
feed: 'Feed', # noqa
|
feed: 'Feed', # noqa
|
||||||
broker: str,
|
broker: str,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
_exec_mode: str,
|
_exec_mode: str,
|
||||||
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> AsyncIterator[dict]:
|
) -> AsyncIterator[dict]:
|
||||||
"""Main scan loop for order execution conditions and submission
|
"""Main scan loop for order execution conditions and submission
|
||||||
to brokers.
|
to brokers.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
# XXX: this should be initial price quote from target provider
|
||||||
# TODO: get initial price quote from target broker
|
|
||||||
first_quote = await feed.receive()
|
first_quote = await feed.receive()
|
||||||
|
|
||||||
book = get_dark_book(broker)
|
book = get_dark_book(broker)
|
||||||
|
@ -351,6 +352,7 @@ async def process_broker_trades(
|
||||||
# TODO: make this a context
|
# TODO: make this a context
|
||||||
# in the paper engine case this is just a mem receive channel
|
# in the paper engine case this is just a mem receive channel
|
||||||
async with feed.receive_trades_data() as trades_stream:
|
async with feed.receive_trades_data() as trades_stream:
|
||||||
|
|
||||||
first = await trades_stream.__anext__()
|
first = await trades_stream.__anext__()
|
||||||
|
|
||||||
# startup msg expected as first from broker backend
|
# startup msg expected as first from broker backend
|
||||||
|
@ -651,35 +653,18 @@ async def _emsd_main(
|
||||||
|
|
||||||
dark_book = get_dark_book(broker)
|
dark_book = get_dark_book(broker)
|
||||||
|
|
||||||
# get a portal back to the client
|
# spawn one task per broker feed
|
||||||
async with tractor.wait_for_actor(client_actor_name) as portal:
|
async with trio.open_nursery() as n:
|
||||||
|
|
||||||
# spawn one task per broker feed
|
# TODO: eventually support N-brokers
|
||||||
async with trio.open_nursery() as n:
|
async with data.open_feed(
|
||||||
|
broker,
|
||||||
|
[symbol],
|
||||||
|
loglevel='info',
|
||||||
|
) as feed:
|
||||||
|
|
||||||
# TODO: eventually support N-brokers
|
# get a portal back to the client
|
||||||
async with data.open_feed(
|
async with tractor.wait_for_actor(client_actor_name) as portal:
|
||||||
broker,
|
|
||||||
[symbol],
|
|
||||||
loglevel='info',
|
|
||||||
) as feed:
|
|
||||||
|
|
||||||
# start the condition scan loop
|
|
||||||
quote, feed, client = await n.start(
|
|
||||||
exec_loop,
|
|
||||||
ctx,
|
|
||||||
feed,
|
|
||||||
broker,
|
|
||||||
symbol,
|
|
||||||
_mode,
|
|
||||||
)
|
|
||||||
|
|
||||||
await n.start(
|
|
||||||
process_broker_trades,
|
|
||||||
ctx,
|
|
||||||
feed,
|
|
||||||
dark_book,
|
|
||||||
)
|
|
||||||
|
|
||||||
# connect back to the calling actor (the one that is
|
# connect back to the calling actor (the one that is
|
||||||
# acting as an EMS client and will submit orders) to
|
# acting as an EMS client and will submit orders) to
|
||||||
|
@ -690,6 +675,23 @@ async def _emsd_main(
|
||||||
symbol_key=symbol,
|
symbol_key=symbol,
|
||||||
) as order_stream:
|
) as order_stream:
|
||||||
|
|
||||||
|
# start the condition scan loop
|
||||||
|
quote, feed, client = await n.start(
|
||||||
|
exec_loop,
|
||||||
|
ctx,
|
||||||
|
feed,
|
||||||
|
broker,
|
||||||
|
symbol,
|
||||||
|
_mode,
|
||||||
|
)
|
||||||
|
|
||||||
|
await n.start(
|
||||||
|
process_broker_trades,
|
||||||
|
ctx,
|
||||||
|
feed,
|
||||||
|
dark_book,
|
||||||
|
)
|
||||||
|
|
||||||
# start inbound order request processing
|
# start inbound order request processing
|
||||||
await process_order_cmds(
|
await process_order_cmds(
|
||||||
ctx,
|
ctx,
|
||||||
|
|
Loading…
Reference in New Issue