Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet bce2de9e20 Add example that triggers bug #302 2022-02-17 13:20:05 -05:00
Tyler Goodlet 16e4da6958 Add back in async gen loop 2022-02-17 13:13:18 -05:00
Tyler Goodlet c79363f4a1 Pre-declare disconnected flag 2022-02-17 13:13:18 -05:00
Tyler Goodlet 013e766a16 Avoid attr error XD 2022-02-17 13:13:18 -05:00
Tyler Goodlet 61cc393a1a Type annot updates 2022-02-17 13:13:18 -05:00
Tyler Goodlet ee3dc5123f Drop uneeded backframe traceback hide annotation 2022-02-17 13:13:18 -05:00
Tyler Goodlet f8494de478 Run first soft wait inside a task
Theoretically we can actually concurrently soft wait the process and the
nursery exit event, the only problem at the moment is some pubsub tests
puking. You're right in guessing this isn't really changing anything but
it is meant to be a reminder. If we can add this the spawn task can
report when a process dies earlier then expected and in the longer term
once we remove the `ActorNursery.run_in_actor()` API, we can probably do
away with both the nursery exit event and the portal result fetching.
2022-02-17 13:13:16 -05:00
Tyler Goodlet fc90e1f171 Make `Actor._process_messages()` report disconnects
The method now returns a `bool` which flags whether the transport died
to the caller and allows for reporting a disconnect in the
channel-transport handler task. This is something a user will normally
want to know about on the caller side especially after seeing
a traceback from the peer (if in tree) on console.
2022-02-17 13:12:42 -05:00
Tyler Goodlet 02210d8f8e Only cancel/get-result from a ctx if transport is up
There's no point in sending a cancel message to the remote linked task
and especially no reason to block waiting on a result from that task if
the transport layer is detected to be disconnected. We expect that the
transport shouldn't go down at the layer of the message loop
(reconnection logic should be handled in the transport layer itself) so
if we detect the channel is not connected we don't bother requesting
cancels nor waiting on a final result message.

Why?

- if the connection goes down in error the caller side won't have a way
  to know "how long" it should block to wait for a cancel ack or result
  and causes a potential hang that may require an additional ctrl-c from
  the user especially if using the debugger or if the traceback is not
  seen on console.
- obviously there's no point in waiting for messages when there's no
  transport to deliver them XD

Further, add some more detailed cancel logging detailing the task and
actor ids.
2022-02-17 13:12:42 -05:00
Tyler Goodlet 194c0e120d Drop high log level in ctx example 2022-02-17 13:12:42 -05:00
7 changed files with 234 additions and 102 deletions

View File

@ -6,6 +6,7 @@ async def gen():
yield 'yo' yield 'yo'
await tractor.breakpoint() await tractor.breakpoint()
yield 'yo' yield 'yo'
await tractor.breakpoint()
@tractor.context @tractor.context
@ -13,35 +14,35 @@ async def just_bp(
ctx: tractor.Context, ctx: tractor.Context,
) -> None: ) -> None:
await ctx.started('yo bpin here') await ctx.started()
await tractor.breakpoint() await tractor.breakpoint()
# async for val in gen(): # TODO: bps and errors in this call..
# print(val) async for val in gen():
print(val)
await trio.sleep(0.5) # await trio.sleep(0.5)
# THIS CAUSES AN UNRECOVERABLE HANG!? # prematurely destroy the connection
await ctx.chan.aclose()
# THIS CAUSES AN UNRECOVERABLE HANG
# without latest ``pdbpp``:
assert 0 assert 0
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
loglevel='transport',
debug_mode=True, debug_mode=True,
) as n: ) as n:
p = await n.start_actor( p = await n.start_actor(
'bp_boi', 'bp_boi',
enable_modules=[__name__], enable_modules=[__name__],
# debug_mode=True,
) )
async with p.open_context( async with p.open_context(
just_bp, just_bp,
) as (ctx, first): ) as (ctx, first):
# await tractor.breakpoint()
# breakpoint()
await trio.sleep_forever() await trio.sleep_forever()

View File

@ -0,0 +1,49 @@
import trio
import click
import tractor
import pydantic
# from multiprocessing import shared_memory
@tractor.context
async def just_sleep(
ctx: tractor.Context,
**kwargs,
) -> None:
'''
Test a small ping-pong 2-way streaming server.
'''
await ctx.started()
await trio.sleep_forever()
async def main() -> None:
proc = await trio.open_process( (
'python',
'-c',
'import trio; trio.run(trio.sleep_forever)',
))
await proc.wait()
# await trio.sleep_forever()
# async with tractor.open_nursery() as n:
# portal = await n.start_actor(
# 'rpc_server',
# enable_modules=[__name__],
# )
# async with portal.open_context(
# just_sleep, # taken from pytest parameterization
# ) as (ctx, sent):
# await trio.sleep_forever()
if __name__ == '__main__':
import time
# time.sleep(999)
trio.run(main)

View File

@ -26,8 +26,11 @@ import importlib
import importlib.util import importlib.util
import inspect import inspect
import uuid import uuid
import typing from typing import (
from typing import Any, Optional, Union Any, Optional,
Union, TYPE_CHECKING,
Callable,
)
from types import ModuleType from types import ModuleType
import sys import sys
import os import os
@ -57,6 +60,10 @@ from . import _state
from . import _mp_fixup_main from . import _mp_fixup_main
if TYPE_CHECKING:
from ._supervise import ActorNursery
log = get_logger('tractor') log = get_logger('tractor')
@ -65,7 +72,7 @@ async def _invoke(
actor: 'Actor', actor: 'Actor',
cid: str, cid: str,
chan: Channel, chan: Channel,
func: typing.Callable, func: Callable,
kwargs: dict[str, Any], kwargs: dict[str, Any],
is_rpc: bool = True, is_rpc: bool = True,
@ -200,7 +207,7 @@ async def _invoke(
ctx = actor._contexts.pop((chan.uid, cid)) ctx = actor._contexts.pop((chan.uid, cid))
if ctx: if ctx:
log.runtime( log.runtime(
f'Context entrypoint for {func} was terminated:\n{ctx}' f'Context entrypoint {func} was terminated:\n{ctx}'
) )
assert cs assert cs
@ -316,7 +323,9 @@ async def try_ship_error_to_parent(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.error( # in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma.
log.critical(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{channel.uid}, channel was closed" f"{channel.uid}, channel was closed"
) )
@ -424,7 +433,7 @@ class Actor:
# (chan, cid) -> (cancel_scope, func) # (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], tuple[Channel, str],
tuple[trio.CancelScope, typing.Callable, trio.Event] tuple[trio.CancelScope, Callable, trio.Event]
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -513,6 +522,7 @@ class Actor:
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
uid: Optional[tuple[str, str]] = chan.uid
log.runtime(f"New connection to us {chan}") log.runtime(f"New connection to us {chan}")
# send/receive initial handshake response # send/receive initial handshake response
@ -560,33 +570,51 @@ class Actor:
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
local_nursery: Optional[ActorNursery] = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
try: try:
await self._process_messages(chan) disconnected = await self._process_messages(chan)
except trio.Cancelled: except (
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}") log.cancel(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
# This is set in ``Portal.cancel_actor()``. So if # This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them # the peer was cancelled we try to wait for them
# to tear down their side of the connection before # to tear down their side of the connection before
# moving on with closing our own side. # moving on with closing our own side.
local_nursery = self._actoruid2nursery.get(chan.uid)
if ( if (
local_nursery local_nursery
): ):
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
_, proc, _ = entry
log.error(f'Actor {uid}@{proc} IPC connection broke!?')
# if proc.poll() is not None:
# log.error('Actor {uid} proc died and IPC broke?')
log.cancel(f"Waiting on cancel request to peer {chan.uid}") log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the remote # underlying transport protocol) to close from the
# peer side since we presume that any channel which # remote peer side since we presume that any channel
# is mapped to a sub-actor (i.e. it's managed by # which is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries) # one of our local nurseries) has a message is sent to
# message is sent to the peer likely by this actor which is # the peer likely by this actor (which is now in
# now in a cancelled condition) when the local runtime here # a cancelled condition) when the local runtime here is
# is now cancelled while (presumably) in the middle of msg # now cancelled while (presumably) in the middle of msg
# loop processing. # loop processing.
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
@ -609,6 +637,8 @@ class Actor:
await local_nursery.exited.wait() await local_nursery.exited.wait()
# if local_nursery._children
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
@ -618,7 +648,7 @@ class Actor:
if not chans: if not chans:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None) self._peers.pop(uid, None)
# for (uid, cid) in self._contexts.copy(): # for (uid, cid) in self._contexts.copy():
# if chan.uid == uid: # if chan.uid == uid:
@ -626,11 +656,13 @@ class Actor:
log.runtime(f"Peers is {self._peers}") log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected # No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channels") log.runtime("Signalling no more peer channels")
self._no_more_peers.set() self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?) # XXX: is this necessary (GC should do it)?
if chan.connected(): if chan.connected():
# if the channel is still connected it may mean the far # if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to # end has not closed and we may have gotten here due to
@ -665,8 +697,8 @@ class Actor:
ctx = self._contexts[(uid, cid)] ctx = self._contexts[(uid, cid)]
except KeyError: except KeyError:
log.warning( log.warning(
f'Ignoring msg from [no-longer/un]known context with {uid}:' f'Ignoring msg from [no-longer/un]known context {uid}:'
f'\n{msg}') f'\n{msg}')
return return
send_chan = ctx._send_chan send_chan = ctx._send_chan
@ -813,7 +845,7 @@ class Actor:
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> bool:
''' '''
Process messages for the channel async-RPC style. Process messages for the channel async-RPC style.
@ -839,7 +871,7 @@ class Actor:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.cancel( log.cancel(
f"Channerl to {chan.uid} terminated?\n" f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..") "Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy(): for (channel, cid) in self._rpc_tasks.copy():
@ -986,6 +1018,9 @@ class Actor:
# up. # up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn = self._service_n sn = self._service_n
@ -1010,6 +1045,9 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
async def _from_parent( async def _from_parent(
self, self,
parent_addr: Optional[tuple[str, int]], parent_addr: Optional[tuple[str, int]],

View File

@ -49,7 +49,8 @@ try:
except ImportError: except ImportError:
# pdbpp is installed in regular mode...it monkey patches stuff # pdbpp is installed in regular mode...it monkey patches stuff
import pdb import pdb
assert pdb.xpm, "pdbpp is not installed?" # type: ignore xpm = getattr(pdb, 'xpm', None)
assert xpm, "pdbpp is not installed?" # type: ignore
pdbpp = pdb pdbpp = pdb
log = get_logger(__name__) log = get_logger(__name__)
@ -259,16 +260,11 @@ async def _hijack_stdin_for_child(
orig_handler = signal.signal( orig_handler = signal.signal(
signal.SIGINT, signal.SIGINT,
shield_sigint, shield_sigint,
# partial(shield_sigint, pdb=pdb),
) )
# try:
# yield
try: try:
with ( with (
trio.CancelScope(shield=True), trio.CancelScope(shield=True),
# disable_sigint(),
): ):
try: try:
lock = None lock = None
async with _acquire_debug_lock(subactor_uid) as lock: async with _acquire_debug_lock(subactor_uid) as lock:
@ -584,10 +580,6 @@ def shield_sigint(
''' '''
__tracebackhide__ = True __tracebackhide__ = True
frame = sys._getframe()
last_f = frame.f_back
last_f.f_globals['__tracebackhide__'] = True
global _local_task_in_debug, _global_actor_in_debug global _local_task_in_debug, _global_actor_in_debug
in_debug = _global_actor_in_debug in_debug = _global_actor_in_debug
@ -604,6 +596,7 @@ def shield_sigint(
log.pdb( log.pdb(
f"Ignoring SIGINT while child in debug mode: `{in_debug}`" f"Ignoring SIGINT while child in debug mode: `{in_debug}`"
) )
else: else:
log.pdb( log.pdb(
"Ignoring SIGINT while in debug mode" "Ignoring SIGINT while in debug mode"

View File

@ -24,7 +24,8 @@ import importlib
import inspect import inspect
from typing import ( from typing import (
Any, Optional, Any, Optional,
Callable, AsyncGenerator Callable, AsyncGenerator,
Type,
) )
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
@ -442,6 +443,10 @@ class Portal:
_err: Optional[BaseException] = None _err: Optional[BaseException] = None
ctx._portal = self ctx._portal = self
uid = self.channel.uid
cid = ctx.cid
etype: Optional[Type[BaseException]] = None
# deliver context instance and .started() msg value in open tuple. # deliver context instance and .started() msg value in open tuple.
try: try:
async with trio.open_nursery() as scope_nursery: async with trio.open_nursery() as scope_nursery:
@ -477,13 +482,24 @@ class Portal:
# KeyboardInterrupt, # KeyboardInterrupt,
) as err: ) as err:
_err = err etype = type(err)
# the context cancels itself on any cancel # the context cancels itself on any cancel
# causing error. # causing error.
log.cancel(
f'Context to {self.channel.uid} sending cancel request..')
await ctx.cancel() if ctx.chan.connected():
log.cancel(
'Context cancelled for task, sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
await ctx.cancel()
else:
log.warning(
'IPC connection for context is broken?\n'
f'task:{cid}\n'
f'actor:{uid}'
)
raise raise
finally: finally:
@ -492,7 +508,13 @@ class Portal:
# sure we get the error the underlying feeder mem chan. # sure we get the error the underlying feeder mem chan.
# if it's not raised here it *should* be raised from the # if it's not raised here it *should* be raised from the
# msg loop nursery right? # msg loop nursery right?
result = await ctx.result() if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
f'task:{cid}\n'
f'actor:{uid}'
)
result = await ctx.result()
# though it should be impossible for any tasks # though it should be impossible for any tasks
# operating *in* this scope to have survived # operating *in* this scope to have survived
@ -502,14 +524,17 @@ class Portal:
# should we encapsulate this in the context api? # should we encapsulate this in the context api?
await ctx._recv_chan.aclose() await ctx._recv_chan.aclose()
if _err: if etype:
if ctx._cancel_called: if ctx._cancel_called:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by caller with\n{_err}' f'Context {fn_name} cancelled by caller with\n{etype}'
) )
elif _err is not None: elif _err is not None:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by callee with\n{_err}' f'Context for task cancelled by callee with {etype}\n'
f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
) )
else: else:
log.runtime( log.runtime(

View File

@ -18,17 +18,30 @@
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
from __future__ import annotations
import sys import sys
import multiprocessing as mp
import platform import platform
from typing import ( from typing import (
Any, Optional, Callable, TypeVar, TYPE_CHECKING Any, Dict, Optional, Callable,
TypeVar,
) )
from collections.abc import Awaitable from collections.abc import Awaitable
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._debug import ( from ._debug import (
maybe_wait_for_debugger, maybe_wait_for_debugger,
acquire_debug_lock, acquire_debug_lock,
@ -47,11 +60,8 @@ from ._entry import _mp_main
from ._exceptions import ActorFailure from ._exceptions import ActorFailure
if TYPE_CHECKING:
import multiprocessing as mp
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
log = get_logger('tractor') log = get_logger('tractor')
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
# placeholder for an mp start context if so using that backend # placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None _ctx: Optional[mp.context.BaseContext] = None
@ -60,7 +70,6 @@ _spawn_method: str = "trio"
if platform.system() == 'Windows': if platform.system() == 'Windows':
import multiprocessing as mp
_ctx = mp.get_context("spawn") _ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None: async def proc_waiter(proc: mp.Process) -> None:
@ -83,7 +92,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
``subprocess.Popen``. ``subprocess.Popen``.
''' '''
import multiprocessing as mp
global _ctx global _ctx
global _spawn_method global _spawn_method
@ -100,7 +108,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
f"Spawn method `{name}` is invalid please choose one of {methods}" f"Spawn method `{name}` is invalid please choose one of {methods}"
) )
elif name == 'forkserver': elif name == 'forkserver':
from . import _forkserver_override
_forkserver_override.override_stdlib() _forkserver_override.override_stdlib()
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
elif name == 'trio': elif name == 'trio':
@ -148,7 +155,7 @@ async def cancel_on_completion(
portal: Portal, portal: Portal,
actor: Actor, actor: Actor,
errors: dict[tuple[str, str], Exception], errors: Dict[Tuple[str, str], Exception],
) -> None: ) -> None:
''' '''
@ -251,12 +258,12 @@ async def new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: dict[tuple[str, str], Exception], errors: Dict[Tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addr: tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
@ -288,7 +295,7 @@ async def new_proc(
# the OS; it otherwise can be passed via the parent channel if # the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy). # we prefer in the future (for privacy).
"--uid", "--uid",
str(subactor.uid), str(uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
str(parent_addr) str(parent_addr)
@ -314,8 +321,7 @@ async def new_proc(
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(uid)
subactor.uid)
except trio.Cancelled: except trio.Cancelled:
cancelled_during_spawn = True cancelled_during_spawn = True
@ -356,10 +362,54 @@ async def new_proc(
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait for ActorNursery.wait() to be called
n_exited = actor_nursery._join_procs
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait() await n_exited.wait()
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
async def soft_wait_and_maybe_cancel_ria_task():
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
if n_exited.is_set():
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}"
)
nursery.cancel_scope.cancel()
else:
log.warning(
f'Process for actor {uid} terminated before'
'nursery exit. ' 'This may mean an IPC'
'connection failed!'
)
nursery.start_soon(soft_wait_and_maybe_cancel_ria_task)
# TODO: when we finally remove the `.run_in_actor()` api
# we should be able to entirely drop these 2 blocking calls:
# - we don't need to wait on nursery exit to capture
# process-spawn-machinery level errors (and propagate them).
# - we don't need to wait on final results from ria portals
# since this will be done in some higher level wrapper API.
# XXX: interestingly we can't put this here bc it causes
# the pub-sub tests to fail? wth.. should probably drop
# those XD
# wait for ActorNursery.wait() to be called
# with trio.CancelScope(shield=True):
# await n_exited.wait()
if portal in actor_nursery._cancel_after_result_on_exit: if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon( nursery.start_soon(
cancel_on_completion, cancel_on_completion,
@ -368,22 +418,6 @@ async def new_proc(
errors errors
) )
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally: finally:
# The "hard" reap since no actor zombies are allowed! # The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid # XXX: do this **after** cancellation/tearfown to avoid
@ -400,9 +434,10 @@ async def new_proc(
await proc.wait() await proc.wait()
if is_root_process(): if is_root_process():
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False), '_debug_mode', False)
) )
if proc.poll() is None: if proc.poll() is None:
@ -441,30 +476,20 @@ async def mp_new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: dict[tuple[str, str], Exception], errors: Dict[Tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addr: tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False, infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
# uggh zone
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
assert _ctx assert _ctx
start_method = _ctx.get_start_method() start_method = _ctx.get_start_method()
if start_method == 'forkserver': if start_method == 'forkserver':
from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple # XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer). # forkservers (one at each subproc layer).
fs = forkserver._forkserver fs = forkserver._forkserver

View File

@ -604,7 +604,8 @@ class Context:
self._portal._streams.remove(rchan) self._portal._streams.remove(rchan)
async def result(self) -> Any: async def result(self) -> Any:
'''From a caller side, wait for and return the final result from '''
From a caller side, wait for and return the final result from
the callee side task. the callee side task.
''' '''