diff --git a/piker/clearing/__init__.py b/piker/clearing/__init__.py index c4fc2647..06a9212e 100644 --- a/piker/clearing/__init__.py +++ b/piker/clearing/__init__.py @@ -18,3 +18,9 @@ Market machinery for order executions, book, management. """ +from ._client import open_ems + + +__all__ = [ + 'open_ems', +] diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 03fb62d3..f3b26cbe 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -18,8 +18,10 @@ Orders and execution client API. """ +from __future__ import annotations from contextlib import asynccontextmanager as acm from pprint import pformat +from typing import TYPE_CHECKING import trio import tractor @@ -27,11 +29,16 @@ from tractor.trionics import broadcast_receiver from ..log import get_logger from ..data.types import Struct -from ._ems import _emsd_main from .._daemon import maybe_open_emsd from ._messages import Order, Cancel from ..brokers import get_brokermod +if TYPE_CHECKING: + from ._messages import ( + BrokerdPosition, + Status, + ) + log = get_logger(__name__) @@ -167,12 +174,19 @@ async def relay_order_cmds_from_sync_code( @acm async def open_ems( fqsn: str, + mode: str = 'live', -) -> ( +) -> tuple[ OrderBook, tractor.MsgStream, - dict, -): + dict[ + # brokername, acctid + tuple[str, str], + list[BrokerdPosition], + ], + list[str], + dict[str, Status], +]: ''' Spawn an EMS daemon and begin sending orders and receiving alerts. @@ -213,14 +227,16 @@ async def open_ems( from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) - mode: str = 'live' - async with maybe_open_emsd(broker) as portal: mod = get_brokermod(broker) - if not getattr(mod, 'trades_dialogue', None): + if ( + not getattr(mod, 'trades_dialogue', None) + or mode == 'paper' + ): mode = 'paper' + from ._ems import _emsd_main async with ( # connect to emsd portal.open_context( diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 8d6ba868..ba33e584 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -418,7 +418,7 @@ class Router(Struct): # load the paper trading engine exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') + log.info(f'{broker}: Entering `paper` trading mode') # load the paper trading engine as a subactor of this emsd # actor to simulate the real IPC load it'll have when also @@ -1367,7 +1367,15 @@ async def _emsd_main( exec_mode: str, # ('paper', 'live') loglevel: str = 'info', -) -> None: +) -> tuple[ + dict[ + # brokername, acctid + tuple[str, str], + list[BrokerdPosition], + ], + list[str], + dict[str, Status], +]: ''' EMS (sub)actor entrypoint providing the execution management (micro)service which conducts broker order clearing control on