service_mng_to_tractor: factor service-mngr into tractor core #63
|
|
@ -23,6 +23,7 @@ from __future__ import annotations
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from functools import partial
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
|
|
@ -66,12 +67,13 @@ async def _setup_persistent_brokerd(
|
|||
ctx: tractor.Context,
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
debug_mode: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Allocate a actor-wide service nursery in ``brokerd``
|
||||
such that feeds can be run in the background persistently by
|
||||
the broker backend as needed.
|
||||
Allocate a actor-wide service nursery in `brokerd` such that
|
||||
feeds can be run in the background persistently by the broker
|
||||
backend as needed.
|
||||
|
||||
'''
|
||||
# NOTE: we only need to setup logging once (and only) here
|
||||
|
|
@ -93,6 +95,18 @@ async def _setup_persistent_brokerd(
|
|||
from piker.data import feed
|
||||
assert not feed._bus
|
||||
|
||||
if (
|
||||
debug_mode
|
||||
and
|
||||
tractor.current_actor().is_infected_aio()
|
||||
):
|
||||
# NOTE, whenever running `asyncio` in provider's actor
|
||||
# runtime be sure we enabled `breakpoint()` support
|
||||
# for non-`trio.Task` usage.
|
||||
from tractor.devx import maybe_init_greenback
|
||||
await maybe_init_greenback()
|
||||
# breakpoint() # XXX, SHOULD WORK from `trio.Task`!
|
||||
|
||||
# allocate a nursery to the bus for spawning background
|
||||
# tasks to service client IPC requests, normally
|
||||
# `tractor.Context` connections to explicitly required
|
||||
|
|
@ -155,18 +169,21 @@ def broker_init(
|
|||
above.
|
||||
|
||||
'''
|
||||
from ..brokers import get_brokermod
|
||||
brokermod = get_brokermod(brokername)
|
||||
brokermod: ModuleType = get_brokermod(brokername)
|
||||
modpath: str = brokermod.__name__
|
||||
|
||||
start_actor_kwargs['name'] = f'brokerd.{brokername}'
|
||||
start_actor_kwargs.update(
|
||||
getattr(
|
||||
brokermod,
|
||||
'_spawn_kwargs',
|
||||
{},
|
||||
)
|
||||
spawn_kws: dict = getattr(
|
||||
brokermod,
|
||||
'_spawn_kwargs',
|
||||
{},
|
||||
)
|
||||
# ^^ NOTE, here we pull any runtime parameters specific
|
||||
# to spawning the sub-actor for the backend. For ex.
|
||||
# both `ib` and `deribit` rely on,
|
||||
# `'infect_asyncio': True,` since they both
|
||||
# use `tractor`'s "infected `asyncio` mode"
|
||||
# for their libs but you could also do something like
|
||||
# `'debug_mode: True` which would be like passing
|
||||
# `--pdb` for just that provider backend.
|
||||
|
||||
# XXX TODO: make this not so hacky/monkeypatched..
|
||||
# -> we need a sane way to configure the logging level for all
|
||||
|
|
@ -176,8 +193,7 @@ def broker_init(
|
|||
|
||||
# lookup actor-enabled modules declared by the backend offering the
|
||||
# `brokerd` endpoint(s).
|
||||
enabled: list[str]
|
||||
enabled = start_actor_kwargs['enable_modules'] = [
|
||||
enabled: list[str] = [
|
||||
__name__, # so that eps from THIS mod can be invoked
|
||||
modpath,
|
||||
]
|
||||
|
|
@ -189,9 +205,13 @@ def broker_init(
|
|||
subpath: str = f'{modpath}.{submodname}'
|
||||
enabled.append(subpath)
|
||||
|
||||
datad_kwargs: dict = {
|
||||
'name': f'brokerd.{brokername}',
|
||||
'enable_modules': enabled,
|
||||
}
|
||||
return (
|
||||
brokermod,
|
||||
start_actor_kwargs, # to `ActorNursery.start_actor()`
|
||||
start_actor_kwargs | datad_kwargs | spawn_kws, # to `ActorNursery.start_actor()`
|
||||
|
||||
# XXX see impl above; contains all (actor global)
|
||||
# setup/teardown expected in all `brokerd` actor instances.
|
||||
|
|
@ -206,12 +226,15 @@ async def spawn_brokerd(
|
|||
**tractor_kwargs,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
Spawn a `brokerd.<backendname>` subactor service daemon
|
||||
using `pikerd`'s service mngr.
|
||||
|
||||
'''
|
||||
log.info(
|
||||
f'Spawning broker-daemon,\n'
|
||||
f'backend: {brokername!r}'
|
||||
)
|
||||
|
||||
(
|
||||
brokermode,
|
||||
tractor_kwargs,
|
||||
|
|
@ -222,33 +245,41 @@ async def spawn_brokerd(
|
|||
**tractor_kwargs,
|
||||
)
|
||||
|
||||
brokermod = get_brokermod(brokername)
|
||||
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
|
||||
tractor_kwargs.update(extra_tractor_kwargs)
|
||||
|
||||
# ask `pikerd` to spawn a new sub-actor and manage it under its
|
||||
# actor nursery
|
||||
from piker.service import Services
|
||||
|
||||
from piker.service import (
|
||||
get_service_mngr,
|
||||
ServiceMngr,
|
||||
)
|
||||
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
|
||||
portal = await Services.actor_n.start_actor(
|
||||
dname,
|
||||
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
|
||||
debug_mode=Services.debug_mode,
|
||||
mngr: ServiceMngr = get_service_mngr()
|
||||
ctx: tractor.Context = await mngr.start_service(
|
||||
daemon_name=dname,
|
||||
ctx_ep=partial(
|
||||
# signature of target root-task endpoint
|
||||
daemon_fixture_ep,
|
||||
|
||||
# passed to daemon_fixture_ep(**kwargs)
|
||||
brokername=brokername,
|
||||
loglevel=loglevel,
|
||||
debug_mode=mngr.debug_mode,
|
||||
),
|
||||
debug_mode=mngr.debug_mode,
|
||||
# ^TODO, allow overriding this per-daemon from client side?
|
||||
# |_ it's already supported in `tractor` so..
|
||||
|
||||
loglevel=loglevel,
|
||||
enable_modules=(
|
||||
_data_mods
|
||||
+
|
||||
tractor_kwargs.pop('enable_modules')
|
||||
),
|
||||
**tractor_kwargs
|
||||
)
|
||||
|
||||
# NOTE: the service mngr expects an already spawned actor + its
|
||||
# portal ref in order to do non-blocking setup of brokerd
|
||||
# service nursery.
|
||||
await Services.start_service_task(
|
||||
dname,
|
||||
portal,
|
||||
|
||||
# signature of target root-task endpoint
|
||||
daemon_fixture_ep,
|
||||
brokername=brokername,
|
||||
loglevel=loglevel,
|
||||
assert (
|
||||
not ctx.cancel_called
|
||||
and ctx.portal # parent side
|
||||
and dname in ctx.chan.uid # subactor is named as desired
|
||||
)
|
||||
return True
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ from collections import (
|
|||
defaultdict,
|
||||
)
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from functools import partial
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
|
|
@ -42,7 +43,7 @@ from tractor.trionics import (
|
|||
maybe_open_nursery,
|
||||
)
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from trio import TaskStatus
|
||||
|
||||
from .ticktools import (
|
||||
frame_ticks,
|
||||
|
|
@ -68,6 +69,7 @@ if TYPE_CHECKING:
|
|||
_default_delay_s: float = 1.0
|
||||
|
||||
|
||||
# TODO: use new `tractor.singleton_acm` API for this!
|
||||
class Sampler:
|
||||
'''
|
||||
Global sampling engine registry.
|
||||
|
|
@ -398,7 +400,8 @@ async def register_with_sampler(
|
|||
|
||||
# unblock caller
|
||||
await ctx.started(
|
||||
set(Sampler.ohlcv_shms.keys())
|
||||
# XXX bc msgpack only allows one array type!
|
||||
list(Sampler.ohlcv_shms.keys())
|
||||
)
|
||||
|
||||
if open_index_stream:
|
||||
|
|
@ -454,7 +457,10 @@ async def spawn_samplerd(
|
|||
update and increment count write and stream broadcasting.
|
||||
|
||||
'''
|
||||
from piker.service import Services
|
||||
from piker.service import (
|
||||
get_service_mngr,
|
||||
ServiceMngr,
|
||||
)
|
||||
|
||||
dname = 'samplerd'
|
||||
log.info(f'Spawning `{dname}`')
|
||||
|
|
@ -462,27 +468,34 @@ async def spawn_samplerd(
|
|||
# singleton lock creation of ``samplerd`` since we only ever want
|
||||
# one daemon per ``pikerd`` proc tree.
|
||||
# TODO: make this built-into the service api?
|
||||
async with Services.locks[dname + '_singleton']:
|
||||
mngr: ServiceMngr = get_service_mngr()
|
||||
already_started: bool = dname in mngr.service_tasks
|
||||
|
||||
if dname not in Services.service_tasks:
|
||||
|
||||
portal = await Services.actor_n.start_actor(
|
||||
dname,
|
||||
enable_modules=[
|
||||
'piker.data._sampling',
|
||||
],
|
||||
loglevel=loglevel,
|
||||
debug_mode=Services.debug_mode, # set by pikerd flag
|
||||
**extra_tractor_kwargs
|
||||
)
|
||||
|
||||
await Services.start_service_task(
|
||||
dname,
|
||||
portal,
|
||||
async with mngr._locks[dname + '_singleton']:
|
||||
ctx: Context = await mngr.start_service(
|
||||
daemon_name=dname,
|
||||
ctx_ep=partial(
|
||||
register_with_sampler,
|
||||
period_s=1,
|
||||
sub_for_broadcasts=False,
|
||||
loglevel=loglevel,
|
||||
),
|
||||
debug_mode=mngr.debug_mode, # set by pikerd flag
|
||||
|
||||
# proxy-through to tractor
|
||||
enable_modules=[
|
||||
'piker.data._sampling',
|
||||
],
|
||||
loglevel=loglevel,
|
||||
**extra_tractor_kwargs
|
||||
)
|
||||
if not already_started:
|
||||
assert (
|
||||
ctx
|
||||
and
|
||||
ctx.portal
|
||||
and
|
||||
not ctx.cancel_called
|
||||
)
|
||||
return True
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
|
|||
=> TODO: maybe to (re)move elsewhere?
|
||||
|
||||
'''
|
||||
from ._mngr import Services as Services
|
||||
from ._mngr import (
|
||||
get_service_mngr as get_service_mngr,
|
||||
open_service_mngr as open_service_mngr,
|
||||
ServiceMngr as ServiceMngr,
|
||||
)
|
||||
from ._registry import (
|
||||
_tractor_kwargs as _tractor_kwargs,
|
||||
_default_reg_addr as _default_reg_addr,
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ from contextlib import (
|
|||
)
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker.log import (
|
||||
get_console_log,
|
||||
|
|
@ -38,7 +37,8 @@ from ._util import (
|
|||
subsys,
|
||||
)
|
||||
from ._mngr import (
|
||||
Services,
|
||||
open_service_mngr,
|
||||
ServiceMngr,
|
||||
)
|
||||
from ._registry import ( # noqa
|
||||
_tractor_kwargs,
|
||||
|
|
@ -127,6 +127,10 @@ async def open_piker_runtime(
|
|||
enable_modules=enable_modules,
|
||||
hide_tb=False,
|
||||
|
||||
# TODO: how to configure this?
|
||||
# keep it on by default if debug mode is set?
|
||||
# maybe_enable_greenback=debug_mode,
|
||||
|
||||
**tractor_kwargs,
|
||||
) as actor,
|
||||
|
||||
|
|
@ -174,12 +178,13 @@ async def open_pikerd(
|
|||
|
||||
**kwargs,
|
||||
|
||||
) -> Services:
|
||||
) -> ServiceMngr:
|
||||
'''
|
||||
Start a root piker daemon with an indefinite lifetime.
|
||||
Start a root piker daemon actor (aka `pikerd`) with an indefinite
|
||||
lifetime.
|
||||
|
||||
A root actor nursery is created which can be used to create and keep
|
||||
alive underling services (see below).
|
||||
A root actor-nursery is created which can be used to spawn and
|
||||
supervise underling service sub-actors (see below).
|
||||
|
||||
'''
|
||||
# NOTE: for the root daemon we always enable the root
|
||||
|
|
@ -205,9 +210,6 @@ async def open_pikerd(
|
|||
root_actor,
|
||||
reg_addrs,
|
||||
),
|
||||
tractor.open_nursery() as actor_nursery,
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_tn,
|
||||
):
|
||||
for addr in reg_addrs:
|
||||
if addr not in root_actor.accept_addrs:
|
||||
|
|
@ -216,25 +218,17 @@ async def open_pikerd(
|
|||
'Maybe you have another daemon already running?'
|
||||
)
|
||||
|
||||
# assign globally for future daemon/task creation
|
||||
Services.actor_n = actor_nursery
|
||||
Services.service_n = service_tn
|
||||
Services.debug_mode = debug_mode
|
||||
|
||||
try:
|
||||
yield Services
|
||||
|
||||
finally:
|
||||
# TODO: is this more clever/efficient?
|
||||
# if 'samplerd' in Services.service_tasks:
|
||||
# await Services.cancel_service('samplerd')
|
||||
service_tn.cancel_scope.cancel()
|
||||
mngr: ServiceMngr
|
||||
async with open_service_mngr(
|
||||
debug_mode=debug_mode,
|
||||
) as mngr:
|
||||
yield mngr
|
||||
|
||||
|
||||
# TODO: do we even need this?
|
||||
# @acm
|
||||
# async def maybe_open_runtime(
|
||||
# loglevel: Optional[str] = None,
|
||||
# loglevel: str|None = None,
|
||||
# **kwargs,
|
||||
|
||||
# ) -> None:
|
||||
|
|
@ -265,7 +259,7 @@ async def maybe_open_pikerd(
|
|||
|
||||
) -> (
|
||||
tractor._portal.Portal
|
||||
|ClassVar[Services]
|
||||
|ClassVar[ServiceMngr]
|
||||
):
|
||||
'''
|
||||
If no ``pikerd`` daemon-root-actor can be found start it and
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ from piker.log import (
|
|||
get_console_log,
|
||||
get_logger,
|
||||
)
|
||||
from ._mngr import Services
|
||||
from ._mngr import ServiceMngr
|
||||
from .. import config
|
||||
|
||||
log = get_logger(name=__name__)
|
||||
|
|
@ -458,7 +458,7 @@ async def open_ahabd(
|
|||
|
||||
@acm
|
||||
async def start_ahab_service(
|
||||
services: Services,
|
||||
services: ServiceMngr,
|
||||
service_name: str,
|
||||
|
||||
# endpoint config passed as **kwargs
|
||||
|
|
@ -554,7 +554,8 @@ async def start_ahab_service(
|
|||
log.warning('Failed to cancel root permsed container')
|
||||
|
||||
except (
|
||||
trio.MultiError,
|
||||
# trio.MultiError,
|
||||
ExceptionGroup,
|
||||
) as err:
|
||||
for subexc in err.exceptions:
|
||||
if isinstance(subexc, PermissionError):
|
||||
|
|
|
|||
|
|
@ -26,16 +26,18 @@ from typing import (
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from collections import defaultdict
|
||||
|
||||
import tractor
|
||||
from trio.lowlevel import current_task
|
||||
import trio
|
||||
|
||||
from piker.log import (
|
||||
get_console_log,
|
||||
get_logger,
|
||||
)
|
||||
from ._mngr import (
|
||||
Services,
|
||||
get_service_mngr,
|
||||
ServiceMngr,
|
||||
)
|
||||
from ._actor_runtime import maybe_open_pikerd
|
||||
from ._registry import find_service
|
||||
|
|
@ -47,12 +49,12 @@ log = get_logger(name=__name__)
|
|||
async def maybe_spawn_daemon(
|
||||
service_name: str,
|
||||
service_task_target: Callable,
|
||||
|
||||
spawn_args: dict[str, Any],
|
||||
|
||||
loglevel: str|None = None,
|
||||
singleton: bool = False,
|
||||
|
||||
_locks = defaultdict(trio.Lock),
|
||||
**pikerd_kwargs,
|
||||
|
||||
) -> tractor.Portal:
|
||||
|
|
@ -76,7 +78,7 @@ async def maybe_spawn_daemon(
|
|||
|
||||
# serialize access to this section to avoid
|
||||
# 2 or more tasks racing to create a daemon
|
||||
lock = Services.locks[service_name]
|
||||
lock = _locks[service_name]
|
||||
await lock.acquire()
|
||||
|
||||
try:
|
||||
|
|
@ -112,6 +114,12 @@ async def maybe_spawn_daemon(
|
|||
# service task for that actor.
|
||||
started: bool
|
||||
if pikerd_portal is None:
|
||||
|
||||
# await tractor.pause()
|
||||
if tractor.is_debug():
|
||||
from tractor.devx._debug import maybe_init_greenback
|
||||
await maybe_init_greenback()
|
||||
|
||||
started = await service_task_target(
|
||||
loglevel=loglevel,
|
||||
**spawn_args,
|
||||
|
|
@ -142,14 +150,72 @@ async def maybe_spawn_daemon(
|
|||
async with tractor.wait_for_actor(service_name) as portal:
|
||||
lock.release()
|
||||
yield portal
|
||||
await portal.cancel_actor()
|
||||
# --- ---- ---
|
||||
# XXX NOTE XXX
|
||||
# --- ---- ---
|
||||
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
|
||||
#
|
||||
# Doing so will cause an "out-of-band" ctxc
|
||||
# (`tractor.ContextCancelled`) to be raised inside the
|
||||
# `ServiceMngr.open_context_in_task()`'s call to
|
||||
# `ctx.wait_for_result()` AND the internal self-ctxc
|
||||
# "graceful capture" WILL NOT CATCH IT!
|
||||
#
|
||||
# This can cause certain types of operations to raise
|
||||
# that ctxc BEFORE THEY `return`, resulting in
|
||||
# a "false-negative" ctxc being raised when really
|
||||
# nothing actually failed, other then our semantic
|
||||
# "failure" to suppress an expected, graceful,
|
||||
# self-cancel scenario..
|
||||
#
|
||||
# bUt wHy duZ It WorK lIKe dis..
|
||||
# ------------------------------
|
||||
# from the perspective of the `tractor.Context` this
|
||||
# cancel request was conducted "out of band" since
|
||||
# `Context.cancel()` was never called and thus the
|
||||
# `._cancel_called: bool` was never set. Despite the
|
||||
# remote `.canceller` being set to `pikerd` (i.e. the
|
||||
# same `Actor.uid` of the raising service-mngr task) the
|
||||
# service-task's ctx itself was never marked as having
|
||||
# requested cancellation and thus still raises the ctxc
|
||||
# bc it was unaware of any such request.
|
||||
#
|
||||
# How to make grokin these cases easier tho?
|
||||
# ------------------------------------------
|
||||
# Because `Portal.cancel_actor()` was called it requests
|
||||
# "full-`Actor`-runtime-cancellation" of it's peer
|
||||
# process which IS NOT THE SAME as a single inter-actor
|
||||
# RPC task cancelling its local context with a remote
|
||||
# peer `Task` in that same peer process.
|
||||
#
|
||||
# ?TODO? It might be better if we do one (or all) of the
|
||||
# following:
|
||||
#
|
||||
# -[ ] at least set a special message for the
|
||||
# `ContextCancelled` when raised locally by the
|
||||
# unaware ctx task such that we check for the
|
||||
# `.canceller` being *our `Actor`* and in the case
|
||||
# where `Context._cancel_called == False` we specially
|
||||
# note that this is likely an "out-of-band"
|
||||
# runtime-cancel request triggered by some call to
|
||||
# `Portal.cancel_actor()`, possibly even reporting the
|
||||
# exact LOC of that caller by tracking it inside our
|
||||
# portal-type?
|
||||
# -[ ] possibly add another field `ContextCancelled` like
|
||||
# maybe a,
|
||||
# `.request_type: Literal['os', 'proc', 'actor',
|
||||
# 'ctx']` type thing which would allow immediately
|
||||
# being able to tell what kind of cancellation caused
|
||||
# the unexpected ctxc?
|
||||
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
|
||||
# better augment `tractor` to be more explicit on this!
|
||||
|
||||
except BaseException as _err:
|
||||
err = _err
|
||||
if (
|
||||
lock.locked()
|
||||
and
|
||||
lock.statistics().owner is current_task()
|
||||
lock.statistics().owner is trio.lowlevel.current_task()
|
||||
):
|
||||
log.exception(
|
||||
f'Releasing stale lock after crash..?'
|
||||
|
|
@ -170,26 +236,25 @@ async def spawn_emsd(
|
|||
"""
|
||||
log.info('Spawning emsd')
|
||||
|
||||
portal = await Services.actor_n.start_actor(
|
||||
smngr: ServiceMngr = get_service_mngr()
|
||||
portal = await smngr.an.start_actor(
|
||||
'emsd',
|
||||
enable_modules=[
|
||||
'piker.clearing._ems',
|
||||
'piker.clearing._client',
|
||||
],
|
||||
loglevel=loglevel,
|
||||
debug_mode=Services.debug_mode, # set by pikerd flag
|
||||
debug_mode=smngr.debug_mode, # set by pikerd flag
|
||||
**extra_tractor_kwargs
|
||||
)
|
||||
|
||||
# non-blocking setup of clearing service
|
||||
from ..clearing._ems import _setup_persistent_emsd
|
||||
|
||||
await Services.start_service_task(
|
||||
'emsd',
|
||||
portal,
|
||||
|
||||
# signature of target root-task endpoint
|
||||
_setup_persistent_emsd,
|
||||
await smngr.start_service_ctx(
|
||||
name='emsd',
|
||||
portal=portal,
|
||||
ctx_fn=_setup_persistent_emsd,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -18,148 +18,41 @@
|
|||
daemon-service management API.
|
||||
|
||||
"""
|
||||
from collections import defaultdict
|
||||
from typing import (
|
||||
Callable,
|
||||
Any,
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import (
|
||||
current_actor,
|
||||
ContextCancelled,
|
||||
Context,
|
||||
Portal,
|
||||
from tractor.hilevel import (
|
||||
ServiceMngr,
|
||||
# open_service_mngr as _open_service_mngr,
|
||||
get_service_mngr as get_service_mngr,
|
||||
)
|
||||
|
||||
from piker.log import get_logger
|
||||
|
||||
log = get_logger(name=__name__)
|
||||
|
||||
# TODO:
|
||||
# -[ ] factor all the common shit from `.data._sampling`
|
||||
# and `.brokers._daemon` into here / `ServiceMngr`
|
||||
# in terms of allocating the `Portal` as part of the
|
||||
# "service-in-subactor" starting!
|
||||
# -[ ] move to `tractor.hilevel._service`, import and use here!
|
||||
# NOTE: purposely leaks the ref to the mod-scope Bo
|
||||
|
||||
# TODO: we need remote wrapping and a general soln:
|
||||
# - factor this into a ``tractor.highlevel`` extension # pack for the
|
||||
# library.
|
||||
# - wrap a "remote api" wherein you can get a method proxy
|
||||
# to the pikerd actor for starting services remotely!
|
||||
# - prolly rename this to ActorServicesNursery since it spawns
|
||||
# new actors and supervises them to completion?
|
||||
class Services:
|
||||
Services: ServiceMngr|None = None
|
||||
|
||||
actor_n: tractor._supervise.ActorNursery
|
||||
service_n: trio.Nursery
|
||||
debug_mode: bool # tractor sub-actor debug mode flag
|
||||
service_tasks: dict[
|
||||
str,
|
||||
tuple[
|
||||
trio.CancelScope,
|
||||
Portal,
|
||||
trio.Event,
|
||||
]
|
||||
] = {}
|
||||
locks = defaultdict(trio.Lock)
|
||||
@acm
|
||||
async def open_service_mngr(
|
||||
**kwargs,
|
||||
) -> ServiceMngr:
|
||||
|
||||
@classmethod
|
||||
async def start_service_task(
|
||||
self,
|
||||
name: str,
|
||||
portal: Portal,
|
||||
target: Callable,
|
||||
allow_overruns: bool = False,
|
||||
**ctx_kwargs,
|
||||
|
||||
) -> (trio.CancelScope, Context):
|
||||
'''
|
||||
Open a context in a service sub-actor, add to a stack
|
||||
that gets unwound at ``pikerd`` teardown.
|
||||
|
||||
This allows for allocating long-running sub-services in our main
|
||||
daemon and explicitly controlling their lifetimes.
|
||||
|
||||
'''
|
||||
async def open_context_in_task(
|
||||
task_status: TaskStatus[
|
||||
tuple[
|
||||
trio.CancelScope,
|
||||
trio.Event,
|
||||
Any,
|
||||
]
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> Any:
|
||||
|
||||
with trio.CancelScope() as cs:
|
||||
|
||||
async with portal.open_context(
|
||||
target,
|
||||
allow_overruns=allow_overruns,
|
||||
**ctx_kwargs,
|
||||
|
||||
) as (ctx, first):
|
||||
|
||||
# unblock once the remote context has started
|
||||
complete = trio.Event()
|
||||
task_status.started((cs, complete, first))
|
||||
log.info(
|
||||
f'`pikerd` service {name} started with value {first}'
|
||||
)
|
||||
try:
|
||||
# wait on any context's return value
|
||||
# and any final portal result from the
|
||||
# sub-actor.
|
||||
ctx_res: Any = await ctx.wait_for_result()
|
||||
|
||||
# NOTE: blocks indefinitely until cancelled
|
||||
# either by error from the target context
|
||||
# function or by being cancelled here by the
|
||||
# surrounding cancel scope.
|
||||
return (await portal.result(), ctx_res)
|
||||
except ContextCancelled as ctxe:
|
||||
canceller: tuple[str, str] = ctxe.canceller
|
||||
our_uid: tuple[str, str] = current_actor().uid
|
||||
if (
|
||||
canceller != portal.channel.uid
|
||||
and
|
||||
canceller != our_uid
|
||||
):
|
||||
log.cancel(
|
||||
f'Actor-service {name} was remotely cancelled?\n'
|
||||
f'remote canceller: {canceller}\n'
|
||||
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
|
||||
finally:
|
||||
await portal.cancel_actor()
|
||||
complete.set()
|
||||
self.service_tasks.pop(name)
|
||||
|
||||
cs, complete, first = await self.service_n.start(open_context_in_task)
|
||||
|
||||
# store the cancel scope and portal for later cancellation or
|
||||
# retstart if needed.
|
||||
self.service_tasks[name] = (cs, portal, complete)
|
||||
|
||||
return cs, first
|
||||
|
||||
@classmethod
|
||||
async def cancel_service(
|
||||
self,
|
||||
name: str,
|
||||
|
||||
) -> Any:
|
||||
'''
|
||||
Cancel the service task and actor for the given ``name``.
|
||||
|
||||
'''
|
||||
log.info(f'Cancelling `pikerd` service {name}')
|
||||
cs, portal, complete = self.service_tasks[name]
|
||||
cs.cancel()
|
||||
await complete.wait()
|
||||
assert name not in self.service_tasks, \
|
||||
f'Serice task for {name} not terminated?'
|
||||
global Services
|
||||
async with tractor.hilevel.open_service_mngr(
|
||||
**kwargs,
|
||||
) as mngr:
|
||||
# Services = proxy(mngr)
|
||||
Services = mngr
|
||||
yield mngr
|
||||
Services = None
|
||||
|
|
|
|||
|
|
@ -22,14 +22,13 @@ from typing import (
|
|||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
# TODO: oof, needs to be changed to `httpx`!
|
||||
import asks
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import docker
|
||||
from ._ahab import DockerContainer
|
||||
from . import (
|
||||
Services,
|
||||
)
|
||||
from . import ServiceMngr
|
||||
|
||||
from piker.log import (
|
||||
get_console_log,
|
||||
|
|
@ -136,7 +135,7 @@ def start_elasticsearch(
|
|||
|
||||
@acm
|
||||
async def start_ahab_daemon(
|
||||
service_mngr: Services,
|
||||
service_mngr: ServiceMngr,
|
||||
user_config: dict | None = None,
|
||||
loglevel: str | None = None,
|
||||
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ import pendulum
|
|||
# import purerpc
|
||||
|
||||
from piker.data.feed import maybe_open_feed
|
||||
from . import Services
|
||||
from . import ServiceMngr
|
||||
from piker.log import (
|
||||
get_console_log,
|
||||
get_logger,
|
||||
|
|
@ -234,7 +234,7 @@ def start_marketstore(
|
|||
|
||||
@acm
|
||||
async def start_ahab_daemon(
|
||||
service_mngr: Services,
|
||||
service_mngr: ServiceMngr,
|
||||
user_config: dict | None = None,
|
||||
loglevel: str | None = None,
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
|
|||
types.
|
||||
|
||||
'''
|
||||
from tractor.msg.pretty_struct import (
|
||||
Struct as Struct,
|
||||
)
|
||||
from tractor.msg import Struct as Struct
|
||||
|
|
|
|||
|
|
@ -76,7 +76,7 @@ dependencies = [
|
|||
"pyvnc",
|
||||
"exchange-calendars>=4.13.1",
|
||||
"ib-async>=2.1.0",
|
||||
"aeventkit>=2.1.0", # XXX, imports as eventkit?
|
||||
"aeventkit>=2.1.0",
|
||||
]
|
||||
# ------ dependencies ------
|
||||
# NOTE, by default we ship only a "headless" deps set bc
|
||||
|
|
@ -203,7 +203,7 @@ pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
|
|||
# xonsh = { git = 'https://github.com/xonsh/xonsh.git', branch = 'main' }
|
||||
|
||||
# XXX since, we're like, always hacking new shite all-the-time. Bp
|
||||
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="main" }
|
||||
tractor = { git = "https://github.com/goodboy/tractor.git", branch = "hilevel_serman" }
|
||||
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
|
||||
# ------ goodboy ------
|
||||
# hackin dev-envs, usually there's something new he's hackin in..
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from piker import (
|
|||
config,
|
||||
)
|
||||
from piker.service import (
|
||||
Services,
|
||||
get_service_mngr,
|
||||
)
|
||||
from piker.log import get_console_log
|
||||
|
||||
|
|
@ -135,7 +135,7 @@ async def _open_test_pikerd(
|
|||
) as service_manager,
|
||||
):
|
||||
# this proc/actor is the pikerd
|
||||
assert service_manager is Services
|
||||
assert service_manager is get_service_mngr()
|
||||
|
||||
async with tractor.wait_for_actor(
|
||||
'pikerd',
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ import pytest
|
|||
import tractor
|
||||
from uuid import uuid4
|
||||
|
||||
from piker.service import Services
|
||||
from piker.service import ServiceMngr
|
||||
from piker.log import get_logger
|
||||
from piker.clearing._messages import (
|
||||
Order,
|
||||
|
|
@ -158,7 +158,7 @@ def load_and_check_pos(
|
|||
|
||||
|
||||
def test_ems_err_on_bad_broker(
|
||||
open_test_pikerd: Services,
|
||||
open_test_pikerd: ServiceMngr,
|
||||
loglevel: str,
|
||||
):
|
||||
async def load_bad_fqme():
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import tractor
|
|||
|
||||
from piker.service import (
|
||||
find_service,
|
||||
Services,
|
||||
ServiceMngr,
|
||||
)
|
||||
from piker.data import (
|
||||
open_feed,
|
||||
|
|
@ -44,7 +44,7 @@ def test_runtime_boot(
|
|||
async def main():
|
||||
port = 6666
|
||||
daemon_addr = ('127.0.0.1', port)
|
||||
services: Services
|
||||
services: ServiceMngr
|
||||
|
||||
async with (
|
||||
open_test_pikerd(
|
||||
|
|
|
|||
10
uv.lock
10
uv.lock
|
|
@ -1034,7 +1034,7 @@ requires-dist = [
|
|||
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" },
|
||||
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
|
||||
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
|
||||
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=main" },
|
||||
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=hilevel_serman" },
|
||||
{ name = "trio", specifier = ">=0.27" },
|
||||
{ name = "trio-typing", specifier = ">=0.10.0" },
|
||||
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
|
||||
|
|
@ -1080,11 +1080,11 @@ uis = [
|
|||
|
||||
[[package]]
|
||||
name = "platformdirs"
|
||||
version = "4.6.0"
|
||||
version = "4.9.4"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/20/e5/474d0a8508029286b905622e6929470fb84337cfa08f9d09fbb624515249/platformdirs-4.6.0.tar.gz", hash = "sha256:4a13c2db1071e5846c3b3e04e5b095c0de36b2a24be9a3bc0145ca66fce4e328", size = 23433, upload-time = "2026-02-12T14:36:21.288Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/19/56/8d4c30c8a1d07013911a8fdbd8f89440ef9f08d07a1b50ab8ca8be5a20f9/platformdirs-4.9.4.tar.gz", hash = "sha256:1ec356301b7dc906d83f371c8f487070e99d3ccf9e501686456394622a01a934", size = 28737, upload-time = "2026-03-05T18:34:13.271Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/da/10/1b0dcf51427326f70e50d98df21b18c228117a743a1fc515a42f8dc7d342/platformdirs-4.6.0-py3-none-any.whl", hash = "sha256:dd7f808d828e1764a22ebff09e60f175ee3c41876606a6132a688d809c7c9c73", size = 19549, upload-time = "2026-02-12T14:36:19.743Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/63/d7/97f7e3a6abb67d8080dd406fd4df842c2be0efaf712d1c899c32a075027c/platformdirs-4.9.4-py3-none-any.whl", hash = "sha256:68a9a4619a666ea6439f2ff250c12a853cd1cbd5158d258bd824a7df6be2f868", size = 21216, upload-time = "2026-03-05T18:34:12.172Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -1676,7 +1676,7 @@ wheels = [
|
|||
[[package]]
|
||||
name = "tractor"
|
||||
version = "0.1.0a6.dev0"
|
||||
source = { git = "https://github.com/goodboy/tractor.git?branch=main#e77198bb64f0467a50e251ed140daee439752354" }
|
||||
source = { git = "https://github.com/goodboy/tractor.git?branch=hilevel_serman#52fe3083704a7f26c37abadfdf02592fad2d1f81" }
|
||||
dependencies = [
|
||||
{ name = "bidict" },
|
||||
{ name = "cffi" },
|
||||
|
|
|
|||
Loading…
Reference in New Issue