diff --git a/piker/_daemon.py b/piker/_daemon.py index 799b1331..5e102e4c 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -102,7 +102,9 @@ async def open_pikerd( assert _services is None # XXX: this may open a root actor as well - async with tractor.open_root_actor( + async with ( + tractor.open_root_actor( + # passed through to ``open_root_actor`` arbiter_addr=_tractor_kwargs['arbiter_addr'], name=_root_dname, @@ -113,10 +115,10 @@ async def open_pikerd( # TODO: eventually we should be able to avoid # having the root have more then permissions to # spawn other specialized daemons I think? - # enable_modules=[__name__], enable_modules=_root_modules, - - ) as _, tractor.open_nursery() as actor_nursery: + ) as _, + tractor.open_nursery() as actor_nursery, + ): async with trio.open_nursery() as service_nursery: # setup service mngr singleton instance @@ -137,6 +139,7 @@ async def open_pikerd( async def maybe_open_runtime( loglevel: Optional[str] = None, **kwargs, + ) -> None: """ Start the ``tractor`` runtime (a root actor) if none exists. 
@@ -159,6 +162,7 @@ async def maybe_open_runtime( async def maybe_open_pikerd( loglevel: Optional[str] = None, **kwargs, + ) -> Union[tractor._portal.Portal, Services]: """If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self @@ -207,7 +211,6 @@ async def maybe_spawn_daemon( service_name: str, spawn_func: Callable, spawn_args: dict[str, Any], - # brokername: str, loglevel: Optional[str] = None, **kwargs, @@ -236,8 +239,10 @@ async def maybe_spawn_daemon( # pikerd is not live we now become the root of the # process tree async with maybe_open_pikerd( + loglevel=loglevel, **kwargs, + ) as pikerd_portal: if pikerd_portal is None: @@ -265,8 +270,6 @@ async def spawn_brokerd( ) -> tractor._portal.Portal: - from .data import _setup_persistent_brokerd - log.info(f'Spawning {brokername} broker daemon') brokermod = get_brokermod(brokername) @@ -286,13 +289,9 @@ async def spawn_brokerd( **tractor_kwargs ) - # TODO: so i think this is the perfect use case for supporting - # a cross-actor async context manager api instead of this - # shoort-and-forget task spawned in the root nursery, we'd have an - # async exit stack that we'd register the `portal.open_context()` - # call with and then have the ability to unwind the call whenevs. - # non-blocking setup of brokerd service nursery + from .data import _setup_persistent_brokerd + await _services.open_remote_ctx( portal, _setup_persistent_brokerd, @@ -327,7 +326,6 @@ async def maybe_spawn_brokerd( async def spawn_emsd( - brokername: str, loglevel: Optional[str] = None, **extra_tractor_kwargs @@ -338,10 +336,10 @@ async def spawn_emsd( """ log.info('Spawning emsd') - # TODO: raise exception when _services == None? 
global _services + assert _services - await _services.actor_n.start_actor( + portal = await _services.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', @@ -351,6 +349,15 @@ async def spawn_emsd( debug_mode=_services.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) + + # non-blocking setup of clearing service + from .clearing._ems import _setup_persistent_emsd + + await _services.open_remote_ctx( + portal, + _setup_persistent_emsd, + ) + return 'emsd' @@ -367,7 +374,7 @@ async def maybe_open_emsd( 'emsd', spawn_func=spawn_emsd, - spawn_args={'brokername': brokername, 'loglevel': loglevel}, + spawn_args={'loglevel': loglevel}, loglevel=loglevel, **kwargs, diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 316056be..7d658ddb 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -36,6 +36,7 @@ from .._daemon import maybe_open_emsd log = get_logger(__name__) +# TODO: some kinda validation like this # class Order(msgspec.Struct): # action: str # price: float @@ -137,7 +138,11 @@ def get_orders( return _orders -async def send_order_cmds(symbol_key: str): +async def relay_order_cmds_from_sync_code( + symbol_key: str, + to_ems_stream: tractor.MsgStream, + +) -> None: """ Order streaming task: deliver orders transmitted from UI to downstream consumers. @@ -157,16 +162,15 @@ async def send_order_cmds(symbol_key: str): book = get_orders() orders_stream = book._from_order_book - # signal that ems connection is up and ready - book._ready_to_receive.set() - async for cmd in orders_stream: + print(cmd) if cmd['symbol'] == symbol_key: # send msg over IPC / wire log.info(f'Send order cmd:\n{pformat(cmd)}') - yield cmd + await to_ems_stream.send(cmd) + else: # XXX BRUTAL HACKZORZES !!! 
# re-insert for another consumer @@ -213,32 +217,32 @@ async def open_ems( - 'broker_filled' """ - actor = tractor.current_actor() - # wait for service to connect back to us signalling # ready for order commands book = get_orders() async with maybe_open_emsd(broker) as portal: - async with portal.open_stream_from( + async with ( - _emsd_main, - client_actor_name=actor.name, - broker=broker, - symbol=symbol.key, + # connect to emsd + portal.open_context( + _emsd_main, + broker=broker, + symbol=symbol.key, - ) as trades_stream: - with trio.fail_after(10): - await book._ready_to_receive.wait() + # TODO: ``first`` here should be the active orders/execs + # persistent on the ems so that loca UI's can be populated. + ) as (ctx, first), + + # open 2-way trade command stream + ctx.open_stream() as trades_stream, + ): + async with trio.open_nursery() as n: + n.start_soon( + relay_order_cmds_from_sync_code, + symbol.key, + trades_stream + ) - try: yield book, trades_stream - - finally: - # TODO: we want to eventually keep this up (by having - # the exec loop keep running in the pikerd tree) but for - # now we have to kill the context to avoid backpressure - # build-up on the shm write loop. - with trio.CancelScope(shield=True): - await trades_stream.aclose() diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index dbb0ff51..50a44426 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -21,11 +21,10 @@ In da suit parlances: "Execution management systems" from pprint import pformat import time from dataclasses import dataclass, field -from typing import ( - AsyncIterator, Dict, Callable, Tuple, -) +from typing import AsyncIterator, Callable from bidict import bidict +from pydantic import BaseModel import trio from trio_typing import TaskStatus import tractor @@ -89,11 +88,11 @@ class _DarkBook: broker: str # levels which have an executable action (eg. 
alert, order, signal) - orders: Dict[ + orders: dict[ str, # symbol - Dict[ + dict[ str, # uuid - Tuple[ + tuple[ Callable[[float], bool], # predicate str, # name dict, # cmd / msg type @@ -102,22 +101,13 @@ class _DarkBook: ] = field(default_factory=dict) # tracks most recent values per symbol each from data feed - lasts: Dict[ - Tuple[str, str], + lasts: dict[ + tuple[str, str], float ] = field(default_factory=dict) # mapping of broker order ids to piker ems ids - _broker2ems_ids: Dict[str, str] = field(default_factory=bidict) - - -_books: Dict[str, _DarkBook] = {} - - -def get_dark_book(broker: str) -> _DarkBook: - - global _books - return _books.setdefault(broker, _DarkBook(broker)) + _broker2ems_ids: dict[str, str] = field(default_factory=bidict) # XXX: this is in place to prevent accidental positions that are too @@ -255,10 +245,12 @@ async def exec_loop( to brokers. """ + global _router + # XXX: this should be initial price quote from target provider first_quote = await feed.receive() - book = get_dark_book(broker) + book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] # TODO: wrap this in a more re-usable general api @@ -478,12 +470,14 @@ async def process_broker_trades( async def process_order_cmds( + ctx: tractor.Context, cmd_stream: 'tractor.ReceiveStream', # noqa symbol: str, feed: 'Feed', # noqa client: 'Client', # noqa dark_book: _DarkBook, + ) -> None: async for cmd in cmd_stream: @@ -509,6 +503,7 @@ async def process_order_cmds( try: dark_book.orders[symbol].pop(oid, None) + # TODO: move these to `tractor.MsgStream` await ctx.send_yield({ 'resp': 'dark_cancelled', 'oid': oid @@ -616,13 +611,15 @@ async def process_order_cmds( }) -@tractor.stream +@tractor.context async def _emsd_main( + ctx: tractor.Context, - client_actor_name: str, + # client_actor_name: str, broker: str, symbol: str, _mode: str = 'dark', # ('paper', 'dark', 'live') + ) -> None: """EMS (sub)actor entrypoint providing the execution 
management (micro)service which conducts broker @@ -649,9 +646,10 @@ async def _emsd_main( accept normalized trades responses, process and relay to ems client(s) """ - from ._client import send_order_cmds + # from ._client import send_order_cmds - dark_book = get_dark_book(broker) + global _router + dark_book = _router.get_dark_book(broker) # spawn one task per broker feed async with trio.open_nursery() as n: @@ -664,40 +662,84 @@ async def _emsd_main( ) as feed: # get a portal back to the client - async with tractor.wait_for_actor(client_actor_name) as portal: + # async with tractor.wait_for_actor(client_actor_name) as portal: - # connect back to the calling actor (the one that is - # acting as an EMS client and will submit orders) to - # receive requests pushed over a tractor stream - # using (for now) an async generator. - async with portal.open_stream_from( - send_order_cmds, - symbol_key=symbol, - ) as order_stream: + await ctx.started() - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, - feed, - broker, - symbol, - _mode, - ) + # establish 2-way stream with requesting order-client + async with ctx.open_stream() as order_stream: - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) - # start inbound order request processing - await process_order_cmds( - ctx, - order_stream, - symbol, - feed, - client, - dark_book, - ) + # begin processing order events from the target brokerd backend + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) + + # start inbound (from attached client) order request processing + await process_order_cmds( + ctx, + order_stream, + symbol, + feed, + client, + dark_book, + ) + + +class _Router(BaseModel): + '''Order router which manages per-broker dark books, alerts, + and clearing related data feed management. 
+ + ''' + nursery: trio.Nursery + + feeds: dict[str, tuple[trio.CancelScope, float]] = {} + books: dict[str, _DarkBook] = {} + + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + + def get_dark_book( + self, + brokername: str, + + ) -> _DarkBook: + + return self.books.setdefault(brokername, _DarkBook(brokername)) + + +_router: _Router = None + + +@tractor.context +async def _setup_persistent_emsd( + + ctx: tractor.Context, + +) -> None: + + global _router + + # spawn one task per broker feed + async with trio.open_nursery() as service_nursery: + _router = _Router(nursery=service_nursery) + + # TODO: send back the full set of persistent orders/execs + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever()