From b1ef5492764a3add6a629e7a3d120e1dcd4a15ba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 23 Jun 2023 17:33:38 -0400 Subject: [PATCH] Move `broker_init()` into `brokers._daemon` We might as well start standardizing on `brokerd` init such that it can be used more generally in client code (such as the `.accounting.cli` stuff). Deats of `broker_init()` impl: - loads appropriate py pkg module, - reads any declared `__enable_modules__: listr[str]` which will be passed to `tractor.ActorNursery.start_actor(enabled_modules=)` - loads the `.brokers._daemon._setup_persistent_brokerd As expected the `accounting.cli` tools can now import directly from this new location and use the common daemon fixture definition. --- piker/accounting/cli.py | 146 ++++++++++++++++++--------------------- piker/brokers/_daemon.py | 126 +++++++++++++++++++++++++++------ piker/brokers/_util.py | 2 + 3 files changed, 174 insertions(+), 100 deletions(-) diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py index 0b18a3eb..75798f3f 100644 --- a/piker/accounting/cli.py +++ b/piker/accounting/cli.py @@ -18,11 +18,6 @@ CLI front end for trades ledger and position tracking management. ''' -from typing import ( - AsyncContextManager, -) -from types import ModuleType - from rich.console import Console from rich.markdown import Markdown import tractor @@ -36,63 +31,30 @@ from ..service import ( from ..clearing._messages import BrokerdPosition from ..config import load_ledger from ..calc import humanize +from ..brokers._daemon import broker_init ledger = typer.Typer() -def broker_init( - brokername: str, - loglevel: str | None = None, - - **start_actor_kwargs, - -) -> tuple[ - ModuleType, - dict, - AsyncContextManager, -]: - ''' - Given an input broker name, load all named arguments - which can be passed to a daemon + context spawn for - the relevant `brokerd` service endpoint. - - ''' - from ..brokers import get_brokermod - brokermod = get_brokermod(brokername) - modpath = brokermod.__name__ - - start_actor_kwargs['name'] = f'brokerd.{brokername}' - start_actor_kwargs.update( - getattr( - brokermod, - '_spawn_kwargs', - {}, - ) - ) - - # lookup actor-enabled modules declared by the backend offering the - # `brokerd` endpoint(s). - enabled = start_actor_kwargs['enable_modules'] = [modpath] - for submodname in getattr( - brokermod, - '__enable_modules__', - [], - ): - subpath = f'{modpath}.{submodname}' - enabled.append(subpath) - - # TODO XXX: DO WE NEED THIS? - # enabled.append('piker.data.feed') - - # non-blocking setup of brokerd service nursery - from ..brokers._daemon import _setup_persistent_brokerd - - return ( - brokermod, - start_actor_kwargs, # to `ActorNursery.start_actor()` - _setup_persistent_brokerd, # deamon service task ep - ) +def unpack_fqan( + fully_qualified_account_name: str, + console: Console | None, +) -> tuple | bool: + try: + brokername, account = fully_qualified_account_name.split('.') + return brokername, account + except ValueError: + if console is not None: + md = Markdown( + f'=> `{fully_qualified_account_name}` <=\n\n' + 'is not a valid ' + '__fully qualified account name?__\n\n' + 'Your account name needs to be of the form ' + '`.`\n' + ) + console.print(md) + return False @ledger.command() @@ -108,19 +70,15 @@ def sync( log = get_logger(loglevel) console = Console() - try: - brokername, account = fully_qualified_account_name.split('.') - except ValueError: - md = Markdown( - f'=> `{fully_qualified_account_name}` <=\n\n' - 'is not a valid ' - '__fully qualified account name?__\n\n' - 'Your account name needs to be of the form ' - '`.`\n' - ) - console.print(md) + pair: tuple[str, str] + if not (pair := unpack_fqan( + fully_qualified_account_name, + console, + )): return + brokername, account = pair + brokermod, start_kwargs, deamon_ep = broker_init( brokername, loglevel=loglevel, @@ -155,18 +113,30 @@ def sync( ) brokerd_stream: tractor.MsgStream - async with open_brokerd_dialog( - brokermod, - portal, - exec_mode=( - 'paper' if account == 'paper' - else 'live' + async with ( + # engage the brokerd daemon context + portal.open_context( + deamon_ep, + brokername=brokername, + loglevel=loglevel, + ), + + # manually open the brokerd trade dialog EP + # (what the EMS normally does internall) B) + open_brokerd_dialog( + brokermod, + portal, + exec_mode=( + 'paper' + if account == 'paper' + else 'live' + ), + loglevel=loglevel, + ) as ( + brokerd_stream, + pp_msg_table, + accounts, ), - loglevel=loglevel, - ) as ( - brokerd_stream, - pp_msg_table, - accounts, ): try: assert len(accounts) == 1 @@ -253,5 +223,23 @@ def sync( trio.run(main) +@ledger.command() +def disect( + fully_qualified_account_name: str, + bs_mktid: int, # for ib + pdb: bool = False, + + loglevel: str = typer.Option( + 'error', + "-l", + ), +): + pair: tuple[str, str] + if not (pair := unpack_fqan( + fully_qualified_account_name, + )): + return + + if __name__ == "__main__": ledger() # this is called from ``>> ledger `` diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 368e8116..ecb785f7 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -23,7 +23,11 @@ from __future__ import annotations from contextlib import ( asynccontextmanager as acm, ) -from typing import TYPE_CHECKING +from types import ModuleType +from typing import ( + TYPE_CHECKING, + AsyncContextManager, +) import exceptiongroup as eg import tractor @@ -39,7 +43,7 @@ if TYPE_CHECKING: # TODO: move this def to the `.data` subpkg.. # NOTE: keeping this list as small as possible is part of our caps-sec # model and should be treated with utmost care! -_data_mods = [ +_data_mods: str = [ 'piker.brokers.core', 'piker.brokers.data', 'piker.brokers._daemon', @@ -72,9 +76,13 @@ async def _setup_persistent_brokerd( loglevel or tractor.current_actor().loglevel, name=f'{_util.subsys}.{brokername}', ) + # set global for this actor to this new process-wide instance B) _util.log = log + # further, set the log level on any broker broker specific + # logger instance. + from piker.data import feed assert not feed._bus @@ -111,6 +119,79 @@ async def _setup_persistent_brokerd( raise +def broker_init( + brokername: str, + loglevel: str | None = None, + + **start_actor_kwargs, + +) -> tuple[ + ModuleType, + dict, + AsyncContextManager, +]: + ''' + Given an input broker name, load all named arguments + which can be passed for daemon endpoint + context spawn + as required in every `brokerd` (actor) service. + + This includes: + - load the appropriate .py pkg module, + - reads any declared `__enable_modules__: listr[str]` which will be + passed to `tractor.ActorNursery.start_actor(enabled_modules=)` + at actor start time, + - deliver a references to the daemon lifetime fixture, which + for now is always the `_setup_persistent_brokerd()` context defined + above. + + ''' + from ..brokers import get_brokermod + brokermod = get_brokermod(brokername) + modpath: str = brokermod.__name__ + + start_actor_kwargs['name'] = f'brokerd.{brokername}' + start_actor_kwargs.update( + getattr( + brokermod, + '_spawn_kwargs', + {}, + ) + ) + + # XXX TODO: make this not so hacky/monkeypatched.. + # -> we need a sane way to configure the logging level for all + # code running in brokerd. + # if utilmod := getattr(brokermod, '_util', False): + # utilmod.log.setLevel(loglevel.upper()) + + # lookup actor-enabled modules declared by the backend offering the + # `brokerd` endpoint(s). + enabled: list[str] + enabled = start_actor_kwargs['enable_modules'] = [ + __name__, # so that eps from THIS mod can be invoked + modpath, + ] + for submodname in getattr( + brokermod, + '__enable_modules__', + [], + ): + subpath: str = f'{modpath}.{submodname}' + enabled.append(subpath) + + # TODO XXX: DO WE NEED THIS? + # enabled.append('piker.data.feed') + + return ( + brokermod, + start_actor_kwargs, # to `ActorNursery.start_actor()` + + # XXX see impl above; contains all (actor global) + # setup/teardown expected in all `brokerd` actor instances. + _setup_persistent_brokerd, + ) + + async def spawn_brokerd( brokername: str, @@ -120,44 +201,44 @@ async def spawn_brokerd( ) -> bool: - from piker.service import Services from piker.service._util import log # use service mngr log - log.info(f'Spawning {brokername} broker daemon') - brokermod = get_brokermod(brokername) - dname = f'brokerd.{brokername}' + ( + brokermode, + tractor_kwargs, + daemon_fixture_ep, + ) = broker_init( + brokername, + loglevel, + **tractor_kwargs, + ) + brokermod = get_brokermod(brokername) extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) tractor_kwargs.update(extra_tractor_kwargs) # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery - modpath = brokermod.__name__ - broker_enable = [modpath] - for submodname in getattr( - brokermod, - '__enable_modules__', - [], - ): - subpath = f'{modpath}.{submodname}' - broker_enable.append(subpath) + from piker.service import Services + dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' portal = await Services.actor_n.start_actor( dname, - enable_modules=_data_mods + broker_enable, - loglevel=loglevel, + enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), debug_mode=Services.debug_mode, **tractor_kwargs ) - # non-blocking setup of brokerd service nursery + # NOTE: the service mngr expects an already spawned actor + its + # portal ref in order to do non-blocking setup of brokerd + # service nursery. await Services.start_service_task( dname, portal, # signature of target root-task endpoint - _setup_persistent_brokerd, + daemon_fixture_ep, brokername=brokername, loglevel=loglevel, ) @@ -174,8 +255,11 @@ async def maybe_spawn_brokerd( ) -> tractor.Portal: ''' - Helper to spawn a brokerd service *from* a client - who wishes to use the sub-actor-daemon. + Helper to spawn a brokerd service *from* a client who wishes to + use the sub-actor-daemon but is fine with re-using any existing + and contactable `brokerd`. + + Mas o menos, acts as a cached-actor-getter factory. ''' from piker.service import maybe_spawn_daemon diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index baf2c7b2..30e36f2e 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -32,6 +32,8 @@ from ..log import ( subsys: str = 'piker.brokers' # NOTE: level should be reset by any actor that is spawned +# as well as given a (more) explicit name/key such +# as `piker.brokers.binance` matching the subpkg. log = get_logger(subsys) get_console_log = partial(