Barebones level based alerts are working!
							parent
							
								
									97b2f86cfe
								
							
						
					
					
						commit
						a3468fb915
					
				
							
								
								
									
										110
									
								
								piker/_ems.py
								
								
								
								
							
							
						
						
									
										110
									
								
								piker/_ems.py
								
								
								
								
							|  | @ -18,10 +18,17 @@ | |||
| In suit parlance: "Execution management systems" | ||||
| 
 | ||||
| """ | ||||
| from typing import AsyncIterator | ||||
| 
 | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| import tractor | ||||
| 
 | ||||
| from . import data | ||||
| from .log import get_logger | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| _to_router: trio.abc.SendChannel = None | ||||
| _from_ui: trio.abc.ReceiveChannel = None | ||||
|  | @ -44,7 +51,64 @@ async def stream_orders(): | |||
|         yield order | ||||
| 
 | ||||
| 
 | ||||
| async def stream_and_route(ui_name): | ||||
| async def exec_orders( | ||||
|     ctx: tractor.Context, | ||||
|     broker: str, | ||||
|     symbol: str, | ||||
|     exec_price: float, | ||||
| ) -> AsyncIterator[dict]: | ||||
| 
 | ||||
|     async with data.open_feed( | ||||
|         broker, | ||||
|         [symbol], | ||||
|         loglevel='info', | ||||
|     ) as feed: | ||||
| 
 | ||||
|         # TODO: get initial price | ||||
| 
 | ||||
|         quote = await feed.receive() | ||||
| 
 | ||||
|         # we automatically figure out what the alert check condition | ||||
|         # should be based on the current first price received from the | ||||
|         # feed, instead of being like every other shitty tina platform | ||||
|         # that makes the user choose the predicate operator. | ||||
|         last = quote[symbol]['close'] | ||||
| 
 | ||||
|         if exec_price > last: | ||||
| 
 | ||||
|             def check(price: float) -> bool: | ||||
|                 if price >= exec_price: | ||||
|                     return True | ||||
|                 else: | ||||
|                     return False | ||||
| 
 | ||||
|         elif exec_price < last: | ||||
| 
 | ||||
|             def check(price: float) -> bool: | ||||
|                 if price <= exec_price: | ||||
|                     return True | ||||
|                 else: | ||||
|                     return False | ||||
| 
 | ||||
|         async for quotes in feed.stream: | ||||
| 
 | ||||
|             for sym, quote in quotes.items(): | ||||
| 
 | ||||
|                 for tick in quote.get('ticks', ()): | ||||
|                     price = tick.get('price') | ||||
| 
 | ||||
|                     # push trigger msg back to parent as an "alert" | ||||
|                     # (mocking for eg. a "fill") | ||||
|                     if price and check(price): | ||||
|                         await ctx.send_yield({ | ||||
|                             'type': 'alert', | ||||
|                         }) | ||||
|                         return | ||||
|     # feed teardown | ||||
| 
 | ||||
| 
 | ||||
| @tractor.stream | ||||
| async def stream_and_route(ctx, ui_name): | ||||
|     """Order router (sub)actor entrypoint. | ||||
| 
 | ||||
|     """ | ||||
|  | @ -53,19 +117,38 @@ async def stream_and_route(ui_name): | |||
|     # new router entry point | ||||
|     async with tractor.wait_for_actor(ui_name) as portal: | ||||
| 
 | ||||
|         async for order in await portal.run(stream_orders): | ||||
|             print(f'order {order} received in {actor.uid}') | ||||
|         async with trio.open_nursery() as n: | ||||
| 
 | ||||
|             # push order back to parent as an "alert" | ||||
|             # (mocking for eg. a "fill") | ||||
|             yield order | ||||
|             async for order in await portal.run(stream_orders): | ||||
| 
 | ||||
|                 tp = order['type'] | ||||
|                 price = order['price'] | ||||
|                 sym = order['symbol'] | ||||
|                 brokers = order['brokers'] | ||||
| 
 | ||||
|                 if tp == 'alert': | ||||
|                     log.info(f'Alert {order} received in {actor.uid}') | ||||
| 
 | ||||
|                     n.start_soon( | ||||
|                         exec_orders, | ||||
|                         ctx, | ||||
|                         # TODO: eventually support N-brokers | ||||
|                         brokers[0], | ||||
|                         sym, | ||||
|                         price, | ||||
|                     ) | ||||
| 
 | ||||
|                 # begin wait on next order | ||||
| 
 | ||||
| 
 | ||||
| async def spawn_router_stream_alerts( | ||||
|     ident: str, | ||||
|     task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, | ||||
| ) -> None: | ||||
|     """Spawn an EMS daemon and begin sending orders and receiving | ||||
|     alerts. | ||||
| 
 | ||||
|     """ | ||||
|     # setup local ui event streaming channels | ||||
|     global _from_ui, _to_router | ||||
|     _to_router, _from_ui = trio.open_memory_channel(100) | ||||
|  | @ -88,4 +171,17 @@ async def spawn_router_stream_alerts( | |||
|             task_status.started(_to_router) | ||||
| 
 | ||||
|         async for alert in stream: | ||||
|             print(f'alert {alert} received in {actor.uid}') | ||||
| 
 | ||||
|             # TODO: this in another task? | ||||
|             # not sure if this will ever be a bottleneck, | ||||
|             # we probably could do graphics stuff first tho? | ||||
|             result = await trio.run_process( | ||||
|                 [ | ||||
|                     'notify-send', | ||||
|                     'piker', | ||||
|                     f'Alert: {alert}', | ||||
|                     '-u', 'normal', | ||||
|                     '-t', '10000', | ||||
|                 ], | ||||
|             ) | ||||
|             log.runtime(result) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue