From e8c0485d9994c6fafa2f1cea8b328c7d80d148ab Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 11:38:29 -0500 Subject: [PATCH 01/12] Suppress `trio.EndOfChannel`s raised by remote peer Since now `tractor` will raise this native `trio`-exc translated from a `Stop` msg when the peer gracefully terminates a `tractor.MsgStream`. Just `info()` log in such cases versus continuing to warn for the others. --- piker/data/_sampling.py | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 7bb0231d..7e6fef54 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -394,7 +394,8 @@ async def register_with_sampler( finally: if ( sub_for_broadcasts - and subs + and + subs ): try: subs.remove(stream) @@ -796,7 +797,6 @@ async def uniform_rate_send( clear_types = _tick_groups['clears'] while True: - # compute the remaining time to sleep for this throttled cycle left_to_sleep = throttle_period - diff @@ -891,11 +891,23 @@ async def uniform_rate_send( # ``pikerd`` these kinds of errors! trio.ClosedResourceError, trio.BrokenResourceError, + trio.EndOfChannel, ConnectionResetError, - ): - # if the feed consumer goes down then drop - # out of this rate limiter - log.warning(f'{stream} closed') + ) as ipc_err: + match ipc_err: + case trio.EndOfChannel(): + log.info( + f'{stream} terminated by peer,\n' + f'{ipc_err!r}' + ) + case _: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning( + f'{stream} closed due to,\n' + f'{ipc_err!r}' + ) + await stream.aclose() return -- 2.34.1 From f21c44dd83ffddc7e009f7b8d70bc295e4d34f7d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Feb 2025 11:27:17 -0500 Subject: [PATCH 02/12] Group bcast errors as `Sampler.bcast_errors` A new class var `tuple[Exception]` such that the err set can be reffed externally as needed for catching other similar pub-sub/IPC failures in other (related) real-time sub-systems. Also added some now-masked logging for debugging live-feed stream reading issues that should ONLY be used for debugging since they'll greatly degrade HFT perf. Used the new `log.mk_repr()` stuff (that one day we should prolly pull from `modden` as a dep) for pretty console emissions. --- piker/data/_sampling.py | 75 +++++++++++++++++++++++++++++++---------- piker/log.py | 30 +++++++++++++++++ 2 files changed, 88 insertions(+), 17 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 7e6fef54..0bb9a247 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -95,6 +95,12 @@ class Sampler: # history loading. incr_task_cs: trio.CancelScope | None = None + bcast_errors: tuple[Exception] = ( + trio.BrokenResourceError, + trio.ClosedResourceError, + trio.EndOfChannel, + ) + # holds all the ``tractor.Context`` remote subscriptions for # a particular sample period increment event: all subscribers are # notified on a step. 
@@ -258,14 +264,15 @@ class Sampler: subs: set last_ts, subs = pair - task = trio.lowlevel.current_task() - log.debug( - f'SUBS {self.subscribers}\n' - f'PAIR {pair}\n' - f'TASK: {task}: {id(task)}\n' - f'broadcasting {period_s} -> {last_ts}\n' - # f'consumers: {subs}' - ) + # NOTE, for debugging pub-sub issues + # task = trio.lowlevel.current_task() + # log.debug( + # f'AlL-SUBS@{period_s!r}: {self.subscribers}\n' + # f'PAIR: {pair}\n' + # f'TASK: {task}: {id(task)}\n' + # f'broadcasting {period_s} -> {last_ts}\n' + # f'consumers: {subs}' + # ) borked: set[MsgStream] = set() sent: set[MsgStream] = set() while True: @@ -282,12 +289,11 @@ class Sampler: await stream.send(msg) sent.add(stream) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): + except self.bcast_errors as err: log.error( - f'{stream._ctx.chan.uid} dropped connection' + f'Connection dropped for IPC ctx\n' + f'{stream._ctx}\n\n' + f'Due to {type(err)}' ) borked.add(stream) else: @@ -562,8 +568,7 @@ async def open_sample_stream( async def sample_and_broadcast( - - bus: _FeedsBus, # noqa + bus: _FeedsBus, rt_shm: ShmArray, hist_shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, @@ -583,11 +588,33 @@ async def sample_and_broadcast( overruns = Counter() + # NOTE, only used for debugging live-data-feed issues, though + # this should be resolved more correctly in the future using the + # new typed-msgspec feats of `tractor`! + # + # XXX, a multiline nested `dict` formatter (since rn quote-msgs + # are just that). + # pfmt: Callable[[str], str] = mk_repr() + # iterate stream delivered by broker async for quotes in quote_stream: # print(quotes) - # TODO: ``numba`` this! + # XXX WARNING XXX only enable for debugging bc ow can cost + # ALOT of perf with HF-feedz!!! + # + # log.info( + # 'Rx live quotes:\n' + # f'{pfmt(quotes)}' + # ) + + # TODO, + # -[ ] `numba` or `cython`-nize this loop possibly? + # |_alternatively could we do it in rust somehow by upacking + # arrow msgs instead of using `msgspec`? + # -[ ] use `msgspec.Struct` support in new typed-msging from + # `tractor` to ensure only allowed msgs are transmitted? + # for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that @@ -660,6 +687,21 @@ async def sample_and_broadcast( sub_key: str = broker_symbol.lower() subs: set[Sub] = bus.get_subs(sub_key) + # TODO, figure out how to make this useful whilst + # incoporating feed "pausing" .. + # + # if not subs: + # all_bs_fqmes: list[str] = list( + # bus._subscribers.keys() + # ) + # log.warning( + # f'No subscribers for {brokername!r} live-quote ??\n' + # f'broker_symbol: {broker_symbol}\n\n' + + # f'Maybe the backend-sys symbol does not match one of,\n' + # f'{pfmt(all_bs_fqmes)}\n' + # ) + # NOTE: by default the broker backend doesn't append # it's own "name" into the fqme schema (but maybe it # should?) so we have to manually generate the correct @@ -757,7 +799,6 @@ async def sample_and_broadcast( async def uniform_rate_send( - rate: float, quote_stream: trio.abc.ReceiveChannel, stream: MsgStream, diff --git a/piker/log.py b/piker/log.py index 56776e1e..dc5cfc59 100644 --- a/piker/log.py +++ b/piker/log.py @@ -19,6 +19,10 @@ Log like a forester! 
""" import logging import json +import reprlib +from typing import ( + Callable, +) import tractor from pygments import ( @@ -84,3 +88,29 @@ def colorize_json( # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) + + +# TODO, eventually defer to the version in `modden` once +# it becomes a dep! +def mk_repr( + **repr_kws, +) -> Callable[[str], str]: + ''' + Allocate and deliver a `repr.Repr` instance with provided input + settings using the std-lib's `reprlib` mod, + * https://docs.python.org/3/library/reprlib.html + + ------ Ex. ------ + An up to 6-layer-nested `dict` as multi-line: + - https://stackoverflow.com/a/79102479 + - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel + + ''' + def_kws: dict[str, int] = dict( + indent=2, + maxlevel=6, # recursion levels + maxstring=66, # match editor line-len limit + ) + def_kws |= repr_kws + reprr = reprlib.Repr(**def_kws) + return reprr.repr -- 2.34.1 From d3f047663f029eaec9f07187af137fd5516e11f5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 12:05:11 -0500 Subject: [PATCH 03/12] Catch using `Sampler.bcast_errors` where possible In all other possible IPC disconnect handling blocks. Also more comprehensive typing throughout `uniform_rate_send()`. --- piker/data/_sampling.py | 59 ++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 0bb9a247..e5b87a2a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -771,18 +771,14 @@ async def sample_and_broadcast( if lags > 10: await tractor.pause() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError, - trio.EndOfChannel, - ): + except Sampler.bcast_errors as ipc_err: ctx: Context = ipc._ctx chan: Channel = ctx.chan if ctx: log.warning( - 'Dropped `brokerd`-quotes-feed connection:\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' + f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n' + f'x>) {ctx.cid}@{chan.uid}' + f'|_{ipc_err!r}\n\n' ) if sub.throttle_rate: assert ipc._closed @@ -803,7 +799,7 @@ async def uniform_rate_send( quote_stream: trio.abc.ReceiveChannel, stream: MsgStream, - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -821,13 +817,16 @@ async def uniform_rate_send( https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 ''' - # TODO: compute the approx overhead latency per cycle - left_to_sleep = throttle_period = 1/rate - 0.000616 + # ?TODO? dynamically compute the **actual** approx overhead latency per cycle + # instead of this magic # bidinezz? 
+ throttle_period: float = 1/rate - 0.000616 + left_to_sleep: float = throttle_period # send cycle state + first_quote: dict|None first_quote = last_quote = None - last_send = time.time() - diff = 0 + last_send: float = time.time() + diff: float = 0 task_status.started() ticks_by_type: dict[ @@ -839,20 +838,27 @@ async def uniform_rate_send( while True: # compute the remaining time to sleep for this throttled cycle - left_to_sleep = throttle_period - diff + left_to_sleep: float = throttle_period - diff if left_to_sleep > 0: + cs: trio.CancelScope with trio.move_on_after(left_to_sleep) as cs: + sym: str + last_quote: dict try: sym, last_quote = await quote_stream.receive() except trio.EndOfChannel: - log.exception(f"feed for {stream} ended?") + log.exception( + f'Live stream for feed for ended?\n' + f'<=c\n' + f' |_[{stream!r}\n' + ) break - diff = time.time() - last_send + diff: float = time.time() - last_send if not first_quote: - first_quote = last_quote + first_quote: float = last_quote # first_quote['tbt'] = ticks_by_type if (throttle_period - diff) > 0: @@ -913,7 +919,9 @@ async def uniform_rate_send( # TODO: now if only we could sync this to the display # rate timing exactly lul try: - await stream.send({sym: first_quote}) + await stream.send({ + sym: first_quote + }) except tractor.RemoteActorError as rme: if rme.type is not tractor._exceptions.StreamOverrun: raise @@ -924,17 +932,14 @@ async def uniform_rate_send( f'{sym}:{ctx.cid}@{chan.uid}' ) + # NOTE: any of these can be raised by `tractor`'s IPC + # transport-layer and we want to be highly resilient + # to consumers which crash or lose network connection. + # I.e. we **DO NOT** want to crash and propagate up to + # ``pikerd`` these kinds of errors! except ( - # NOTE: any of these can be raised by ``tractor``'s IPC - # transport-layer and we want to be highly resilient - # to consumers which crash or lose network connection. - # I.e. we **DO NOT** want to crash and propagate up to - # ``pikerd`` these kinds of errors! - trio.ClosedResourceError, - trio.BrokenResourceError, - trio.EndOfChannel, ConnectionResetError, - ) as ipc_err: + ) + Sampler.bcast_errors as ipc_err: match ipc_err: case trio.EndOfChannel(): log.info( -- 2.34.1 From 683ad0ffb46140eaa16c1d01cda3a0355f2cbe25 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 11:21:29 -0500 Subject: [PATCH 04/12] Delegate to `tractor.msg.pretty_struct` since it was factored from here! --- piker/types.py | 228 +------------------------------------------------ 1 file changed, 1 insertion(+), 227 deletions(-) diff --git a/piker/types.py b/piker/types.py index cda3fb44..385f83b0 100644 --- a/piker/types.py +++ b/piker/types.py @@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib types. ''' -from __future__ import annotations -from collections import UserList -from pprint import ( - saferepr, -) -from typing import Any - -from msgspec import ( - msgpack, - Struct as _Struct, - structs, -) - - -class DiffDump(UserList): - ''' - Very simple list delegator that repr() dumps (presumed) tuple - elements of the form `tuple[str, Any, Any]` in a nice - multi-line readable form for analyzing `Struct` diffs. - - ''' - def __repr__(self) -> str: - if not len(self): - return super().__repr__() - - # format by displaying item pair's ``repr()`` on multiple, - # indented lines such that they are more easily visually - # comparable when printed to console when printed to - # console. 
- repstr: str = '[\n' - for k, left, right in self: - repstr += ( - f'({k},\n' - f'\t{repr(left)},\n' - f'\t{repr(right)},\n' - ')\n' - ) - repstr += ']\n' - return repstr - - -class Struct( - _Struct, - - # https://jcristharif.com/msgspec/structs.html#tagged-unions - # tag='pikerstruct', - # tag=True, -): - ''' - A "human friendlier" (aka repl buddy) struct subtype. - - ''' - def _sin_props(self) -> Iterator[ - tuple[ - structs.FieldIinfo, - str, - Any, - ] - ]: - ''' - Iterate over all non-@property fields of this struct. - - ''' - fi: structs.FieldInfo - for fi in structs.fields(self): - key: str = fi.name - val: Any = getattr(self, key) - yield fi, key, val - - def to_dict( - self, - include_non_members: bool = True, - - ) -> dict: - ''' - Like it sounds.. direct delegation to: - https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict - - BUT, by default we pop all non-member (aka not defined as - struct fields) fields by default. - - ''' - asdict: dict = structs.asdict(self) - if include_non_members: - return asdict - - # only return a dict of the struct members - # which were provided as input, NOT anything - # added as type-defined `@property` methods! - sin_props: dict = {} - fi: structs.FieldInfo - for fi, k, v in self._sin_props(): - sin_props[k] = asdict[k] - - return sin_props - - def pformat( - self, - field_indent: int = 2, - indent: int = 0, - - ) -> str: - ''' - Recursion-safe `pprint.pformat()` style formatting of - a `msgspec.Struct` for sane reading by a human using a REPL. - - ''' - # global whitespace indent - ws: str = ' '*indent - - # field whitespace indent - field_ws: str = ' '*(field_indent + indent) - - # qtn: str = ws + self.__class__.__qualname__ - qtn: str = self.__class__.__qualname__ - - obj_str: str = '' # accumulator - fi: structs.FieldInfo - k: str - v: Any - for fi, k, v in self._sin_props(): - - # TODO: how can we prefer `Literal['option1', 'option2, - # ..]` over .__name__ == `Literal` but still get only the - # latter for simple types like `str | int | None` etc..? - ft: type = fi.type - typ_name: str = getattr(ft, '__name__', str(ft)) - - # recurse to get sub-struct's `.pformat()` output Bo - if isinstance(v, Struct): - val_str: str = v.pformat( - indent=field_indent + indent, - field_indent=indent + field_indent, - ) - - else: # the `pprint` recursion-safe format: - # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr - val_str: str = saferepr(v) - - obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') - - return ( - f'{qtn}(\n' - f'{obj_str}' - f'{ws})' - ) - - # TODO: use a pprint.PrettyPrinter instance around ONLY rendering - # inside a known tty? - # def __repr__(self) -> str: - # ... - - # __str__ = __repr__ = pformat - __repr__ = pformat - - def copy( - self, - update: dict | None = None, - - ) -> Struct: - ''' - Validate-typecast all self defined fields, return a copy of - us with all such fields. - - NOTE: This is kinda like the default behaviour in - `pydantic.BaseModel` except a copy of the object is - returned making it compat with `frozen=True`. - - ''' - if update: - for k, v in update.items(): - setattr(self, k, v) - - # NOTE: roundtrip serialize to validate - # - enode to msgpack binary format, - # - decode that back to a struct. - return msgpack.Decoder(type=type(self)).decode( - msgpack.Encoder().encode(self) - ) - - def typecast( - self, - - # TODO: allow only casting a named subset? 
- # fields: set[str] | None = None, - - ) -> None: - ''' - Cast all fields using their declared type annotations - (kinda like what `pydantic` does by default). - - NOTE: this of course won't work on frozen types, use - ``.copy()`` above in such cases. - - ''' - # https://jcristharif.com/msgspec/api.html#msgspec.structs.fields - fi: structs.FieldInfo - for fi in structs.fields(self): - setattr( - self, - fi.name, - fi.type(getattr(self, fi.name)), - ) - - def __sub__( - self, - other: Struct, - - ) -> DiffDump[tuple[str, Any, Any]]: - ''' - Compare fields/items key-wise and return a ``DiffDump`` - for easy visual REPL comparison B) - - ''' - diffs: DiffDump[tuple[str, Any, Any]] = DiffDump() - for fi in structs.fields(self): - attr_name: str = fi.name - ours: Any = getattr(self, attr_name) - theirs: Any = getattr(other, attr_name) - if ours != theirs: - diffs.append(( - attr_name, - ours, - theirs, - )) - - return diffs +from tractor.msg import Struct as Struct -- 2.34.1 From 79a4c433dde8a91b18bb19953fa168a5b30a63ca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 12:30:41 -0500 Subject: [PATCH 05/12] Enable `greenback` for `.pause_from_sync()` by default? --- piker/service/_actor_runtime.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index a4e3ccf2..837b615d 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -119,6 +119,10 @@ async def open_piker_runtime( # spawn other specialized daemons I think? enable_modules=enable_modules, + # TODO: how to configure this? + # keep it on by default if debug mode is set? + maybe_enable_greenback=False, + **tractor_kwargs, ) as actor, -- 2.34.1 From 1218af883e1069d80eb5945cbeb482b58239fde8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Jun 2024 15:34:57 -0400 Subject: [PATCH 06/12] Prep service mngr for move to `tractor.hilevel` Given it's a fairly simple yet useful abstraction, it makes sense to offer this sub-sys alongside the core `tractor` runtime lib. Without going into extreme detail on the impl changes (it'll come in the commit that moves to the other repo) here is the high level summary: ------ - ------ - rename `Services` -> `ServiceMngr` and use an factory `@acm` to guarantee a single-instance-per-actor using a niche approach for a singleton object using a default keyword-arg B) - the mod level `open_service_mngr()` and `get_service_mngr()` are the new allocation/access API. - add a `ServiceMngr.start_service()` method which does the work of both spawning a new subactor (for the daemon) and uses its portal to start the mngr side supervision task. - open actor/task nurseries inside the `@acm` allocator. Adjust other dependent subsystems to match: ------ - ------ - use `open_service_mngr()` when first allocated in `open_pikerd()`. - use `get_service_mngr()` instead of importing the class ref inside `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()` and `.data._sampling.spawn_samplerd()` using a `partial` to pack in the endpoint ctx kwargs (unpacked inside `.start_service()` XD). 
--- piker/brokers/_daemon.py | 53 +++-- piker/data/_sampling.py | 60 +++--- piker/service/__init__.py | 6 +- piker/service/_actor_runtime.py | 44 ++-- piker/service/_ahab.py | 7 +- piker/service/_daemon.py | 17 +- piker/service/_mngr.py | 352 +++++++++++++++++++++++++++----- piker/service/elastic.py | 4 +- piker/service/marketstore.py | 4 +- tests/conftest.py | 4 +- tests/test_ems.py | 4 +- tests/test_services.py | 4 +- 12 files changed, 416 insertions(+), 143 deletions(-) diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 5efb03dd..da92f246 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -23,6 +23,7 @@ from __future__ import annotations from contextlib import ( asynccontextmanager as acm, ) +from functools import partial from types import ModuleType from typing import ( TYPE_CHECKING, @@ -190,14 +191,17 @@ def broker_init( async def spawn_brokerd( - brokername: str, loglevel: str | None = None, **tractor_kwargs, ) -> bool: + ''' + Spawn a `brokerd.` subactor service daemon + using `pikerd`'s service mngr. + ''' from piker.service._util import log # use service mngr log log.info(f'Spawning {brokername} broker daemon') @@ -217,27 +221,35 @@ async def spawn_brokerd( # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery - from piker.service import Services - + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' - portal = await Services.actor_n.start_actor( - dname, - enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), - debug_mode=Services.debug_mode, + mngr: ServiceMngr = get_service_mngr() + ctx: tractor.Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( + # signature of target root-task endpoint + daemon_fixture_ep, + + # passed to daemon_fixture_ep(**kwargs) + brokername=brokername, + loglevel=loglevel, + ), + debug_mode=mngr.debug_mode, + loglevel=loglevel, + enable_modules=( + _data_mods + + + tractor_kwargs.pop('enable_modules') + ), **tractor_kwargs ) - - # NOTE: the service mngr expects an already spawned actor + its - # portal ref in order to do non-blocking setup of brokerd - # service nursery. - await Services.start_service_task( - dname, - portal, - - # signature of target root-task endpoint - daemon_fixture_ep, - brokername=brokername, - loglevel=loglevel, + assert ( + not ctx.cancel_called + and ctx.portal # parent side + and dname in ctx.chan.uid # subactor is named as desired ) return True @@ -262,8 +274,7 @@ async def maybe_spawn_brokerd( from piker.service import maybe_spawn_daemon async with maybe_spawn_daemon( - - f'brokerd.{brokername}', + service_name=f'brokerd.{brokername}', service_task_target=spawn_brokerd, spawn_args={ 'brokername': brokername, diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index e5b87a2a..093e19cf 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -25,6 +25,7 @@ from collections import ( defaultdict, ) from contextlib import asynccontextmanager as acm +from functools import partial import time from typing import ( Any, @@ -42,7 +43,7 @@ from tractor.trionics import ( maybe_open_nursery, ) import trio -from trio_typing import TaskStatus +from trio import TaskStatus from .ticktools import ( frame_ticks, @@ -70,6 +71,7 @@ if TYPE_CHECKING: _default_delay_s: float = 1.0 +# TODO: use new `tractor.singleton_acm` API for this! class Sampler: ''' Global sampling engine registry. 
@@ -79,9 +81,9 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to - time-step-sample (real-time) quote feeds, see - ``.service.maybe_open_samplerd()`` and the below - ``register_with_sampler()``. + time-step-sample a (real-time) quote feeds, see + `.service.maybe_open_samplerd()` and the below + `register_with_sampler()`. ''' service_nursery: None | trio.Nursery = None @@ -381,7 +383,10 @@ async def register_with_sampler( assert Sampler.ohlcv_shms # unblock caller - await ctx.started(set(Sampler.ohlcv_shms.keys())) + await ctx.started( + # XXX bc msgpack only allows one array type! + list(Sampler.ohlcv_shms.keys()) + ) if open_index_stream: try: @@ -426,7 +431,6 @@ async def register_with_sampler( async def spawn_samplerd( - loglevel: str | None = None, **extra_tractor_kwargs @@ -436,7 +440,10 @@ async def spawn_samplerd( update and increment count write and stream broadcasting. ''' - from piker.service import Services + from piker.service import ( + get_service_mngr, + ServiceMngr, + ) dname = 'samplerd' log.info(f'Spawning `{dname}`') @@ -444,26 +451,33 @@ async def spawn_samplerd( # singleton lock creation of ``samplerd`` since we only ever want # one daemon per ``pikerd`` proc tree. # TODO: make this built-into the service api? - async with Services.locks[dname + '_singleton']: + mngr: ServiceMngr = get_service_mngr() + already_started: bool = dname in mngr.service_tasks - if dname not in Services.service_tasks: - - portal = await Services.actor_n.start_actor( - dname, - enable_modules=[ - 'piker.data._sampling', - ], - loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag - **extra_tractor_kwargs - ) - - await Services.start_service_task( - dname, - portal, + async with mngr._locks[dname + '_singleton']: + ctx: Context = await mngr.start_service( + daemon_name=dname, + ctx_ep=partial( register_with_sampler, period_s=1, sub_for_broadcasts=False, + ), + debug_mode=mngr.debug_mode, # set by pikerd flag + + # proxy-through to tractor + enable_modules=[ + 'piker.data._sampling', + ], + loglevel=loglevel, + **extra_tractor_kwargs + ) + if not already_started: + assert ( + ctx + and + ctx.portal + and + not ctx.cancel_called ) return True diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 29360620..beb9c70b 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for, => TODO: maybe to (re)move elsewhere? 
''' -from ._mngr import Services as Services +from ._mngr import ( + get_service_mngr as get_service_mngr, + open_service_mngr as open_service_mngr, + ServiceMngr as ServiceMngr, +) from ._registry import ( _tractor_kwargs as _tractor_kwargs, _default_reg_addr as _default_reg_addr, diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 837b615d..42440f82 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -21,7 +21,6 @@ from __future__ import annotations import os from typing import ( - Optional, Any, ClassVar, ) @@ -30,13 +29,13 @@ from contextlib import ( ) import tractor -import trio from ._util import ( get_console_log, ) from ._mngr import ( - Services, + open_service_mngr, + ServiceMngr, ) from ._registry import ( # noqa _tractor_kwargs, @@ -59,7 +58,7 @@ async def open_piker_runtime( registry_addrs: list[tuple[str, int]] = [], enable_modules: list[str] = [], - loglevel: Optional[str] = None, + loglevel: str|None = None, # XXX NOTE XXX: you should pretty much never want debug mode # for data daemons when running in production. @@ -69,7 +68,7 @@ async def open_piker_runtime( # and spawn the service tree distributed per that. start_method: str = 'trio', - tractor_runtime_overrides: dict | None = None, + tractor_runtime_overrides: dict|None = None, **tractor_kwargs, ) -> tuple[ @@ -121,7 +120,7 @@ async def open_piker_runtime( # TODO: how to configure this? # keep it on by default if debug mode is set? - maybe_enable_greenback=False, + # maybe_enable_greenback=debug_mode, **tractor_kwargs, ) as actor, @@ -171,12 +170,13 @@ async def open_pikerd( **kwargs, -) -> Services: +) -> ServiceMngr: ''' - Start a root piker daemon with an indefinite lifetime. + Start a root piker daemon actor (aka `pikerd`) with an indefinite + lifetime. - A root actor nursery is created which can be used to create and keep - alive underling services (see below). + A root actor-nursery is created which can be used to spawn and + supervise underling service sub-actors (see below). ''' # NOTE: for the root daemon we always enable the root @@ -203,8 +203,6 @@ async def open_pikerd( root_actor, reg_addrs, ), - tractor.open_nursery() as actor_nursery, - trio.open_nursery() as service_nursery, ): for addr in reg_addrs: if addr not in root_actor.accept_addrs: @@ -213,25 +211,17 @@ async def open_pikerd( 'Maybe you have another daemon already running?' ) - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - - try: - yield Services - - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in Services.service_tasks: - # await Services.cancel_service('samplerd') - service_nursery.cancel_scope.cancel() + mngr: ServiceMngr + async with open_service_mngr( + debug_mode=debug_mode, + ) as mngr: + yield mngr # TODO: do we even need this? 
# @acm # async def maybe_open_runtime( -# loglevel: Optional[str] = None, +# loglevel: str|None = None, # **kwargs, # ) -> None: @@ -260,7 +250,7 @@ async def maybe_open_pikerd( loglevel: str | None = None, **kwargs, -) -> tractor._portal.Portal | ClassVar[Services]: +) -> tractor._portal.Portal | ClassVar[ServiceMngr]: ''' If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 4cccf855..0bdd1688 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -49,7 +49,7 @@ from requests.exceptions import ( ReadTimeout, ) -from ._mngr import Services +from ._mngr import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -453,7 +453,7 @@ async def open_ahabd( @acm async def start_ahab_service( - services: Services, + services: ServiceMngr, service_name: str, # endpoint config passed as **kwargs @@ -549,7 +549,8 @@ async def start_ahab_service( log.warning('Failed to cancel root permsed container') except ( - trio.MultiError, + # trio.MultiError, + ExceptionGroup, ) as err: for subexc in err.exceptions: if isinstance(subexc, PermissionError): diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 1e7ff096..a76918ec 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -26,14 +26,17 @@ from typing import ( from contextlib import ( asynccontextmanager as acm, ) +from collections import defaultdict import tractor +import trio from ._util import ( log, # sub-sys logger ) from ._mngr import ( - Services, + get_service_mngr, + ServiceMngr, ) from ._actor_runtime import maybe_open_pikerd from ._registry import find_service @@ -41,15 +44,14 @@ from ._registry import find_service @acm async def maybe_spawn_daemon( - service_name: str, service_task_target: Callable, - spawn_args: dict[str, Any], loglevel: str | None = None, singleton: bool = False, + _locks = defaultdict(trio.Lock), **pikerd_kwargs, ) -> tractor.Portal: @@ -67,7 +69,7 @@ async def maybe_spawn_daemon( ''' # serialize access to this section to avoid # 2 or more tasks racing to create a daemon - lock = Services.locks[service_name] + lock = _locks[service_name] await lock.acquire() async with find_service( @@ -147,21 +149,22 @@ async def spawn_emsd( """ log.info('Spawning emsd') - portal = await Services.actor_n.start_actor( + smngr: ServiceMngr = get_service_mngr() + portal = await smngr.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', 'piker.clearing._client', ], loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag + debug_mode=smngr.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) # non-blocking setup of clearing service from ..clearing._ems import _setup_persistent_emsd - await Services.start_service_task( + await smngr.start_service_task( 'emsd', portal, diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 89e98411..3197bef3 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -18,16 +18,29 @@ daemon-service management API. 
""" +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, + # contextmanager as cm, +) from collections import defaultdict +from dataclasses import ( + dataclass, + field, +) +import functools +import inspect from typing import ( Callable, Any, ) -import trio -from trio_typing import TaskStatus +import msgspec import tractor +import trio +from trio import TaskStatus from tractor import ( + ActorNursery, current_actor, ContextCancelled, Context, @@ -39,6 +52,130 @@ from ._util import ( ) +# TODO: implement a singleton deco-API for wrapping the below +# factory's impl for general actor-singleton use? +# +# @singleton +# async def open_service_mngr( +# **init_kwargs, +# ) -> ServiceMngr: +# ''' +# Note this function body is invoke IFF no existing singleton instance already +# exists in this proc's memory. + +# ''' +# # setup +# yield ServiceMngr(**init_kwargs) +# # teardown + + + +# TODO: singleton factory API instead of a class API +@acm +async def open_service_mngr( + *, + debug_mode: bool = False, + + # impl deat which ensures a single global instance + _singleton: list[ServiceMngr|None] = [None], + **init_kwargs, + +) -> ServiceMngr: + ''' + Open a multi-subactor-as-service-daemon tree supervisor. + + The delivered `ServiceMngr` is a singleton instance for each + actor-process and is allocated on first open and never + de-allocated unless explicitly deleted by al call to + `del_service_mngr()`. + + ''' + # TODO: factor this an allocation into + # a `._mngr.open_service_mngr()` and put in the + # once-n-only-once setup/`.__aenter__()` part! + # -[ ] how to make this only happen on the `mngr == None` case? + # |_ use `.trionics.maybe_open_context()` (for generic + # async-with-style-only-once of the factory impl, though + # what do we do for the allocation case? + # / `.maybe_open_nursery()` (since for this specific case + # it's simpler?) to activate + async with ( + tractor.open_nursery() as an, + trio.open_nursery() as tn, + ): + # impl specific obvi.. + init_kwargs.update({ + 'actor_n': an, + 'service_n': tn, + }) + + mngr: ServiceMngr|None + if (mngr := _singleton[0]) is None: + + log.info('Allocating a new service mngr!') + mngr = _singleton[0] = ServiceMngr(**init_kwargs) + + # TODO: put into `.__aenter__()` section of + # eventual `@singleton_acm` API wrapper. + # + # assign globally for future daemon/task creation + mngr.actor_n = an + mngr.service_n = tn + + else: + assert ( + mngr.actor_n + and + mngr.service_tn + ) + log.info( + 'Using extant service mngr!\n\n' + f'{mngr!r}\n' # it has a nice `.__repr__()` of services state + ) + + try: + # NOTE: this is a singleton factory impl specific detail + # which should be supported in the condensed + # `@singleton_acm` API? + mngr.debug_mode = debug_mode + + yield mngr + finally: + # TODO: is this more clever/efficient? + # if 'samplerd' in mngr.service_tasks: + # await mngr.cancel_service('samplerd') + tn.cancel_scope.cancel() + + + +def get_service_mngr() -> ServiceMngr: + ''' + Try to get the singleton service-mngr for this actor presuming it + has already been allocated using, + + .. code:: python + + async with open_<@singleton_acm(func)>() as mngr` + ... this block kept open ... + + If not yet allocated raise a `ServiceError`. 
+ + ''' + # https://stackoverflow.com/a/12627202 + # https://docs.python.org/3/library/inspect.html#inspect.Signature + maybe_mngr: ServiceMngr|None = inspect.signature( + open_service_mngr + ).parameters['_singleton'].default[0] + + if maybe_mngr is None: + raise RuntimeError( + 'Someone must allocate a `ServiceMngr` using\n\n' + '`async with open_service_mngr()` beforehand!!\n' + ) + + return maybe_mngr + + # TODO: we need remote wrapping and a general soln: # - factor this into a ``tractor.highlevel`` extension # pack for the # library. @@ -46,31 +183,46 @@ from ._util import ( # to the pikerd actor for starting services remotely! # - prolly rename this to ActorServicesNursery since it spawns # new actors and supervises them to completion? -class Services: +@dataclass +class ServiceMngr: +# class ServiceMngr(msgspec.Struct): + ''' + A multi-subactor-as-service manager. - actor_n: tractor._supervise.ActorNursery + Spawn, supervise and monitor service/daemon subactors in a SC + process tree. + + ''' + actor_n: ActorNursery service_n: trio.Nursery - debug_mode: bool # tractor sub-actor debug mode flag + debug_mode: bool = False # tractor sub-actor debug mode flag + service_tasks: dict[ str, tuple[ trio.CancelScope, + Context, Portal, trio.Event, ] - ] = {} - locks = defaultdict(trio.Lock) + ] = field(default_factory=dict) + + # internal per-service task mutexs + _locks = defaultdict(trio.Lock) - @classmethod async def start_service_task( self, name: str, portal: Portal, + + # TODO: typevar for the return type of the target and then + # use it below for `ctx_res`? target: Callable, + allow_overruns: bool = False, **ctx_kwargs, - ) -> (trio.CancelScope, Context): + ) -> (trio.CancelScope, Context, Any): ''' Open a context in a service sub-actor, add to a stack that gets unwound at ``pikerd`` teardown. @@ -83,6 +235,7 @@ class Services: task_status: TaskStatus[ tuple[ trio.CancelScope, + Context, trio.Event, Any, ] @@ -90,64 +243,87 @@ class Services: ) -> Any: + # TODO: use the ctx._scope directly here instead? + # -[ ] actually what semantics do we expect for this + # usage!? with trio.CancelScope() as cs: + try: + async with portal.open_context( + target, + allow_overruns=allow_overruns, + **ctx_kwargs, - async with portal.open_context( - target, - allow_overruns=allow_overruns, - **ctx_kwargs, + ) as (ctx, started): - ) as (ctx, first): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started((cs, complete, first)) - log.info( - f'`pikerd` service {name} started with value {first}' - ) - try: + # unblock once the remote context has started + complete = trio.Event() + task_status.started(( + cs, + ctx, + complete, + started, + )) + log.info( + f'`pikerd` service {name} started with value {started}' + ) # wait on any context's return value # and any final portal result from the # sub-actor. - ctx_res: Any = await ctx.result() + ctx_res: Any = await ctx.wait_for_result() # NOTE: blocks indefinitely until cancelled # either by error from the target context # function or by being cancelled here by the # surrounding cancel scope. 
- return (await portal.result(), ctx_res) - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.channel.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service {name} was remotely cancelled?\n' - f'remote canceller: {canceller}\n' - f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' - ) - else: - raise + return ( + await portal.wait_for_result(), + ctx_res, + ) + except ContextCancelled as ctxe: + canceller: tuple[str, str] = ctxe.canceller + our_uid: tuple[str, str] = current_actor().uid + if ( + canceller != portal.chan.uid + and + canceller != our_uid + ): + log.cancel( + f'Actor-service `{name}` was remotely cancelled by a peer?\n' + # TODO: this would be a good spot to use + # a respawn feature Bo + f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - finally: - await portal.cancel_actor() - complete.set() - self.service_tasks.pop(name) + f'cancellee: {portal.chan.uid}\n' + f'canceller: {canceller}\n' + ) + else: + raise - cs, complete, first = await self.service_n.start(open_context_in_task) + finally: + # NOTE: the ctx MUST be cancelled first if we + # don't want the above `ctx.wait_for_result()` to + # raise a self-ctxc. WHY, well since from the ctx's + # perspective the cancel request will have + # arrived out-out-of-band at the `Actor.cancel()` + # level, thus `Context.cancel_called == False`, + # meaning `ctx._is_self_cancelled() == False`. + # with trio.CancelScope(shield=True): + # await ctx.cancel() + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, sub_ctx, complete, started = await self.service_n.start( + open_context_in_task + ) # store the cancel scope and portal for later cancellation or # retstart if needed. - self.service_tasks[name] = (cs, portal, complete) + self.service_tasks[name] = (cs, sub_ctx, portal, complete) + return cs, sub_ctx, started - return cs, first - - @classmethod async def cancel_service( self, name: str, @@ -158,8 +334,80 @@ class Services: ''' log.info(f'Cancelling `pikerd` service {name}') - cs, portal, complete = self.service_tasks[name] - cs.cancel() + cs, sub_ctx, portal, complete = self.service_tasks[name] + + # cs.cancel() + await sub_ctx.cancel() await complete.wait() - assert name not in self.service_tasks, \ - f'Serice task for {name} not terminated?' + + if name in self.service_tasks: + # TODO: custom err? + # raise ServiceError( + raise RuntimeError( + f'Serice task for {name} not terminated?' + ) + + # assert name not in self.service_tasks, \ + # f'Serice task for {name} not terminated?' + + async def start_service( + self, + daemon_name: str, + ctx_ep: Callable, # kwargs must `partial`-ed in! + + debug_mode: bool = False, + **tractor_actor_kwargs, + + ) -> Context: + ''' + Start a "service" task in a new sub-actor (daemon) and manage it's lifetime + indefinitely. + + Services can be cancelled/shutdown using `.cancel_service()`. 
+ + ''' + entry: tuple|None = self.service_tasks.get(daemon_name) + if entry: + (cs, sub_ctx, portal, complete) = entry + return sub_ctx + + if daemon_name not in self.service_tasks: + portal = await self.actor_n.start_actor( + daemon_name, + debug_mode=( # maybe set globally during allocate + debug_mode + or + self.debug_mode + ), + **tractor_actor_kwargs, + ) + ctx_kwargs: dict[str, Any] = {} + if isinstance(ctx_ep, functools.partial): + ctx_kwargs: dict[str, Any] = ctx_ep.keywords + ctx_ep: Callable = ctx_ep.func + + (cs, sub_ctx, started) = await self.start_service_task( + daemon_name, + portal, + ctx_ep, + **ctx_kwargs, + ) + + return sub_ctx + + +# TODO: +# -[ ] factor all the common shit from `.data._sampling` +# and `.brokers._daemon` into here / `ServiceMngr` +# in terms of allocating the `Portal` as part of the +# "service-in-subactor" starting! +# -[ ] move to `tractor.hilevel._service`, import and use here! +# NOTE: purposely leaks the ref to the mod-scope Bo +# import tractor +# from tractor.hilevel import ( +# open_service_mngr, +# ServiceMngr, +# ) +# mngr: ServiceMngr|None = None +# with tractor.hilevel.open_service_mngr() as mngr: +# Services = proxy(mngr) diff --git a/piker/service/elastic.py b/piker/service/elastic.py index 902f4fde..b1a13722 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -21,11 +21,13 @@ from typing import ( TYPE_CHECKING, ) +# TODO: oof, needs to be changed to `httpx`! import asks if TYPE_CHECKING: import docker from ._ahab import DockerContainer + from . import ServiceMngr from ._util import log # sub-sys logger from ._util import ( @@ -127,7 +129,7 @@ def start_elasticsearch( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index c9f49420..852b967c 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -53,7 +53,7 @@ import pendulum # import purerpc from ..data.feed import maybe_open_feed -from . import Services +from . 
import ServiceMngr from ._util import ( log, # sub-sys logger get_console_log, @@ -233,7 +233,7 @@ def start_marketstore( @acm async def start_ahab_daemon( - service_mngr: Services, + service_mngr: ServiceMngr, user_config: dict | None = None, loglevel: str | None = None, diff --git a/tests/conftest.py b/tests/conftest.py index 366d5d95..cf77e76e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,7 +10,7 @@ from piker import ( config, ) from piker.service import ( - Services, + get_service_mngr, ) from piker.log import get_console_log @@ -129,7 +129,7 @@ async def _open_test_pikerd( ) as service_manager, ): # this proc/actor is the pikerd - assert service_manager is Services + assert service_manager is get_service_mngr() async with tractor.wait_for_actor( 'pikerd', diff --git a/tests/test_ems.py b/tests/test_ems.py index c2f5d7a8..e0305999 100644 --- a/tests/test_ems.py +++ b/tests/test_ems.py @@ -26,7 +26,7 @@ import pytest import tractor from uuid import uuid4 -from piker.service import Services +from piker.service import ServiceMngr from piker.log import get_logger from piker.clearing._messages import ( Order, @@ -158,7 +158,7 @@ def load_and_check_pos( def test_ems_err_on_bad_broker( - open_test_pikerd: Services, + open_test_pikerd: ServiceMngr, loglevel: str, ): async def load_bad_fqme(): diff --git a/tests/test_services.py b/tests/test_services.py index 433e97f3..ca093929 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -15,7 +15,7 @@ import tractor from piker.service import ( find_service, - Services, + ServiceMngr, ) from piker.data import ( open_feed, @@ -44,7 +44,7 @@ def test_runtime_boot( async def main(): port = 6666 daemon_addr = ('127.0.0.1', port) - services: Services + services: ServiceMngr async with ( open_test_pikerd( -- 2.34.1 From 5b686edae560dfd2b61ea51d97baa5ad740a8f82 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Aug 2024 12:16:17 -0400 Subject: [PATCH 07/12] Drop `.cancel_actor()` from `maybe_spawn_daemon()` Since `tractor`'s new and improved inter-actor cancellation semantics are much more pedantic, AND bc we use the `ServiceMngr` for spawning service actors on-demand, the caller of `maybe_spawn_daemon()` should NEVER conduct a so called "out of band" `Actor`-runtime cancel request since this is precisely the job of our `ServiceMngr` XD Add a super in depth note explaining the underlying issue and adding a todo list of how we should prolly augment `tractor` to make such cases easier to grok and fix in the future! --- piker/service/_daemon.py | 60 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index a76918ec..0cb57291 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -134,7 +134,65 @@ async def maybe_spawn_daemon( async with tractor.wait_for_actor(service_name) as portal: lock.release() yield portal - await portal.cancel_actor() + # --- ---- --- + # XXX NOTE XXX + # --- ---- --- + # DO NOT PUT A `portal.cancel_actor()` here (as was prior)! + # + # Doing so will cause an "out-of-band" ctxc + # (`tractor.ContextCancelled`) to be raised inside the + # `ServiceMngr.open_context_in_task()`'s call to + # `ctx.wait_for_result()` AND the internal self-ctxc + # "graceful capture" WILL NOT CATCH IT! 
+ # + # This can cause certain types of operations to raise + # that ctxc BEFORE THEY `return`, resulting in + # a "false-negative" ctxc being raised when really + # nothing actually failed, other then our semantic + # "failure" to suppress an expected, graceful, + # self-cancel scenario.. + # + # bUt wHy duZ It WorK lIKe dis.. + # ------------------------------ + # from the perspective of the `tractor.Context` this + # cancel request was conducted "out of band" since + # `Context.cancel()` was never called and thus the + # `._cancel_called: bool` was never set. Despite the + # remote `.canceller` being set to `pikerd` (i.e. the + # same `Actor.uid` of the raising service-mngr task) the + # service-task's ctx itself was never marked as having + # requested cancellation and thus still raises the ctxc + # bc it was unaware of any such request. + # + # How to make grokin these cases easier tho? + # ------------------------------------------ + # Because `Portal.cancel_actor()` was called it requests + # "full-`Actor`-runtime-cancellation" of it's peer + # process which IS NOT THE SAME as a single inter-actor + # RPC task cancelling its local context with a remote + # peer `Task` in that same peer process. + # + # ?TODO? It might be better if we do one (or all) of the + # following: + # + # -[ ] at least set a special message for the + # `ContextCancelled` when raised locally by the + # unaware ctx task such that we check for the + # `.canceller` being *our `Actor`* and in the case + # where `Context._cancel_called == False` we specially + # note that this is likely an "out-of-band" + # runtime-cancel request triggered by some call to + # `Portal.cancel_actor()`, possibly even reporting the + # exact LOC of that caller by tracking it inside our + # portal-type? + # -[ ] possibly add another field `ContextCancelled` like + # maybe a, + # `.request_type: Literal['os', 'proc', 'actor', + # 'ctx']` type thing which would allow immediately + # being able to tell what kind of cancellation caused + # the unexpected ctxc? + # -[ ] REMOVE THIS COMMENT, once we've settled on how to + # better augment `tractor` to be more explicit on this! async def spawn_emsd( -- 2.34.1 From 9a933d82a2d0562f2d572f347066197ae4fa9797 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Aug 2024 13:26:05 -0400 Subject: [PATCH 08/12] More service-mngr clarity notes Nothing changing functionally here just adding more `tractor` operational notes, tips for debug tooling and typing fixes B) Of particular note is adding further details about the reason we do not need to call `Context.cancel()` inside the `finally:` block of `.open_context_in_task()` thanks to `tractor`'s new and improved inter-actor cancellation semantics Bo --- piker/service/_mngr.py | 63 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 3197bef3..4a4c3938 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -144,6 +144,9 @@ async def open_service_mngr( # TODO: is this more clever/efficient? # if 'samplerd' in mngr.service_tasks: # await mngr.cancel_service('samplerd') + + # await tractor.pause(shield=True) + # ^XXX, if needed mk sure to shield it ;) tn.cancel_scope.cancel() @@ -241,7 +244,11 @@ class ServiceMngr: ] ] = trio.TASK_STATUS_IGNORED, - ) -> Any: + ) -> tuple[ + trio.CancelScope, + Context, + Any, # started value from ctx + ]: # TODO: use the ctx._scope directly here instead? 
# -[ ] actually what semantics do we expect for this @@ -251,6 +258,10 @@ class ServiceMngr: async with portal.open_context( target, allow_overruns=allow_overruns, + + # hide_tb=False, + # ^XXX^ HAWT TIPZ + **ctx_kwargs, ) as (ctx, started): @@ -269,7 +280,9 @@ class ServiceMngr: # wait on any context's return value # and any final portal result from the # sub-actor. - ctx_res: Any = await ctx.wait_for_result() + ctx_res: Any = await ctx.wait_for_result( + # hide_tb=False, + ) # NOTE: blocks indefinitely until cancelled # either by error from the target context @@ -304,25 +317,53 @@ class ServiceMngr: finally: # NOTE: the ctx MUST be cancelled first if we # don't want the above `ctx.wait_for_result()` to - # raise a self-ctxc. WHY, well since from the ctx's + # raise a self-ctxc. + # + # WHY, well since from the ctx's # perspective the cancel request will have # arrived out-out-of-band at the `Actor.cancel()` - # level, thus `Context.cancel_called == False`, + # level (since pikerd will have called + # `Portal.cancel_actor()`), and thus + # `Context.cancel_called == False`, # meaning `ctx._is_self_cancelled() == False`. - # with trio.CancelScope(shield=True): - # await ctx.cancel() + # + # HOWEVER, this should happen implicitly WITHOUT + # a manual `ctx.cancel()` call HERE since, + # + # - in the mngr shutdown case the surrounding + # `.service_n.cancel_scope` should be + # `.cancel_called == True` and the + # `Portal.open_context()` internals should take + # care of it. + # + # - in the specific-service cancellation case, + # `.cancel_service()` makes the manual + # `ctx.cancel()` call for us which SHOULD mean + # the ctxc is never raised above (since, again, + # it will be gracefully suppressed by + # `.open_context()` internals) and thus we only + # need to shut down the service actor. await portal.cancel_actor() - complete.set() self.service_tasks.pop(name) + complete.set() - cs, sub_ctx, complete, started = await self.service_n.start( + ( + cs, # internally allocated + sub_ctx, # RPC peer-actor ctx + complete, # termination syncing + started, # proxyed from internal `.open_context()` entry. + ) = await self.service_n.start( open_context_in_task ) # store the cancel scope and portal for later cancellation or # retstart if needed. self.service_tasks[name] = (cs, sub_ctx, portal, complete) - return cs, sub_ctx, started + return ( + cs, + sub_ctx, + started, + ) async def cancel_service( self, @@ -341,11 +382,11 @@ class ServiceMngr: await complete.wait() if name in self.service_tasks: - # TODO: custom err? - # raise ServiceError( raise RuntimeError( f'Serice task for {name} not terminated?' ) + # raise ServiceError( + # ^TODO? custom err type? # assert name not in self.service_tasks, \ # f'Serice task for {name} not terminated?' -- 2.34.1 From 8c908e9ed04ec9d98d959cdf6d649707b20e96ad Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Fri, 23 Aug 2024 18:06:05 +0000 Subject: [PATCH 09/12] Updated tractor method name. 
--- piker/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index dde7f83c..c88d9b5c 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -335,7 +335,7 @@ def services(config, tl, ports): name='service_query', loglevel=config['loglevel'] if tl else None, ), - tractor.get_arbiter( + tractor.get_registry( host=host, port=ports[0] ) as portal -- 2.34.1 From e3098f56c77d09f2e90a99d3e41c2f787f375c76 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Feb 2025 10:34:34 -0500 Subject: [PATCH 10/12] Official service-mngr to `tractor.hilevel` move Such that we maintain that subsys in the actor-runtime repo (with hopefully an extensive test suite XD). Port deats, - rewrite `open_service_mngr()` as a thin wrapper that delegates into the new `tractor.hilevel.open_service_mngr()` but with maintenance of the `Services` class-singleton for now. - port `.service._daemon` usage to the new `ServiceMngr.start_service_ctx()` a rename from `.start_service_task()` which is now likely destined for the soon supported `tractor.trionics.TaskMngr` nursery extension. - ref the new `ServiceMngr.an: ActorNursery` instance var name. Other, - always enable the `tractor.pause_from_sync()` support via `greenback` whenever `debug_mode` is set at `pikerd` init. --- piker/service/_daemon.py | 18 +- piker/service/_mngr.py | 441 ++------------------------------------- 2 files changed, 31 insertions(+), 428 deletions(-) diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 0cb57291..b881afc6 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -104,6 +104,12 @@ async def maybe_spawn_daemon( # service task for that actor. started: bool if pikerd_portal is None: + + # await tractor.pause() + if tractor_kwargs.get('debug_mode', False): + from tractor.devx._debug import maybe_init_greenback + await maybe_init_greenback() + started = await service_task_target( loglevel=loglevel, **spawn_args, @@ -208,7 +214,7 @@ async def spawn_emsd( log.info('Spawning emsd') smngr: ServiceMngr = get_service_mngr() - portal = await smngr.actor_n.start_actor( + portal = await smngr.an.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', @@ -222,12 +228,10 @@ async def spawn_emsd( # non-blocking setup of clearing service from ..clearing._ems import _setup_persistent_emsd - await smngr.start_service_task( - 'emsd', - portal, - - # signature of target root-task endpoint - _setup_persistent_emsd, + await smngr.start_service_ctx( + name='emsd', + portal=portal, + ctx_fn=_setup_persistent_emsd, loglevel=loglevel, ) return True diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 4a4c3938..9557a828 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -18,425 +18,16 @@ daemon-service management API. 
""" -from __future__ import annotations from contextlib import ( asynccontextmanager as acm, - # contextmanager as cm, -) -from collections import defaultdict -from dataclasses import ( - dataclass, - field, -) -import functools -import inspect -from typing import ( - Callable, - Any, ) -import msgspec import tractor -import trio -from trio import TaskStatus -from tractor import ( - ActorNursery, - current_actor, - ContextCancelled, - Context, - Portal, +from tractor.hilevel import ( + ServiceMngr, + # open_service_mngr as _open_service_mngr, + get_service_mngr as get_service_mngr, ) - -from ._util import ( - log, # sub-sys logger -) - - -# TODO: implement a singleton deco-API for wrapping the below -# factory's impl for general actor-singleton use? -# -# @singleton -# async def open_service_mngr( -# **init_kwargs, -# ) -> ServiceMngr: -# ''' -# Note this function body is invoke IFF no existing singleton instance already -# exists in this proc's memory. - -# ''' -# # setup -# yield ServiceMngr(**init_kwargs) -# # teardown - - - -# TODO: singleton factory API instead of a class API -@acm -async def open_service_mngr( - *, - debug_mode: bool = False, - - # impl deat which ensures a single global instance - _singleton: list[ServiceMngr|None] = [None], - **init_kwargs, - -) -> ServiceMngr: - ''' - Open a multi-subactor-as-service-daemon tree supervisor. - - The delivered `ServiceMngr` is a singleton instance for each - actor-process and is allocated on first open and never - de-allocated unless explicitly deleted by al call to - `del_service_mngr()`. - - ''' - # TODO: factor this an allocation into - # a `._mngr.open_service_mngr()` and put in the - # once-n-only-once setup/`.__aenter__()` part! - # -[ ] how to make this only happen on the `mngr == None` case? - # |_ use `.trionics.maybe_open_context()` (for generic - # async-with-style-only-once of the factory impl, though - # what do we do for the allocation case? - # / `.maybe_open_nursery()` (since for this specific case - # it's simpler?) to activate - async with ( - tractor.open_nursery() as an, - trio.open_nursery() as tn, - ): - # impl specific obvi.. - init_kwargs.update({ - 'actor_n': an, - 'service_n': tn, - }) - - mngr: ServiceMngr|None - if (mngr := _singleton[0]) is None: - - log.info('Allocating a new service mngr!') - mngr = _singleton[0] = ServiceMngr(**init_kwargs) - - # TODO: put into `.__aenter__()` section of - # eventual `@singleton_acm` API wrapper. - # - # assign globally for future daemon/task creation - mngr.actor_n = an - mngr.service_n = tn - - else: - assert ( - mngr.actor_n - and - mngr.service_tn - ) - log.info( - 'Using extant service mngr!\n\n' - f'{mngr!r}\n' # it has a nice `.__repr__()` of services state - ) - - try: - # NOTE: this is a singleton factory impl specific detail - # which should be supported in the condensed - # `@singleton_acm` API? - mngr.debug_mode = debug_mode - - yield mngr - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in mngr.service_tasks: - # await mngr.cancel_service('samplerd') - - # await tractor.pause(shield=True) - # ^XXX, if needed mk sure to shield it ;) - tn.cancel_scope.cancel() - - - -def get_service_mngr() -> ServiceMngr: - ''' - Try to get the singleton service-mngr for this actor presuming it - has already been allocated using, - - .. code:: python - - async with open_<@singleton_acm(func)>() as mngr` - ... this block kept open ... - - If not yet allocated raise a `ServiceError`. 
- - ''' - # https://stackoverflow.com/a/12627202 - # https://docs.python.org/3/library/inspect.html#inspect.Signature - maybe_mngr: ServiceMngr|None = inspect.signature( - open_service_mngr - ).parameters['_singleton'].default[0] - - if maybe_mngr is None: - raise RuntimeError( - 'Someone must allocate a `ServiceMngr` using\n\n' - '`async with open_service_mngr()` beforehand!!\n' - ) - - return maybe_mngr - - -# TODO: we need remote wrapping and a general soln: -# - factor this into a ``tractor.highlevel`` extension # pack for the -# library. -# - wrap a "remote api" wherein you can get a method proxy -# to the pikerd actor for starting services remotely! -# - prolly rename this to ActorServicesNursery since it spawns -# new actors and supervises them to completion? -@dataclass -class ServiceMngr: -# class ServiceMngr(msgspec.Struct): - ''' - A multi-subactor-as-service manager. - - Spawn, supervise and monitor service/daemon subactors in a SC - process tree. - - ''' - actor_n: ActorNursery - service_n: trio.Nursery - debug_mode: bool = False # tractor sub-actor debug mode flag - - service_tasks: dict[ - str, - tuple[ - trio.CancelScope, - Context, - Portal, - trio.Event, - ] - ] = field(default_factory=dict) - - # internal per-service task mutexs - _locks = defaultdict(trio.Lock) - - async def start_service_task( - self, - name: str, - portal: Portal, - - # TODO: typevar for the return type of the target and then - # use it below for `ctx_res`? - target: Callable, - - allow_overruns: bool = False, - **ctx_kwargs, - - ) -> (trio.CancelScope, Context, Any): - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. - - ''' - async def open_context_in_task( - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - Context, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - - ) -> tuple[ - trio.CancelScope, - Context, - Any, # started value from ctx - ]: - - # TODO: use the ctx._scope directly here instead? - # -[ ] actually what semantics do we expect for this - # usage!? - with trio.CancelScope() as cs: - try: - async with portal.open_context( - target, - allow_overruns=allow_overruns, - - # hide_tb=False, - # ^XXX^ HAWT TIPZ - - **ctx_kwargs, - - ) as (ctx, started): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started(( - cs, - ctx, - complete, - started, - )) - log.info( - f'`pikerd` service {name} started with value {started}' - ) - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res: Any = await ctx.wait_for_result( - # hide_tb=False, - ) - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. 
- return ( - await portal.wait_for_result(), - ctx_res, - ) - - except ContextCancelled as ctxe: - canceller: tuple[str, str] = ctxe.canceller - our_uid: tuple[str, str] = current_actor().uid - if ( - canceller != portal.chan.uid - and - canceller != our_uid - ): - log.cancel( - f'Actor-service `{name}` was remotely cancelled by a peer?\n' - - # TODO: this would be a good spot to use - # a respawn feature Bo - f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n' - - f'cancellee: {portal.chan.uid}\n' - f'canceller: {canceller}\n' - ) - else: - raise - - finally: - # NOTE: the ctx MUST be cancelled first if we - # don't want the above `ctx.wait_for_result()` to - # raise a self-ctxc. - # - # WHY, well since from the ctx's - # perspective the cancel request will have - # arrived out-out-of-band at the `Actor.cancel()` - # level (since pikerd will have called - # `Portal.cancel_actor()`), and thus - # `Context.cancel_called == False`, - # meaning `ctx._is_self_cancelled() == False`. - # - # HOWEVER, this should happen implicitly WITHOUT - # a manual `ctx.cancel()` call HERE since, - # - # - in the mngr shutdown case the surrounding - # `.service_n.cancel_scope` should be - # `.cancel_called == True` and the - # `Portal.open_context()` internals should take - # care of it. - # - # - in the specific-service cancellation case, - # `.cancel_service()` makes the manual - # `ctx.cancel()` call for us which SHOULD mean - # the ctxc is never raised above (since, again, - # it will be gracefully suppressed by - # `.open_context()` internals) and thus we only - # need to shut down the service actor. - await portal.cancel_actor() - self.service_tasks.pop(name) - complete.set() - - ( - cs, # internally allocated - sub_ctx, # RPC peer-actor ctx - complete, # termination syncing - started, # proxyed from internal `.open_context()` entry. - ) = await self.service_n.start( - open_context_in_task - ) - - # store the cancel scope and portal for later cancellation or - # retstart if needed. - self.service_tasks[name] = (cs, sub_ctx, portal, complete) - return ( - cs, - sub_ctx, - started, - ) - - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, sub_ctx, portal, complete = self.service_tasks[name] - - # cs.cancel() - await sub_ctx.cancel() - await complete.wait() - - if name in self.service_tasks: - raise RuntimeError( - f'Serice task for {name} not terminated?' - ) - # raise ServiceError( - # ^TODO? custom err type? - - # assert name not in self.service_tasks, \ - # f'Serice task for {name} not terminated?' - - async def start_service( - self, - daemon_name: str, - ctx_ep: Callable, # kwargs must `partial`-ed in! - - debug_mode: bool = False, - **tractor_actor_kwargs, - - ) -> Context: - ''' - Start a "service" task in a new sub-actor (daemon) and manage it's lifetime - indefinitely. - - Services can be cancelled/shutdown using `.cancel_service()`. 
- - ''' - entry: tuple|None = self.service_tasks.get(daemon_name) - if entry: - (cs, sub_ctx, portal, complete) = entry - return sub_ctx - - if daemon_name not in self.service_tasks: - portal = await self.actor_n.start_actor( - daemon_name, - debug_mode=( # maybe set globally during allocate - debug_mode - or - self.debug_mode - ), - **tractor_actor_kwargs, - ) - ctx_kwargs: dict[str, Any] = {} - if isinstance(ctx_ep, functools.partial): - ctx_kwargs: dict[str, Any] = ctx_ep.keywords - ctx_ep: Callable = ctx_ep.func - - (cs, sub_ctx, started) = await self.start_service_task( - daemon_name, - portal, - ctx_ep, - **ctx_kwargs, - ) - - return sub_ctx - - # TODO: # -[ ] factor all the common shit from `.data._sampling` # and `.brokers._daemon` into here / `ServiceMngr` @@ -444,11 +35,19 @@ class ServiceMngr: # "service-in-subactor" starting! # -[ ] move to `tractor.hilevel._service`, import and use here! # NOTE: purposely leaks the ref to the mod-scope Bo -# import tractor -# from tractor.hilevel import ( -# open_service_mngr, -# ServiceMngr, -# ) -# mngr: ServiceMngr|None = None -# with tractor.hilevel.open_service_mngr() as mngr: -# Services = proxy(mngr) + +Services: ServiceMngr|None = None + +@acm +async def open_service_mngr( + **kwargs, +) -> ServiceMngr: + + global Services + async with tractor.hilevel.open_service_mngr( + **kwargs, + ) as mngr: + # Services = proxy(mngr) + Services = mngr + yield mngr + Services = None -- 2.34.1 From 811e857c1c2b0def35cf6a0d26af53b4c5a7ff94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 12:31:33 -0500 Subject: [PATCH 11/12] Type adjust to `tractor.hilevel.ServicecMngr` --- piker/cli/__init__.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index c88d9b5c..0ba7bc09 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -140,11 +140,10 @@ def pikerd( if pdb: log.warning(( - "\n" - "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" - "When a `piker` daemon crashes it will block the " - "task-thread until resumed from console!\n" - "\n" + '\n\n' + '!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n' + 'When a `piker` daemon crashes it will block the ' + 'task-thread until resumed from console!\n' )) # service-actor registry endpoint socket-address set @@ -177,7 +176,7 @@ def pikerd( from .. import service async def main(): - service_mngr: service.Services + service_mngr: service.ServiceMngr async with ( service.open_pikerd( -- 2.34.1 From b9e904666bb1cbbd3f278bb35e1b1ecd75131cb7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Feb 2025 11:01:52 -0500 Subject: [PATCH 12/12] Support `tractor.pause_from_sync()` in `brokerd`s By passing down the `tractor.hilevel.ServiceMngr.debug_mode: bool` (normally proxied in from the `--pdb` CLI flag) to `spawn_brokerd()` and adjusting the `_setup_persistent_brokerd()` endpoint to do the `tractor.devx._debug.maybe_init_greenback()` if needed. Also in the `broker_init()` factory merge all `tractor` related `kwargs` (i.e. `start_actor_kwargs | datad_kwargs | spawn_kws`) into the 2nd element returned as to be passed to `ActorNursery.start_actor()`. Start re-naming some internal vars/fields as `datad` as well. 
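As a quick illustration of the merge semantics relied on above: Python's `dict` union gives the right-most mapping precedence, so a backend's `_spawn_kwargs` (e.g. `'infect_asyncio': True` for `ib`/`deribit`) override any generic defaults when building the `ActorNursery.start_actor()` input. A minimal sketch with purely hypothetical values, not taken from the diff below:

    # hypothetical kwargs, only to demo the `|`-union precedence
    # used to assemble the spawn-kwargs for the `brokerd` sub-actor.
    start_actor_kwargs = {'loglevel': 'info'}
    datad_kwargs = {
        'name': 'brokerd.ib',
        'enable_modules': ['piker.brokers._daemon'],
    }
    spawn_kws = {'infect_asyncio': True, 'loglevel': 'debug'}

    merged = start_actor_kwargs | datad_kwargs | spawn_kws
    assert merged['loglevel'] == 'debug'  # right-most mapping wins
    assert merged['infect_asyncio'] is True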
--- piker/brokers/_daemon.py | 59 ++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index da92f246..c02ed856 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -61,12 +61,13 @@ async def _setup_persistent_brokerd( ctx: tractor.Context, brokername: str, loglevel: str | None = None, + debug_mode: bool = False, ) -> None: ''' - Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed. + Allocate a actor-wide service nursery in `brokerd` such that + feeds can be run in the background persistently by the broker + backend as needed. ''' # NOTE: we only need to setup logging once (and only) here @@ -87,6 +88,18 @@ async def _setup_persistent_brokerd( from piker.data import feed assert not feed._bus + if ( + debug_mode + and + tractor.current_actor().is_infected_aio() + ): + # NOTE, whenever running `asyncio` in provider's actor + # runtime be sure we enabled `breakpoint()` support + # for non-`trio.Task` usage. + from tractor.devx._debug import maybe_init_greenback + await maybe_init_greenback() + # breakpoint() # XXX, SHOULD WORK from `trio.Task`! + # allocate a nursery to the bus for spawning background # tasks to service client IPC requests, normally # `tractor.Context` connections to explicitly required @@ -146,18 +159,21 @@ def broker_init( above. ''' - from ..brokers import get_brokermod - brokermod = get_brokermod(brokername) + brokermod: ModuleType = get_brokermod(brokername) modpath: str = brokermod.__name__ - - start_actor_kwargs['name'] = f'brokerd.{brokername}' - start_actor_kwargs.update( - getattr( - brokermod, - '_spawn_kwargs', - {}, - ) + spawn_kws: dict = getattr( + brokermod, + '_spawn_kwargs', + {}, ) + # ^^ NOTE, here we pull any runtime parameters specific + # to spawning the sub-actor for the backend. For ex. + # both `ib` and `deribit` rely on, + # `'infect_asyncio': True,` since they both + # use `tractor`'s "infected `asyncio` mode" + # for their libs but you could also do something like + # `'debug_mode: True` which would be like passing + # `--pdb` for just that provider backend. # XXX TODO: make this not so hacky/monkeypatched.. # -> we need a sane way to configure the logging level for all @@ -167,8 +183,7 @@ def broker_init( # lookup actor-enabled modules declared by the backend offering the # `brokerd` endpoint(s). - enabled: list[str] - enabled = start_actor_kwargs['enable_modules'] = [ + enabled: list[str] = [ __name__, # so that eps from THIS mod can be invoked modpath, ] @@ -180,9 +195,13 @@ def broker_init( subpath: str = f'{modpath}.{submodname}' enabled.append(subpath) + datad_kwargs: dict = { + 'name': f'brokerd.{brokername}', + 'enable_modules': enabled, + } return ( brokermod, - start_actor_kwargs, # to `ActorNursery.start_actor()` + start_actor_kwargs | datad_kwargs | spawn_kws, # to `ActorNursery.start_actor()` # XXX see impl above; contains all (actor global) # setup/teardown expected in all `brokerd` actor instances. 
@@ -215,10 +234,6 @@ async def spawn_brokerd( **tractor_kwargs, ) - brokermod = get_brokermod(brokername) - extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - tractor_kwargs.update(extra_tractor_kwargs) - # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery from piker.service import ( @@ -236,8 +251,12 @@ async def spawn_brokerd( # passed to daemon_fixture_ep(**kwargs) brokername=brokername, loglevel=loglevel, + debug_mode=mngr.debug_mode, ), debug_mode=mngr.debug_mode, + # ^TODO, allow overriding this per-daemon from client side? + # |_ it's already supported in `tractor` so.. + loglevel=loglevel, enable_modules=( _data_mods -- 2.34.1
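For reference, a (hypothetical) backend endpoint consuming the `debug_mode` flag threaded through above could gate `greenback` init the same way `_setup_persistent_brokerd()` now does, enabling `tractor.pause_from_sync()`/`breakpoint()` from non-`trio` tasks. A minimal sketch, assuming only the APIs already used in this patch:

    import tractor

    async def some_datad_ep(debug_mode: bool = False) -> None:
        # only bother when debugging AND the provider's actor runs
        # in "infected `asyncio`" mode (e.g. `ib`, `deribit`).
        if (
            debug_mode
            and
            tractor.current_actor().is_infected_aio()
        ):
            from tractor.devx._debug import maybe_init_greenback
            await maybe_init_greenback()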