Compare commits

..

No commits in common. "250275d98d52e8076d2797b8c3fabf5f8098d124" and "3f159235374b960d120c55a1be844cd1434ca48b" have entirely different histories.

2 changed files with 58 additions and 142 deletions

View File

@ -76,18 +76,8 @@ async def get_registry(
yield regstr_ptl
# TODO: deprecate and this remove _arbiter form!
@acm
async def get_arbiter(*args, **kwargs):
warnings.warn(
'`tractor.get_arbiter()` is now deprecated!\n'
'Use `.get_registry()` instead!',
DeprecationWarning,
stacklevel=2,
)
async with get_registry(*args, **kwargs) as to_yield:
yield to_yield
# TODO: deprecate and remove _arbiter form
get_arbiter = get_registry
@acm

View File

@ -15,10 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
The fundamental core machinery implementing every "actor" including
the process-local (python-interpreter global) `Actor` state-type
primitive(s), RPC-in-task scheduling, and IPC connectivity and
low-level transport msg handling.
Actor primitives and helpers
"""
from __future__ import annotations
@ -44,14 +41,8 @@ import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio
from trio import (
CancelScope,
)
from trio_typing import (
Nursery,
TaskStatus,
)
import trio # type: ignore
from trio_typing import TaskStatus
from ._ipc import Channel
from ._context import (
@ -99,9 +90,10 @@ async def _invoke(
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_n: Nursery`.
remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
'''
__tracebackhide__: bool = True
treat_as_gen: bool = False
failed_resp: bool = False
@ -118,9 +110,9 @@ async def _invoke(
# possibly a traceback (not sure what typing is for this..)
tb = None
cancel_scope = CancelScope()
cancel_scope = trio.CancelScope()
# activated cancel scope ref
cs: CancelScope | None = None
cs: trio.CancelScope | None = None
ctx = actor.get_context(
chan,
@ -132,7 +124,6 @@ async def _invoke(
)
context: bool = False
# TODO: deprecate this style..
if getattr(func, '_tractor_stream_function', False):
# handle decorated ``@tractor.stream`` async functions
sig = inspect.signature(func)
@ -174,7 +165,6 @@ async def _invoke(
except TypeError:
raise
# TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope
@ -205,7 +195,6 @@ async def _invoke(
await chan.send({'stop': True, 'cid': cid})
# one way @stream func that gets treated like an async gen
# TODO: can we unify this with the `context=True` impl below?
elif treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push
@ -222,20 +211,8 @@ async def _invoke(
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
# our most general case: a remote SC-transitive,
# IPC-linked, cross-actor-task "context"
# ------ - ------
# TODO: every other "func type" should be implemented from
# a special case of this impl eventually!
# -[ ] streaming funcs should instead of being async-for
# handled directly here wrapped in
# a async-with-open_stream() closure that does the
# normal thing you'd expect a far end streaming context
# to (if written by the app-dev).
# -[ ] one off async funcs can literally just be called
# here and awaited directly, possibly just with a small
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
# a special case of a context eventually!
elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
@ -296,12 +273,11 @@ async def _invoke(
ctx._maybe_raise_remote_err(re)
fname: str = func.__name__
cs: CancelScope = ctx._scope
cs: trio.CancelScope = ctx._scope
if cs.cancel_called:
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller
msg: str = (
f'`{fname}()`@{our_uid} cancelled by '
f'`{fname}()`@{actor.uid} cancelled by '
)
# NOTE / TODO: if we end up having
@ -310,8 +286,6 @@ async def _invoke(
# need to change this logic branch since it
# will always enter..
if ctx._cancel_called:
# TODO: test for this!!!!!
canceller: tuple = our_uid
msg += 'itself '
# if the channel which spawned the ctx is the
@ -344,76 +318,40 @@ async def _invoke(
canceller=canceller,
)
# regular async function/method
# XXX: possibly just a scheduled `Actor._cancel_task()`
# from a remote request to cancel some `Context`.
# ------ - ------
# TODO: ideally we unify this with the above `context=True`
# block such that for any remote invocation ftype, we
# always invoke the far end RPC task scheduling the same
# way: using the linked IPC context machinery.
# regular async function
else:
try:
await chan.send({
'functype': 'asyncfunc',
'cid': cid
})
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
except trio.BrokenResourceError:
failed_resp = True
if is_rpc:
raise
else:
# TODO: should this be an `.exception()` call?
log.warning(
f'Failed to respond to non-rpc request: {func}\n'
f'{ipc_err}'
f'Failed to respond to non-rpc request: {func}'
)
with cancel_scope as cs:
ctx._scope: CancelScope = cs
ctx._scope = cs
task_status.started(ctx)
result = await coro
fname: str = func.__name__
log.runtime(f'{fname}() result: {result}')
# NOTE: only send result if we know IPC isn't down
if (
not failed_resp
and chan.connected()
):
try:
await chan.send(
{'return': result,
'cid': cid}
)
except (
BrokenPipeError,
trio.BrokenResourceError,
):
log.warning(
'Failed to return result:\n'
f'{func}@{actor.uid}\n'
f'remote chan: {chan.uid}'
)
if not failed_resp:
# only send result if we know IPC isn't down
await chan.send(
{'return': result,
'cid': cid}
)
except (
Exception,
BaseExceptionGroup,
) as err:
# always hide this frame from debug REPL if the crash
# originated from an rpc task and we DID NOT fail
# due to an IPC transport error!
if (
is_rpc
and chan.connected()
):
__tracebackhide__: bool = True
if not is_multi_cancelled(err):
# TODO: maybe we'll want different "levels" of debugging
@ -447,31 +385,24 @@ async def _invoke(
log.exception("Actor crashed:")
# always ship errors back to caller
err_msg: dict[str, dict] = pack_error(
err,
tb=tb,
)
err_msg = pack_error(err, tb=tb)
err_msg['cid'] = cid
if is_rpc:
try:
await chan.send(err_msg)
try:
await chan.send(err_msg)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
) as ipc_err:
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?\n"
f'{ipc_err}'
)
# TODO: tests for this scenario:
# - RPC caller closes connection before getting a response
# should **not** crash this actor..
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
):
# if we can't propagate the error that's a big boo boo
log.exception(
f"Failed to ship error to caller @ {chan.uid} !?"
)
# error is probably from above coro running code *not from the
# underlyingn rpc invocation* since a scope was never allocated
@ -497,11 +428,7 @@ async def _invoke(
log.warning(
f"Task {func} likely errored or cancelled before start")
else:
log.cancel(
'Failed to de-alloc internal task!?\n'
f'cid: {cid}\n'
f'{func.__name__}({kwargs})'
)
log.cancel(f'{func.__name__}({kwargs}) failed?')
finally:
if not actor._rpc_tasks:
@ -518,7 +445,7 @@ async def try_ship_error_to_parent(
err: Exception | BaseExceptionGroup,
) -> None:
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
try:
# internal error so ship to parent without cid
await channel.send(pack_error(err))
@ -570,13 +497,13 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery | None = None
_service_n: Nursery | None = None
_server_n: Nursery | None = None
_root_n: trio.Nursery | None = None
_service_n: trio.Nursery | None = None
_server_n: trio.Nursery | None = None
# Information about `__main__` from parent
_parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope | None = None
_parent_chan_cs: trio.CancelScope | None = None
# syncs for setup/teardown sequences
_server_down: trio.Event | None = None
@ -1169,12 +1096,12 @@ class Actor:
async def _serve_forever(
self,
handler_nursery: Nursery,
handler_nursery: trio.Nursery,
*,
# (host, port) to bind for channel server
listen_sockaddrs: list[tuple[str, int]] | None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start the channel server, begin listening for new connections.
@ -1261,7 +1188,7 @@ class Actor:
self._cancel_called = True
# cancel all ongoing rpc tasks
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
# kill any debugger request task to avoid deadlock
# with the root actor in this tree
@ -1321,7 +1248,7 @@ class Actor:
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
scope: CancelScope = ctx._scope
scope: trio.CancelScope = ctx._scope
except KeyError:
log.cancel(f"{cid} has already completed/terminated?")
return True
@ -1686,7 +1613,7 @@ async def async_main(
# block it might be actually possible to debug THIS
# machinery in the same way as user task code?
# if actor.name == 'brokerd.ib':
# with CancelScope(shield=True):
# with trio.CancelScope(shield=True):
# await _debug.breakpoint()
actor.lifetime_stack.close()
@ -1728,7 +1655,7 @@ async def async_main(
):
log.runtime(
f"Waiting for remaining peers {actor._peers} to clear")
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
@ -1739,7 +1666,7 @@ async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
@ -1757,7 +1684,7 @@ async def process_messages(
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try:
with CancelScope(shield=shield) as loop_cs:
with trio.CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
@ -1819,18 +1746,18 @@ async def process_messages(
if ns == 'self':
if funcname == 'cancel':
func: Callable = actor.cancel
func = actor.cancel
kwargs['requesting_uid'] = chan.uid
# don't start entire actor runtime cancellation
# if this actor is currently in debug mode!
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
pdb_complete = _debug.Lock.local_pdb_complete
if pdb_complete:
await pdb_complete.wait()
# we immediately start the runtime machinery
# shutdown
with CancelScope(shield=True):
with trio.CancelScope(shield=True):
# actor.cancel() was called so kill this
# msg loop and break out into
# ``async_main()``
@ -1858,7 +1785,7 @@ async def process_messages(
# we immediately start the runtime machinery
# shutdown
# with CancelScope(shield=True):
# with trio.CancelScope(shield=True):
kwargs['chan'] = chan
target_cid = kwargs['cid']
kwargs['requesting_uid'] = chan.uid
@ -1883,7 +1810,7 @@ async def process_messages(
else:
# normally registry methods, eg.
# ``.register_actor()`` etc.
func: Callable = getattr(actor, funcname)
func = getattr(actor, funcname)
else:
# complain to client about restricted modules
@ -1973,10 +1900,9 @@ async def process_messages(
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called
log.cancel(
f'Service nursery cancelled before it handled {funcname}'
)