Compare commits


No commits in common. "4fbd469c332d50f923859caad2cdd74c98913469" and "9133f42b07e69cbae2d8c5da077397b946cf4525" have entirely different histories.

12 changed files with 224 additions and 543 deletions

View File

@ -285,14 +285,14 @@ def test_basic_payload_spec(
if invalid_started:
msg_type_str: str = 'Started'
bad_value: int = 10
bad_value_str: str = '10'
elif invalid_return:
msg_type_str: str = 'Return'
bad_value: str = 'yo'
bad_value_str: str = "'yo'"
else:
# XXX but should never be used below then..
msg_type_str: str = ''
bad_value: str = ''
bad_value_str: str = ''
maybe_mte: MsgTypeError|None = None
should_raise: Exception|None = (
@ -307,10 +307,8 @@ def test_basic_payload_spec(
raises=should_raise,
ensure_in_message=[
f"invalid `{msg_type_str}` msg payload",
f'{bad_value}',
f'has type {type(bad_value)!r}',
'not match type-spec',
f'`{msg_type_str}.pld: PldMsg|NoneType`',
f"value: `{bad_value_str}` does not "
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
],
# only for debug
# post_mortem=True,
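For the `invalid_return` case above, the asserted fragments compose into an expected error string along these lines (a sketch derived purely from the f-string layout shown):

```python
# assumed composition, given `bad_value_str = "'yo'"` and
# `msg_type_str = 'Return'` from the branch above:
expected: str = (
    "value: `'yo'` does not "
    "match type-spec: `Return.pld: PldMsg|NoneType`"
)
```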

View File

@ -38,7 +38,6 @@ from collections import deque
from contextlib import (
asynccontextmanager as acm,
)
from contextvars import Token
from dataclasses import (
dataclass,
field,
@ -122,19 +121,10 @@ class Unresolved:
@dataclass
class Context:
'''
An inter-actor, SC transitive, `trio.Task` (pair)
communication context.
An inter-actor, SC transitive, `Task` communication context.
(We've also considered other names and ideas:
- "communicating tasks scope": cts
- "distributed task scope": dts
- "communicating tasks context": ctc
**Got a better idea for naming? Make an issue dawg!**
)
NB: This class should **never be instantiated directly**, it is
allocated by the runtime in 2 ways:
NB: This class should **never be instantiated directly**, it is allocated
by the runtime in 2 ways:
- by entering `Portal.open_context()` which is the primary
public API for any "parent" task or,
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
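For orientation, a minimal sketch of both allocation paths; the `ping` function and the `portal` handle are illustrative, not from this diff:

```python
import tractor

@tractor.context
async def ping(
    ctx: tractor.Context,
) -> str:
    # child side: the runtime's `._rpc._invoke()` allocates `ctx`
    await ctx.started('pong')
    return 'done'

async def parent_task(portal: tractor.Portal) -> None:
    # parent side: entering `Portal.open_context()` allocates our ctx
    async with portal.open_context(ping) as (ctx, first):
        assert first == 'pong'
```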
@ -220,16 +210,6 @@ class Context:
# more than the `Context` is needed?
_portal: Portal | None = None
@property
def portal(self) -> Portal|None:
'''
Return any wrapping memory-`Portal` if this is
a 'parent'-side task which called `Portal.open_context()`,
otherwise `None`.
'''
return self._portal
# NOTE: each side of the context has its own cancel scope
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
@ -319,8 +299,6 @@ class Context:
# boxed exception. NOW, it's used for spawning overrun queuing
# tasks when `.allow_overruns == True` !!!
_scope_nursery: trio.Nursery|None = None
# ^-TODO-^ change name?
# -> `._scope_tn` "scope task nursery"
# streaming overrun state tracking
_in_overrun: bool = False
@ -430,23 +408,10 @@ class Context:
'''
return self._cancel_called
@cancel_called.setter
def cancel_called(self, val: bool) -> None:
'''
Set the self-cancelled request `bool` value.
'''
# to debug who frickin sets it..
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
self._cancel_called = val
@property
def canceller(self) -> tuple[str, str]|None:
'''
`Actor.uid: tuple[str, str]` of the (remote)
``Actor.uid: tuple[str, str]`` of the (remote)
actor-process whose task was cancelled thus causing this
(side of the) context to also be cancelled.
@ -550,7 +515,7 @@ class Context:
# the local scope was never cancelled
# and instead likely we received a remote side
# # cancellation that was raised inside `.wait_for_result()`
# # cancellation that was raised inside `.result()`
# or (
# (se := self._local_error)
# and se is re
@ -620,10 +585,6 @@ class Context:
self,
error: BaseException,
# TODO: manual toggle for cases where we wouldn't normally
# mark ourselves cancelled but want to?
# set_cancel_called: bool = False,
) -> None:
'''
(Maybe) cancel this local scope due to a received remote
@ -642,7 +603,7 @@ class Context:
- `Portal.open_context()`
- `Portal.result()`
- `Context.open_stream()`
- `Context.wait_for_result()`
- `Context.result()`
when called/closed by actor local task(s).
@ -768,7 +729,7 @@ class Context:
# Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the
# once exiting (or manually calling `.result()`) the
# `.open_context()` block.
cs: trio.CancelScope = self._scope
if (
@ -803,9 +764,8 @@ class Context:
# `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n'
# from .devx import pause_from_sync
# pause_from_sync()
self._scope.cancel()
else:
message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb
@ -885,15 +845,15 @@ class Context:
@property
def repr_api(self) -> str:
return 'Portal.open_context()'
# TODO: use `.dev._frame_stack` scanning to find caller!
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
# f'{ci.api_nsp}()\n'
# )
# TODO: use `.dev._frame_stack` scanning to find caller!
return 'Portal.open_context()'
async def cancel(
self,
timeout: float = 0.616,
@ -929,8 +889,7 @@ class Context:
'''
side: str = self.side
# XXX for debug via the `@.setter`
self.cancel_called = True
self._cancel_called: bool = True
header: str = (
f'Cancelling ctx with peer from {side.upper()} side\n\n'
@ -953,7 +912,7 @@ class Context:
# `._scope.cancel()` since we expect the eventual
# `ContextCancelled` from the other side to trigger this
# when the runtime finally receives it during teardown
# (normally in `.wait_for_result()` called from
# (normally in `.result()` called from
# `Portal.open_context().__aexit__()`)
if side == 'parent':
if not self._portal:
@ -1066,10 +1025,10 @@ class Context:
'''
__tracebackhide__: bool = hide_tb
peer_uid: tuple = self.chan.uid
our_uid: tuple = self.chan.uid
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# for "graceful cancellation" case(s):
# for "graceful cancellation" case:
#
# Whenever a "side" of a context (a `Task` running in
# an actor) **is** the side which requested ctx
@ -1086,11 +1045,9 @@ class Context:
# set to the `Actor.uid` of THIS task (i.e. the
# cancellation requesting task's actor is the actor
# checking whether it should absorb the ctxc).
self_ctxc: bool = self._is_self_cancelled(remote_error)
if (
self_ctxc
and
not raise_ctxc_from_self_call
and self._is_self_cancelled(remote_error)
# TODO: ?potentially it is useful to emit certain
# warning/cancel logs for the cases where the
@ -1120,8 +1077,8 @@ class Context:
and isinstance(remote_error, RemoteActorError)
and remote_error.boxed_type is StreamOverrun
# and tuple(remote_error.msgdata['sender']) == peer_uid
and tuple(remote_error.sender) == peer_uid
# and tuple(remote_error.msgdata['sender']) == our_uid
and tuple(remote_error.sender) == our_uid
):
# NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing"
@ -1183,9 +1140,9 @@ class Context:
of the remote cancellation.
'''
__tracebackhide__: bool = hide_tb
__tracebackhide__ = hide_tb
assert self._portal, (
'`Context.wait_for_result()` can not be called from callee side!'
"Context.result() can not be called from callee side!"
)
if self._final_result_is_set():
return self._result
@ -1240,11 +1197,10 @@ class Context:
# raising something we know might happen
# during cancellation ;)
(not self._cancel_called)
),
hide_tb=hide_tb,
)
)
# TODO: eventually make `.outcome: Outcome` and thus return
# `self.outcome.unwrap()` here?
# `self.outcome.unwrap()` here!
return self.outcome
# TODO: switch this with above!
@ -1328,24 +1284,17 @@ class Context:
Any|
RemoteActorError|
ContextCancelled
# TODO: make this a `outcome.Outcome`!
):
'''
Return the "final outcome" (state) of the far end peer task
non-blocking. If the remote task has not completed then this
field always resolves to the module defined `Unresolved`
handle.
The final "outcome" from an IPC context which can either be
some Value returned from the target `@context`-decorated
remote task-as-func, or an `Error` wrapping an exception
raised from an RPC task fault or cancellation.
------ - ------
TODO->( this is doc-driven-dev content not yet actual ;P )
Note that if the remote task has not terminated then this
field always resolves to the module defined `Unresolved` handle.
The final "outcome" from an IPC context which can be any of:
- some `outcome.Value` which boxes the returned output from the peer task's
`@context`-decorated remote task-as-func, or
- an `outcome.Error` wrapping an exception raised that same RPC task
after a fault or cancellation, or
- an unresolved `outcome.Outcome` when the peer task is still
executing and has not yet completed.
TODO: implement this using `outcome.Outcome` types?
'''
return (
@ -1634,7 +1583,7 @@ class Context:
- NEVER `return` early before delivering the msg!
bc if the error is a ctxc and there is a task waiting on
`.wait_for_result()` we need the msg to be
`.result()` we need the msg to be
`send_chan.send_nowait()`-ed over the `._rx_chan` so
that the error is relayed to that waiter task and thus
raised in user code!
@ -1879,7 +1828,7 @@ async def open_context_from_portal(
When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be
retrieved using the `Context.wait_for_result()` api.
retrieved using the `Context.result()` api.
The yielded ``Context`` instance further allows for opening
bidirectional streams, explicit cancellation and
@ -1944,7 +1893,7 @@ async def open_context_from_portal(
)
assert ctx._remote_func_type == 'context'
assert ctx._caller_info
prior_ctx_tok: Token = _ctxvar_Context.set(ctx)
_ctxvar_Context.set(ctx)
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
@ -2016,14 +1965,14 @@ async def open_context_from_portal(
yield ctx, first
# ??TODO??: do we still want to consider this or is
# the `else:` block handling via a `.wait_for_result()`
# the `else:` block handling via a `.result()`
# call below enough??
#
# -[ ] pretty sure `.wait_for_result()` internals do the
# -[ ] pretty sure `.result()` internals do the
# same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we
# wouldn't have that duplication either by somehow
# factoring the `.wait_for_result()` handler impl in a way
# factoring the `.result()` handler impl in a way
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
@ -2161,7 +2110,7 @@ async def open_context_from_portal(
# AND a group-exc is only raised if there was > 1
# tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls
# `.wait_for_result()` or `MsgStream.receive()`
# `.result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively
# called and the remote error raised causing all
# tasks to be cancelled.
@ -2231,7 +2180,7 @@ async def open_context_from_portal(
f'|_{ctx._task}\n'
)
# XXX NOTE XXX: the below call to
# `Context.wait_for_result()` will ALWAYS raise
# `Context.result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
@ -2241,10 +2190,10 @@ async def open_context_from_portal(
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
try:
result_or_err: Exception|Any = await ctx.wait_for_result()
result_or_err: Exception|Any = await ctx.result()
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.wait_for_result()` we still want to
# raised in `Context.result()` we still want to
# save that error on the ctx's state to
# determine things like `.cancelled_caught` for
# cases where there was remote cancellation but
@ -2395,9 +2344,6 @@ async def open_context_from_portal(
None,
)
# XXX revert to prior IPC-task-ctx scope
_ctxvar_Context.reset(prior_ctx_tok)
def mk_context(
chan: Channel,

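The parent-side retrieval pattern these renames affect, as a hedged sketch (`ping` and `portal` are illustrative):

```python
async with portal.open_context(ping) as (ctx, first):
    # blocks until the remote task's `return`:
    final = await ctx.wait_for_result()  # newer spelling
    # final = await ctx.result()  # older spelling, per this diff
```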
View File

@ -20,8 +20,7 @@ Sub-process entry points.
"""
from __future__ import annotations
from functools import partial
import os
import textwrap
# import textwrap
from typing import (
Any,
TYPE_CHECKING,
@ -59,7 +58,7 @@ def _mp_main(
) -> None:
'''
The routine called *after fork* which invokes a fresh `trio.run()`
The routine called *after fork* which invokes a fresh ``trio.run``
'''
actor._forkserver_info = forkserver_info
@ -97,107 +96,6 @@ def _mp_main(
log.info(f"Subactor {actor.uid} terminated")
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
# as we work out our multi-domain state-flow-syntax!
def nest_from_op(
input_op: str,
#
# ?TODO? an idea for a syntax to the state of concurrent systems
# as a "3-domain" (execution, scope, storage) model and using
# a minimal ascii/utf-8 operator-set.
#
# try not to take any of this seriously yet XD
#
# > is a "play operator" indicating (CPU bound)
# exec/work/ops required at the "lowest level computing"
#
# execution primitives (tasks, threads, actors..) denote their
# lifetime with '(' and ')' since parentheses normally are used
# in many langs to denote function calls.
#
# starting = (
# >( opening/starting; beginning of the thread-of-exec (toe?)
# (> opened/started, (finished spawning toe)
# |_<Task: blah blah..> repr of toe, in py these look like <objs>
#
# >) closing/exiting/stopping,
# )> closed/exited/stopped,
# |_<Task: blah blah..>
# [OR <), )< ?? ]
#
# ending = )
# >c) cancelling to close/exit
# c)> cancelled (caused close), OR?
# |_<Actor: ..>
# OR maybe "<c)" which better indicates the cancel being
# "delivered/returned" to LHS?
#
# >x) erroring to eventually exit
# x)> errored and terminated
# |_<Actor: ...>
#
# scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
# >{ opening
# {> opened
# }> closed
# >} closing
#
# storage: like queues, shm-buffers, files, etc..
# >[ opening
# [> opened
# |_<FileObj: ..>
#
# >] closing
# ]> closed
# IPC ops: channels, transports, msging
# => req msg
# <= resp msg
# <=> 2-way streaming (of msgs)
# <- recv 1 msg
# -> send 1 msg
#
# TODO: still not sure on R/L-HS approach..?
# =>( send-req to exec start (task, actor, thread..)
# (<= recv-req to ^
#
# (<= recv-req ^
# <=( recv-resp opened remote exec primitive
# <=) recv-resp closed
#
# )<=c req to stop due to cancel
# c=>) req to stop due to cancel
#
# =>{ recv-req to open
# <={ send-status that it closed
tree_str: str,
# NOTE: so move back-from-the-left of the `input_op` by
# this amount.
back_from_op: int = 0,
) -> str:
'''
Depth-increment the provided (presumably hierarchy/supervision)
"tree string" below the provided `input_op` execution
operator, injecting a `"\n|_{input_op}\n"` and indenting the
`tree_str` to nest content aligned with the op's last char.
'''
return (
f'{input_op}\n'
+
textwrap.indent(
tree_str,
prefix=(
len(input_op)
-
(back_from_op + 1)
) * ' ',
)
)
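Since the removed helper appears in full above, a quick usage sketch; expected output follows from the `textwrap.indent()` math (with the default `back_from_op=0` the prefix is `len('>(') - 1 == 1` space):

```python
print(nest_from_op(
    input_op='>(',
    tree_str='|_<Task: main>\n',
))
# >(
#  |_<Task: main>
```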
def _trio_main(
actor: Actor,
*,
@ -209,6 +107,7 @@ def _trio_main(
Entry point for a `trio_run_in_process` subactor.
'''
# __tracebackhide__: bool = True
_debug.hide_runtime_frames()
_state._current_actor = actor
@ -220,6 +119,7 @@ def _trio_main(
if actor.loglevel is not None:
get_console_log(actor.loglevel)
import os
actor_info: str = (
f'|_{actor}\n'
f' uid: {actor.uid}\n'
@ -228,23 +128,13 @@ def _trio_main(
f' loglevel: {actor.loglevel}\n'
)
log.info(
'Starting new `trio` subactor:\n'
'Started new trio subactor:\n'
+
nest_from_op(
input_op='>(', # see syntax ideas above
tree_str=actor_info,
back_from_op=1,
)
'>\n' # like a "started/play"-icon from super perspective
+
actor_info,
)
logmeth = log.info
exit_status: str = (
'Subactor exited\n'
+
nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective
tree_str=actor_info,
)
)
try:
if infect_asyncio:
actor._infected_aio = True
@ -253,28 +143,16 @@ def _trio_main(
trio.run(trio_main)
except KeyboardInterrupt:
logmeth = log.cancel
exit_status: str = (
'Actor received KBI (aka an OS-cancel)\n'
log.cancel(
'Actor received KBI\n'
+
nest_from_op(
input_op='c)>', # closed due to cancel (see above)
tree_str=actor_info,
)
actor_info
)
except BaseException as err:
logmeth = log.error
exit_status: str = (
'Main actor task crashed during exit?\n'
+
nest_from_op(
input_op='x)>', # closed by error
tree_str=actor_info,
)
)
# NOTE: since we raise, a tb will already be shown on the
# console, thus we do NOT use `.exception()` above.
raise err
finally:
logmeth(exit_status)
log.info(
'Subactor terminated\n'
+
'x\n' # like a "crossed-out/killed" from super perspective
+
actor_info
)

View File

@ -121,8 +121,7 @@ class Portal:
)
return self.chan
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
# TODO: factor this out into an `ActorNursery` wrapper
async def _submit_for_result(
self,
ns: str,
@ -142,22 +141,13 @@ class Portal:
portal=self,
)
# TODO: we should deprecate this API right? since if we remove
# `.run_in_actor()` (and instead move it to a `.highlevel`
# wrapper api (around a single `.open_context()` call) we don't
# really have any notion of a "main" remote task any more?
#
# @api_frame
async def wait_for_result(
self,
hide_tb: bool = True,
) -> Any:
async def result(self) -> Any:
'''
Return the final result delivered by a `Return`-msg from the
remote peer actor's "main" task's `return` statement.
Return the result(s) from the remote actor's "main" task.
'''
__tracebackhide__: bool = hide_tb
__tracebackhide__ = True
# Check for non-rpc errors slapped on the
# channel for which we always raise
exc = self.channel._exc
@ -192,23 +182,6 @@ class Portal:
return self._final_result_pld
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def result(
self,
*args,
**kwargs,
) -> Any|Exception:
typname: str = type(self).__name__
log.warning(
f'`{typname}.result()` is DEPRECATED!\n'
f'Use `{typname}.wait_for_result()` instead!\n'
)
return await self.wait_for_result(
*args,
**kwargs,
)
async def _cancel_streams(self):
# terminate all locally running async generator
# IPC calls
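For callers straddling the rename, a hedged compat sketch (the `portal_final` helper is illustrative, not part of this diff):

```python
async def portal_final(portal) -> object:
    # prefer the newer spelling, fall back to the deprecated alias
    meth = getattr(portal, 'wait_for_result', None) or portal.result
    return await meth()
```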
@ -267,7 +240,6 @@ class Portal:
f'{reminfo}'
)
# XXX the one spot we set it?
self.channel._cancel_called: bool = True
try:
# send cancel cmd - might not get response
@ -307,8 +279,6 @@ class Portal:
)
return False
# TODO: do we still need this for low level `Actor`-runtime
# method calls or can we also remove it?
async def run_from_ns(
self,
namespace_path: str,
@ -346,8 +316,6 @@ class Portal:
expect_msg=Return,
)
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def run(
self,
func: str,
@ -402,8 +370,6 @@ class Portal:
expect_msg=Return,
)
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
@acm
async def open_stream_from(
self,

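Typical usage of the API opened above, as a sketch; `streamer` is an illustrative async-gen target exposed by the remote actor:

```python
async with portal.open_stream_from(
    streamer,  # an async-gen func exposed by the target actor
    limit=10,  # illustrative kwarg forwarded to `streamer`
) as stream:
    async for value in stream:
        print(value)
```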
View File

@ -21,7 +21,6 @@ Root actor runtime ignition(s).
from contextlib import asynccontextmanager as acm
from functools import partial
import importlib
import inspect
import logging
import os
import signal
@ -116,16 +115,10 @@ async def open_root_actor(
if (
debug_mode
and maybe_enable_greenback
and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False,
)
and await _debug.maybe_init_greenback(
raise_not_found=False,
)
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin'
)
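A hedged sketch of the application-side toggle that reaches this branch:

```python
import tractor

async def main():
    async with tractor.open_root_actor(
        debug_mode=True,
    ):
        # with `greenback` installed, `breakpoint()` now routes to
        # `tractor.devx._debug._sync_pause_from_builtin`
        ...
```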
@ -271,9 +264,7 @@ async def open_root_actor(
except OSError:
# TODO: make this a "discovery" log level?
logger.info(
f'No actor registry found @ {addr}\n'
)
logger.warning(f'No actor registry found @ {addr}')
async with trio.open_nursery() as tn:
for addr in registry_addrs:
@ -287,6 +278,7 @@ async def open_root_actor(
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: '
@ -373,25 +365,23 @@ async def open_root_actor(
)
try:
yield actor
except (
Exception,
BaseExceptionGroup,
) as err:
# XXX NOTE XXX see equiv note inside
# `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-manually case we
# wait for the local actor-nursery to exit before
# exiting the transport channel handler.
import inspect
entered: bool = await _debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
)
if (
not entered
and
not is_multi_cancelled(err)
and not is_multi_cancelled(err)
):
logger.exception('Root actor crashed\n')
logger.exception('Root actor crashed:\n')
# ALWAYS re-raise any error bubbled up from the
# runtime!

View File

@ -1046,10 +1046,6 @@ class Actor:
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']:
from .devx import (
enable_stack_on_sig,
maybe_init_greenback,
)
try:
# TODO: maybe return some status msgs upward
# so that we can emit them in `con_status`
@ -1057,27 +1053,13 @@ class Actor:
log.devx(
'Enabling `stackscope` traces on SIGUSR1'
)
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
if rvs.get('use_greenback', False):
maybe_mod: ModuleType|None = await maybe_init_greenback()
if maybe_mod:
log.devx(
'Activated `greenback` '
'for `tractor.pause_from_sync()` support!'
)
else:
rvs['use_greenback'] = False
log.warning(
'`greenback` not installed for use in debug mode!\n'
'`tractor.pause_from_sync()` not available!'
)
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -1735,8 +1717,8 @@ async def async_main(
# Register with the arbiter if we're told its addr
log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
# ^-TODO-^ we should instead show the maddr here^^
f'Registering `{actor.name}` ->\n'
f'{pformat(accept_addrs)}'
)
# TODO: ideally we don't fan out to all registrars
@ -1794,90 +1776,57 @@ async def async_main(
# Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as internal_err:
except Exception as err:
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not is_registered:
err_report: str = (
'\n'
"Actor runtime (internally) failed BEFORE contacting the registry?\n"
f'registrars -> {actor.reg_addrs} ?!?!\n\n'
'^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n'
'\t>> CALMLY CANCEL YOUR CHILDREN AND CALL YOUR PARENTS <<\n\n'
'\tIf this is a sub-actor hopefully its parent will keep running '
'and cancel/reap this sub-process..\n'
'(well, presuming this error was propagated upward)\n\n'
'\t---------------------------------------------\n'
'\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT @ ' # oneline
'https://github.com/goodboy/tractor/issues\n'
'\t---------------------------------------------\n'
)
# TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger
# once we have that all working with std streams locking?
log.exception(err_report)
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {actor.reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
"\tIf this is a sub-actor hopefully its parent will keep running "
"correctly presuming this error was safely ignored..\n\n"
"\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
"https://github.com/goodboy/tractor/issues\n"
)
if actor._parent_chan:
await try_ship_error_to_remote(
actor._parent_chan,
internal_err,
err,
)
# always!
match internal_err:
match err:
case ContextCancelled():
log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'{internal_err}'
f'{err}'
)
case _:
log.exception(
'Main actor-runtime task errored\n'
f'<x)\n'
f' |_{actor}\n'
)
raise internal_err
log.exception("Actor errored:")
raise
finally:
teardown_report: str = (
'Main actor-runtime task completed\n'
log.runtime(
'Runtime nursery complete'
'-> Closing all actor lifetime contexts..'
)
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
actor.lifetime_stack.close()
# ?TODO? should this be in `._entry`/`._root` mods instead?
#
# teardown any actor-lifetime-bound contexts
ls: ExitStack = actor.lifetime_stack
# only report if there are any registered
cbs: list[Callable] = [
repr(tup[1].__wrapped__)
for tup in ls._exit_callbacks
]
if cbs:
cbs_str: str = '\n'.join(cbs)
teardown_report += (
'-> Closing actor-lifetime-bound callbacks\n\n'
f'}}>\n'
f' |_{ls}\n'
f' |_{cbs_str}\n'
)
# XXX NOTE XXX this will cause an error which
# prevents any `infected_aio` actor from continuing
# and any callbacks in the `ls` here WILL NOT be
# called!!
# await _debug.pause(shield=True)
ls.close()
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?
#
# TODO: we can't actually do this bc the debugger
# uses the _service_n to spawn the lock task, BUT,
# in theory if we had the root nursery surround this finally
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if actor.name == 'brokerd.ib':
# with CancelScope(shield=True):
# await _debug.breakpoint()
@ -1907,9 +1856,9 @@ async def async_main(
failed = True
if failed:
teardown_report += (
f'-> Failed to unregister {actor.name} from '
f'registrar @ {addr}\n'
log.warning(
f'Failed to unregister {actor.name} from '
f'registrar @ {addr}'
)
# Ensure all peers (actors connected to us as clients) are finished
@ -1917,17 +1866,13 @@ async def async_main(
if any(
chan.connected() for chan in chain(*actor._peers.values())
):
teardown_report += (
f'-> Waiting for remaining peers {actor._peers} to clear..\n'
)
log.runtime(teardown_report)
log.runtime(
f"Waiting for remaining peers {actor._peers} to clear")
with CancelScope(shield=True):
await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
teardown_report += ('-> All peer channels are complete\n')
teardown_report += ('Actor runtime exited')
log.info(teardown_report)
log.runtime("Runtime completed")
# TODO: rename to `Registry` and move to `._discovery`!

View File

@ -149,7 +149,7 @@ async def exhaust_portal(
# XXX: streams should never be reaped here since they should
# always be established and shut down using a context manager api
final: Any = await portal.wait_for_result()
final: Any = await portal.result()
except (
Exception,
@ -223,8 +223,8 @@ async def cancel_on_completion(
async def hard_kill(
proc: trio.Process,
terminate_after: int = 1.6,
# NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD
# terminate_after: int = 99999,

View File

@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None),
'_registry_addrs': [],
# for `tractor.pause_from_sync()` & `breakpoint()` support
# for `breakpoint()` support
'use_greenback': False,
}
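How this runtime-var is consumed elsewhere in this same diff (e.g. the guard in `pause_from_sync()`); a sketch:

```python
from tractor import _state

# mirror of the guard used in `pause_from_sync()` later in this diff:
if not _state._runtime_vars.get('use_greenback', False):
    raise RuntimeError(
        '`greenback` was never initialized in this actor!?'
    )
```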

View File

@ -80,7 +80,6 @@ class ActorNursery:
'''
def __init__(
self,
# TODO: maybe def these as fields of a struct looking type?
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
@ -89,10 +88,8 @@ class ActorNursery:
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
# TODO: rename to `._tn` for our conventional "task-nursery"
self._ria_nursery = ria_nursery
self._da_nursery = da_nursery
self._children: dict[
tuple[str, str],
tuple[
@ -101,13 +98,15 @@ class ActorNursery:
Portal | None,
]
] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False
self.errors = errors
self._scope_error: BaseException|None = None
self.exited = trio.Event()
self._scope_error: BaseException|None = None
# NOTE: when no explicit call is made to
# `.open_root_actor()` by application code,
@ -117,13 +116,6 @@ class ActorNursery:
# and syncing purposes to any actor opened nurseries.
self._implicit_runtime_started: bool = False
# TODO: remove the `.run_in_actor()` API and thus this 2ndary
# nursery when that API get's moved outside this primitive!
self._ria_nursery = ria_nursery
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
async def start_actor(
self,
name: str,
@ -134,14 +126,10 @@ class ActorNursery:
rpc_module_paths: list[str]|None = None,
enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor
nursery: trio.Nursery|None = None,
debug_mode: bool|None = None,
infect_asyncio: bool = False,
# TODO: ideally we can rm this once we no longer have
# a `._ria_nursery` since the dependent APIs have been
# removed!
nursery: trio.Nursery|None = None,
) -> Portal:
'''
Start a (daemon) actor: an process that has no designated
@ -212,7 +200,6 @@ class ActorNursery:
# |_ dynamic @context decoration on child side
# |_ implicit `Portal.open_context() as (ctx, first):`
# and `return first` on parent side.
# |_ mention how it's similar to `trio-parallel` API?
# -[ ] use @api_frame on the wrapper
async def run_in_actor(
self,
@ -282,14 +269,11 @@ class ActorNursery:
) -> None:
'''
Cancel this actor-nursery by instructing each subactor's
runtime to cancel and wait for all underlying sub-processes
to terminate.
Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
If `hard_kill` is set then kill the processes directly using
the spawning-backend's API/OS-machinery without any attempt
at (graceful) `trio`-style cancellation using our
`Actor.cancel()`.
If ``hard_kill`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
'''
__runtimeframe__: int = 1 # noqa
@ -645,12 +629,8 @@ async def open_nursery(
f'|_{an}\n'
)
# shutdown runtime if it was started
if implicit_runtime:
# shutdown runtime if it was started and report noisily
# that we did so.
msg += '=> Shutting down actor runtime <=\n'
log.info(msg)
else:
# keep noise low during std operation.
log.runtime(msg)
log.info(msg)
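A hedged parent-side sketch tying together the `ActorNursery` APIs touched above; the actor name and module list are illustrative:

```python
import tractor

async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'worker',  # illustrative name
            enable_modules=[__name__],
        )
        # ... interact via `portal` ...
        await an.cancel()  # graceful; `hard_kill=True` skips
                           # `trio`-style cancellation entirely
```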

View File

@ -29,7 +29,6 @@ from ._debug import (
shield_sigint_handler as shield_sigint_handler,
open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,
post_mortem as post_mortem,
mk_pdb as mk_pdb,
)

View File

@ -69,7 +69,6 @@ from trio import (
import tractor
from tractor.log import get_logger
from tractor._context import Context
from tractor import _state
from tractor._state import (
current_actor,
is_root_process,
@ -88,6 +87,9 @@ if TYPE_CHECKING:
from tractor._runtime import (
Actor,
)
from tractor.msg import (
_codec,
)
log = get_logger(__name__)
@ -1597,16 +1599,12 @@ async def _pause(
try:
task: Task = current_task()
except RuntimeError as rte:
__tracebackhide__: bool = False
log.exception(
'Failed to get current `trio`-task?'
)
# if actor.is_infected_aio():
# mk_pdb().set_trace()
# raise RuntimeError(
# '`tractor.pause[_from_sync]()` not yet supported '
# 'directly (infected) `asyncio` tasks!'
# ) from rte
log.exception('Failed to get current task?')
if actor.is_infected_aio():
raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported '
'for infected `asyncio` mode!'
) from rte
raise
@ -2165,22 +2163,22 @@ def maybe_import_greenback(
return False
async def maybe_init_greenback(**kwargs) -> None|ModuleType:
try:
if mod := maybe_import_greenback(**kwargs):
await mod.ensure_portal()
log.devx(
'`greenback` portal opened!\n'
'Sync debug support activated!\n'
)
return mod
except BaseException:
log.exception('Failed to init `greenback`..')
raise
async def maybe_init_greenback(
**kwargs,
) -> None|ModuleType:
if mod := maybe_import_greenback(**kwargs):
await mod.ensure_portal()
log.devx(
'`greenback` portal opened!\n'
'Sync debug support activated!\n'
)
return mod
return None
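A hedged sketch of how the helper is used at actor startup to light up sync-debug support:

```python
mod = await maybe_init_greenback(raise_not_found=False)
if mod is not None:
    # sync-debug support is live: `tractor.pause_from_sync()`
    # can now be called from synchronous code in this actor
    ...
```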
async def _pause_from_bg_root_thread(
behalf_of_thread: Thread,
repl: PdbREPL,
@ -2326,12 +2324,6 @@ def pause_from_sync(
# TODO: once supported, remove this AND the one
# inside `._pause()`!
# outstanding impl fixes:
# -[ ] need to make `.shield_sigint()` below work here!
# -[ ] how to handle `asyncio`'s new SIGINT-handler
# injection?
# -[ ] should `breakpoint()` work and what does it normally
# do in `asyncio` ctxs?
if actor.is_infected_aio():
raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported '
@ -2407,37 +2399,18 @@ def pause_from_sync(
else: # we are presumably the `trio.run()` + main thread
# raises on not-found by default
greenback: ModuleType = maybe_import_greenback()
# TODO: how to ensure this is either dynamically (if
# needed) called here (in some bg tn??) or that the
# subactor always already called it?
# greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n'
repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try:
out = greenback.await_(
_pause(
debug_func=None,
repl=repl,
hide_tb=hide_tb,
called_from_sync=True,
**_pause_kwargs,
)
out = greenback.await_(
_pause(
debug_func=None,
repl=repl,
hide_tb=hide_tb,
called_from_sync=True,
**_pause_kwargs,
)
except RuntimeError as rte:
if not _state._runtime_vars.get(
'use_greenback',
False,
):
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise
)
if out:
bg_task, repl = out
assert repl is repl
@ -2828,10 +2801,10 @@ def open_crash_handler(
`trio.run()`.
'''
err: BaseException
try:
yield
except tuple(catch) as err:
if type(err) not in ignore:
pdbp.xpm()
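Typical usage of the handler shown above, as a sketch; the `catch`/`ignore` arguments mirror the `except tuple(catch)` / `not in ignore` logic:

```python
with open_crash_handler(
    catch={Exception},
    ignore={KeyboardInterrupt},
):
    main()  # any matched, non-ignored error drops into `pdbp.xpm()`
```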

View File

@ -558,8 +558,6 @@ def run_as_asyncio_guest(
# normally `Actor._async_main()` as is passed by some bootstrap
# entrypoint like `._entry._trio_main()`.
_sigint_loop_pump_delay: float = 0,
) -> None:
# ^-TODO-^ technically whatever `trio_main` returns.. we should
# try to use func-typevar-params at least by 3.13!
@ -600,7 +598,7 @@ def run_as_asyncio_guest(
'''
loop = asyncio.get_running_loop()
trio_done_fute = asyncio.Future()
trio_done_fut = asyncio.Future()
startup_msg: str = (
'Starting `asyncio` guest-loop-run\n'
'-> got running loop\n'
@ -635,13 +633,13 @@ def run_as_asyncio_guest(
f'{error}\n\n'
f'{tb_str}\n'
)
trio_done_fute.set_exception(error)
trio_done_fut.set_exception(error)
# raise inline
main_outcome.unwrap()
else:
trio_done_fute.set_result(main_outcome)
trio_done_fut.set_result(main_outcome)
startup_msg += (
f'-> created {trio_done_callback!r}\n'
@ -662,7 +660,7 @@ def run_as_asyncio_guest(
)
fute_err: BaseException|None = None
try:
out: Outcome = await asyncio.shield(trio_done_fute)
out: Outcome = await asyncio.shield(trio_done_fut)
# NOTE will raise (via `Error.unwrap()`) from any
# exception packed into the guest-run's `main_outcome`.
@ -699,75 +697,83 @@ def run_as_asyncio_guest(
f' |_{actor}.cancel_soon()\n'
)
# XXX WARNING XXX the next LOCs are super important, since
# without them, we can get guest-run abandonment cases
# where `asyncio` will not schedule or wait on the `trio`
# guest-run task before final shutdown! This is
# particularly true if the `trio` side has tasks doing
# shielded work when a SIGINT condition occurs.
# TODO: reduce this comment bloc since abandon issues are
# now solved?
#
# We now have the
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
# suite to ensure we do not suffer these issues
# (hopefully) ever again.
# XXX NOTE XXX the next LOC is super important!!!
# => without it, we can get a guest-run abandonment case
# where asyncio will not trigger `trio` in a final event
# loop cycle!
#
# The original abandonment issue surfaced as 2 different
# race-condition dependent types scenarios all to do with
# `asyncio` handling SIGINT from the system:
# our test,
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
# demonstrates how, when we raise a SIGINT-signal in an infected
# child, we get a variable race-condition outcome where
# either of the following can indeterminately happen,
#
# - "silent-abandon" (WORST CASE):
# `asyncio` abandons the `trio` guest-run task silently
# and no `trio`-guest-run or `tractor`-actor-runtime
# teardown happens whatsoever..
# - "silent-abandon": `asyncio` abandons the `trio`
# guest-run task silently and no `trio`-guest-run or
# `tractor`-actor-runtime teardown happens whatsoever..
# this is the WORST (race) case outcome.
#
# - "loud-abandon" (BEST-ish CASE):
# the guest run gets abandoned "loudly" with `trio`
# reporting a console traceback and further tbs of all
# the (failed) GC-triggered shutdown routines which
# thankfully does get dumped to console..
# - OR, "loud-abandon": the guest run get's abaondoned "loudly" with
# `trio` reporting a console traceback and further tbs of all
# the failed shutdown routines also show on console..
#
# The abandonment is most easily reproduced if the `trio`
# side has tasks doing shielded work where those tasks
# ignore the normal `Cancelled` condition and continue to
# run, but obviously `asyncio` isn't aware of this and at
# some point bails on the guest-run unless we take manual
# intervention..
# our test can thus fail (and has been parametrized for)
# the 2 cases:
#
# To repeat, *WITHOUT THIS* stuff below the guest-run can
# get race-conditionally abandoned!!
# - when the parent raises a KBI just after
# signalling the child,
# |_silent-abandon => the `Actor.lifetime_stack` will
# never be closed thus leaking a resource!
# -> FAIL!
# |_loud-abandon => despite the abandonment at least the
# stack will be closed out..
# -> PASS
#
# XXX SOLUTION XXX
# ------ - ------
# XXX FIRST PART:
# ------ - ------
# the obvious fix to the "silent-abandon" case is to
# explicitly cancel the actor runtime such that no
# runtime tasks are even left unaware that the guest-run
# should be terminated due to OS cancellation.
# - when the parent instead simply waits on `ctx.wait_for_result()`
# (i.e. DOES not raise a KBI itself),
# |_silent-abandon => test will just hang and thus the ctx
# and actor will never be closed/cancelled/shutdown
# resulting in leaking a (file) resource since the
# `trio`/`tractor` runtime never relays a ctxc back to
# the parent; the test's timeout will trigger..
# -> FAIL!
# |_loud-abandon => this case seems to never happen??
#
# XXX FIRST PART XXX, SO, this is a fix to the
# "silent-abandon" case, NOT the `trio`-guest-run
# abandonment issue in general, for which the NEXT LOC
# is apparently a working fix!
actor.cancel_soon()
# ------ - ------
# XXX SECOND PART:
# ------ - ------
# Pump the `asyncio` event-loop to allow
# XXX NOTE XXX pump the `asyncio` event-loop to allow
# `trio`-side to `trio`-guest-run to complete and
# teardown !!
#
# oh `asyncio`, how i don't miss you at all XD
while not trio_done_fute.done():
# *WITHOUT THIS* the guest-run can get race-conditionally abandoned!!
# XD
#
await asyncio.sleep(.1) # `delay` can't be 0 either XD
while not trio_done_fut.done():
log.runtime(
'Waiting on main guest-run `asyncio` task to complete..\n'
f'|_trio_done_fut: {trio_done_fute}\n'
f'|_trio_done_fut: {trio_done_fut}\n'
)
await asyncio.sleep(_sigint_loop_pump_delay)
await asyncio.sleep(.1)
# XXX is there any alt API/approach like the internal
# call below but that doesn't block indefinitely..?
# XXX: don't actually need the shield.. seems to
# make no difference (??) and we know it spawns an
# internal task..
# await asyncio.shield(asyncio.sleep(.1))
# XXX alt approach but can block indefinitely..
# so don't use?
# loop._run_once()
try:
return trio_done_fute.result()
return trio_done_fut.result()
except asyncio.exceptions.InvalidStateError as state_err:
# XXX be super duper noisy about abandonment issues!
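For reference, a self-contained sketch of the guest-run + done-future pattern discussed above, using only public `trio`/`asyncio` APIs; the pump interval mirrors the `.1`s delay used here:

```python
import asyncio
import trio

def run_guest(trio_main) -> object:
    '''
    Minimal guest-mode runner: start `trio_main` as a guest of the
    `asyncio` loop and pump until its outcome lands.

    '''
    async def aio_main():
        loop = asyncio.get_running_loop()
        trio_done_fut: asyncio.Future = asyncio.Future()

        def trio_done_callback(main_outcome) -> None:
            # receives an `outcome.Outcome` from the guest-run
            trio_done_fut.set_result(main_outcome)

        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=trio_done_callback,
        )
        # pump so the guest-run is never abandoned on teardown
        while not trio_done_fut.done():
            await asyncio.sleep(.1)  # `delay` can't be 0 either XD

        return trio_done_fut.result().unwrap()

    return asyncio.run(aio_main())

# usage:
# async def trio_main() -> str:
#     return 'done'
#
# assert run_guest(trio_main) == 'done'
```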