Compare commits
7 Commits
9133f42b07 ... 4fbd469c33

Author | SHA1 | Date
---|---|---
Tyler Goodlet | 4fbd469c33 |
Tyler Goodlet | cb90f3e6ba |
Tyler Goodlet | 5e009a8229 |
Tyler Goodlet | b72a025d0f |
Tyler Goodlet | 5739e79645 |
Tyler Goodlet | 2ac999cc3c |
Tyler Goodlet | 9f9b0b17dc |
@@ -285,14 +285,14 @@ def test_basic_payload_spec(

     if invalid_started:
         msg_type_str: str = 'Started'
-        bad_value_str: str = '10'
+        bad_value: int = 10
     elif invalid_return:
         msg_type_str: str = 'Return'
-        bad_value_str: str = "'yo'"
+        bad_value: str = 'yo'
     else:
         # XXX but should never be used below then..
         msg_type_str: str = ''
-        bad_value_str: str = ''
+        bad_value: str = ''

     maybe_mte: MsgTypeError|None = None
     should_raise: Exception|None = (
@@ -307,8 +307,10 @@ def test_basic_payload_spec(
         raises=should_raise,
         ensure_in_message=[
             f"invalid `{msg_type_str}` msg payload",
-            f"value: `{bad_value_str}` does not "
-            f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
+            f'{bad_value}',
+            f'has type {type(bad_value)!r}',
+            'not match type-spec',
+            f'`{msg_type_str}.pld: PldMsg|NoneType`',
         ],
         # only for debug
         # post_mortem=True,
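This hunk swaps one brittle full-sentence assertion for several independent message fragments, so the test survives cosmetic rewording of the runtime's error text. A minimal standalone sketch of the same technique; `check_err_fragments` is a hypothetical helper, not part of the diff:

```python
# Fragment-based error-message matching: each substring must appear
# somewhere in the rendered error, in any order.
def check_err_fragments(exc: BaseException, fragments: list[str]) -> None:
    text = str(exc)
    missing = [frag for frag in fragments if frag not in text]
    assert not missing, f'missing fragments: {missing!r}'

bad_value: int = 10
try:
    raise ValueError(
        f"invalid `Started` msg payload: {bad_value} "
        f"has type {type(bad_value)!r} which does not match type-spec"
    )
except ValueError as err:
    check_err_fragments(
        err,
        [f'{bad_value}', f'has type {type(bad_value)!r}', 'not match type-spec'],
    )
```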
@@ -38,6 +38,7 @@ from collections import deque
 from contextlib import (
     asynccontextmanager as acm,
 )
+from contextvars import Token
 from dataclasses import (
     dataclass,
     field,
@@ -121,10 +122,19 @@ class Unresolved:
 @dataclass
 class Context:
     '''
-    An inter-actor, SC transitive, `Task` communication context.
+    An inter-actor, SC transitive, `trio.Task` (pair)
+    communication context.

-    NB: This class should **never be instatiated directly**, it is allocated
-    by the runtime in 2 ways:
+    (We've also considered other names and ideas:
+     - "communicating tasks scope": cts
+     - "distributed task scope": dts
+     - "communicating tasks context": ctc
+
+     **Got a better idea for naming? Make an issue dawg!**
+    )
+
+    NB: This class should **never be instatiated directly**, it is
+    allocated by the runtime in 2 ways:
     - by entering `Portal.open_context()` which is the primary
       public API for any "parent" task or,
     - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg

@@ -210,6 +220,16 @@ class Context:
     # more the the `Context` is needed?
     _portal: Portal | None = None

+    @property
+    def portal(self) -> Portal|None:
+        '''
+        Return any wrapping memory-`Portal` if this is
+        a 'parent'-side task which called `Portal.open_context()`,
+        otherwise `None`.
+
+        '''
+        return self._portal
+
     # NOTE: each side of the context has its own cancel scope
     # which is exactly the primitive that allows for
     # cross-actor-task-supervision and thus SC.

@@ -299,6 +319,8 @@ class Context:
     # boxed exception. NOW, it's used for spawning overrun queuing
     # tasks when `.allow_overruns == True` !!!
     _scope_nursery: trio.Nursery|None = None
+    # ^-TODO-^ change name?
+    # -> `._scope_tn` "scope task nursery"

     # streaming overrun state tracking
     _in_overrun: bool = False

@@ -408,10 +430,23 @@ class Context:
         '''
         return self._cancel_called

+    @cancel_called.setter
+    def cancel_called(self, val: bool) -> None:
+        '''
+        Set the self-cancelled request `bool` value.
+
+        '''
+        # to debug who frickin sets it..
+        # if val:
+        #     from .devx import pause_from_sync
+        #     pause_from_sync()
+
+        self._cancel_called = val
+
     @property
     def canceller(self) -> tuple[str, str]|None:
         '''
-        ``Actor.uid: tuple[str, str]`` of the (remote)
+        `Actor.uid: tuple[str, str]` of the (remote)
         actor-process who's task was cancelled thus causing this
         (side of the) context to also be cancelled.
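Converting the bare `._cancel_called` attribute write into a `@property`/`@x.setter` pair gives a single choke-point for every mutation, which is why the hunk leaves a commented `pause_from_sync()` inside the setter. The pattern in isolation (names illustrative, not tractor's):

```python
class Flag:
    _cancel_called: bool = False

    @property
    def cancel_called(self) -> bool:
        # read-side stays a plain attribute access for callers
        return self._cancel_called

    @cancel_called.setter
    def cancel_called(self, val: bool) -> None:
        # single choke-point: drop a breakpoint or log here to
        # find out "who frickin sets it"
        self._cancel_called = val

f = Flag()
f.cancel_called = True      # routes through the setter
assert f.cancel_called
```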
@@ -515,7 +550,7 @@ class Context:

         # the local scope was never cancelled
         # and instead likely we received a remote side
-        # # cancellation that was raised inside `.result()`
+        # # cancellation that was raised inside `.wait_for_result()`
         # or (
         #     (se := self._local_error)
         #     and se is re

@@ -585,6 +620,10 @@ class Context:
         self,
         error: BaseException,

+        # TODO: manual toggle for cases where we wouldn't normally
+        # mark ourselves cancelled but want to?
+        # set_cancel_called: bool = False,
+
     ) -> None:
         '''
         (Maybe) cancel this local scope due to a received remote

@@ -603,7 +642,7 @@ class Context:
         - `Portal.open_context()`
         - `Portal.result()`
         - `Context.open_stream()`
-        - `Context.result()`
+        - `Context.wait_for_result()`

         when called/closed by actor local task(s).

@@ -729,7 +768,7 @@ class Context:

             # Cancel the local `._scope`, catch that
             # `._scope.cancelled_caught` and re-raise any remote error
-            # once exiting (or manually calling `.result()`) the
+            # once exiting (or manually calling `.wait_for_result()`) the
             # `.open_context()` block.
             cs: trio.CancelScope = self._scope
             if (

@@ -764,8 +803,9 @@ class Context:
                 # `trio.Cancelled` subtype here ;)
                 # https://github.com/goodboy/tractor/issues/368
                 message: str = 'Cancelling `Context._scope` !\n\n'
+                # from .devx import pause_from_sync
+                # pause_from_sync()
                 self._scope.cancel()
-
         else:
             message: str = 'NOT cancelling `Context._scope` !\n\n'
             # from .devx import mk_pdb
@@ -845,15 +885,15 @@ class Context:

     @property
     def repr_api(self) -> str:
+        return 'Portal.open_context()'
+
+        # TODO: use `.dev._frame_stack` scanning to find caller!
         # ci: CallerInfo|None = self._caller_info
         # if ci:
         #     return (
         #         f'{ci.api_nsp}()\n'
         #     )

-        # TODO: use `.dev._frame_stack` scanning to find caller!
-        return 'Portal.open_context()'
-
     async def cancel(
         self,
         timeout: float = 0.616,

@@ -889,7 +929,8 @@ class Context:

         '''
         side: str = self.side
-        self._cancel_called: bool = True
+        # XXX for debug via the `@.setter`
+        self.cancel_called = True

         header: str = (
             f'Cancelling ctx with peer from {side.upper()} side\n\n'

@@ -912,7 +953,7 @@ class Context:
         # `._scope.cancel()` since we expect the eventual
         # `ContextCancelled` from the other side to trigger this
         # when the runtime finally receives it during teardown
-        # (normally in `.result()` called from
+        # (normally in `.wait_for_result()` called from
         # `Portal.open_context().__aexit__()`)
         if side == 'parent':
             if not self._portal:
@@ -1025,10 +1066,10 @@ class Context:

         '''
         __tracebackhide__: bool = hide_tb
-        our_uid: tuple = self.chan.uid
+        peer_uid: tuple = self.chan.uid

         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
-        # for "graceful cancellation" case:
+        # for "graceful cancellation" case(s):
         #
         # Whenever a "side" of a context (a `Task` running in
         # an actor) **is** the side which requested ctx

@@ -1045,9 +1086,11 @@ class Context:
         # set to the `Actor.uid` of THIS task (i.e. the
         # cancellation requesting task's actor is the actor
         # checking whether it should absorb the ctxc).
+        self_ctxc: bool = self._is_self_cancelled(remote_error)
         if (
+            self_ctxc
+            and
             not raise_ctxc_from_self_call
-            and self._is_self_cancelled(remote_error)

             # TODO: ?potentially it is useful to emit certain
             # warning/cancel logs for the cases where the

@@ -1077,8 +1120,8 @@ class Context:
             and isinstance(remote_error, RemoteActorError)
             and remote_error.boxed_type is StreamOverrun

-            # and tuple(remote_error.msgdata['sender']) == our_uid
-            and tuple(remote_error.sender) == our_uid
+            # and tuple(remote_error.msgdata['sender']) == peer_uid
+            and tuple(remote_error.sender) == peer_uid
         ):
             # NOTE: we set the local scope error to any "self
             # cancellation" error-response thus "absorbing"

@@ -1140,9 +1183,9 @@ class Context:
         of the remote cancellation.

         '''
-        __tracebackhide__ = hide_tb
+        __tracebackhide__: bool = hide_tb
         assert self._portal, (
-            "Context.result() can not be called from callee side!"
+            '`Context.wait_for_result()` can not be called from callee side!'
         )
         if self._final_result_is_set():
             return self._result
@@ -1197,10 +1240,11 @@ class Context:
                 # raising something we know might happen
                 # during cancellation ;)
                 (not self._cancel_called)
-            )
+            ),
+            hide_tb=hide_tb,
         )
         # TODO: eventually make `.outcome: Outcome` and thus return
-        # `self.outcome.unwrap()` here!
+        # `self.outcome.unwrap()` here?
         return self.outcome

     # TODO: switch this with above!
@@ -1284,17 +1328,24 @@ class Context:
         Any|
         RemoteActorError|
         ContextCancelled
+        # TODO: make this a `outcome.Outcome`!
     ):
         '''
-        The final "outcome" from an IPC context which can either be
-        some Value returned from the target `@context`-decorated
-        remote task-as-func, or an `Error` wrapping an exception
-        raised from an RPC task fault or cancellation.
+        Return the "final outcome" (state) of the far end peer task
+        non-blocking. If the remote task has not completed then this
+        field always resolves to the module defined `Unresolved`
+        handle.

-        Note that if the remote task has not terminated then this
-        field always resolves to the module defined `Unresolved` handle.
+        ------ - ------
+        TODO->( this is doc-driven-dev content not yet actual ;P )

-        TODO: implement this using `outcome.Outcome` types?
+        The final "outcome" from an IPC context which can be any of:
+        - some `outcome.Value` which boxes the returned output from the peer task's
+          `@context`-decorated remote task-as-func, or
+        - an `outcome.Error` wrapping an exception raised that same RPC task
+          after a fault or cancellation, or
+        - an unresolved `outcome.Outcome` when the peer task is still
+          executing and has not yet completed.

         '''
         return (
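For reference, the `outcome` package this TODO points at (the same one `trio` uses internally) boxes a result as `Value` or an exception as `Error`, both exposing `.unwrap()`. A minimal sketch of the mapping the new docstring describes:

```python
# Minimal sketch of the `outcome` types referenced above; not
# tractor code, just the library's public API.
import outcome

# a successful peer-task return maps to a `Value`
ok = outcome.Value('final-result')
assert ok.unwrap() == 'final-result'

# a fault/cancellation maps to an `Error` which re-raises on unwrap
err = outcome.Error(RuntimeError('peer task crashed'))
try:
    err.unwrap()
except RuntimeError as exc:
    print(f'boxed: {exc}')

# `outcome.capture()` builds either case from a callable
res = outcome.capture(lambda: 1 + 1)
assert res.unwrap() == 2
```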
@@ -1583,7 +1634,7 @@ class Context:

         - NEVER `return` early before delivering the msg!
           bc if the error is a ctxc and there is a task waiting on
-          `.result()` we need the msg to be
+          `.wait_for_result()` we need the msg to be
           `send_chan.send_nowait()`-ed over the `._rx_chan` so
           that the error is relayed to that waiter task and thus
           raised in user code!

@@ -1828,7 +1879,7 @@ async def open_context_from_portal(
     When the "callee" (side that is "called"/started by a call
     to *this* method) returns, the caller side (this) unblocks
     and any final value delivered from the other end can be
-    retrieved using the `Contex.result()` api.
+    retrieved using the `Contex.wait_for_result()` api.

     The yielded ``Context`` instance further allows for opening
     bidirectional streams, explicit cancellation and

@@ -1893,7 +1944,7 @@ async def open_context_from_portal(
     )
     assert ctx._remote_func_type == 'context'
     assert ctx._caller_info
-    _ctxvar_Context.set(ctx)
+    prior_ctx_tok: Token = _ctxvar_Context.set(ctx)

     # placeholder for any exception raised in the runtime
     # or by user tasks which cause this context's closure.
@@ -1965,14 +2016,14 @@ async def open_context_from_portal(
             yield ctx, first

             # ??TODO??: do we still want to consider this or is
-            # the `else:` block handling via a `.result()`
+            # the `else:` block handling via a `.wait_for_result()`
             # call below enough??
             #
-            # -[ ] pretty sure `.result()` internals do the
+            # -[ ] pretty sure `.wait_for_result()` internals do the
             # same as our ctxc handler below so it ended up
             # being same (repeated?) behaviour, but ideally we
             # wouldn't have that duplication either by somehow
-            # factoring the `.result()` handler impl in a way
+            # factoring the `.wait_for_result()` handler impl in a way
             # that we can re-use it around the `yield` ^ here
             # or vice versa?
             #

@@ -2110,7 +2161,7 @@ async def open_context_from_portal(
                 # AND a group-exc is only raised if there was > 1
                 # tasks started *here* in the "caller" / opener
                 # block. If any one of those tasks calls
-                # `.result()` or `MsgStream.receive()`
+                # `.wait_for_result()` or `MsgStream.receive()`
                 # `._maybe_raise_remote_err()` will be transitively
                 # called and the remote error raised causing all
                 # tasks to be cancelled.

@@ -2180,7 +2231,7 @@ async def open_context_from_portal(
                 f'|_{ctx._task}\n'
             )
             # XXX NOTE XXX: the below call to
-            # `Context.result()` will ALWAYS raise
+            # `Context.wait_for_result()` will ALWAYS raise
             # a `ContextCancelled` (via an embedded call to
             # `Context._maybe_raise_remote_err()`) IFF
             # a `Context._remote_error` was set by the runtime

@@ -2190,10 +2241,10 @@ async def open_context_from_portal(
             # ALWAYS SET any time "callee" side fails and causes "caller
             # side" cancellation via a `ContextCancelled` here.
             try:
-                result_or_err: Exception|Any = await ctx.result()
+                result_or_err: Exception|Any = await ctx.wait_for_result()
             except BaseException as berr:
                 # on normal teardown, if we get some error
-                # raised in `Context.result()` we still want to
+                # raised in `Context.wait_for_result()` we still want to
                 # save that error on the ctx's state to
                 # determine things like `.cancelled_caught` for
                 # cases where there was remote cancellation but
@@ -2344,6 +2395,9 @@ async def open_context_from_portal(
             None,
         )

+        # XXX revert to prior IPC-task-ctx scope
+        _ctxvar_Context.reset(prior_ctx_tok)
+

 def mk_context(
     chan: Channel,
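The `Token` captured from `_ctxvar_Context.set(ctx)` and later handed to `.reset()` is the stdlib `contextvars` pattern for scoped overrides: the reset restores whatever value was active before the `open_context()` block, rather than blindly clearing the var. A standalone sketch (variable names illustrative):

```python
# The set()/reset() Token pattern from the stdlib `contextvars`
# module that the two hunks above wire together.
from contextvars import ContextVar, Token

_current_ctx: ContextVar[str] = ContextVar('current_ctx', default='<root>')

def open_scope(name: str) -> None:
    tok: Token = _current_ctx.set(name)   # install the new scope
    try:
        print('inside:', _current_ctx.get())
    finally:
        _current_ctx.reset(tok)           # restore the *prior* value

open_scope('ctx-A')
assert _current_ctx.get() == '<root>'     # prior value restored
```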
@@ -20,7 +20,8 @@ Sub-process entry points.
 """
 from __future__ import annotations
 from functools import partial
-# import textwrap
+import os
+import textwrap
 from typing import (
     Any,
     TYPE_CHECKING,

@@ -58,7 +59,7 @@ def _mp_main(

 ) -> None:
     '''
-    The routine called *after fork* which invokes a fresh ``trio.run``
+    The routine called *after fork* which invokes a fresh `trio.run()`

     '''
     actor._forkserver_info = forkserver_info
@@ -96,6 +97,107 @@ def _mp_main(
     log.info(f"Subactor {actor.uid} terminated")


+# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
+# as we work out our multi-domain state-flow-syntax!
+def nest_from_op(
+    input_op: str,
+    #
+    # ?TODO? an idea for a syntax to the state of concurrent systems
+    # as a "3-domain" (execution, scope, storage) model and using
+    # a minimal ascii/utf-8 operator-set.
+    #
+    # try not to take any of this seriously yet XD
+    #
+    # > is a "play operator" indicating (CPU bound)
+    # exec/work/ops required at the "lowest level computing"
+    #
+    # execution primititves (tasks, threads, actors..) denote their
+    # lifetime with '(' and ')' since parentheses normally are used
+    # in many langs to denote function calls.
+    #
+    # starting = (
+    # >( opening/starting; beginning of the thread-of-exec (toe?)
+    # (> opened/started, (finished spawning toe)
+    # |_<Task: blah blah..> repr of toe, in py these look like <objs>
+    #
+    # >) closing/exiting/stopping,
+    # )> closed/exited/stopped,
+    # |_<Task: blah blah..>
+    #   [OR <), )< ?? ]
+    #
+    # ending = )
+    # >c) cancelling to close/exit
+    # c)> cancelled (caused close), OR?
+    #  |_<Actor: ..>
+    #   OR maybe "<c)" which better indicates the cancel being
+    #   "delivered/returned" / returned" to LHS?
+    #
+    # >x) erroring to eventuall exit
+    # x)> errored and terminated
+    #  |_<Actor: ...>
+    #
+    # scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
+    # >{  opening
+    # {>  opened
+    # }>  closed
+    # >}  closing
+    #
+    # storage: like queues, shm-buffers, files, etc..
+    # >[  opening
+    # [>  opened
+    #  |_<FileObj: ..>
+    #
+    # >]  closing
+    # ]>  closed
+
+    # IPC ops: channels, transports, msging
+    # =>  req msg
+    # <=  resp msg
+    # <=> 2-way streaming (of msgs)
+    # <-  recv 1 msg
+    # ->  send 1 msg
+    #
+    # TODO: still not sure on R/L-HS approach..?
+    # =>( send-req to exec start (task, actor, thread..)
+    # (<= recv-req to ^
+    #
+    # (<= recv-req ^
+    # <=( recv-resp opened remote exec primitive
+    # <=) recv-resp closed
+    #
+    # )<=c req to stop due to cancel
+    # c=>) req to stop due to cancel
+    #
+    # =>{ recv-req to open
+    # <={ send-status that it closed
+
+    tree_str: str,
+
+    # NOTE: so move back-from-the-left of the `input_op` by
+    # this amount.
+    back_from_op: int = 0,
+) -> str:
+    '''
+    Depth-increment the input (presumably hierarchy/supervision)
+    input "tree string" below the provided `input_op` execution
+    operator, so injecting a `"\n|_{input_op}\n"`and indenting the
+    `tree_str` to nest content aligned with the ops last char.
+
+    '''
+    return (
+        f'{input_op}\n'
+        +
+        textwrap.indent(
+            tree_str,
+            prefix=(
+                len(input_op)
+                -
+                (back_from_op + 1)
+            ) * ' ',
+        )
+    )
+
+
 def _trio_main(
     actor: Actor,
     *,
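Concretely, `nest_from_op()` prepends the operator on its own line and indents the tree by `len(input_op) - (back_from_op + 1)` columns so the nested repr lines up under the op's last char. A quick standalone check, with the helper body copied from the hunk above and an illustrative actor repr:

```python
import textwrap

def nest_from_op(input_op: str, tree_str: str, back_from_op: int = 0) -> str:
    # same logic as the new `nest_from_op()` added above
    return (
        f'{input_op}\n'
        + textwrap.indent(
            tree_str,
            prefix=(len(input_op) - (back_from_op + 1)) * ' ',
        )
    )

actor_info = '|_Actor<spam>\n uid: (spam, 1234)\n'
print(nest_from_op('>(', tree_str=actor_info, back_from_op=1))
# >(
# |_Actor<spam>
#  uid: (spam, 1234)
```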
@@ -107,7 +209,6 @@ def _trio_main(
     Entry point for a `trio_run_in_process` subactor.

     '''
-    # __tracebackhide__: bool = True
     _debug.hide_runtime_frames()

     _state._current_actor = actor

@@ -119,7 +220,6 @@ def _trio_main(

     if actor.loglevel is not None:
         get_console_log(actor.loglevel)
-        import os
     actor_info: str = (
         f'|_{actor}\n'
         f' uid: {actor.uid}\n'
@@ -128,13 +228,23 @@ def _trio_main(
         f' loglevel: {actor.loglevel}\n'
     )
     log.info(
-        'Started new trio subactor:\n'
+        'Starting new `trio` subactor:\n'
         +
-        '>\n'  # like a "started/play"-icon from super perspective
-        +
-        actor_info,
+        nest_from_op(
+            input_op='>(',  # see syntax ideas above
+            tree_str=actor_info,
+            back_from_op=1,
+        )
     )
+    logmeth = log.info
+    exit_status: str = (
+        'Subactor exited\n'
+        +
+        nest_from_op(
+            input_op=')>',  # like a "closed-to-play"-icon from super perspective
+            tree_str=actor_info,
+        )
+    )
     try:
         if infect_asyncio:
             actor._infected_aio = True
@@ -143,16 +253,28 @@ def _trio_main(
         trio.run(trio_main)

     except KeyboardInterrupt:
-        log.cancel(
-            'Actor received KBI\n'
+        logmeth = log.cancel
+        exit_status: str = (
+            'Actor received KBI (aka an OS-cancel)\n'
             +
-            actor_info
+            nest_from_op(
+                input_op='c)>',  # closed due to cancel (see above)
+                tree_str=actor_info,
+            )
         )
+    except BaseException as err:
+        logmeth = log.error
+        exit_status: str = (
+            'Main actor task crashed during exit?\n'
+            +
+            nest_from_op(
+                input_op='x)>',  # closed by error
+                tree_str=actor_info,
+            )
+        )
+        # NOTE since we raise a tb will already be shown on the
+        # console, thus we do NOT use `.exception()` above.
+        raise err
+
     finally:
-        log.info(
-            'Subactor terminated\n'
-            +
-            'x\n'  # like a "crossed-out/killed" from super perspective
-            +
-            actor_info
-        )
+        logmeth(exit_status)
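The rewrite above replaces per-branch logging with a single "status accumulator": a default `exit_status`/`logmeth` pair is set up front, overridden in each `except` clause, and emitted exactly once in `finally`. A minimal sketch of that shape, with hypothetical statuses:

```python
# Single-exit status reporting shape used by `_trio_main()` above.
import logging

log = logging.getLogger('sketch')

def run_with_exit_report(run) -> None:
    logmeth = log.info
    exit_status: str = 'exited cleanly )>'
    try:
        run()
    except KeyboardInterrupt:
        logmeth = log.warning
        exit_status = 'cancelled by KBI c)>'
    except BaseException:
        logmeth = log.error
        exit_status = 'crashed x)>'
        raise  # the tb propagates, so no `.exception()` needed here
    finally:
        logmeth(exit_status)  # exactly one report on every path
```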
@@ -121,7 +121,8 @@ class Portal:
         )
         return self.chan

-    # TODO: factor this out into an `ActorNursery` wrapper
+    # TODO: factor this out into a `.highlevel` API-wrapper that uses
+    # a single `.open_context()` call underneath.
     async def _submit_for_result(
         self,
         ns: str,
@@ -141,13 +142,22 @@ class Portal:
             portal=self,
         )

+    # TODO: we should deprecate this API right? since if we remove
+    # `.run_in_actor()` (and instead move it to a `.highlevel`
+    # wrapper api (around a single `.open_context()` call) we don't
+    # really have any notion of a "main" remote task any more?
+    #
     # @api_frame
-    async def result(self) -> Any:
+    async def wait_for_result(
+        self,
+        hide_tb: bool = True,
+    ) -> Any:
         '''
-        Return the result(s) from the remote actor's "main" task.
+        Return the final result delivered by a `Return`-msg from the
+        remote peer actor's "main" task's `return` statement.

         '''
-        __tracebackhide__ = True
+        __tracebackhide__: bool = hide_tb
         # Check for non-rpc errors slapped on the
         # channel for which we always raise
         exc = self.channel._exc

@@ -182,6 +192,23 @@ class Portal:

         return self._final_result_pld

+    # TODO: factor this out into a `.highlevel` API-wrapper that uses
+    # a single `.open_context()` call underneath.
+    async def result(
+        self,
+        *args,
+        **kwargs,
+    ) -> Any|Exception:
+        typname: str = type(self).__name__
+        log.warning(
+            f'`{typname}.result()` is DEPRECATED!\n'
+            f'Use `{typname}.wait_for_result()` instead!\n'
+        )
+        return await self.wait_for_result(
+            *args,
+            **kwargs,
+        )
+
     async def _cancel_streams(self):
         # terminate all locally running async generator
         # IPC calls
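`Portal.result()` is kept as a thin shim that warns and forwards to `wait_for_result()`, the standard way to rename a public API without breaking existing callers. The same shape in isolation (this sketch uses stdlib `warnings` where the diff uses `log.warning`):

```python
# Generic rename-with-deprecation-shim shape mirroring the new
# `Portal.result()` -> `Portal.wait_for_result()` forwarding above.
import warnings

class Portalish:
    async def wait_for_result(self, hide_tb: bool = True):
        return 'final-result'

    async def result(self, *args, **kwargs):
        typname: str = type(self).__name__
        warnings.warn(
            f'`{typname}.result()` is deprecated, '
            f'use `{typname}.wait_for_result()` instead!',
            DeprecationWarning,
            stacklevel=2,
        )
        # forward everything so old call-sites keep working
        return await self.wait_for_result(*args, **kwargs)
```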
@@ -240,6 +267,7 @@ class Portal:
             f'{reminfo}'
         )

+        # XXX the one spot we set it?
         self.channel._cancel_called: bool = True
         try:
             # send cancel cmd - might not get response

@@ -279,6 +307,8 @@ class Portal:
         )
         return False

+    # TODO: do we still need this for low level `Actor`-runtime
+    # method calls or can we also remove it?
     async def run_from_ns(
         self,
         namespace_path: str,

@@ -316,6 +346,8 @@ class Portal:
             expect_msg=Return,
         )

+    # TODO: factor this out into a `.highlevel` API-wrapper that uses
+    # a single `.open_context()` call underneath.
     async def run(
         self,
         func: str,

@@ -370,6 +402,8 @@ class Portal:
             expect_msg=Return,
         )

+    # TODO: factor this out into a `.highlevel` API-wrapper that uses
+    # a single `.open_context()` call underneath.
     @acm
     async def open_stream_from(
         self,
@@ -21,6 +21,7 @@ Root actor runtime ignition(s).
 from contextlib import asynccontextmanager as acm
 from functools import partial
 import importlib
+import inspect
 import logging
 import os
 import signal

@@ -115,10 +116,16 @@ async def open_root_actor(
     if (
         debug_mode
         and maybe_enable_greenback
-        and await _debug.maybe_init_greenback(
-            raise_not_found=False,
+        and (
+            maybe_mod := await _debug.maybe_init_greenback(
+                raise_not_found=False,
+            )
         )
     ):
+        logger.info(
+            f'Found `greenback` installed @ {maybe_mod}\n'
+            'Enabling `tractor.pause_from_sync()` support!\n'
+        )
         os.environ['PYTHONBREAKPOINT'] = (
             'tractor.devx._debug._sync_pause_from_builtin'
         )
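Binding the awaited module via `maybe_mod := ...` inside the `and (...)` chain lets the body log which module was found without re-awaiting the init, a common walrus-operator refactor. Illustrated standalone (`find_optional_dep` is a hypothetical stand-in):

```python
# Walrus-in-condition refactor as used by `open_root_actor()` above.
import importlib
from types import ModuleType

def find_optional_dep(name: str) -> ModuleType | None:
    try:
        return importlib.import_module(name)
    except ImportError:
        return None

debug_mode = True
if (
    debug_mode
    and (mod := find_optional_dep('greenback'))  # bind AND test in one step
):
    # `mod` is available here without a second lookup
    print(f'Found `greenback` installed @ {mod}')
```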
@@ -264,7 +271,9 @@ async def open_root_actor(

         except OSError:
             # TODO: make this a "discovery" log level?
-            logger.warning(f'No actor registry found @ {addr}')
+            logger.info(
+                f'No actor registry found @ {addr}\n'
+            )

     async with trio.open_nursery() as tn:
         for addr in registry_addrs:

@@ -278,7 +287,6 @@ async def open_root_actor(
     # Create a new local root-actor instance which IS NOT THE
     # REGISTRAR
     if ponged_addrs:
-
         if ensure_registry:
             raise RuntimeError(
                 f'Failed to open `{name}`@{ponged_addrs}: '

@@ -365,23 +373,25 @@ async def open_root_actor(
         )
         try:
             yield actor

         except (
             Exception,
             BaseExceptionGroup,
         ) as err:

-            import inspect
+            # XXX NOTE XXX see equiv note inside
+            # `._runtime.Actor._stream_handler()` where in the
+            # non-root or root-that-opened-this-mahually case we
+            # wait for the local actor-nursery to exit before
+            # exiting the transport channel handler.
             entered: bool = await _debug._maybe_enter_pm(
                 err,
                 api_frame=inspect.currentframe(),
             )

             if (
                 not entered
-                and not is_multi_cancelled(err)
+                and
+                not is_multi_cancelled(err)
             ):
-                logger.exception('Root actor crashed:\n')
+                logger.exception('Root actor crashed\n')

             # ALWAYS re-raise any error bubbled up from the
             # runtime!
@@ -1046,6 +1046,10 @@ class Actor:
         # TODO: another `Struct` for rtvs..
         rvs: dict[str, Any] = spawnspec._runtime_vars
         if rvs['_debug_mode']:
+            from .devx import (
+                enable_stack_on_sig,
+                maybe_init_greenback,
+            )
             try:
                 # TODO: maybe return some status msgs upward
                 # to that we can emit them in `con_status`

@@ -1053,13 +1057,27 @@ class Actor:
                 log.devx(
                     'Enabling `stackscope` traces on SIGUSR1'
                 )
-                from .devx import enable_stack_on_sig
                 enable_stack_on_sig()

             except ImportError:
                 log.warning(
                     '`stackscope` not installed for use in debug mode!'
                 )
+
+            if rvs.get('use_greenback', False):
+                maybe_mod: ModuleType|None = await maybe_init_greenback()
+                if maybe_mod:
+                    log.devx(
+                        'Activated `greenback` '
+                        'for `tractor.pause_from_sync()` support!'
+                    )
+                else:
+                    rvs['use_greenback'] = False
+                    log.warning(
+                        '`greenback` not installed for use in debug mode!\n'
+                        '`tractor.pause_from_sync()` not available!'
+                    )

         rvs['_is_root'] = False
         _state._runtime_vars.update(rvs)
@@ -1717,8 +1735,8 @@ async def async_main(

         # Register with the arbiter if we're told its addr
         log.runtime(
-            f'Registering `{actor.name}` ->\n'
-            f'{pformat(accept_addrs)}'
+            f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
+            # ^-TODO-^ we should instead show the maddr here^^
         )

         # TODO: ideally we don't fan out to all registrars
@@ -1776,57 +1794,90 @@ async def async_main(

         # Blocks here as expected until the root nursery is
         # killed (i.e. this actor is cancelled or signalled by the parent)
-    except Exception as err:
-        log.runtime("Closing all actor lifetime contexts")
-        actor.lifetime_stack.close()
-
+    except Exception as internal_err:
         if not is_registered:
+            err_report: str = (
+                '\n'
+                "Actor runtime (internally) failed BEFORE contacting the registry?\n"
+                f'registrars -> {actor.reg_addrs} ?!?!\n\n'
+
+                '^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n'
+                '\t>> CALMLY CANCEL YOUR CHILDREN AND CALL YOUR PARENTS <<\n\n'
+
+                '\tIf this is a sub-actor hopefully its parent will keep running '
+                'and cancel/reap this sub-process..\n'
+                '(well, presuming this error was propagated upward)\n\n'
+
+                '\t---------------------------------------------\n'
+                '\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT @ '  # oneline
+                'https://github.com/goodboy/tractor/issues\n'
+                '\t---------------------------------------------\n'
+            )
+
             # TODO: I guess we could try to connect back
             # to the parent through a channel and engage a debugger
             # once we have that all working with std streams locking?
-            log.exception(
-                f"Actor errored and failed to register with arbiter "
-                f"@ {actor.reg_addrs[0]}?")
-            log.error(
-                "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
-                "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
-                "\tIf this is a sub-actor hopefully its parent will keep running "
-                "correctly presuming this error was safely ignored..\n\n"
-                "\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
-                "https://github.com/goodboy/tractor/issues\n"
-            )
+            log.exception(err_report)

         if actor._parent_chan:
             await try_ship_error_to_remote(
                 actor._parent_chan,
-                err,
+                internal_err,
             )

         # always!
-        match err:
+        match internal_err:
             case ContextCancelled():
                 log.cancel(
                     f'Actor: {actor.uid} was task-context-cancelled with,\n'
-                    f'str(err)'
+                    f'str(internal_err)'
                 )
             case _:
-                log.exception("Actor errored:")
-                raise
+                log.exception(
+                    'Main actor-runtime task errored\n'
+                    f'<x)\n'
+                    f' |_{actor}\n'
+                )
+
+        raise internal_err

     finally:
-        log.runtime(
-            'Runtime nursery complete'
-            '-> Closing all actor lifetime contexts..'
-        )
-        # tear down all lifetime contexts if not in guest mode
-        # XXX: should this just be in the entrypoint?
-        actor.lifetime_stack.close()
-
-        # TODO: we can't actually do this bc the debugger
-        # uses the _service_n to spawn the lock task, BUT,
-        # in theory if we had the root nursery surround this finally
-        # block it might be actually possible to debug THIS
-        # machinery in the same way as user task code?
+        teardown_report: str = (
+            'Main actor-runtime task completed\n'
+        )
+
+        # ?TODO? should this be in `._entry`/`._root` mods instead?
+        #
+        # teardown any actor-lifetime-bound contexts
+        ls: ExitStack = actor.lifetime_stack
+        # only report if there are any registered
+        cbs: list[Callable] = [
+            repr(tup[1].__wrapped__)
+            for tup in ls._exit_callbacks
+        ]
+        if cbs:
+            cbs_str: str = '\n'.join(cbs)
+            teardown_report += (
+                '-> Closing actor-lifetime-bound callbacks\n\n'
+                f'}}>\n'
+                f' |_{ls}\n'
+                f' |_{cbs_str}\n'
+            )
+            # XXX NOTE XXX this will cause an error which
+            # prevents any `infected_aio` actor from continuing
+            # and any callbacks in the `ls` here WILL NOT be
+            # called!!
+            # await _debug.pause(shield=True)
+
+        ls.close()
+
+        # XXX TODO but hard XXX
+        # we can't actually do this bc the debugger uses the
+        # _service_n to spawn the lock task, BUT, in theory if we had
+        # the root nursery surround this finally block it might be
+        # actually possible to debug THIS machinery in the same way
+        # as user task code?
+        #
         # if actor.name == 'brokerd.ib':
         #     with CancelScope(shield=True):
         #         await _debug.breakpoint()

@@ -1856,9 +1907,9 @@ async def async_main(
                     failed = True

                 if failed:
-                    log.warning(
-                        f'Failed to unregister {actor.name} from '
-                        f'registar @ {addr}'
+                    teardown_report += (
+                        f'-> Failed to unregister {actor.name} from '
+                        f'registar @ {addr}\n'
                     )

         # Ensure all peers (actors connected to us as clients) are finished

@@ -1866,13 +1917,17 @@ async def async_main(
         if any(
             chan.connected() for chan in chain(*actor._peers.values())
         ):
-            log.runtime(
-                f"Waiting for remaining peers {actor._peers} to clear")
+            teardown_report += (
+                f'-> Waiting for remaining peers {actor._peers} to clear..\n'
+            )
+            log.runtime(teardown_report)
             with CancelScope(shield=True):
                 await actor._no_more_peers.wait()
-        log.runtime("All peer channels are complete")

-        log.runtime("Runtime completed")
+        teardown_report += ('-> All peer channels are complete\n')
+
+        teardown_report += ('Actor runtime exited')
+        log.info(teardown_report)


 # TODO: rename to `Registry` and move to `._discovery`!
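The new teardown report lists what the `ExitStack` is about to run by reading `_exit_callbacks` (a deque of `(is_sync, cb)` tuples) and the `__wrapped__` attribute that `ExitStack.callback()` sets on each wrapper — both CPython implementation details, which is why the hunk hedges with "only report if there are any registered". A small sketch under that CPython assumption:

```python
# Introspecting an `ExitStack`'s registered teardown callbacks, as
# the `teardown_report` code above does. `_exit_callbacks` and
# `__wrapped__` are CPython-private details, not a stable API.
from contextlib import ExitStack

def close_conn() -> None:
    print('closing conn')

stack = ExitStack()
stack.callback(close_conn)

cbs: list[str] = [
    repr(tup[1].__wrapped__)   # the original fn, not the wrapper
    for tup in stack._exit_callbacks
]
if cbs:
    print('-> Closing actor-lifetime-bound callbacks:')
    print('\n'.join(cbs))

stack.close()  # runs `close_conn()`
```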
@@ -149,7 +149,7 @@ async def exhaust_portal(

         # XXX: streams should never be reaped here since they should
         # always be established and shutdown using a context manager api
-        final: Any = await portal.result()
+        final: Any = await portal.wait_for_result()

     except (
         Exception,

@@ -223,8 +223,8 @@ async def cancel_on_completion(

 async def hard_kill(
     proc: trio.Process,
-    terminate_after: int = 1.6,

+    terminate_after: int = 1.6,
     # NOTE: for mucking with `.pause()`-ing inside the runtime
     # whilst also hacking on it XD
     # terminate_after: int = 99999,
@@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = {
     '_root_mailbox': (None, None),
     '_registry_addrs': [],

-    # for `breakpoint()` support
+    # for `tractor.pause_from_sync()` & `breakpoint()` support
     'use_greenback': False,
 }
@@ -80,6 +80,7 @@ class ActorNursery:
     '''
     def __init__(
         self,
+        # TODO: maybe def these as fields of a struct looking type?
         actor: Actor,
         ria_nursery: trio.Nursery,
         da_nursery: trio.Nursery,

@@ -88,8 +89,10 @@ class ActorNursery:
     ) -> None:
         # self.supervisor = supervisor # TODO
         self._actor: Actor = actor
-        self._ria_nursery = ria_nursery
+
+        # TODO: rename to `._tn` for our conventional "task-nursery"
         self._da_nursery = da_nursery
+
         self._children: dict[
             tuple[str, str],
             tuple[

@@ -98,15 +101,13 @@ class ActorNursery:
                 Portal | None,
             ]
         ] = {}
-        # portals spawned with ``run_in_actor()`` are
-        # cancelled when their "main" result arrives
-        self._cancel_after_result_on_exit: set = set()
+
         self.cancelled: bool = False
         self._join_procs = trio.Event()
         self._at_least_one_child_in_debug: bool = False
         self.errors = errors
-        self.exited = trio.Event()
         self._scope_error: BaseException|None = None
+        self.exited = trio.Event()

         # NOTE: when no explicit call is made to
         # `.open_root_actor()` by application code,

@@ -116,6 +117,13 @@ class ActorNursery:
         # and syncing purposes to any actor opened nurseries.
         self._implicit_runtime_started: bool = False

+        # TODO: remove the `.run_in_actor()` API and thus this 2ndary
+        # nursery when that API get's moved outside this primitive!
+        self._ria_nursery = ria_nursery
+        # portals spawned with ``run_in_actor()`` are
+        # cancelled when their "main" result arrives
+        self._cancel_after_result_on_exit: set = set()
+
     async def start_actor(
         self,
         name: str,
@@ -126,10 +134,14 @@ class ActorNursery:
         rpc_module_paths: list[str]|None = None,
         enable_modules: list[str]|None = None,
         loglevel: str|None = None,  # set log level per subactor
-        nursery: trio.Nursery|None = None,
         debug_mode: bool|None = None,
         infect_asyncio: bool = False,

+        # TODO: ideally we can rm this once we no longer have
+        # a `._ria_nursery` since the dependent APIs have been
+        # removed!
+        nursery: trio.Nursery|None = None,
+
     ) -> Portal:
         '''
         Start a (daemon) actor: an process that has no designated

@@ -200,6 +212,7 @@ class ActorNursery:
     #  |_ dynamic @context decoration on child side
     #  |_ implicit `Portal.open_context() as (ctx, first):`
     #    and `return first` on parent side.
+    #  |_ mention how it's similar to `trio-parallel` API?
     # -[ ] use @api_frame on the wrapper
     async def run_in_actor(
         self,

@@ -269,11 +282,14 @@ class ActorNursery:

     ) -> None:
         '''
-        Cancel this nursery by instructing each subactor to cancel
-        itself and wait for all subactors to terminate.
+        Cancel this actor-nursery by instructing each subactor's
+        runtime to cancel and wait for all underlying sub-processes
+        to terminate.

-        If ``hard_killl`` is set to ``True`` then kill the processes
-        directly without any far end graceful ``trio`` cancellation.
+        If `hard_kill` is set then kill the processes directly using
+        the spawning-backend's API/OS-machinery without any attempt
+        at (graceful) `trio`-style cancellation using our
+        `Actor.cancel()`.

         '''
         __runtimeframe__: int = 1  # noqa

@@ -629,8 +645,12 @@ async def open_nursery(
             f'|_{an}\n'
         )

-        # shutdown runtime if it was started
         if implicit_runtime:
+            # shutdown runtime if it was started and report noisly
+            # that we're did so.
             msg += '=> Shutting down actor runtime <=\n'
+            log.info(msg)

-        log.info(msg)
+        else:
+            # keep noise low during std operation.
+            log.runtime(msg)
@@ -29,6 +29,7 @@ from ._debug import (
     shield_sigint_handler as shield_sigint_handler,
     open_crash_handler as open_crash_handler,
     maybe_open_crash_handler as maybe_open_crash_handler,
+    maybe_init_greenback as maybe_init_greenback,
     post_mortem as post_mortem,
     mk_pdb as mk_pdb,
 )
@@ -69,6 +69,7 @@ from trio import (
 import tractor
 from tractor.log import get_logger
 from tractor._context import Context
+from tractor import _state
 from tractor._state import (
     current_actor,
     is_root_process,

@@ -87,9 +88,6 @@ if TYPE_CHECKING:
     from tractor._runtime import (
         Actor,
     )
-    from tractor.msg import (
-        _codec,
-    )

 log = get_logger(__name__)

@@ -1599,12 +1597,16 @@ async def _pause(
     try:
         task: Task = current_task()
     except RuntimeError as rte:
-        log.exception('Failed to get current task?')
-        if actor.is_infected_aio():
-            raise RuntimeError(
-                '`tractor.pause[_from_sync]()` not yet supported '
-                'for infected `asyncio` mode!'
-            ) from rte
+        __tracebackhide__: bool = False
+        log.exception(
+            'Failed to get current `trio`-task?'
+        )
+        # if actor.is_infected_aio():
+        #     mk_pdb().set_trace()
+        #     raise RuntimeError(
+        #         '`tractor.pause[_from_sync]()` not yet supported '
+        #         'directly (infected) `asyncio` tasks!'
+        #     ) from rte

         raise
@@ -2163,22 +2165,22 @@ def maybe_import_greenback(
     return False


-async def maybe_init_greenback(
-    **kwargs,
-) -> None|ModuleType:
-
-    if mod := maybe_import_greenback(**kwargs):
-        await mod.ensure_portal()
-        log.devx(
-            '`greenback` portal opened!\n'
-            'Sync debug support activated!\n'
-        )
-        return mod
+async def maybe_init_greenback(**kwargs) -> None|ModuleType:
+    try:
+        if mod := maybe_import_greenback(**kwargs):
+            await mod.ensure_portal()
+            log.devx(
+                '`greenback` portal opened!\n'
+                'Sync debug support activated!\n'
+            )
+            return mod
+    except BaseException:
+        log.exception('Failed to init `greenback`..')
+        raise

     return None


 async def _pause_from_bg_root_thread(
     behalf_of_thread: Thread,
     repl: PdbREPL,
@ -2324,6 +2326,12 @@ def pause_from_sync(
|
||||||
|
|
||||||
# TODO: once supported, remove this AND the one
|
# TODO: once supported, remove this AND the one
|
||||||
# inside `._pause()`!
|
# inside `._pause()`!
|
||||||
|
# outstanding impl fixes:
|
||||||
|
# -[ ] need to make `.shield_sigint()` below work here!
|
||||||
|
# -[ ] how to handle `asyncio`'s new SIGINT-handler
|
||||||
|
# injection?
|
||||||
|
# -[ ] should `breakpoint()` work and what does it normally
|
||||||
|
# do in `asyncio` ctxs?
|
||||||
if actor.is_infected_aio():
|
if actor.is_infected_aio():
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
'`tractor.pause[_from_sync]()` not yet supported '
|
'`tractor.pause[_from_sync]()` not yet supported '
|
||||||
|
@@ -2399,18 +2407,37 @@ def pause_from_sync(
         else:  # we are presumably the `trio.run()` + main thread
             # raises on not-found by default
             greenback: ModuleType = maybe_import_greenback()
 
+            # TODO: how to ensure this is either dynamically (if
+            # needed) called here (in some bg tn??) or that the
+            # subactor always already called it?
+            # greenback: ModuleType = await maybe_init_greenback()
+
             message += f'-> imported {greenback}\n'
             repl_owner: Task = current_task()
             message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
-            out = greenback.await_(
-                _pause(
-                    debug_func=None,
-                    repl=repl,
-                    hide_tb=hide_tb,
-                    called_from_sync=True,
-                    **_pause_kwargs,
-                )
-            )
+            try:
+                out = greenback.await_(
+                    _pause(
+                        debug_func=None,
+                        repl=repl,
+                        hide_tb=hide_tb,
+                        called_from_sync=True,
+                        **_pause_kwargs,
+                    )
+                )
+            except RuntimeError as rte:
+                if not _state._runtime_vars.get(
+                    'use_greenback',
+                    False,
+                ):
+                    raise RuntimeError(
+                        '`greenback` was never initialized in this actor!?\n\n'
+                        f'{_state._runtime_vars}\n'
+                    ) from rte
+
+                raise
 
             if out:
                 bg_task, repl = out
                 assert repl is repl
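The new `try:` wrapping of the `greenback.await_()` call matters because `greenback` raises a `RuntimeError` when no portal was ever opened in the current task; the diff translates that into a clearer "never initialized in this actor" error whenever the `use_greenback` runtime-var is unset. A hedged repro of the underlying failure mode (toy names, real `greenback` behavior):

    import trio
    import greenback

    async def main() -> None:
        try:
            # no prior `await greenback.ensure_portal()` in this
            # task => raises `RuntimeError`!
            greenback.await_(trio.sleep(0))
        except RuntimeError as rte:
            # mirror the diff's pattern: re-raise with a clearer msg
            raise RuntimeError(
                '`greenback` was never initialized in this task!?'
            ) from rte

    trio.run(main)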
@@ -2801,10 +2828,10 @@ def open_crash_handler(
     `trio.run()`.
 
     '''
+    err: BaseException
     try:
         yield
     except tuple(catch) as err:
-
         if type(err) not in ignore:
             pdbp.xpm()
 
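For readers skimming: `open_crash_handler()` is the "drop into a post-mortem REPL on crash" context-manager pattern. A self-contained sketch of the same idea (`pdbp.xpm()` is the real post-mortem entrypoint used above; the trailing re-`raise` is this sketch's choice, not necessarily the upstream behavior):

    from contextlib import contextmanager

    import pdbp  # the `pdb`-replacement used above

    @contextmanager
    def open_crash_handler(
        catch: set[type[BaseException]] = {Exception},
        ignore: set[type[BaseException]] = set(),
    ):
        err: BaseException
        try:
            yield
        except tuple(catch) as err:
            if type(err) not in ignore:
                # post-mortem REPL on the in-flight traceback
                pdbp.xpm()
            raise

    with open_crash_handler():
        1/0  # lands in the debugger before re-raising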
@@ -558,6 +558,8 @@ def run_as_asyncio_guest(
     # normally `Actor._async_main()` as is passed by some boostrap
     # entrypoint like `._entry._trio_main()`.
 
+    _sigint_loop_pump_delay: float = 0,
+
 ) -> None:
     # ^-TODO-^ technically whatever `trio_main` returns.. we should
     # try to use func-typevar-params at leaast by 3.13!
@@ -598,7 +600,7 @@ def run_as_asyncio_guest(
 
     '''
     loop = asyncio.get_running_loop()
-    trio_done_fut = asyncio.Future()
+    trio_done_fute = asyncio.Future()
     startup_msg: str = (
         'Starting `asyncio` guest-loop-run\n'
         '-> got running loop\n'
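Context for the `trio_done_fut` -> `trio_done_fute` rename churn below: this future is the bridge between `trio`'s guest-mode `done_callback` and the `asyncio` side. A minimal standalone sketch of the guest-run pattern (toy `trio_main`; `start_guest_run()` and the `outcome.Outcome` delivery are real `trio` API):

    import asyncio

    import trio
    from outcome import Outcome

    async def trio_main() -> str:
        await trio.sleep(0.1)
        return 'trio-done'

    async def main() -> None:
        loop = asyncio.get_running_loop()
        trio_done_fute: asyncio.Future = asyncio.Future()

        def trio_done_callback(main_outcome: Outcome) -> None:
            # called by `trio` when the guest run completes
            trio_done_fute.set_result(main_outcome)

        # drive a `trio` run from inside the `asyncio` loop
        trio.lowlevel.start_guest_run(
            trio_main,
            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
            done_callback=trio_done_callback,
        )
        out: Outcome = await trio_done_fute
        print(out.unwrap())  # -> 'trio-done'

    asyncio.run(main())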
@@ -633,13 +635,13 @@ def run_as_asyncio_guest(
                 f'{error}\n\n'
                 f'{tb_str}\n'
             )
-            trio_done_fut.set_exception(error)
+            trio_done_fute.set_exception(error)
 
             # raise inline
             main_outcome.unwrap()
 
         else:
-            trio_done_fut.set_result(main_outcome)
+            trio_done_fute.set_result(main_outcome)
 
     startup_msg += (
         f'-> created {trio_done_callback!r}\n'
@@ -660,7 +662,7 @@ def run_as_asyncio_guest(
     )
     fute_err: BaseException|None = None
     try:
-        out: Outcome = await asyncio.shield(trio_done_fut)
+        out: Outcome = await asyncio.shield(trio_done_fute)
 
         # NOTE will raise (via `Error.unwrap()`) from any
         # exception packed into the guest-run's `main_outcome`.
@@ -697,83 +699,75 @@ def run_as_asyncio_guest(
                 f' |_{actor}.cancel_soon()\n'
             )
 
-            # TODO: reduce this comment bloc since abandon issues are
-            # now solved?
+            # XXX WARNING XXX the next LOCs are super important, since
+            # without them, we can get guest-run abandonment cases
+            # where `asyncio` will not schedule or wait on the `trio`
+            # guest-run task before final shutdown! This is
+            # particularly true if the `trio` side has tasks doing
+            # shielded work when a SIGINT condition occurs.
             #
-            # XXX NOTE XXX the next LOC is super important!!!
-            # => without it, we can get a guest-run abandonment case
-            # where asyncio will not trigger `trio` in a final event
-            # loop cycle!
+            # We now have the
+            # `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
+            # suite to ensure we do not suffer this issues
+            # (hopefully) ever again.
             #
-            # our test,
-            # `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
-            # demonstrates how if when we raise a SIGINT-signal in an infected
-            # child we get a variable race condition outcome where
-            # either of the following can indeterminately happen,
+            # The original abandonment issue surfaced as 2 different
+            # race-condition dependent types scenarios all to do with
+            # `asyncio` handling SIGINT from the system:
             #
-            # - "silent-abandon": `asyncio` abandons the `trio`
-            #   guest-run task silently and no `trio`-guest-run or
-            #   `tractor`-actor-runtime teardown happens whatsoever..
-            #   this is the WORST (race) case outcome.
+            # - "silent-abandon" (WORST CASE):
+            #   `asyncio` abandons the `trio` guest-run task silently
+            #   and no `trio`-guest-run or `tractor`-actor-runtime
+            #   teardown happens whatsoever..
             #
-            # - OR, "loud-abandon": the guest run get's abaondoned "loudly" with
-            #   `trio` reporting a console traceback and further tbs of all
-            #   the failed shutdown routines also show on console..
+            # - "loud-abandon" (BEST-ish CASE):
+            #   the guest run get's abaondoned "loudly" with `trio`
+            #   reporting a console traceback and further tbs of all
+            #   the (failed) GC-triggered shutdown routines which
+            #   thankfully does get dumped to console..
             #
-            # our test can thus fail and (has been parametrized for)
-            # the 2 cases:
+            # The abandonment is most easily reproduced if the `trio`
+            # side has tasks doing shielded work where those tasks
+            # ignore the normal `Cancelled` condition and continue to
+            # run, but obviously `asyncio` isn't aware of this and at
+            # some point bails on the guest-run unless we take manual
+            # intervention..
             #
-            # - when the parent raises a KBI just after
-            #   signalling the child,
-            #   |_silent-abandon => the `Actor.lifetime_stack` will
-            #     never be closed thus leaking a resource!
-            #     -> FAIL!
-            #   |_loud-abandon => despite the abandonment at least the
-            #     stack will be closed out..
-            #     -> PASS
+            # To repeat, *WITHOUT THIS* stuff below the guest-run can
+            # get race-conditionally abandoned!!
             #
-            # - when the parent instead simply waits on `ctx.wait_for_result()`
-            #   (i.e. DOES not raise a KBI itself),
-            #   |_silent-abandon => test will just hang and thus the ctx
-            #     and actor will never be closed/cancelled/shutdown
-            #     resulting in leaking a (file) resource since the
-            #     `trio`/`tractor` runtime never relays a ctxc back to
-            #     the parent; the test's timeout will trigger..
-            #     -> FAIL!
-            #   |_loud-abandon => this case seems to never happen??
+            # XXX SOLUTION XXX
+            # ------ - ------
+            # XXX FIRST PART:
+            # ------ - ------
+            # the obvious fix to the "silent-abandon" case is to
+            # explicitly cancel the actor runtime such that no
+            # runtime tasks are even left unaware that the guest-run
+            # should be terminated due to OS cancellation.
             #
-            # XXX FIRST PART XXX, SO, this is a fix to the
-            # "silent-abandon" case, NOT the `trio`-guest-run
-            # abandonment issue in general, for which the NEXT LOC
-            # is apparently a working fix!
             actor.cancel_soon()
 
-            # XXX NOTE XXX pump the `asyncio` event-loop to allow
+            # ------ - ------
+            # XXX SECOND PART:
+            # ------ - ------
+            # Pump the `asyncio` event-loop to allow
             # `trio`-side to `trio`-guest-run to complete and
             # teardown !!
             #
-            # *WITHOUT THIS* the guest-run can get race-conditionally abandoned!!
-            # XD
-            #
-            await asyncio.sleep(.1)  # `delay` can't be 0 either XD
-            while not trio_done_fut.done():
+            # oh `asyncio`, how i don't miss you at all XD
+            while not trio_done_fute.done():
                 log.runtime(
                     'Waiting on main guest-run `asyncio` task to complete..\n'
-                    f'|_trio_done_fut: {trio_done_fut}\n'
+                    f'|_trio_done_fut: {trio_done_fute}\n'
                 )
-                await asyncio.sleep(.1)
+                await asyncio.sleep(_sigint_loop_pump_delay)
 
-                # XXX: don't actually need the shield.. seems to
-                # make no difference (??) and we know it spawns an
-                # internal task..
-                # await asyncio.shield(asyncio.sleep(.1))
-
-                # XXX alt approach but can block indefinitely..
-                # so don't use?
+                # XXX is there any alt API/approach like the internal
+                # call below but that doesn't block indefinitely..?
                 # loop._run_once()
 
         try:
-            return trio_done_fut.result()
+            return trio_done_fute.result()
         except asyncio.exceptions.InvalidStateError as state_err:
 
             # XXX be super dupere noisy about abandonment issues!
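Distilled, the "SECOND PART" fix above is just: after requesting runtime cancellation, keep scheduling `asyncio` loop cycles until the guest-run's completion future actually resolves, rather than returning and letting `asyncio` tear the loop down around the still-running `trio` task. A sketch of that pump loop in isolation (names assumed from the diff):

    import asyncio

    async def pump_until_done(
        fute: asyncio.Future,
        delay: float = 0.1,
    ):
        # keep the loop spinning so the `trio` guest-run task
        # keeps getting driven until it actually completes and
        # its teardown has run
        while not fute.done():
            await asyncio.sleep(delay)
        return fute.result()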