Compare commits

...

5 Commits

Author SHA1 Message Date
Tyler Goodlet 3a31c9d338 to_asyncio: mask error logging, not sure it adds that much 2023-09-26 10:32:01 -04:00
Tyler Goodlet 3dc57e384e Always no-raise try-to-pop registry addrs 2023-09-15 14:20:12 -04:00
Tyler Goodlet 687852f368 Add stale entry deleted from registrar test
By spawning an actor task that immediately shuts down the transport
server and then sleeps, verify that attempting to connect via the
`._discovery.find_actor()` helper delivers `None` for the `Portal`
value.

Relates to #184 and #216
2023-08-28 12:20:12 -04:00
Tyler Goodlet d83d991f21 Handle stale registrar entries; detect and delete
In cases where an actor's transport server task (by default handling new
TCP connections) terminates early but does not de-register from the
pertaining registry (aka the registrar) actor's address table, the
trying-to-connect client actor will get a connection error on that
address. In the case where the client handles a (local) `OSError`
exception (meaning the target actor address is likely being contacted
over `localhost`), make a further call to the registrar to delete the
stale entry and `yield None`, gracefully indicating to calling code
that no `Portal` can be delivered to the target address.

This issue was originally discovered in `piker` where the `emsd`
(clearing engine) actor would sometimes crash on rapid client
re-connects and then leave a stale `pikerd` registry entry. With this
fix, new clients will attempt to connect via an endpoint which
re-spawns `emsd` when a `None` portal is delivered (via `maybe_spawn_em()`).
2023-08-28 11:26:36 -04:00
Tyler Goodlet 1cf712cfac Add `Arbiter.delete_sockaddr()` to remove addrs
Since stale addrs can be leaked when the actor's transport server task
crashes but doesn't (successfully) unregister from the registrar, we
need a remote way to remove such entries; hence this new (registrar)
method.

To implement this, make use of the `bidict` lib for the `._registry`
table, making it super simple to do reverse uuid lookups from an
input socket-address.
2023-08-21 19:07:14 -04:00
7 changed files with 182 additions and 57 deletions

View File

@@ -41,6 +41,9 @@ setup(
         ],
         install_requires=[

+            # discovery subsys
+            'bidict',
+
             # trio related
             # proper range spec:
             # https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5

View File

@@ -219,7 +219,8 @@ def daemon(
     arb_addr: tuple[str, int],
 ):
     '''
-    Run a daemon actor as a "remote arbiter".
+    Run a daemon actor as a "remote registrar" and/or plain ol
+    separate actor (service) tree.

     '''
     if loglevel in ('trace', 'debug'):

View File

@@ -1,6 +1,7 @@
-"""
-Actor "discovery" testing
-"""
+'''
+Discovery subsystem via a "registrar" actor scenarios.
+
+'''
 import os
 import signal
 import platform
@@ -127,7 +128,10 @@ async def unpack_reg(actor_or_portal):
     else:
         msg = await actor_or_portal.run_from_ns('self', 'get_registry')

-    return {tuple(key.split('.')): val for key, val in msg.items()}
+    return {
+        tuple(key.split('.')): val
+        for key, val in msg.items()
+    }


 async def spawn_and_check_registry(
@@ -283,37 +287,41 @@ async def close_chans_before_nursery(
         async with tractor.open_nursery() as tn:
             portal1 = await tn.start_actor(
-                name='consumer1', enable_modules=[__name__])
+                name='consumer1',
+                enable_modules=[__name__],
+            )
             portal2 = await tn.start_actor(
-                'consumer2', enable_modules=[__name__])
+                'consumer2',
+                enable_modules=[__name__],
+            )

-            # TODO: compact this back as was in last commit once
-            # 3.9+, see https://github.com/goodboy/tractor/issues/207
-            async with portal1.open_stream_from(
-                stream_forever
-            ) as agen1:
-                async with portal2.open_stream_from(
-                    stream_forever
-                ) as agen2:
-                    async with trio.open_nursery() as n:
-                        n.start_soon(streamer, agen1)
-                        n.start_soon(cancel, use_signal, .5)
-                        try:
-                            await streamer(agen2)
-                        finally:
-                            # Kill the root nursery thus resulting in
-                            # normal arbiter channel ops to fail during
-                            # teardown. It doesn't seem like this is
-                            # reliably triggered by an external SIGINT.
-                            # tractor.current_actor()._root_nursery.cancel_scope.cancel()
+            async with (
+                portal1.open_stream_from(
+                    stream_forever
+                ) as agen1,
+                portal2.open_stream_from(
+                    stream_forever
+                ) as agen2,
+            ):
+                async with trio.open_nursery() as n:
+                    n.start_soon(streamer, agen1)
+                    n.start_soon(cancel, use_signal, .5)
+                    try:
+                        await streamer(agen2)
+                    finally:
+                        # Kill the root nursery thus resulting in
+                        # normal arbiter channel ops to fail during
+                        # teardown. It doesn't seem like this is
+                        # reliably triggered by an external SIGINT.
+                        # tractor.current_actor()._root_nursery.cancel_scope.cancel()

-                            # XXX: THIS IS THE KEY THING that
-                            # happens **before** exiting the
-                            # actor nursery block
+                        # XXX: THIS IS THE KEY THING that
+                        # happens **before** exiting the
+                        # actor nursery block

-                            # also kill off channels cuz why not
-                            await agen1.aclose()
-                            await agen2.aclose()
+                        # also kill off channels cuz why not
+                        await agen1.aclose()
+                        await agen2.aclose()

     finally:
         with trio.CancelScope(shield=True):
             await trio.sleep(1)
@@ -331,10 +339,12 @@ def test_close_channel_explicit(
     use_signal,
     arb_addr,
 ):
-    """Verify that closing a stream explicitly and killing the actor's
+    '''
+    Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
     results in subactor(s) deregistering from the arbiter.
-    """
+
+    '''
     with pytest.raises(KeyboardInterrupt):
         trio.run(
             partial(
@@ -347,16 +357,18 @@ def test_close_channel_explicit(


 @pytest.mark.parametrize('use_signal', [False, True])
-def test_close_channel_explicit_remote_arbiter(
+def test_close_channel_explicit_remote_registrar(
     daemon,
     start_method,
     use_signal,
     arb_addr,
 ):
-    """Verify that closing a stream explicitly and killing the actor's
+    '''
+    Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
     results in subactor(s) deregistering from the arbiter.
-    """
+
+    '''
     with pytest.raises(KeyboardInterrupt):
         trio.run(
             partial(
@@ -366,3 +378,51 @@ def test_close_channel_explicit_remote_arbiter(
                 remote_arbiter=True,
             ),
         )
+
+
+@tractor.context
+async def kill_transport(
+    ctx: tractor.Context,
+) -> None:
+    await ctx.started()
+    actor: tractor.Actor = tractor.current_actor()
+    actor.cancel_server()
+    await trio.sleep_forever()
+
+
+# @pytest.mark.parametrize('use_signal', [False, True])
+def test_stale_entry_is_deleted(
+    daemon,
+    start_method,
+    arb_addr,
+):
+    '''
+    Ensure that when a stale entry is detected in the registrar's table
+    that the `find_actor()` API takes care of deleting the stale entry
+    and not delivering a bad portal.
+
+    '''
+    async def main():
+        name: str = 'transport_fails_actor'
+        regport: tractor.Portal
+        tn: tractor.ActorNursery
+        async with (
+            tractor.open_nursery() as tn,
+            tractor.get_registrar(*arb_addr) as regport,
+        ):
+            ptl: tractor.Portal = await tn.start_actor(
+                name,
+                enable_modules=[__name__],
+            )
+            async with ptl.open_context(
+                kill_transport,
+            ) as (first, ctx):
+                async with tractor.find_actor(name) as maybe_portal:
+                    assert maybe_portal is None
+                await ptl.cancel_actor()
+
+    trio.run(main)

View File

@@ -31,6 +31,7 @@ from ._streaming import (
 )
 from ._discovery import (
     get_arbiter,
+    get_registrar,
     find_actor,
     wait_for_actor,
     query_actor,
@@ -77,6 +78,7 @@ __all__ = [
     'find_actor',
     'query_actor',
     'get_arbiter',
+    'get_registrar',
     'is_root_process',
     'msg',
     'open_actor_cluster',
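
For reference, the newly exported `get_registrar()` is the same helper the added test above uses via `tractor.get_registrar(*arb_addr)`. A minimal usage sketch (assuming the default `127.0.0.1:1616` registrar address and an initialized actor tree; the `list_registered()` helper name is made up for illustration):

import tractor
import trio


async def list_registered() -> None:
    # hypothetical helper: dump the registrar's uid -> sockaddr table
    async with tractor.open_root_actor(
        arbiter_addr=('127.0.0.1', 1616),  # assumed default registrar addr
    ):
        # `get_registrar()` is an async context manager yielding a
        # portal (or `LocalPortal`) to the registrar actor
        async with tractor.get_registrar('127.0.0.1', 1616) as regport:
            # same remote call the test suite's `unpack_reg()` helper uses
            registry = await regport.run_from_ns('self', 'get_registry')
            for uid_str, sockaddr in registry.items():
                print(uid_str, sockaddr)


if __name__ == '__main__':
    trio.run(list_registered)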

View File

@@ -35,7 +35,7 @@ from ._state import current_actor, _runtime_vars


 @acm
-async def get_arbiter(
+async def get_registrar(

     host: str,
     port: int,
@@ -56,11 +56,14 @@ async def get_arbiter(
         # (likely a re-entrant call from the arbiter actor)
         yield LocalPortal(actor, Channel((host, port)))
     else:
-        async with _connect_chan(host, port) as chan:
-
-            async with open_portal(chan) as arb_portal:
-                yield arb_portal
+        async with (
+            _connect_chan(host, port) as chan,
+            open_portal(chan) as arb_portal,
+        ):
+            yield arb_portal
+
+
+get_arbiter = get_registrar


 @acm
@@ -101,7 +104,10 @@ async def query_actor(

     # TODO: return portals to all available actors - for now just
     # the last one that registered
-    if name == 'arbiter' and actor.is_arbiter:
+    if (
+        name == 'arbiter'
+        and actor.is_arbiter
+    ):
         raise RuntimeError("The current actor is the arbiter")

     yield sockaddr if sockaddr else None
@@ -112,7 +118,7 @@ async def find_actor(
     name: str,
     arbiter_sockaddr: tuple[str, int] | None = None

-) -> AsyncGenerator[Optional[Portal], None]:
+) -> AsyncGenerator[Portal | None, None]:
     '''
     Ask the arbiter to find actor(s) by name.

@@ -120,17 +126,49 @@ async def find_actor(
     known to the arbiter.

     '''
-    async with query_actor(
-        name=name,
-        arbiter_sockaddr=arbiter_sockaddr,
-    ) as sockaddr:
+    actor = current_actor()
+    async with get_arbiter(
+        *arbiter_sockaddr or actor._arb_addr
+    ) as arb_portal:
+
+        sockaddr = await arb_portal.run_from_ns(
+            'self',
+            'find_actor',
+            name=name,
+        )
+
+        # TODO: return portals to all available actors - for now just
+        # the last one that registered
+        if (
+            name == 'arbiter'
+            and actor.is_arbiter
+        ):
+            raise RuntimeError("The current actor is the arbiter")

         if sockaddr:
-            async with _connect_chan(*sockaddr) as chan:
-                async with open_portal(chan) as portal:
-                    yield portal
-        else:
-            yield None
+            try:
+                async with _connect_chan(*sockaddr) as chan:
+                    async with open_portal(chan) as portal:
+                        yield portal
+                        return
+
+            # most likely we were unable to connect the
+            # transport and there is likely a stale entry in
+            # the registry actor's table, thus we need to
+            # instruct it to clear that stale entry and then
+            # more silently (pretend there was no reason but
+            # to) indicate that the target actor can't be
+            # contacted at that addr.
+            except OSError:
+                # NOTE: ensure we delete the stale entry from the
+                # registar actor.
+                uid: tuple[str, str] = await arb_portal.run_from_ns(
+                    'self',
+                    'delete_sockaddr',
+                    sockaddr=sockaddr,
+                )
+
+        yield None


 @acm
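
The caller-side counterpart of this `None`-portal behaviour is the "maybe spawn" pattern referenced in the `piker` commit message above; a rough sketch (the `maybe_spawn_service()` helper and its `'my_service'` name are hypothetical, not part of this change set):

from contextlib import asynccontextmanager as acm

import tractor


@acm
async def maybe_spawn_service(name: str = 'my_service'):
    '''
    Connect to `name` if it's registered and reachable, else (re-)spawn
    it; with this change a stale registrar entry makes `find_actor()`
    yield `None` (after deleting the entry) rather than raising
    `OSError`, so the spawn branch below is taken.

    '''
    async with tractor.find_actor(name) as portal:
        if portal is not None:
            yield portal
            return

    # not running, or its registry entry was stale: spawn it fresh
    async with tractor.open_nursery() as tn:
        portal = await tn.start_actor(
            name,
            enable_modules=[__name__],
        )
        yield portal
        await portal.cancel_actor()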

View File

@@ -40,6 +40,7 @@ from contextlib import ExitStack
 import warnings

 from async_generator import aclosing
+from bidict import bidict
 from exceptiongroup import BaseExceptionGroup
 import trio  # type: ignore
 from trio_typing import TaskStatus
@@ -1774,10 +1775,10 @@ class Arbiter(Actor):
     def __init__(self, *args, **kwargs) -> None:

-        self._registry: dict[
+        self._registry: bidict[
             tuple[str, str],
             tuple[str, int],
-        ] = {}
+        ] = bidict({})

         self._waiters: dict[
             str,
             # either an event to sync to receiving an actor uid (which
@@ -1871,3 +1872,23 @@ class Arbiter(Actor):
         entry: tuple = self._registry.pop(uid, None)
         if entry is None:
             log.warning(f'Request to de-register {uid} failed?')
+
+    async def delete_sockaddr(
+        self,
+        sockaddr: tuple[str, int],
+    ) -> tuple[str, str]:
+
+        uid: tuple | None = self._registry.inverse.pop(
+            sockaddr,
+            None,
+        )
+        if uid:
+            log.warning(
+                f'Deleting registry entry for {sockaddr}@{uid}!'
+            )
+        else:
+            log.warning(
+                f'No registry entry for {sockaddr}@{uid}!'
+            )
+
+        return uid
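
The `bidict` swap above is what keeps `delete_sockaddr()` a one-liner: popping through the `.inverse` view also removes the forward `uid -> sockaddr` entry. A standalone sketch of just that behaviour (the uid and address values here are made up):

from bidict import bidict

# forward mapping: actor uid -> socket address, as in `Arbiter._registry`
registry: bidict = bidict({
    ('emsd', 'deadbeef'): ('127.0.0.1', 61529),
})

# reverse uuid lookup from an input socket-address
assert registry.inverse[('127.0.0.1', 61529)] == ('emsd', 'deadbeef')

# popping via the inverse view drops the forward entry too, which is
# exactly what `Arbiter.delete_sockaddr()` relies on
uid = registry.inverse.pop(('127.0.0.1', 61529), None)
assert uid == ('emsd', 'deadbeef')
assert uid not in registry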

View File

@@ -216,7 +216,7 @@ def _run_asyncio_task(
         try:
             result = await coro
         except BaseException as aio_err:
-            log.exception('asyncio task errored')
+            # log.exception('asyncio task errored:')
             chan._aio_err = aio_err
             raise
@@ -300,7 +300,7 @@ def _run_asyncio_task(
         elif task_err is None:
             assert aio_err
             aio_err.with_traceback(aio_err.__traceback__)
-            log.error('infected task errorred')
+            # log.error('infected task errorred')

         # XXX: alway cancel the scope on error
         # in case the trio task is blocking
@@ -356,7 +356,7 @@ async def translate_aio_errors(
             # relay cancel through to called ``asyncio`` task
             assert chan._aio_task
             chan._aio_task.cancel(
-                msg=f'the `trio` caller task was cancelled: {trio_task.name}'
+                msg=f'`trio`-side caller task cancelled: {trio_task.name}'
             )
         raise
@@ -366,7 +366,7 @@ async def translate_aio_errors(
         trio.ClosedResourceError,
         # trio.BrokenResourceError,
     ):
-        aio_err = chan._aio_err
+        aio_err: BaseException = chan._aio_err
         if (
             task.cancelled() and
             type(aio_err) is CancelledError