Compare commits

...

5 Commits

Author SHA1 Message Date
Tyler Goodlet 3a31c9d338 to_asyncio: mask error logging, not sure it adds that much 2023-09-26 10:32:01 -04:00
Tyler Goodlet 3dc57e384e Always no-raise try-to-pop registry addrs 2023-09-15 14:20:12 -04:00
Tyler Goodlet 687852f368 Add stale entry deleted from registrar test
By spawning an actor task that immediately shuts down the transport
server and then sleeps, verify that attempting to connect via the
`._discovery.find_actor()` helper delivers `None` for the `Portal`
value.

Relates to #184 and #216
2023-08-28 12:20:12 -04:00
Tyler Goodlet d83d991f21 Handle stale registrar entries; detect and delete
In cases where an actor's transport server task (by default handling new
TCP connections) terminates early but does not de-register from the
pertaining registry (aka the registrar) actor's address table, the
trying-to-connect client actor will get a connection error on that
address. In the case where the client handles a (local) `OSError`
(meaning the target actor address is likely being contacted over
`localhost`), make a further call to the registrar to delete the stale
entry and `yield None`, gracefully indicating to calling code that no
`Portal` can be delivered to the target address.

This issue was originally discovered in `piker` where the `emsd`
(clearing engine) actor would sometimes crash on rapid client
re-connects and then leave a stale entry in `pikerd`. With this fix, new
clients will attempt to connect via an endpoint which re-spawns the
`emsd` when a `None` portal is delivered (via `maybe_spawn_em()`).
2023-08-28 11:26:36 -04:00
Tyler Goodlet 1cf712cfac Add `Arbiter.delete_sockaddr()` to remove addrs
Since stale addrs can be leaked where the actor transport server task
crashes but doesn't (successfully) unregister from the registrar, we
need a remote way to remove such entries; hence this new (registrar)
method.

To implement this, make use of the `bidict` lib for the `._registry`
table, thus making it super simple to do reverse uuid lookups from an
input socket-address.
2023-08-21 19:07:14 -04:00
7 changed files with 182 additions and 57 deletions

View File

@@ -41,6 +41,9 @@ setup(
],
install_requires=[
# discovery subsys
'bidict',
# trio related
# proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5

View File

@@ -219,7 +219,8 @@ def daemon(
arb_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote arbiter".
Run a daemon actor as a "remote registrar" and/or plain ol'
separate actor (service) tree.
'''
if loglevel in ('trace', 'debug'):

View File

@@ -1,6 +1,7 @@
"""
Actor "discovery" testing
"""
'''
Discovery subsystem via a "registrar" actor scenarios.
'''
import os
import signal
import platform
@@ -127,7 +128,10 @@ async def unpack_reg(actor_or_portal):
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return {tuple(key.split('.')): val for key, val in msg.items()}
return {
tuple(key.split('.')): val
for key, val in msg.items()
}
async def spawn_and_check_registry(
@@ -283,37 +287,41 @@ async def close_chans_before_nursery(
async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
name='consumer1',
enable_modules=[__name__],
)
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])
'consumer2',
enable_modules=[__name__],
)
# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(
    stream_forever
) as agen1:
    async with portal2.open_stream_from(
        stream_forever
    ) as agen2:
        async with trio.open_nursery() as n:
            n.start_soon(streamer, agen1)
            n.start_soon(cancel, use_signal, .5)
            try:
                await streamer(agen2)
            finally:
                # Kill the root nursery thus resulting in
                # normal arbiter channel ops to fail during
                # teardown. It doesn't seem like this is
                # reliably triggered by an external SIGINT.
                # tractor.current_actor()._root_nursery.cancel_scope.cancel()

                # XXX: THIS IS THE KEY THING that
                # happens **before** exiting the
                # actor nursery block

                # also kill off channels cuz why not
                await agen1.aclose()
                await agen2.aclose()

async with (
    portal1.open_stream_from(
        stream_forever
    ) as agen1,
    portal2.open_stream_from(
        stream_forever
    ) as agen2,
):
    async with trio.open_nursery() as n:
        n.start_soon(streamer, agen1)
        n.start_soon(cancel, use_signal, .5)
        try:
            await streamer(agen2)
        finally:
            # Kill the root nursery thus resulting in
            # normal arbiter channel ops to fail during
            # teardown. It doesn't seem like this is
            # reliably triggered by an external SIGINT.
            # tractor.current_actor()._root_nursery.cancel_scope.cancel()

            # XXX: THIS IS THE KEY THING that
            # happens **before** exiting the
            # actor nursery block

            # also kill off channels cuz why not
            await agen1.aclose()
            await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(1)
@@ -331,10 +339,12 @@ def test_close_channel_explicit(
use_signal,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@@ -347,16 +357,18 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
def test_close_channel_explicit_remote_registrar(
daemon,
start_method,
use_signal,
arb_addr,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@@ -366,3 +378,51 @@ def test_close_channel_explicit_remote_arbiter(
remote_arbiter=True,
),
)
@tractor.context
async def kill_transport(
ctx: tractor.Context,
) -> None:
await ctx.started()
actor: tractor.Actor = tractor.current_actor()
actor.cancel_server()
await trio.sleep_forever()
# @pytest.mark.parametrize('use_signal', [False, True])
def test_stale_entry_is_deleted(
daemon,
start_method,
arb_addr,
):
'''
Ensure that when a stale entry is detected in the registrar's table
that the `find_actor()` API takes care of deleting the stale entry
and not delivering a bad portal.
'''
async def main():
name: str = 'transport_fails_actor'
regport: tractor.Portal
tn: tractor.ActorNursery
async with (
tractor.open_nursery() as tn,
tractor.get_registrar(*arb_addr) as regport,
):
ptl: tractor.Portal = await tn.start_actor(
name,
enable_modules=[__name__],
)
async with ptl.open_context(
kill_transport,
) as (first, ctx):
async with tractor.find_actor(name) as maybe_portal:
assert maybe_portal is None
await ptl.cancel_actor()
trio.run(main)
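The `None` portal asserted above is what lets calling code treat a stale (or missing) registry entry as "service not running" and simply (re)spawn it. A minimal caller-side sketch of that pattern, loosely modeled on the `maybe_spawn_em()` flow mentioned in the commit message; the `maybe_spawn_service()` helper and the 'emsd' default are illustrative assumptions, not part of this changeset:

    # caller-side sketch (hypothetical helper, not repo code): when
    # `find_actor()` yields `None` -- e.g. after a stale registrar entry
    # was detected and deleted -- (re)spawn the service instead.
    from contextlib import asynccontextmanager as acm

    import tractor


    @acm
    async def maybe_spawn_service(name: str = 'emsd'):
        async with tractor.find_actor(name) as portal:
            if portal is not None:
                # a live entry was found; hand the portal to the caller.
                yield portal
                return

        # no (live) entry in the registrar: spawn a fresh instance.
        async with tractor.open_nursery() as tn:
            portal = await tn.start_actor(
                name,
                enable_modules=[__name__],
            )
            yield portal
            await portal.cancel_actor()

Callers can then just do `async with maybe_spawn_service() as portal: ...` without caring whether the service was already registered.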

View File

@@ -31,6 +31,7 @@ from ._streaming import (
)
from ._discovery import (
get_arbiter,
get_registrar,
find_actor,
wait_for_actor,
query_actor,
@@ -77,6 +78,7 @@ __all__ = [
'find_actor',
'query_actor',
'get_arbiter',
'get_registrar',
'is_root_process',
'msg',
'open_actor_cluster',

View File

@@ -35,7 +35,7 @@ from ._state import current_actor, _runtime_vars
@acm
async def get_arbiter(
async def get_registrar(
host: str,
port: int,
@@ -56,11 +56,14 @@ async def get_arbiter(
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor, Channel((host, port)))
else:
async with _connect_chan(host, port) as chan:
    async with open_portal(chan) as arb_portal:
        yield arb_portal
async with (
    _connect_chan(host, port) as chan,
    open_portal(chan) as arb_portal,
):
    yield arb_portal
get_arbiter = get_registrar
@acm
@@ -101,7 +104,10 @@ async def query_actor(
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter")
yield sockaddr if sockaddr else None
@@ -112,7 +118,7 @@ async def find_actor(
name: str,
arbiter_sockaddr: tuple[str, int] | None = None
) -> AsyncGenerator[Optional[Portal], None]:
) -> AsyncGenerator[Portal | None, None]:
'''
Ask the arbiter to find actor(s) by name.
@@ -120,17 +126,49 @@
known to the arbiter.
'''
async with query_actor(
name=name,
arbiter_sockaddr=arbiter_sockaddr,
) as sockaddr:
actor = current_actor()
async with get_arbiter(
*arbiter_sockaddr or actor._arb_addr
) as arb_portal:
sockaddr = await arb_portal.run_from_ns(
'self',
'find_actor',
name=name,
)
# TODO: return portals to all available actors - for now just
# the last one that registered
if (
name == 'arbiter'
and actor.is_arbiter
):
raise RuntimeError("The current actor is the arbiter")
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
try:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
return
# most likely we were unable to connect the
# transport and there is likely a stale entry in
# the registry actor's table, thus we need to
# instruct it to clear that stale entry and then
# more silently (pretend there was no reason but
# to) indicate that the target actor can't be
# contacted at that addr.
except OSError:
# NOTE: ensure we delete the stale entry from the
# registrar actor.
uid: tuple[str, str] = await arb_portal.run_from_ns(
'self',
'delete_sockaddr',
sockaddr=sockaddr,
)
yield None
@acm
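For reference, the stale-entry deletion that `find_actor()` now performs internally can also be driven by hand through the renamed `get_registrar()` accessor; a rough sketch (the registrar address shown is an illustrative assumption):

    import tractor


    async def drop_stale_entry(
        sockaddr: tuple[str, int],
        reg_addr: tuple[str, int] = ('127.0.0.1', 1616),  # assumed registrar addr
    ) -> tuple[str, str] | None:
        async with tractor.get_registrar(*reg_addr) as regportal:
            # ask the registrar to drop the (possibly stale) entry; the new
            # `Arbiter.delete_sockaddr()` returns the removed uid, or `None`
            # if nothing matched.
            return await regportal.run_from_ns(
                'self',
                'delete_sockaddr',
                sockaddr=sockaddr,
            )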

View File

@@ -40,6 +40,7 @@ from contextlib import ExitStack
import warnings
from async_generator import aclosing
from bidict import bidict
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore
from trio_typing import TaskStatus
@@ -1774,10 +1775,10 @@
def __init__(self, *args, **kwargs) -> None:
self._registry: dict[
self._registry: bidict[
tuple[str, str],
tuple[str, int],
] = {}
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
@@ -1871,3 +1872,23 @@ class Arbiter(Actor):
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(f'Request to de-register {uid} failed?')
async def delete_sockaddr(
self,
sockaddr: tuple[str, int],
) -> tuple[str, str]:
uid: tuple | None = self._registry.inverse.pop(
sockaddr,
None,
)
if uid:
log.warning(
f'Deleting registry entry for {sockaddr}@{uid}!'
)
else:
log.warning(
f'No registry entry for {sockaddr}@{uid}!'
)
return uid
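The `bidict` switch above is what makes the sockaddr-to-uid direction a one-liner; a standalone illustration of the forward and inverse lookups (the uid and address values here are made up):

    from bidict import bidict

    # uid (name, uuid) -> socket address, mirroring `Arbiter._registry`
    registry = bidict({
        ('emsd', 'deadbeef'): ('127.0.0.1', 6116),
    })

    # forward lookup: uid -> sockaddr
    assert registry[('emsd', 'deadbeef')] == ('127.0.0.1', 6116)

    # reverse lookup + removal: sockaddr -> uid, i.e. the
    # `self._registry.inverse.pop(sockaddr, None)` used by `delete_sockaddr()`
    uid = registry.inverse.pop(('127.0.0.1', 6116), None)
    assert uid == ('emsd', 'deadbeef')
    assert ('emsd', 'deadbeef') not in registry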

View File

@@ -216,7 +216,7 @@ def _run_asyncio_task(
try:
result = await coro
except BaseException as aio_err:
log.exception('asyncio task errored')
# log.exception('asyncio task errored:')
chan._aio_err = aio_err
raise
@@ -300,7 +300,7 @@ def _run_asyncio_task(
elif task_err is None:
assert aio_err
aio_err.with_traceback(aio_err.__traceback__)
log.error('infected task errorred')
# log.error('infected task errorred')
# XXX: alway cancel the scope on error
# in case the trio task is blocking
@@ -356,7 +356,7 @@ async def translate_aio_errors(
# relay cancel through to called ``asyncio`` task
assert chan._aio_task
chan._aio_task.cancel(
msg=f'the `trio` caller task was cancelled: {trio_task.name}'
msg=f'`trio`-side caller task cancelled: {trio_task.name}'
)
raise
@@ -366,7 +366,7 @@ async def translate_aio_errors(
trio.ClosedResourceError,
# trio.BrokenResourceError,
):
aio_err = chan._aio_err
aio_err: BaseException = chan._aio_err
if (
task.cancelled() and
type(aio_err) is CancelledError