diff --git a/piker/_ems.py b/piker/_ems.py index f3650f49..8a7a612b 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -18,10 +18,17 @@ In suit parlance: "Execution management systems" """ +from typing import AsyncIterator + import trio from trio_typing import TaskStatus import tractor +from . import data +from .log import get_logger + + +log = get_logger(__name__) _to_router: trio.abc.SendChannel = None _from_ui: trio.abc.ReceiveChannel = None @@ -44,7 +51,64 @@ async def stream_orders(): yield order -async def stream_and_route(ui_name): +async def exec_orders( + ctx: tractor.Context, + broker: str, + symbol: str, + exec_price: float, +) -> AsyncIterator[dict]: + + async with data.open_feed( + broker, + [symbol], + loglevel='info', + ) as feed: + + # TODO: get initial price + + quote = await feed.receive() + + # we automatically figure out what the alert check condition + # should be based on the current first price received from the + # feed, instead of being like every other shitty tina platform + # that makes the user choose the predicate operator. + last = quote[symbol]['close'] + + if exec_price > last: + + def check(price: float) -> bool: + if price >= exec_price: + return True + else: + return False + + elif exec_price < last: + + def check(price: float) -> bool: + if price <= exec_price: + return True + else: + return False + + async for quotes in feed.stream: + + for sym, quote in quotes.items(): + + for tick in quote.get('ticks', ()): + price = tick.get('price') + + # push trigger msg back to parent as an "alert" + # (mocking for eg. a "fill") + if price and check(price): + await ctx.send_yield({ + 'type': 'alert', + }) + return + # feed teardown + + +@tractor.stream +async def stream_and_route(ctx, ui_name): """Order router (sub)actor entrypoint. """ @@ -53,19 +117,38 @@ async def stream_and_route(ui_name): # new router entry point async with tractor.wait_for_actor(ui_name) as portal: - async for order in await portal.run(stream_orders): - print(f'order {order} received in {actor.uid}') + async with trio.open_nursery() as n: - # push order back to parent as an "alert" - # (mocking for eg. a "fill") - yield order + async for order in await portal.run(stream_orders): + + tp = order['type'] + price = order['price'] + sym = order['symbol'] + brokers = order['brokers'] + + if tp == 'alert': + log.info(f'Alert {order} received in {actor.uid}') + + n.start_soon( + exec_orders, + ctx, + # TODO: eventually support N-brokers + brokers[0], + sym, + price, + ) + + # begin wait on next order async def spawn_router_stream_alerts( ident: str, task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, ) -> None: + """Spawn an EMS daemon and begin sending orders and receiving + alerts. + """ # setup local ui event streaming channels global _from_ui, _to_router _to_router, _from_ui = trio.open_memory_channel(100) @@ -88,4 +171,17 @@ async def spawn_router_stream_alerts( task_status.started(_to_router) async for alert in stream: - print(f'alert {alert} received in {actor.uid}') + + # TODO: this in another task? + # not sure if this will ever be a bottleneck, + # we probably could do graphics stuff first tho? + result = await trio.run_process( + [ + 'notify-send', + 'piker', + f'Alert: {alert}', + '-u', 'normal', + '-t', '10000', + ], + ) + log.runtime(result)