Compare commits
5 Commits
22c14e235e
...
3a31c9d338
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 3a31c9d338 | |
Tyler Goodlet | 3dc57e384e | |
Tyler Goodlet | 687852f368 | |
Tyler Goodlet | d83d991f21 | |
Tyler Goodlet | 1cf712cfac |
3
setup.py
3
setup.py
|
@ -41,6 +41,9 @@ setup(
|
||||||
],
|
],
|
||||||
install_requires=[
|
install_requires=[
|
||||||
|
|
||||||
|
# discovery subsys
|
||||||
|
'bidict',
|
||||||
|
|
||||||
# trio related
|
# trio related
|
||||||
# proper range spec:
|
# proper range spec:
|
||||||
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
|
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
|
||||||
|
|
|
@ -219,7 +219,8 @@ def daemon(
|
||||||
arb_addr: tuple[str, int],
|
arb_addr: tuple[str, int],
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Run a daemon actor as a "remote arbiter".
|
Run a daemon actor as a "remote registrar" and/or plain ol
|
||||||
|
separate actor (service) tree.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if loglevel in ('trace', 'debug'):
|
if loglevel in ('trace', 'debug'):
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
"""
|
'''
|
||||||
Actor "discovery" testing
|
Discovery subsystem via a "registrar" actor scenarios.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import platform
|
import platform
|
||||||
|
@ -127,7 +128,10 @@ async def unpack_reg(actor_or_portal):
|
||||||
else:
|
else:
|
||||||
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
|
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
|
||||||
|
|
||||||
return {tuple(key.split('.')): val for key, val in msg.items()}
|
return {
|
||||||
|
tuple(key.split('.')): val
|
||||||
|
for key, val in msg.items()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async def spawn_and_check_registry(
|
async def spawn_and_check_registry(
|
||||||
|
@ -283,37 +287,41 @@ async def close_chans_before_nursery(
|
||||||
|
|
||||||
async with tractor.open_nursery() as tn:
|
async with tractor.open_nursery() as tn:
|
||||||
portal1 = await tn.start_actor(
|
portal1 = await tn.start_actor(
|
||||||
name='consumer1', enable_modules=[__name__])
|
name='consumer1',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
portal2 = await tn.start_actor(
|
portal2 = await tn.start_actor(
|
||||||
'consumer2', enable_modules=[__name__])
|
'consumer2',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: compact this back as was in last commit once
|
async with (
|
||||||
# 3.9+, see https://github.com/goodboy/tractor/issues/207
|
portal1.open_stream_from(
|
||||||
async with portal1.open_stream_from(
|
|
||||||
stream_forever
|
|
||||||
) as agen1:
|
|
||||||
async with portal2.open_stream_from(
|
|
||||||
stream_forever
|
stream_forever
|
||||||
) as agen2:
|
) as agen1,
|
||||||
async with trio.open_nursery() as n:
|
portal2.open_stream_from(
|
||||||
n.start_soon(streamer, agen1)
|
stream_forever
|
||||||
n.start_soon(cancel, use_signal, .5)
|
) as agen2,
|
||||||
try:
|
):
|
||||||
await streamer(agen2)
|
async with trio.open_nursery() as n:
|
||||||
finally:
|
n.start_soon(streamer, agen1)
|
||||||
# Kill the root nursery thus resulting in
|
n.start_soon(cancel, use_signal, .5)
|
||||||
# normal arbiter channel ops to fail during
|
try:
|
||||||
# teardown. It doesn't seem like this is
|
await streamer(agen2)
|
||||||
# reliably triggered by an external SIGINT.
|
finally:
|
||||||
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
# Kill the root nursery thus resulting in
|
||||||
|
# normal arbiter channel ops to fail during
|
||||||
|
# teardown. It doesn't seem like this is
|
||||||
|
# reliably triggered by an external SIGINT.
|
||||||
|
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
# XXX: THIS IS THE KEY THING that
|
# XXX: THIS IS THE KEY THING that
|
||||||
# happens **before** exiting the
|
# happens **before** exiting the
|
||||||
# actor nursery block
|
# actor nursery block
|
||||||
|
|
||||||
# also kill off channels cuz why not
|
# also kill off channels cuz why not
|
||||||
await agen1.aclose()
|
await agen1.aclose()
|
||||||
await agen2.aclose()
|
await agen2.aclose()
|
||||||
finally:
|
finally:
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await trio.sleep(1)
|
await trio.sleep(1)
|
||||||
|
@ -331,10 +339,12 @@ def test_close_channel_explicit(
|
||||||
use_signal,
|
use_signal,
|
||||||
arb_addr,
|
arb_addr,
|
||||||
):
|
):
|
||||||
"""Verify that closing a stream explicitly and killing the actor's
|
'''
|
||||||
|
Verify that closing a stream explicitly and killing the actor's
|
||||||
"root nursery" **before** the containing nursery tears down also
|
"root nursery" **before** the containing nursery tears down also
|
||||||
results in subactor(s) deregistering from the arbiter.
|
results in subactor(s) deregistering from the arbiter.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
trio.run(
|
trio.run(
|
||||||
partial(
|
partial(
|
||||||
|
@ -347,16 +357,18 @@ def test_close_channel_explicit(
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize('use_signal', [False, True])
|
@pytest.mark.parametrize('use_signal', [False, True])
|
||||||
def test_close_channel_explicit_remote_arbiter(
|
def test_close_channel_explicit_remote_registrar(
|
||||||
daemon,
|
daemon,
|
||||||
start_method,
|
start_method,
|
||||||
use_signal,
|
use_signal,
|
||||||
arb_addr,
|
arb_addr,
|
||||||
):
|
):
|
||||||
"""Verify that closing a stream explicitly and killing the actor's
|
'''
|
||||||
|
Verify that closing a stream explicitly and killing the actor's
|
||||||
"root nursery" **before** the containing nursery tears down also
|
"root nursery" **before** the containing nursery tears down also
|
||||||
results in subactor(s) deregistering from the arbiter.
|
results in subactor(s) deregistering from the arbiter.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
trio.run(
|
trio.run(
|
||||||
partial(
|
partial(
|
||||||
|
@ -366,3 +378,51 @@ def test_close_channel_explicit_remote_arbiter(
|
||||||
remote_arbiter=True,
|
remote_arbiter=True,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def kill_transport(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
await ctx.started()
|
||||||
|
actor: tractor.Actor = tractor.current_actor()
|
||||||
|
actor.cancel_server()
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# @pytest.mark.parametrize('use_signal', [False, True])
|
||||||
|
def test_stale_entry_is_deleted(
|
||||||
|
daemon,
|
||||||
|
start_method,
|
||||||
|
arb_addr,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Ensure that when a stale entry is detected in the registrar's table
|
||||||
|
that the `find_actor()` API takes care of deleting the stale entry
|
||||||
|
and not delivering a bad portal.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
name: str = 'transport_fails_actor'
|
||||||
|
regport: tractor.Portal
|
||||||
|
tn: tractor.ActorNursery
|
||||||
|
async with (
|
||||||
|
tractor.open_nursery() as tn,
|
||||||
|
tractor.get_registrar(*arb_addr) as regport,
|
||||||
|
):
|
||||||
|
ptl: tractor.Portal = await tn.start_actor(
|
||||||
|
name,
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
async with ptl.open_context(
|
||||||
|
kill_transport,
|
||||||
|
) as (first, ctx):
|
||||||
|
async with tractor.find_actor(name) as maybe_portal:
|
||||||
|
assert maybe_portal is None
|
||||||
|
|
||||||
|
await ptl.cancel_actor()
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
|
@ -31,6 +31,7 @@ from ._streaming import (
|
||||||
)
|
)
|
||||||
from ._discovery import (
|
from ._discovery import (
|
||||||
get_arbiter,
|
get_arbiter,
|
||||||
|
get_registrar,
|
||||||
find_actor,
|
find_actor,
|
||||||
wait_for_actor,
|
wait_for_actor,
|
||||||
query_actor,
|
query_actor,
|
||||||
|
@ -77,6 +78,7 @@ __all__ = [
|
||||||
'find_actor',
|
'find_actor',
|
||||||
'query_actor',
|
'query_actor',
|
||||||
'get_arbiter',
|
'get_arbiter',
|
||||||
|
'get_registrar',
|
||||||
'is_root_process',
|
'is_root_process',
|
||||||
'msg',
|
'msg',
|
||||||
'open_actor_cluster',
|
'open_actor_cluster',
|
||||||
|
|
|
@ -35,7 +35,7 @@ from ._state import current_actor, _runtime_vars
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def get_arbiter(
|
async def get_registrar(
|
||||||
|
|
||||||
host: str,
|
host: str,
|
||||||
port: int,
|
port: int,
|
||||||
|
@ -56,11 +56,14 @@ async def get_arbiter(
|
||||||
# (likely a re-entrant call from the arbiter actor)
|
# (likely a re-entrant call from the arbiter actor)
|
||||||
yield LocalPortal(actor, Channel((host, port)))
|
yield LocalPortal(actor, Channel((host, port)))
|
||||||
else:
|
else:
|
||||||
async with _connect_chan(host, port) as chan:
|
async with (
|
||||||
|
_connect_chan(host, port) as chan,
|
||||||
|
open_portal(chan) as arb_portal,
|
||||||
|
):
|
||||||
|
yield arb_portal
|
||||||
|
|
||||||
async with open_portal(chan) as arb_portal:
|
|
||||||
|
|
||||||
yield arb_portal
|
get_arbiter = get_registrar
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -101,7 +104,10 @@ async def query_actor(
|
||||||
|
|
||||||
# TODO: return portals to all available actors - for now just
|
# TODO: return portals to all available actors - for now just
|
||||||
# the last one that registered
|
# the last one that registered
|
||||||
if name == 'arbiter' and actor.is_arbiter:
|
if (
|
||||||
|
name == 'arbiter'
|
||||||
|
and actor.is_arbiter
|
||||||
|
):
|
||||||
raise RuntimeError("The current actor is the arbiter")
|
raise RuntimeError("The current actor is the arbiter")
|
||||||
|
|
||||||
yield sockaddr if sockaddr else None
|
yield sockaddr if sockaddr else None
|
||||||
|
@ -112,7 +118,7 @@ async def find_actor(
|
||||||
name: str,
|
name: str,
|
||||||
arbiter_sockaddr: tuple[str, int] | None = None
|
arbiter_sockaddr: tuple[str, int] | None = None
|
||||||
|
|
||||||
) -> AsyncGenerator[Optional[Portal], None]:
|
) -> AsyncGenerator[Portal | None, None]:
|
||||||
'''
|
'''
|
||||||
Ask the arbiter to find actor(s) by name.
|
Ask the arbiter to find actor(s) by name.
|
||||||
|
|
||||||
|
@ -120,17 +126,49 @@ async def find_actor(
|
||||||
known to the arbiter.
|
known to the arbiter.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with query_actor(
|
actor = current_actor()
|
||||||
name=name,
|
async with get_arbiter(
|
||||||
arbiter_sockaddr=arbiter_sockaddr,
|
*arbiter_sockaddr or actor._arb_addr
|
||||||
) as sockaddr:
|
) as arb_portal:
|
||||||
|
|
||||||
|
sockaddr = await arb_portal.run_from_ns(
|
||||||
|
'self',
|
||||||
|
'find_actor',
|
||||||
|
name=name,
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: return portals to all available actors - for now just
|
||||||
|
# the last one that registered
|
||||||
|
if (
|
||||||
|
name == 'arbiter'
|
||||||
|
and actor.is_arbiter
|
||||||
|
):
|
||||||
|
raise RuntimeError("The current actor is the arbiter")
|
||||||
|
|
||||||
if sockaddr:
|
if sockaddr:
|
||||||
async with _connect_chan(*sockaddr) as chan:
|
try:
|
||||||
async with open_portal(chan) as portal:
|
async with _connect_chan(*sockaddr) as chan:
|
||||||
yield portal
|
async with open_portal(chan) as portal:
|
||||||
else:
|
yield portal
|
||||||
yield None
|
return
|
||||||
|
|
||||||
|
# most likely we were unable to connect the
|
||||||
|
# transport and there is likely a stale entry in
|
||||||
|
# the registry actor's table, thus we need to
|
||||||
|
# instruct it to clear that stale entry and then
|
||||||
|
# more silently (pretend there was no reason but
|
||||||
|
# to) indicate that the target actor can't be
|
||||||
|
# contacted at that addr.
|
||||||
|
except OSError:
|
||||||
|
# NOTE: ensure we delete the stale entry from the
|
||||||
|
# registar actor.
|
||||||
|
uid: tuple[str, str] = await arb_portal.run_from_ns(
|
||||||
|
'self',
|
||||||
|
'delete_sockaddr',
|
||||||
|
sockaddr=sockaddr,
|
||||||
|
)
|
||||||
|
|
||||||
|
yield None
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
|
|
@ -40,6 +40,7 @@ from contextlib import ExitStack
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
|
from bidict import bidict
|
||||||
from exceptiongroup import BaseExceptionGroup
|
from exceptiongroup import BaseExceptionGroup
|
||||||
import trio # type: ignore
|
import trio # type: ignore
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -1774,10 +1775,10 @@ class Arbiter(Actor):
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs) -> None:
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
|
|
||||||
self._registry: dict[
|
self._registry: bidict[
|
||||||
tuple[str, str],
|
tuple[str, str],
|
||||||
tuple[str, int],
|
tuple[str, int],
|
||||||
] = {}
|
] = bidict({})
|
||||||
self._waiters: dict[
|
self._waiters: dict[
|
||||||
str,
|
str,
|
||||||
# either an event to sync to receiving an actor uid (which
|
# either an event to sync to receiving an actor uid (which
|
||||||
|
@ -1871,3 +1872,23 @@ class Arbiter(Actor):
|
||||||
entry: tuple = self._registry.pop(uid, None)
|
entry: tuple = self._registry.pop(uid, None)
|
||||||
if entry is None:
|
if entry is None:
|
||||||
log.warning(f'Request to de-register {uid} failed?')
|
log.warning(f'Request to de-register {uid} failed?')
|
||||||
|
|
||||||
|
|
||||||
|
async def delete_sockaddr(
|
||||||
|
self,
|
||||||
|
sockaddr: tuple[str, int],
|
||||||
|
|
||||||
|
) -> tuple[str, str]:
|
||||||
|
uid: tuple | None = self._registry.inverse.pop(
|
||||||
|
sockaddr,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
if uid:
|
||||||
|
log.warning(
|
||||||
|
f'Deleting registry entry for {sockaddr}@{uid}!'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
f'No registry entry for {sockaddr}@{uid}!'
|
||||||
|
)
|
||||||
|
return uid
|
||||||
|
|
|
@ -216,7 +216,7 @@ def _run_asyncio_task(
|
||||||
try:
|
try:
|
||||||
result = await coro
|
result = await coro
|
||||||
except BaseException as aio_err:
|
except BaseException as aio_err:
|
||||||
log.exception('asyncio task errored')
|
# log.exception('asyncio task errored:')
|
||||||
chan._aio_err = aio_err
|
chan._aio_err = aio_err
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -300,7 +300,7 @@ def _run_asyncio_task(
|
||||||
elif task_err is None:
|
elif task_err is None:
|
||||||
assert aio_err
|
assert aio_err
|
||||||
aio_err.with_traceback(aio_err.__traceback__)
|
aio_err.with_traceback(aio_err.__traceback__)
|
||||||
log.error('infected task errorred')
|
# log.error('infected task errorred')
|
||||||
|
|
||||||
# XXX: alway cancel the scope on error
|
# XXX: alway cancel the scope on error
|
||||||
# in case the trio task is blocking
|
# in case the trio task is blocking
|
||||||
|
@ -356,7 +356,7 @@ async def translate_aio_errors(
|
||||||
# relay cancel through to called ``asyncio`` task
|
# relay cancel through to called ``asyncio`` task
|
||||||
assert chan._aio_task
|
assert chan._aio_task
|
||||||
chan._aio_task.cancel(
|
chan._aio_task.cancel(
|
||||||
msg=f'the `trio` caller task was cancelled: {trio_task.name}'
|
msg=f'`trio`-side caller task cancelled: {trio_task.name}'
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -366,7 +366,7 @@ async def translate_aio_errors(
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
# trio.BrokenResourceError,
|
# trio.BrokenResourceError,
|
||||||
):
|
):
|
||||||
aio_err = chan._aio_err
|
aio_err: BaseException = chan._aio_err
|
||||||
if (
|
if (
|
||||||
task.cancelled() and
|
task.cancelled() and
|
||||||
type(aio_err) is CancelledError
|
type(aio_err) is CancelledError
|
||||||
|
|
Loading…
Reference in New Issue