Compare commits


No commits in common. "4fbd469c332d50f923859caad2cdd74c98913469" and "9133f42b07e69cbae2d8c5da077397b946cf4525" have entirely different histories.

12 changed files with 224 additions and 543 deletions

View File

@ -285,14 +285,14 @@ def test_basic_payload_spec(
if invalid_started: if invalid_started:
msg_type_str: str = 'Started' msg_type_str: str = 'Started'
bad_value: int = 10 bad_value_str: str = '10'
elif invalid_return: elif invalid_return:
msg_type_str: str = 'Return' msg_type_str: str = 'Return'
bad_value: str = 'yo' bad_value_str: str = "'yo'"
else: else:
# XXX but should never be used below then.. # XXX but should never be used below then..
msg_type_str: str = '' msg_type_str: str = ''
bad_value: str = '' bad_value_str: str = ''
maybe_mte: MsgTypeError|None = None maybe_mte: MsgTypeError|None = None
should_raise: Exception|None = ( should_raise: Exception|None = (
@ -307,10 +307,8 @@ def test_basic_payload_spec(
raises=should_raise, raises=should_raise,
ensure_in_message=[ ensure_in_message=[
f"invalid `{msg_type_str}` msg payload", f"invalid `{msg_type_str}` msg payload",
f'{bad_value}', f"value: `{bad_value_str}` does not "
f'has type {type(bad_value)!r}', f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
'not match type-spec',
f'`{msg_type_str}.pld: PldMsg|NoneType`',
], ],
# only for debug # only for debug
# post_mortem=True, # post_mortem=True,
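The hunk above only changes which substrings the test expects to find in the raised error's message. A minimal sketch of that assertion style, using a hypothetical `expect_raises_with()` helper rather than the project's actual test utility:

    from contextlib import contextmanager

    @contextmanager
    def expect_raises_with(exc_type: type[BaseException], ensure_in_message: list[str]):
        # hypothetical stand-in: run the body, require `exc_type` to be
        # raised, and check every expected substring shows up in the
        # rendered error message.
        try:
            yield
        except exc_type as err:
            missing = [s for s in ensure_in_message if s not in str(err)]
            assert not missing, f'Missing substrings in error msg: {missing}'
        else:
            raise AssertionError(f'{exc_type.__name__} was never raised')

    # usage mirroring the test above (payload/type names illustrative only)
    with expect_raises_with(
        ValueError,
        ensure_in_message=[
            'invalid `Started` msg payload',
            'does not match type-spec',
        ],
    ):
        raise ValueError(
            'invalid `Started` msg payload: value: `10` does not '
            'match type-spec: `Started.pld: PldMsg|NoneType`'
        )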

View File

@ -38,7 +38,6 @@ from collections import deque
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from contextvars import Token
from dataclasses import ( from dataclasses import (
dataclass, dataclass,
field, field,
@ -122,19 +121,10 @@ class Unresolved:
@dataclass @dataclass
class Context: class Context:
''' '''
An inter-actor, SC transitive, `trio.Task` (pair) An inter-actor, SC transitive, `Task` communication context.
communication context.
(We've also considered other names and ideas: NB: This class should **never be instatiated directly**, it is allocated
- "communicating tasks scope": cts by the runtime in 2 ways:
- "distributed task scope": dts
- "communicating tasks context": ctc
**Got a better idea for naming? Make an issue dawg!**
)
NB: This class should **never be instatiated directly**, it is
allocated by the runtime in 2 ways:
- by entering `Portal.open_context()` which is the primary - by entering `Portal.open_context()` which is the primary
public API for any "parent" task or, public API for any "parent" task or,
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
@ -220,16 +210,6 @@ class Context:
# more the the `Context` is needed? # more the the `Context` is needed?
_portal: Portal | None = None _portal: Portal | None = None
@property
def portal(self) -> Portal|None:
'''
Return any wrapping memory-`Portal` if this is
a 'parent'-side task which called `Portal.open_context()`,
otherwise `None`.
'''
return self._portal
# NOTE: each side of the context has its own cancel scope # NOTE: each side of the context has its own cancel scope
# which is exactly the primitive that allows for # which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC. # cross-actor-task-supervision and thus SC.
@ -319,8 +299,6 @@ class Context:
# boxed exception. NOW, it's used for spawning overrun queuing # boxed exception. NOW, it's used for spawning overrun queuing
# tasks when `.allow_overruns == True` !!! # tasks when `.allow_overruns == True` !!!
_scope_nursery: trio.Nursery|None = None _scope_nursery: trio.Nursery|None = None
# ^-TODO-^ change name?
# -> `._scope_tn` "scope task nursery"
# streaming overrun state tracking # streaming overrun state tracking
_in_overrun: bool = False _in_overrun: bool = False
@ -430,23 +408,10 @@ class Context:
''' '''
return self._cancel_called return self._cancel_called
@cancel_called.setter
def cancel_called(self, val: bool) -> None:
'''
Set the self-cancelled request `bool` value.
'''
# to debug who frickin sets it..
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
self._cancel_called = val
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
''' '''
`Actor.uid: tuple[str, str]` of the (remote) ``Actor.uid: tuple[str, str]`` of the (remote)
actor-process who's task was cancelled thus causing this actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled. (side of the) context to also be cancelled.
@ -550,7 +515,7 @@ class Context:
# the local scope was never cancelled # the local scope was never cancelled
# and instead likely we received a remote side # and instead likely we received a remote side
# # cancellation that was raised inside `.wait_for_result()` # # cancellation that was raised inside `.result()`
# or ( # or (
# (se := self._local_error) # (se := self._local_error)
# and se is re # and se is re
@ -620,10 +585,6 @@ class Context:
self, self,
error: BaseException, error: BaseException,
# TODO: manual toggle for cases where we wouldn't normally
# mark ourselves cancelled but want to?
# set_cancel_called: bool = False,
) -> None: ) -> None:
''' '''
(Maybe) cancel this local scope due to a received remote (Maybe) cancel this local scope due to a received remote
@ -642,7 +603,7 @@ class Context:
- `Portal.open_context()` - `Portal.open_context()`
- `Portal.result()` - `Portal.result()`
- `Context.open_stream()` - `Context.open_stream()`
- `Context.wait_for_result()` - `Context.result()`
when called/closed by actor local task(s). when called/closed by actor local task(s).
@ -768,7 +729,7 @@ class Context:
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the # once exiting (or manually calling `.result()`) the
# `.open_context()` block. # `.open_context()` block.
cs: trio.CancelScope = self._scope cs: trio.CancelScope = self._scope
if ( if (
@ -803,9 +764,8 @@ class Context:
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n' message: str = 'Cancelling `Context._scope` !\n\n'
# from .devx import pause_from_sync
# pause_from_sync()
self._scope.cancel() self._scope.cancel()
else: else:
message: str = 'NOT cancelling `Context._scope` !\n\n' message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb # from .devx import mk_pdb
@ -885,15 +845,15 @@ class Context:
@property @property
def repr_api(self) -> str: def repr_api(self) -> str:
return 'Portal.open_context()'
# TODO: use `.dev._frame_stack` scanning to find caller!
# ci: CallerInfo|None = self._caller_info # ci: CallerInfo|None = self._caller_info
# if ci: # if ci:
# return ( # return (
# f'{ci.api_nsp}()\n' # f'{ci.api_nsp}()\n'
# ) # )
# TODO: use `.dev._frame_stack` scanning to find caller!
return 'Portal.open_context()'
async def cancel( async def cancel(
self, self,
timeout: float = 0.616, timeout: float = 0.616,
@ -929,8 +889,7 @@ class Context:
''' '''
side: str = self.side side: str = self.side
# XXX for debug via the `@.setter` self._cancel_called: bool = True
self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx with peer from {side.upper()} side\n\n' f'Cancelling ctx with peer from {side.upper()} side\n\n'
@ -953,7 +912,7 @@ class Context:
# `._scope.cancel()` since we expect the eventual # `._scope.cancel()` since we expect the eventual
# `ContextCancelled` from the other side to trigger this # `ContextCancelled` from the other side to trigger this
# when the runtime finally receives it during teardown # when the runtime finally receives it during teardown
# (normally in `.wait_for_result()` called from # (normally in `.result()` called from
# `Portal.open_context().__aexit__()`) # `Portal.open_context().__aexit__()`)
if side == 'parent': if side == 'parent':
if not self._portal: if not self._portal:
@ -1066,10 +1025,10 @@ class Context:
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
peer_uid: tuple = self.chan.uid our_uid: tuple = self.chan.uid
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# for "graceful cancellation" case(s): # for "graceful cancellation" case:
# #
# Whenever a "side" of a context (a `Task` running in # Whenever a "side" of a context (a `Task` running in
# an actor) **is** the side which requested ctx # an actor) **is** the side which requested ctx
@ -1086,11 +1045,9 @@ class Context:
# set to the `Actor.uid` of THIS task (i.e. the # set to the `Actor.uid` of THIS task (i.e. the
# cancellation requesting task's actor is the actor # cancellation requesting task's actor is the actor
# checking whether it should absorb the ctxc). # checking whether it should absorb the ctxc).
self_ctxc: bool = self._is_self_cancelled(remote_error)
if ( if (
self_ctxc
and
not raise_ctxc_from_self_call not raise_ctxc_from_self_call
and self._is_self_cancelled(remote_error)
# TODO: ?potentially it is useful to emit certain # TODO: ?potentially it is useful to emit certain
# warning/cancel logs for the cases where the # warning/cancel logs for the cases where the
@ -1120,8 +1077,8 @@ class Context:
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.boxed_type is StreamOverrun and remote_error.boxed_type is StreamOverrun
# and tuple(remote_error.msgdata['sender']) == peer_uid # and tuple(remote_error.msgdata['sender']) == our_uid
and tuple(remote_error.sender) == peer_uid and tuple(remote_error.sender) == our_uid
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing" # cancellation" error-response thus "absorbing"
@ -1183,9 +1140,9 @@ class Context:
of the remote cancellation. of the remote cancellation.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__ = hide_tb
assert self._portal, ( assert self._portal, (
'`Context.wait_for_result()` can not be called from callee side!' "Context.result() can not be called from callee side!"
) )
if self._final_result_is_set(): if self._final_result_is_set():
return self._result return self._result
@ -1240,11 +1197,10 @@ class Context:
# raising something we know might happen # raising something we know might happen
# during cancellation ;) # during cancellation ;)
(not self._cancel_called) (not self._cancel_called)
), )
hide_tb=hide_tb,
) )
# TODO: eventually make `.outcome: Outcome` and thus return # TODO: eventually make `.outcome: Outcome` and thus return
# `self.outcome.unwrap()` here? # `self.outcome.unwrap()` here!
return self.outcome return self.outcome
# TODO: switch this with above! # TODO: switch this with above!
@ -1328,24 +1284,17 @@ class Context:
Any| Any|
RemoteActorError| RemoteActorError|
ContextCancelled ContextCancelled
# TODO: make this a `outcome.Outcome`!
): ):
''' '''
Return the "final outcome" (state) of the far end peer task The final "outcome" from an IPC context which can either be
non-blocking. If the remote task has not completed then this some Value returned from the target `@context`-decorated
field always resolves to the module defined `Unresolved` remote task-as-func, or an `Error` wrapping an exception
handle. raised from an RPC task fault or cancellation.
------ - ------ Note that if the remote task has not terminated then this
TODO->( this is doc-driven-dev content not yet actual ;P ) field always resolves to the module defined `Unresolved` handle.
The final "outcome" from an IPC context which can be any of: TODO: implement this using `outcome.Outcome` types?
- some `outcome.Value` which boxes the returned output from the peer task's
`@context`-decorated remote task-as-func, or
- an `outcome.Error` wrapping an exception raised that same RPC task
after a fault or cancellation, or
- an unresolved `outcome.Outcome` when the peer task is still
executing and has not yet completed.
''' '''
return ( return (
@ -1634,7 +1583,7 @@ class Context:
- NEVER `return` early before delivering the msg! - NEVER `return` early before delivering the msg!
bc if the error is a ctxc and there is a task waiting on bc if the error is a ctxc and there is a task waiting on
`.wait_for_result()` we need the msg to be `.result()` we need the msg to be
`send_chan.send_nowait()`-ed over the `._rx_chan` so `send_chan.send_nowait()`-ed over the `._rx_chan` so
that the error is relayed to that waiter task and thus that the error is relayed to that waiter task and thus
raised in user code! raised in user code!
@ -1879,7 +1828,7 @@ async def open_context_from_portal(
When the "callee" (side that is "called"/started by a call When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be and any final value delivered from the other end can be
retrieved using the `Contex.wait_for_result()` api. retrieved using the `Contex.result()` api.
The yielded ``Context`` instance further allows for opening The yielded ``Context`` instance further allows for opening
bidirectional streams, explicit cancellation and bidirectional streams, explicit cancellation and
@ -1944,7 +1893,7 @@ async def open_context_from_portal(
) )
assert ctx._remote_func_type == 'context' assert ctx._remote_func_type == 'context'
assert ctx._caller_info assert ctx._caller_info
prior_ctx_tok: Token = _ctxvar_Context.set(ctx) _ctxvar_Context.set(ctx)
# placeholder for any exception raised in the runtime # placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure. # or by user tasks which cause this context's closure.
@ -2016,14 +1965,14 @@ async def open_context_from_portal(
yield ctx, first yield ctx, first
# ??TODO??: do we still want to consider this or is # ??TODO??: do we still want to consider this or is
# the `else:` block handling via a `.wait_for_result()` # the `else:` block handling via a `.result()`
# call below enough?? # call below enough??
# #
# -[ ] pretty sure `.wait_for_result()` internals do the # -[ ] pretty sure `.result()` internals do the
# same as our ctxc handler below so it ended up # same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we # being same (repeated?) behaviour, but ideally we
# wouldn't have that duplication either by somehow # wouldn't have that duplication either by somehow
# factoring the `.wait_for_result()` handler impl in a way # factoring the `.result()` handler impl in a way
# that we can re-use it around the `yield` ^ here # that we can re-use it around the `yield` ^ here
# or vice versa? # or vice versa?
# #
@ -2161,7 +2110,7 @@ async def open_context_from_portal(
# AND a group-exc is only raised if there was > 1 # AND a group-exc is only raised if there was > 1
# tasks started *here* in the "caller" / opener # tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls # block. If any one of those tasks calls
# `.wait_for_result()` or `MsgStream.receive()` # `.result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively # `._maybe_raise_remote_err()` will be transitively
# called and the remote error raised causing all # called and the remote error raised causing all
# tasks to be cancelled. # tasks to be cancelled.
@ -2231,7 +2180,7 @@ async def open_context_from_portal(
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
) )
# XXX NOTE XXX: the below call to # XXX NOTE XXX: the below call to
# `Context.wait_for_result()` will ALWAYS raise # `Context.result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to # a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF # `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime # a `Context._remote_error` was set by the runtime
@ -2241,10 +2190,10 @@ async def open_context_from_portal(
# ALWAYS SET any time "callee" side fails and causes "caller # ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here. # side" cancellation via a `ContextCancelled` here.
try: try:
result_or_err: Exception|Any = await ctx.wait_for_result() result_or_err: Exception|Any = await ctx.result()
except BaseException as berr: except BaseException as berr:
# on normal teardown, if we get some error # on normal teardown, if we get some error
# raised in `Context.wait_for_result()` we still want to # raised in `Context.result()` we still want to
# save that error on the ctx's state to # save that error on the ctx's state to
# determine things like `.cancelled_caught` for # determine things like `.cancelled_caught` for
# cases where there was remote cancellation but # cases where there was remote cancellation but
@ -2395,9 +2344,6 @@ async def open_context_from_portal(
None, None,
) )
# XXX revert to prior IPC-task-ctx scope
_ctxvar_Context.reset(prior_ctx_tok)
def mk_context( def mk_context(
chan: Channel, chan: Channel,
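The `.outcome` docstring above references the `outcome` package's `Value`/`Error` types as a possible future return type; for orientation, a standalone sketch of how those types behave (this is the third-party `outcome` lib, not tractor code):

    import outcome

    # a "value" outcome boxes a successful result..
    ok = outcome.Value(42)
    assert ok.unwrap() == 42              # .unwrap() returns the boxed value

    # ..while an "error" outcome boxes a raised exception
    bad = outcome.Error(RuntimeError('remote task faulted'))
    try:
        bad.unwrap()                      # .unwrap() re-raises it
    except RuntimeError as exc:
        assert str(exc) == 'remote task faulted'

    # outcome.capture() runs a sync callable and returns either flavor
    res: outcome.Outcome = outcome.capture(lambda: 1 / 0)
    assert isinstance(res, outcome.Error)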

View File

@ -20,8 +20,7 @@ Sub-process entry points.
""" """
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
import os # import textwrap
import textwrap
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
@ -59,7 +58,7 @@ def _mp_main(
) -> None: ) -> None:
''' '''
The routine called *after fork* which invokes a fresh `trio.run()` The routine called *after fork* which invokes a fresh ``trio.run``
''' '''
actor._forkserver_info = forkserver_info actor._forkserver_info = forkserver_info
@ -97,107 +96,6 @@ def _mp_main(
log.info(f"Subactor {actor.uid} terminated") log.info(f"Subactor {actor.uid} terminated")
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
# as we work out our multi-domain state-flow-syntax!
def nest_from_op(
input_op: str,
#
# ?TODO? an idea for a syntax to the state of concurrent systems
# as a "3-domain" (execution, scope, storage) model and using
# a minimal ascii/utf-8 operator-set.
#
# try not to take any of this seriously yet XD
#
# > is a "play operator" indicating (CPU bound)
# exec/work/ops required at the "lowest level computing"
#
# execution primititves (tasks, threads, actors..) denote their
# lifetime with '(' and ')' since parentheses normally are used
# in many langs to denote function calls.
#
# starting = (
# >( opening/starting; beginning of the thread-of-exec (toe?)
# (> opened/started, (finished spawning toe)
# |_<Task: blah blah..> repr of toe, in py these look like <objs>
#
# >) closing/exiting/stopping,
# )> closed/exited/stopped,
# |_<Task: blah blah..>
# [OR <), )< ?? ]
#
# ending = )
# >c) cancelling to close/exit
# c)> cancelled (caused close), OR?
# |_<Actor: ..>
# OR maybe "<c)" which better indicates the cancel being
# "delivered/returned" / returned" to LHS?
#
# >x) erroring to eventuall exit
# x)> errored and terminated
# |_<Actor: ...>
#
# scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
# >{ opening
# {> opened
# }> closed
# >} closing
#
# storage: like queues, shm-buffers, files, etc..
# >[ opening
# [> opened
# |_<FileObj: ..>
#
# >] closing
# ]> closed
# IPC ops: channels, transports, msging
# => req msg
# <= resp msg
# <=> 2-way streaming (of msgs)
# <- recv 1 msg
# -> send 1 msg
#
# TODO: still not sure on R/L-HS approach..?
# =>( send-req to exec start (task, actor, thread..)
# (<= recv-req to ^
#
# (<= recv-req ^
# <=( recv-resp opened remote exec primitive
# <=) recv-resp closed
#
# )<=c req to stop due to cancel
# c=>) req to stop due to cancel
#
# =>{ recv-req to open
# <={ send-status that it closed
tree_str: str,
# NOTE: so move back-from-the-left of the `input_op` by
# this amount.
back_from_op: int = 0,
) -> str:
'''
Depth-increment the input (presumably hierarchy/supervision)
input "tree string" below the provided `input_op` execution
operator, so injecting a `"\n|_{input_op}\n"`and indenting the
`tree_str` to nest content aligned with the ops last char.
'''
return (
f'{input_op}\n'
+
textwrap.indent(
tree_str,
prefix=(
len(input_op)
-
(back_from_op + 1)
) * ' ',
)
)
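For reference, what the `nest_from_op()` helper shown above renders for a sample input, following its `textwrap.indent()` logic (the actor repr is made up):

    # assuming `nest_from_op` from above is in scope
    print(nest_from_op(
        input_op=')>',
        tree_str=(
            "|_<Actor 'my-sub'>\n"
            "  uid: ('my-sub', 'abc123')\n"
        ),
    ))
    # )>
    #  |_<Actor 'my-sub'>
    #    uid: ('my-sub', 'abc123')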
def _trio_main( def _trio_main(
actor: Actor, actor: Actor,
*, *,
@ -209,6 +107,7 @@ def _trio_main(
Entry point for a `trio_run_in_process` subactor. Entry point for a `trio_run_in_process` subactor.
''' '''
# __tracebackhide__: bool = True
_debug.hide_runtime_frames() _debug.hide_runtime_frames()
_state._current_actor = actor _state._current_actor = actor
@ -220,6 +119,7 @@ def _trio_main(
if actor.loglevel is not None: if actor.loglevel is not None:
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
import os
actor_info: str = ( actor_info: str = (
f'|_{actor}\n' f'|_{actor}\n'
f' uid: {actor.uid}\n' f' uid: {actor.uid}\n'
@ -228,23 +128,13 @@ def _trio_main(
f' loglevel: {actor.loglevel}\n' f' loglevel: {actor.loglevel}\n'
) )
log.info( log.info(
'Starting new `trio` subactor:\n' 'Started new trio subactor:\n'
+ +
nest_from_op( '>\n' # like a "started/play"-icon from super perspective
input_op='>(', # see syntax ideas above +
tree_str=actor_info, actor_info,
back_from_op=1,
)
) )
logmeth = log.info
exit_status: str = (
'Subactor exited\n'
+
nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective
tree_str=actor_info,
)
)
try: try:
if infect_asyncio: if infect_asyncio:
actor._infected_aio = True actor._infected_aio = True
@ -253,28 +143,16 @@ def _trio_main(
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
logmeth = log.cancel log.cancel(
exit_status: str = ( 'Actor received KBI\n'
'Actor received KBI (aka an OS-cancel)\n'
+ +
nest_from_op( actor_info
input_op='c)>', # closed due to cancel (see above)
tree_str=actor_info,
)
) )
except BaseException as err:
logmeth = log.error
exit_status: str = (
'Main actor task crashed during exit?\n'
+
nest_from_op(
input_op='x)>', # closed by error
tree_str=actor_info,
)
)
# NOTE since we raise a tb will already be shown on the
# console, thus we do NOT use `.exception()` above.
raise err
finally: finally:
logmeth(exit_status) log.info(
'Subactor terminated\n'
+
'x\n' # like a "crossed-out/killed" from super perspective
+
actor_info
)

View File

@ -121,8 +121,7 @@ class Portal:
) )
return self.chan return self.chan
# TODO: factor this out into a `.highlevel` API-wrapper that uses # TODO: factor this out into an `ActorNursery` wrapper
# a single `.open_context()` call underneath.
async def _submit_for_result( async def _submit_for_result(
self, self,
ns: str, ns: str,
@ -142,22 +141,13 @@ class Portal:
portal=self, portal=self,
) )
# TODO: we should deprecate this API right? since if we remove
# `.run_in_actor()` (and instead move it to a `.highlevel`
# wrapper api (around a single `.open_context()` call) we don't
# really have any notion of a "main" remote task any more?
#
# @api_frame # @api_frame
async def wait_for_result( async def result(self) -> Any:
self,
hide_tb: bool = True,
) -> Any:
''' '''
Return the final result delivered by a `Return`-msg from the Return the result(s) from the remote actor's "main" task.
remote peer actor's "main" task's `return` statement.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__ = True
# Check for non-rpc errors slapped on the # Check for non-rpc errors slapped on the
# channel for which we always raise # channel for which we always raise
exc = self.channel._exc exc = self.channel._exc
@ -192,23 +182,6 @@ class Portal:
return self._final_result_pld return self._final_result_pld
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def result(
self,
*args,
**kwargs,
) -> Any|Exception:
typname: str = type(self).__name__
log.warning(
f'`{typname}.result()` is DEPRECATED!\n'
f'Use `{typname}.wait_for_result()` instead!\n'
)
return await self.wait_for_result(
*args,
**kwargs,
)
async def _cancel_streams(self): async def _cancel_streams(self):
# terminate all locally running async generator # terminate all locally running async generator
# IPC calls # IPC calls
@ -267,7 +240,6 @@ class Portal:
f'{reminfo}' f'{reminfo}'
) )
# XXX the one spot we set it?
self.channel._cancel_called: bool = True self.channel._cancel_called: bool = True
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
@ -307,8 +279,6 @@ class Portal:
) )
return False return False
# TODO: do we still need this for low level `Actor`-runtime
# method calls or can we also remove it?
async def run_from_ns( async def run_from_ns(
self, self,
namespace_path: str, namespace_path: str,
@ -346,8 +316,6 @@ class Portal:
expect_msg=Return, expect_msg=Return,
) )
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def run( async def run(
self, self,
func: str, func: str,
@ -402,8 +370,6 @@ class Portal:
expect_msg=Return, expect_msg=Return,
) )
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
@acm @acm
async def open_stream_from( async def open_stream_from(
self, self,
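Caller-side, the `result()` vs `wait_for_result()` hunks above amount to which spelling a parent task uses to await the child's final return value. A minimal usage sketch following the stock `open_nursery()`/`run_in_actor()` flow (target func and args are illustrative):

    import trio
    import tractor

    async def add_one(x: int) -> int:
        # example remote task-func whose return becomes the portal result
        return x + 1

    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.run_in_actor(add_one, x=41)
            # one side of this diff exposes the newer name:
            #   final = await portal.wait_for_result()
            # the other keeps the original spelling:
            final = await portal.result()
            assert final == 42

    if __name__ == '__main__':
        trio.run(main)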

View File

@ -21,7 +21,6 @@ Root actor runtime ignition(s).
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import importlib import importlib
import inspect
import logging import logging
import os import os
import signal import signal
@ -116,16 +115,10 @@ async def open_root_actor(
if ( if (
debug_mode debug_mode
and maybe_enable_greenback and maybe_enable_greenback
and ( and await _debug.maybe_init_greenback(
maybe_mod := await _debug.maybe_init_greenback( raise_not_found=False,
raise_not_found=False,
)
) )
): ):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = ( os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin' 'tractor.devx._debug._sync_pause_from_builtin'
) )
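Context for the `PYTHONBREAKPOINT` line above: per PEP 553 the default `sys.breakpointhook()` re-reads that env var on every `breakpoint()` call and imports/invokes the named callable instead of `pdb.set_trace()`. A tiny self-contained illustration (the hook below is a made-up stand-in for `tractor.devx._debug._sync_pause_from_builtin`):

    import os

    def my_break_hook(*args, **kwargs) -> None:
        # stand-in for the tractor sync-pause hook named above
        print('breakpoint() was rerouted here')

    # setting this *before* the call is enough; the default
    # sys.breakpointhook consults the env var on each invocation.
    os.environ['PYTHONBREAKPOINT'] = f'{__name__}.my_break_hook'
    breakpoint()  # -> my_break_hook(), not pdb.set_trace()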
@ -271,9 +264,7 @@ async def open_root_actor(
except OSError: except OSError:
# TODO: make this a "discovery" log level? # TODO: make this a "discovery" log level?
logger.info( logger.warning(f'No actor registry found @ {addr}')
f'No actor registry found @ {addr}\n'
)
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
for addr in registry_addrs: for addr in registry_addrs:
@ -287,6 +278,7 @@ async def open_root_actor(
# Create a new local root-actor instance which IS NOT THE # Create a new local root-actor instance which IS NOT THE
# REGISTRAR # REGISTRAR
if ponged_addrs: if ponged_addrs:
if ensure_registry: if ensure_registry:
raise RuntimeError( raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: ' f'Failed to open `{name}`@{ponged_addrs}: '
@ -373,25 +365,23 @@ async def open_root_actor(
) )
try: try:
yield actor yield actor
except ( except (
Exception, Exception,
BaseExceptionGroup, BaseExceptionGroup,
) as err: ) as err:
# XXX NOTE XXX see equiv note inside
# `._runtime.Actor._stream_handler()` where in the import inspect
# non-root or root-that-opened-this-mahually case we
# wait for the local actor-nursery to exit before
# exiting the transport channel handler.
entered: bool = await _debug._maybe_enter_pm( entered: bool = await _debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
) )
if ( if (
not entered not entered
and and not is_multi_cancelled(err)
not is_multi_cancelled(err)
): ):
logger.exception('Root actor crashed\n') logger.exception('Root actor crashed:\n')
# ALWAYS re-raise any error bubbled up from the # ALWAYS re-raise any error bubbled up from the
# runtime! # runtime!

View File

@ -1046,10 +1046,6 @@ class Actor:
# TODO: another `Struct` for rtvs.. # TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']: if rvs['_debug_mode']:
from .devx import (
enable_stack_on_sig,
maybe_init_greenback,
)
try: try:
# TODO: maybe return some status msgs upward # TODO: maybe return some status msgs upward
# to that we can emit them in `con_status` # to that we can emit them in `con_status`
@ -1057,27 +1053,13 @@ class Actor:
log.devx( log.devx(
'Enabling `stackscope` traces on SIGUSR1' 'Enabling `stackscope` traces on SIGUSR1'
) )
from .devx import enable_stack_on_sig
enable_stack_on_sig() enable_stack_on_sig()
except ImportError: except ImportError:
log.warning( log.warning(
'`stackscope` not installed for use in debug mode!' '`stackscope` not installed for use in debug mode!'
) )
if rvs.get('use_greenback', False):
maybe_mod: ModuleType|None = await maybe_init_greenback()
if maybe_mod:
log.devx(
'Activated `greenback` '
'for `tractor.pause_from_sync()` support!'
)
else:
rvs['use_greenback'] = False
log.warning(
'`greenback` not installed for use in debug mode!\n'
'`tractor.pause_from_sync()` not available!'
)
rvs['_is_root'] = False rvs['_is_root'] = False
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
@ -1735,8 +1717,8 @@ async def async_main(
# Register with the arbiter if we're told its addr # Register with the arbiter if we're told its addr
log.runtime( log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' f'Registering `{actor.name}` ->\n'
# ^-TODO-^ we should instead show the maddr here^^ f'{pformat(accept_addrs)}'
) )
# TODO: ideally we don't fan out to all registrars # TODO: ideally we don't fan out to all registrars
@ -1794,90 +1776,57 @@ async def async_main(
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent) # killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as internal_err: except Exception as err:
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not is_registered: if not is_registered:
err_report: str = (
'\n'
"Actor runtime (internally) failed BEFORE contacting the registry?\n"
f'registrars -> {actor.reg_addrs} ?!?!\n\n'
'^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n'
'\t>> CALMLY CANCEL YOUR CHILDREN AND CALL YOUR PARENTS <<\n\n'
'\tIf this is a sub-actor hopefully its parent will keep running '
'and cancel/reap this sub-process..\n'
'(well, presuming this error was propagated upward)\n\n'
'\t---------------------------------------------\n'
'\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT @ ' # oneline
'https://github.com/goodboy/tractor/issues\n'
'\t---------------------------------------------\n'
)
# TODO: I guess we could try to connect back # TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger # to the parent through a channel and engage a debugger
# once we have that all working with std streams locking? # once we have that all working with std streams locking?
log.exception(err_report) log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {actor.reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
"\tIf this is a sub-actor hopefully its parent will keep running "
"correctly presuming this error was safely ignored..\n\n"
"\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
"https://github.com/goodboy/tractor/issues\n"
)
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_remote( await try_ship_error_to_remote(
actor._parent_chan, actor._parent_chan,
internal_err, err,
) )
# always! # always!
match internal_err: match err:
case ContextCancelled(): case ContextCancelled():
log.cancel( log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n' f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'str(internal_err)' f'str(err)'
) )
case _: case _:
log.exception( log.exception("Actor errored:")
'Main actor-runtime task errored\n' raise
f'<x)\n'
f' |_{actor}\n'
)
raise internal_err
finally: finally:
teardown_report: str = ( log.runtime(
'Main actor-runtime task completed\n' 'Runtime nursery complete'
'-> Closing all actor lifetime contexts..'
) )
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
actor.lifetime_stack.close()
# ?TODO? should this be in `._entry`/`._root` mods instead? # TODO: we can't actually do this bc the debugger
# # uses the _service_n to spawn the lock task, BUT,
# teardown any actor-lifetime-bound contexts # in theory if we had the root nursery surround this finally
ls: ExitStack = actor.lifetime_stack # block it might be actually possible to debug THIS
# only report if there are any registered # machinery in the same way as user task code?
cbs: list[Callable] = [
repr(tup[1].__wrapped__)
for tup in ls._exit_callbacks
]
if cbs:
cbs_str: str = '\n'.join(cbs)
teardown_report += (
'-> Closing actor-lifetime-bound callbacks\n\n'
f'}}>\n'
f' |_{ls}\n'
f' |_{cbs_str}\n'
)
# XXX NOTE XXX this will cause an error which
# prevents any `infected_aio` actor from continuing
# and any callbacks in the `ls` here WILL NOT be
# called!!
# await _debug.pause(shield=True)
ls.close()
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?
#
# if actor.name == 'brokerd.ib': # if actor.name == 'brokerd.ib':
# with CancelScope(shield=True): # with CancelScope(shield=True):
# await _debug.breakpoint() # await _debug.breakpoint()
@ -1907,9 +1856,9 @@ async def async_main(
failed = True failed = True
if failed: if failed:
teardown_report += ( log.warning(
f'-> Failed to unregister {actor.name} from ' f'Failed to unregister {actor.name} from '
f'registar @ {addr}\n' f'registar @ {addr}'
) )
# Ensure all peers (actors connected to us as clients) are finished # Ensure all peers (actors connected to us as clients) are finished
@ -1917,17 +1866,13 @@ async def async_main(
if any( if any(
chan.connected() for chan in chain(*actor._peers.values()) chan.connected() for chan in chain(*actor._peers.values())
): ):
teardown_report += ( log.runtime(
f'-> Waiting for remaining peers {actor._peers} to clear..\n' f"Waiting for remaining peers {actor._peers} to clear")
)
log.runtime(teardown_report)
with CancelScope(shield=True): with CancelScope(shield=True):
await actor._no_more_peers.wait() await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
teardown_report += ('-> All peer channels are complete\n') log.runtime("Runtime completed")
teardown_report += ('Actor runtime exited')
log.info(teardown_report)
# TODO: rename to `Registry` and move to `._discovery`! # TODO: rename to `Registry` and move to `._discovery`!

View File

@ -149,7 +149,7 @@ async def exhaust_portal(
# XXX: streams should never be reaped here since they should # XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api # always be established and shutdown using a context manager api
final: Any = await portal.wait_for_result() final: Any = await portal.result()
except ( except (
Exception, Exception,
@ -223,8 +223,8 @@ async def cancel_on_completion(
async def hard_kill( async def hard_kill(
proc: trio.Process, proc: trio.Process,
terminate_after: int = 1.6, terminate_after: int = 1.6,
# NOTE: for mucking with `.pause()`-ing inside the runtime # NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD # whilst also hacking on it XD
# terminate_after: int = 99999, # terminate_after: int = 99999,

View File

@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
# for `tractor.pause_from_sync()` & `breakpoint()` support # for `breakpoint()` support
'use_greenback': False, 'use_greenback': False,
} }

View File

@ -80,7 +80,6 @@ class ActorNursery:
''' '''
def __init__( def __init__(
self, self,
# TODO: maybe def these as fields of a struct looking type?
actor: Actor, actor: Actor,
ria_nursery: trio.Nursery, ria_nursery: trio.Nursery,
da_nursery: trio.Nursery, da_nursery: trio.Nursery,
@ -89,10 +88,8 @@ class ActorNursery:
) -> None: ) -> None:
# self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor: Actor = actor self._actor: Actor = actor
self._ria_nursery = ria_nursery
# TODO: rename to `._tn` for our conventional "task-nursery"
self._da_nursery = da_nursery self._da_nursery = da_nursery
self._children: dict[ self._children: dict[
tuple[str, str], tuple[str, str],
tuple[ tuple[
@ -101,13 +98,15 @@ class ActorNursery:
Portal | None, Portal | None,
] ]
] = {} ] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False self.cancelled: bool = False
self._join_procs = trio.Event() self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False self._at_least_one_child_in_debug: bool = False
self.errors = errors self.errors = errors
self._scope_error: BaseException|None = None
self.exited = trio.Event() self.exited = trio.Event()
self._scope_error: BaseException|None = None
# NOTE: when no explicit call is made to # NOTE: when no explicit call is made to
# `.open_root_actor()` by application code, # `.open_root_actor()` by application code,
@ -117,13 +116,6 @@ class ActorNursery:
# and syncing purposes to any actor opened nurseries. # and syncing purposes to any actor opened nurseries.
self._implicit_runtime_started: bool = False self._implicit_runtime_started: bool = False
# TODO: remove the `.run_in_actor()` API and thus this 2ndary
# nursery when that API get's moved outside this primitive!
self._ria_nursery = ria_nursery
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
@ -134,14 +126,10 @@ class ActorNursery:
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor loglevel: str|None = None, # set log level per subactor
nursery: trio.Nursery|None = None,
debug_mode: bool|None = None, debug_mode: bool|None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
# TODO: ideally we can rm this once we no longer have
# a `._ria_nursery` since the dependent APIs have been
# removed!
nursery: trio.Nursery|None = None,
) -> Portal: ) -> Portal:
''' '''
Start a (daemon) actor: an process that has no designated Start a (daemon) actor: an process that has no designated
@ -212,7 +200,6 @@ class ActorNursery:
# |_ dynamic @context decoration on child side # |_ dynamic @context decoration on child side
# |_ implicit `Portal.open_context() as (ctx, first):` # |_ implicit `Portal.open_context() as (ctx, first):`
# and `return first` on parent side. # and `return first` on parent side.
# |_ mention how it's similar to `trio-parallel` API?
# -[ ] use @api_frame on the wrapper # -[ ] use @api_frame on the wrapper
async def run_in_actor( async def run_in_actor(
self, self,
@ -282,14 +269,11 @@ class ActorNursery:
) -> None: ) -> None:
''' '''
Cancel this actor-nursery by instructing each subactor's Cancel this nursery by instructing each subactor to cancel
runtime to cancel and wait for all underlying sub-processes itself and wait for all subactors to terminate.
to terminate.
If `hard_kill` is set then kill the processes directly using If ``hard_killl`` is set to ``True`` then kill the processes
the spawning-backend's API/OS-machinery without any attempt directly without any far end graceful ``trio`` cancellation.
at (graceful) `trio`-style cancellation using our
`Actor.cancel()`.
''' '''
__runtimeframe__: int = 1 # noqa __runtimeframe__: int = 1 # noqa
@ -645,12 +629,8 @@ async def open_nursery(
f'|_{an}\n' f'|_{an}\n'
) )
# shutdown runtime if it was started
if implicit_runtime: if implicit_runtime:
# shutdown runtime if it was started and report noisly
# that we're did so.
msg += '=> Shutting down actor runtime <=\n' msg += '=> Shutting down actor runtime <=\n'
log.info(msg)
else: log.info(msg)
# keep noise low during std operation.
log.runtime(msg)

View File

@ -29,7 +29,6 @@ from ._debug import (
shield_sigint_handler as shield_sigint_handler, shield_sigint_handler as shield_sigint_handler,
open_crash_handler as open_crash_handler, open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,
post_mortem as post_mortem, post_mortem as post_mortem,
mk_pdb as mk_pdb, mk_pdb as mk_pdb,
) )

View File

@ -69,7 +69,6 @@ from trio import (
import tractor import tractor
from tractor.log import get_logger from tractor.log import get_logger
from tractor._context import Context from tractor._context import Context
from tractor import _state
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
is_root_process, is_root_process,
@ -88,6 +87,9 @@ if TYPE_CHECKING:
from tractor._runtime import ( from tractor._runtime import (
Actor, Actor,
) )
from tractor.msg import (
_codec,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -1597,16 +1599,12 @@ async def _pause(
try: try:
task: Task = current_task() task: Task = current_task()
except RuntimeError as rte: except RuntimeError as rte:
__tracebackhide__: bool = False log.exception('Failed to get current task?')
log.exception( if actor.is_infected_aio():
'Failed to get current `trio`-task?' raise RuntimeError(
) '`tractor.pause[_from_sync]()` not yet supported '
# if actor.is_infected_aio(): 'for infected `asyncio` mode!'
# mk_pdb().set_trace() ) from rte
# raise RuntimeError(
# '`tractor.pause[_from_sync]()` not yet supported '
# 'directly (infected) `asyncio` tasks!'
# ) from rte
raise raise
@ -2165,22 +2163,22 @@ def maybe_import_greenback(
return False return False
async def maybe_init_greenback(**kwargs) -> None|ModuleType: async def maybe_init_greenback(
try: **kwargs,
if mod := maybe_import_greenback(**kwargs): ) -> None|ModuleType:
await mod.ensure_portal()
log.devx( if mod := maybe_import_greenback(**kwargs):
'`greenback` portal opened!\n' await mod.ensure_portal()
'Sync debug support activated!\n' log.devx(
) '`greenback` portal opened!\n'
return mod 'Sync debug support activated!\n'
except BaseException: )
log.exception('Failed to init `greenback`..') return mod
raise
return None return None
async def _pause_from_bg_root_thread( async def _pause_from_bg_root_thread(
behalf_of_thread: Thread, behalf_of_thread: Thread,
repl: PdbREPL, repl: PdbREPL,
@ -2326,12 +2324,6 @@ def pause_from_sync(
# TODO: once supported, remove this AND the one # TODO: once supported, remove this AND the one
# inside `._pause()`! # inside `._pause()`!
# outstanding impl fixes:
# -[ ] need to make `.shield_sigint()` below work here!
# -[ ] how to handle `asyncio`'s new SIGINT-handler
# injection?
# -[ ] should `breakpoint()` work and what does it normally
# do in `asyncio` ctxs?
if actor.is_infected_aio(): if actor.is_infected_aio():
raise RuntimeError( raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported ' '`tractor.pause[_from_sync]()` not yet supported '
@ -2407,37 +2399,18 @@ def pause_from_sync(
else: # we are presumably the `trio.run()` + main thread else: # we are presumably the `trio.run()` + main thread
# raises on not-found by default # raises on not-found by default
greenback: ModuleType = maybe_import_greenback() greenback: ModuleType = maybe_import_greenback()
# TODO: how to ensure this is either dynamically (if
# needed) called here (in some bg tn??) or that the
# subactor always already called it?
# greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n' message += f'-> imported {greenback}\n'
repl_owner: Task = current_task() repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try: out = greenback.await_(
out = greenback.await_( _pause(
_pause( debug_func=None,
debug_func=None, repl=repl,
repl=repl, hide_tb=hide_tb,
hide_tb=hide_tb, called_from_sync=True,
called_from_sync=True, **_pause_kwargs,
**_pause_kwargs,
)
) )
except RuntimeError as rte: )
if not _state._runtime_vars.get(
'use_greenback',
False,
):
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise
if out: if out:
bg_task, repl = out bg_task, repl = out
assert repl is repl assert repl is repl
@ -2828,10 +2801,10 @@ def open_crash_handler(
`trio.run()`. `trio.run()`.
''' '''
err: BaseException
try: try:
yield yield
except tuple(catch) as err: except tuple(catch) as err:
if type(err) not in ignore: if type(err) not in ignore:
pdbp.xpm() pdbp.xpm()
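The `maybe_init_greenback()`/`greenback.await_()` machinery above builds on the third-party `greenback` package, which lets an otherwise-sync frame drive awaitables once a portal is bound to the current task. A minimal standalone sketch of that primitive (not tractor's wrapper):

    import trio
    import greenback

    async def main():
        # bind a greenback portal to this trio task..
        await greenback.ensure_portal()

        def sync_helper():
            # ..so plain sync code called from it can still await things
            greenback.await_(trio.sleep(0.1))
            return 'slept from sync land'

        print(sync_helper())

    trio.run(main)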

View File

@ -558,8 +558,6 @@ def run_as_asyncio_guest(
# normally `Actor._async_main()` as is passed by some boostrap # normally `Actor._async_main()` as is passed by some boostrap
# entrypoint like `._entry._trio_main()`. # entrypoint like `._entry._trio_main()`.
_sigint_loop_pump_delay: float = 0,
) -> None: ) -> None:
# ^-TODO-^ technically whatever `trio_main` returns.. we should # ^-TODO-^ technically whatever `trio_main` returns.. we should
# try to use func-typevar-params at leaast by 3.13! # try to use func-typevar-params at leaast by 3.13!
@ -600,7 +598,7 @@ def run_as_asyncio_guest(
''' '''
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
trio_done_fute = asyncio.Future() trio_done_fut = asyncio.Future()
startup_msg: str = ( startup_msg: str = (
'Starting `asyncio` guest-loop-run\n' 'Starting `asyncio` guest-loop-run\n'
'-> got running loop\n' '-> got running loop\n'
@ -635,13 +633,13 @@ def run_as_asyncio_guest(
f'{error}\n\n' f'{error}\n\n'
f'{tb_str}\n' f'{tb_str}\n'
) )
trio_done_fute.set_exception(error) trio_done_fut.set_exception(error)
# raise inline # raise inline
main_outcome.unwrap() main_outcome.unwrap()
else: else:
trio_done_fute.set_result(main_outcome) trio_done_fut.set_result(main_outcome)
startup_msg += ( startup_msg += (
f'-> created {trio_done_callback!r}\n' f'-> created {trio_done_callback!r}\n'
@ -662,7 +660,7 @@ def run_as_asyncio_guest(
) )
fute_err: BaseException|None = None fute_err: BaseException|None = None
try: try:
out: Outcome = await asyncio.shield(trio_done_fute) out: Outcome = await asyncio.shield(trio_done_fut)
# NOTE will raise (via `Error.unwrap()`) from any # NOTE will raise (via `Error.unwrap()`) from any
# exception packed into the guest-run's `main_outcome`. # exception packed into the guest-run's `main_outcome`.
@ -699,75 +697,83 @@ def run_as_asyncio_guest(
f' |_{actor}.cancel_soon()\n' f' |_{actor}.cancel_soon()\n'
) )
# XXX WARNING XXX the next LOCs are super important, since # TODO: reduce this comment bloc since abandon issues are
# without them, we can get guest-run abandonment cases # now solved?
# where `asyncio` will not schedule or wait on the `trio`
# guest-run task before final shutdown! This is
# particularly true if the `trio` side has tasks doing
# shielded work when a SIGINT condition occurs.
# #
# We now have the # XXX NOTE XXX the next LOC is super important!!!
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()` # => without it, we can get a guest-run abandonment case
# suite to ensure we do not suffer this issues # where asyncio will not trigger `trio` in a final event
# (hopefully) ever again. # loop cycle!
# #
# The original abandonment issue surfaced as 2 different # our test,
# race-condition dependent types scenarios all to do with # `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
# `asyncio` handling SIGINT from the system: # demonstrates how if when we raise a SIGINT-signal in an infected
# child we get a variable race condition outcome where
# either of the following can indeterminately happen,
# #
# - "silent-abandon" (WORST CASE): # - "silent-abandon": `asyncio` abandons the `trio`
# `asyncio` abandons the `trio` guest-run task silently # guest-run task silently and no `trio`-guest-run or
# and no `trio`-guest-run or `tractor`-actor-runtime # `tractor`-actor-runtime teardown happens whatsoever..
# teardown happens whatsoever.. # this is the WORST (race) case outcome.
# #
# - "loud-abandon" (BEST-ish CASE): # - OR, "loud-abandon": the guest run get's abaondoned "loudly" with
# the guest run get's abaondoned "loudly" with `trio` # `trio` reporting a console traceback and further tbs of all
# reporting a console traceback and further tbs of all # the failed shutdown routines also show on console..
# the (failed) GC-triggered shutdown routines which
# thankfully does get dumped to console..
# #
# The abandonment is most easily reproduced if the `trio` # our test can thus fail and (has been parametrized for)
# side has tasks doing shielded work where those tasks # the 2 cases:
# ignore the normal `Cancelled` condition and continue to
# run, but obviously `asyncio` isn't aware of this and at
# some point bails on the guest-run unless we take manual
# intervention..
# #
# To repeat, *WITHOUT THIS* stuff below the guest-run can # - when the parent raises a KBI just after
# get race-conditionally abandoned!! # signalling the child,
# |_silent-abandon => the `Actor.lifetime_stack` will
# never be closed thus leaking a resource!
# -> FAIL!
# |_loud-abandon => despite the abandonment at least the
# stack will be closed out..
# -> PASS
# #
# XXX SOLUTION XXX # - when the parent instead simply waits on `ctx.wait_for_result()`
# ------ - ------ # (i.e. DOES not raise a KBI itself),
# XXX FIRST PART: # |_silent-abandon => test will just hang and thus the ctx
# ------ - ------ # and actor will never be closed/cancelled/shutdown
# the obvious fix to the "silent-abandon" case is to # resulting in leaking a (file) resource since the
# explicitly cancel the actor runtime such that no # `trio`/`tractor` runtime never relays a ctxc back to
# runtime tasks are even left unaware that the guest-run # the parent; the test's timeout will trigger..
# should be terminated due to OS cancellation. # -> FAIL!
# |_loud-abandon => this case seems to never happen??
# #
# XXX FIRST PART XXX, SO, this is a fix to the
# "silent-abandon" case, NOT the `trio`-guest-run
# abandonment issue in general, for which the NEXT LOC
# is apparently a working fix!
actor.cancel_soon() actor.cancel_soon()
# ------ - ------ # XXX NOTE XXX pump the `asyncio` event-loop to allow
# XXX SECOND PART:
# ------ - ------
# Pump the `asyncio` event-loop to allow
# `trio`-side to `trio`-guest-run to complete and # `trio`-side to `trio`-guest-run to complete and
# teardown !! # teardown !!
# #
# oh `asyncio`, how i don't miss you at all XD # *WITHOUT THIS* the guest-run can get race-conditionally abandoned!!
while not trio_done_fute.done(): # XD
#
await asyncio.sleep(.1) # `delay` can't be 0 either XD
while not trio_done_fut.done():
log.runtime( log.runtime(
'Waiting on main guest-run `asyncio` task to complete..\n' 'Waiting on main guest-run `asyncio` task to complete..\n'
f'|_trio_done_fut: {trio_done_fute}\n' f'|_trio_done_fut: {trio_done_fut}\n'
) )
await asyncio.sleep(_sigint_loop_pump_delay) await asyncio.sleep(.1)
# XXX is there any alt API/approach like the internal # XXX: don't actually need the shield.. seems to
# call below but that doesn't block indefinitely..? # make no difference (??) and we know it spawns an
# internal task..
# await asyncio.shield(asyncio.sleep(.1))
# XXX alt approach but can block indefinitely..
# so don't use?
# loop._run_once() # loop._run_once()
try: try:
return trio_done_fute.result() return trio_done_fut.result()
except asyncio.exceptions.InvalidStateError as state_err: except asyncio.exceptions.InvalidStateError as state_err:
# XXX be super dupere noisy about abandonment issues! # XXX be super dupere noisy about abandonment issues!
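The long comment block above argues that the guest run must be explicitly pumped to completion after `actor.cancel_soon()`. A stripped-down sketch of the underlying `trio.lowlevel.start_guest_run()` + done-future pattern it is protecting (no signal handling, error paths elided):

    import asyncio
    import trio

    def run_trio_as_guest() -> str:

        async def amain() -> str:
            loop = asyncio.get_running_loop()
            trio_done_fut: asyncio.Future = asyncio.Future()

            async def trio_main() -> str:
                await trio.sleep(0.1)
                return 'trio-side done'

            trio.lowlevel.start_guest_run(
                trio_main,
                run_sync_soon_threadsafe=loop.call_soon_threadsafe,
                done_callback=trio_done_fut.set_result,
            )
            # keep cycling the asyncio loop until the trio guest run has
            # actually delivered its outcome, so it can't be abandoned
            # during host-loop teardown.
            while not trio_done_fut.done():
                await asyncio.sleep(0.1)

            # the callback receives an `outcome.Outcome`; unwrap to get
            # the return value (or re-raise the boxed error).
            return trio_done_fut.result().unwrap()

        return asyncio.run(amain())

    if __name__ == '__main__':
        print(run_trio_as_guest())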