Compare commits

No commits in common. "250275d98d52e8076d2797b8c3fabf5f8098d124" and "3f159235374b960d120c55a1be844cd1434ca48b" have entirely different histories.

2 changed files with 58 additions and 142 deletions

View File

@@ -76,18 +76,8 @@ async def get_registry(
         yield regstr_ptl

-# TODO: deprecate and remove _arbiter form
-# TODO: deprecate and this remove _arbiter form!
-@acm
-async def get_arbiter(*args, **kwargs):
-    warnings.warn(
-        '`tractor.get_arbiter()` is now deprecated!\n'
-        'Use `.get_registry()` instead!',
-        DeprecationWarning,
-        stacklevel=2,
-    )
-    async with get_registry(*args, **kwargs) as to_yield:
-        yield to_yield
+get_arbiter = get_registry

 @acm
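The two revisions keep the old `get_arbiter` name alive in different ways: one simply aliases it (`get_arbiter = get_registry`), the other wraps it so callers see a `DeprecationWarning` pointing at `get_registry()`. A minimal sketch of that wrapper pattern, using made-up `old_api`/`new_api` names rather than anything from this diff:

    from contextlib import asynccontextmanager as acm
    import warnings

    @acm
    async def new_api(*args, **kwargs):
        # stand-in for the replacement async context manager
        yield 'portal'

    @acm
    async def old_api(*args, **kwargs):
        # deprecation shim: keep the old name importable but warn on use
        warnings.warn(
            '`old_api()` is deprecated, use `new_api()` instead!',
            DeprecationWarning,
            stacklevel=2,
        )
        async with new_api(*args, **kwargs) as value:
            yield value

The alias form is lighter but silent; the shim costs a few lines and gives downstream users an actionable warning before the old name is removed.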

View File

@@ -15,10 +15,7 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 """
-The fundamental core machinery implementing every "actor" including
-the process-local (python-interpreter global) `Actor` state-type
-primitive(s), RPC-in-task scheduling, and IPC connectivity and
-low-level transport msg handling.
+Actor primitives and helpers
 """
 from __future__ import annotations
@@ -44,14 +41,8 @@ import warnings
 from async_generator import aclosing
 from exceptiongroup import BaseExceptionGroup
-import trio
-from trio import (
-    CancelScope,
-)
-from trio_typing import (
-    Nursery,
-    TaskStatus,
-)
+import trio  # type: ignore
+from trio_typing import TaskStatus

 from ._ipc import Channel
 from ._context import (
@@ -99,9 +90,10 @@ async def _invoke(
     connected IPC channel.

     This is the core "RPC" `trio.Task` scheduling machinery used to start every
-    remotely invoked function, normally in `Actor._service_n: Nursery`.
+    remotely invoked function, normally in `Actor._service_n: trio.Nursery`.

     '''
+    __tracebackhide__: bool = True
     treat_as_gen: bool = False
     failed_resp: bool = False
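The docstring above (and the `task_status` parameter threaded through the rest of this file) refers to trio's `nursery.start()` handshake: a task is spawned, finishes its setup, then reports back through `task_status.started()`. A small generic sketch of that handshake, independent of tractor's actual types:

    import trio

    async def serve(task_status=trio.TASK_STATUS_IGNORED):
        # setup that must finish before the parent may proceed
        ready_value = 'listening'
        # unblocks the `nursery.start()` call and hands it this value
        task_status.started(ready_value)
        await trio.sleep_forever()

    async def main():
        async with trio.open_nursery() as nursery:
            # blocks until `serve()` calls `task_status.started()`
            value = await nursery.start(serve)
            assert value == 'listening'
            nursery.cancel_scope.cancel()

    trio.run(main)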
@@ -118,9 +110,9 @@ async def _invoke(
     # possibly a traceback (not sure what typing is for this..)
     tb = None

-    cancel_scope = CancelScope()
+    cancel_scope = trio.CancelScope()
     # activated cancel scope ref
-    cs: CancelScope | None = None
+    cs: trio.CancelScope | None = None

     ctx = actor.get_context(
         chan,
@@ -132,7 +124,6 @@
     )
     context: bool = False

-    # TODO: deprecate this style..
     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
         sig = inspect.signature(func)
@@ -174,7 +165,6 @@
         except TypeError:
             raise

-    # TODO: can we unify this with the `context=True` impl below?
     if inspect.isasyncgen(coro):
         await chan.send({'functype': 'asyncgen', 'cid': cid})
         # XXX: massive gotcha! If the containing scope
@@ -205,7 +195,6 @@
                 await chan.send({'stop': True, 'cid': cid})

     # one way @stream func that gets treated like an async gen
-    # TODO: can we unify this with the `context=True` impl below?
     elif treat_as_gen:
         await chan.send({'functype': 'asyncgen', 'cid': cid})
         # XXX: the async-func may spawn further tasks which push
@@ -222,20 +211,8 @@
                 # far end async gen to tear down
                 await chan.send({'stop': True, 'cid': cid})

-    # our most general case: a remote SC-transitive,
-    # IPC-linked, cross-actor-task "context"
-    # ------ - ------
     # TODO: every other "func type" should be implemented from
-    # a special case of this impl eventually!
-    # -[ ] streaming funcs should instead of being async-for
-    #   handled directly here wrapped in
-    #   a async-with-open_stream() closure that does the
-    #   normal thing you'd expect a far end streaming context
-    #   to (if written by the app-dev).
-    # -[ ] one off async funcs can literally just be called
-    #   here and awaited directly, possibly just with a small
-    #   wrapper that calls `Context.started()` and then does
-    #   the `await coro()`?
+    # a special case of a context eventually!
     elif context:
         # context func with support for bi-dir streaming
         await chan.send({'functype': 'context', 'cid': cid})
@@ -296,12 +273,11 @@ async def _invoke(
             ctx._maybe_raise_remote_err(re)

         fname: str = func.__name__
-        cs: CancelScope = ctx._scope
+        cs: trio.CancelScope = ctx._scope
         if cs.cancel_called:
-            our_uid: tuple = actor.uid
             canceller: tuple = ctx.canceller
             msg: str = (
-                f'`{fname}()`@{our_uid} cancelled by '
+                f'`{fname}()`@{actor.uid} cancelled by '
             )

             # NOTE / TODO: if we end up having
@@ -310,8 +286,6 @@
             # need to change this logic branch since it
             # will always enter..
             if ctx._cancel_called:
-                # TODO: test for this!!!!!
-                canceller: tuple = our_uid
                 msg += 'itself '

             # if the channel which spawned the ctx is the
@@ -344,76 +318,40 @@
                 canceller=canceller,
             )

-    # regular async function/method
-    # XXX: possibly just a scheduled `Actor._cancel_task()`
-    # from a remote request to cancel some `Context`.
-    # ------ - ------
-    # TODO: ideally we unify this with the above `context=True`
-    # block such that for any remote invocation ftype, we
-    # always invoke the far end RPC task scheduling the same
-    # way: using the linked IPC context machinery.
+    # regular async function
     else:
         try:
             await chan.send({
                 'functype': 'asyncfunc',
                 'cid': cid
             })
-        except (
-            trio.ClosedResourceError,
-            trio.BrokenResourceError,
-            BrokenPipeError,
-        ) as ipc_err:
+        except trio.BrokenResourceError:
             failed_resp = True
             if is_rpc:
                 raise
             else:
-                # TODO: should this be an `.exception()` call?
                 log.warning(
-                    f'Failed to respond to non-rpc request: {func}\n'
-                    f'{ipc_err}'
+                    f'Failed to respond to non-rpc request: {func}'
                 )

         with cancel_scope as cs:
-            ctx._scope: CancelScope = cs
+            ctx._scope = cs
             task_status.started(ctx)
             result = await coro
             fname: str = func.__name__
             log.runtime(f'{fname}() result: {result}')

-            # NOTE: only send result if we know IPC isn't down
-            if (
-                not failed_resp
-                and chan.connected()
-            ):
-                try:
-                    await chan.send(
-                        {'return': result,
-                         'cid': cid}
-                    )
-                except (
-                    BrokenPipeError,
-                    trio.BrokenResourceError,
-                ):
-                    log.warning(
-                        'Failed to return result:\n'
-                        f'{func}@{actor.uid}\n'
-                        f'remote chan: {chan.uid}'
-                    )
+            if not failed_resp:
+                # only send result if we know IPC isn't down
+                await chan.send(
+                    {'return': result,
+                     'cid': cid}
+                )

     except (
         Exception,
         BaseExceptionGroup,
     ) as err:

-        # always hide this frame from debug REPL if the crash
-        # originated from an rpc task and we DID NOT fail
-        # due to an IPC transport error!
-        if (
-            is_rpc
-            and chan.connected()
-        ):
-            __tracebackhide__: bool = True
         if not is_multi_cancelled(err):
             # TODO: maybe we'll want different "levels" of debugging
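The main behavioural difference in the block above is how defensively the final result gets shipped back: one side checks `chan.connected()` and traps `BrokenPipeError`/`trio.BrokenResourceError`, the other relies on a single `failed_resp` flag. The underlying idea, sketched against a plain trio memory channel since tractor's `Channel` API isn't reproduced here, is a best-effort send that reports failure instead of crashing the task:

    import trio

    async def send_best_effort(send_chan, msg) -> bool:
        # deliver if possible; a peer that already went away is not fatal
        try:
            await send_chan.send(msg)
            return True
        except (trio.BrokenResourceError, trio.ClosedResourceError):
            return False

    async def main():
        send_chan, recv_chan = trio.open_memory_channel(1)
        await recv_chan.aclose()  # simulate the remote end disappearing
        delivered = await send_best_effort(send_chan, {'return': 42, 'cid': '1'})
        assert not delivered

    trio.run(main)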
@@ -447,31 +385,24 @@ async def _invoke(
             log.exception("Actor crashed:")

         # always ship errors back to caller
-        err_msg: dict[str, dict] = pack_error(
-            err,
-            tb=tb,
-        )
+        err_msg = pack_error(err, tb=tb)
         err_msg['cid'] = cid

-        if is_rpc:
-            try:
-                await chan.send(err_msg)
+        try:
+            await chan.send(err_msg)

         # TODO: tests for this scenario:
         # - RPC caller closes connection before getting a response
         # should **not** crash this actor..
         except (
             trio.ClosedResourceError,
             trio.BrokenResourceError,
             BrokenPipeError,
-        ) as ipc_err:
-
-            # if we can't propagate the error that's a big boo boo
-            log.exception(
-                f"Failed to ship error to caller @ {chan.uid} !?\n"
-                f'{ipc_err}'
-            )
+        ):
+            # if we can't propagate the error that's a big boo boo
+            log.exception(
+                f"Failed to ship error to caller @ {chan.uid} !?"
+            )

     # error is probably from above coro running code *not from the
     # underlyingn rpc invocation* since a scope was never allocated
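`pack_error()` is tractor's own helper and isn't shown in this diff; the gist, as a purely hypothetical illustration, is flattening the exception (plus its traceback text) into a plain dict tagged with the request's `cid` so it can cross the IPC boundary like any other message:

    import traceback

    def pack_error_sketch(err: BaseException, cid: str) -> dict:
        # reduce the exception to primitives any IPC codec can serialize
        return {
            'error': {
                'type': type(err).__name__,
                'msg': str(err),
                'tb_str': ''.join(
                    traceback.format_exception(type(err), err, err.__traceback__)
                ),
            },
            'cid': cid,
        }

    try:
        1 / 0
    except ZeroDivisionError as err:
        msg = pack_error_sketch(err, cid='abc123')
        assert msg['error']['type'] == 'ZeroDivisionError'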
@@ -497,11 +428,7 @@ async def _invoke(
                 log.warning(
                     f"Task {func} likely errored or cancelled before start")
             else:
-                log.cancel(
-                    'Failed to de-alloc internal task!?\n'
-                    f'cid: {cid}\n'
-                    f'{func.__name__}({kwargs})'
-                )
+                log.cancel(f'{func.__name__}({kwargs}) failed?')

         finally:
             if not actor._rpc_tasks:
@@ -518,7 +445,7 @@ async def try_ship_error_to_parent(
     err: Exception | BaseExceptionGroup,

 ) -> None:
-    with CancelScope(shield=True):
+    with trio.CancelScope(shield=True):
         try:
             # internal error so ship to parent without cid
             await channel.send(pack_error(err))
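Both versions run this send inside a shielded cancel scope so the error still reaches the parent even while the reporting task is itself being cancelled. A self-contained sketch of what `shield=True` buys you:

    import trio

    async def worker(results: list):
        try:
            await trio.sleep_forever()
        finally:
            # without the shield, the checkpoint below would be cancelled
            # immediately along with the rest of this task
            with trio.CancelScope(shield=True):
                await trio.sleep(0.1)
                results.append('cleanup ran')

    async def main():
        results: list = []
        async with trio.open_nursery() as nursery:
            nursery.start_soon(worker, results)
            await trio.sleep(0.1)
            nursery.cancel_scope.cancel()
        assert results == ['cleanup ran']

    trio.run(main)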
@@ -570,13 +497,13 @@ class Actor:
     msg_buffer_size: int = 2**6

     # nursery placeholders filled in by `async_main()` after fork
-    _root_n: Nursery | None = None
-    _service_n: Nursery | None = None
-    _server_n: Nursery | None = None
+    _root_n: trio.Nursery | None = None
+    _service_n: trio.Nursery | None = None
+    _server_n: trio.Nursery | None = None

     # Information about `__main__` from parent
     _parent_main_data: dict[str, str]
-    _parent_chan_cs: CancelScope | None = None
+    _parent_chan_cs: trio.CancelScope | None = None

     # syncs for setup/teardown sequences
     _server_down: trio.Event | None = None
@@ -1169,12 +1096,12 @@ class Actor:
     async def _serve_forever(
         self,
-        handler_nursery: Nursery,
+        handler_nursery: trio.Nursery,
         *,
         # (host, port) to bind for channel server
         listen_sockaddrs: list[tuple[str, int]] | None = None,

-        task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
+        task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
     ) -> None:
         '''
         Start the channel server, begin listening for new connections.
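`_serve_forever()` is the actor's channel server: it binds the requested `(host, port)` sockaddrs and reports readiness back through `task_status`, the same `nursery.start()` idiom trio's own helpers use. For comparison, a bare-bones TCP echo server started the same way (the handler and port here are made up for the demo):

    import trio

    async def echo_handler(stream: trio.SocketStream):
        # stream iteration yields whatever-sized chunks the peer sends
        async for data in stream:
            await stream.send_all(data)

    async def main():
        async with trio.open_nursery() as nursery:
            # `trio.serve_tcp` calls `task_status.started(listeners)` once
            # bound, so `nursery.start()` returns the live listeners
            listeners = await nursery.start(
                trio.serve_tcp, echo_handler, 12345
            )
            print('listening on', listeners[0].socket.getsockname())
            nursery.cancel_scope.cancel()  # shut down right away for the demo

    trio.run(main)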
@@ -1261,7 +1188,7 @@ class Actor:
         self._cancel_called = True

         # cancel all ongoing rpc tasks
-        with CancelScope(shield=True):
+        with trio.CancelScope(shield=True):

             # kill any debugger request task to avoid deadlock
             # with the root actor in this tree
@@ -1321,7 +1248,7 @@ class Actor:
             # this ctx based lookup ensures the requested task to
             # be cancelled was indeed spawned by a request from this channel
             ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
-            scope: CancelScope = ctx._scope
+            scope: trio.CancelScope = ctx._scope
         except KeyError:
             log.cancel(f"{cid} has already completed/terminated?")
             return True
@@ -1686,7 +1613,7 @@ async def async_main(
             # block it might be actually possible to debug THIS
             # machinery in the same way as user task code?
             # if actor.name == 'brokerd.ib':
-            #     with CancelScope(shield=True):
+            #     with trio.CancelScope(shield=True):
             #         await _debug.breakpoint()
             actor.lifetime_stack.close()
@@ -1728,7 +1655,7 @@ async def async_main(
             ):
                 log.runtime(
                     f"Waiting for remaining peers {actor._peers} to clear")
-                with CancelScope(shield=True):
+                with trio.CancelScope(shield=True):
                     await actor._no_more_peers.wait()

                 log.runtime("All peer channels are complete")
@@ -1739,7 +1666,7 @@ async def process_messages(
     actor: Actor,
     chan: Channel,
     shield: bool = False,
-    task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

 ) -> bool:
     '''
@@ -1757,7 +1684,7 @@ async def process_messages(
     log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
     try:
-        with CancelScope(shield=shield) as loop_cs:
+        with trio.CancelScope(shield=shield) as loop_cs:
             # this internal scope allows for keeping this message
             # loop running despite the current task having been
             # cancelled (eg. `open_portal()` may call this method from
@@ -1819,18 +1746,18 @@ async def process_messages(
                     if ns == 'self':
                         if funcname == 'cancel':
-                            func: Callable = actor.cancel
+                            func = actor.cancel
                             kwargs['requesting_uid'] = chan.uid

                             # don't start entire actor runtime cancellation
                             # if this actor is currently in debug mode!
-                            pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
+                            pdb_complete = _debug.Lock.local_pdb_complete
                             if pdb_complete:
                                 await pdb_complete.wait()

                             # we immediately start the runtime machinery
                             # shutdown
-                            with CancelScope(shield=True):
+                            with trio.CancelScope(shield=True):
                                 # actor.cancel() was called so kill this
                                 # msg loop and break out into
                                 # ``async_main()``
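The cancel branch above gates runtime teardown on `_debug.Lock.local_pdb_complete` (a `trio.Event`) so shutdown never races an active debugger REPL, then performs it under a shielded scope. The event gate on its own looks like this:

    import trio

    async def teardown(debugger_done: trio.Event):
        # block until whoever owns the debugger signals it's finished
        await debugger_done.wait()
        print('safe to tear down the runtime now')

    async def main():
        debugger_done = trio.Event()
        async with trio.open_nursery() as nursery:
            nursery.start_soon(teardown, debugger_done)
            await trio.sleep(0.1)   # pretend a REPL session is in progress
            debugger_done.set()     # release the waiting teardown task

    trio.run(main)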
@@ -1858,7 +1785,7 @@ async def process_messages(
                             # we immediately start the runtime machinery
                             # shutdown
-                            # with CancelScope(shield=True):
+                            # with trio.CancelScope(shield=True):
                             kwargs['chan'] = chan
                             target_cid = kwargs['cid']
                             kwargs['requesting_uid'] = chan.uid
@@ -1883,7 +1810,7 @@ async def process_messages(
                         else:
                             # normally registry methods, eg.
                             # ``.register_actor()`` etc.
-                            func: Callable = getattr(actor, funcname)
+                            func = getattr(actor, funcname)

                     else:
                         # complain to client about restricted modules
@@ -1973,10 +1900,9 @@ async def process_messages(
         Exception,
         BaseExceptionGroup,
     ) as err:

         if nursery_cancelled_before_task:
-            sn: Nursery = actor._service_n
-            assert sn and sn.cancel_scope.cancel_called  # sanity
+            sn = actor._service_n
+            assert sn and sn.cancel_scope.cancel_called
             log.cancel(
                 f'Service nursery cancelled before it handled {funcname}'
             )