From d8e48e29ba73409a164b2a0e12fc8b6255c9d0e6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 11 Dec 2022 19:46:33 -0500 Subject: [PATCH 1/7] Add `mngrs=()` test --- tests/test_clustering.py | 51 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 56e629b..df3d835 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -1,5 +1,6 @@ import itertools +import pytest import trio import tractor from tractor import open_actor_cluster @@ -11,26 +12,72 @@ from conftest import tractor_test MESSAGE = 'tractoring at full speed' +def test_empty_mngrs_input_raises() -> None: + + async def main(): + with trio.fail_after(1): + async with ( + open_actor_cluster( + modules=[__name__], + + # NOTE: ensure we can passthrough runtime opts + loglevel='info', + # debug_mode=True, + + ) as portals, + + gather_contexts( + # NOTE: it's the use of inline-generator syntax + # here that causes the empty input. + mngrs=( + p.open_context(worker) for p in portals.values() + ), + ), + ): + assert 0 + + with pytest.raises(ValueError): + trio.run(main) + + @tractor.context -async def worker(ctx: tractor.Context) -> None: +async def worker( + ctx: tractor.Context, + +) -> None: + await ctx.started() - async with ctx.open_stream(backpressure=True) as stream: + + async with ctx.open_stream( + backpressure=True, + ) as stream: + + # TODO: this with the below assert causes a hang bug? + # with trio.move_on_after(1): + async for msg in stream: # do something with msg print(msg) assert msg == MESSAGE + # TODO: does this ever cause a hang + # assert 0 + @tractor_test async def test_streaming_to_actor_cluster() -> None: + async with ( open_actor_cluster(modules=[__name__]) as portals, + gather_contexts( mngrs=[p.open_context(worker) for p in portals.values()], ) as contexts, + gather_contexts( mngrs=[ctx[0].open_stream() for ctx in contexts], ) as streams, + ): with trio.move_on_after(1): for stream in itertools.cycle(streams): From c606be8c640d43da260ac1d457a3551150b3d85e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 11 Dec 2022 19:47:59 -0500 Subject: [PATCH 2/7] Passthrough runtime kwargs from `open_actor_cluster()` --- tractor/_clustering.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tractor/_clustering.py b/tractor/_clustering.py index 0775a22..93562fe 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -32,9 +32,12 @@ import tractor async def open_actor_cluster( modules: list[str], count: int = cpu_count(), - names: Optional[list[str]] = None, - start_method: Optional[str] = None, + names: list[str] | None = None, hard_kill: bool = False, + + # passed through verbatim to ``open_root_actor()`` + **runtime_kwargs, + ) -> AsyncGenerator[ dict[str, tractor.Portal], None, @@ -49,7 +52,9 @@ async def open_actor_cluster( raise ValueError( 'Number of names is {len(names)} but count it {count}') - async with tractor.open_nursery(start_method=start_method) as an: + async with tractor.open_nursery( + **runtime_kwargs, + ) as an: async with trio.open_nursery() as n: uid = tractor.current_actor().uid From b5192cca8ef32d89ab46ba98d231bb47aa1218a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 11 Dec 2022 19:51:53 -0500 Subject: [PATCH 3/7] Always greedily `list`-cast`mngrs` input sequence --- tractor/trionics/_mngrs.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 09df201..b25c59a 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -109,6 +109,17 @@ async def gather_contexts( all_entered = trio.Event() parent_exit = trio.Event() + # XXX: ensure greedy sequence of manager instances + # since a lazy inline generator doesn't seem to work + # with `async with` syntax. + mngrs = list(mngrs) + + if not mngrs: + raise ValueError( + 'input mngrs is empty?\n' + 'Did try to use inline generator syntax?' + ) + async with trio.open_nursery() as n: for mngr in mngrs: n.start_soon( From 38326e8c155052b5b60cc29b730dad0b61d2ed73 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 11 Dec 2022 19:48:54 -0500 Subject: [PATCH 4/7] Avoid error on context double pops --- tractor/_portal.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 089b09e..bdba793 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -536,7 +536,10 @@ class Portal: await maybe_wait_for_debugger() # remove the context from runtime tracking - self.actor._contexts.pop((self.channel.uid, ctx.cid)) + self.actor._contexts.pop( + (self.channel.uid, ctx.cid), + None, + ) @dataclass From 6c8cacc9d1e1374772775f1aa715115f61e4d3d7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 12 Dec 2022 13:18:22 -0500 Subject: [PATCH 5/7] Adjust all default is `None` annots (per new `mypy`) --- tractor/_discovery.py | 4 ++-- tractor/_entry.py | 4 ++-- tractor/_ipc.py | 2 +- tractor/_portal.py | 2 +- tractor/_runtime.py | 34 +++++++++++++++++++++++---------- tractor/_supervise.py | 16 ++++++++-------- tractor/experimental/_pubsub.py | 8 ++++---- tractor/log.py | 4 ++-- tractor/trionics/_mngrs.py | 2 +- 9 files changed, 45 insertions(+), 31 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 58aeb3e..b6957ba 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -108,7 +108,7 @@ async def query_actor( @acm async def find_actor( name: str, - arbiter_sockaddr: tuple[str, int] = None + arbiter_sockaddr: tuple[str, int] | None = None ) -> AsyncGenerator[Optional[Portal], None]: ''' @@ -134,7 +134,7 @@ async def find_actor( @acm async def wait_for_actor( name: str, - arbiter_sockaddr: tuple[str, int] = None + arbiter_sockaddr: tuple[str, int] | None = None ) -> AsyncGenerator[Portal, None]: """Wait on an actor to register with the arbiter. diff --git a/tractor/_entry.py b/tractor/_entry.py index 9e95fee..1e7997e 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -51,7 +51,7 @@ def _mp_main( accept_addr: tuple[str, int], forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: SpawnMethodKey, - parent_addr: tuple[str, int] = None, + parent_addr: tuple[str, int] | None = None, infect_asyncio: bool = False, ) -> None: @@ -98,7 +98,7 @@ def _trio_main( actor: Actor, # type: ignore *, - parent_addr: tuple[str, int] = None, + parent_addr: tuple[str, int] | None = None, infect_asyncio: bool = False, ) -> None: diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 0ed22bb..ebfd261 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -341,7 +341,7 @@ class Channel: async def connect( self, - destaddr: tuple[Any, ...] = None, + destaddr: tuple[Any, ...] | None = None, **kwargs ) -> MsgTransport: diff --git a/tractor/_portal.py b/tractor/_portal.py index bdba793..5546949 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -186,7 +186,7 @@ class Portal: async def cancel_actor( self, - timeout: float = None, + timeout: float | None = None, ) -> bool: ''' diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 9f8fed0..12b1f82 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -423,8 +423,8 @@ class Actor: name: str, *, enable_modules: list[str] = [], - uid: str = None, - loglevel: str = None, + uid: str | None = None, + loglevel: str | None = None, arbiter_addr: Optional[tuple[str, int]] = None, spawn_method: Optional[str] = None ) -> None: @@ -980,7 +980,7 @@ class Actor: handler_nursery: trio.Nursery, *, # (host, port) to bind for channel server - accept_host: tuple[str, int] = None, + accept_host: tuple[str, int] | None = None, accept_port: int = 0, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1648,17 +1648,28 @@ class Arbiter(Actor): ''' is_arbiter = True - def __init__(self, *args, **kwargs): + def __init__(self, *args, **kwargs) -> None: self._registry: dict[ tuple[str, str], tuple[str, int], ] = {} - self._waiters = {} + self._waiters: dict[ + str, + # either an event to sync to receiving an actor uid (which + # is filled in once the actor has sucessfully registered), + # or that uid after registry is complete. + list[trio.Event | tuple[str, str]] + ] = {} super().__init__(*args, **kwargs) - async def find_actor(self, name: str) -> Optional[tuple[str, int]]: + async def find_actor( + self, + name: str, + + ) -> tuple[str, int] | None: + for uid, sockaddr in self._registry.items(): if name in uid: return sockaddr @@ -1693,7 +1704,8 @@ class Arbiter(Actor): registered. ''' - sockaddrs = [] + sockaddrs: list[tuple[str, int]] = [] + sockaddr: tuple[str, int] for (aname, _), sockaddr in self._registry.items(): if name == aname: @@ -1703,8 +1715,10 @@ class Arbiter(Actor): waiter = trio.Event() self._waiters.setdefault(name, []).append(waiter) await waiter.wait() + for uid in self._waiters[name]: - sockaddrs.append(self._registry[uid]) + if not isinstance(uid, trio.Event): + sockaddrs.append(self._registry[uid]) return sockaddrs @@ -1714,11 +1728,11 @@ class Arbiter(Actor): sockaddr: tuple[str, int] ) -> None: - uid = name, uuid = (str(uid[0]), str(uid[1])) + uid = name, _ = (str(uid[0]), str(uid[1])) self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) # pop and signal all waiter events - events = self._waiters.pop(name, ()) + events = self._waiters.pop(name, []) self._waiters.setdefault(name, []).append(uid) for event in events: if isinstance(event, trio.Event): diff --git a/tractor/_supervise.py b/tractor/_supervise.py index a41cfd5..3085272 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -111,11 +111,11 @@ class ActorNursery: name: str, *, bind_addr: tuple[str, int] = _default_bind_addr, - rpc_module_paths: list[str] = None, - enable_modules: list[str] = None, - loglevel: str = None, # set log level per subactor - nursery: trio.Nursery = None, - debug_mode: Optional[bool] = None, + rpc_module_paths: list[str] | None = None, + enable_modules: list[str] | None = None, + loglevel: str | None = None, # set log level per subactor + nursery: trio.Nursery | None = None, + debug_mode: Optional[bool] | None = None, infect_asyncio: bool = False, ) -> Portal: ''' @@ -182,9 +182,9 @@ class ActorNursery: name: Optional[str] = None, bind_addr: tuple[str, int] = _default_bind_addr, - rpc_module_paths: Optional[list[str]] = None, - enable_modules: list[str] = None, - loglevel: str = None, # set log level per subactor + rpc_module_paths: list[str] | None = None, + enable_modules: list[str] | None = None, + loglevel: str | None = None, # set log level per subactor infect_asyncio: bool = False, **kwargs, # explicit args to ``fn`` diff --git a/tractor/experimental/_pubsub.py b/tractor/experimental/_pubsub.py index 0481f77..99117b0 100644 --- a/tractor/experimental/_pubsub.py +++ b/tractor/experimental/_pubsub.py @@ -48,7 +48,7 @@ log = get_logger('messaging') async def fan_out_to_ctxs( pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy topics2ctxs: dict[str, list], - packetizer: typing.Callable = None, + packetizer: typing.Callable | None = None, ) -> None: ''' Request and fan out quotes to each subscribed actor channel. @@ -144,7 +144,7 @@ _pubtask2lock: dict[str, trio.StrictFIFOLock] = {} def pub( - wrapped: typing.Callable = None, + wrapped: typing.Callable | None = None, *, tasks: set[str] = set(), ): @@ -249,8 +249,8 @@ def pub( topics: set[str], *args, # *, - task_name: str = None, # default: only one task allocated - packetizer: Callable = None, + task_name: str | None = None, # default: only one task allocated + packetizer: Callable | None = None, **kwargs, ): if task_name is None: diff --git a/tractor/log.py b/tractor/log.py index 4273c9b..342257f 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -172,7 +172,7 @@ class ActorContextInfo(Mapping): def get_logger( - name: str = None, + name: str | None = None, _root_name: str = _proj_name, ) -> StackLevelAdapter: @@ -207,7 +207,7 @@ def get_logger( def get_console_log( - level: str = None, + level: str | None = None, **kwargs, ) -> logging.LoggerAdapter: '''Get the package logger and enable a handler which writes to stderr. diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index b25c59a..9a7cf7f 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -47,7 +47,7 @@ T = TypeVar("T") @acm async def maybe_open_nursery( - nursery: trio.Nursery = None, + nursery: trio.Nursery | None = None, shield: bool = False, ) -> AsyncGenerator[trio.Nursery, Any]: ''' From 48f6d514ef8a59739235182e2c66cc60de9fa5e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 12 Dec 2022 14:05:32 -0500 Subject: [PATCH 6/7] Handle earlier name error crash in debug test --- tests/test_debugger.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 8704bb1..dd5ab46 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -634,18 +634,23 @@ def test_multi_daemon_subactors( # expect another breakpoint actor entry child.sendline('c') child.expect(r"\(Pdb\+\+\)") - assert_before(child, [bp_forever_msg]) - if ctlc: - do_ctlc(child) + try: + assert_before(child, [bp_forever_msg]) + except AssertionError: + assert_before(child, [name_error_msg]) - # should crash with the 2nd name error (simulates - # a retry) and then the root eventually (boxed) errors - # after 1 or more further bp actor entries. + else: + if ctlc: + do_ctlc(child) - child.sendline('c') - child.expect(r"\(Pdb\+\+\)") - assert_before(child, [name_error_msg]) + # should crash with the 2nd name error (simulates + # a retry) and then the root eventually (boxed) errors + # after 1 or more further bp actor entries. + + child.sendline('c') + child.expect(r"\(Pdb\+\+\)") + assert_before(child, [name_error_msg]) # wait for final error in root # where it crashs with boxed error @@ -660,10 +665,6 @@ def test_multi_daemon_subactors( except AssertionError: break - # child.sendline('c') - # assert_before( - - # child.sendline('c') assert_before( child, [ From d8214735b946bfa283c6fe5a7fc541054abd86dc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 12 Dec 2022 14:53:59 -0500 Subject: [PATCH 7/7] Add bugfix nooz --- nooz/344.bugfix.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 nooz/344.bugfix.rst diff --git a/nooz/344.bugfix.rst b/nooz/344.bugfix.rst new file mode 100644 index 0000000..997ee77 --- /dev/null +++ b/nooz/344.bugfix.rst @@ -0,0 +1,11 @@ +Always ``list``-cast the ``mngrs`` input to +``.trionics.gather_contexts()`` and ensure its size otherwise raise +a ``ValueError``. + +Turns out that trying to pass an inline-style generator comprehension +doesn't seem to work inside the ``async with`` expression? Further, in +such a case we can get a hang waiting on the all-entered event +completion when the internal mngrs iteration is a noop. Instead we +always greedily check a size and error on empty input; the lazy +iteration of a generator input is not beneficial anyway since we're +entering all manager instances in concurrent tasks.