diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 23b50ddf..c0b06efe 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -23,7 +23,6 @@ from collections import ( defaultdict, # ChainMap, ) -from contextlib import asynccontextmanager from math import isnan from pprint import pformat import time @@ -41,9 +40,12 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, maybe_open_feed +from ..data.feed import ( + Feed, + maybe_open_feed, +) from ..data.types import Struct -from .._daemon import maybe_spawn_brokerd +# from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( Order, @@ -135,7 +137,6 @@ class _DarkBook(Struct): float, ] = {} - # _ems_entries: dict[str, str] = {} _active: dict = {} _ems2brokerd_ids: dict[str, str] = bidict() @@ -247,7 +248,6 @@ async def clear_dark_triggers( await brokerd_orders_stream.send(brokerd_msg) - # book._ems_entries[oid] = live_req # book._msgflows[oid].maps.insert(0, live_req) case _: @@ -383,62 +383,174 @@ class Router(Struct): if not stream._closed ) - @asynccontextmanager - async def maybe_open_brokerd_trades_dialogue( + async def maybe_open_trade_relays( self, - feed: Feed, - symbol: str, - dark_book: _DarkBook, + fqsn: str, exec_mode: str, loglevel: str, - ) -> tuple[dict, tractor.MsgStream]: + task_status: TaskStatus[ + tuple[TradesRelay, Feed] + ] = trio.TASK_STATUS_IGNORED, + + ) -> tuple[TradesRelay, Feed]: ''' Open and yield ``brokerd`` trades dialogue context-stream if none already exists. ''' - broker = feed.mod.name - relay: TradesRelay = self.relays.get(broker) + from ..data._source import unpack_fqsn + broker, symbol, suffix = unpack_fqsn(fqsn) - if ( - relay is None - - # We always want to spawn a new relay for the paper engine - # per symbol since we need a new tractor context to be - # opened for every every symbol such that a new data feed - # and ``PaperBoi`` client will be created and then used to - # simulate clearing events. - or exec_mode == 'paper' + async with ( + maybe_open_feed( + [fqsn], + loglevel=loglevel, + ) as (feed, quote_stream), ): + brokermod = feed.mod + broker = brokermod.name - relay = await self.nursery.start( - open_brokerd_trades_dialog, - self, - feed, - symbol, - exec_mode, - loglevel, - ) + # XXX: this should be initial price quote from target provider + first_quote: dict = feed.first_quotes[fqsn] + book: _DarkBook = self.get_dark_book(broker) + book.lasts[fqsn]: float = first_quote['last'] - self.nursery.start_soon( - translate_and_relay_brokerd_events, - broker, - relay.brokerd_stream, - self, - ) + relay: TradesRelay = self.relays.get(broker) + if ( + relay - relay.consumers += 1 + # We always want to spawn a new relay for the paper engine + # per symbol since we need a new tractor context to be + # opened for every every symbol such that a new data feed + # and ``PaperBoi`` client will be created and then used to + # simulate clearing events. + and exec_mode != 'paper' + ): + task_status.started((relay, feed)) + await trio.sleep_forever() + return - # TODO: get updated positions here? - assert relay.brokerd_stream - try: - yield relay - finally: + trades_endpoint = getattr(brokermod, 'trades_dialogue', None) + if ( + trades_endpoint is None + or exec_mode == 'paper' + ): + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running + # a paper-simulator clearing engine. - # TODO: what exactly needs to be torn down here or - # are we just consumer tracking? - relay.consumers -= 1 + # load the paper trading engine + exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + fqsn='.'.join([symbol, broker]), + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = feed.portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + # open trades-dialog endpoint with backend broker + try: + positions: list[BrokerdPosition] + accounts: tuple[str] + + async with ( + open_trades_endpoint as ( + brokerd_ctx, + (positions, accounts,), + ), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + # XXX: really we only want one stream per `emsd` + # actor to relay global `brokerd` order events + # unless we're going to expect each backend to + # relay only orders affiliated with a particular + # ``trades_dialogue()`` session (seems annoying + # for implementers). So, here we cache the relay + # task and instead of running multiple tasks + # (which will result in multiples of the same + # msg being relayed for each EMS client) we just + # register each client stream to this single + # relay loop in the dialog table. + + # begin processing order events from the target + # brokerd backend by receiving order submission + # response messages, normalizing them to EMS + # messages and relaying back to the piker order + # client set. + + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. + pps = {} + for msg in positions: + log.info(f'loading pp: {msg}') + + account = msg['account'] + + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. + assert account in accounts + + pps.setdefault( + (broker, account), + [], + ).append(msg) + + relay = TradesRelay( + brokerd_stream=brokerd_trades_stream, + positions=pps, + accounts=accounts, + consumers=1, + ) + + self.relays[broker] = relay + + # spawn a ``brokerd`` order control dialog stream + # that syncs lifetime with the parent `emsd` daemon. + self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) + + # dark book clearing loop, also lives with parent + # daemon to allow dark order clearing while no + # client is connected. + self.nursery.start_soon( + clear_dark_triggers, + self, + relay.brokerd_stream, + quote_stream, + broker, + fqsn, # form: ... + book + ) + + task_status.started((relay, feed)) + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + await trio.sleep_forever() + + finally: + # parent context must have been closed remove from cache so + # next client will respawn if needed + relay = self.relays.pop(broker, None) + if not relay: + log.warning(f'Relay for {broker} was already removed!?') async def client_broadcast( self, @@ -447,7 +559,14 @@ class Router(Struct): ) -> None: to_remove: set[tractor.MsgStream] = set() - subs = self.subscribers[sub_key] + + if sub_key == 'all': + subs = set() + for s in self.subscribers.values(): + subs |= s + else: + subs = self.subscribers[sub_key] + for client_stream in subs: try: await client_stream.send(msg) @@ -467,134 +586,6 @@ class Router(Struct): _router: Router = None -async def open_brokerd_trades_dialog( - - router: Router, - feed: Feed, - symbol: str, - exec_mode: str, - loglevel: str, - - task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, - -) -> tuple[dict, tractor.MsgStream]: - ''' - Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. - - ''' - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - - broker = feed.mod.name - - # TODO: make a `tractor` bug/test for this! - # if only i could member what the problem was.. - # probably some GC of the portal thing? - # portal = feed.portal - - # XXX: we must have our own portal + channel otherwise - # when the data feed closes it may result in a half-closed - # channel that the brokerd side thinks is still open somehow!? - async with maybe_spawn_brokerd( - broker, - loglevel=loglevel, - - ) as portal: - if ( - trades_endpoint is None - or exec_mode == 'paper' - ): - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running - # a paper-simulator clearing engine. - - # load the paper trading engine - exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - fqsn='.'.join([symbol, broker]), - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - try: - positions: list[BrokerdPosition] - accounts: tuple[str] - - async with ( - open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), - brokerd_ctx.open_stream() as brokerd_trades_stream, - ): - # XXX: really we only want one stream per `emsd` actor - # to relay global `brokerd` order events unless we're - # going to expect each backend to relay only orders - # affiliated with a particular ``trades_dialogue()`` - # session (seems annoying for implementers). So, here - # we cache the relay task and instead of running multiple - # tasks (which will result in multiples of the same msg being - # relayed for each EMS client) we just register each client - # stream to this single relay loop in the dialog table. - - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client set. - - # locally cache and track positions per account with - # a table of (brokername, acctid) -> `BrokerdPosition` - # msgs. - pps = {} - for msg in positions: - log.info(f'loading pp: {msg}') - - account = msg['account'] - - # TODO: better value error for this which - # dumps the account and message and states the - # mismatch.. - assert account in accounts - - pps.setdefault( - (broker, account), - [], - ).append(msg) - - relay = TradesRelay( - brokerd_stream=brokerd_trades_stream, - positions=pps, - accounts=accounts, - consumers=1, - ) - - router.relays[broker] = relay - - # the ems scan loop may be cancelled by the client but we - # want to keep the ``brokerd`` dialogue up regardless - - task_status.started(relay) - - # this context should block here indefinitely until - # the ``brokerd`` task either dies or is cancelled - await trio.sleep_forever() - - finally: - # parent context must have been closed remove from cache so - # next client will respawn if needed - relay = router.relays.pop(broker, None) - if not relay: - log.warning(f'Relay for {broker} was already removed!?') - - @tractor.context async def _setup_persistent_emsd( @@ -677,7 +668,7 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(sym, pos_msg) + await router.client_broadcast('all', pos_msg) continue # BrokerdOrderAck @@ -827,7 +818,7 @@ async def translate_and_relay_brokerd_events( # ``.oid`` in the msg since we're planning to # maybe-kinda offer that via using ``Status`` # in the longer run anyway? - log.warning(f'Unkown fill for {fmsg}') + log.warning(f'Unknown fill for {fmsg}') continue # proxy through the "fill" result(s) @@ -1026,7 +1017,6 @@ async def process_client_order_cmds( # acked yet by a brokerd, so register a cancel for when # the order ack does show up later such that the brokerd # order request can be cancelled at that time. - # dark_book._ems_entries[oid] = msg # special case for now.. status.req = to_brokerd_msg @@ -1286,7 +1276,6 @@ async def _emsd_main( from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) - dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, # or a named arg with ctx in it and a type annotation of @@ -1294,108 +1283,82 @@ async def _emsd_main( ems_ctx = ctx # spawn one task per broker feed + relay: TradesRelay feed: Feed - async with ( - maybe_open_feed( - [fqsn], - loglevel=loglevel, - ) as (feed, quote_stream), - ): - # XXX: this should be initial price quote from target provider - first_quote: dict = feed.first_quotes[fqsn] - book: _DarkBook = _router.get_dark_book(broker) - book.lasts[fqsn]: float = first_quote['last'] + # open a stream with the brokerd backend for order flow dialogue + # only open if one isn't already up: we try to keep as few duplicate + # streams as necessary. + # TODO: should we try using `tractor.trionics.maybe_open_context()` + # here? + relay, feed = await _router.nursery.start( + _router.maybe_open_trade_relays, + fqsn, + exec_mode, + loglevel, + ) + brokerd_stream = relay.brokerd_stream + dark_book = _router.get_dark_book(broker) - # open a stream with the brokerd backend for order - # flow dialogue - async with ( + # signal to client that we're started and deliver + # all known pps and accounts for this ``brokerd``. + await ems_ctx.started(( + relay.positions, + list(relay.accounts), + dark_book._active, + )) - # only open if one isn't already up: we try to keep - # as few duplicate streams as necessary - _router.maybe_open_brokerd_trades_dialogue( + # establish 2-way stream with requesting order-client and + # begin handling inbound order requests and updates + async with ems_ctx.open_stream() as client_stream: + + # register the client side before starting the + # brokerd-side relay task to ensure the client is + # delivered all exisiting open orders on startup. + _router.clients.add(client_stream) + + # TODO: instead of by fqsn we need a subscription + # system/schema here to limit what each new client is + # allowed to see in terms of broadcasted order flow + # updates per dialog. + _router.subscribers[fqsn].add(client_stream) + + # start inbound (from attached client) order request processing + # main entrypoint, run here until cancelled. + try: + await process_client_order_cmds( + client_stream, + brokerd_stream, + fqsn, feed, - symbol, dark_book, - exec_mode, - loglevel, + _router, + ) - ) as relay, - trio.open_nursery() as n, - ): - - brokerd_stream = relay.brokerd_stream - - # signal to client that we're started and deliver - # all known pps and accounts for this ``brokerd``. - await ems_ctx.started(( - relay.positions, - list(relay.accounts), - book._active, - )) - - # establish 2-way stream with requesting order-client and - # begin handling inbound order requests and updates - async with ems_ctx.open_stream() as client_stream: - - # register the client side before starting the - # brokerd-side relay task to ensure the client is - # delivered all exisiting open orders on startup. - _router.clients.add(client_stream) - - # TODO: instead of by fqsn we need a subscription - # system/schema here to limit what each new client is - # allowed to see in terms of broadcasted order flow - # updates per dialog. - _router.subscribers[fqsn].add(client_stream) - - # trigger scan and exec loop - n.start_soon( - clear_dark_triggers, - _router, - brokerd_stream, - quote_stream, - broker, - fqsn, # form: ... - book + finally: + # try to remove client from "registry" + try: + _router.clients.remove(client_stream) + except KeyError: + log.warning( + f'Stream {client_stream._ctx.chan.uid}' + ' was already dropped?' ) - # start inbound (from attached client) order request processing - # main entrypoint, run here until cancelled. - try: - await process_client_order_cmds( - client_stream, - brokerd_stream, - fqsn, - feed, - dark_book, - _router, - ) + _router.subscribers[fqsn].remove(client_stream) + dialogs = _router.dialogs + for oid, client_streams in dialogs.items(): + if client_stream in client_streams: + client_streams.remove(client_stream) - finally: - # try to remove client from "registry" - try: - _router.clients.remove(client_stream) - except KeyError: + # TODO: for order dialogs left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.) + if not client_streams: log.warning( - f'Stream {client_stream._ctx.chan.uid}' - ' was already dropped?' + f'Order dialog is being unmonitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' ) - - _router.subscribers[fqsn].remove(client_stream) - dialogs = _router.dialogs - for oid, client_streams in dialogs.items(): - if client_stream in client_streams: - client_streams.remove(client_stream) - - # TODO: for order dialogs left "alive" in - # the ems this is where we should allow some - # system to take over management. Likely we - # want to allow the user to choose what kind - # of policy to use (eg. cancel all orders - # from client, run some algo, etc.) - if not client_streams: - log.warning( - f'Order dialog is being unmonitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - )