commit
85a5d1f849
|
@ -8,6 +8,7 @@ import pytest
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
pytest_plugins = ['pytester']
|
||||||
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
|
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
"""
|
||||||
|
Multiple python programs invoking ``tractor.run()``
|
||||||
|
"""
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
from conftest import tractor_test
|
||||||
|
|
||||||
|
|
||||||
|
def sig_prog(proc, sig):
|
||||||
|
"Kill the actor-process with ``sig``."
|
||||||
|
proc.send_signal(sig)
|
||||||
|
ret = proc.wait()
|
||||||
|
assert ret
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def daemon(loglevel, testdir, arb_addr):
|
||||||
|
cmdargs = [
|
||||||
|
sys.executable, '-c',
|
||||||
|
"import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})"
|
||||||
|
.format(
|
||||||
|
arb_addr,
|
||||||
|
"'{}'".format(loglevel) if loglevel else None)
|
||||||
|
]
|
||||||
|
proc = testdir.popen(
|
||||||
|
cmdargs,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
)
|
||||||
|
assert not proc.returncode
|
||||||
|
time.sleep(0.2)
|
||||||
|
yield proc
|
||||||
|
# TODO: why sometimes does SIGINT not work on teardown?
|
||||||
|
sig_prog(proc, signal.SIGINT)
|
||||||
|
|
||||||
|
|
||||||
|
def test_abort_on_sigint(daemon):
|
||||||
|
assert daemon.returncode is None
|
||||||
|
time.sleep(0.1)
|
||||||
|
sig_prog(daemon, signal.SIGINT)
|
||||||
|
assert daemon.returncode == 1
|
||||||
|
# XXX: oddly, couldn't get capfd.readouterr() to work here?
|
||||||
|
assert "KeyboardInterrupt" in str(daemon.stderr.read())
|
||||||
|
|
||||||
|
|
||||||
|
@tractor_test
|
||||||
|
async def test_cancel_remote_arbiter(daemon, arb_addr):
|
||||||
|
async with tractor.get_arbiter(*arb_addr) as portal:
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
# the arbiter channel server is cancelled but not its main task
|
||||||
|
assert daemon.returncode is None
|
||||||
|
|
||||||
|
# no arbiter socket should exist
|
||||||
|
with pytest.raises(OSError):
|
||||||
|
async with tractor.get_arbiter(*arb_addr) as portal:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@tractor_test
|
||||||
|
async def test_register_duplicate_name(daemon):
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
p1 = await n.start_actor('doggy')
|
||||||
|
p2 = await n.start_actor('doggy')
|
||||||
|
|
||||||
|
async with tractor.wait_for_actor('doggy') as portal:
|
||||||
|
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
|
||||||
|
|
||||||
|
await n.cancel()
|
|
@ -2,7 +2,9 @@
|
||||||
tractor: An actor model micro-framework built on
|
tractor: An actor model micro-framework built on
|
||||||
``trio`` and ``multiprocessing``.
|
``trio`` and ``multiprocessing``.
|
||||||
"""
|
"""
|
||||||
|
import importlib
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
from typing import Tuple, Any, Optional
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
import trio # type: ignore
|
import trio # type: ignore
|
||||||
|
@ -35,10 +37,10 @@ _default_arbiter_port = 1616
|
||||||
|
|
||||||
async def _main(
|
async def _main(
|
||||||
async_fn: typing.Callable[..., typing.Awaitable],
|
async_fn: typing.Callable[..., typing.Awaitable],
|
||||||
args: typing.Tuple,
|
args: Tuple,
|
||||||
kwargs: typing.Dict[str, typing.Any],
|
kwargs: typing.Dict[str, typing.Any],
|
||||||
name: str,
|
name: str,
|
||||||
arbiter_addr: typing.Tuple[str, int]
|
arbiter_addr: Tuple[str, int]
|
||||||
) -> typing.Any:
|
) -> typing.Any:
|
||||||
"""Async entry point for ``tractor``.
|
"""Async entry point for ``tractor``.
|
||||||
"""
|
"""
|
||||||
|
@ -54,7 +56,7 @@ async def _main(
|
||||||
async with _connect_chan(host, port):
|
async with _connect_chan(host, port):
|
||||||
arbiter_found = True
|
arbiter_found = True
|
||||||
except OSError:
|
except OSError:
|
||||||
log.warn(f"No actor could be found @ {host}:{port}")
|
log.warning(f"No actor could be found @ {host}:{port}")
|
||||||
|
|
||||||
# create a local actor and start up its main routine/task
|
# create a local actor and start up its main routine/task
|
||||||
if arbiter_found: # we were able to connect to an arbiter
|
if arbiter_found: # we were able to connect to an arbiter
|
||||||
|
@ -81,13 +83,28 @@ async def _main(
|
||||||
|
|
||||||
def run(
|
def run(
|
||||||
async_fn: typing.Callable[..., typing.Awaitable],
|
async_fn: typing.Callable[..., typing.Awaitable],
|
||||||
*args: typing.Tuple,
|
*args: Tuple,
|
||||||
name: str = None,
|
name: str = None,
|
||||||
arbiter_addr: typing.Tuple[str, int] = (_default_arbiter_host, _default_arbiter_port),
|
arbiter_addr: Tuple[str, int] = (
|
||||||
|
_default_arbiter_host, _default_arbiter_port),
|
||||||
**kwargs: typing.Dict[str, typing.Any],
|
**kwargs: typing.Dict[str, typing.Any],
|
||||||
):
|
) -> Any:
|
||||||
"""Run a trio-actor async function in process.
|
"""Run a trio-actor async function in process.
|
||||||
|
|
||||||
This is tractor's main entry and the start point for any async actor.
|
This is tractor's main entry and the start point for any async actor.
|
||||||
"""
|
"""
|
||||||
return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)
|
return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)
|
||||||
|
|
||||||
|
|
||||||
|
def run_daemon(
|
||||||
|
rpc_modules: Tuple[str],
|
||||||
|
**kwargs
|
||||||
|
) -> None:
|
||||||
|
"""Spawn a single daemon-actor which will repond to RPC.
|
||||||
|
"""
|
||||||
|
kwargs['rpc_module_paths'] = rpc_modules
|
||||||
|
|
||||||
|
for path in rpc_modules:
|
||||||
|
importlib.import_module(path)
|
||||||
|
|
||||||
|
return run(partial(trio.sleep, float('inf')), **kwargs)
|
||||||
|
|
|
@ -233,7 +233,7 @@ class Actor:
|
||||||
try:
|
try:
|
||||||
uid = await _do_handshake(self, chan)
|
uid = await _do_handshake(self, chan)
|
||||||
except StopAsyncIteration:
|
except StopAsyncIteration:
|
||||||
log.warn(f"Channel {chan} failed to handshake")
|
log.warning(f"Channel {chan} failed to handshake")
|
||||||
return
|
return
|
||||||
|
|
||||||
# channel tracking
|
# channel tracking
|
||||||
|
@ -248,7 +248,7 @@ class Actor:
|
||||||
|
|
||||||
chans = self._peers[uid]
|
chans = self._peers[uid]
|
||||||
if chans:
|
if chans:
|
||||||
log.warn(
|
log.warning(
|
||||||
f"already have channel(s) for {uid}:{chans}?"
|
f"already have channel(s) for {uid}:{chans}?"
|
||||||
)
|
)
|
||||||
log.debug(f"Registered {chan} for {uid}")
|
log.debug(f"Registered {chan} for {uid}")
|
||||||
|
@ -400,7 +400,7 @@ class Actor:
|
||||||
parent_addr: Tuple[str, int] = None
|
parent_addr: Tuple[str, int] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
# after fork routine which invokes a fresh ``trio.run``
|
# after fork routine which invokes a fresh ``trio.run``
|
||||||
# log.warn("Log level after fork is {self.loglevel}")
|
# log.warning("Log level after fork is {self.loglevel}")
|
||||||
self._forkserver_info = forkserver_info
|
self._forkserver_info = forkserver_info
|
||||||
from ._trionics import ctx
|
from ._trionics import ctx
|
||||||
if self.loglevel is not None:
|
if self.loglevel is not None:
|
||||||
|
@ -456,7 +456,7 @@ class Actor:
|
||||||
# initial handshake, report who we are, who they are
|
# initial handshake, report who we are, who they are
|
||||||
await _do_handshake(self, chan)
|
await _do_handshake(self, chan)
|
||||||
except OSError: # failed to connect
|
except OSError: # failed to connect
|
||||||
log.warn(
|
log.warning(
|
||||||
f"Failed to connect to parent @ {parent_addr},"
|
f"Failed to connect to parent @ {parent_addr},"
|
||||||
" closing server")
|
" closing server")
|
||||||
await self.cancel()
|
await self.cancel()
|
||||||
|
@ -555,7 +555,7 @@ class Actor:
|
||||||
await arb_portal.run(
|
await arb_portal.run(
|
||||||
'self', 'unregister_actor', uid=self.uid)
|
'self', 'unregister_actor', uid=self.uid)
|
||||||
except OSError:
|
except OSError:
|
||||||
log.warn(f"Unable to unregister {self.name} from arbiter")
|
log.warning(f"Unable to unregister {self.name} from arbiter")
|
||||||
|
|
||||||
async def cancel(self) -> None:
|
async def cancel(self) -> None:
|
||||||
"""Cancel this actor.
|
"""Cancel this actor.
|
||||||
|
@ -668,7 +668,8 @@ class Arbiter(Actor):
|
||||||
events = self._waiters.pop(name, ())
|
events = self._waiters.pop(name, ())
|
||||||
self._waiters.setdefault(name, []).append(uid)
|
self._waiters.setdefault(name, []).append(uid)
|
||||||
for event in events:
|
for event in events:
|
||||||
event.set()
|
if isinstance(event, trio.Event):
|
||||||
|
event.set()
|
||||||
|
|
||||||
def unregister_actor(self, uid: Tuple[str, str]) -> None:
|
def unregister_actor(self, uid: Tuple[str, str]) -> None:
|
||||||
self._registry.pop(uid, None)
|
self._registry.pop(uid, None)
|
||||||
|
|
|
@ -158,11 +158,11 @@ class Channel:
|
||||||
await self.connect()
|
await self.connect()
|
||||||
cancelled = cancel_scope.cancelled_caught
|
cancelled = cancel_scope.cancelled_caught
|
||||||
if cancelled:
|
if cancelled:
|
||||||
log.warn(
|
log.warning(
|
||||||
"Reconnect timed out after 3 seconds, retrying...")
|
"Reconnect timed out after 3 seconds, retrying...")
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
log.warn("Stream connection re-established!")
|
log.warning("Stream connection re-established!")
|
||||||
# run any reconnection sequence
|
# run any reconnection sequence
|
||||||
on_recon = self._recon_seq
|
on_recon = self._recon_seq
|
||||||
if on_recon:
|
if on_recon:
|
||||||
|
@ -171,7 +171,7 @@ class Channel:
|
||||||
except (OSError, ConnectionRefusedError):
|
except (OSError, ConnectionRefusedError):
|
||||||
if not down:
|
if not down:
|
||||||
down = True
|
down = True
|
||||||
log.warn(
|
log.warning(
|
||||||
f"Connection to {self.raddr} went down, waiting"
|
f"Connection to {self.raddr} went down, waiting"
|
||||||
" for re-establishment")
|
" for re-establishment")
|
||||||
await trio.sleep(1)
|
await trio.sleep(1)
|
||||||
|
|
|
@ -186,7 +186,7 @@ class Portal:
|
||||||
async def cancel_actor(self) -> bool:
|
async def cancel_actor(self) -> bool:
|
||||||
"""Cancel the actor on the other end of this portal.
|
"""Cancel the actor on the other end of this portal.
|
||||||
"""
|
"""
|
||||||
log.warn(
|
log.warning(
|
||||||
f"Sending cancel request to {self.channel.uid} on "
|
f"Sending cancel request to {self.channel.uid} on "
|
||||||
f"{self.channel}")
|
f"{self.channel}")
|
||||||
try:
|
try:
|
||||||
|
@ -196,11 +196,11 @@ class Portal:
|
||||||
await self.run('self', 'cancel')
|
await self.run('self', 'cancel')
|
||||||
return True
|
return True
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
log.warn(
|
log.warning(
|
||||||
f"{self.channel} for {self.channel.uid} was already closed?")
|
f"{self.channel} for {self.channel.uid} was already closed?")
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
log.warn(f"May have failed to cancel {self.channel.uid}")
|
log.warning(f"May have failed to cancel {self.channel.uid}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -153,7 +153,7 @@ class ActorNursery:
|
||||||
# if it's an async-gen then we should alert the user
|
# if it's an async-gen then we should alert the user
|
||||||
# that we're cancelling it
|
# that we're cancelling it
|
||||||
if inspect.isasyncgen(res):
|
if inspect.isasyncgen(res):
|
||||||
log.warn(
|
log.warning(
|
||||||
f"Blindly consuming asyncgen for {actor.uid}")
|
f"Blindly consuming asyncgen for {actor.uid}")
|
||||||
with trio.fail_after(1):
|
with trio.fail_after(1):
|
||||||
async with aclosing(res) as agen:
|
async with aclosing(res) as agen:
|
||||||
|
@ -177,7 +177,7 @@ class ActorNursery:
|
||||||
self._children.pop(actor.uid)
|
self._children.pop(actor.uid)
|
||||||
# proc terminated, cancel result waiter
|
# proc terminated, cancel result waiter
|
||||||
if cancel_scope:
|
if cancel_scope:
|
||||||
log.warn(
|
log.warning(
|
||||||
f"Cancelling existing result waiter task for {actor.uid}")
|
f"Cancelling existing result waiter task for {actor.uid}")
|
||||||
cancel_scope.cancel()
|
cancel_scope.cancel()
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ class ActorNursery:
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
log.warn("Result waiter was cancelled")
|
log.warning("Result waiter was cancelled")
|
||||||
|
|
||||||
# unblocks when all waiter tasks have completed
|
# unblocks when all waiter tasks have completed
|
||||||
children = self._children.copy()
|
children = self._children.copy()
|
||||||
|
@ -213,7 +213,7 @@ class ActorNursery:
|
||||||
directly without any far end graceful ``trio`` cancellation.
|
directly without any far end graceful ``trio`` cancellation.
|
||||||
"""
|
"""
|
||||||
def do_hard_kill(proc):
|
def do_hard_kill(proc):
|
||||||
log.warn(f"Hard killing subactors {self._children}")
|
log.warning(f"Hard killing subactors {self._children}")
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
# XXX: below doesn't seem to work?
|
# XXX: below doesn't seem to work?
|
||||||
# send KeyBoardInterrupt (trio abort signal) to sub-actors
|
# send KeyBoardInterrupt (trio abort signal) to sub-actors
|
||||||
|
@ -228,7 +228,7 @@ class ActorNursery:
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event = self._actor._peer_connected[subactor.uid]
|
event = self._actor._peer_connected[subactor.uid]
|
||||||
log.warn(
|
log.warning(
|
||||||
f"{subactor.uid} wasn't finished spawning?")
|
f"{subactor.uid} wasn't finished spawning?")
|
||||||
await event.wait()
|
await event.wait()
|
||||||
# channel/portal should now be up
|
# channel/portal should now be up
|
||||||
|
@ -260,7 +260,7 @@ class ActorNursery:
|
||||||
# else block here might not complete? Should both be shielded?
|
# else block here might not complete? Should both be shielded?
|
||||||
with trio.open_cancel_scope(shield=True):
|
with trio.open_cancel_scope(shield=True):
|
||||||
if etype is trio.Cancelled:
|
if etype is trio.Cancelled:
|
||||||
log.warn(
|
log.warning(
|
||||||
f"{current_actor().uid} was cancelled with {etype}"
|
f"{current_actor().uid} was cancelled with {etype}"
|
||||||
", cancelling actor nursery")
|
", cancelling actor nursery")
|
||||||
await self.cancel()
|
await self.cancel()
|
||||||
|
@ -276,7 +276,7 @@ class ActorNursery:
|
||||||
try:
|
try:
|
||||||
await self.wait()
|
await self.wait()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.warn(f"Nursery caught {err}, cancelling")
|
log.warning(f"Nursery caught {err}, cancelling")
|
||||||
await self.cancel()
|
await self.cancel()
|
||||||
raise
|
raise
|
||||||
log.debug(f"Nursery teardown complete")
|
log.debug(f"Nursery teardown complete")
|
||||||
|
|
Loading…
Reference in New Issue