From cc3037149c87d07f95465326127c30b98bd90818 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 12 Jun 2023 19:51:55 -0400 Subject: [PATCH] Factor `brokerd` trade dialog init into acm Connecting to a `brokerd` daemon's trading dialog via a helper `@acm` func is handy so that arbitrary trading middleware clients **and** the ems can setup a trading dialog and, at the least, query existing position state; this is in fact our immediate need when simply querying for an account's position status in the `.accounting.cli.ledger` cli. It's now exposed (for now) as `.clearing._ems.open_brokerd_dialog()` and is called by the `Router.maybe_open_brokerd_dialog()` for every new relay allocation or paper-account engine instance. --- piker/clearing/__init__.py | 4 + piker/clearing/_ems.py | 302 +++++++++++++++++++++++-------------- 2 files changed, 190 insertions(+), 116 deletions(-) diff --git a/piker/clearing/__init__.py b/piker/clearing/__init__.py index b2cc5fa7..ec796ac9 100644 --- a/piker/clearing/__init__.py +++ b/piker/clearing/__init__.py @@ -23,11 +23,15 @@ from ._client import ( open_ems, OrderClient, ) +from ._ems import ( + open_brokerd_dialog, +) __all__ = [ 'open_ems', 'OrderClient', + 'open_brokerd_dialog', ] diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index e41ddbf1..7abd4a61 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -34,6 +34,7 @@ from typing import ( Callable, Hashable, Optional, + TYPE_CHECKING, ) from bidict import bidict @@ -50,14 +51,8 @@ from ..accounting._mktinfo import ( unpack_fqme, float_digits, ) -from ..data.feed import ( - Feed, - Flume, - maybe_open_feed, -) from ..ui._notify import notify_from_ems_status_msg from ..data.types import Struct -from . import _paper_engine as paper from ._messages import ( Order, Status, @@ -70,6 +65,12 @@ from ._messages import ( BrokerdPosition, ) +if TYPE_CHECKING: + from ..data.feed import ( + Feed, + Flume, + ) + # TODO: numba all of this def mk_check( @@ -307,15 +308,175 @@ class TradesRelay(Struct): # map of symbols to dicts of accounts to pp msgs positions: dict[ - # brokername, acctid + # brokername, acctid -> tuple[str, str], - list[BrokerdPosition], + # fqme -> msg + dict[str, BrokerdPosition], ] # allowed account names accounts: tuple[str] +@acm +async def open_brokerd_dialog( + brokermod: ModuleType, + portal: tractor.Portal, + exec_mode: str, + fqme: str | None = None, + loglevel: str | None = None, + +) -> tuple[ + tractor.MsgStream, + # {(brokername, accountname) -> {fqme -> msg}} + dict[(str, str), dict[str, BrokerdPosition]], + list[str], +]: + ''' + Open either a live trades control dialog or a dialog with a new + paper engine instance depending on live trading support for the + broker backend, configuration, or client code usage. + + ''' + broker: str = brokermod.name + + def mk_paper_ep(): + from . import _paper_engine as paper_mod + + nonlocal brokermod, exec_mode + + # for logging purposes + brokermod = paper_mod + + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running + # a paper-simulator clearing engine. + + # load the paper trading engine + exec_mode = 'paper' + log.info(f'{broker}: Entering `paper` trading mode') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + if not fqme: + log.warning( + f'Paper engine activate for {broker} but no fqme provided?' + ) + + return paper_mod.open_paperboi( + fqme=fqme, + broker=broker, + loglevel=loglevel, + ) + + # TODO: ideally choose only one of these ep names.. + trades_endpoint: Callable + for ep_name in [ + 'trades_dialogue', + 'open_trade_dialog', + ]: + trades_endpoint = getattr( + brokermod, + ep_name, + None, + ) + break + + if ( + trades_endpoint is not None + or exec_mode != 'paper' + ): + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + else: + exec_mode: str = 'paper' + + @acm + async def maybe_open_paper_ep(): + if exec_mode == 'paper': + async with mk_paper_ep() as msg: + yield msg + return + + # open trades-dialog endpoint with backend broker + async with open_trades_endpoint as msg: + ctx, first = msg + + # runtime indication that the backend can't support live + # order ctrl yet, so boot the paperboi B0 + if first == 'paper': + async with mk_paper_ep() as msg: + yield msg + return + else: + # working live ep case B) + yield msg + return + + pps_by_broker_account: dict[(str, str), BrokerdPosition] = {} + + async with ( + maybe_open_paper_ep() as ( + brokerd_ctx, + (position_msgs, accounts), + ), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + # XXX: really we only want one stream per `emsd` + # actor to relay global `brokerd` order events + # unless we're going to expect each backend to + # relay only orders affiliated with a particular + # ``trades_dialogue()`` session (seems annoying + # for implementers). So, here we cache the relay + # task and instead of running multiple tasks + # (which will result in multiples of the same + # msg being relayed for each EMS client) we just + # register each client stream to this single + # relay loop in the dialog table. + + # begin processing order events from the target + # brokerd backend by receiving order submission + # response messages, normalizing them to EMS + # messages and relaying back to the piker order + # client set. + + # locally cache and track positions per account with + # a nested table of msgs: + # tuple(brokername, acctid) -> + # (fqme: str -> + # `BrokerdPosition`) + for msg in position_msgs: + + msg = BrokerdPosition(**msg) + log.info( + f'loading pp for {brokermod.__name__}:\n' + f'{pformat(msg.to_dict())}', + ) + + # TODO: state any mismatch here? + account: str = msg.account + assert account in accounts + + pps_by_broker_account.setdefault( + (broker, account), + {}, + )[msg.symbol] = msg + + # should be unique entries, verdad! + assert len(set(accounts)) == len(accounts) + + yield ( + brokerd_trades_stream, + pps_by_broker_account, + accounts, + ) + + class Router(Struct): ''' Order router which manages and tracks per-broker dark book, @@ -407,118 +568,25 @@ class Router(Struct): yield relay return - def mk_paper_ep(): - nonlocal brokermod, exec_mode + async with open_brokerd_dialog( + brokermod=brokermod, + portal=portal, + exec_mode=exec_mode, + fqme=fqme, + loglevel=loglevel, - # for logging purposes - brokermod = paper - - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running - # a paper-simulator clearing engine. - - # load the paper trading engine - exec_mode = 'paper' - log.info(f'{broker}: Entering `paper` trading mode') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - return paper.open_paperboi( - fqme=fqme, - loglevel=loglevel, - ) - - trades_endpoint = getattr(brokermod, 'trades_dialogue', None) - if ( - trades_endpoint is not None - or exec_mode != 'paper' + ) as ( + brokerd_stream, + pp_msg_table, + accounts, ): - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - else: - exec_mode: str = 'paper' - - @acm - async def maybe_open_paper_ep(): - if exec_mode == 'paper': - async with mk_paper_ep() as msg: - yield msg - return - - # open trades-dialog endpoint with backend broker - async with open_trades_endpoint as msg: - ctx, first = msg - - # runtime indication that the backend can't support live - # order ctrl yet, so boot the paperboi B0 - if first == 'paper': - async with mk_paper_ep() as msg: - yield msg - return - else: - # working live ep case B) - yield msg - return - - positions: list[BrokerdPosition] - accounts: tuple[str] - async with ( - maybe_open_paper_ep() as ( - brokerd_ctx, - (positions, accounts), - ), - brokerd_ctx.open_stream() as brokerd_trades_stream, - ): - # XXX: really we only want one stream per `emsd` - # actor to relay global `brokerd` order events - # unless we're going to expect each backend to - # relay only orders affiliated with a particular - # ``trades_dialogue()`` session (seems annoying - # for implementers). So, here we cache the relay - # task and instead of running multiple tasks - # (which will result in multiples of the same - # msg being relayed for each EMS client) we just - # register each client stream to this single - # relay loop in the dialog table. - - # begin processing order events from the target - # brokerd backend by receiving order submission - # response messages, normalizing them to EMS - # messages and relaying back to the piker order - # client set. - - # locally cache and track positions per account with - # a nested table of msgs: - # tuple(brokername, acctid) -> - # (fqme: str -> - # `BrokerdPosition`) + # create a new relay and sync it's state according + # to brokerd-backend reported position msgs. relay = TradesRelay( - brokerd_stream=brokerd_trades_stream, - positions={}, - accounts=accounts, + brokerd_stream=brokerd_stream, + positions=pp_msg_table, + accounts=tuple(accounts), ) - for msg in positions: - - msg = BrokerdPosition(**msg) - log.info( - f'loading pp for {brokermod.__name__}:\n' - f'{pformat(msg.to_dict())}', - ) - - # TODO: state any mismatch here? - account = msg.account - assert account in accounts - - relay.positions.setdefault( - (broker, account), - {}, - )[msg.symbol] = msg - self.relays[broker] = relay # this context should block here indefinitely until @@ -550,6 +618,8 @@ class Router(Struct): indefinitely. ''' + from ..data.feed import maybe_open_feed + async with ( maybe_open_feed( [fqme],