Compare commits

..

No commits in common. "3a31c9d3388b28277bc9af7f3aca22217431072a" and "22c14e235e3390674796dfc3c462863395d2bfda" have entirely different histories.

7 changed files with 57 additions and 182 deletions

View File

@ -41,9 +41,6 @@ setup(
], ],
install_requires=[ install_requires=[
# discovery subsys
'bidict',
# trio related # trio related
# proper range spec: # proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5 # https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5

View File

@ -219,8 +219,7 @@ def daemon(
arb_addr: tuple[str, int], arb_addr: tuple[str, int],
): ):
''' '''
Run a daemon actor as a "remote registrar" and/or plain ol Run a daemon actor as a "remote arbiter".
separate actor (service) tree.
''' '''
if loglevel in ('trace', 'debug'): if loglevel in ('trace', 'debug'):

View File

@ -1,7 +1,6 @@
''' """
Discovery subsystem via a "registrar" actor scenarios. Actor "discovery" testing
"""
'''
import os import os
import signal import signal
import platform import platform
@ -128,10 +127,7 @@ async def unpack_reg(actor_or_portal):
else: else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry') msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return { return {tuple(key.split('.')): val for key, val in msg.items()}
tuple(key.split('.')): val
for key, val in msg.items()
}
async def spawn_and_check_registry( async def spawn_and_check_registry(
@ -287,22 +283,18 @@ async def close_chans_before_nursery(
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor( portal1 = await tn.start_actor(
name='consumer1', name='consumer1', enable_modules=[__name__])
enable_modules=[__name__],
)
portal2 = await tn.start_actor( portal2 = await tn.start_actor(
'consumer2', 'consumer2', enable_modules=[__name__])
enable_modules=[__name__],
)
async with ( # TODO: compact this back as was in last commit once
portal1.open_stream_from( # 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
stream_forever stream_forever
) as agen1, ) as agen1:
portal2.open_stream_from( async with portal2.open_stream_from(
stream_forever stream_forever
) as agen2, ) as agen2:
):
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon(streamer, agen1) n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5) n.start_soon(cancel, use_signal, .5)
@ -339,12 +331,10 @@ def test_close_channel_explicit(
use_signal, use_signal,
arb_addr, arb_addr,
): ):
''' """Verify that closing a stream explicitly and killing the actor's
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also "root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run( trio.run(
partial( partial(
@ -357,18 +347,16 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_registrar( def test_close_channel_explicit_remote_arbiter(
daemon, daemon,
start_method, start_method,
use_signal, use_signal,
arb_addr, arb_addr,
): ):
''' """Verify that closing a stream explicitly and killing the actor's
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also "root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run( trio.run(
partial( partial(
@ -378,51 +366,3 @@ def test_close_channel_explicit_remote_registrar(
remote_arbiter=True, remote_arbiter=True,
), ),
) )
@tractor.context
async def kill_transport(
ctx: tractor.Context,
) -> None:
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
actor.cancel_server()
await trio.sleep_forever()
# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
daemon,
start_method,
arb_addr,
):
'''
Ensure that when a stale entry is detected in the registrar's table
that the `find_actor()` API takes care of deleting the stale entry
and not delivering a bad portal.
'''
async def main():
name: str = 'transport_fails_actor'
regport: tractor.Portal
tn: tractor.ActorNursery
async with (
tractor.open_nursery() as tn,
tractor.get_registrar(*arb_addr) as regport,
):
ptl: tractor.Portal = await tn.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(name) as maybe_portal:
assert maybe_portal is None
await ptl.cancel_actor()
trio.run(main)

View File

@ -31,7 +31,6 @@ from ._streaming import (
) )
from ._discovery import ( from ._discovery import (
get_arbiter, get_arbiter,
get_registrar,
find_actor, find_actor,
wait_for_actor, wait_for_actor,
query_actor, query_actor,
@ -78,7 +77,6 @@ __all__ = [
'find_actor', 'find_actor',
'query_actor', 'query_actor',
'get_arbiter', 'get_arbiter',
'get_registrar',
'is_root_process', 'is_root_process',
'msg', 'msg',
'open_actor_cluster', 'open_actor_cluster',

View File

@ -35,7 +35,7 @@ from ._state import current_actor, _runtime_vars
@acm @acm
async def get_registrar( async def get_arbiter(
host: str, host: str,
port: int, port: int,
@ -56,16 +56,13 @@ async def get_registrar(
# (likely a re-entrant call from the arbiter actor) # (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port))) yield LocalPortal(actor, Channel((host, port)))
else: else:
async with ( async with _connect_chan(host, port) as chan:
_connect_chan(host, port) as chan,
open_portal(chan) as arb_portal, async with open_portal(chan) as arb_portal:
):
yield arb_portal yield arb_portal
get_arbiter = get_registrar
@acm @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
@ -104,10 +101,7 @@ async def query_actor(
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if ( if name == 'arbiter' and actor.is_arbiter:
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter") raise RuntimeError("The current actor is the arbiter")
yield sockaddr if sockaddr else None yield sockaddr if sockaddr else None
@ -118,7 +112,7 @@ async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None arbiter_sockaddr: tuple[str, int] | None = None
) -> AsyncGenerator[Portal | None, None]: ) -> AsyncGenerator[Optional[Portal], None]:
''' '''
Ask the arbiter to find actor(s) by name. Ask the arbiter to find actor(s) by name.
@ -126,48 +120,16 @@ async def find_actor(
known to the arbiter. known to the arbiter.
''' '''
actor = current_actor() async with query_actor(
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
sockaddr = await arb_portal.run_from_ns(
'self',
'find_actor',
name=name, name=name,
) arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
# TODO: return portals to all available actors - for now just
# the last one that registered
if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter")
if sockaddr: if sockaddr:
try:
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
return else:
# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry from the
# registar actor.
uid: tuple[str, str] = await arb_portal.run_from_ns(
'self',
'delete_sockaddr',
sockaddr=sockaddr,
)
yield None yield None

View File

@ -40,7 +40,6 @@ from contextlib import ExitStack
import warnings import warnings
from async_generator import aclosing from async_generator import aclosing
from bidict import bidict
from exceptiongroup import BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
import trio # type: ignore import trio # type: ignore
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -1775,10 +1774,10 @@ class Arbiter(Actor):
def __init__(self, *args, **kwargs) -> None: def __init__(self, *args, **kwargs) -> None:
self._registry: bidict[ self._registry: dict[
tuple[str, str], tuple[str, str],
tuple[str, int], tuple[str, int],
] = bidict({}) ] = {}
self._waiters: dict[ self._waiters: dict[
str, str,
# either an event to sync to receiving an actor uid (which # either an event to sync to receiving an actor uid (which
@ -1872,23 +1871,3 @@ class Arbiter(Actor):
entry: tuple = self._registry.pop(uid, None) entry: tuple = self._registry.pop(uid, None)
if entry is None: if entry is None:
log.warning(f'Request to de-register {uid} failed?') log.warning(f'Request to de-register {uid} failed?')
async def delete_sockaddr(
self,
sockaddr: tuple[str, int],
) -> tuple[str, str]:
uid: tuple | None = self._registry.inverse.pop(
sockaddr,
None,
)
if uid:
log.warning(
f'Deleting registry entry for {sockaddr}@{uid}!'
)
else:
log.warning(
f'No registry entry for {sockaddr}@{uid}!'
)
return uid

View File

@ -216,7 +216,7 @@ def _run_asyncio_task(
try: try:
result = await coro result = await coro
except BaseException as aio_err: except BaseException as aio_err:
# log.exception('asyncio task errored:') log.exception('asyncio task errored')
chan._aio_err = aio_err chan._aio_err = aio_err
raise raise
@ -300,7 +300,7 @@ def _run_asyncio_task(
elif task_err is None: elif task_err is None:
assert aio_err assert aio_err
aio_err.with_traceback(aio_err.__traceback__) aio_err.with_traceback(aio_err.__traceback__)
# log.error('infected task errorred') log.error('infected task errorred')
# XXX: alway cancel the scope on error # XXX: alway cancel the scope on error
# in case the trio task is blocking # in case the trio task is blocking
@ -356,7 +356,7 @@ async def translate_aio_errors(
# relay cancel through to called ``asyncio`` task # relay cancel through to called ``asyncio`` task
assert chan._aio_task assert chan._aio_task
chan._aio_task.cancel( chan._aio_task.cancel(
msg=f'`trio`-side caller task cancelled: {trio_task.name}' msg=f'the `trio` caller task was cancelled: {trio_task.name}'
) )
raise raise
@ -366,7 +366,7 @@ async def translate_aio_errors(
trio.ClosedResourceError, trio.ClosedResourceError,
# trio.BrokenResourceError, # trio.BrokenResourceError,
): ):
aio_err: BaseException = chan._aio_err aio_err = chan._aio_err
if ( if (
task.cancelled() and task.cancelled() and
type(aio_err) is CancelledError type(aio_err) is CancelledError