Try having `brokerd` eps defined in `.brokers._daemon`

Since it's a bit weird having service specific implementation details
inside the general service `._daemon` mod, and since i'd mentioned
trying this re-org; let's do it B)

Requires enabling the new mod in both `pikerd` and `brokerd` and
obviously a bit more runtime-loading of the service modules in the
`brokerd` service eps to avoid import cycles.

Also moved `_setup_persistent_brokerd()` into the new mod since the
naming would place it there even though the implementation really
wouldn't (longer run) since we want to split up `.data.feed` layer
backend-invoked eps into a separate actor eventually from the "actual"
`brokerd` which will be the actor running **only** the trade control eps
(eg. trades_dialogue()` and friends).
rekt_pps
Tyler Goodlet 2023-04-21 15:47:54 -04:00
parent ed434e284b
commit 0b43e0aa8c
8 changed files with 179 additions and 136 deletions

View File

@ -79,7 +79,7 @@ def broker_init(
# enabled.append('piker.data.feed') # enabled.append('piker.data.feed')
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from ..data import _setup_persistent_brokerd from ..brokers import _setup_persistent_brokerd
return ( return (
start_actor_kwargs, # to `ActorNursery.start_actor()` start_actor_kwargs, # to `ActorNursery.start_actor()`

View File

@ -0,0 +1,169 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Broker-daemon-actor "endpoint-hooks": the service task entry points for
``brokerd``.
'''
from contextlib import (
asynccontextmanager as acm,
)
import tractor
import trio
from . import _util
from . import get_brokermod
# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.brokers._daemon',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]
# TODO: we should rename the daemon to datad prolly once we split up
# broker vs. data tasks into separate actors?
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str,
loglevel: str | None = None,
) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
log = _util.get_console_log(
loglevel or tractor.current_actor().loglevel,
name=f'{_util.subsys}.{brokername}',
)
# set global for this actor to this new process-wide instance B)
_util.log = log
from piker.data.feed import (
_bus,
get_feed_bus,
)
global _bus
assert not _bus
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
async def spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**tractor_kwargs,
) -> bool:
from piker.service import Services
from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon')
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=Services.debug_mode,
**tractor_kwargs
)
# non-blocking setup of brokerd service nursery
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
_setup_persistent_brokerd,
brokername=brokername,
loglevel=loglevel,
)
return True
@acm
async def maybe_spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**pikerd_kwargs,
) -> tractor.Portal:
'''
Helper to spawn a brokerd service *from* a client
who wishes to use the sub-actor-daemon.
'''
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,
},
loglevel=loglevel,
**pikerd_kwargs,
) as portal:
yield portal

View File

@ -31,6 +31,7 @@ from ..log import (
) )
subsys: str = 'piker.brokers' subsys: str = 'piker.brokers'
# NOTE: level should be reset by any actor that is spawned
log = get_logger(subsys) log = get_logger(subsys)
get_console_log = partial( get_console_log = partial(

View File

@ -50,40 +50,3 @@ __all__ = [
'open_shm_array', 'open_shm_array',
'get_shm_token', 'get_shm_token',
] ]
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str,
loglevel: str | None = None,
) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
get_console_log(
loglevel or tractor.current_actor().loglevel,
)
from .feed import (
_bus,
get_feed_bus,
)
global _bus
assert not _bus
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()

View File

@ -20,7 +20,6 @@ Actor-runtime service orchestration machinery.
""" """
from __future__ import annotations from __future__ import annotations
from ._util import log
from ._mngr import Services from ._mngr import Services
from ._registry import ( # noqa from ._registry import ( # noqa
_tractor_kwargs, _tractor_kwargs,
@ -33,8 +32,6 @@ from ._registry import ( # noqa
) )
from ._daemon import ( # noqa from ._daemon import ( # noqa
maybe_spawn_daemon, maybe_spawn_daemon,
spawn_brokerd,
maybe_spawn_brokerd,
spawn_emsd, spawn_emsd,
maybe_open_emsd, maybe_open_emsd,
) )
@ -44,6 +41,10 @@ from ._actor_runtime import (
open_pikerd, open_pikerd,
get_tractor_runtime_kwargs, get_tractor_runtime_kwargs,
) )
from ..brokers._daemon import (
spawn_brokerd,
maybe_spawn_brokerd,
)
__all__ = [ __all__ = [

View File

@ -133,8 +133,11 @@ _root_dname = 'pikerd'
_root_modules = [ _root_modules = [
__name__, __name__,
'piker.service._daemon', 'piker.service._daemon',
'piker.brokers._daemon',
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
'piker.data._sampling', 'piker.data._sampling',
] ]

View File

@ -32,25 +32,12 @@ import tractor
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
) )
from ..brokers import get_brokermod
from ._mngr import ( from ._mngr import (
Services, Services,
) )
from ._actor_runtime import maybe_open_pikerd from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service from ._registry import find_service
# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]
@acm @acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
@ -145,87 +132,6 @@ async def maybe_spawn_daemon(
await portal.cancel_actor() await portal.cancel_actor()
async def spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**tractor_kwargs,
) -> bool:
log.info(f'Spawning {brokername} broker daemon')
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=Services.debug_mode,
**tractor_kwargs
)
# non-blocking setup of brokerd service nursery
from ..data import _setup_persistent_brokerd
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
_setup_persistent_brokerd,
brokername=brokername,
loglevel=loglevel,
)
return True
@acm
async def maybe_spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**pikerd_kwargs,
) -> tractor.Portal:
'''
Helper to spawn a brokerd service *from* a client
who wishes to use the sub-actor-daemon.
'''
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,
},
loglevel=loglevel,
**pikerd_kwargs,
) as portal:
yield portal
async def spawn_emsd( async def spawn_emsd(
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -26,7 +26,7 @@ if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from . import log # sub-sys logger from ._util import log # sub-sys logger
from ._util import ( from ._util import (
get_console_log, get_console_log,
) )