diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 87d41bd3..dbb0ff51 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -242,19 +242,20 @@ async def execute_triggers( async def exec_loop( + ctx: tractor.Context, feed: 'Feed', # noqa broker: str, symbol: str, _exec_mode: str, task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, + ) -> AsyncIterator[dict]: """Main scan loop for order execution conditions and submission to brokers. """ - - # TODO: get initial price quote from target broker + # XXX: this should be initial price quote from target provider first_quote = await feed.receive() book = get_dark_book(broker) @@ -351,6 +352,7 @@ async def process_broker_trades( # TODO: make this a context # in the paper engine case this is just a mem receive channel async with feed.receive_trades_data() as trades_stream: + first = await trades_stream.__anext__() # startup msg expected as first from broker backend @@ -651,35 +653,18 @@ async def _emsd_main( dark_book = get_dark_book(broker) - # get a portal back to the client - async with tractor.wait_for_actor(client_actor_name) as portal: + # spawn one task per broker feed + async with trio.open_nursery() as n: - # spawn one task per broker feed - async with trio.open_nursery() as n: + # TODO: eventually support N-brokers + async with data.open_feed( + broker, + [symbol], + loglevel='info', + ) as feed: - # TODO: eventually support N-brokers - async with data.open_feed( - broker, - [symbol], - loglevel='info', - ) as feed: - - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, - feed, - broker, - symbol, - _mode, - ) - - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # get a portal back to the client + async with tractor.wait_for_actor(client_actor_name) as portal: # connect back to the calling actor (the one that is # acting as an EMS client and will submit orders) to @@ -690,6 +675,23 @@ async def _emsd_main( symbol_key=symbol, ) as order_stream: + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) + + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) + # start inbound order request processing await process_order_cmds( ctx,