Merge pull request #344 from goodboy/harden_cluster_tests

Harden cluster tests
dun_unset_current_actor
goodboy 2022-12-12 15:02:23 -05:00 committed by GitHub
commit 588b7ca7bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 142 additions and 50 deletions

View File

@ -0,0 +1,11 @@
Always ``list``-cast the ``mngrs`` input to
``.trionics.gather_contexts()`` and ensure its size otherwise raise
a ``ValueError``.
Turns out that trying to pass an inline-style generator comprehension
doesn't seem to work inside the ``async with`` expression? Further, in
such a case we can get a hang waiting on the all-entered event
completion when the internal mngrs iteration is a noop. Instead we
always greedily check a size and error on empty input; the lazy
iteration of a generator input is not beneficial anyway since we're
entering all manager instances in concurrent tasks.

View File

@ -1,5 +1,6 @@
import itertools import itertools
import pytest
import trio import trio
import tractor import tractor
from tractor import open_actor_cluster from tractor import open_actor_cluster
@ -11,26 +12,72 @@ from conftest import tractor_test
MESSAGE = 'tractoring at full speed' MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None:
async def main():
with trio.fail_after(1):
async with (
open_actor_cluster(
modules=[__name__],
# NOTE: ensure we can passthrough runtime opts
loglevel='info',
# debug_mode=True,
) as portals,
gather_contexts(
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
mngrs=(
p.open_context(worker) for p in portals.values()
),
),
):
assert 0
with pytest.raises(ValueError):
trio.run(main)
@tractor.context @tractor.context
async def worker(ctx: tractor.Context) -> None: async def worker(
ctx: tractor.Context,
) -> None:
await ctx.started() await ctx.started()
async with ctx.open_stream(backpressure=True) as stream:
async with ctx.open_stream(
backpressure=True,
) as stream:
# TODO: this with the below assert causes a hang bug?
# with trio.move_on_after(1):
async for msg in stream: async for msg in stream:
# do something with msg # do something with msg
print(msg) print(msg)
assert msg == MESSAGE assert msg == MESSAGE
# TODO: does this ever cause a hang
# assert 0
@tractor_test @tractor_test
async def test_streaming_to_actor_cluster() -> None: async def test_streaming_to_actor_cluster() -> None:
async with ( async with (
open_actor_cluster(modules=[__name__]) as portals, open_actor_cluster(modules=[__name__]) as portals,
gather_contexts( gather_contexts(
mngrs=[p.open_context(worker) for p in portals.values()], mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts, ) as contexts,
gather_contexts( gather_contexts(
mngrs=[ctx[0].open_stream() for ctx in contexts], mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams, ) as streams,
): ):
with trio.move_on_after(1): with trio.move_on_after(1):
for stream in itertools.cycle(streams): for stream in itertools.cycle(streams):

View File

@ -634,8 +634,13 @@ def test_multi_daemon_subactors(
# expect another breakpoint actor entry # expect another breakpoint actor entry
child.sendline('c') child.sendline('c')
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")
assert_before(child, [bp_forever_msg])
try:
assert_before(child, [bp_forever_msg])
except AssertionError:
assert_before(child, [name_error_msg])
else:
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(child)
@ -660,10 +665,6 @@ def test_multi_daemon_subactors(
except AssertionError: except AssertionError:
break break
# child.sendline('c')
# assert_before(
# child.sendline('c')
assert_before( assert_before(
child, child,
[ [

View File

@ -32,9 +32,12 @@ import tractor
async def open_actor_cluster( async def open_actor_cluster(
modules: list[str], modules: list[str],
count: int = cpu_count(), count: int = cpu_count(),
names: Optional[list[str]] = None, names: list[str] | None = None,
start_method: Optional[str] = None,
hard_kill: bool = False, hard_kill: bool = False,
# passed through verbatim to ``open_root_actor()``
**runtime_kwargs,
) -> AsyncGenerator[ ) -> AsyncGenerator[
dict[str, tractor.Portal], dict[str, tractor.Portal],
None, None,
@ -49,7 +52,9 @@ async def open_actor_cluster(
raise ValueError( raise ValueError(
'Number of names is {len(names)} but count it {count}') 'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery(start_method=start_method) as an: async with tractor.open_nursery(
**runtime_kwargs,
) as an:
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
uid = tractor.current_actor().uid uid = tractor.current_actor().uid

View File

@ -108,7 +108,7 @@ async def query_actor(
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] = None arbiter_sockaddr: tuple[str, int] | None = None
) -> AsyncGenerator[Optional[Portal], None]: ) -> AsyncGenerator[Optional[Portal], None]:
''' '''
@ -134,7 +134,7 @@ async def find_actor(
@acm @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] = None arbiter_sockaddr: tuple[str, int] | None = None
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter. """Wait on an actor to register with the arbiter.

View File

@ -51,7 +51,7 @@ def _mp_main(
accept_addr: tuple[str, int], accept_addr: tuple[str, int],
forkserver_info: tuple[Any, Any, Any, Any, Any], forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey, start_method: SpawnMethodKey,
parent_addr: tuple[str, int] = None, parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> None: ) -> None:
@ -98,7 +98,7 @@ def _trio_main(
actor: Actor, # type: ignore actor: Actor, # type: ignore
*, *,
parent_addr: tuple[str, int] = None, parent_addr: tuple[str, int] | None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> None: ) -> None:

View File

@ -341,7 +341,7 @@ class Channel:
async def connect( async def connect(
self, self,
destaddr: tuple[Any, ...] = None, destaddr: tuple[Any, ...] | None = None,
**kwargs **kwargs
) -> MsgTransport: ) -> MsgTransport:

View File

@ -186,7 +186,7 @@ class Portal:
async def cancel_actor( async def cancel_actor(
self, self,
timeout: float = None, timeout: float | None = None,
) -> bool: ) -> bool:
''' '''
@ -536,7 +536,10 @@ class Portal:
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
# remove the context from runtime tracking # remove the context from runtime tracking
self.actor._contexts.pop((self.channel.uid, ctx.cid)) self.actor._contexts.pop(
(self.channel.uid, ctx.cid),
None,
)
@dataclass @dataclass

View File

@ -423,8 +423,8 @@ class Actor:
name: str, name: str,
*, *,
enable_modules: list[str] = [], enable_modules: list[str] = [],
uid: str = None, uid: str | None = None,
loglevel: str = None, loglevel: str | None = None,
arbiter_addr: Optional[tuple[str, int]] = None, arbiter_addr: Optional[tuple[str, int]] = None,
spawn_method: Optional[str] = None spawn_method: Optional[str] = None
) -> None: ) -> None:
@ -980,7 +980,7 @@ class Actor:
handler_nursery: trio.Nursery, handler_nursery: trio.Nursery,
*, *,
# (host, port) to bind for channel server # (host, port) to bind for channel server
accept_host: tuple[str, int] = None, accept_host: tuple[str, int] | None = None,
accept_port: int = 0, accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1648,17 +1648,28 @@ class Arbiter(Actor):
''' '''
is_arbiter = True is_arbiter = True
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs) -> None:
self._registry: dict[ self._registry: dict[
tuple[str, str], tuple[str, str],
tuple[str, int], tuple[str, int],
] = {} ] = {}
self._waiters = {} self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
# is filled in once the actor has sucessfully registered),
# or that uid after registry is complete.
list[trio.Event | tuple[str, str]]
] = {}
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
async def find_actor(self, name: str) -> Optional[tuple[str, int]]: async def find_actor(
self,
name: str,
) -> tuple[str, int] | None:
for uid, sockaddr in self._registry.items(): for uid, sockaddr in self._registry.items():
if name in uid: if name in uid:
return sockaddr return sockaddr
@ -1693,7 +1704,8 @@ class Arbiter(Actor):
registered. registered.
''' '''
sockaddrs = [] sockaddrs: list[tuple[str, int]] = []
sockaddr: tuple[str, int]
for (aname, _), sockaddr in self._registry.items(): for (aname, _), sockaddr in self._registry.items():
if name == aname: if name == aname:
@ -1703,7 +1715,9 @@ class Arbiter(Actor):
waiter = trio.Event() waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter) self._waiters.setdefault(name, []).append(waiter)
await waiter.wait() await waiter.wait()
for uid in self._waiters[name]: for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
sockaddrs.append(self._registry[uid]) sockaddrs.append(self._registry[uid])
return sockaddrs return sockaddrs
@ -1714,11 +1728,11 @@ class Arbiter(Actor):
sockaddr: tuple[str, int] sockaddr: tuple[str, int]
) -> None: ) -> None:
uid = name, uuid = (str(uid[0]), str(uid[1])) uid = name, _ = (str(uid[0]), str(uid[1]))
self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
# pop and signal all waiter events # pop and signal all waiter events
events = self._waiters.pop(name, ()) events = self._waiters.pop(name, [])
self._waiters.setdefault(name, []).append(uid) self._waiters.setdefault(name, []).append(uid)
for event in events: for event in events:
if isinstance(event, trio.Event): if isinstance(event, trio.Event):

View File

@ -111,11 +111,11 @@ class ActorNursery:
name: str, name: str,
*, *,
bind_addr: tuple[str, int] = _default_bind_addr, bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: list[str] = None, rpc_module_paths: list[str] | None = None,
enable_modules: list[str] = None, enable_modules: list[str] | None = None,
loglevel: str = None, # set log level per subactor loglevel: str | None = None, # set log level per subactor
nursery: trio.Nursery = None, nursery: trio.Nursery | None = None,
debug_mode: Optional[bool] = None, debug_mode: Optional[bool] | None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
) -> Portal: ) -> Portal:
''' '''
@ -182,9 +182,9 @@ class ActorNursery:
name: Optional[str] = None, name: Optional[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr, bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[list[str]] = None, rpc_module_paths: list[str] | None = None,
enable_modules: list[str] = None, enable_modules: list[str] | None = None,
loglevel: str = None, # set log level per subactor loglevel: str | None = None, # set log level per subactor
infect_asyncio: bool = False, infect_asyncio: bool = False,
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``

View File

@ -48,7 +48,7 @@ log = get_logger('messaging')
async def fan_out_to_ctxs( async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: dict[str, list], topics2ctxs: dict[str, list],
packetizer: typing.Callable = None, packetizer: typing.Callable | None = None,
) -> None: ) -> None:
''' '''
Request and fan out quotes to each subscribed actor channel. Request and fan out quotes to each subscribed actor channel.
@ -144,7 +144,7 @@ _pubtask2lock: dict[str, trio.StrictFIFOLock] = {}
def pub( def pub(
wrapped: typing.Callable = None, wrapped: typing.Callable | None = None,
*, *,
tasks: set[str] = set(), tasks: set[str] = set(),
): ):
@ -249,8 +249,8 @@ def pub(
topics: set[str], topics: set[str],
*args, *args,
# *, # *,
task_name: str = None, # default: only one task allocated task_name: str | None = None, # default: only one task allocated
packetizer: Callable = None, packetizer: Callable | None = None,
**kwargs, **kwargs,
): ):
if task_name is None: if task_name is None:

View File

@ -172,7 +172,7 @@ class ActorContextInfo(Mapping):
def get_logger( def get_logger(
name: str = None, name: str | None = None,
_root_name: str = _proj_name, _root_name: str = _proj_name,
) -> StackLevelAdapter: ) -> StackLevelAdapter:
@ -207,7 +207,7 @@ def get_logger(
def get_console_log( def get_console_log(
level: str = None, level: str | None = None,
**kwargs, **kwargs,
) -> logging.LoggerAdapter: ) -> logging.LoggerAdapter:
'''Get the package logger and enable a handler which writes to stderr. '''Get the package logger and enable a handler which writes to stderr.

View File

@ -47,7 +47,7 @@ T = TypeVar("T")
@acm @acm
async def maybe_open_nursery( async def maybe_open_nursery(
nursery: trio.Nursery = None, nursery: trio.Nursery | None = None,
shield: bool = False, shield: bool = False,
) -> AsyncGenerator[trio.Nursery, Any]: ) -> AsyncGenerator[trio.Nursery, Any]:
''' '''
@ -109,6 +109,17 @@ async def gather_contexts(
all_entered = trio.Event() all_entered = trio.Event()
parent_exit = trio.Event() parent_exit = trio.Event()
# XXX: ensure greedy sequence of manager instances
# since a lazy inline generator doesn't seem to work
# with `async with` syntax.
mngrs = list(mngrs)
if not mngrs:
raise ValueError(
'input mngrs is empty?\n'
'Did try to use inline generator syntax?'
)
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for mngr in mngrs: for mngr in mngrs:
n.start_soon( n.start_soon(