diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5bdf34d3..22226055 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any from bidict import bidict from pydantic import BaseModel import trio +from trio_typing import TaskStatus import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks from ..data.feed import Feed +from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -82,15 +84,16 @@ def mk_check( @dataclass class _DarkBook: - """Client-side execution book. + '''EMS-trigger execution book. - Contains conditions for executions (aka "orders") which are not - exposed to brokers and thus the market; i.e. these are privacy - focussed "client side" orders. + Contains conditions for executions (aka "orders" or "triggers") + which are not exposed to brokers and thus the market; i.e. these are + privacy focussed "client side" orders which are submitted in real-time + based on specified trigger conditions. - A singleton instance is created per EMS actor (for now). + A an instance per `brokerd` is created per EMS actor (for now). - """ + ''' broker: str # levels which have an executable action (eg. alert, order, signal) @@ -256,17 +259,33 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') +@dataclass +class TradesRelay: + brokerd_dialogue: tractor.MsgStream + positions: dict[str, float] + consumers: int = 0 + + class _Router(BaseModel): - '''Order router which manages per-broker dark books, alerts, - and clearing related data feed management. + '''Order router which manages and tracks per-broker dark book, + alerts, clearing and related data feed management. + + A singleton per ``emsd`` actor. ''' + # setup at actor spawn time nursery: trio.Nursery feeds: dict[tuple[str, str], Any] = {} + + # broker to book map books: dict[str, _DarkBook] = {} + + # order id to client stream map dialogues: dict[str, list[tractor.MsgStream]] = {} - relays: dict[str, tuple[dict, tractor.MsgStream]] = {} + + # brokername to trades-dialogues streams with ``brokerd`` actors + relays: dict[str, TradesRelay] = {} class Config: arbitrary_types_allowed = True @@ -280,10 +299,166 @@ class _Router(BaseModel): return self.books.setdefault(brokername, _DarkBook(brokername)) + @asynccontextmanager + async def maybe_open_brokerd_trades_dialogue( + + self, + feed: Feed, + symbol: str, + dark_book: _DarkBook, + _exec_mode: str, + loglevel: str, + + ) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + relay = self.relays.get(feed.mod.name) + + if relay is None: + + relay = await self.nursery.start( + open_brokerd_trades_dialogue, + self, + feed, + symbol, + _exec_mode, + loglevel, + ) + + relay.consumers += 1 + + # TODO: get updated positions here? + assert relay.brokerd_dialogue + try: + yield relay + + finally: + + # TODO: what exactly needs to be torn down here or + # are we just consumer tracking? + + relay.consumers -= 1 + _router: _Router = None +async def open_brokerd_trades_dialogue( + + router: _Router, + feed: Feed, + symbol: str, + _exec_mode: str, + loglevel: str, + + task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, + +) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + + broker = feed.mod.name + + # TODO: make a `tractor` bug about this! + # portal = feed._brokerd_portal + + # XXX: we must have our own portal + channel otherwise + # when the data feed closes it may result in a half-closed/fucked + # channel that the brokerd side thinks is still open somehow!? + async with maybe_spawn_brokerd( + + broker, + loglevel=loglevel, + + ) as portal: + + if trades_endpoint is None or _exec_mode == 'paper': + + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + try: + async with ( + + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + + ): + # XXX: really we only want one stream per `emsd` actor + # to relay global `brokerd` order events unless we're + # doing to expect each backend to relay only orders + # affiliated with a particular ``trades_dialogue()`` + # session (seems annoying for implementers). So, here + # we cache the relay task and instead of running multiple + # tasks (which will result in multiples of the same msg being + # relayed for each EMS client) we just register each client + # stream to this single relay loop using _router.dialogues + + # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client set. + + # with brokerd_trades_stream.shield(): + + relay = TradesRelay( + brokerd_dialogue=brokerd_trades_stream, + positions=positions, + consumers=1 + ) + + _router.relays[broker] = relay + + # the ems scan loop may be cancelled by the client but we + # want to keep the ``brokerd`` dialogue up regardless + + task_status.started(relay) + + await translate_and_relay_brokerd_events( + broker, + brokerd_trades_stream, + _router, + ) + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + + finally: + # context must have been closed + # remove from cache so next client will respawn if needed + # print('BROKERD DIALOGUE KILLED!!?!?!') + # with trio.CancelScope(shield=True): + # await tractor.breakpoint() + # raise + _router.relays.pop(broker) + + @tractor.context async def _setup_persistent_emsd( @@ -301,17 +476,13 @@ async def _setup_persistent_emsd( # TODO: send back the full set of persistent orders/execs persistent await ctx.started() - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down await trio.sleep_forever() async def translate_and_relay_brokerd_events( broker: str, - # ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, - book: _DarkBook, router: _Router, ) -> AsyncIterator[dict]: @@ -334,6 +505,11 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' + book = router.get_dark_book(broker) + relay = router.relays[broker] + + assert relay.brokerd_dialogue == brokerd_trades_stream + async for brokerd_msg in brokerd_trades_stream: name = brokerd_msg['name'] @@ -342,13 +518,16 @@ async def translate_and_relay_brokerd_events( if name == 'position': + pos_msg = BrokerdPosition(**brokerd_msg).dict() + + # keep up to date locally in ``emsd`` + relay.positions.update(pos_msg) + # relay through position msgs immediately by # broadcasting updates on all client streams for oid, ems_client_order_stream in router.dialogues.items(): - await ems_client_order_stream.send( - BrokerdPosition(**brokerd_msg).dict() - ) + await ems_client_order_stream.send(pos_msg) continue @@ -425,7 +604,7 @@ async def translate_and_relay_brokerd_events( resp = None broker_details = {} - client_flow_complete: bool = False + # client_flow_complete: bool = False if name in ( 'error', @@ -460,7 +639,7 @@ async def translate_and_relay_brokerd_events( if msg.status == 'cancelled': - client_flow_complete = True + # client_flow_complete = True log.info(f'Cancellation for {oid} is complete!') if msg.status == 'filled': @@ -473,7 +652,7 @@ async def translate_and_relay_brokerd_events( # be sure to pop this stream from our dialogue set # since the order dialogue should be done. - client_flow_complete = True + # client_flow_complete = True log.info(f'Execution for {oid} is complete!') # just log it @@ -514,11 +693,11 @@ async def translate_and_relay_brokerd_events( ).dict() ) - # TODO: do we want this to keep things cleaned up? - # it might require a special status from brokerd to affirm the - # flow is complete? - # if client_flow_complete: - # router.dialogues.pop(oid) + # TODO: do we want this to keep things cleaned up? + # it might require a special status from brokerd to affirm the + # flow is complete? + # if client_flow_complete: + # router.dialogues.pop(oid) async def process_client_order_cmds( @@ -655,7 +834,7 @@ async def process_client_order_cmds( # flow so that if a cancel comes from the requesting # client, before that ack, when the ack does arrive we # immediately take the reqid from the broker and cancel - # that order with them immediately. + # that live order asap. dark_book._ems_entries[oid] = msg # "DARK" triggers @@ -725,102 +904,6 @@ async def process_client_order_cmds( ) -@asynccontextmanager -async def maybe_open_brokerd_trades_dialogue( - - router: _Router, - feed: Feed, - broker: str, - symbol: str, - dark_book: _DarkBook, - _exec_mode: str, - loglevel: str, - -) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. - - ''' - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - portal = feed._brokerd_portal - - if broker in _router.relays: - - positions, brokerd_trades_stream = _router.relays[broker] - - # TODO: get updated positions here? - yield positions, brokerd_trades_stream - return - - if trades_endpoint is None or _exec_mode == 'paper': - - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running a - # paper-simulator clearing engine. - - # load the paper trading engine - _exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - async with ( - - open_trades_endpoint as (brokerd_ctx, positions), - brokerd_ctx.open_stream() as brokerd_trades_stream, - trio.open_nursery() as n, - - ): - # XXX: really we only want one stream per `emsd` actor - # to relay global `brokerd` order events unless we're - # doing to expect each backend to relay only orders - # affiliated with a particular ``trades_dialogue()`` - # session (seems annoying for implementers). So, here - # we cache the relay task and instead of running multiple - # tasks (which will result in multiples of the same msg being - # relayed for each EMS client) we just register each client - # stream to this single relay loop using _router.dialogues - - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client set. - - n.start_soon( - - translate_and_relay_brokerd_events, - - broker, - # ems_client_order_stream, - brokerd_trades_stream, - dark_book, - _router, - ) - - _router.relays[broker] = (positions, brokerd_trades_stream) - - try: - yield positions, brokerd_trades_stream - - finally: - # remove from cache so next client will respawn if needed - _router.relays.pop(broker) - - @tractor.context async def _emsd_main( @@ -855,7 +938,7 @@ async def _emsd_main( run (dark order) conditions on inputs and trigger brokerd "live" order submissions. | - - ``translate_and_relay_brokerd_events()``: + - (maybe) ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and relay to ems client(s); this is a effectively a "trade event reponse" proxy-broker. @@ -905,24 +988,24 @@ async def _emsd_main( # only open if one isn't already up: we try to keep # as few duplicate streams as necessary - maybe_open_brokerd_trades_dialogue( - _router, + _router.maybe_open_brokerd_trades_dialogue( feed, - broker, symbol, dark_book, _exec_mode, loglevel, - ) as (positions, brokerd_trades_stream), + ) as relay, trio.open_nursery() as n, ): + brokerd_stream = relay.brokerd_dialogue #.clone() + # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? - await ems_ctx.started(positions) + await ems_ctx.started(relay.positions) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates @@ -932,7 +1015,8 @@ async def _emsd_main( n.start_soon( clear_dark_triggers, - brokerd_trades_stream, + # relay.brokerd_dialogue, + brokerd_stream, ems_client_order_stream, feed.stream, @@ -942,12 +1026,25 @@ async def _emsd_main( ) # start inbound (from attached client) order request processing - await process_client_order_cmds( + try: + await process_client_order_cmds( - ems_client_order_stream, - brokerd_trades_stream, - symbol, - feed, - dark_book, - _router, - ) + ems_client_order_stream, + + # relay.brokerd_dialogue, + brokerd_stream, + + symbol, + feed, + dark_book, + _router, + ) + finally: + pass + # for oid, client_stream in _router.dialogs.copy().items(): + # if client_stream is ems_client_order_stream: + # # TODO: we need a placeholder for sending + # # the updates to an alert system inside + # # ``emsd`` ?? + # print(f'popping order for stream {oid}') + # _router.dialogs.pop(oid)