diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 5efb03dd..c02ed856 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -23,6 +23,7 @@ from __future__ import annotations from contextlib import ( asynccontextmanager as acm, ) +from functools import partial from types import ModuleType from typing import ( TYPE_CHECKING, @@ -60,12 +61,13 @@ async def _setup_persistent_brokerd( ctx: tractor.Context, brokername: str, loglevel: str | None = None, + debug_mode: bool = False, ) -> None: ''' - Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed. + Allocate a actor-wide service nursery in `brokerd` such that + feeds can be run in the background persistently by the broker + backend as needed. ''' # NOTE: we only need to setup logging once (and only) here @@ -86,6 +88,18 @@ async def _setup_persistent_brokerd( from piker.data import feed assert not feed._bus + if ( + debug_mode + and + tractor.current_actor().is_infected_aio() + ): + # NOTE, whenever running `asyncio` in provider's actor + # runtime be sure we enabled `breakpoint()` support + # for non-`trio.Task` usage. + from tractor.devx._debug import maybe_init_greenback + await maybe_init_greenback() + # breakpoint() # XXX, SHOULD WORK from `trio.Task`! + # allocate a nursery to the bus for spawning background # tasks to service client IPC requests, normally # `tractor.Context` connections to explicitly required @@ -145,18 +159,21 @@ def broker_init( above. ''' - from ..brokers import get_brokermod - brokermod = get_brokermod(brokername) + brokermod: ModuleType = get_brokermod(brokername) modpath: str = brokermod.__name__ - - start_actor_kwargs['name'] = f'brokerd.{brokername}' - start_actor_kwargs.update( - getattr( - brokermod, - '_spawn_kwargs', - {}, - ) + spawn_kws: dict = getattr( + brokermod, + '_spawn_kwargs', + {}, ) + # ^^ NOTE, here we pull any runtime parameters specific + # to spawning the sub-actor for the backend. For ex. + # both `ib` and `deribit` rely on, + # `'infect_asyncio': True,` since they both + # use `tractor`'s "infected `asyncio` mode" + # for their libs but you could also do something like + # `'debug_mode: True` which would be like passing + # `--pdb` for just that provider backend. # XXX TODO: make this not so hacky/monkeypatched.. # -> we need a sane way to configure the logging level for all @@ -166,8 +183,7 @@ def broker_init( # lookup actor-enabled modules declared by the backend offering the # `brokerd` endpoint(s). - enabled: list[str] - enabled = start_actor_kwargs['enable_modules'] = [ + enabled: list[str] = [ __name__, # so that eps from THIS mod can be invoked modpath, ] @@ -179,9 +195,13 @@ def broker_init( subpath: str = f'{modpath}.{submodname}' enabled.append(subpath) + datad_kwargs: dict = { + 'name': f'brokerd.{brokername}', + 'enable_modules': enabled, + } return ( brokermod, - start_actor_kwargs, # to `ActorNursery.start_actor()` + start_actor_kwargs | datad_kwargs | spawn_kws, # to `ActorNursery.start_actor()` # XXX see impl above; contains all (actor global) # setup/teardown expected in all `brokerd` actor instances. @@ -190,14 +210,17 @@ def broker_init( async def spawn_brokerd( - brokername: str, loglevel: str | None = None, **tractor_kwargs, ) -> bool: + ''' + Spawn a `brokerd.` subactor service daemon + using `pikerd`'s service mngr. 
+ ''' from piker.service._util import log # use service mngr log log.info(f'Spawning {brokername} broker daemon') @@ -211,33 +234,41 @@ async def spawn_brokerd( **tractor_kwargs, ) - brokermod = get_brokermod(brokername) - extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - tractor_kwargs.update(extra_tractor_kwargs) - # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery - from piker.service import Services - + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' - portal = await Services.actor_n.start_actor( - dname, - enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), - debug_mode=Services.debug_mode, + mngr: ServiceMngr = get_service_mngr() + ctx: tractor.Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( + # signature of target root-task endpoint + daemon_fixture_ep, + + # passed to daemon_fixture_ep(**kwargs) + brokername=brokername, + loglevel=loglevel, + debug_mode=mngr.debug_mode, + ), + debug_mode=mngr.debug_mode, + # ^TODO, allow overriding this per-daemon from client side? + # |_ it's already supported in `tractor` so.. + + loglevel=loglevel, + enable_modules=( + _data_mods + + + tractor_kwargs.pop('enable_modules') + ), **tractor_kwargs ) - - # NOTE: the service mngr expects an already spawned actor + its - # portal ref in order to do non-blocking setup of brokerd - # service nursery. - await Services.start_service_task( - dname, - portal, - - # signature of target root-task endpoint - daemon_fixture_ep, - brokername=brokername, - loglevel=loglevel, + assert ( + not ctx.cancel_called + and ctx.portal # parent side + and dname in ctx.chan.uid # subactor is named as desired ) return True @@ -262,8 +293,7 @@ async def maybe_spawn_brokerd( from piker.service import maybe_spawn_daemon async with maybe_spawn_daemon( - - f'brokerd.{brokername}', + service_name=f'brokerd.{brokername}', service_task_target=spawn_brokerd, spawn_args={ 'brokername': brokername, diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index dde7f83c..0ba7bc09 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -140,11 +140,10 @@ def pikerd( if pdb: log.warning(( - "\n" - "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" - "When a `piker` daemon crashes it will block the " - "task-thread until resumed from console!\n" - "\n" + '\n\n' + '!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n' + 'When a `piker` daemon crashes it will block the ' + 'task-thread until resumed from console!\n' )) # service-actor registry endpoint socket-address set @@ -177,7 +176,7 @@ def pikerd( from .. 
import service async def main(): - service_mngr: service.Services + service_mngr: service.ServiceMngr async with ( service.open_pikerd( @@ -335,7 +334,7 @@ def services(config, tl, ports): name='service_query', loglevel=config['loglevel'] if tl else None, ), - tractor.get_arbiter( + tractor.get_registry( host=host, port=ports[0] ) as portal diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 7bb0231d..093e19cf 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -25,6 +25,7 @@ from collections import ( defaultdict, ) from contextlib import asynccontextmanager as acm +from functools import partial import time from typing import ( Any, @@ -42,7 +43,7 @@ from tractor.trionics import ( maybe_open_nursery, ) import trio -from trio_typing import TaskStatus +from trio import TaskStatus from .ticktools import ( frame_ticks, @@ -70,6 +71,7 @@ if TYPE_CHECKING: _default_delay_s: float = 1.0 +# TODO: use new `tractor.singleton_acm` API for this! class Sampler: ''' Global sampling engine registry. @@ -79,9 +81,9 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to - time-step-sample (real-time) quote feeds, see - ``.service.maybe_open_samplerd()`` and the below - ``register_with_sampler()``. + time-step-sample a (real-time) quote feeds, see + `.service.maybe_open_samplerd()` and the below + `register_with_sampler()`. ''' service_nursery: None | trio.Nursery = None @@ -95,6 +97,12 @@ class Sampler: # history loading. incr_task_cs: trio.CancelScope | None = None + bcast_errors: tuple[Exception] = ( + trio.BrokenResourceError, + trio.ClosedResourceError, + trio.EndOfChannel, + ) + # holds all the ``tractor.Context`` remote subscriptions for # a particular sample period increment event: all subscribers are # notified on a step. @@ -258,14 +266,15 @@ class Sampler: subs: set last_ts, subs = pair - task = trio.lowlevel.current_task() - log.debug( - f'SUBS {self.subscribers}\n' - f'PAIR {pair}\n' - f'TASK: {task}: {id(task)}\n' - f'broadcasting {period_s} -> {last_ts}\n' - # f'consumers: {subs}' - ) + # NOTE, for debugging pub-sub issues + # task = trio.lowlevel.current_task() + # log.debug( + # f'AlL-SUBS@{period_s!r}: {self.subscribers}\n' + # f'PAIR: {pair}\n' + # f'TASK: {task}: {id(task)}\n' + # f'broadcasting {period_s} -> {last_ts}\n' + # f'consumers: {subs}' + # ) borked: set[MsgStream] = set() sent: set[MsgStream] = set() while True: @@ -282,12 +291,11 @@ class Sampler: await stream.send(msg) sent.add(stream) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): + except self.bcast_errors as err: log.error( - f'{stream._ctx.chan.uid} dropped connection' + f'Connection dropped for IPC ctx\n' + f'{stream._ctx}\n\n' + f'Due to {type(err)}' ) borked.add(stream) else: @@ -375,7 +383,10 @@ async def register_with_sampler( assert Sampler.ohlcv_shms # unblock caller - await ctx.started(set(Sampler.ohlcv_shms.keys())) + await ctx.started( + # XXX bc msgpack only allows one array type! + list(Sampler.ohlcv_shms.keys()) + ) if open_index_stream: try: @@ -394,7 +405,8 @@ async def register_with_sampler( finally: if ( sub_for_broadcasts - and subs + and + subs ): try: subs.remove(stream) @@ -419,7 +431,6 @@ async def register_with_sampler( async def spawn_samplerd( - loglevel: str | None = None, **extra_tractor_kwargs @@ -429,7 +440,10 @@ async def spawn_samplerd( update and increment count write and stream broadcasting. 
''' - from piker.service import Services + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname = 'samplerd' log.info(f'Spawning `{dname}`') @@ -437,26 +451,33 @@ async def spawn_samplerd( # singleton lock creation of ``samplerd`` since we only ever want # one daemon per ``pikerd`` proc tree. # TODO: make this built-into the service api? - async with Services.locks[dname + '_singleton']: + mngr: ServiceMngr = get_service_mngr() + already_started: bool = dname in mngr.service_tasks - if dname not in Services.service_tasks: - - portal = await Services.actor_n.start_actor( - dname, - enable_modules=[ - 'piker.data._sampling', - ], - loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag - **extra_tractor_kwargs - ) - - await Services.start_service_task( - dname, - portal, + async with mngr._locks[dname + '_singleton']: + ctx: Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( register_with_sampler, period_s=1, sub_for_broadcasts=False, + ), + debug_mode=mngr.debug_mode, # set by pikerd flag + + # proxy-through to tractor + enable_modules=[ + 'piker.data._sampling', + ], + loglevel=loglevel, + **extra_tractor_kwargs + ) + if not already_started: + assert ( + ctx + and + ctx.portal + and + not ctx.cancel_called ) return True @@ -561,8 +582,7 @@ async def open_sample_stream( async def sample_and_broadcast( - - bus: _FeedsBus, # noqa + bus: _FeedsBus, rt_shm: ShmArray, hist_shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, @@ -582,11 +602,33 @@ async def sample_and_broadcast( overruns = Counter() + # NOTE, only used for debugging live-data-feed issues, though + # this should be resolved more correctly in the future using the + # new typed-msgspec feats of `tractor`! + # + # XXX, a multiline nested `dict` formatter (since rn quote-msgs + # are just that). + # pfmt: Callable[[str], str] = mk_repr() + # iterate stream delivered by broker async for quotes in quote_stream: # print(quotes) - # TODO: ``numba`` this! + # XXX WARNING XXX only enable for debugging bc ow can cost + # ALOT of perf with HF-feedz!!! + # + # log.info( + # 'Rx live quotes:\n' + # f'{pfmt(quotes)}' + # ) + + # TODO, + # -[ ] `numba` or `cython`-nize this loop possibly? + # |_alternatively could we do it in rust somehow by upacking + # arrow msgs instead of using `msgspec`? + # -[ ] use `msgspec.Struct` support in new typed-msging from + # `tractor` to ensure only allowed msgs are transmitted? + # for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that @@ -659,6 +701,21 @@ async def sample_and_broadcast( sub_key: str = broker_symbol.lower() subs: set[Sub] = bus.get_subs(sub_key) + # TODO, figure out how to make this useful whilst + # incoporating feed "pausing" .. + # + # if not subs: + # all_bs_fqmes: list[str] = list( + # bus._subscribers.keys() + # ) + # log.warning( + # f'No subscribers for {brokername!r} live-quote ??\n' + # f'broker_symbol: {broker_symbol}\n\n' + + # f'Maybe the backend-sys symbol does not match one of,\n' + # f'{pfmt(all_bs_fqmes)}\n' + # ) + # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it # should?) 
so we have to manually generate the correct @@ -728,18 +785,14 @@ async def sample_and_broadcast( if lags > 10: await tractor.pause() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError, - trio.EndOfChannel, - ): + except Sampler.bcast_errors as ipc_err: ctx: Context = ipc._ctx chan: Channel = ctx.chan if ctx: log.warning( - 'Dropped `brokerd`-quotes-feed connection:\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' + f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n' + f'x>) {ctx.cid}@{chan.uid}' + f'|_{ipc_err!r}\n\n' ) if sub.throttle_rate: assert ipc._closed @@ -756,12 +809,11 @@ async def sample_and_broadcast( async def uniform_rate_send( - rate: float, quote_stream: trio.abc.ReceiveChannel, stream: MsgStream, - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -779,13 +831,16 @@ async def uniform_rate_send( https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 ''' - # TODO: compute the approx overhead latency per cycle - left_to_sleep = throttle_period = 1/rate - 0.000616 + # ?TODO? dynamically compute the **actual** approx overhead latency per cycle + # instead of this magic # bidinezz? + throttle_period: float = 1/rate - 0.000616 + left_to_sleep: float = throttle_period # send cycle state + first_quote: dict|None first_quote = last_quote = None - last_send = time.time() - diff = 0 + last_send: float = time.time() + diff: float = 0 task_status.started() ticks_by_type: dict[ @@ -796,22 +851,28 @@ async def uniform_rate_send( clear_types = _tick_groups['clears'] while True: - # compute the remaining time to sleep for this throttled cycle - left_to_sleep = throttle_period - diff + left_to_sleep: float = throttle_period - diff if left_to_sleep > 0: + cs: trio.CancelScope with trio.move_on_after(left_to_sleep) as cs: + sym: str + last_quote: dict try: sym, last_quote = await quote_stream.receive() except trio.EndOfChannel: - log.exception(f"feed for {stream} ended?") + log.exception( + f'Live stream for feed for ended?\n' + f'<=c\n' + f' |_[{stream!r}\n' + ) break - diff = time.time() - last_send + diff: float = time.time() - last_send if not first_quote: - first_quote = last_quote + first_quote: float = last_quote # first_quote['tbt'] = ticks_by_type if (throttle_period - diff) > 0: @@ -872,7 +933,9 @@ async def uniform_rate_send( # TODO: now if only we could sync this to the display # rate timing exactly lul try: - await stream.send({sym: first_quote}) + await stream.send({ + sym: first_quote + }) except tractor.RemoteActorError as rme: if rme.type is not tractor._exceptions.StreamOverrun: raise @@ -883,19 +946,28 @@ async def uniform_rate_send( f'{sym}:{ctx.cid}@{chan.uid}' ) + # NOTE: any of these can be raised by `tractor`'s IPC + # transport-layer and we want to be highly resilient + # to consumers which crash or lose network connection. + # I.e. we **DO NOT** want to crash and propagate up to + # ``pikerd`` these kinds of errors! except ( - # NOTE: any of these can be raised by ``tractor``'s IPC - # transport-layer and we want to be highly resilient - # to consumers which crash or lose network connection. - # I.e. we **DO NOT** want to crash and propagate up to - # ``pikerd`` these kinds of errors! 
- trio.ClosedResourceError, - trio.BrokenResourceError, ConnectionResetError, - ): - # if the feed consumer goes down then drop - # out of this rate limiter - log.warning(f'{stream} closed') + ) + Sampler.bcast_errors as ipc_err: + match ipc_err: + case trio.EndOfChannel(): + log.info( + f'{stream} terminated by peer,\n' + f'{ipc_err!r}' + ) + case _: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning( + f'{stream} closed due to,\n' + f'{ipc_err!r}' + ) + await stream.aclose() return diff --git a/piker/log.py b/piker/log.py index 56776e1e..dc5cfc59 100644 --- a/piker/log.py +++ b/piker/log.py @@ -19,6 +19,10 @@ Log like a forester! """ import logging import json +import reprlib +from typing import ( + Callable, +) import tractor from pygments import ( @@ -84,3 +88,29 @@ def colorize_json( # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) + + +# TODO, eventually defer to the version in `modden` once +# it becomes a dep! +def mk_repr( + **repr_kws, +) -> Callable[[str], str]: + ''' + Allocate and deliver a `repr.Repr` instance with provided input + settings using the std-lib's `reprlib` mod, + * https://docs.python.org/3/library/reprlib.html + + ------ Ex. ------ + An up to 6-layer-nested `dict` as multi-line: + - https://stackoverflow.com/a/79102479 + - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel + + ''' + def_kws: dict[str, int] = dict( + indent=2, + maxlevel=6, # recursion levels + maxstring=66, # match editor line-len limit + ) + def_kws |= repr_kws + reprr = reprlib.Repr(**def_kws) + return reprr.repr diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 29360620..beb9c70b 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for, => TODO: maybe to (re)move elsewhere? ''' -from ._mngr import Services as Services +from ._mngr import ( + get_service_mngr as get_service_mngr, + open_service_mngr as open_service_mngr, + ServiceMngr as ServiceMngr, +) from ._registry import ( _tractor_kwargs as _tractor_kwargs, _default_reg_addr as _default_reg_addr, diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index a4e3ccf2..42440f82 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -21,7 +21,6 @@ from __future__ import annotations import os from typing import ( - Optional, Any, ClassVar, ) @@ -30,13 +29,13 @@ from contextlib import ( ) import tractor -import trio from ._util import ( get_console_log, ) from ._mngr import ( - Services, + open_service_mngr, + ServiceMngr, ) from ._registry import ( # noqa _tractor_kwargs, @@ -59,7 +58,7 @@ async def open_piker_runtime( registry_addrs: list[tuple[str, int]] = [], enable_modules: list[str] = [], - loglevel: Optional[str] = None, + loglevel: str|None = None, # XXX NOTE XXX: you should pretty much never want debug mode # for data daemons when running in production. @@ -69,7 +68,7 @@ async def open_piker_runtime( # and spawn the service tree distributed per that. start_method: str = 'trio', - tractor_runtime_overrides: dict | None = None, + tractor_runtime_overrides: dict|None = None, **tractor_kwargs, ) -> tuple[ @@ -119,6 +118,10 @@ async def open_piker_runtime( # spawn other specialized daemons I think? enable_modules=enable_modules, + # TODO: how to configure this? + # keep it on by default if debug mode is set? 
+ # maybe_enable_greenback=debug_mode, + **tractor_kwargs, ) as actor, @@ -167,12 +170,13 @@ async def open_pikerd( **kwargs, -) -> Services: +) -> ServiceMngr: ''' - Start a root piker daemon with an indefinite lifetime. + Start a root piker daemon actor (aka `pikerd`) with an indefinite + lifetime. - A root actor nursery is created which can be used to create and keep - alive underling services (see below). + A root actor-nursery is created which can be used to spawn and + supervise underling service sub-actors (see below). ''' # NOTE: for the root daemon we always enable the root @@ -199,8 +203,6 @@ async def open_pikerd( root_actor, reg_addrs, ), - tractor.open_nursery() as actor_nursery, - trio.open_nursery() as service_nursery, ): for addr in reg_addrs: if addr not in root_actor.accept_addrs: @@ -209,25 +211,17 @@ async def open_pikerd( 'Maybe you have another daemon already running?' ) - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - - try: - yield Services - - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in Services.service_tasks: - # await Services.cancel_service('samplerd') - service_nursery.cancel_scope.cancel() + mngr: ServiceMngr + async with open_service_mngr( + debug_mode=debug_mode, + ) as mngr: + yield mngr # TODO: do we even need this? # @acm # async def maybe_open_runtime( -# loglevel: Optional[str] = None, +# loglevel: str|None = None, # **kwargs, # ) -> None: @@ -256,7 +250,7 @@ async def maybe_open_pikerd( loglevel: str | None = None, **kwargs, -) -> tractor._portal.Portal | ClassVar[Services]: +) -> tractor._portal.Portal | ClassVar[ServiceMngr]: ''' If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 4cccf855..0bdd1688 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -49,7 +49,7 @@ from requests.exceptions import ( ReadTimeout, ) -from ._mngr import Services +from ._mngr import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -453,7 +453,7 @@ async def open_ahabd( @acm async def start_ahab_service( - services: Services, + services: ServiceMngr, service_name: str, # endpoint config passed as **kwargs @@ -549,7 +549,8 @@ async def start_ahab_service( log.warning('Failed to cancel root permsed container') except ( - trio.MultiError, + # trio.MultiError, + ExceptionGroup, ) as err: for subexc in err.exceptions: if isinstance(subexc, PermissionError): diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 1e7ff096..b881afc6 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -26,14 +26,17 @@ from typing import ( from contextlib import ( asynccontextmanager as acm, ) +from collections import defaultdict import tractor +import trio from ._util import ( log, # sub-sys logger ) from ._mngr import ( - Services, + get_service_mngr, + ServiceMngr, ) from ._actor_runtime import maybe_open_pikerd from ._registry import find_service @@ -41,15 +44,14 @@ from ._registry import find_service @acm async def maybe_spawn_daemon( - service_name: str, service_task_target: Callable, - spawn_args: dict[str, Any], loglevel: str | None = None, singleton: bool = False, + _locks = defaultdict(trio.Lock), **pikerd_kwargs, ) -> tractor.Portal: @@ -67,7 +69,7 @@ async def maybe_spawn_daemon( ''' # serialize access to this section to avoid # 2 
or more tasks racing to create a daemon - lock = Services.locks[service_name] + lock = _locks[service_name] await lock.acquire() async with find_service( @@ -102,6 +104,12 @@ async def maybe_spawn_daemon( # service task for that actor. started: bool if pikerd_portal is None: + + # await tractor.pause() + if tractor_kwargs.get('debug_mode', False): + from tractor.devx._debug import maybe_init_greenback + await maybe_init_greenback() + started = await service_task_target( loglevel=loglevel, **spawn_args, @@ -132,7 +140,65 @@ async def maybe_spawn_daemon( async with tractor.wait_for_actor(service_name) as portal: lock.release() yield portal - await portal.cancel_actor() + # --- ---- --- + # XXX NOTE XXX + # --- ---- --- + # DO NOT PUT A `portal.cancel_actor()` here (as was prior)! + # + # Doing so will cause an "out-of-band" ctxc + # (`tractor.ContextCancelled`) to be raised inside the + # `ServiceMngr.open_context_in_task()`'s call to + # `ctx.wait_for_result()` AND the internal self-ctxc + # "graceful capture" WILL NOT CATCH IT! + # + # This can cause certain types of operations to raise + # that ctxc BEFORE THEY `return`, resulting in + # a "false-negative" ctxc being raised when really + # nothing actually failed, other then our semantic + # "failure" to suppress an expected, graceful, + # self-cancel scenario.. + # + # bUt wHy duZ It WorK lIKe dis.. + # ------------------------------ + # from the perspective of the `tractor.Context` this + # cancel request was conducted "out of band" since + # `Context.cancel()` was never called and thus the + # `._cancel_called: bool` was never set. Despite the + # remote `.canceller` being set to `pikerd` (i.e. the + # same `Actor.uid` of the raising service-mngr task) the + # service-task's ctx itself was never marked as having + # requested cancellation and thus still raises the ctxc + # bc it was unaware of any such request. + # + # How to make grokin these cases easier tho? + # ------------------------------------------ + # Because `Portal.cancel_actor()` was called it requests + # "full-`Actor`-runtime-cancellation" of it's peer + # process which IS NOT THE SAME as a single inter-actor + # RPC task cancelling its local context with a remote + # peer `Task` in that same peer process. + # + # ?TODO? It might be better if we do one (or all) of the + # following: + # + # -[ ] at least set a special message for the + # `ContextCancelled` when raised locally by the + # unaware ctx task such that we check for the + # `.canceller` being *our `Actor`* and in the case + # where `Context._cancel_called == False` we specially + # note that this is likely an "out-of-band" + # runtime-cancel request triggered by some call to + # `Portal.cancel_actor()`, possibly even reporting the + # exact LOC of that caller by tracking it inside our + # portal-type? + # -[ ] possibly add another field `ContextCancelled` like + # maybe a, + # `.request_type: Literal['os', 'proc', 'actor', + # 'ctx']` type thing which would allow immediately + # being able to tell what kind of cancellation caused + # the unexpected ctxc? + # -[ ] REMOVE THIS COMMENT, once we've settled on how to + # better augment `tractor` to be more explicit on this! 
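
For reference, a minimal reviewer sketch (not part of this patch) of how `spawn_emsd()` below could be ported to the same consolidated `ServiceMngr.start_service()` pattern this diff applies to `brokerd` and `samplerd`; the kwarg names mirror those hunks, and that they carry over unchanged to `emsd` (including forwarding `loglevel` to `_setup_persistent_emsd` via `partial()`) is an assumption.

# Reviewer sketch, hypothetical port of `spawn_emsd()` to the
# `ServiceMngr.start_service()` API used for `brokerd`/`samplerd` above.
from functools import partial

import tractor

from piker.service import get_service_mngr, ServiceMngr
from piker.clearing._ems import _setup_persistent_emsd


async def spawn_emsd_via_mngr(
    loglevel: str|None = None,
    **extra_tractor_kwargs,
) -> bool:
    # lookup the `pikerd`-global service manager singleton
    mngr: ServiceMngr = get_service_mngr()
    ctx: tractor.Context = await mngr.start_service(
        daemon_name='emsd',
        # root-task endpoint run inside the sub-actor; its kwargs are
        # bound via `partial()` just like `daemon_fixture_ep` above.
        ctx_ep=partial(
            _setup_persistent_emsd,
            loglevel=loglevel,
        ),
        debug_mode=mngr.debug_mode,  # set by the `pikerd --pdb` flag
        loglevel=loglevel,
        enable_modules=[
            'piker.clearing._ems',
            'piker.clearing._client',
        ],
        **extra_tractor_kwargs,
    )
    # mirror the post-start sanity checks from `spawn_brokerd()`
    assert (
        ctx.portal
        and not ctx.cancel_called
    )
    return True
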
async def spawn_emsd( @@ -147,26 +213,25 @@ async def spawn_emsd( """ log.info('Spawning emsd') - portal = await Services.actor_n.start_actor( + smngr: ServiceMngr = get_service_mngr() + portal = await smngr.an.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', 'piker.clearing._client', ], loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag + debug_mode=smngr.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) # non-blocking setup of clearing service from ..clearing._ems import _setup_persistent_emsd - await Services.start_service_task( - 'emsd', - portal, - - # signature of target root-task endpoint - _setup_persistent_emsd, + await smngr.start_service_ctx( + name='emsd', + portal=portal, + ctx_fn=_setup_persistent_emsd, loglevel=loglevel, ) return True diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 89e98411..9557a828 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -18,148 +18,36 @@ daemon-service management API. """ -from collections import defaultdict -from typing import ( - Callable, - Any, +from contextlib import ( + asynccontextmanager as acm, ) -import trio -from trio_typing import TaskStatus import tractor -from tractor import ( - current_actor, - ContextCancelled, - Context, - Portal, +from tractor.hilevel import ( + ServiceMngr, + # open_service_mngr as _open_service_mngr, + get_service_mngr as get_service_mngr, ) +# TODO: +# -[ ] factor all the common shit from `.data._sampling` +# and `.brokers._daemon` into here / `ServiceMngr` +# in terms of allocating the `Portal` as part of the +# "service-in-subactor" starting! +# -[ ] move to `tractor.hilevel._service`, import and use here! +# NOTE: purposely leaks the ref to the mod-scope Bo -from ._util import ( - log, # sub-sys logger -) +Services: ServiceMngr|None = None +@acm +async def open_service_mngr( + **kwargs, +) -> ServiceMngr: -# TODO: we need remote wrapping and a general soln: -# - factor this into a ``tractor.highlevel`` extension # pack for the -# library. -# - wrap a "remote api" wherein you can get a method proxy -# to the pikerd actor for starting services remotely! -# - prolly rename this to ActorServicesNursery since it spawns -# new actors and supervises them to completion? -class Services: - - actor_n: tractor._supervise.ActorNursery - service_n: trio.Nursery - debug_mode: bool # tractor sub-actor debug mode flag - service_tasks: dict[ - str, - tuple[ - trio.CancelScope, - Portal, - trio.Event, - ] - ] = {} - locks = defaultdict(trio.Lock) - - @classmethod - async def start_service_task( - self, - name: str, - portal: Portal, - target: Callable, - allow_overruns: bool = False, - **ctx_kwargs, - - ) -> (trio.CancelScope, Context): - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. 
- - ''' - async def open_context_in_task( - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - - ) -> Any: - - with trio.CancelScope() as cs: - - async with portal.open_context( - target, - allow_overruns=allow_overruns, - **ctx_kwargs, - - ) as (ctx, first): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started((cs, complete, first)) - log.info( - f'`pikerd` service {name} started with value {first}' - ) - try: - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res: Any = await ctx.result() - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. - return (await portal.result(), ctx_res) - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.channel.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service {name} was remotely cancelled?\n' - f'remote canceller: {canceller}\n' - f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' - ) - else: - raise - - - - finally: - await portal.cancel_actor() - complete.set() - self.service_tasks.pop(name) - - cs, complete, first = await self.service_n.start(open_context_in_task) - - # store the cancel scope and portal for later cancellation or - # retstart if needed. - self.service_tasks[name] = (cs, portal, complete) - - return cs, first - - @classmethod - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, portal, complete = self.service_tasks[name] - cs.cancel() - await complete.wait() - assert name not in self.service_tasks, \ - f'Serice task for {name} not terminated?' + global Services + async with tractor.hilevel.open_service_mngr( + **kwargs, + ) as mngr: + # Services = proxy(mngr) + Services = mngr + yield mngr + Services = None diff --git a/piker/service/elastic.py b/piker/service/elastic.py index 902f4fde..b1a13722 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -21,11 +21,13 @@ from typing import ( TYPE_CHECKING, ) +# TODO: oof, needs to be changed to `httpx`! import asks if TYPE_CHECKING: import docker from ._ahab import DockerContainer + from . import ServiceMngr from ._util import log # sub-sys logger from ._util import ( @@ -127,7 +129,7 @@ def start_elasticsearch( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index c9f49420..852b967c 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -53,7 +53,7 @@ import pendulum # import purerpc from ..data.feed import maybe_open_feed -from . import Services +from . import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -233,7 +233,7 @@ def start_marketstore( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/piker/types.py b/piker/types.py index cda3fb44..385f83b0 100644 --- a/piker/types.py +++ b/piker/types.py @@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib types. 
''' -from __future__ import annotations -from collections import UserList -from pprint import ( - saferepr, -) -from typing import Any - -from msgspec import ( - msgpack, - Struct as _Struct, - structs, -) - - -class DiffDump(UserList): - ''' - Very simple list delegator that repr() dumps (presumed) tuple - elements of the form `tuple[str, Any, Any]` in a nice - multi-line readable form for analyzing `Struct` diffs. - - ''' - def __repr__(self) -> str: - if not len(self): - return super().__repr__() - - # format by displaying item pair's ``repr()`` on multiple, - # indented lines such that they are more easily visually - # comparable when printed to console when printed to - # console. - repstr: str = '[\n' - for k, left, right in self: - repstr += ( - f'({k},\n' - f'\t{repr(left)},\n' - f'\t{repr(right)},\n' - ')\n' - ) - repstr += ']\n' - return repstr - - -class Struct( - _Struct, - - # https://jcristharif.com/msgspec/structs.html#tagged-unions - # tag='pikerstruct', - # tag=True, -): - ''' - A "human friendlier" (aka repl buddy) struct subtype. - - ''' - def _sin_props(self) -> Iterator[ - tuple[ - structs.FieldIinfo, - str, - Any, - ] - ]: - ''' - Iterate over all non-@property fields of this struct. - - ''' - fi: structs.FieldInfo - for fi in structs.fields(self): - key: str = fi.name - val: Any = getattr(self, key) - yield fi, key, val - - def to_dict( - self, - include_non_members: bool = True, - - ) -> dict: - ''' - Like it sounds.. direct delegation to: - https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict - - BUT, by default we pop all non-member (aka not defined as - struct fields) fields by default. - - ''' - asdict: dict = structs.asdict(self) - if include_non_members: - return asdict - - # only return a dict of the struct members - # which were provided as input, NOT anything - # added as type-defined `@property` methods! - sin_props: dict = {} - fi: structs.FieldInfo - for fi, k, v in self._sin_props(): - sin_props[k] = asdict[k] - - return sin_props - - def pformat( - self, - field_indent: int = 2, - indent: int = 0, - - ) -> str: - ''' - Recursion-safe `pprint.pformat()` style formatting of - a `msgspec.Struct` for sane reading by a human using a REPL. - - ''' - # global whitespace indent - ws: str = ' '*indent - - # field whitespace indent - field_ws: str = ' '*(field_indent + indent) - - # qtn: str = ws + self.__class__.__qualname__ - qtn: str = self.__class__.__qualname__ - - obj_str: str = '' # accumulator - fi: structs.FieldInfo - k: str - v: Any - for fi, k, v in self._sin_props(): - - # TODO: how can we prefer `Literal['option1', 'option2, - # ..]` over .__name__ == `Literal` but still get only the - # latter for simple types like `str | int | None` etc..? - ft: type = fi.type - typ_name: str = getattr(ft, '__name__', str(ft)) - - # recurse to get sub-struct's `.pformat()` output Bo - if isinstance(v, Struct): - val_str: str = v.pformat( - indent=field_indent + indent, - field_indent=indent + field_indent, - ) - - else: # the `pprint` recursion-safe format: - # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr - val_str: str = saferepr(v) - - obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') - - return ( - f'{qtn}(\n' - f'{obj_str}' - f'{ws})' - ) - - # TODO: use a pprint.PrettyPrinter instance around ONLY rendering - # inside a known tty? - # def __repr__(self) -> str: - # ... 
- - # __str__ = __repr__ = pformat - __repr__ = pformat - - def copy( - self, - update: dict | None = None, - - ) -> Struct: - ''' - Validate-typecast all self defined fields, return a copy of - us with all such fields. - - NOTE: This is kinda like the default behaviour in - `pydantic.BaseModel` except a copy of the object is - returned making it compat with `frozen=True`. - - ''' - if update: - for k, v in update.items(): - setattr(self, k, v) - - # NOTE: roundtrip serialize to validate - # - enode to msgpack binary format, - # - decode that back to a struct. - return msgpack.Decoder(type=type(self)).decode( - msgpack.Encoder().encode(self) - ) - - def typecast( - self, - - # TODO: allow only casting a named subset? - # fields: set[str] | None = None, - - ) -> None: - ''' - Cast all fields using their declared type annotations - (kinda like what `pydantic` does by default). - - NOTE: this of course won't work on frozen types, use - ``.copy()`` above in such cases. - - ''' - # https://jcristharif.com/msgspec/api.html#msgspec.structs.fields - fi: structs.FieldInfo - for fi in structs.fields(self): - setattr( - self, - fi.name, - fi.type(getattr(self, fi.name)), - ) - - def __sub__( - self, - other: Struct, - - ) -> DiffDump[tuple[str, Any, Any]]: - ''' - Compare fields/items key-wise and return a ``DiffDump`` - for easy visual REPL comparison B) - - ''' - diffs: DiffDump[tuple[str, Any, Any]] = DiffDump() - for fi in structs.fields(self): - attr_name: str = fi.name - ours: Any = getattr(self, attr_name) - theirs: Any = getattr(other, attr_name) - if ours != theirs: - diffs.append(( - attr_name, - ours, - theirs, - )) - - return diffs +from tractor.msg import Struct as Struct diff --git a/tests/conftest.py b/tests/conftest.py index 366d5d95..cf77e76e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ from piker import ( config, ) from piker.service import ( - Services, + get_service_mngr, ) from piker.log import get_console_log @@ -129,7 +129,7 @@ async def _open_test_pikerd( ) as service_manager, ): # this proc/actor is the pikerd - assert service_manager is Services + assert service_manager is get_service_mngr() async with tractor.wait_for_actor( 'pikerd', diff --git a/tests/test_ems.py b/tests/test_ems.py index c2f5d7a8..e0305999 100644 --- a/tests/test_ems.py +++ b/tests/test_ems.py @@ -26,7 +26,7 @@ import pytest import tractor from uuid import uuid4 -from piker.service import Services +from piker.service import ServiceMngr from piker.log import get_logger from piker.clearing._messages import ( Order, @@ -158,7 +158,7 @@ def load_and_check_pos( def test_ems_err_on_bad_broker( - open_test_pikerd: Services, + open_test_pikerd: ServiceMngr, loglevel: str, ): async def load_bad_fqme(): diff --git a/tests/test_services.py b/tests/test_services.py index 433e97f3..ca093929 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -15,7 +15,7 @@ import tractor from piker.service import ( find_service, - Services, + ServiceMngr, ) from piker.data import ( open_feed, @@ -44,7 +44,7 @@ def test_runtime_boot( async def main(): port = 6666 daemon_addr = ('127.0.0.1', port) - services: Services + services: ServiceMngr async with ( open_test_pikerd(