Rejig state with dataclasses; prep for numba
							parent
							
								
									7b5a72909e
								
							
						
					
					
						commit
						c030b63101
					
				
							
								
								
									
										257
									
								
								piker/_ems.py
								
								
								
								
							
							
						
						
									
										257
									
								
								piker/_ems.py
								
								
								
								
							|  | @ -18,7 +18,12 @@ | |||
| In suit parlance: "Execution management systems" | ||||
| 
 | ||||
| """ | ||||
| from typing import AsyncIterator | ||||
| from dataclasses import dataclass, field | ||||
| from typing import ( | ||||
|     AsyncIterator, List, Dict, Callable, Tuple, | ||||
|     Any, | ||||
| ) | ||||
| import uuid | ||||
| 
 | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
|  | @ -26,6 +31,7 @@ import tractor | |||
| 
 | ||||
| from . import data | ||||
| from .log import get_logger | ||||
| from .data._source import Symbol | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
|  | @ -34,8 +40,35 @@ _to_router: trio.abc.SendChannel = None | |||
| _from_ui: trio.abc.ReceiveChannel = None | ||||
| 
 | ||||
| 
 | ||||
| _local_book = {} | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class OrderBook: | ||||
|     """Send (client?) side order book tracking. | ||||
| 
 | ||||
|     Mostly for keeping local state to match the EMS and use | ||||
|     events to trigger graphics updates. | ||||
| 
 | ||||
|     """ | ||||
|     orders: Dict[str, dict] = field(default_factory=dict) | ||||
|     _cmds_from_ui: trio.abc.ReceiveChannel = _from_ui | ||||
| 
 | ||||
| 
 | ||||
| _orders: OrderBook = None | ||||
| 
 | ||||
| 
 | ||||
| def get_orders() -> OrderBook: | ||||
|     global _orders | ||||
| 
 | ||||
|     if _orders is None: | ||||
|         _orders = OrderBook | ||||
| 
 | ||||
|     return _orders | ||||
| 
 | ||||
| 
 | ||||
| # TODO: make this a ``tractor.msg.pub`` | ||||
| async def stream_orders(): | ||||
| async def send_order_cmds(): | ||||
|     """Order streaming task: deliver orders transmitted from UI | ||||
|     to downstream consumers. | ||||
| 
 | ||||
|  | @ -48,7 +81,109 @@ async def stream_orders(): | |||
|     global _from_ui | ||||
| 
 | ||||
|     async for order in _from_ui: | ||||
|         yield order | ||||
| 
 | ||||
|         lc = order['chart'] | ||||
|         symbol = lc.symbol | ||||
|         tp = order['type'] | ||||
|         price = order['price'] | ||||
| 
 | ||||
|         oid = str(uuid.uuid4()) | ||||
| 
 | ||||
|         cmd = { | ||||
|             'price': price, | ||||
|             'action': 'alert', | ||||
|             'symbol': symbol.key, | ||||
|             'brokers': symbol.brokers, | ||||
|             'type': tp, | ||||
|             'price': price, | ||||
|             'oid': oid, | ||||
|         } | ||||
| 
 | ||||
|         _local_book[oid] = cmd | ||||
| 
 | ||||
|         yield cmd | ||||
| 
 | ||||
| 
 | ||||
| # streaming tasks which check for conditions per symbol per broker | ||||
| _scan_tasks: Dict[str, List] = {} | ||||
| 
 | ||||
| # levels which have an executable action (eg. alert, order, signal) | ||||
| _levels: Dict[str, list] = {} | ||||
| 
 | ||||
| # up to date last values from target streams | ||||
| _last_values: Dict[str, float] = {} | ||||
| 
 | ||||
| 
 | ||||
| # TODO: numba all of this | ||||
| def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: | ||||
|     """Create a predicate for given ``exec_price`` based on last known | ||||
|     price, ``known_last``. | ||||
| 
 | ||||
|     This is an automatic alert level thunk generator based on where the | ||||
|     current last known value is and where the specified value of | ||||
|     interest is; pick an appropriate comparison operator based on | ||||
|     avoiding the case where the a predicate returns true immediately. | ||||
| 
 | ||||
|     """ | ||||
| 
 | ||||
|     if trigger_price >= known_last: | ||||
| 
 | ||||
|         def check_gt(price: float) -> bool: | ||||
|             if price >= trigger_price: | ||||
|                 return True | ||||
|             else: | ||||
|                 return False | ||||
| 
 | ||||
|         return check_gt | ||||
| 
 | ||||
|     elif trigger_price <= known_last: | ||||
| 
 | ||||
|         def check_lt(price: float) -> bool: | ||||
|             if price <= trigger_price: | ||||
|                 return True | ||||
|             else: | ||||
|                 return False | ||||
| 
 | ||||
|         return check_lt | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class _ExecBook: | ||||
|     """EMS-side execution book. | ||||
| 
 | ||||
|     Contains conditions for executions (aka "orders"). | ||||
|     A singleton instance is created per EMS actor. | ||||
| 
 | ||||
|     """ | ||||
|     orders: Dict[ | ||||
|         Tuple[str, str], | ||||
|         Tuple[ | ||||
|             # predicates | ||||
|             Callable[[float], bool], | ||||
| 
 | ||||
|             # actions | ||||
|             Callable[[float], Dict[str, Any]], | ||||
| 
 | ||||
|         ] | ||||
|     ] = field(default_factory=dict) | ||||
| 
 | ||||
|     # most recent values | ||||
|     lasts: Dict[ | ||||
|         Tuple[str, str], | ||||
|         float | ||||
|     ] = field(default_factory=dict) | ||||
| 
 | ||||
| 
 | ||||
| _book = None | ||||
| 
 | ||||
| 
 | ||||
| def get_book() -> _ExecBook: | ||||
|     global _book | ||||
| 
 | ||||
|     if _book is None: | ||||
|         _book = _ExecBook() | ||||
| 
 | ||||
|     return _book | ||||
| 
 | ||||
| 
 | ||||
| async def exec_orders( | ||||
|  | @ -56,6 +191,7 @@ async def exec_orders( | |||
|     broker: str, | ||||
|     symbol: str, | ||||
|     exec_price: float, | ||||
|     task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, | ||||
| ) -> AsyncIterator[dict]: | ||||
| 
 | ||||
|     async with data.open_feed( | ||||
|  | @ -66,45 +202,50 @@ async def exec_orders( | |||
| 
 | ||||
|         # TODO: get initial price | ||||
| 
 | ||||
|         quote = await feed.receive() | ||||
|         first_quote = await feed.receive() | ||||
| 
 | ||||
|         # we automatically figure out what the alert check condition | ||||
|         # should be based on the current first price received from the | ||||
|         # feed, instead of being like every other shitty tina platform | ||||
|         # that makes the user choose the predicate operator. | ||||
|         last = quote[symbol]['close'] | ||||
|         book = get_book() | ||||
|         book.lasts[(broker, symbol)] = first_quote[symbol]['last'] | ||||
| 
 | ||||
|         if exec_price > last: | ||||
|         task_status.started(first_quote) | ||||
| 
 | ||||
|             def check(price: float) -> bool: | ||||
|                 if price >= exec_price: | ||||
|                     return True | ||||
|                 else: | ||||
|                     return False | ||||
|         # shield this field so the remote brokerd does not get cancelled | ||||
|         stream = feed.stream | ||||
| 
 | ||||
|         elif exec_price < last: | ||||
|         with stream.shield(): | ||||
|             async for quotes in stream: | ||||
| 
 | ||||
|             def check(price: float) -> bool: | ||||
|                 if price <= exec_price: | ||||
|                     return True | ||||
|                 else: | ||||
|                     return False | ||||
|                 for sym, quote in quotes.items(): | ||||
| 
 | ||||
|         async for quotes in feed.stream: | ||||
|                     execs = book.orders.get((broker, sym)) | ||||
| 
 | ||||
|             for sym, quote in quotes.items(): | ||||
|                     for tick in quote.get('ticks', ()): | ||||
|                         price = tick.get('price') | ||||
|                         if price < 0: | ||||
|                             # lel, fuck you ib | ||||
|                             continue | ||||
| 
 | ||||
|                 for tick in quote.get('ticks', ()): | ||||
|                     price = tick.get('price') | ||||
|                         # update to keep new cmds informed | ||||
|                         book.lasts[(broker, symbol)] = price | ||||
| 
 | ||||
|                     # push trigger msg back to parent as an "alert" | ||||
|                     # (mocking for eg. a "fill") | ||||
|                     if price and check(price): | ||||
|                         await ctx.send_yield({ | ||||
|                             'type': 'alert', | ||||
|                         }) | ||||
|                         return | ||||
|     # feed teardown | ||||
|                         # begin price actions sequence | ||||
| 
 | ||||
|                         if not execs: | ||||
|                             continue | ||||
| 
 | ||||
|                         for pred, action in tuple(execs): | ||||
|                             # push trigger msg back to parent as an "alert" | ||||
|                             # (mocking for eg. a "fill") | ||||
|                             if pred(price): | ||||
|                                 res = action(price) | ||||
|                                 await ctx.send_yield({ | ||||
|                                     'type': 'alert', | ||||
|                                     'price': price, | ||||
|                                 }) | ||||
|                                 execs.remove((pred, action)) | ||||
|                                 print(f"GOT ALERT FOR {exec_price} @ \n{tick}") | ||||
| 
 | ||||
|         # feed teardown | ||||
| 
 | ||||
| 
 | ||||
| @tractor.stream | ||||
|  | @ -113,36 +254,59 @@ async def stream_and_route(ctx, ui_name): | |||
| 
 | ||||
|     """ | ||||
|     actor = tractor.current_actor() | ||||
|     book = get_book() | ||||
| 
 | ||||
|     # new router entry point | ||||
|     async with tractor.wait_for_actor(ui_name) as portal: | ||||
| 
 | ||||
|         # spawn one task per broker feed | ||||
|         async with trio.open_nursery() as n: | ||||
| 
 | ||||
|             async for order in await portal.run(stream_orders): | ||||
|             async for cmd in await portal.run(send_order_cmds): | ||||
| 
 | ||||
|                 tp = order['type'] | ||||
|                 price = order['price'] | ||||
|                 sym = order['symbol'] | ||||
|                 brokers = order['brokers'] | ||||
|                 tp = cmd.pop('type') | ||||
|                 trigger_price = cmd['price'] | ||||
|                 sym = cmd['symbol'] | ||||
|                 brokers = cmd['brokers'] | ||||
| 
 | ||||
|                 if tp == 'alert': | ||||
|                     log.info(f'Alert {order} received in {actor.uid}') | ||||
|                     log.info(f'Alert {cmd} received in {actor.uid}') | ||||
| 
 | ||||
|                     n.start_soon( | ||||
|                 broker = brokers[0] | ||||
|                 last = book.lasts.get((broker, sym)) | ||||
| 
 | ||||
|                 if last is None:  # spawn new brokerd feed task | ||||
| 
 | ||||
|                     quote = await n.start( | ||||
|                         exec_orders, | ||||
|                         ctx, | ||||
|                         # TODO: eventually support N-brokers | ||||
|                         brokers[0], | ||||
|                         broker, | ||||
|                         sym, | ||||
|                         price, | ||||
|                         trigger_price, | ||||
|                     ) | ||||
|                     print(f"received first quote {quote}") | ||||
| 
 | ||||
|                 # begin wait on next order | ||||
|                 last = book.lasts[(broker, sym)] | ||||
|                 print(f'Known last is {last}') | ||||
| 
 | ||||
|                 # Auto-gen scanner predicate: | ||||
|                 # we automatically figure out what the alert check condition | ||||
|                 # should be based on the current first price received from the | ||||
|                 # feed, instead of being like every other shitty tina platform | ||||
|                 # that makes the user choose the predicate operator. | ||||
|                 pred = mk_check(trigger_price, last) | ||||
| 
 | ||||
|                 # create list of executions on first entry | ||||
|                 book.orders.setdefault((broker, sym), []).append( | ||||
|                     (pred, lambda p: p) | ||||
|                 ) | ||||
| 
 | ||||
|             # continue and wait on next order cmd | ||||
| 
 | ||||
| 
 | ||||
| async def spawn_router_stream_alerts( | ||||
|     ident: str, | ||||
|     symbol: Symbol, | ||||
|     task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, | ||||
| ) -> None: | ||||
|     """Spawn an EMS daemon and begin sending orders and receiving | ||||
|  | @ -154,7 +318,7 @@ async def spawn_router_stream_alerts( | |||
|     _to_router, _from_ui = trio.open_memory_channel(100) | ||||
| 
 | ||||
|     actor = tractor.current_actor() | ||||
|     subactor_name = ident + '.router' | ||||
|     subactor_name = 'piker.ems' | ||||
| 
 | ||||
|     async with tractor.open_nursery() as n: | ||||
| 
 | ||||
|  | @ -166,6 +330,7 @@ async def spawn_router_stream_alerts( | |||
|             stream_and_route, | ||||
|             ui_name=actor.name | ||||
|         ) | ||||
| 
 | ||||
|         async with tractor.wait_for_actor(subactor_name): | ||||
|             # let parent task continue | ||||
|             task_status.started(_to_router) | ||||
|  | @ -175,6 +340,8 @@ async def spawn_router_stream_alerts( | |||
|             # TODO: this in another task? | ||||
|             # not sure if this will ever be a bottleneck, | ||||
|             # we probably could do graphics stuff first tho? | ||||
| 
 | ||||
|             # XXX: linux only for now | ||||
|             result = await trio.run_process( | ||||
|                 [ | ||||
|                     'notify-send', | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue