Move `broker_init()` into `brokers._daemon`

We might as well start standardizing on `brokerd` init such that it can
be used more generally in client code (such as the `.accounting.cli`
stuff).

Deats of `broker_init()` impl:
- loads appropriate py pkg module,
- reads any declared `__enable_modules__: listr[str]` which will be
  passed to `tractor.ActorNursery.start_actor(enabled_modules=<this>)`
- loads the `.brokers._daemon._setup_persistent_brokerd

As expected the `accounting.cli` tools can now import directly from this
new location and use the common daemon fixture definition.
basic_buy_bot
Tyler Goodlet 2023-06-23 17:33:38 -04:00
parent f7f76137ca
commit b1ef549276
3 changed files with 174 additions and 100 deletions

View File

@ -18,11 +18,6 @@
CLI front end for trades ledger and position tracking management. CLI front end for trades ledger and position tracking management.
''' '''
from typing import (
AsyncContextManager,
)
from types import ModuleType
from rich.console import Console from rich.console import Console
from rich.markdown import Markdown from rich.markdown import Markdown
import tractor import tractor
@ -36,63 +31,30 @@ from ..service import (
from ..clearing._messages import BrokerdPosition from ..clearing._messages import BrokerdPosition
from ..config import load_ledger from ..config import load_ledger
from ..calc import humanize from ..calc import humanize
from ..brokers._daemon import broker_init
ledger = typer.Typer() ledger = typer.Typer()
def broker_init( def unpack_fqan(
brokername: str, fully_qualified_account_name: str,
loglevel: str | None = None, console: Console | None,
) -> tuple | bool:
**start_actor_kwargs, try:
brokername, account = fully_qualified_account_name.split('.')
) -> tuple[ return brokername, account
ModuleType, except ValueError:
dict, if console is not None:
AsyncContextManager, md = Markdown(
]: f'=> `{fully_qualified_account_name}` <=\n\n'
''' 'is not a valid '
Given an input broker name, load all named arguments '__fully qualified account name?__\n\n'
which can be passed to a daemon + context spawn for 'Your account name needs to be of the form '
the relevant `brokerd` service endpoint. '`<brokername>.<account_name>`\n'
'''
from ..brokers import get_brokermod
brokermod = get_brokermod(brokername)
modpath = brokermod.__name__
start_actor_kwargs['name'] = f'brokerd.{brokername}'
start_actor_kwargs.update(
getattr(
brokermod,
'_spawn_kwargs',
{},
)
)
# lookup actor-enabled modules declared by the backend offering the
# `brokerd` endpoint(s).
enabled = start_actor_kwargs['enable_modules'] = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
enabled.append(subpath)
# TODO XXX: DO WE NEED THIS?
# enabled.append('piker.data.feed')
# non-blocking setup of brokerd service nursery
from ..brokers._daemon import _setup_persistent_brokerd
return (
brokermod,
start_actor_kwargs, # to `ActorNursery.start_actor()`
_setup_persistent_brokerd, # deamon service task ep
) )
console.print(md)
return False
@ledger.command() @ledger.command()
@ -108,19 +70,15 @@ def sync(
log = get_logger(loglevel) log = get_logger(loglevel)
console = Console() console = Console()
try: pair: tuple[str, str]
brokername, account = fully_qualified_account_name.split('.') if not (pair := unpack_fqan(
except ValueError: fully_qualified_account_name,
md = Markdown( console,
f'=> `{fully_qualified_account_name}` <=\n\n' )):
'is not a valid '
'__fully qualified account name?__\n\n'
'Your account name needs to be of the form '
'`<brokername>.<account_name>`\n'
)
console.print(md)
return return
brokername, account = pair
brokermod, start_kwargs, deamon_ep = broker_init( brokermod, start_kwargs, deamon_ep = broker_init(
brokername, brokername,
loglevel=loglevel, loglevel=loglevel,
@ -155,11 +113,22 @@ def sync(
) )
brokerd_stream: tractor.MsgStream brokerd_stream: tractor.MsgStream
async with open_brokerd_dialog( async with (
# engage the brokerd daemon context
portal.open_context(
deamon_ep,
brokername=brokername,
loglevel=loglevel,
),
# manually open the brokerd trade dialog EP
# (what the EMS normally does internall) B)
open_brokerd_dialog(
brokermod, brokermod,
portal, portal,
exec_mode=( exec_mode=(
'paper' if account == 'paper' 'paper'
if account == 'paper'
else 'live' else 'live'
), ),
loglevel=loglevel, loglevel=loglevel,
@ -167,6 +136,7 @@ def sync(
brokerd_stream, brokerd_stream,
pp_msg_table, pp_msg_table,
accounts, accounts,
),
): ):
try: try:
assert len(accounts) == 1 assert len(accounts) == 1
@ -253,5 +223,23 @@ def sync(
trio.run(main) trio.run(main)
@ledger.command()
def disect(
fully_qualified_account_name: str,
bs_mktid: int, # for ib
pdb: bool = False,
loglevel: str = typer.Option(
'error',
"-l",
),
):
pair: tuple[str, str]
if not (pair := unpack_fqan(
fully_qualified_account_name,
)):
return
if __name__ == "__main__": if __name__ == "__main__":
ledger() # this is called from ``>> ledger <accountname>`` ledger() # this is called from ``>> ledger <accountname>``

View File

@ -23,7 +23,11 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from typing import TYPE_CHECKING from types import ModuleType
from typing import (
TYPE_CHECKING,
AsyncContextManager,
)
import exceptiongroup as eg import exceptiongroup as eg
import tractor import tractor
@ -39,7 +43,7 @@ if TYPE_CHECKING:
# TODO: move this def to the `.data` subpkg.. # TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec # NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care! # model and should be treated with utmost care!
_data_mods = [ _data_mods: str = [
'piker.brokers.core', 'piker.brokers.core',
'piker.brokers.data', 'piker.brokers.data',
'piker.brokers._daemon', 'piker.brokers._daemon',
@ -72,9 +76,13 @@ async def _setup_persistent_brokerd(
loglevel or tractor.current_actor().loglevel, loglevel or tractor.current_actor().loglevel,
name=f'{_util.subsys}.{brokername}', name=f'{_util.subsys}.{brokername}',
) )
# set global for this actor to this new process-wide instance B) # set global for this actor to this new process-wide instance B)
_util.log = log _util.log = log
# further, set the log level on any broker broker specific
# logger instance.
from piker.data import feed from piker.data import feed
assert not feed._bus assert not feed._bus
@ -111,6 +119,79 @@ async def _setup_persistent_brokerd(
raise raise
def broker_init(
brokername: str,
loglevel: str | None = None,
**start_actor_kwargs,
) -> tuple[
ModuleType,
dict,
AsyncContextManager,
]:
'''
Given an input broker name, load all named arguments
which can be passed for daemon endpoint + context spawn
as required in every `brokerd` (actor) service.
This includes:
- load the appropriate <brokername>.py pkg module,
- reads any declared `__enable_modules__: listr[str]` which will be
passed to `tractor.ActorNursery.start_actor(enabled_modules=<this>)`
at actor start time,
- deliver a references to the daemon lifetime fixture, which
for now is always the `_setup_persistent_brokerd()` context defined
above.
'''
from ..brokers import get_brokermod
brokermod = get_brokermod(brokername)
modpath: str = brokermod.__name__
start_actor_kwargs['name'] = f'brokerd.{brokername}'
start_actor_kwargs.update(
getattr(
brokermod,
'_spawn_kwargs',
{},
)
)
# XXX TODO: make this not so hacky/monkeypatched..
# -> we need a sane way to configure the logging level for all
# code running in brokerd.
# if utilmod := getattr(brokermod, '_util', False):
# utilmod.log.setLevel(loglevel.upper())
# lookup actor-enabled modules declared by the backend offering the
# `brokerd` endpoint(s).
enabled: list[str]
enabled = start_actor_kwargs['enable_modules'] = [
__name__, # so that eps from THIS mod can be invoked
modpath,
]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath)
# TODO XXX: DO WE NEED THIS?
# enabled.append('piker.data.feed')
return (
brokermod,
start_actor_kwargs, # to `ActorNursery.start_actor()`
# XXX see impl above; contains all (actor global)
# setup/teardown expected in all `brokerd` actor instances.
_setup_persistent_brokerd,
)
async def spawn_brokerd( async def spawn_brokerd(
brokername: str, brokername: str,
@ -120,44 +201,44 @@ async def spawn_brokerd(
) -> bool: ) -> bool:
from piker.service import Services
from piker.service._util import log # use service mngr log from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
brokermod = get_brokermod(brokername) (
dname = f'brokerd.{brokername}' brokermode,
tractor_kwargs,
daemon_fixture_ep,
) = broker_init(
brokername,
loglevel,
**tractor_kwargs,
)
brokermod = get_brokermod(brokername)
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs) tractor_kwargs.update(extra_tractor_kwargs)
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
modpath = brokermod.__name__ from piker.service import Services
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor( portal = await Services.actor_n.start_actor(
dname, dname,
enable_modules=_data_mods + broker_enable, enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
loglevel=loglevel,
debug_mode=Services.debug_mode, debug_mode=Services.debug_mode,
**tractor_kwargs **tractor_kwargs
) )
# non-blocking setup of brokerd service nursery # NOTE: the service mngr expects an already spawned actor + its
# portal ref in order to do non-blocking setup of brokerd
# service nursery.
await Services.start_service_task( await Services.start_service_task(
dname, dname,
portal, portal,
# signature of target root-task endpoint # signature of target root-task endpoint
_setup_persistent_brokerd, daemon_fixture_ep,
brokername=brokername, brokername=brokername,
loglevel=loglevel, loglevel=loglevel,
) )
@ -174,8 +255,11 @@ async def maybe_spawn_brokerd(
) -> tractor.Portal: ) -> tractor.Portal:
''' '''
Helper to spawn a brokerd service *from* a client Helper to spawn a brokerd service *from* a client who wishes to
who wishes to use the sub-actor-daemon. use the sub-actor-daemon but is fine with re-using any existing
and contactable `brokerd`.
Mas o menos, acts as a cached-actor-getter factory.
''' '''
from piker.service import maybe_spawn_daemon from piker.service import maybe_spawn_daemon

View File

@ -32,6 +32,8 @@ from ..log import (
subsys: str = 'piker.brokers' subsys: str = 'piker.brokers'
# NOTE: level should be reset by any actor that is spawned # NOTE: level should be reset by any actor that is spawned
# as well as given a (more) explicit name/key such
# as `piker.brokers.binance` matching the subpkg.
log = get_logger(subsys) log = get_logger(subsys)
get_console_log = partial( get_console_log = partial(