Compare commits
	
		
			No commits in common. "534277daa5d699bf1c9c53865c1c65564a60ac19" and "872c47213a3b6f4bc6eea019ff6495f994e8a841" have entirely different histories. 
		
	
	
		
			534277daa5
			...
			872c47213a
		
	
		|  | @ -6,7 +6,6 @@ async def gen(): | |||
|     yield 'yo' | ||||
|     await tractor.breakpoint() | ||||
|     yield 'yo' | ||||
|     await tractor.breakpoint() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  | @ -14,35 +13,35 @@ async def just_bp( | |||
|     ctx: tractor.Context, | ||||
| ) -> None: | ||||
| 
 | ||||
|     await ctx.started() | ||||
|     await ctx.started('yo bpin here') | ||||
|     await tractor.breakpoint() | ||||
| 
 | ||||
|     # TODO: bps and errors in this call.. | ||||
|     async for val in gen(): | ||||
|         print(val) | ||||
|     # async for val in gen(): | ||||
|     #     print(val) | ||||
| 
 | ||||
|     # await trio.sleep(0.5) | ||||
|     await trio.sleep(0.5) | ||||
| 
 | ||||
|     # prematurely destroy the connection | ||||
|     await ctx.chan.aclose() | ||||
| 
 | ||||
|     # THIS CAUSES AN UNRECOVERABLE HANG | ||||
|     # without latest ``pdbpp``: | ||||
|     # THIS CAUSES AN UNRECOVERABLE HANG!? | ||||
|     assert 0 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| async def main(): | ||||
|     async with tractor.open_nursery( | ||||
|         loglevel='transport', | ||||
|         debug_mode=True, | ||||
|     ) as n: | ||||
|         p = await n.start_actor( | ||||
|             'bp_boi', | ||||
|             enable_modules=[__name__], | ||||
|             # debug_mode=True, | ||||
|         ) | ||||
|         async with p.open_context( | ||||
|             just_bp, | ||||
|         ) as (ctx, first): | ||||
| 
 | ||||
|             # await tractor.breakpoint() | ||||
|             # breakpoint() | ||||
|             await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -26,11 +26,8 @@ import importlib | |||
| import importlib.util | ||||
| import inspect | ||||
| import uuid | ||||
| from typing import ( | ||||
|     Any, Optional, | ||||
|     Union, TYPE_CHECKING, | ||||
|     Callable, | ||||
| ) | ||||
| import typing | ||||
| from typing import Any, Optional, Union | ||||
| from types import ModuleType | ||||
| import sys | ||||
| import os | ||||
|  | @ -60,10 +57,6 @@ from . import _state | |||
| from . import _mp_fixup_main | ||||
| 
 | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._supervise import ActorNursery | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger('tractor') | ||||
| 
 | ||||
| 
 | ||||
|  | @ -72,7 +65,7 @@ async def _invoke( | |||
|     actor: 'Actor', | ||||
|     cid: str, | ||||
|     chan: Channel, | ||||
|     func: Callable, | ||||
|     func: typing.Callable, | ||||
|     kwargs: dict[str, Any], | ||||
| 
 | ||||
|     is_rpc: bool = True, | ||||
|  | @ -207,7 +200,7 @@ async def _invoke( | |||
|                 ctx = actor._contexts.pop((chan.uid, cid)) | ||||
|                 if ctx: | ||||
|                     log.runtime( | ||||
|                         f'Context entrypoint {func} was terminated:\n{ctx}' | ||||
|                         f'Context entrypoint for {func} was terminated:\n{ctx}' | ||||
|                     ) | ||||
| 
 | ||||
|             assert cs | ||||
|  | @ -323,9 +316,7 @@ async def try_ship_error_to_parent( | |||
|             trio.ClosedResourceError, | ||||
|             trio.BrokenResourceError, | ||||
|         ): | ||||
|             # in SC terms this is one of the worst things that can | ||||
|             # happen and creates the 2-general's dilemma. | ||||
|             log.critical( | ||||
|             log.error( | ||||
|                 f"Failed to ship error to parent " | ||||
|                 f"{channel.uid}, channel was closed" | ||||
|             ) | ||||
|  | @ -433,7 +424,7 @@ class Actor: | |||
|         # (chan, cid) -> (cancel_scope, func) | ||||
|         self._rpc_tasks: dict[ | ||||
|             tuple[Channel, str], | ||||
|             tuple[trio.CancelScope, Callable, trio.Event] | ||||
|             tuple[trio.CancelScope, typing.Callable, trio.Event] | ||||
|         ] = {} | ||||
| 
 | ||||
|         # map {actor uids -> Context} | ||||
|  | @ -522,7 +513,6 @@ class Actor: | |||
|         self._no_more_peers = trio.Event()  # unset | ||||
| 
 | ||||
|         chan = Channel.from_stream(stream) | ||||
|         uid: Optional[tuple[str, str]] = chan.uid | ||||
|         log.runtime(f"New connection to us {chan}") | ||||
| 
 | ||||
|         # send/receive initial handshake response | ||||
|  | @ -570,51 +560,33 @@ class Actor: | |||
|         # append new channel | ||||
|         self._peers[uid].append(chan) | ||||
| 
 | ||||
|         local_nursery: Optional[ActorNursery] = None  # noqa | ||||
|         disconnected: bool = False | ||||
| 
 | ||||
|         # Begin channel management - respond to remote requests and | ||||
|         # process received reponses. | ||||
|         try: | ||||
|             disconnected = await self._process_messages(chan) | ||||
|             await self._process_messages(chan) | ||||
| 
 | ||||
|         except ( | ||||
|             trio.Cancelled, | ||||
|         ): | ||||
|         except trio.Cancelled: | ||||
|             log.cancel(f"Msg loop was cancelled for {chan}") | ||||
|             raise | ||||
| 
 | ||||
|         finally: | ||||
|             local_nursery = self._actoruid2nursery.get(uid, local_nursery) | ||||
| 
 | ||||
|             # This is set in ``Portal.cancel_actor()``. So if | ||||
|             # the peer was cancelled we try to wait for them | ||||
|             # to tear down their side of the connection before | ||||
|             # moving on with closing our own side. | ||||
|             local_nursery = self._actoruid2nursery.get(chan.uid) | ||||
|             if ( | ||||
|                 local_nursery | ||||
|             ): | ||||
|                 if disconnected: | ||||
|                     # if the transport died and this actor is still | ||||
|                     # registered within a local nursery, we report that the | ||||
|                     # IPC layer may have failed unexpectedly since it may be | ||||
|                     # the cause of other downstream errors. | ||||
|                     entry = local_nursery._children.get(uid) | ||||
|                     if entry: | ||||
|                         _, proc, _ = entry | ||||
|                         log.error(f'Actor {uid}@{proc} IPC connection broke!?') | ||||
|                         # if proc.poll() is not None: | ||||
|                         #     log.error('Actor {uid} proc died and IPC broke?') | ||||
| 
 | ||||
|                 log.cancel(f"Waiting on cancel request to peer {chan.uid}") | ||||
|                 # XXX: this is a soft wait on the channel (and its | ||||
|                 # underlying transport protocol) to close from the | ||||
|                 # remote peer side since we presume that any channel | ||||
|                 # which is mapped to a sub-actor (i.e. it's managed by | ||||
|                 # one of our local nurseries) has a message is sent to | ||||
|                 # the peer likely by this actor (which is now in | ||||
|                 # a cancelled condition) when the local runtime here is | ||||
|                 # now cancelled while (presumably) in the middle of msg | ||||
|                 # underlying transport protocol) to close from the remote | ||||
|                 # peer side since we presume that any channel which | ||||
|                 # is mapped to a sub-actor (i.e. it's managed by | ||||
|                 # one of our local nurseries) | ||||
|                 # message is sent to the peer likely by this actor which is | ||||
|                 # now in a cancelled condition) when the local runtime here | ||||
|                 # is now cancelled while (presumably) in the middle of msg | ||||
|                 # loop processing. | ||||
|                 with trio.move_on_after(0.5) as cs: | ||||
|                     cs.shield = True | ||||
|  | @ -637,8 +609,6 @@ class Actor: | |||
| 
 | ||||
|                     await local_nursery.exited.wait() | ||||
| 
 | ||||
|                 # if local_nursery._children | ||||
| 
 | ||||
|             # ``Channel`` teardown and closure sequence | ||||
| 
 | ||||
|             # Drop ref to channel so it can be gc-ed and disconnected | ||||
|  | @ -648,7 +618,7 @@ class Actor: | |||
| 
 | ||||
|             if not chans: | ||||
|                 log.runtime(f"No more channels for {chan.uid}") | ||||
|                 self._peers.pop(uid, None) | ||||
|                 self._peers.pop(chan.uid, None) | ||||
| 
 | ||||
|                 # for (uid, cid) in self._contexts.copy(): | ||||
|                 #     if chan.uid == uid: | ||||
|  | @ -656,13 +626,11 @@ class Actor: | |||
| 
 | ||||
|             log.runtime(f"Peers is {self._peers}") | ||||
| 
 | ||||
|             # No more channels to other actors (at all) registered | ||||
|             # as connected. | ||||
|             if not self._peers: | ||||
|             if not self._peers:  # no more channels connected | ||||
|                 log.runtime("Signalling no more peer channels") | ||||
|                 self._no_more_peers.set() | ||||
| 
 | ||||
|             # XXX: is this necessary (GC should do it)? | ||||
|             # # XXX: is this necessary (GC should do it?) | ||||
|             if chan.connected(): | ||||
|                 # if the channel is still connected it may mean the far | ||||
|                 # end has not closed and we may have gotten here due to | ||||
|  | @ -697,8 +665,8 @@ class Actor: | |||
|             ctx = self._contexts[(uid, cid)] | ||||
|         except KeyError: | ||||
|             log.warning( | ||||
|                 f'Ignoring msg from [no-longer/un]known context {uid}:' | ||||
|                 f'\n{msg}') | ||||
|                     f'Ignoring msg from [no-longer/un]known context with {uid}:' | ||||
|                     f'\n{msg}') | ||||
|             return | ||||
| 
 | ||||
|         send_chan = ctx._send_chan | ||||
|  | @ -845,7 +813,7 @@ class Actor: | |||
|         shield: bool = False, | ||||
|         task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
|     ) -> bool: | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Process messages for the channel async-RPC style. | ||||
| 
 | ||||
|  | @ -871,7 +839,7 @@ class Actor: | |||
|                     if msg is None:  # loop terminate sentinel | ||||
| 
 | ||||
|                         log.cancel( | ||||
|                             f"Channel to {chan.uid} terminated?\n" | ||||
|                             f"Channerl to {chan.uid} terminated?\n" | ||||
|                             "Cancelling all associated tasks..") | ||||
| 
 | ||||
|                         for (channel, cid) in self._rpc_tasks.copy(): | ||||
|  | @ -1018,9 +986,6 @@ class Actor: | |||
|             # up. | ||||
|             log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') | ||||
| 
 | ||||
|             # transport **was** disconnected | ||||
|             return True | ||||
| 
 | ||||
|         except (Exception, trio.MultiError) as err: | ||||
|             if nursery_cancelled_before_task: | ||||
|                 sn = self._service_n | ||||
|  | @ -1045,9 +1010,6 @@ class Actor: | |||
|                 f"Exiting msg loop for {chan} from {chan.uid} " | ||||
|                 f"with last msg:\n{msg}") | ||||
| 
 | ||||
|         # transport **was not** disconnected | ||||
|         return False | ||||
| 
 | ||||
|     async def _from_parent( | ||||
|         self, | ||||
|         parent_addr: Optional[tuple[str, int]], | ||||
|  |  | |||
|  | @ -31,7 +31,6 @@ from typing import ( | |||
|     AsyncIterator, | ||||
|     AsyncGenerator, | ||||
| ) | ||||
| from types import FrameType | ||||
| 
 | ||||
| import tractor | ||||
| import trio | ||||
|  | @ -49,8 +48,7 @@ try: | |||
| except ImportError: | ||||
|     # pdbpp is installed in regular mode...it monkey patches stuff | ||||
|     import pdb | ||||
|     xpm = getattr(pdb, 'xpm', None) | ||||
|     assert xpm, "pdbpp is not installed?"  # type: ignore | ||||
|     assert pdb.xpm, "pdbpp is not installed?"  # type: ignore | ||||
|     pdbpp = pdb | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
|  | @ -260,11 +258,16 @@ async def _hijack_stdin_for_child( | |||
|     orig_handler = signal.signal( | ||||
|         signal.SIGINT, | ||||
|         shield_sigint, | ||||
|         # partial(shield_sigint, pdb=pdb), | ||||
|     ) | ||||
| #     try: | ||||
| #         yield | ||||
|     try: | ||||
|         with ( | ||||
|             trio.CancelScope(shield=True), | ||||
|             # disable_sigint(), | ||||
|         ): | ||||
| 
 | ||||
|             try: | ||||
|                 lock = None | ||||
|                 async with _acquire_debug_lock(subactor_uid) as lock: | ||||
|  | @ -377,7 +380,7 @@ async def wait_for_parent_stdin_hijack( | |||
|             log.debug(f"Child {actor_uid} released parent stdio lock") | ||||
| 
 | ||||
| 
 | ||||
| def mk_mpdb() -> tuple[MultiActorPdb, Callable]: | ||||
| def mk_mpdb() -> (MultiActorPdb, Callable): | ||||
| 
 | ||||
|     pdb = MultiActorPdb() | ||||
|     signal.signal = pdbpp.hideframe(signal.signal) | ||||
|  | @ -531,10 +534,9 @@ async def _breakpoint( | |||
| 
 | ||||
|         _pdb_release_hook = teardown | ||||
| 
 | ||||
|     # frame = sys._getframe() | ||||
|     # last_f = frame.f_back | ||||
|     # last_f.f_globals['__tracebackhide__'] = True | ||||
| 
 | ||||
|     frame = sys._getframe() | ||||
|     last_f = frame.f_back | ||||
|     last_f.f_globals['__tracebackhide__'] = True | ||||
|     try: | ||||
|         # block here one (at the appropriate frame *up*) where | ||||
|         # ``breakpoint()`` was awaited and begin handling stdio. | ||||
|  | @ -580,6 +582,10 @@ def shield_sigint( | |||
|     ''' | ||||
|     __tracebackhide__ = True | ||||
| 
 | ||||
|     frame = sys._getframe() | ||||
|     last_f = frame.f_back | ||||
|     last_f.f_globals['__tracebackhide__'] = True | ||||
| 
 | ||||
|     global _local_task_in_debug, _global_actor_in_debug | ||||
|     in_debug = _global_actor_in_debug | ||||
| 
 | ||||
|  | @ -596,7 +602,6 @@ def shield_sigint( | |||
|             log.pdb( | ||||
|                 f"Ignoring SIGINT while child in debug mode: `{in_debug}`" | ||||
|             ) | ||||
| 
 | ||||
|         else: | ||||
|             log.pdb( | ||||
|                 "Ignoring SIGINT while in debug mode" | ||||
|  | @ -653,25 +658,24 @@ def shield_sigint( | |||
| 
 | ||||
| 
 | ||||
| def _set_trace( | ||||
|     actor: Optional[tractor._actor.Actor] = None, | ||||
|     actor: Optional[tractor.Actor] = None, | ||||
|     pdb: Optional[MultiActorPdb] = None, | ||||
| ): | ||||
|     __tracebackhide__ = True | ||||
|     actor = actor or tractor.current_actor() | ||||
| 
 | ||||
|     # XXX: on latest ``pdbpp`` i guess we don't need this? | ||||
|     # frame = sys._getframe() | ||||
|     # last_f = frame.f_back | ||||
|     # last_f.f_globals['__tracebackhide__'] = True | ||||
| 
 | ||||
|     # start 2 levels up in user code | ||||
|     frame: FrameType = sys._getframe() | ||||
|     if frame: | ||||
|         frame = frame.f_back.f_back  # type: ignore | ||||
| 
 | ||||
|     if pdb and actor is not None: | ||||
|         log.pdb(f"\nAttaching pdb to actor: {actor.uid}\n") | ||||
| 
 | ||||
|         pdb.set_trace( | ||||
|             # start 2 levels up in user code | ||||
|             frame=sys._getframe().f_back.f_back, | ||||
|         ) | ||||
| 
 | ||||
|     else: | ||||
|         pdb, undo_sigint = mk_mpdb() | ||||
| 
 | ||||
|  | @ -679,7 +683,12 @@ def _set_trace( | |||
|         global _local_task_in_debug, _pdb_release_hook | ||||
|         _local_task_in_debug = 'sync' | ||||
| 
 | ||||
|     pdb.set_trace(frame=frame) | ||||
|         _pdb_release_hook = undo_sigint | ||||
| 
 | ||||
|         pdb.set_trace( | ||||
|             # start 2 levels up in user code | ||||
|             frame=sys._getframe().f_back.f_back, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| breakpoint = partial( | ||||
|  | @ -689,7 +698,7 @@ breakpoint = partial( | |||
| 
 | ||||
| 
 | ||||
| def _post_mortem( | ||||
|     actor: tractor._actor.Actor, | ||||
|     actor: tractor.Actor, | ||||
|     pdb: MultiActorPdb, | ||||
| 
 | ||||
| ) -> None: | ||||
|  |  | |||
|  | @ -24,8 +24,7 @@ import importlib | |||
| import inspect | ||||
| from typing import ( | ||||
|     Any, Optional, | ||||
|     Callable, AsyncGenerator, | ||||
|     Type, | ||||
|     Callable, AsyncGenerator | ||||
| ) | ||||
| from functools import partial | ||||
| from dataclasses import dataclass | ||||
|  | @ -443,10 +442,6 @@ class Portal: | |||
|         _err: Optional[BaseException] = None | ||||
|         ctx._portal = self | ||||
| 
 | ||||
|         uid = self.channel.uid | ||||
|         cid = ctx.cid | ||||
|         etype: Optional[Type[BaseException]] = None | ||||
| 
 | ||||
|         # deliver context instance and .started() msg value in open tuple. | ||||
|         try: | ||||
|             async with trio.open_nursery() as scope_nursery: | ||||
|  | @ -482,24 +477,13 @@ class Portal: | |||
|             # KeyboardInterrupt, | ||||
| 
 | ||||
|         ) as err: | ||||
|             etype = type(err) | ||||
|             _err = err | ||||
|             # the context cancels itself on any cancel | ||||
|             # causing error. | ||||
|             log.cancel( | ||||
|                 f'Context to {self.channel.uid} sending cancel request..') | ||||
| 
 | ||||
|             if ctx.chan.connected(): | ||||
|                 log.cancel( | ||||
|                     'Context cancelled for task, sending cancel request..\n' | ||||
|                     f'task:{cid}\n' | ||||
|                     f'actor:{uid}' | ||||
|                 ) | ||||
|                 await ctx.cancel() | ||||
|             else: | ||||
|                 log.warning( | ||||
|                     'IPC connection for context is broken?\n' | ||||
|                     f'task:{cid}\n' | ||||
|                     f'actor:{uid}' | ||||
|                 ) | ||||
| 
 | ||||
|             await ctx.cancel() | ||||
|             raise | ||||
| 
 | ||||
|         finally: | ||||
|  | @ -508,13 +492,7 @@ class Portal: | |||
|             # sure we get the error the underlying feeder mem chan. | ||||
|             # if it's not raised here it *should* be raised from the | ||||
|             # msg loop nursery right? | ||||
|             if ctx.chan.connected(): | ||||
|                 log.info( | ||||
|                     'Waiting on final context-task result for\n' | ||||
|                     f'task:{cid}\n' | ||||
|                     f'actor:{uid}' | ||||
|                 ) | ||||
|                 result = await ctx.result() | ||||
|             result = await ctx.result() | ||||
| 
 | ||||
|             # though it should be impossible for any tasks | ||||
|             # operating *in* this scope to have survived | ||||
|  | @ -524,17 +502,14 @@ class Portal: | |||
|                 # should we encapsulate this in the context api? | ||||
|                 await ctx._recv_chan.aclose() | ||||
| 
 | ||||
|             if etype: | ||||
|             if _err: | ||||
|                 if ctx._cancel_called: | ||||
|                     log.cancel( | ||||
|                         f'Context {fn_name} cancelled by caller with\n{etype}' | ||||
|                         f'Context {fn_name} cancelled by caller with\n{_err}' | ||||
|                     ) | ||||
|                 elif _err is not None: | ||||
|                     log.cancel( | ||||
|                         f'Context for task cancelled by callee with {etype}\n' | ||||
|                         f'target: `{fn_name}`\n' | ||||
|                         f'task:{cid}\n' | ||||
|                         f'actor:{uid}' | ||||
|                         f'Context {fn_name} cancelled by callee with\n{_err}' | ||||
|                     ) | ||||
|             else: | ||||
|                 log.runtime( | ||||
|  |  | |||
|  | @ -295,7 +295,7 @@ async def new_proc( | |||
|             # the OS; it otherwise can be passed via the parent channel if | ||||
|             # we prefer in the future (for privacy). | ||||
|             "--uid", | ||||
|             str(uid), | ||||
|             str(subactor.uid), | ||||
|             # Address the child must connect to on startup | ||||
|             "--parent_addr", | ||||
|             str(parent_addr) | ||||
|  | @ -321,7 +321,8 @@ async def new_proc( | |||
|                 # wait for actor to spawn and connect back to us | ||||
|                 # channel should have handshake completed by the | ||||
|                 # local actor by the time we get a ref to it | ||||
|                 event, chan = await actor_nursery._actor.wait_for_peer(uid) | ||||
|                 event, chan = await actor_nursery._actor.wait_for_peer( | ||||
|                     subactor.uid) | ||||
| 
 | ||||
|             except trio.Cancelled: | ||||
|                 cancelled_during_spawn = True | ||||
|  | @ -362,54 +363,10 @@ async def new_proc( | |||
|             task_status.started(portal) | ||||
| 
 | ||||
|             # wait for ActorNursery.wait() to be called | ||||
|             n_exited = actor_nursery._join_procs | ||||
|             with trio.CancelScope(shield=True): | ||||
|                 await n_exited.wait() | ||||
|                 await actor_nursery._join_procs.wait() | ||||
| 
 | ||||
|             async with trio.open_nursery() as nursery: | ||||
| 
 | ||||
|                 async def soft_wait_and_maybe_cancel_ria_task(): | ||||
|                     # This is a "soft" (cancellable) join/reap which | ||||
|                     # will remote cancel the actor on a ``trio.Cancelled`` | ||||
|                     # condition. | ||||
|                     await soft_wait( | ||||
|                         proc, | ||||
|                         trio.Process.wait, | ||||
|                         portal | ||||
|                     ) | ||||
| 
 | ||||
|                     if n_exited.is_set(): | ||||
|                         # cancel result waiter that may have been spawned in | ||||
|                         # tandem if not done already | ||||
|                         log.warning( | ||||
|                             "Cancelling existing result waiter task for " | ||||
|                             f"{subactor.uid}" | ||||
|                         ) | ||||
|                         nursery.cancel_scope.cancel() | ||||
| 
 | ||||
|                     else: | ||||
|                         log.warning( | ||||
|                             f'Process for actor {uid} terminated before' | ||||
|                             'nursery exit. ' 'This may mean an IPC' | ||||
|                             'connection failed!' | ||||
|                         ) | ||||
| 
 | ||||
|                 nursery.start_soon(soft_wait_and_maybe_cancel_ria_task) | ||||
| 
 | ||||
|                 # TODO: when we finally remove the `.run_in_actor()` api | ||||
|                 # we should be able to entirely drop these 2 blocking calls: | ||||
|                 # - we don't need to wait on nursery exit to capture | ||||
|                 #   process-spawn-machinery level errors (and propagate them). | ||||
|                 # - we don't need to wait on final results from ria portals | ||||
|                 #   since this will be done in some higher level wrapper API. | ||||
| 
 | ||||
|                 # XXX: interestingly we can't put this here bc it causes | ||||
|                 # the pub-sub tests to fail? wth.. should probably drop | ||||
|                 # those XD | ||||
|                 # wait for ActorNursery.wait() to be called | ||||
|                 # with trio.CancelScope(shield=True): | ||||
|                 #     await n_exited.wait() | ||||
| 
 | ||||
|                 if portal in actor_nursery._cancel_after_result_on_exit: | ||||
|                     nursery.start_soon( | ||||
|                         cancel_on_completion, | ||||
|  | @ -418,6 +375,22 @@ async def new_proc( | |||
|                         errors | ||||
|                     ) | ||||
| 
 | ||||
|                 # This is a "soft" (cancellable) join/reap which | ||||
|                 # will remote cancel the actor on a ``trio.Cancelled`` | ||||
|                 # condition. | ||||
|                 await soft_wait( | ||||
|                     proc, | ||||
|                     trio.Process.wait, | ||||
|                     portal | ||||
|                 ) | ||||
| 
 | ||||
|                 # cancel result waiter that may have been spawned in | ||||
|                 # tandem if not done already | ||||
|                 log.warning( | ||||
|                     "Cancelling existing result waiter task for " | ||||
|                     f"{subactor.uid}") | ||||
|                 nursery.cancel_scope.cancel() | ||||
| 
 | ||||
|         finally: | ||||
|             # The "hard" reap since no actor zombies are allowed! | ||||
|             # XXX: do this **after** cancellation/tearfown to avoid | ||||
|  | @ -434,10 +407,8 @@ async def new_proc( | |||
|                                 await proc.wait() | ||||
| 
 | ||||
|                     if is_root_process(): | ||||
| 
 | ||||
|                         await maybe_wait_for_debugger( | ||||
|                             child_in_debug=_runtime_vars.get( | ||||
|                                 '_debug_mode', False) | ||||
|                             child_in_debug=_runtime_vars.get('_debug_mode', False), | ||||
|                         ) | ||||
| 
 | ||||
|                     if proc.poll() is None: | ||||
|  |  | |||
|  | @ -604,8 +604,7 @@ class Context: | |||
|                     self._portal._streams.remove(rchan) | ||||
| 
 | ||||
|     async def result(self) -> Any: | ||||
|         ''' | ||||
|         From a caller side, wait for and return the final result from | ||||
|         '''From a caller side, wait for and return the final result from | ||||
|         the callee side task. | ||||
| 
 | ||||
|         ''' | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue