diff --git a/piker/_ems.py b/piker/_ems.py index 8a7a612b..8a8b0d25 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -18,7 +18,12 @@ In suit parlance: "Execution management systems" """ -from typing import AsyncIterator +from dataclasses import dataclass, field +from typing import ( + AsyncIterator, List, Dict, Callable, Tuple, + Any, +) +import uuid import trio from trio_typing import TaskStatus @@ -26,6 +31,7 @@ import tractor from . import data from .log import get_logger +from .data._source import Symbol log = get_logger(__name__) @@ -34,8 +40,35 @@ _to_router: trio.abc.SendChannel = None _from_ui: trio.abc.ReceiveChannel = None +_local_book = {} + + +@dataclass +class OrderBook: + """Send (client?) side order book tracking. + + Mostly for keeping local state to match the EMS and use + events to trigger graphics updates. + + """ + orders: Dict[str, dict] = field(default_factory=dict) + _cmds_from_ui: trio.abc.ReceiveChannel = _from_ui + + +_orders: OrderBook = None + + +def get_orders() -> OrderBook: + global _orders + + if _orders is None: + _orders = OrderBook + + return _orders + + # TODO: make this a ``tractor.msg.pub`` -async def stream_orders(): +async def send_order_cmds(): """Order streaming task: deliver orders transmitted from UI to downstream consumers. @@ -48,7 +81,109 @@ async def stream_orders(): global _from_ui async for order in _from_ui: - yield order + + lc = order['chart'] + symbol = lc.symbol + tp = order['type'] + price = order['price'] + + oid = str(uuid.uuid4()) + + cmd = { + 'price': price, + 'action': 'alert', + 'symbol': symbol.key, + 'brokers': symbol.brokers, + 'type': tp, + 'price': price, + 'oid': oid, + } + + _local_book[oid] = cmd + + yield cmd + + +# streaming tasks which check for conditions per symbol per broker +_scan_tasks: Dict[str, List] = {} + +# levels which have an executable action (eg. alert, order, signal) +_levels: Dict[str, list] = {} + +# up to date last values from target streams +_last_values: Dict[str, float] = {} + + +# TODO: numba all of this +def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: + """Create a predicate for given ``exec_price`` based on last known + price, ``known_last``. + + This is an automatic alert level thunk generator based on where the + current last known value is and where the specified value of + interest is; pick an appropriate comparison operator based on + avoiding the case where the a predicate returns true immediately. + + """ + + if trigger_price >= known_last: + + def check_gt(price: float) -> bool: + if price >= trigger_price: + return True + else: + return False + + return check_gt + + elif trigger_price <= known_last: + + def check_lt(price: float) -> bool: + if price <= trigger_price: + return True + else: + return False + + return check_lt + + +@dataclass +class _ExecBook: + """EMS-side execution book. + + Contains conditions for executions (aka "orders"). + A singleton instance is created per EMS actor. + + """ + orders: Dict[ + Tuple[str, str], + Tuple[ + # predicates + Callable[[float], bool], + + # actions + Callable[[float], Dict[str, Any]], + + ] + ] = field(default_factory=dict) + + # most recent values + lasts: Dict[ + Tuple[str, str], + float + ] = field(default_factory=dict) + + +_book = None + + +def get_book() -> _ExecBook: + global _book + + if _book is None: + _book = _ExecBook() + + return _book async def exec_orders( @@ -56,6 +191,7 @@ async def exec_orders( broker: str, symbol: str, exec_price: float, + task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, ) -> AsyncIterator[dict]: async with data.open_feed( @@ -66,45 +202,50 @@ async def exec_orders( # TODO: get initial price - quote = await feed.receive() + first_quote = await feed.receive() - # we automatically figure out what the alert check condition - # should be based on the current first price received from the - # feed, instead of being like every other shitty tina platform - # that makes the user choose the predicate operator. - last = quote[symbol]['close'] + book = get_book() + book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - if exec_price > last: + task_status.started(first_quote) - def check(price: float) -> bool: - if price >= exec_price: - return True - else: - return False + # shield this field so the remote brokerd does not get cancelled + stream = feed.stream - elif exec_price < last: + with stream.shield(): + async for quotes in stream: - def check(price: float) -> bool: - if price <= exec_price: - return True - else: - return False + for sym, quote in quotes.items(): - async for quotes in feed.stream: + execs = book.orders.get((broker, sym)) - for sym, quote in quotes.items(): + for tick in quote.get('ticks', ()): + price = tick.get('price') + if price < 0: + # lel, fuck you ib + continue - for tick in quote.get('ticks', ()): - price = tick.get('price') + # update to keep new cmds informed + book.lasts[(broker, symbol)] = price - # push trigger msg back to parent as an "alert" - # (mocking for eg. a "fill") - if price and check(price): - await ctx.send_yield({ - 'type': 'alert', - }) - return - # feed teardown + # begin price actions sequence + + if not execs: + continue + + for pred, action in tuple(execs): + # push trigger msg back to parent as an "alert" + # (mocking for eg. a "fill") + if pred(price): + res = action(price) + await ctx.send_yield({ + 'type': 'alert', + 'price': price, + }) + execs.remove((pred, action)) + print(f"GOT ALERT FOR {exec_price} @ \n{tick}") + + # feed teardown @tractor.stream @@ -113,36 +254,59 @@ async def stream_and_route(ctx, ui_name): """ actor = tractor.current_actor() + book = get_book() # new router entry point async with tractor.wait_for_actor(ui_name) as portal: + # spawn one task per broker feed async with trio.open_nursery() as n: - async for order in await portal.run(stream_orders): + async for cmd in await portal.run(send_order_cmds): - tp = order['type'] - price = order['price'] - sym = order['symbol'] - brokers = order['brokers'] + tp = cmd.pop('type') + trigger_price = cmd['price'] + sym = cmd['symbol'] + brokers = cmd['brokers'] if tp == 'alert': - log.info(f'Alert {order} received in {actor.uid}') + log.info(f'Alert {cmd} received in {actor.uid}') - n.start_soon( + broker = brokers[0] + last = book.lasts.get((broker, sym)) + + if last is None: # spawn new brokerd feed task + + quote = await n.start( exec_orders, ctx, # TODO: eventually support N-brokers - brokers[0], + broker, sym, - price, + trigger_price, ) + print(f"received first quote {quote}") - # begin wait on next order + last = book.lasts[(broker, sym)] + print(f'Known last is {last}') + + # Auto-gen scanner predicate: + # we automatically figure out what the alert check condition + # should be based on the current first price received from the + # feed, instead of being like every other shitty tina platform + # that makes the user choose the predicate operator. + pred = mk_check(trigger_price, last) + + # create list of executions on first entry + book.orders.setdefault((broker, sym), []).append( + (pred, lambda p: p) + ) + + # continue and wait on next order cmd async def spawn_router_stream_alerts( - ident: str, + symbol: Symbol, task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, ) -> None: """Spawn an EMS daemon and begin sending orders and receiving @@ -154,7 +318,7 @@ async def spawn_router_stream_alerts( _to_router, _from_ui = trio.open_memory_channel(100) actor = tractor.current_actor() - subactor_name = ident + '.router' + subactor_name = 'piker.ems' async with tractor.open_nursery() as n: @@ -166,6 +330,7 @@ async def spawn_router_stream_alerts( stream_and_route, ui_name=actor.name ) + async with tractor.wait_for_actor(subactor_name): # let parent task continue task_status.started(_to_router) @@ -175,6 +340,8 @@ async def spawn_router_stream_alerts( # TODO: this in another task? # not sure if this will ever be a bottleneck, # we probably could do graphics stuff first tho? + + # XXX: linux only for now result = await trio.run_process( [ 'notify-send',