diff --git a/piker/_ems.py b/piker/_ems.py index 8ede8f31..aa1bdac5 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -18,7 +18,7 @@ In suit parlance: "Execution management systems" """ -import time +# import time from dataclasses import dataclass, field from typing import ( AsyncIterator, Dict, Callable, Tuple, @@ -174,6 +174,9 @@ def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: return check_lt, 'up' + else: + return None, None + @dataclass class _ExecBook: @@ -236,7 +239,7 @@ async def exec_orders( book = get_book() book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - task_status.started(first_quote) + task_status.started((first_quote, feed)) # shield this field so the remote brokerd does not get cancelled stream = feed.stream @@ -249,7 +252,7 @@ async def exec_orders( # XXX: optimize this for speed ############################## - start = time.time() + # start = time.time() for sym, quote in quotes.items(): execs = book.orders.get((broker, sym)) @@ -288,10 +291,20 @@ async def exec_orders( print(f'execs are {execs}') - print(f'execs scan took: {time.time() - start}') + # print(f'execs scan took: {time.time() - start}') # feed teardown +async def receive_trade_updates( + ctx: tractor.Context, + feed: 'Feed', # noqa +) -> AsyncIterator[dict]: + # await tractor.breakpoint() + print("TRADESZ") + async for update in await feed.recv_trades_data(): + log.info(update) + + @tractor.stream async def stream_and_route(ctx, ui_name): """Order router (sub)actor entrypoint. @@ -338,7 +351,7 @@ async def stream_and_route(ctx, ui_name): if last is None: # spawn new brokerd feed task - quote = await n.start( + quote, feed = await n.start( exec_orders, ctx, # TODO: eventually support N-brokers @@ -346,7 +359,14 @@ async def stream_and_route(ctx, ui_name): sym, trigger_price, ) - print(f"received first quote {quote}") + + n.start_soon( + receive_trade_updates, + ctx, + # TODO: eventually support N-brokers + feed, + ) + last = book.lasts[(broker, sym)] print(f'Known last is {last}') @@ -359,6 +379,7 @@ async def stream_and_route(ctx, ui_name): # the user choose the predicate operator. pred, name = mk_check(trigger_price, last) + # create list of executions on first entry book.orders.setdefault( (broker, sym), {})[oid] = (pred, name, cmd)