Merge pull request #144 from goodboy/dereg_on_channel_aclose

Fix a deregistration failure on manual stream close; the fix led to an internal nursery composition rework.
goodboy 2020-08-13 13:56:47 -04:00 committed by GitHub
commit 4da16325f3
12 changed files with 408 additions and 193 deletions
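
For orientation, the core of this PR is the nursery layering introduced in ``Actor._async_main()``: an outer "root" nursery that keeps the parent channel alive while an inner "service" nursery hosts the TCP server and all RPC tasks. A rough, self-contained trio sketch of that layering (names and structure here are illustrative, not tractor's actual code):

import trio

async def serve(task_status=trio.TASK_STATUS_IGNORED):
    # stand-in for the actor's channel server
    task_status.started()
    await trio.sleep_forever()

async def async_main():
    # outer "root" nursery: the parent channel msg loop lives here
    async with trio.open_nursery() as root_n:
        # inner "service" nursery: hosts the server and all rpc tasks
        async with trio.open_nursery() as service_n:
            await service_n.start(serve)
            # a remote ``cancel()`` request cancels only the service
            # nursery, so the parent msg loop in ``root_n`` survives
            # long enough to finish teardown deterministically
            service_n.cancel_scope.cancel()
        # service nursery done; root nursery may now exit

trio.run(async_main)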

View File

@@ -2,7 +2,7 @@
 #
 # tractor: a trionic actor model built on `multiprocessing` and `trio`
 #
-# Copyright (C) 2018 Tyler Goodlet
+# Copyright (C) 2018-2020 Tyler Goodlet
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by

View File

@@ -124,9 +124,13 @@ def sig_prog(proc, sig):
 def daemon(loglevel, testdir, arb_addr):
     """Run a daemon actor as a "remote arbiter".
     """
+    if loglevel in ('trace', 'debug'):
+        # too much logging will lock up the subproc (smh)
+        loglevel = 'info'
+
     cmdargs = [
         sys.executable, '-c',
-        "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})"
+        "import tractor; tractor.run_daemon([], arbiter_addr={}, loglevel={})"
         .format(
             arb_addr,
             "'{}'".format(loglevel) if loglevel else None)

View File

@@ -196,11 +196,13 @@ def test_subactors_unregister_on_cancel(
     """
     with pytest.raises(KeyboardInterrupt):
         tractor.run(
-            spawn_and_check_registry,
-            arb_addr,
-            use_signal,
-            False,
-            with_streaming,
+            partial(
+                spawn_and_check_registry,
+                arb_addr,
+                use_signal,
+                remote_arbiter=False,
+                with_streaming=with_streaming,
+            ),
             arbiter_addr=arb_addr
         )
@@ -220,11 +222,118 @@ def test_subactors_unregister_on_cancel_remote_daemon(
     """
     with pytest.raises(KeyboardInterrupt):
         tractor.run(
-            spawn_and_check_registry,
-            arb_addr,
-            use_signal,
-            True,
-            with_streaming,
+            partial(
+                spawn_and_check_registry,
+                arb_addr,
+                use_signal,
+                remote_arbiter=True,
+                with_streaming=with_streaming,
+            ),
+            # XXX: required to use remote daemon!
+            arbiter_addr=arb_addr
+        )
+
+
+async def streamer(agen):
+    async for item in agen:
+        print(item)
+
+
+async def close_chans_before_nursery(
+    arb_addr: tuple,
+    use_signal: bool,
+    remote_arbiter: bool = False,
+) -> None:
+
+    # logic for how many actors should still be
+    # in the registry at teardown.
+    if remote_arbiter:
+        entries_at_end = 2
+    else:
+        entries_at_end = 1
+
+    async with tractor.get_arbiter(*arb_addr) as aportal:
+        try:
+            get_reg = partial(aportal.run, 'self', 'get_registry')
+
+            async with tractor.open_nursery() as tn:
+                portal1 = await tn.run_in_actor('consumer1', stream_forever)
+                agen1 = await portal1.result()
+
+                portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
+                agen2 = await portal2.run(__name__, 'stream_forever')
+
+                async with trio.open_nursery() as n:
+                    n.start_soon(streamer, agen1)
+                    n.start_soon(cancel, use_signal, .5)
+                    try:
+                        await streamer(agen2)
+                    finally:
+                        # Kill the root nursery thus causing normal
+                        # arbiter channel ops to fail during teardown.
+                        # It doesn't seem like this is reliably
+                        # triggered by an external SIGINT.
+                        # tractor.current_actor()._root_nursery.cancel_scope.cancel()
+
+                        # XXX: THIS IS THE KEY THING that happens
+                        # **before** exiting the actor nursery block
+
+                        # also kill off channels cuz why not
+                        await agen1.aclose()
+                        await agen2.aclose()
+        finally:
+            with trio.CancelScope(shield=True):
+                await trio.sleep(.5)
+
+                # all subactors should have de-registered
+                registry = await get_reg()
+                assert portal1.channel.uid not in registry
+                assert portal2.channel.uid not in registry
+                assert len(registry) == entries_at_end
+
+
+@pytest.mark.parametrize('use_signal', [False, True])
+def test_close_channel_explicit(
+    start_method,
+    use_signal,
+    arb_addr,
+):
+    """Verify that closing a stream explicitly and killing the actor's
+    "root nursery" **before** the containing nursery tears down also
+    results in subactor(s) deregistering from the arbiter.
+    """
+    with pytest.raises(KeyboardInterrupt):
+        tractor.run(
+            partial(
+                close_chans_before_nursery,
+                arb_addr,
+                use_signal,
+                remote_arbiter=False,
+            ),
+            # XXX: required to use remote daemon!
+            arbiter_addr=arb_addr
+        )
+
+
+@pytest.mark.parametrize('use_signal', [False, True])
+def test_close_channel_explicit_remote_arbiter(
+    daemon,
+    start_method,
+    use_signal,
+    arb_addr,
+):
+    """Verify that closing a stream explicitly and killing the actor's
+    "root nursery" **before** the containing nursery tears down also
+    results in subactor(s) deregistering from the arbiter.
+    """
+    with pytest.raises(KeyboardInterrupt):
+        tractor.run(
+            partial(
+                close_chans_before_nursery,
+                arb_addr,
+                use_signal,
+                remote_arbiter=True,
+            ),
             # XXX: required to use remote daemon!
             arbiter_addr=arb_addr
         )

View File

@@ -31,7 +31,7 @@ def test_no_main():

 @tractor_test
-async def test_self_is_registered():
+async def test_self_is_registered(arb_addr):
     "Verify waiting on the arbiter to register itself using the standard api."
     actor = tractor.current_actor()
     assert actor.is_arbiter

View File

@@ -204,18 +204,18 @@ def test_multi_actor_subs_arbiter_pub(
         await trio.sleep(0.5)
         await even_portal.cancel_actor()
-        await trio.sleep(0.5)
+        await trio.sleep(1)

         if pub_actor == 'arbiter':
             assert 'even' not in get_topics()

         await odd_portal.cancel_actor()
-        await trio.sleep(1)
+        await trio.sleep(2)

         if pub_actor == 'arbiter':
             while get_topics():
                 await trio.sleep(0.1)
-                if time.time() - start > 1:
+                if time.time() - start > 2:
                     pytest.fail("odds subscription never dropped?")
         else:
             await master_portal.cancel_actor()

View File

@@ -4,7 +4,7 @@ tractor: An actor model micro-framework built on
 """
 import importlib
 from functools import partial
-from typing import Tuple, Any, Optional
+from typing import Tuple, Any, Optional, List
 import typing

 import trio  # type: ignore
@@ -115,7 +115,7 @@ def run(

 def run_daemon(
-    rpc_module_paths: Tuple[str],
+    rpc_module_paths: List[str],
     **kwargs
 ) -> None:
     """Spawn daemon actor which will respond to RPC.
@@ -124,7 +124,7 @@ def run_daemon(
     ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
     is meant to run forever responding to RPC requests.
     """
-    kwargs['rpc_module_paths'] = rpc_module_paths
+    kwargs['rpc_module_paths'] = list(rpc_module_paths)

     for path in rpc_module_paths:
         importlib.import_module(path)
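
A quick usage sketch of the retyped signature (the module path and address below are illustrative, not from this PR):

import tractor

# ``rpc_module_paths`` is now annotated ``List[str]`` and coerced via
# ``list()``, so a plain list is the natural argument type:
tractor.run_daemon(
    ['mypkg.myrpcmod'],  # hypothetical module exposing RPC functions
    arbiter_addr=('127.0.0.1', 1616),
)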

View File

@@ -167,9 +167,10 @@ class Actor:
     """
     is_arbiter: bool = False

-    # placeholders filled in by `_async_main` after fork
-    _root_nursery: trio.Nursery
-    _server_nursery: trio.Nursery
+    # nursery placeholders filled in by `_async_main()` after fork
+    _root_n: Optional[trio.Nursery] = None
+    _service_n: Optional[trio.Nursery] = None
+    _server_n: Optional[trio.Nursery] = None

     # Information about `__main__` from parent
     _parent_main_data: Dict[str, str]
@@ -293,7 +294,7 @@ class Actor:
     ) -> None:
         """Entry point for new inbound connections to the channel server.
         """
-        self._no_more_peers = trio.Event()
+        self._no_more_peers = trio.Event()  # unset
         chan = Channel(stream=stream)
         log.info(f"New connection to us {chan}")
@@ -427,13 +428,13 @@ class Actor:
         msg = None
         log.debug(f"Entering msg loop for {chan} from {chan.uid}")
         try:
-            with trio.CancelScope(shield=shield) as cs:
+            with trio.CancelScope(shield=shield) as loop_cs:
                 # this internal scope allows for keeping this message
                 # loop running despite the current task having been
                 # cancelled (eg. `open_portal()` may call this method from
                 # a locally spawned task) and receive this scope using
                 # ``scope = Nursery.start()``
-                task_status.started(cs)
+                task_status.started(loop_cs)
                 async for msg in chan:
                     if msg is None:  # loop terminate sentinel
                         log.debug(
@@ -496,7 +497,8 @@ class Actor:
                         # spin up a task for the requested function
                         log.debug(f"Spawning task for {func}")

-                        cs = await self._root_nursery.start(
+                        assert self._service_n
+                        cs = await self._service_n.start(
                             partial(_invoke, self, cid, chan, func, kwargs),
                             name=funcname,
                         )
@@ -514,6 +516,13 @@ class Actor:
                             # cancelled gracefully if requested
                             self._rpc_tasks[(chan, cid)] = (
                                 cs, func, trio.Event())
+                    else:
+                        # self.cancel() was called so kill this msg loop
+                        # and break out into ``_async_main()``
+                        log.warning(f"{self.uid} was remotely cancelled")
+                        loop_cs.cancel()
+                        break
+
                 log.debug(
                     f"Waiting on next msg for {chan} from {chan.uid}")
             else:
@@ -540,6 +549,47 @@ class Actor:
             f"Exiting msg loop for {chan} from {chan.uid} "
             f"with last msg:\n{msg}")

+    async def _chan_to_parent(
+        self,
+        parent_addr: Optional[Tuple[str, int]],
+    ) -> Tuple[Channel, Optional[Tuple[str, int]]]:
+        try:
+            # Connect back to the parent actor and conduct initial
+            # handshake. From this point on if we error, we
+            # attempt to ship the exception back to the parent.
+            chan = Channel(
+                destaddr=parent_addr,
+            )
+            await chan.connect()
+
+            # Initial handshake: swap names.
+            await self._do_handshake(chan)
+
+            accept_addr: Optional[Tuple[str, int]] = None
+
+            if self._spawn_method == "trio":
+                # Receive runtime state from our parent
+                parent_data = await chan.recv()
+                log.debug(
+                    "Received state from parent:\n"
+                    f"{parent_data}"
+                )
+                accept_addr = (
+                    parent_data.pop('bind_host'),
+                    parent_data.pop('bind_port'),
+                )
+                for attr, value in parent_data.items():
+                    setattr(self, attr, value)
+
+            return chan, accept_addr
+
+        except OSError:  # failed to connect
+            log.warning(
+                f"Failed to connect to parent @ {parent_addr},"
+                " closing server")
+            await self.cancel()
+            raise
+
     async def _async_main(
         self,
         accept_addr: Optional[Tuple[str, int]] = None,
@@ -561,88 +611,92 @@ class Actor:
         """
         registered_with_arbiter = False
         try:
-            async with trio.open_nursery() as nursery:
-                self._root_nursery = nursery
-
-                # TODO: just make `parent_addr` a bool system (see above)?
-                if parent_addr is not None:
-                    try:
-                        # Connect back to the parent actor and conduct initial
-                        # handshake. From this point on if we error, we
-                        # attempt to ship the exception back to the parent.
-                        chan = self._parent_chan = Channel(
-                            destaddr=parent_addr,
-                        )
-                        await chan.connect()
-                        # Initial handshake: swap names.
-                        await self._do_handshake(chan)
-
-                        if self._spawn_method == "trio":
-                            # Receive runtime state from our parent
-                            parent_data = await chan.recv()
-                            log.debug(
-                                "Received state from parent:\n"
-                                f"{parent_data}"
-                            )
-                            accept_addr = (
-                                parent_data.pop('bind_host'),
-                                parent_data.pop('bind_port'),
-                            )
-                            for attr, value in parent_data.items():
-                                setattr(self, attr, value)
-
-                    except OSError:  # failed to connect
-                        log.warning(
-                            f"Failed to connect to parent @ {parent_addr},"
-                            " closing server")
-                        await self.cancel()
-                        self._parent_chan = None
-                        raise
-
-                # load exposed/allowed RPC modules
-                # XXX: do this **after** establishing a channel to the parent
-                # but **before** starting the message loop for that channel
-                # such that import errors are properly propagated upwards
-                self.load_modules()
-
-                # Start up channel server with,
-                # - subactor: the bind address sent to us by our parent
-                #   over our established channel
-                # - root actor: the ``accept_addr`` passed to this method
-                assert accept_addr
-                host, port = accept_addr
-                await nursery.start(
-                    partial(
-                        self._serve_forever,
-                        accept_host=host,
-                        accept_port=port
-                    )
-                )
-
-                # Begin handling our new connection back to parent.
-                # This is done here since we don't want to start
-                # processing parent requests until our server is
-                # 100% up and running.
-                if self._parent_chan:
-                    nursery.start_soon(
-                        self._process_messages, self._parent_chan)
-
-                # Register with the arbiter if we're told its addr
-                log.debug(f"Registering {self} for role `{self.name}`")
-                assert isinstance(self._arb_addr, tuple)
-                async with get_arbiter(*self._arb_addr) as arb_portal:
-                    await arb_portal.run(
-                        'self', 'register_actor',
-                        uid=self.uid, sockaddr=self.accept_addr)
-                    registered_with_arbiter = True
-
-                task_status.started()
-                log.debug("Waiting on root nursery to complete")
-
-            # Blocks here as expected until the channel server is
+            # establish primary connection with immediate parent
+            self._parent_chan = None
+            if parent_addr is not None:
+                self._parent_chan, accept_addr_from_rent = await self._chan_to_parent(
+                    parent_addr)
+
+                # either it's passed in because we're not a child
+                # or because we're running in mp mode
+                if accept_addr_from_rent is not None:
+                    accept_addr = accept_addr_from_rent
+
+            # load exposed/allowed RPC modules
+            # XXX: do this **after** establishing a channel to the parent
+            # but **before** starting the message loop for that channel
+            # such that import errors are properly propagated upwards
+            self.load_modules()
+
+            # The "root" nursery ensures the channel with the immediate
+            # parent is kept alive as a resilient service until
+            # cancellation steps have (mostly) occurred in
+            # a deterministic way.
+            async with trio.open_nursery() as root_nursery:
+                self._root_n = root_nursery
+                assert self._root_n
+
+                async with trio.open_nursery() as service_nursery:
+                    # This nursery is used to handle all inbound
+                    # connections to us such that if the TCP server
+                    # is killed, connections can continue to process
+                    # in the background until this nursery is cancelled.
+                    self._service_n = service_nursery
+                    assert self._service_n
+
+                    # Start up the channel server with,
+                    # - subactor: the bind address is sent by our parent
+                    #   over our established channel
+                    # - root actor: the ``accept_addr`` passed to this method
+                    assert accept_addr
+                    host, port = accept_addr
+                    self._server_n = await service_nursery.start(
+                        partial(
+                            self._serve_forever,
+                            service_nursery,
+                            accept_host=host,
+                            accept_port=port
+                        )
+                    )
+
+                    # Register with the arbiter if we're told its addr
+                    log.debug(f"Registering {self} for role `{self.name}`")
+                    assert isinstance(self._arb_addr, tuple)
+
+                    async with get_arbiter(*self._arb_addr) as arb_portal:
+                        await arb_portal.run(
+                            'self',
+                            'register_actor',
+                            uid=self.uid,
+                            sockaddr=self.accept_addr,
+                        )
+                        registered_with_arbiter = True
+
+                    # init steps complete
+                    task_status.started()
+
+                    # Begin handling our new connection back to our
+                    # parent. This is done last since we don't want to
+                    # start processing parent requests until our channel
+                    # server is 100% up and running.
+                    if self._parent_chan:
+                        await root_nursery.start(
+                            partial(
+                                self._process_messages,
+                                self._parent_chan,
+                                shield=True,
+                            )
+                        )
+                    log.info("Waiting on service nursery to complete")
+                log.info("Service nursery complete")
+                log.info("Waiting on root nursery to complete")
+
+            # Blocks here as expected until the root nursery is
             # killed (i.e. this actor is cancelled or signalled by the parent)
-        except Exception as err:
+
+        except (trio.MultiError, Exception) as err:
             if not registered_with_arbiter:
                 # TODO: I guess we could try to connect back
                 # to the parent through a channel and engage a debugger
@@ -658,100 +712,123 @@ class Actor:
                 )
             if self._parent_chan:
-                try:
-                    # internal error so ship to parent without cid
-                    await self._parent_chan.send(pack_error(err))
-                except trio.ClosedResourceError:
-                    log.error(
-                        f"Failed to ship error to parent "
-                        f"{self._parent_chan.uid}, channel was closed")
-            log.exception("Actor errored:")
-            if isinstance(err, ModuleNotFoundError):
-                raise
-            else:
-                # XXX wait, why?
-                # causes a hang if I always raise..
-                # A parent process does something weird here?
-                # i'm so lost now..
-                raise
+                with trio.CancelScope(shield=True):
+                    try:
+                        # internal error so ship to parent without cid
+                        await self._parent_chan.send(pack_error(err))
+                    except trio.ClosedResourceError:
+                        log.error(
+                            f"Failed to ship error to parent "
+                            f"{self._parent_chan.uid}, channel was closed")
+
+            # always!
+            log.exception("Actor errored:")
+            raise

         finally:
-            if registered_with_arbiter:
-                # with trio.move_on_after(3) as cs:
-                #     cs.shield = True
-                await self._do_unreg(self._arb_addr)
-
-            # terminate actor once all its peers (actors that connected
-            # to it as clients) have disappeared
+            log.info("Root nursery complete")
+
+            # Unregister actor from the arbiter
+            if registered_with_arbiter and (
+                    self._arb_addr is not None
+            ):
+                failed = False
+                with trio.move_on_after(5) as cs:
+                    cs.shield = True
+                    try:
+                        async with get_arbiter(*self._arb_addr) as arb_portal:
+                            await arb_portal.run(
+                                'self', 'unregister_actor', uid=self.uid)
+                    except OSError:
+                        failed = True
+                if cs.cancelled_caught:
+                    failed = True
+                if failed:
+                    log.warning(
+                        f"Failed to unregister {self.name} from arbiter")
+
+            # Ensure all peers (actors connected to us as clients) are finished
             if not self._no_more_peers.is_set():
                 if any(
                     chan.connected() for chan in chain(*self._peers.values())
                 ):
                     log.debug(
                         f"Waiting for remaining peers {self._peers} to clear")
-                    await self._no_more_peers.wait()
+                    with trio.CancelScope(shield=True):
+                        await self._no_more_peers.wait()
             log.debug("All peer channels are complete")

-            # tear down channel server no matter what since we errored
-            # or completed
-            self.cancel_server()
+            log.debug("Runtime completed")

     async def _serve_forever(
         self,
+        handler_nursery: trio.Nursery,
         *,
         # (host, port) to bind for channel server
         accept_host: Tuple[str, int] = None,
         accept_port: int = 0,
-        task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+        task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
     ) -> None:
         """Start the channel server, begin listening for new connections.

         This will cause an actor to continue living (blocking) until
         ``cancel_server()`` is called.
         """
-        async with trio.open_nursery() as nursery:
-            self._server_nursery = nursery
-            # TODO: might want to consider having a separate nursery
-            # for the stream handler such that the server can be cancelled
-            # whilst leaving existing channels up
-            listeners: List[trio.abc.Listener] = await nursery.start(
-                partial(
-                    trio.serve_tcp,
-                    self._stream_handler,
-                    # new connections will stay alive even if this server
-                    # is cancelled
-                    handler_nursery=self._root_nursery,
-                    port=accept_port, host=accept_host,
-                )
-            )
-            log.debug("Started tcp server(s) on"  # type: ignore
-                      f" {[l.socket for l in listeners]}")
-            self._listeners.extend(listeners)
-            task_status.started()
-
-    async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None:
-        # UNregister actor from the arbiter
+        self._server_down = trio.Event()
         try:
-            if arbiter_addr is not None:
-                async with get_arbiter(*arbiter_addr) as arb_portal:
-                    await arb_portal.run(
-                        'self', 'unregister_actor', uid=self.uid)
-        except OSError:
-            log.warning(f"Unable to unregister {self.name} from arbiter")
+            async with trio.open_nursery() as server_n:
+                listeners: List[trio.abc.Listener] = await server_n.start(
+                    partial(
+                        trio.serve_tcp,
+                        self._stream_handler,
+                        # new connections will stay alive even if this server
+                        # is cancelled
+                        handler_nursery=handler_nursery,
+                        port=accept_port,
+                        host=accept_host,
+                    )
+                )
+                log.debug("Started tcp server(s) on"  # type: ignore
+                          f" {[l.socket for l in listeners]}")
+                self._listeners.extend(listeners)
+                task_status.started(server_n)
+        finally:
+            # signal the server is down since nursery above terminated
+            self._server_down.set()

-    async def cancel(self) -> None:
+    async def cancel(self) -> bool:
         """Cancel this actor.

-        The sequence in order is:
-            - cancelling all rpc tasks
-            - cancelling the channel server
-            - cancel the "root" nursery
+        The "deterministic" teardown sequence in order is:
+            - cancel all ongoing rpc tasks by cancel scope
+            - cancel the channel server to prevent new inbound
+              connections
+            - cancel the "service" nursery responsible for
+              spawning new rpc tasks
+            - return control to the parent channel message loop
         """
         # cancel all ongoing rpc tasks
-        await self.cancel_rpc_tasks()
-        self.cancel_server()
-        self._root_nursery.cancel_scope.cancel()
+        with trio.CancelScope(shield=True):
+            # kill all ongoing tasks
+            await self.cancel_rpc_tasks()
+
+            # stop channel server
+            self.cancel_server()
+            await self._server_down.wait()
+
+            # rekt all channel loops
+            if self._service_n:
+                self._service_n.cancel_scope.cancel()
+
+        return True
+
+    # XXX: hard kill logic if needed?
+    # def _hard_mofo_kill(self):
+    #     # If we're the root actor or zombied kill everything
+    #     if self._parent_chan is None:  # TODO: more robust check
+    #         root = trio.lowlevel.current_root_task()
+    #         for n in root.child_nurseries:
+    #             n.cancel_scope.cancel()

     async def _cancel_task(self, cid, chan):
         """Cancel a local task by call-id / channel.
@@ -804,11 +881,12 @@ class Actor:
         """Cancel the internal channel server nursery thereby
         preventing any new inbound connections from being established.
         """
-        log.debug("Shutting down channel server")
-        self._server_nursery.cancel_scope.cancel()
+        if self._server_n:
+            log.debug("Shutting down channel server")
+            self._server_n.cancel_scope.cancel()

     @property
-    def accept_addr(self) -> Tuple[str, int]:
+    def accept_addr(self) -> Optional[Tuple[str, int]]:
         """Primary address to which the channel server is bound.
         """
         # throws OSError on failure
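
Taken together with the layered nurseries above, the new ``cancel()`` yields the deterministic teardown its docstring lists; roughly, as a standalone sketch (``actor`` stands in for the ``Actor`` instance, not real library API):

import trio

async def cancel_actor(actor) -> bool:
    # shield so teardown itself can't be interrupted mid-sequence
    with trio.CancelScope(shield=True):
        await actor.cancel_rpc_tasks()   # 1. stop in-flight rpc tasks
        actor.cancel_server()            # 2. refuse new inbound connections
        await actor._server_down.wait()  #    ...and wait for server exit
        if actor._service_n:             # 3. kill channel loops / rpc spawner
            actor._service_n.cancel_scope.cancel()
    return True                          # 4. control returns to the parent msg loop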

View File

@@ -22,7 +22,8 @@ log = get_logger('tractor')

 @asynccontextmanager
 async def maybe_open_nursery(
-    nursery: trio.Nursery = None
+    nursery: trio.Nursery = None,
+    shield: bool = False,
 ) -> typing.AsyncGenerator[trio.Nursery, Any]:
     """Create a new nursery if None provided.
@@ -32,6 +33,7 @@ async def maybe_open_nursery(
         yield nursery
     else:
         async with trio.open_nursery() as nursery:
+            nursery.cancel_scope.shield = shield
             yield nursery
@@ -275,6 +277,8 @@ class Portal:
                 f"{self.channel}")
             try:
                 # send cancel cmd - might not get response
+                # XXX: sure would be nice to make this work with a proper shield
+                # with trio.CancelScope(shield=True):
                 with trio.move_on_after(0.5) as cancel_scope:
                     cancel_scope.shield = True
                     await self.run('self', 'cancel')
@@ -314,7 +318,9 @@ class LocalPortal:
 @asynccontextmanager
 async def open_portal(
     channel: Channel,
-    nursery: Optional[trio.Nursery] = None
+    nursery: Optional[trio.Nursery] = None,
+    start_msg_loop: bool = True,
+    shield: bool = False,
 ) -> typing.AsyncGenerator[Portal, None]:
     """Open a ``Portal`` through the provided ``channel``.
@@ -324,7 +330,7 @@ async def open_portal(
     assert actor
     was_connected = False

-    async with maybe_open_nursery(nursery) as nursery:
+    async with maybe_open_nursery(nursery, shield=shield) as nursery:
         if not channel.connected():
             await channel.connect()
             was_connected = True
@@ -332,15 +338,17 @@ async def open_portal(
         if channel.uid is None:
             await actor._do_handshake(channel)

-        msg_loop_cs: trio.CancelScope = await nursery.start(
-            partial(
-                actor._process_messages,
-                channel,
-                # if the local task is cancelled we want to keep
-                # the msg loop running until our block ends
-                shield=True,
-            )
-        )
+        msg_loop_cs: Optional[trio.CancelScope] = None
+        if start_msg_loop:
+            msg_loop_cs = await nursery.start(
+                partial(
+                    actor._process_messages,
+                    channel,
+                    # if the local task is cancelled we want to keep
+                    # the msg loop running until our block ends
+                    shield=True,
+                )
+            )
+
         portal = Portal(channel)
         try:
             yield portal
@@ -352,6 +360,7 @@ async def open_portal(
             await channel.send(None)

             # cancel background msg loop task
-            msg_loop_cs.cancel()
+            if msg_loop_cs:
+                msg_loop_cs.cancel()

             nursery.cancel_scope.cancel()
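
The two new ``open_portal()`` knobs can be used along these lines (a sketch; the ``Channel`` and ``open_portal`` import paths are assumptions based on this diff, not confirmed by it):

from tractor._ipc import Channel          # assumed location of ``Channel``
from tractor._portal import open_portal   # assumed module path

async def one_shot_portal(addr):
    chan = Channel(destaddr=addr)
    async with open_portal(
        chan,
        start_msg_loop=False,  # skip spawning the background msg loop
        shield=True,           # shield the internally-opened nursery
    ) as portal:
        ...  # issue requests via ``portal`` here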

View File

@@ -148,7 +148,7 @@ async def cancel_on_completion(
     else:
         log.info(
             f"Cancelling {portal.channel.uid} gracefully "
-            "after result {result}")
+            f"after result {result}")

     # cancel the process now that we have a final result
     await portal.cancel_actor()
@@ -159,7 +159,6 @@ async def spawn_subactor(
     subactor: 'Actor',
     parent_addr: Tuple[str, int],
 ):
-
     spawn_cmd = [
         sys.executable,
         "-m",
@@ -184,13 +183,19 @@ async def spawn_subactor(
     ]

     proc = await trio.open_process(spawn_cmd)
-    yield proc
+    try:
+        yield proc
+    finally:
+        # XXX: do this **after** cancellation/teardown
+        # to avoid killing the process too early
+        # since trio does this internally on ``__aexit__()``

-    # XXX: do this **after** cancellation/teardown
-    # to avoid killing the process too early
-    # since trio does this internally on ``__aexit__()``
-    async with proc:
-        log.debug(f"Terminating {proc}")
+        # NOTE: we always "shield" join sub procs in
+        # the outer scope since no actor zombies are
+        # ever allowed. This ``__aexit__()`` also shields
+        # internally.
+        async with proc:
+            log.debug(f"Terminating {proc}")


 async def new_proc(
@@ -243,16 +248,21 @@ async def new_proc(
             task_status.started(portal)

             # wait for ActorNursery.wait() to be called
-            await actor_nursery._join_procs.wait()
+            with trio.CancelScope(shield=True):
+                await actor_nursery._join_procs.wait()

             if portal in actor_nursery._cancel_after_result_on_exit:
                 cancel_scope = await nursery.start(
                     cancel_on_completion, portal, subactor, errors)

-            # Wait for proc termination but **don't** yet call
+            # Wait for proc termination but **don't yet** call
             # ``trio.Process.__aexit__()`` (it tears down stdio
             # which will kill any waiting remote pdb trace).
-            await proc.wait()
+
+            # always "hard" join sub procs:
+            # no actor zombies allowed
+            with trio.CancelScope(shield=True):
+                await proc.wait()
         else:
             # `multiprocessing`
             assert _ctx
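
The shielded "hard join" added above is a reusable trio pattern; a minimal standalone version:

import trio

async def hard_join(proc: trio.Process) -> int:
    # reap the child even if the surrounding scope is cancelled so
    # no zombie subprocess is ever left behind
    with trio.CancelScope(shield=True):
        return await proc.wait()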

View File

@@ -18,6 +18,8 @@ from . import _spawn

 log = get_logger('tractor')

+_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
+

 class ActorNursery:
     """Spawn scoped subprocess actors.
@@ -48,7 +50,7 @@ class ActorNursery:
         self,
         name: str,
         *,
-        bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
+        bind_addr: Tuple[str, int] = _default_bind_addr,
         statespace: Optional[Dict[str, Any]] = None,
         rpc_module_paths: List[str] = None,
         loglevel: str = None,  # set log level per subactor
@@ -89,7 +91,7 @@ class ActorNursery:
         name: str,
         fn: typing.Callable,
         *,
-        bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
+        bind_addr: Tuple[str, int] = _default_bind_addr,
         rpc_module_paths: Optional[List[str]] = None,
         statespace: Dict[str, Any] = None,
         loglevel: str = None,  # set log level per subactor

View File

@@ -143,7 +143,7 @@ def pub(
     - packetizer: ``Callable[[str, Any], Any]`` a callback who receives
       the topic and value from the publisher function each ``yield`` such that
       whatever is returned is sent as the published value to subscribers of
-      that topic. By default this is a dict ``{topic: value}``.
+      that topic. By default this is a dict ``{topic: str: value: Any}``.

     As an example, to make a subscriber call the above function:
@@ -161,7 +161,7 @@ def pub(
                 task_name='source1',
             )
         )
-        async for value in portal.result():
+        async for value in await portal.result():
             print(f"Subscriber received {value}")
@@ -264,4 +264,5 @@ def pub(
     # ``wrapt.decorator`` doesn't seem to want to play nice with its
     # whole "adapter" thing...
     wrapped._tractor_stream_function = True  # type: ignore
+
     return wrapper(wrapped)

View File

@@ -2,7 +2,7 @@ import inspect
 import platform
 from functools import partial, wraps

-from .. import run
+from tractor import run

 __all__ = ['tractor_test']
@@ -38,6 +38,7 @@ def tractor_test(fn):
             # injects test suite fixture value to test as well
             # as `run()`
             kwargs['arb_addr'] = arb_addr
+
         if 'loglevel' in inspect.signature(fn).parameters:
             # allows test suites to define a 'loglevel' fixture
             # that activates the internal logging
@@ -52,6 +53,7 @@ def tractor_test(fn):
         if 'start_method' in inspect.signature(fn).parameters:
             # set of subprocess spawning backends
             kwargs['start_method'] = start_method
+
         return run(
             partial(fn, *args, **kwargs),
             arbiter_addr=arb_addr,