Compare commits
7 Commits
main
...
dereg_on_o
Author | SHA1 | Date |
---|---|---|
|
c9fda3ff2c | |
|
11acdf8625 | |
|
0c2fb98d5e | |
|
8b82808fcd | |
|
c93fbcc9f7 | |
|
c089c8d0d6 | |
|
9f5b112d19 |
|
@ -1,6 +1,7 @@
|
|||
"""
|
||||
Actor "discovery" testing
|
||||
"""
|
||||
'''
|
||||
Discovery subsystem via a "registrar" actor scenarios.
|
||||
|
||||
'''
|
||||
import os
|
||||
import signal
|
||||
import platform
|
||||
|
@ -149,7 +150,10 @@ async def unpack_reg(actor_or_portal):
|
|||
else:
|
||||
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
|
||||
|
||||
return {tuple(key.split('.')): val for key, val in msg.items()}
|
||||
return {
|
||||
tuple(key.split('.')): val
|
||||
for key, val in msg.items()
|
||||
}
|
||||
|
||||
|
||||
async def spawn_and_check_registry(
|
||||
|
@ -325,20 +329,24 @@ async def close_chans_before_nursery(
|
|||
try:
|
||||
get_reg = partial(unpack_reg, aportal)
|
||||
|
||||
async with tractor.open_nursery() as tn:
|
||||
portal1 = await tn.start_actor(
|
||||
name='consumer1', enable_modules=[__name__])
|
||||
portal2 = await tn.start_actor(
|
||||
'consumer2', enable_modules=[__name__])
|
||||
async with tractor.open_nursery() as an:
|
||||
portal1 = await an.start_actor(
|
||||
name='consumer1',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
portal2 = await an.start_actor(
|
||||
'consumer2',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
# TODO: compact this back as was in last commit once
|
||||
# 3.9+, see https://github.com/goodboy/tractor/issues/207
|
||||
async with portal1.open_stream_from(
|
||||
async with (
|
||||
portal1.open_stream_from(
|
||||
stream_forever
|
||||
) as agen1:
|
||||
async with portal2.open_stream_from(
|
||||
) as agen1,
|
||||
portal2.open_stream_from(
|
||||
stream_forever
|
||||
) as agen2:
|
||||
) as agen2,
|
||||
):
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
|
@ -361,6 +369,7 @@ async def close_chans_before_nursery(
|
|||
# also kill off channels cuz why not
|
||||
await agen1.aclose()
|
||||
await agen2.aclose()
|
||||
|
||||
finally:
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(1)
|
||||
|
@ -378,10 +387,12 @@ def test_close_channel_explicit(
|
|||
use_signal,
|
||||
reg_addr,
|
||||
):
|
||||
"""Verify that closing a stream explicitly and killing the actor's
|
||||
'''
|
||||
Verify that closing a stream explicitly and killing the actor's
|
||||
"root nursery" **before** the containing nursery tears down also
|
||||
results in subactor(s) deregistering from the arbiter.
|
||||
"""
|
||||
|
||||
'''
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
trio.run(
|
||||
partial(
|
||||
|
@ -394,16 +405,18 @@ def test_close_channel_explicit(
|
|||
|
||||
|
||||
@pytest.mark.parametrize('use_signal', [False, True])
|
||||
def test_close_channel_explicit_remote_arbiter(
|
||||
def test_close_channel_explicit_remote_registrar(
|
||||
daemon: subprocess.Popen,
|
||||
start_method,
|
||||
use_signal,
|
||||
reg_addr,
|
||||
):
|
||||
"""Verify that closing a stream explicitly and killing the actor's
|
||||
'''
|
||||
Verify that closing a stream explicitly and killing the actor's
|
||||
"root nursery" **before** the containing nursery tears down also
|
||||
results in subactor(s) deregistering from the arbiter.
|
||||
"""
|
||||
|
||||
'''
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
trio.run(
|
||||
partial(
|
||||
|
@ -413,3 +426,61 @@ def test_close_channel_explicit_remote_arbiter(
|
|||
remote_arbiter=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def kill_transport(
|
||||
ctx: tractor.Context,
|
||||
) -> None:
|
||||
|
||||
await ctx.started()
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
actor.ipc_server.cancel()
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
|
||||
# @pytest.mark.parametrize('use_signal', [False, True])
|
||||
def test_stale_entry_is_deleted(
|
||||
debug_mode: bool,
|
||||
daemon: subprocess.Popen,
|
||||
start_method: str,
|
||||
reg_addr: tuple,
|
||||
):
|
||||
'''
|
||||
Ensure that when a stale entry is detected in the registrar's
|
||||
table that the `find_actor()` API takes care of deleting the
|
||||
stale entry and not delivering a bad portal.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
|
||||
name: str = 'transport_fails_actor'
|
||||
_reg_ptl: tractor.Portal
|
||||
an: tractor.ActorNursery
|
||||
async with (
|
||||
tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an,
|
||||
tractor.get_registry(reg_addr) as _reg_ptl,
|
||||
):
|
||||
ptl: tractor.Portal = await an.start_actor(
|
||||
name,
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
async with ptl.open_context(
|
||||
kill_transport,
|
||||
) as (first, ctx):
|
||||
async with tractor.find_actor(name) as maybe_portal:
|
||||
# because the transitive
|
||||
# `._discovery.maybe_open_portal()` call should
|
||||
# fail and implicitly call `.delete_addr()`
|
||||
assert maybe_portal is None
|
||||
registry: dict = await unpack_reg(_reg_ptl)
|
||||
assert ptl.chan.aid.uid not in registry
|
||||
|
||||
# should fail since we knocked out the IPC tpt XD
|
||||
await ptl.cancel_actor()
|
||||
await an.cancel()
|
||||
|
||||
trio.run(main)
|
||||
|
|
|
@ -60,7 +60,7 @@ log = get_logger(__name__)
|
|||
async def get_registry(
|
||||
addr: UnwrappedAddress|None = None,
|
||||
) -> AsyncGenerator[
|
||||
Portal | LocalPortal | None,
|
||||
Portal|LocalPortal|None,
|
||||
None,
|
||||
]:
|
||||
'''
|
||||
|
@ -150,7 +150,7 @@ async def query_actor(
|
|||
regaddr: UnwrappedAddress|None = None,
|
||||
|
||||
) -> AsyncGenerator[
|
||||
UnwrappedAddress|None,
|
||||
tuple[UnwrappedAddress|None, Portal|None],
|
||||
None,
|
||||
]:
|
||||
'''
|
||||
|
@ -164,7 +164,8 @@ async def query_actor(
|
|||
actor: Actor = current_actor()
|
||||
if (
|
||||
name == 'registrar'
|
||||
and actor.is_registrar
|
||||
and
|
||||
actor.is_registrar
|
||||
):
|
||||
raise RuntimeError(
|
||||
'The current actor IS the registry!?'
|
||||
|
@ -172,7 +173,7 @@ async def query_actor(
|
|||
|
||||
maybe_peers: list[Channel]|None = get_peer_by_name(name)
|
||||
if maybe_peers:
|
||||
yield maybe_peers[0].raddr
|
||||
yield maybe_peers[0].raddr, None
|
||||
return
|
||||
|
||||
reg_portal: Portal
|
||||
|
@ -185,8 +186,7 @@ async def query_actor(
|
|||
'find_actor',
|
||||
name=name,
|
||||
)
|
||||
yield addr
|
||||
|
||||
yield addr, reg_portal
|
||||
|
||||
@acm
|
||||
async def maybe_open_portal(
|
||||
|
@ -196,14 +196,36 @@ async def maybe_open_portal(
|
|||
async with query_actor(
|
||||
name=name,
|
||||
regaddr=addr,
|
||||
) as addr:
|
||||
pass
|
||||
) as (addr, reg_portal):
|
||||
if not addr:
|
||||
yield None
|
||||
return
|
||||
|
||||
if addr:
|
||||
try:
|
||||
async with _connect_chan(addr) as chan:
|
||||
async with open_portal(chan) as portal:
|
||||
yield portal
|
||||
else:
|
||||
|
||||
# most likely we were unable to connect the
|
||||
# transport and there is likely a stale entry in
|
||||
# the registry actor's table, thus we need to
|
||||
# instruct it to clear that stale entry and then
|
||||
# more silently (pretend there was no reason but
|
||||
# to) indicate that the target actor can't be
|
||||
# contacted at that addr.
|
||||
except OSError:
|
||||
# NOTE: ensure we delete the stale entry from the
|
||||
# registar actor.
|
||||
uid: tuple[str, str] = await reg_portal.run_from_ns(
|
||||
'self',
|
||||
'delete_addr',
|
||||
addr=addr,
|
||||
)
|
||||
log.warning(
|
||||
f'Deleted stale registry entry !\n'
|
||||
f'addr: {addr!r}\n'
|
||||
f'uid: {uid!r}\n'
|
||||
)
|
||||
yield None
|
||||
|
||||
|
||||
|
@ -272,7 +294,7 @@ async def find_actor(
|
|||
if not any(portals):
|
||||
if raise_on_none:
|
||||
raise RuntimeError(
|
||||
f'No actor "{name}" found registered @ {registry_addrs}'
|
||||
f'No actor {name!r} found registered @ {registry_addrs!r}'
|
||||
)
|
||||
yield None
|
||||
return
|
||||
|
|
|
@ -68,6 +68,7 @@ import textwrap
|
|||
from types import ModuleType
|
||||
import warnings
|
||||
|
||||
from bidict import bidict
|
||||
import trio
|
||||
from trio._core import _run as trio_runtime
|
||||
from trio import (
|
||||
|
@ -1879,10 +1880,10 @@ class Arbiter(Actor):
|
|||
**kwargs,
|
||||
) -> None:
|
||||
|
||||
self._registry: dict[
|
||||
self._registry: bidict[
|
||||
tuple[str, str],
|
||||
UnwrappedAddress,
|
||||
] = {}
|
||||
] = bidict({})
|
||||
self._waiters: dict[
|
||||
str,
|
||||
# either an event to sync to receiving an actor uid (which
|
||||
|
@ -1971,7 +1972,8 @@ class Arbiter(Actor):
|
|||
# should never be 0-dynamic-os-alloc
|
||||
await debug.pause()
|
||||
|
||||
self._registry[uid] = addr
|
||||
# XXX NOTE, value must also be hashable.
|
||||
self._registry[uid] = tuple(addr)
|
||||
|
||||
# pop and signal all waiter events
|
||||
events = self._waiters.pop(name, [])
|
||||
|
@ -1988,4 +1990,26 @@ class Arbiter(Actor):
|
|||
uid = (str(uid[0]), str(uid[1]))
|
||||
entry: tuple = self._registry.pop(uid, None)
|
||||
if entry is None:
|
||||
log.warning(f'Request to de-register {uid} failed?')
|
||||
log.warning(
|
||||
f'Request to de-register {uid!r} failed?'
|
||||
)
|
||||
|
||||
async def delete_addr(
|
||||
self,
|
||||
addr: tuple[str, int|str],
|
||||
) -> tuple[str, str]:
|
||||
uid: tuple | None = self._registry.inverse.pop(
|
||||
addr,
|
||||
None,
|
||||
)
|
||||
if uid:
|
||||
report: str = 'Deleting registry-entry for,\n'
|
||||
else:
|
||||
report: str = 'No registry entry for,\n'
|
||||
|
||||
log.warning(
|
||||
report
|
||||
+
|
||||
f'{addr!r}@{uid!r}'
|
||||
)
|
||||
return uid
|
||||
|
|
Loading…
Reference in New Issue