Compare commits
No commits in common. "250275d98d52e8076d2797b8c3fabf5f8098d124" and "3f159235374b960d120c55a1be844cd1434ca48b" have entirely different histories.
250275d98d
...
3f15923537
|
@ -76,18 +76,8 @@ async def get_registry(
|
|||
yield regstr_ptl
|
||||
|
||||
|
||||
|
||||
# TODO: deprecate and this remove _arbiter form!
|
||||
@acm
|
||||
async def get_arbiter(*args, **kwargs):
|
||||
warnings.warn(
|
||||
'`tractor.get_arbiter()` is now deprecated!\n'
|
||||
'Use `.get_registry()` instead!',
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
async with get_registry(*args, **kwargs) as to_yield:
|
||||
yield to_yield
|
||||
# TODO: deprecate and remove _arbiter form
|
||||
get_arbiter = get_registry
|
||||
|
||||
|
||||
@acm
|
||||
|
|
|
@ -15,10 +15,7 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
The fundamental core machinery implementing every "actor" including
|
||||
the process-local (python-interpreter global) `Actor` state-type
|
||||
primitive(s), RPC-in-task scheduling, and IPC connectivity and
|
||||
low-level transport msg handling.
|
||||
Actor primitives and helpers
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
@ -44,14 +41,8 @@ import warnings
|
|||
|
||||
from async_generator import aclosing
|
||||
from exceptiongroup import BaseExceptionGroup
|
||||
import trio
|
||||
from trio import (
|
||||
CancelScope,
|
||||
)
|
||||
from trio_typing import (
|
||||
Nursery,
|
||||
TaskStatus,
|
||||
)
|
||||
import trio # type: ignore
|
||||
from trio_typing import TaskStatus
|
||||
|
||||
from ._ipc import Channel
|
||||
from ._context import (
|
||||
|
@ -99,9 +90,10 @@ async def _invoke(
|
|||
connected IPC channel.
|
||||
|
||||
This is the core "RPC" `trio.Task` scheduling machinery used to start every
|
||||
remotely invoked function, normally in `Actor._service_n: Nursery`.
|
||||
remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = True
|
||||
treat_as_gen: bool = False
|
||||
failed_resp: bool = False
|
||||
|
||||
|
@ -118,9 +110,9 @@ async def _invoke(
|
|||
# possibly a traceback (not sure what typing is for this..)
|
||||
tb = None
|
||||
|
||||
cancel_scope = CancelScope()
|
||||
cancel_scope = trio.CancelScope()
|
||||
# activated cancel scope ref
|
||||
cs: CancelScope | None = None
|
||||
cs: trio.CancelScope | None = None
|
||||
|
||||
ctx = actor.get_context(
|
||||
chan,
|
||||
|
@ -132,7 +124,6 @@ async def _invoke(
|
|||
)
|
||||
context: bool = False
|
||||
|
||||
# TODO: deprecate this style..
|
||||
if getattr(func, '_tractor_stream_function', False):
|
||||
# handle decorated ``@tractor.stream`` async functions
|
||||
sig = inspect.signature(func)
|
||||
|
@ -174,7 +165,6 @@ async def _invoke(
|
|||
except TypeError:
|
||||
raise
|
||||
|
||||
# TODO: can we unify this with the `context=True` impl below?
|
||||
if inspect.isasyncgen(coro):
|
||||
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||
# XXX: massive gotcha! If the containing scope
|
||||
|
@ -205,7 +195,6 @@ async def _invoke(
|
|||
await chan.send({'stop': True, 'cid': cid})
|
||||
|
||||
# one way @stream func that gets treated like an async gen
|
||||
# TODO: can we unify this with the `context=True` impl below?
|
||||
elif treat_as_gen:
|
||||
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||
# XXX: the async-func may spawn further tasks which push
|
||||
|
@ -222,20 +211,8 @@ async def _invoke(
|
|||
# far end async gen to tear down
|
||||
await chan.send({'stop': True, 'cid': cid})
|
||||
|
||||
# our most general case: a remote SC-transitive,
|
||||
# IPC-linked, cross-actor-task "context"
|
||||
# ------ - ------
|
||||
# TODO: every other "func type" should be implemented from
|
||||
# a special case of this impl eventually!
|
||||
# -[ ] streaming funcs should instead of being async-for
|
||||
# handled directly here wrapped in
|
||||
# a async-with-open_stream() closure that does the
|
||||
# normal thing you'd expect a far end streaming context
|
||||
# to (if written by the app-dev).
|
||||
# -[ ] one off async funcs can literally just be called
|
||||
# here and awaited directly, possibly just with a small
|
||||
# wrapper that calls `Context.started()` and then does
|
||||
# the `await coro()`?
|
||||
# a special case of a context eventually!
|
||||
elif context:
|
||||
# context func with support for bi-dir streaming
|
||||
await chan.send({'functype': 'context', 'cid': cid})
|
||||
|
@ -296,12 +273,11 @@ async def _invoke(
|
|||
ctx._maybe_raise_remote_err(re)
|
||||
|
||||
fname: str = func.__name__
|
||||
cs: CancelScope = ctx._scope
|
||||
cs: trio.CancelScope = ctx._scope
|
||||
if cs.cancel_called:
|
||||
our_uid: tuple = actor.uid
|
||||
canceller: tuple = ctx.canceller
|
||||
msg: str = (
|
||||
f'`{fname}()`@{our_uid} cancelled by '
|
||||
f'`{fname}()`@{actor.uid} cancelled by '
|
||||
)
|
||||
|
||||
# NOTE / TODO: if we end up having
|
||||
|
@ -310,8 +286,6 @@ async def _invoke(
|
|||
# need to change this logic branch since it
|
||||
# will always enter..
|
||||
if ctx._cancel_called:
|
||||
# TODO: test for this!!!!!
|
||||
canceller: tuple = our_uid
|
||||
msg += 'itself '
|
||||
|
||||
# if the channel which spawned the ctx is the
|
||||
|
@ -344,76 +318,40 @@ async def _invoke(
|
|||
canceller=canceller,
|
||||
)
|
||||
|
||||
# regular async function/method
|
||||
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
||||
# from a remote request to cancel some `Context`.
|
||||
# ------ - ------
|
||||
# TODO: ideally we unify this with the above `context=True`
|
||||
# block such that for any remote invocation ftype, we
|
||||
# always invoke the far end RPC task scheduling the same
|
||||
# way: using the linked IPC context machinery.
|
||||
# regular async function
|
||||
else:
|
||||
try:
|
||||
await chan.send({
|
||||
'functype': 'asyncfunc',
|
||||
'cid': cid
|
||||
})
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
BrokenPipeError,
|
||||
) as ipc_err:
|
||||
except trio.BrokenResourceError:
|
||||
failed_resp = True
|
||||
if is_rpc:
|
||||
raise
|
||||
else:
|
||||
# TODO: should this be an `.exception()` call?
|
||||
log.warning(
|
||||
f'Failed to respond to non-rpc request: {func}\n'
|
||||
f'{ipc_err}'
|
||||
f'Failed to respond to non-rpc request: {func}'
|
||||
)
|
||||
|
||||
with cancel_scope as cs:
|
||||
ctx._scope: CancelScope = cs
|
||||
ctx._scope = cs
|
||||
task_status.started(ctx)
|
||||
result = await coro
|
||||
fname: str = func.__name__
|
||||
log.runtime(f'{fname}() result: {result}')
|
||||
|
||||
# NOTE: only send result if we know IPC isn't down
|
||||
if (
|
||||
not failed_resp
|
||||
and chan.connected()
|
||||
):
|
||||
try:
|
||||
await chan.send(
|
||||
{'return': result,
|
||||
'cid': cid}
|
||||
)
|
||||
except (
|
||||
BrokenPipeError,
|
||||
trio.BrokenResourceError,
|
||||
):
|
||||
log.warning(
|
||||
'Failed to return result:\n'
|
||||
f'{func}@{actor.uid}\n'
|
||||
f'remote chan: {chan.uid}'
|
||||
)
|
||||
if not failed_resp:
|
||||
# only send result if we know IPC isn't down
|
||||
await chan.send(
|
||||
{'return': result,
|
||||
'cid': cid}
|
||||
)
|
||||
|
||||
except (
|
||||
Exception,
|
||||
BaseExceptionGroup,
|
||||
) as err:
|
||||
|
||||
# always hide this frame from debug REPL if the crash
|
||||
# originated from an rpc task and we DID NOT fail
|
||||
# due to an IPC transport error!
|
||||
if (
|
||||
is_rpc
|
||||
and chan.connected()
|
||||
):
|
||||
__tracebackhide__: bool = True
|
||||
|
||||
if not is_multi_cancelled(err):
|
||||
|
||||
# TODO: maybe we'll want different "levels" of debugging
|
||||
|
@ -447,31 +385,24 @@ async def _invoke(
|
|||
log.exception("Actor crashed:")
|
||||
|
||||
# always ship errors back to caller
|
||||
err_msg: dict[str, dict] = pack_error(
|
||||
err,
|
||||
tb=tb,
|
||||
)
|
||||
err_msg = pack_error(err, tb=tb)
|
||||
err_msg['cid'] = cid
|
||||
|
||||
if is_rpc:
|
||||
try:
|
||||
await chan.send(err_msg)
|
||||
try:
|
||||
await chan.send(err_msg)
|
||||
|
||||
# TODO: tests for this scenario:
|
||||
# - RPC caller closes connection before getting a response
|
||||
# should **not** crash this actor..
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
BrokenPipeError,
|
||||
) as ipc_err:
|
||||
|
||||
# if we can't propagate the error that's a big boo boo
|
||||
log.exception(
|
||||
f"Failed to ship error to caller @ {chan.uid} !?\n"
|
||||
f'{ipc_err}'
|
||||
|
||||
)
|
||||
# TODO: tests for this scenario:
|
||||
# - RPC caller closes connection before getting a response
|
||||
# should **not** crash this actor..
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
BrokenPipeError,
|
||||
):
|
||||
# if we can't propagate the error that's a big boo boo
|
||||
log.exception(
|
||||
f"Failed to ship error to caller @ {chan.uid} !?"
|
||||
)
|
||||
|
||||
# error is probably from above coro running code *not from the
|
||||
# underlyingn rpc invocation* since a scope was never allocated
|
||||
|
@ -497,11 +428,7 @@ async def _invoke(
|
|||
log.warning(
|
||||
f"Task {func} likely errored or cancelled before start")
|
||||
else:
|
||||
log.cancel(
|
||||
'Failed to de-alloc internal task!?\n'
|
||||
f'cid: {cid}\n'
|
||||
f'{func.__name__}({kwargs})'
|
||||
)
|
||||
log.cancel(f'{func.__name__}({kwargs}) failed?')
|
||||
|
||||
finally:
|
||||
if not actor._rpc_tasks:
|
||||
|
@ -518,7 +445,7 @@ async def try_ship_error_to_parent(
|
|||
err: Exception | BaseExceptionGroup,
|
||||
|
||||
) -> None:
|
||||
with CancelScope(shield=True):
|
||||
with trio.CancelScope(shield=True):
|
||||
try:
|
||||
# internal error so ship to parent without cid
|
||||
await channel.send(pack_error(err))
|
||||
|
@ -570,13 +497,13 @@ class Actor:
|
|||
msg_buffer_size: int = 2**6
|
||||
|
||||
# nursery placeholders filled in by `async_main()` after fork
|
||||
_root_n: Nursery | None = None
|
||||
_service_n: Nursery | None = None
|
||||
_server_n: Nursery | None = None
|
||||
_root_n: trio.Nursery | None = None
|
||||
_service_n: trio.Nursery | None = None
|
||||
_server_n: trio.Nursery | None = None
|
||||
|
||||
# Information about `__main__` from parent
|
||||
_parent_main_data: dict[str, str]
|
||||
_parent_chan_cs: CancelScope | None = None
|
||||
_parent_chan_cs: trio.CancelScope | None = None
|
||||
|
||||
# syncs for setup/teardown sequences
|
||||
_server_down: trio.Event | None = None
|
||||
|
@ -1169,12 +1096,12 @@ class Actor:
|
|||
|
||||
async def _serve_forever(
|
||||
self,
|
||||
handler_nursery: Nursery,
|
||||
handler_nursery: trio.Nursery,
|
||||
*,
|
||||
# (host, port) to bind for channel server
|
||||
listen_sockaddrs: list[tuple[str, int]] | None = None,
|
||||
|
||||
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
||||
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
'''
|
||||
Start the channel server, begin listening for new connections.
|
||||
|
@ -1261,7 +1188,7 @@ class Actor:
|
|||
self._cancel_called = True
|
||||
|
||||
# cancel all ongoing rpc tasks
|
||||
with CancelScope(shield=True):
|
||||
with trio.CancelScope(shield=True):
|
||||
|
||||
# kill any debugger request task to avoid deadlock
|
||||
# with the root actor in this tree
|
||||
|
@ -1321,7 +1248,7 @@ class Actor:
|
|||
# this ctx based lookup ensures the requested task to
|
||||
# be cancelled was indeed spawned by a request from this channel
|
||||
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
|
||||
scope: CancelScope = ctx._scope
|
||||
scope: trio.CancelScope = ctx._scope
|
||||
except KeyError:
|
||||
log.cancel(f"{cid} has already completed/terminated?")
|
||||
return True
|
||||
|
@ -1686,7 +1613,7 @@ async def async_main(
|
|||
# block it might be actually possible to debug THIS
|
||||
# machinery in the same way as user task code?
|
||||
# if actor.name == 'brokerd.ib':
|
||||
# with CancelScope(shield=True):
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await _debug.breakpoint()
|
||||
|
||||
actor.lifetime_stack.close()
|
||||
|
@ -1728,7 +1655,7 @@ async def async_main(
|
|||
):
|
||||
log.runtime(
|
||||
f"Waiting for remaining peers {actor._peers} to clear")
|
||||
with CancelScope(shield=True):
|
||||
with trio.CancelScope(shield=True):
|
||||
await actor._no_more_peers.wait()
|
||||
log.runtime("All peer channels are complete")
|
||||
|
||||
|
@ -1739,7 +1666,7 @@ async def process_messages(
|
|||
actor: Actor,
|
||||
chan: Channel,
|
||||
shield: bool = False,
|
||||
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
|
@ -1757,7 +1684,7 @@ async def process_messages(
|
|||
|
||||
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
||||
try:
|
||||
with CancelScope(shield=shield) as loop_cs:
|
||||
with trio.CancelScope(shield=shield) as loop_cs:
|
||||
# this internal scope allows for keeping this message
|
||||
# loop running despite the current task having been
|
||||
# cancelled (eg. `open_portal()` may call this method from
|
||||
|
@ -1819,18 +1746,18 @@ async def process_messages(
|
|||
|
||||
if ns == 'self':
|
||||
if funcname == 'cancel':
|
||||
func: Callable = actor.cancel
|
||||
func = actor.cancel
|
||||
kwargs['requesting_uid'] = chan.uid
|
||||
|
||||
# don't start entire actor runtime cancellation
|
||||
# if this actor is currently in debug mode!
|
||||
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
|
||||
pdb_complete = _debug.Lock.local_pdb_complete
|
||||
if pdb_complete:
|
||||
await pdb_complete.wait()
|
||||
|
||||
# we immediately start the runtime machinery
|
||||
# shutdown
|
||||
with CancelScope(shield=True):
|
||||
with trio.CancelScope(shield=True):
|
||||
# actor.cancel() was called so kill this
|
||||
# msg loop and break out into
|
||||
# ``async_main()``
|
||||
|
@ -1858,7 +1785,7 @@ async def process_messages(
|
|||
|
||||
# we immediately start the runtime machinery
|
||||
# shutdown
|
||||
# with CancelScope(shield=True):
|
||||
# with trio.CancelScope(shield=True):
|
||||
kwargs['chan'] = chan
|
||||
target_cid = kwargs['cid']
|
||||
kwargs['requesting_uid'] = chan.uid
|
||||
|
@ -1883,7 +1810,7 @@ async def process_messages(
|
|||
else:
|
||||
# normally registry methods, eg.
|
||||
# ``.register_actor()`` etc.
|
||||
func: Callable = getattr(actor, funcname)
|
||||
func = getattr(actor, funcname)
|
||||
|
||||
else:
|
||||
# complain to client about restricted modules
|
||||
|
@ -1973,10 +1900,9 @@ async def process_messages(
|
|||
Exception,
|
||||
BaseExceptionGroup,
|
||||
) as err:
|
||||
|
||||
if nursery_cancelled_before_task:
|
||||
sn: Nursery = actor._service_n
|
||||
assert sn and sn.cancel_scope.cancel_called # sanity
|
||||
sn = actor._service_n
|
||||
assert sn and sn.cancel_scope.cancel_called
|
||||
log.cancel(
|
||||
f'Service nursery cancelled before it handled {funcname}'
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue