|
|
@ -15,7 +15,10 @@
|
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
Actor primitives and helpers
|
|
|
|
The fundamental core machinery implementing every "actor" including
|
|
|
|
|
|
|
|
the process-local (python-interpreter global) `Actor` state-type
|
|
|
|
|
|
|
|
primitive(s), RPC-in-task scheduling, and IPC connectivity and
|
|
|
|
|
|
|
|
low-level transport msg handling.
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from __future__ import annotations
|
|
|
@ -41,8 +44,14 @@ import warnings
|
|
|
|
|
|
|
|
|
|
|
|
from async_generator import aclosing
|
|
|
|
from async_generator import aclosing
|
|
|
|
from exceptiongroup import BaseExceptionGroup
|
|
|
|
from exceptiongroup import BaseExceptionGroup
|
|
|
|
import trio # type: ignore
|
|
|
|
import trio
|
|
|
|
from trio_typing import TaskStatus
|
|
|
|
from trio import (
|
|
|
|
|
|
|
|
CancelScope,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
from trio_typing import (
|
|
|
|
|
|
|
|
Nursery,
|
|
|
|
|
|
|
|
TaskStatus,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
from ._ipc import Channel
|
|
|
|
from ._ipc import Channel
|
|
|
|
from ._context import (
|
|
|
|
from ._context import (
|
|
|
@ -90,10 +99,9 @@ async def _invoke(
|
|
|
|
connected IPC channel.
|
|
|
|
connected IPC channel.
|
|
|
|
|
|
|
|
|
|
|
|
This is the core "RPC" `trio.Task` scheduling machinery used to start every
|
|
|
|
This is the core "RPC" `trio.Task` scheduling machinery used to start every
|
|
|
|
remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
|
|
|
|
remotely invoked function, normally in `Actor._service_n: Nursery`.
|
|
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
'''
|
|
|
|
__tracebackhide__: bool = True
|
|
|
|
|
|
|
|
treat_as_gen: bool = False
|
|
|
|
treat_as_gen: bool = False
|
|
|
|
failed_resp: bool = False
|
|
|
|
failed_resp: bool = False
|
|
|
|
|
|
|
|
|
|
|
@ -110,9 +118,9 @@ async def _invoke(
|
|
|
|
# possibly a traceback (not sure what typing is for this..)
|
|
|
|
# possibly a traceback (not sure what typing is for this..)
|
|
|
|
tb = None
|
|
|
|
tb = None
|
|
|
|
|
|
|
|
|
|
|
|
cancel_scope = trio.CancelScope()
|
|
|
|
cancel_scope = CancelScope()
|
|
|
|
# activated cancel scope ref
|
|
|
|
# activated cancel scope ref
|
|
|
|
cs: trio.CancelScope | None = None
|
|
|
|
cs: CancelScope | None = None
|
|
|
|
|
|
|
|
|
|
|
|
ctx = actor.get_context(
|
|
|
|
ctx = actor.get_context(
|
|
|
|
chan,
|
|
|
|
chan,
|
|
|
@ -124,6 +132,7 @@ async def _invoke(
|
|
|
|
)
|
|
|
|
)
|
|
|
|
context: bool = False
|
|
|
|
context: bool = False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: deprecate this style..
|
|
|
|
if getattr(func, '_tractor_stream_function', False):
|
|
|
|
if getattr(func, '_tractor_stream_function', False):
|
|
|
|
# handle decorated ``@tractor.stream`` async functions
|
|
|
|
# handle decorated ``@tractor.stream`` async functions
|
|
|
|
sig = inspect.signature(func)
|
|
|
|
sig = inspect.signature(func)
|
|
|
@ -165,6 +174,7 @@ async def _invoke(
|
|
|
|
except TypeError:
|
|
|
|
except TypeError:
|
|
|
|
raise
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: can we unify this with the `context=True` impl below?
|
|
|
|
if inspect.isasyncgen(coro):
|
|
|
|
if inspect.isasyncgen(coro):
|
|
|
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
|
|
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
|
|
|
# XXX: massive gotcha! If the containing scope
|
|
|
|
# XXX: massive gotcha! If the containing scope
|
|
|
@ -195,6 +205,7 @@ async def _invoke(
|
|
|
|
await chan.send({'stop': True, 'cid': cid})
|
|
|
|
await chan.send({'stop': True, 'cid': cid})
|
|
|
|
|
|
|
|
|
|
|
|
# one way @stream func that gets treated like an async gen
|
|
|
|
# one way @stream func that gets treated like an async gen
|
|
|
|
|
|
|
|
# TODO: can we unify this with the `context=True` impl below?
|
|
|
|
elif treat_as_gen:
|
|
|
|
elif treat_as_gen:
|
|
|
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
|
|
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
|
|
|
# XXX: the async-func may spawn further tasks which push
|
|
|
|
# XXX: the async-func may spawn further tasks which push
|
|
|
@ -211,8 +222,20 @@ async def _invoke(
|
|
|
|
# far end async gen to tear down
|
|
|
|
# far end async gen to tear down
|
|
|
|
await chan.send({'stop': True, 'cid': cid})
|
|
|
|
await chan.send({'stop': True, 'cid': cid})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# our most general case: a remote SC-transitive,
|
|
|
|
|
|
|
|
# IPC-linked, cross-actor-task "context"
|
|
|
|
|
|
|
|
# ------ - ------
|
|
|
|
# TODO: every other "func type" should be implemented from
|
|
|
|
# TODO: every other "func type" should be implemented from
|
|
|
|
# a special case of a context eventually!
|
|
|
|
# a special case of this impl eventually!
|
|
|
|
|
|
|
|
# -[ ] streaming funcs should instead of being async-for
|
|
|
|
|
|
|
|
# handled directly here wrapped in
|
|
|
|
|
|
|
|
# a async-with-open_stream() closure that does the
|
|
|
|
|
|
|
|
# normal thing you'd expect a far end streaming context
|
|
|
|
|
|
|
|
# to (if written by the app-dev).
|
|
|
|
|
|
|
|
# -[ ] one off async funcs can literally just be called
|
|
|
|
|
|
|
|
# here and awaited directly, possibly just with a small
|
|
|
|
|
|
|
|
# wrapper that calls `Context.started()` and then does
|
|
|
|
|
|
|
|
# the `await coro()`?
|
|
|
|
elif context:
|
|
|
|
elif context:
|
|
|
|
# context func with support for bi-dir streaming
|
|
|
|
# context func with support for bi-dir streaming
|
|
|
|
await chan.send({'functype': 'context', 'cid': cid})
|
|
|
|
await chan.send({'functype': 'context', 'cid': cid})
|
|
|
@ -273,11 +296,12 @@ async def _invoke(
|
|
|
|
ctx._maybe_raise_remote_err(re)
|
|
|
|
ctx._maybe_raise_remote_err(re)
|
|
|
|
|
|
|
|
|
|
|
|
fname: str = func.__name__
|
|
|
|
fname: str = func.__name__
|
|
|
|
cs: trio.CancelScope = ctx._scope
|
|
|
|
cs: CancelScope = ctx._scope
|
|
|
|
if cs.cancel_called:
|
|
|
|
if cs.cancel_called:
|
|
|
|
|
|
|
|
our_uid: tuple = actor.uid
|
|
|
|
canceller: tuple = ctx.canceller
|
|
|
|
canceller: tuple = ctx.canceller
|
|
|
|
msg: str = (
|
|
|
|
msg: str = (
|
|
|
|
f'`{fname}()`@{actor.uid} cancelled by '
|
|
|
|
f'`{fname}()`@{our_uid} cancelled by '
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE / TODO: if we end up having
|
|
|
|
# NOTE / TODO: if we end up having
|
|
|
@ -286,6 +310,8 @@ async def _invoke(
|
|
|
|
# need to change this logic branch since it
|
|
|
|
# need to change this logic branch since it
|
|
|
|
# will always enter..
|
|
|
|
# will always enter..
|
|
|
|
if ctx._cancel_called:
|
|
|
|
if ctx._cancel_called:
|
|
|
|
|
|
|
|
# TODO: test for this!!!!!
|
|
|
|
|
|
|
|
canceller: tuple = our_uid
|
|
|
|
msg += 'itself '
|
|
|
|
msg += 'itself '
|
|
|
|
|
|
|
|
|
|
|
|
# if the channel which spawned the ctx is the
|
|
|
|
# if the channel which spawned the ctx is the
|
|
|
@ -318,40 +344,76 @@ async def _invoke(
|
|
|
|
canceller=canceller,
|
|
|
|
canceller=canceller,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# regular async function
|
|
|
|
# regular async function/method
|
|
|
|
|
|
|
|
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
|
|
|
|
|
|
|
# from a remote request to cancel some `Context`.
|
|
|
|
|
|
|
|
# ------ - ------
|
|
|
|
|
|
|
|
# TODO: ideally we unify this with the above `context=True`
|
|
|
|
|
|
|
|
# block such that for any remote invocation ftype, we
|
|
|
|
|
|
|
|
# always invoke the far end RPC task scheduling the same
|
|
|
|
|
|
|
|
# way: using the linked IPC context machinery.
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
await chan.send({
|
|
|
|
await chan.send({
|
|
|
|
'functype': 'asyncfunc',
|
|
|
|
'functype': 'asyncfunc',
|
|
|
|
'cid': cid
|
|
|
|
'cid': cid
|
|
|
|
})
|
|
|
|
})
|
|
|
|
except trio.BrokenResourceError:
|
|
|
|
except (
|
|
|
|
|
|
|
|
trio.ClosedResourceError,
|
|
|
|
|
|
|
|
trio.BrokenResourceError,
|
|
|
|
|
|
|
|
BrokenPipeError,
|
|
|
|
|
|
|
|
) as ipc_err:
|
|
|
|
failed_resp = True
|
|
|
|
failed_resp = True
|
|
|
|
if is_rpc:
|
|
|
|
if is_rpc:
|
|
|
|
raise
|
|
|
|
raise
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
|
|
|
|
# TODO: should this be an `.exception()` call?
|
|
|
|
log.warning(
|
|
|
|
log.warning(
|
|
|
|
f'Failed to respond to non-rpc request: {func}'
|
|
|
|
f'Failed to respond to non-rpc request: {func}\n'
|
|
|
|
|
|
|
|
f'{ipc_err}'
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
with cancel_scope as cs:
|
|
|
|
with cancel_scope as cs:
|
|
|
|
ctx._scope = cs
|
|
|
|
ctx._scope: CancelScope = cs
|
|
|
|
task_status.started(ctx)
|
|
|
|
task_status.started(ctx)
|
|
|
|
result = await coro
|
|
|
|
result = await coro
|
|
|
|
fname: str = func.__name__
|
|
|
|
fname: str = func.__name__
|
|
|
|
log.runtime(f'{fname}() result: {result}')
|
|
|
|
log.runtime(f'{fname}() result: {result}')
|
|
|
|
if not failed_resp:
|
|
|
|
|
|
|
|
# only send result if we know IPC isn't down
|
|
|
|
# NOTE: only send result if we know IPC isn't down
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
|
|
|
not failed_resp
|
|
|
|
|
|
|
|
and chan.connected()
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
try:
|
|
|
|
await chan.send(
|
|
|
|
await chan.send(
|
|
|
|
{'return': result,
|
|
|
|
{'return': result,
|
|
|
|
'cid': cid}
|
|
|
|
'cid': cid}
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
except (
|
|
|
|
|
|
|
|
BrokenPipeError,
|
|
|
|
|
|
|
|
trio.BrokenResourceError,
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
log.warning(
|
|
|
|
|
|
|
|
'Failed to return result:\n'
|
|
|
|
|
|
|
|
f'{func}@{actor.uid}\n'
|
|
|
|
|
|
|
|
f'remote chan: {chan.uid}'
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
except (
|
|
|
|
except (
|
|
|
|
Exception,
|
|
|
|
Exception,
|
|
|
|
BaseExceptionGroup,
|
|
|
|
BaseExceptionGroup,
|
|
|
|
) as err:
|
|
|
|
) as err:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# always hide this frame from debug REPL if the crash
|
|
|
|
|
|
|
|
# originated from an rpc task and we DID NOT fail
|
|
|
|
|
|
|
|
# due to an IPC transport error!
|
|
|
|
|
|
|
|
if (
|
|
|
|
|
|
|
|
is_rpc
|
|
|
|
|
|
|
|
and chan.connected()
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
__tracebackhide__: bool = True
|
|
|
|
|
|
|
|
|
|
|
|
if not is_multi_cancelled(err):
|
|
|
|
if not is_multi_cancelled(err):
|
|
|
|
|
|
|
|
|
|
|
|
# TODO: maybe we'll want different "levels" of debugging
|
|
|
|
# TODO: maybe we'll want different "levels" of debugging
|
|
|
@ -385,9 +447,13 @@ async def _invoke(
|
|
|
|
log.exception("Actor crashed:")
|
|
|
|
log.exception("Actor crashed:")
|
|
|
|
|
|
|
|
|
|
|
|
# always ship errors back to caller
|
|
|
|
# always ship errors back to caller
|
|
|
|
err_msg = pack_error(err, tb=tb)
|
|
|
|
err_msg: dict[str, dict] = pack_error(
|
|
|
|
|
|
|
|
err,
|
|
|
|
|
|
|
|
tb=tb,
|
|
|
|
|
|
|
|
)
|
|
|
|
err_msg['cid'] = cid
|
|
|
|
err_msg['cid'] = cid
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if is_rpc:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
await chan.send(err_msg)
|
|
|
|
await chan.send(err_msg)
|
|
|
|
|
|
|
|
|
|
|
@ -398,10 +464,13 @@ async def _invoke(
|
|
|
|
trio.ClosedResourceError,
|
|
|
|
trio.ClosedResourceError,
|
|
|
|
trio.BrokenResourceError,
|
|
|
|
trio.BrokenResourceError,
|
|
|
|
BrokenPipeError,
|
|
|
|
BrokenPipeError,
|
|
|
|
):
|
|
|
|
) as ipc_err:
|
|
|
|
|
|
|
|
|
|
|
|
# if we can't propagate the error that's a big boo boo
|
|
|
|
# if we can't propagate the error that's a big boo boo
|
|
|
|
log.exception(
|
|
|
|
log.exception(
|
|
|
|
f"Failed to ship error to caller @ {chan.uid} !?"
|
|
|
|
f"Failed to ship error to caller @ {chan.uid} !?\n"
|
|
|
|
|
|
|
|
f'{ipc_err}'
|
|
|
|
|
|
|
|
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# error is probably from above coro running code *not from the
|
|
|
|
# error is probably from above coro running code *not from the
|
|
|
@ -428,7 +497,11 @@ async def _invoke(
|
|
|
|
log.warning(
|
|
|
|
log.warning(
|
|
|
|
f"Task {func} likely errored or cancelled before start")
|
|
|
|
f"Task {func} likely errored or cancelled before start")
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
log.cancel(f'{func.__name__}({kwargs}) failed?')
|
|
|
|
log.cancel(
|
|
|
|
|
|
|
|
'Failed to de-alloc internal task!?\n'
|
|
|
|
|
|
|
|
f'cid: {cid}\n'
|
|
|
|
|
|
|
|
f'{func.__name__}({kwargs})'
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
finally:
|
|
|
|
if not actor._rpc_tasks:
|
|
|
|
if not actor._rpc_tasks:
|
|
|
@ -445,7 +518,7 @@ async def try_ship_error_to_parent(
|
|
|
|
err: Exception | BaseExceptionGroup,
|
|
|
|
err: Exception | BaseExceptionGroup,
|
|
|
|
|
|
|
|
|
|
|
|
) -> None:
|
|
|
|
) -> None:
|
|
|
|
with trio.CancelScope(shield=True):
|
|
|
|
with CancelScope(shield=True):
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
# internal error so ship to parent without cid
|
|
|
|
# internal error so ship to parent without cid
|
|
|
|
await channel.send(pack_error(err))
|
|
|
|
await channel.send(pack_error(err))
|
|
|
@ -497,13 +570,13 @@ class Actor:
|
|
|
|
msg_buffer_size: int = 2**6
|
|
|
|
msg_buffer_size: int = 2**6
|
|
|
|
|
|
|
|
|
|
|
|
# nursery placeholders filled in by `async_main()` after fork
|
|
|
|
# nursery placeholders filled in by `async_main()` after fork
|
|
|
|
_root_n: trio.Nursery | None = None
|
|
|
|
_root_n: Nursery | None = None
|
|
|
|
_service_n: trio.Nursery | None = None
|
|
|
|
_service_n: Nursery | None = None
|
|
|
|
_server_n: trio.Nursery | None = None
|
|
|
|
_server_n: Nursery | None = None
|
|
|
|
|
|
|
|
|
|
|
|
# Information about `__main__` from parent
|
|
|
|
# Information about `__main__` from parent
|
|
|
|
_parent_main_data: dict[str, str]
|
|
|
|
_parent_main_data: dict[str, str]
|
|
|
|
_parent_chan_cs: trio.CancelScope | None = None
|
|
|
|
_parent_chan_cs: CancelScope | None = None
|
|
|
|
|
|
|
|
|
|
|
|
# syncs for setup/teardown sequences
|
|
|
|
# syncs for setup/teardown sequences
|
|
|
|
_server_down: trio.Event | None = None
|
|
|
|
_server_down: trio.Event | None = None
|
|
|
@ -1096,12 +1169,12 @@ class Actor:
|
|
|
|
|
|
|
|
|
|
|
|
async def _serve_forever(
|
|
|
|
async def _serve_forever(
|
|
|
|
self,
|
|
|
|
self,
|
|
|
|
handler_nursery: trio.Nursery,
|
|
|
|
handler_nursery: Nursery,
|
|
|
|
*,
|
|
|
|
*,
|
|
|
|
# (host, port) to bind for channel server
|
|
|
|
# (host, port) to bind for channel server
|
|
|
|
listen_sockaddrs: list[tuple[str, int]] | None = None,
|
|
|
|
listen_sockaddrs: list[tuple[str, int]] | None = None,
|
|
|
|
|
|
|
|
|
|
|
|
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
|
|
|
|
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
|
|
|
) -> None:
|
|
|
|
) -> None:
|
|
|
|
'''
|
|
|
|
'''
|
|
|
|
Start the channel server, begin listening for new connections.
|
|
|
|
Start the channel server, begin listening for new connections.
|
|
|
@ -1188,7 +1261,7 @@ class Actor:
|
|
|
|
self._cancel_called = True
|
|
|
|
self._cancel_called = True
|
|
|
|
|
|
|
|
|
|
|
|
# cancel all ongoing rpc tasks
|
|
|
|
# cancel all ongoing rpc tasks
|
|
|
|
with trio.CancelScope(shield=True):
|
|
|
|
with CancelScope(shield=True):
|
|
|
|
|
|
|
|
|
|
|
|
# kill any debugger request task to avoid deadlock
|
|
|
|
# kill any debugger request task to avoid deadlock
|
|
|
|
# with the root actor in this tree
|
|
|
|
# with the root actor in this tree
|
|
|
@ -1248,7 +1321,7 @@ class Actor:
|
|
|
|
# this ctx based lookup ensures the requested task to
|
|
|
|
# this ctx based lookup ensures the requested task to
|
|
|
|
# be cancelled was indeed spawned by a request from this channel
|
|
|
|
# be cancelled was indeed spawned by a request from this channel
|
|
|
|
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
|
|
|
|
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
|
|
|
|
scope: trio.CancelScope = ctx._scope
|
|
|
|
scope: CancelScope = ctx._scope
|
|
|
|
except KeyError:
|
|
|
|
except KeyError:
|
|
|
|
log.cancel(f"{cid} has already completed/terminated?")
|
|
|
|
log.cancel(f"{cid} has already completed/terminated?")
|
|
|
|
return True
|
|
|
|
return True
|
|
|
@ -1613,7 +1686,7 @@ async def async_main(
|
|
|
|
# block it might be actually possible to debug THIS
|
|
|
|
# block it might be actually possible to debug THIS
|
|
|
|
# machinery in the same way as user task code?
|
|
|
|
# machinery in the same way as user task code?
|
|
|
|
# if actor.name == 'brokerd.ib':
|
|
|
|
# if actor.name == 'brokerd.ib':
|
|
|
|
# with trio.CancelScope(shield=True):
|
|
|
|
# with CancelScope(shield=True):
|
|
|
|
# await _debug.breakpoint()
|
|
|
|
# await _debug.breakpoint()
|
|
|
|
|
|
|
|
|
|
|
|
actor.lifetime_stack.close()
|
|
|
|
actor.lifetime_stack.close()
|
|
|
@ -1655,7 +1728,7 @@ async def async_main(
|
|
|
|
):
|
|
|
|
):
|
|
|
|
log.runtime(
|
|
|
|
log.runtime(
|
|
|
|
f"Waiting for remaining peers {actor._peers} to clear")
|
|
|
|
f"Waiting for remaining peers {actor._peers} to clear")
|
|
|
|
with trio.CancelScope(shield=True):
|
|
|
|
with CancelScope(shield=True):
|
|
|
|
await actor._no_more_peers.wait()
|
|
|
|
await actor._no_more_peers.wait()
|
|
|
|
log.runtime("All peer channels are complete")
|
|
|
|
log.runtime("All peer channels are complete")
|
|
|
|
|
|
|
|
|
|
|
@ -1666,7 +1739,7 @@ async def process_messages(
|
|
|
|
actor: Actor,
|
|
|
|
actor: Actor,
|
|
|
|
chan: Channel,
|
|
|
|
chan: Channel,
|
|
|
|
shield: bool = False,
|
|
|
|
shield: bool = False,
|
|
|
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
|
|
|
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
|
|
|
|
|
|
|
|
|
|
|
) -> bool:
|
|
|
|
) -> bool:
|
|
|
|
'''
|
|
|
|
'''
|
|
|
@ -1684,7 +1757,7 @@ async def process_messages(
|
|
|
|
|
|
|
|
|
|
|
|
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
|
|
|
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
with trio.CancelScope(shield=shield) as loop_cs:
|
|
|
|
with CancelScope(shield=shield) as loop_cs:
|
|
|
|
# this internal scope allows for keeping this message
|
|
|
|
# this internal scope allows for keeping this message
|
|
|
|
# loop running despite the current task having been
|
|
|
|
# loop running despite the current task having been
|
|
|
|
# cancelled (eg. `open_portal()` may call this method from
|
|
|
|
# cancelled (eg. `open_portal()` may call this method from
|
|
|
@ -1746,18 +1819,18 @@ async def process_messages(
|
|
|
|
|
|
|
|
|
|
|
|
if ns == 'self':
|
|
|
|
if ns == 'self':
|
|
|
|
if funcname == 'cancel':
|
|
|
|
if funcname == 'cancel':
|
|
|
|
func = actor.cancel
|
|
|
|
func: Callable = actor.cancel
|
|
|
|
kwargs['requesting_uid'] = chan.uid
|
|
|
|
kwargs['requesting_uid'] = chan.uid
|
|
|
|
|
|
|
|
|
|
|
|
# don't start entire actor runtime cancellation
|
|
|
|
# don't start entire actor runtime cancellation
|
|
|
|
# if this actor is currently in debug mode!
|
|
|
|
# if this actor is currently in debug mode!
|
|
|
|
pdb_complete = _debug.Lock.local_pdb_complete
|
|
|
|
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
|
|
|
|
if pdb_complete:
|
|
|
|
if pdb_complete:
|
|
|
|
await pdb_complete.wait()
|
|
|
|
await pdb_complete.wait()
|
|
|
|
|
|
|
|
|
|
|
|
# we immediately start the runtime machinery
|
|
|
|
# we immediately start the runtime machinery
|
|
|
|
# shutdown
|
|
|
|
# shutdown
|
|
|
|
with trio.CancelScope(shield=True):
|
|
|
|
with CancelScope(shield=True):
|
|
|
|
# actor.cancel() was called so kill this
|
|
|
|
# actor.cancel() was called so kill this
|
|
|
|
# msg loop and break out into
|
|
|
|
# msg loop and break out into
|
|
|
|
# ``async_main()``
|
|
|
|
# ``async_main()``
|
|
|
@ -1785,7 +1858,7 @@ async def process_messages(
|
|
|
|
|
|
|
|
|
|
|
|
# we immediately start the runtime machinery
|
|
|
|
# we immediately start the runtime machinery
|
|
|
|
# shutdown
|
|
|
|
# shutdown
|
|
|
|
# with trio.CancelScope(shield=True):
|
|
|
|
# with CancelScope(shield=True):
|
|
|
|
kwargs['chan'] = chan
|
|
|
|
kwargs['chan'] = chan
|
|
|
|
target_cid = kwargs['cid']
|
|
|
|
target_cid = kwargs['cid']
|
|
|
|
kwargs['requesting_uid'] = chan.uid
|
|
|
|
kwargs['requesting_uid'] = chan.uid
|
|
|
@ -1810,7 +1883,7 @@ async def process_messages(
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
# normally registry methods, eg.
|
|
|
|
# normally registry methods, eg.
|
|
|
|
# ``.register_actor()`` etc.
|
|
|
|
# ``.register_actor()`` etc.
|
|
|
|
func = getattr(actor, funcname)
|
|
|
|
func: Callable = getattr(actor, funcname)
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
# complain to client about restricted modules
|
|
|
|
# complain to client about restricted modules
|
|
|
@ -1900,9 +1973,10 @@ async def process_messages(
|
|
|
|
Exception,
|
|
|
|
Exception,
|
|
|
|
BaseExceptionGroup,
|
|
|
|
BaseExceptionGroup,
|
|
|
|
) as err:
|
|
|
|
) as err:
|
|
|
|
|
|
|
|
|
|
|
|
if nursery_cancelled_before_task:
|
|
|
|
if nursery_cancelled_before_task:
|
|
|
|
sn = actor._service_n
|
|
|
|
sn: Nursery = actor._service_n
|
|
|
|
assert sn and sn.cancel_scope.cancel_called
|
|
|
|
assert sn and sn.cancel_scope.cancel_called # sanity
|
|
|
|
log.cancel(
|
|
|
|
log.cancel(
|
|
|
|
f'Service nursery cancelled before it handled {funcname}'
|
|
|
|
f'Service nursery cancelled before it handled {funcname}'
|
|
|
|
)
|
|
|
|
)
|
|
|
|