Compare commits


No commits in common. "fdf0c43bfa1d94e0c6a6be9a090d1eaad3ae7408" and "7507e269ec2f769e4f6cc208dec51f69c62558e9" have entirely different histories.

13 changed files with 372 additions and 1081 deletions


@@ -65,28 +65,21 @@ async def aggregate(seed):
     print("AGGREGATOR COMPLETE!")
 
 
-async def main() -> list[int]:
-    '''
-    This is the "root" actor's main task's entrypoint.
-
-    By default (and if not otherwise specified) that root process
-    also acts as a "registry actor" / "registrar" on the localhost
-    for the purposes of multi-actor "service discovery".
-
-    '''
-    # yes, a nursery which spawns `trio`-"actors" B)
-    nursery: tractor.ActorNursery
-    async with tractor.open_nursery() as nursery:
+# this is the main actor and *arbiter*
+async def main():
+    # a nursery which spawns "actors"
+    async with tractor.open_nursery(
+        arbiter_addr=('127.0.0.1', 1616)
+    ) as nursery:
 
         seed = int(1e3)
         pre_start = time.time()
 
-        portal: tractor.Portal = await nursery.start_actor(
+        portal = await nursery.start_actor(
            name='aggregator',
            enable_modules=[__name__],
        )
 
-        stream: tractor.MsgStream
         async with portal.open_stream_from(
            aggregate,
            seed=seed,


@@ -1,8 +1,8 @@
 '''
 ``async with ():`` inlined context-stream cancellation testing.
 
-Verify the we raise errors when streams are opened prior to
-sync-opening a ``tractor.Context`` beforehand.
+Verify the we raise errors when streams are opened prior to sync-opening
+a ``tractor.Context`` beforehand.
 
 '''
 from contextlib import asynccontextmanager as acm
@ -922,3 +922,93 @@ def test_maybe_allow_overruns_stream(
# if this hits the logic blocks from above are not
# exhaustive..
pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
@tractor.context
async def sleep_forever(
ctx: tractor.Context,
) -> None:
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
@acm
async def attach_to_sleep_forever():
'''
Cancel a context **before** any underlying error is raised in order
to trigger a local reception of a ``ContextCancelled`` which **should not**
be re-raised in the local surrounding ``Context`` *iff* the cancel was
requested by **this** side of the context.
'''
async with tractor.wait_for_actor('sleeper') as p2:
async with (
p2.open_context(sleep_forever) as (peer_ctx, first),
peer_ctx.open_stream(),
):
try:
yield
finally:
# XXX: previously this would trigger local
# ``ContextCancelled`` to be received and raised in the
# local context overriding any local error due to logic
# inside ``_invoke()`` which checked for an error set on
# ``Context._error`` and raised it in a cancellation
# scenario.
# ------
# The problem is you can have a remote cancellation that
# is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we **were not** the side
# of the context to initiate it, i.e.
# ``Context._cancel_called`` should **NOT** have been
# set. The special logic to handle this case is now
# inside ``Context._maybe_raise_from_remote_msg()`` XD
await peer_ctx.cancel()
@tractor.context
async def error_before_started(
ctx: tractor.Context,
) -> None:
'''
This simulates exactly an original bug discovered in:
https://github.com/pikers/piker/issues/244
'''
async with attach_to_sleep_forever():
# send an unserializable type which should raise a type error
# here and **NOT BE SWALLOWED** by the surrounding acm!!?!
await ctx.started(object())
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
Verify that an error raised in a remote context which itself opens
another remote context, which it cancels, does not ovverride the
original error that caused the cancellation of the secondardy
context.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'errorer',
enable_modules=[__name__],
)
await n.start_actor(
'sleeper',
enable_modules=[__name__],
)
async with (
portal.open_context(
error_before_started
) as (ctx, sent),
):
await trio.sleep_forever()
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == TypeError


@ -1,451 +0,0 @@
'''
Codify the cancellation request semantics in terms
of one remote actor cancelling another.
'''
# from contextlib import asynccontextmanager as acm
import itertools
import pytest
import trio
import tractor
from tractor import ( # typing
Portal,
Context,
ContextCancelled,
)
# def test_self_cancel():
# '''
# 2 cases:
# - calls `Actor.cancel()` locally in some task
# - calls LocalPortal.cancel_actor()` ?
# '''
# ...
@tractor.context
async def sleep_forever(
ctx: Context,
) -> None:
'''
Sync the context, open a stream then just sleep.
'''
await ctx.started()
async with ctx.open_stream():
await trio.sleep_forever()
@tractor.context
async def error_before_started(
ctx: Context,
) -> None:
'''
This simulates exactly an original bug discovered in:
https://github.com/pikers/piker/issues/244
Cancel a context **before** any underlying error is raised so
as to trigger a local reception of a ``ContextCancelled`` which
SHOULD NOT be re-raised in the local surrounding ``Context``
*iff* the cancel was requested by **this** (callee) side of
the context.
'''
async with tractor.wait_for_actor('sleeper') as p2:
async with (
p2.open_context(sleep_forever) as (peer_ctx, first),
peer_ctx.open_stream(),
):
# NOTE: this WAS inside an @acm body but i factored it
# out and just put it inline here since i don't think
# the mngr part really matters, though maybe it could?
try:
# XXX NOTE XXX: THIS sends an UNSERIALIZABLE TYPE which
# should raise a `TypeError` and **NOT BE SWALLOWED** by
# the surrounding try/finally (normally inside the
# body of some acm)..
await ctx.started(object())
# yield
finally:
# XXX: previously this would trigger local
# ``ContextCancelled`` to be received and raised in the
# local context overriding any local error due to logic
# inside ``_invoke()`` which checked for an error set on
# ``Context._error`` and raised it in a cancellation
# scenario.
# ------
# The problem is you can have a remote cancellation that
# is part of a local error and we shouldn't raise
# ``ContextCancelled`` **iff** we **were not** the side
# of the context to initiate it, i.e.
# ``Context._cancel_called`` should **NOT** have been
# set. The special logic to handle this case is now
# inside ``Context._maybe_raise_from_remote_msg()`` XD
await peer_ctx.cancel()
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
'''
Verify that an error raised in a remote context which itself
opens YET ANOTHER remote context, which it then cancels, does not
override the original error that caused the cancellation of the
secondary context.
'''
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'errorer',
enable_modules=[__name__],
)
await n.start_actor(
'sleeper',
enable_modules=[__name__],
)
async with (
portal.open_context(
error_before_started
) as (ctx, sent),
):
await trio.sleep_forever()
with pytest.raises(tractor.RemoteActorError) as excinfo:
trio.run(main)
assert excinfo.value.type == TypeError
@tractor.context
async def sleep_a_bit_then_cancel_peer(
ctx: Context,
peer_name: str = 'sleeper',
cancel_after: float = .5,
) -> None:
'''
Connect to peer, sleep as per input delay, cancel the peer.
'''
peer: Portal
async with tractor.wait_for_actor(peer_name) as peer:
await ctx.started()
await trio.sleep(cancel_after)
await peer.cancel_actor()
@tractor.context
async def stream_ints(
ctx: Context,
):
await ctx.started()
async with ctx.open_stream() as stream:
for i in itertools.count():
await stream.send(i)
@tractor.context
async def stream_from_peer(
ctx: Context,
peer_name: str = 'sleeper',
) -> None:
peer: Portal
try:
async with (
tractor.wait_for_actor(peer_name) as peer,
peer.open_context(stream_ints) as (peer_ctx, first),
peer_ctx.open_stream() as stream,
):
await ctx.started()
# XXX TODO: big set of questions for this
# - should we raise `ContextCancelled` or `Cancelled` (rn
# it does that) here?!
# - test the `ContextCancelled` OUTSIDE the
# `.open_context()` call?
try:
async for msg in stream:
print(msg)
except trio.Cancelled:
assert not ctx.cancel_called
assert not ctx.cancelled_caught
assert not peer_ctx.cancel_called
assert not peer_ctx.cancelled_caught
assert 'root' in ctx.cancel_called_remote
raise # XXX MUST NEVER MASK IT!!
with trio.CancelScope(shield=True):
await tractor.pause()
# pass
# pytest.fail(
raise RuntimeError(
'peer never triggered local `[Context]Cancelled`?!?'
)
# NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming
# actor.
except ContextCancelled as ctxerr:
assert ctxerr.canceller == 'canceller'
assert ctxerr._remote_error is ctxerr
# CASE 1: we were cancelled by our parent, the root actor.
# TODO: there are other cases depending on how the root
# actor and it's caller side task are written:
# - if the root does not req us to cancel then an
# IPC-transport related error should bubble from the async
# for loop and thus cause local cancellation both here
# and in the root (since in that case this task cancels the
# context with the root, not the other way around)
assert ctx.cancel_called_remote[0] == 'root'
raise
# except BaseException as err:
# raise
# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
# => all other connected peers should get that cancel requesting peer's
# uid in the ctx-cancelled error msg.
# - peer spawned a sub-actor which (also) spawned a failing task
# which was unhandled and propagated up to the immediate
# parent, the peer to the actor that also spawned a remote task
# task in that same peer-parent.
# - peer cancelled itself - so other peers should
# get errors reflecting that the peer was itself the .canceller?
# - WE cancelled the peer and thus should not see any raised
# `ContextCancelled` as it should be reaped silently?
# => pretty sure `test_context_stream_semantics::test_caller_cancels()`
# already covers this case?
@pytest.mark.parametrize(
'error_during_ctxerr_handling',
[False, True],
)
def test_peer_canceller(
error_during_ctxerr_handling: bool,
):
'''
Verify that a cancellation triggered by an in-actor-tree peer
results in a cancelled errors with all other actors which have
opened contexts to that same actor.
legend:
name>
a "play button" that indicates a new runtime instance,
an individual actor with `name`.
.subname>
a subactor who's parent should be on some previous
line and be less indented.
.actor0> ()-> .actor1>
a inter-actor task context opened (by `async with `Portal.open_context()`)
from actor0 *into* actor1.
.actor0> ()<=> .actor1>
a inter-actor task context opened (as above)
from actor0 *into* actor1 which INCLUDES an additional
stream open using `async with Context.open_stream()`.
------ - ------
supervision view
------ - ------
root>
.sleeper> TODO: SOME SYNTAX SHOWING JUST SLEEPING
.just_caller> ()=> .sleeper>
.canceller> ()-> .sleeper>
TODO: how define calling `Portal.cancel_actor()`
In this case a `ContextCancelled` with `.errorer` set to the
requesting actor, in this case 'canceller', should be relayed
to all other actors who have also opened a (remote task)
context with that now cancelled actor.
------ - ------
task view
------ - ------
So there are 5 context open in total with 3 from the root to
its children and 2 from children to their peers:
1. root> ()-> .sleeper>
2. root> ()-> .streamer>
3. root> ()-> .canceller>
4. .streamer> ()<=> .sleep>
5. .canceller> ()-> .sleeper>
- calls `Portal.cancel_actor()`
'''
async def main():
async with tractor.open_nursery() as an:
canceller: Portal = await an.start_actor(
'canceller',
enable_modules=[__name__],
)
sleeper: Portal = await an.start_actor(
'sleeper',
enable_modules=[__name__],
)
just_caller: Portal = await an.start_actor(
'just_caller', # but i just met her?
enable_modules=[__name__],
)
try:
async with (
sleeper.open_context(
sleep_forever,
) as (sleeper_ctx, sent),
just_caller.open_context(
stream_from_peer,
) as (caller_ctx, sent),
canceller.open_context(
sleep_a_bit_then_cancel_peer,
) as (canceller_ctx, sent),
):
ctxs: list[Context] = [
sleeper_ctx,
caller_ctx,
canceller_ctx,
]
try:
print('PRE CONTEXT RESULT')
await sleeper_ctx.result()
# should never get here
pytest.fail(
'Context.result() did not raise ctx-cancelled?'
)
# TODO: not sure why this isn't catching
# but maybe we need an `ExceptionGroup` and
# the whole except *errs: thinger in 3.11?
except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
# canceller and caller peers should not
# have been remotely cancelled.
assert canceller_ctx.cancel_called_remote is None
assert caller_ctx.cancel_called_remote is None
assert ctxerr.canceller[0] == 'canceller'
# XXX NOTE XXX: since THIS `ContextCancelled`
# HAS NOT YET bubbled up to the
# `sleeper.open_context().__aexit__()` this
# value is not yet set, however outside this
# block it should be.
assert not sleeper_ctx.cancelled_caught
# TODO: a test which ensures this error is
# bubbled and caught (NOT MASKED) by the
# runtime!!!
if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown')
raise
# SHOULD NEVER GET HERE!
except BaseException:
pytest.fail('did not rx ctx-cancelled error?')
else:
pytest.fail('did not rx ctx-cancelled error?')
except (
ContextCancelled,
RuntimeError,
)as ctxerr:
_err = ctxerr
if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError)
# NOTE: this root actor task should have
# called `Context.cancel()` on the
# `.__aexit__()` to every opened ctx.
for ctx in ctxs:
assert ctx.cancel_called
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# this root actor task should have
# cancelled all opened contexts except
# the sleeper which is cancelled by its
# peer "canceller"
if ctx is not sleeper_ctx:
assert ctx._remote_error.canceller[0] == 'root'
else:
assert ctxerr.canceller[0] == 'canceller'
# the sleeper's remote error is the error bubbled
# out of the context-stack above!
re = sleeper_ctx._remote_error
assert re is ctxerr
for ctx in ctxs:
if ctx is sleeper_ctx:
assert not ctx.cancel_called
assert ctx.cancelled_caught
else:
assert ctx.cancel_called
assert not ctx.cancelled_caught
# each context should have received
# a silently absorbed context cancellation
# from its peer actor's task.
assert ctx.chan.uid == ctx.cancel_called_remote
# NOTE: when an inter-peer cancellation
# occurred, we DO NOT expect this
# root-actor-task to have requested a cancel of
# the context since cancellation was caused by
# the "canceller" peer and thus
# `Context.cancel()` SHOULD NOT have been
# called inside
# `Portal.open_context().__aexit__()`.
assert not sleeper_ctx.cancel_called
# XXX NOTE XXX: and see matching comment above but,
# this flag is set only AFTER the `.open_context()`
# has exited and should be set in both outcomes
# including the case where ctx-cancel handling
# itself errors.
assert sleeper_ctx.cancelled_caught
assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'
# await tractor.pause()
raise # always to ensure teardown
if error_during_ctxerr_handling:
with pytest.raises(RuntimeError) as excinfo:
trio.run(main)
else:
with pytest.raises(ContextCancelled) as excinfo:
trio.run(main)
assert excinfo.value.type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller'


@@ -23,8 +23,8 @@ from exceptiongroup import BaseExceptionGroup
 from ._clustering import open_actor_cluster
 from ._ipc import Channel
 from ._context import (
-    Context,  # the type
-    context,  # a func-decorator
+    Context,
+    context,
 )
 from ._streaming import (
     MsgStream,


@ -86,51 +86,26 @@ class Context:
''' '''
chan: Channel chan: Channel
cid: str # "context id", more or less a unique linked-task-pair id cid: str
# the "feeder" channels for delivering message values to the # these are the "feeder" channels for delivering
# local task from the runtime's msg processing loop. # message values to the local task from the runtime
# msg processing loop.
_recv_chan: trio.MemoryReceiveChannel _recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel _send_chan: trio.MemorySendChannel
# the "invocation type" of the far end task-entry-point
# function, normally matching a logic block inside
# `._runtime.invoke()`.
_remote_func_type: str | None = None _remote_func_type: str | None = None
# NOTE: (for now) only set (a portal) on the caller side since # only set on the caller side
# the callee doesn't generally need a ref to one and should _portal: Portal | None = None # type: ignore # noqa
# normally need to explicitly ask for handle to its peer if
# more the the `Context` is needed?
_portal: Portal | None = None
# NOTE: each side of the context has its own cancel scope
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None
_result: Any | int = None _result: Any | int = None
_remote_error: BaseException | None = None _remote_error: BaseException | None = None
# cancellation state # cancellation state
_cancel_called: bool = False # did WE cancel the far end? _cancel_called: bool = False
_cancelled_remote: tuple[str, str] | None = None _cancelled_remote: tuple | None = None
_cancel_msg: str | None = None
# NOTE: we try to ensure assignment of a "cancel msg" since _scope: trio.CancelScope | None = None
# there's always going to be an "underlying reason" that any
# context was closed due to either a remote side error or
# a call to `.cancel()` which triggers `ContextCancelled`.
_cancel_msg: str | dict | None = None
# NOTE: this state var used by the runtime to determine if the
# `pdbp` REPL is allowed to engage on contexts terminated via
# a `ContextCancelled` due to a call to `.cancel()` triggering
# "graceful closure" on either side:
# - `._runtime._invoke()` will check this flag before engaging
# the crash handler REPL in such cases where the "callee"
# raises the cancellation,
# - `.devx._debug.lock_tty_for_child()` will set it to `False` if
# the global tty-lock has been configured to filter out some
# actors from being able to acquire the debugger lock.
_enter_debugger_on_cancel: bool = True _enter_debugger_on_cancel: bool = True
@property @property
@ -198,76 +173,41 @@ class Context:
async def _maybe_cancel_and_set_remote_error( async def _maybe_cancel_and_set_remote_error(
self, self,
error: BaseException, error_msg: dict[str, Any],
) -> None: ) -> None:
''' '''
(Maybe) cancel this local scope due to a received remote (Maybe) unpack and raise a msg error into the local scope
error (normally via an IPC msg) which the actor runtime nursery for this context.
routes to this context.
Acts as a form of "relay" for a remote error raised in the Acts as a form of "relay" for a remote error raised
corresponding remote task's `Context` wherein the next time in the corresponding remote callee task.
the local task exectutes a checkpoint, a `trio.Cancelled`
will be raised and depending on the type and source of the
original remote error, and whether or not the local task
called `.cancel()` itself prior, an equivalent
`ContextCancelled` or `RemoteActorError` wrapping the
remote error may be raised here by any of,
- `Portal.open_context()`
- `Portal.result()`
- `Context.open_stream()`
- `Context.result()`
when called/closed by actor local task(s).
NOTEs & TODOs:
- It is expected that the caller has previously unwrapped
the remote error using a call to `unpack_error()` and
provides that output exception value as the input
`error` argument here.
- If this is an error message from a context opened by
`Portal.open_context()` we want to interrupt any
ongoing local tasks operating within that `Context`'s
cancel-scope so as to be notified ASAP of the remote
error and engage any caller handling (eg. for
cross-process task supervision).
- In some cases we may want to raise the remote error
immediately since there is no guarantee the locally
operating task(s) will attempt to execute a checkpoint
any time soon; in such cases there are 2 possible
approaches depending on the current task's work and
wrapping "thread" type:
- `trio`-native-and-graceful: only ever wait for tasks
to exec a next `trio.lowlevel.checkpoint()` assuming
that any such task must do so to interact with the
actor runtime and IPC interfaces.
- (NOT IMPLEMENTED) system-level-aggressive: maybe we
could eventually interrupt sync code (invoked using
`trio.to_thread` or some other adapter layer) with
a signal (a custom unix one for example?
https://stackoverflow.com/a/5744185) depending on the
task's wrapping thread-type such that long running
sync code should never cause the delay of actor
supervision tasks such as cancellation and respawn
logic.
''' '''
# XXX: currently this should only be used when # If this is an error message from a context opened by
# `Portal.open_context()` has been opened since it's # ``Portal.open_context()`` we want to interrupt any ongoing
# assumed that other portal APIs like, # (child) tasks within that context to be notified of the remote
# - `Portal.run()`, # error relayed here.
# - `ActorNursery.run_in_actor()` #
# do their own error checking at their own call points and # The reason we may want to raise the remote error immediately
# result processing. # is that there is no guarantee the associated local task(s)
# will attempt to read from any locally opened stream any time
# soon.
#
# NOTE: this only applies when
# ``Portal.open_context()`` has been called since it is assumed
# (currently) that other portal APIs (``Portal.run()``,
# ``.run_in_actor()``) do their own error checking at the point
# of the call and result processing.
error = unpack_error(
error_msg,
self.chan,
)
# XXX: set the remote side's error so that after we cancel # XXX: set the remote side's error so that after we cancel
# whatever task is the opener of this context it can raise # whatever task is the opener of this context it can raise
# that error as the reason. # that error as the reason.
self._remote_error: BaseException = error self._remote_error = error
# always record the remote actor's uid since its cancellation # always record the remote actor's uid since its cancellation
# state is directly linked to ours (the local one). # state is directly linked to ours (the local one).
@ -292,25 +232,35 @@ class Context:
else: else:
log.error( log.error(
f'Remote context error for {self.chan.uid}:{self.cid}:\n' f'Remote context error for {self.chan.uid}:{self.cid}:\n'
f'{error}' f'{error_msg["error"]["tb_str"]}'
) )
# TODO: tempted to **not** do this by-reraising in a # TODO: tempted to **not** do this by-reraising in a
# nursery and instead cancel a surrounding scope, detect # nursery and instead cancel a surrounding scope, detect
# the cancellation, then lookup the error that was set? # the cancellation, then lookup the error that was set?
# YES! this is way better and simpler! # YES! this is way better and simpler!
if self._scope: if (
self._scope
):
# from trio.testing import wait_all_tasks_blocked # from trio.testing import wait_all_tasks_blocked
# await wait_all_tasks_blocked() # await wait_all_tasks_blocked()
# self._cancelled_remote = self.chan.uid # self._cancelled_remote = self.chan.uid
self._scope.cancel() self._scope.cancel()
# this REPL usage actually works here BD # NOTE: this usage actually works here B)
# from .devx._debug import pause # from ._debug import breakpoint
# await pause() # await breakpoint()
# XXX: this will break early callee results sending
# since when `.result()` is finally called, this
# chan will be closed..
# if self._recv_chan:
# await self._recv_chan.aclose()
async def cancel( async def cancel(
self, self,
msg: str | None = None,
timeout: float = 0.616, timeout: float = 0.616,
# timeout: float = 1000,
) -> None: ) -> None:
''' '''
@ -320,12 +270,15 @@ class Context:
Timeout quickly in an attempt to sidestep 2-generals... Timeout quickly in an attempt to sidestep 2-generals...
''' '''
side: str = 'caller' if self._portal else 'callee' side = 'caller' if self._portal else 'callee'
log.cancel( if msg:
f'Cancelling {side} side of context to {self.chan.uid}' assert side == 'callee', 'Only callee side can provide cancel msg'
)
self._cancel_called: bool = True log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
self._cancel_called = True
# await _debug.breakpoint()
# breakpoint()
if side == 'caller': if side == 'caller':
if not self._portal: if not self._portal:
@ -333,13 +286,12 @@ class Context:
"No portal found, this is likely a callee side context" "No portal found, this is likely a callee side context"
) )
cid: str = self.cid cid = self.cid
with trio.move_on_after(timeout) as cs: with trio.move_on_after(timeout) as cs:
cs.shield = True cs.shield = True
log.cancel( log.cancel(
f'Cancelling stream {cid} to ' f"Cancelling stream {cid} to "
f'{self._portal.channel.uid}' f"{self._portal.channel.uid}")
)
# NOTE: we're telling the far end actor to cancel a task # NOTE: we're telling the far end actor to cancel a task
# corresponding to *this actor*. The far end local channel # corresponding to *this actor*. The far end local channel
@ -358,17 +310,17 @@ class Context:
# if not self._portal.channel.connected(): # if not self._portal.channel.connected():
if not self.chan.connected(): if not self.chan.connected():
log.cancel( log.cancel(
'May have failed to cancel remote task ' "May have failed to cancel remote task "
f'{cid} for {self._portal.channel.uid}' f"{cid} for {self._portal.channel.uid}")
)
else: else:
log.cancel( log.cancel(
'Timed out on cancel request of remote task ' "Timed out on cancelling remote task "
f'{cid} for {self._portal.channel.uid}' f"{cid} for {self._portal.channel.uid}")
)
# callee side remote task # callee side remote task
else: else:
self._cancel_msg = msg
# TODO: should we have an explicit cancel message # TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an # or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough? # {'error': trio.Cancelled, cid: "blah"} enough?
@ -379,6 +331,7 @@ class Context:
@acm @acm
async def open_stream( async def open_stream(
self, self,
allow_overruns: bool | None = False, allow_overruns: bool | None = False,
msg_buffer_size: int | None = None, msg_buffer_size: int | None = None,
@ -397,10 +350,10 @@ class Context:
``Portal.open_context()``. In the future this may change but ``Portal.open_context()``. In the future this may change but
currently there seems to be no obvious reason to support currently there seems to be no obvious reason to support
"re-opening": "re-opening":
- pausing a stream can be done with a message. - pausing a stream can be done with a message.
- task errors will normally require a restart of the entire - task errors will normally require a restart of the entire
scope of the inter-actor task context due to the nature of scope of the inter-actor task context due to the nature of
``trio``'s cancellation system. ``trio``'s cancellation system.
''' '''
actor = current_actor() actor = current_actor()
@ -482,19 +435,18 @@ class Context:
self, self,
err: Exception, err: Exception,
) -> None: ) -> None:
'''
Maybe raise a remote error depending on who (which task from
which actor) requested a cancellation (if any).
'''
# NOTE: whenever the context's "opener" side (task) **is** # NOTE: whenever the context's "opener" side (task) **is**
# the side which requested the cancellation (likekly via # the side which requested the cancellation (likekly via
# ``Context.cancel()``), we don't want to re-raise that # ``Context.cancel()``), we don't want to re-raise that
# cancellation signal locally (would be akin to # cancellation signal locally (would be akin to
# a ``trio.Nursery`` nursery raising ``trio.Cancelled`` # a ``trio.Nursery`` nursery raising ``trio.Cancelled``
# whenever ``CancelScope.cancel()`` was called) and # whenever ``CancelScope.cancel()`` was called) and instead
# instead silently reap the expected cancellation # silently reap the expected cancellation "error"-msg.
# "error"-msg. # if 'pikerd' in err.msgdata['tb_str']:
# # from . import _debug
# # await _debug.breakpoint()
# breakpoint()
if ( if (
isinstance(err, ContextCancelled) isinstance(err, ContextCancelled)
and ( and (
@ -505,18 +457,7 @@ class Context:
): ):
return err return err
# NOTE: currently we are masking underlying runtime errors raise err # from None
# which are often superfluous to user handler code. not
# sure if this is still needed / desired for all operation?
# TODO: maybe we can only NOT mask if:
# - [ ] debug mode is enabled or,
# - [ ] a certain log level is set?
# - [ ] consider using `.with_traceback()` to filter out
# runtime frames from the tb explicitly?
# https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
# https://stackoverflow.com/a/24752607
__tracebackhide__: bool = True
raise err from None
async def result(self) -> Any | Exception: async def result(self) -> Any | Exception:
''' '''
@ -544,12 +485,16 @@ class Context:
of the remote cancellation. of the remote cancellation.
''' '''
__tracebackhide__: bool = True
assert self._portal, "Context.result() can not be called from callee!" assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan assert self._recv_chan
if re := self._remote_error: # from . import _debug
return self._maybe_raise_remote_err(re) # await _debug.breakpoint()
re = self._remote_error
if re:
self._maybe_raise_remote_err(re)
return re
if ( if (
self._result == id(self) self._result == id(self)
@ -560,9 +505,9 @@ class Context:
# and discarding any bi dir stream msgs still # and discarding any bi dir stream msgs still
# in transit from the far end. # in transit from the far end.
while True: while True:
msg = await self._recv_chan.receive()
try: try:
msg = await self._recv_chan.receive() self._result = msg['return']
self._result: Any = msg['return']
# NOTE: we don't need to do this right? # NOTE: we don't need to do this right?
# XXX: only close the rx mem chan AFTER # XXX: only close the rx mem chan AFTER
@ -571,26 +516,6 @@ class Context:
# await self._recv_chan.aclose() # await self._recv_chan.aclose()
break break
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is the
# (likely) source cause of this local runtime
# task's cancellation.
if re := self._remote_error:
self._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
except KeyError: # as msgerr: except KeyError: # as msgerr:
if 'yield' in msg: if 'yield' in msg:
@ -604,8 +529,7 @@ class Context:
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), (
"Received internal error at portal?" "Received internal error at portal?")
)
err = unpack_error( err = unpack_error(
msg, msg,
@ -613,12 +537,9 @@ class Context:
) # from msgerr ) # from msgerr
err = self._maybe_raise_remote_err(err) err = self._maybe_raise_remote_err(err)
self._remote_error = err self._remote_err = err
if re := self._remote_error: return self._remote_error or self._result
return self._maybe_raise_remote_err(re)
return self._result
async def started( async def started(
self, self,
@ -627,7 +548,7 @@ class Context:
) -> None: ) -> None:
''' '''
Indicate to calling actor's task that this linked context Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side via IPC. has started and send ``value`` to the other side.
On the calling side ``value`` is the second item delivered On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``. in the tuple returned by ``Portal.open_context()``.
@ -635,17 +556,19 @@ class Context:
''' '''
if self._portal: if self._portal:
raise RuntimeError( raise RuntimeError(
f'Caller side context {self} can not call started!' f"Caller side context {self} can not call started!")
)
elif self._started_called: elif self._started_called:
raise RuntimeError( raise RuntimeError(
f'called `.started()` twice on context with {self.chan.uid}' f"called 'started' twice on context with {self.chan.uid}")
)
await self.chan.send({'started': value, 'cid': self.cid}) await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True self._started_called = True
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
async def _drain_overflows( async def _drain_overflows(
self, self,
) -> None: ) -> None:
@ -700,21 +623,10 @@ class Context:
self, self,
msg: dict, msg: dict,
# draining: bool = False, draining: bool = False,
) -> bool: ) -> bool:
'''
Deliver an IPC msg received from a transport-channel to
this context's underlying mem chan for handling by
user operating tasks; deliver a bool indicating whether the
msg was immediately sent.
If `._allow_overruns == True` (maybe) append the msg to an
"overflow queue" and start a "drainer task" (inside the
`._scope_nursery: trio.Nursery`) which ensures that such
messages are eventually sent if possible.
'''
cid = self.cid cid = self.cid
chan = self.chan chan = self.chan
uid = chan.uid uid = chan.uid
@ -725,12 +637,8 @@ class Context:
) )
error = msg.get('error') error = msg.get('error')
if error := unpack_error( if error:
msg, await self._maybe_cancel_and_set_remote_error(msg)
self.chan,
):
self._cancel_msg = msg
await self._maybe_cancel_and_set_remote_error(error)
if ( if (
self._in_overrun self._in_overrun
@ -762,7 +670,6 @@ class Context:
# the sender; the main motivation is that using bp can block the # the sender; the main motivation is that using bp can block the
# msg handling loop which calls into this method! # msg handling loop which calls into this method!
except trio.WouldBlock: except trio.WouldBlock:
# XXX: always push an error even if the local # XXX: always push an error even if the local
# receiver is in overrun state. # receiver is in overrun state.
# await self._maybe_cancel_and_set_remote_error(msg) # await self._maybe_cancel_and_set_remote_error(msg)
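The comments in the hunks above keep returning to one rule: a `ContextCancelled` that results from *this* side's own call to `Context.cancel()` is reaped silently rather than re-raised to the opener. A minimal sketch of that semantic, not part of the diff (the `'sleepy'` actor name and the `stay_open` endpoint are made up for illustration):

import trio
import tractor


@tractor.context
async def stay_open(
    ctx: tractor.Context,
) -> None:
    # callee side: sync the context then just wait around to be cancelled
    await ctx.started()
    await trio.sleep_forever()


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'sleepy',
            enable_modules=[__name__],
        )
        async with portal.open_context(stay_open) as (ctx, first):
            # this side requested the cancel, so the resulting
            # `ContextCancelled` should be absorbed by
            # `Portal.open_context()` instead of re-raised here.
            await ctx.cancel()

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)

Had the cancel instead been requested by the other side (or by a peer actor), the `ContextCancelled` would be expected to bubble out of the `open_context()` block.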


@@ -39,11 +39,8 @@ class ActorFailure(Exception):
 
 class RemoteActorError(Exception):
-    '''
-    Remote actor exception bundled locally
-
-    '''
     # TODO: local recontruction of remote exception deats
+    "Remote actor exception bundled locally"
     def __init__(
         self,
         message: str,
@@ -113,24 +110,18 @@ class AsyncioCancelled(Exception):
 
 def pack_error(
     exc: BaseException,
-    tb: str | None = None,
+    tb=None,
 
-) -> dict[str, dict]:
-    '''
-    Create an "error message" encoded for wire transport via an IPC
-    `Channel`; expected to be unpacked on the receiver side using
-    `unpack_error()` below.
-
-    '''
+) -> dict[str, Any]:
+    """Create an "error message" for tranmission over
+    a channel (aka the wire).
+    """
     if tb:
         tb_str = ''.join(traceback.format_tb(tb))
     else:
         tb_str = traceback.format_exc()
 
-    error_msg: dict[
-        str,
-        str | tuple[str, str]
-    ] = {
+    error_msg = {
         'tb_str': tb_str,
         'type_str': type(exc).__name__,
         'src_actor_uid': current_actor().uid,
@@ -148,33 +139,23 @@ def unpack_error(
     chan=None,
     err_type=RemoteActorError
 
-) -> None | Exception:
+) -> Exception:
     '''
     Unpack an 'error' message from the wire
-    into a local `RemoteActorError` (subtype).
-
-    NOTE: this routine DOES not RAISE the embedded remote error,
-    which is the responsibilitiy of the caller.
+    into a local ``RemoteActorError``.
 
     '''
-    __tracebackhide__: bool = True
-
-    error_dict: dict[str, dict] | None
-    if (
-        error_dict := msg.get('error')
-    ) is None:
-        # no error field, nothing to unpack.
-        return None
-
-    # retrieve the remote error's msg encoded details
-    tb_str: str = error_dict.get('tb_str', '')
-    message: str = f'{chan.uid}\n' + tb_str
-    type_name: str = error_dict['type_str']
+    __tracebackhide__ = True
+    error = msg['error']
+
+    tb_str = error.get('tb_str', '')
+    message = f"{chan.uid}\n" + tb_str
+    type_name = error['type_str']
     suberror_type: Type[BaseException] = Exception
 
     if type_name == 'ContextCancelled':
         err_type = ContextCancelled
-        suberror_type = err_type
+        suberror_type = RemoteActorError
 
     else:  # try to lookup a suitable local error type
         for ns in [
@@ -183,19 +164,18 @@ def unpack_error(
             eg,
             trio,
         ]:
-            if suberror_type := getattr(
-                ns,
-                type_name,
-                False,
-            ):
+            try:
+                suberror_type = getattr(ns, type_name)
                 break
+            except AttributeError:
+                continue
 
     exc = err_type(
         message,
         suberror_type=suberror_type,
 
         # unpack other fields into error type init
-        **error_dict,
+        **msg['error'],
     )
 
     return exc
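For reference, the "error msg" that `pack_error()` builds and `unpack_error()` consumes is just a plain dict carrying the formatted traceback, the exception's type name and the sending actor's uid. The following standalone sketch mirrors (rather than calls) the helpers above, since `pack_error()` needs a running actor to fill in `src_actor_uid`; the placeholder uid and `demo_pack` name are illustrative only:

import traceback


def demo_pack(exc: BaseException, cid: str) -> dict:
    # mirrors `pack_error()`: reduce the exception to a transportable dict
    tb_str = ''.join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    return {
        'error': {
            'tb_str': tb_str,
            'type_str': type(exc).__name__,
            # filled from `current_actor().uid` in the real implementation
            'src_actor_uid': ('demo_actor', 'demo-uuid'),
        },
        'cid': cid,
    }


msg = demo_pack(ValueError('boom'), cid='demo-cid')
assert msg['error']['type_str'] == 'ValueError'
# `unpack_error(msg, chan)` then resolves the type name against a handful
# of known namespaces (`eg`, `trio`, ...) and returns a `RemoteActorError`
# wrapping it; actually raising that error is left to the caller.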


@@ -15,12 +15,8 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 '''
-Memory "portal" contruct.
-
-"Memory portals" are both an API and set of IPC wrapping primitives
-for managing structured concurrency "cancel-scope linked" tasks
-running in disparate virtual memory domains - at least in different
-OS processes, possibly on different (hardware) hosts.
+Memory boundary "Portals": an API for structured
+concurrency linked tasks running in disparate memory domains.
 
 '''
 from __future__ import annotations
@@ -70,21 +66,20 @@ def _unwrap_msg(
     raise unpack_error(msg, channel) from None
 
 
-# TODO: maybe move this to ._exceptions?
 class MessagingError(Exception):
     'Some kind of unexpected SC messaging dialog issue'
 
 
 class Portal:
     '''
-    A 'portal' to a memory-domain-separated `Actor`.
+    A 'portal' to a(n) (remote) ``Actor``.
 
     A portal is "opened" (and eventually closed) by one side of an
     inter-actor communication context. The side which opens the portal
     is equivalent to a "caller" in function parlance and usually is
     either the called actor's parent (in process tree hierarchy terms)
     or a client interested in scheduling work to be done remotely in a
-    process which has a separate (virtual) memory domain.
+    far process.
 
     The portal api allows the "caller" actor to invoke remote routines
     and receive results through an underlying ``tractor.Channel`` as
@@ -94,9 +89,9 @@ class Portal:
     like having a "portal" between the seperate actor memory spaces.
 
     '''
-    # global timeout for remote cancel requests sent to
-    # connected (peer) actors.
-    cancel_timeout: float = 0.5
+    # the timeout for a remote cancel request sent to
+    # a(n) (peer) actor.
+    cancel_timeout = 0.5
 
     def __init__(self, channel: Channel) -> None:
         self.channel = channel
@@ -196,15 +191,7 @@ class Portal:
     ) -> bool:
         '''
-        Cancel the actor runtime (and thus process) on the far
-        end of this portal.
-
-        **NOTE** THIS CANCELS THE ENTIRE RUNTIME AND THE
-        SUBPROCESS, it DOES NOT just cancel the remote task. If you
-        want to have a handle to cancel a remote ``tri.Task`` look
-        at `.open_context()` and the definition of
-        `._context.Context.cancel()` which CAN be used for this
-        purpose.
+        Cancel the actor on the other end of this portal.
 
         '''
         if not self.channel.connected():
@@ -398,32 +385,12 @@ class Portal:
     ) -> AsyncGenerator[tuple[Context, Any], None]:
         '''
-        Open an inter-actor "task context"; a remote task is
-        scheduled and cancel-scope-state-linked to a `trio.run()` across
-        memory boundaries in another actor's runtime.
-
-        This is an `@acm` API which allows for deterministic setup
-        and teardown of a remotely scheduled task in another remote
-        actor. Once opened, the 2 now "linked" tasks run completely
-        in parallel in each actor's runtime with their enclosing
-        `trio.CancelScope`s kept in a synced state wherein if
-        either side errors or cancels an equivalent error is
-        relayed to the other side via an SC-compat IPC protocol.
-
-        The yielded `tuple` is a pair delivering a `tractor.Context`
-        and any first value "sent" by the "callee" task via a call
-        to `Context.started(<value: Any>)`; this side of the
-        context does not unblock until the "callee" task calls
-        `.started()` in similar style to `trio.Nursery.start()`.
-
-        When the "callee" (side that is "called"/started by a call
-        to *this* method) returns, the caller side (this) unblocks
-        and any final value delivered from the other end can be
-        retrieved using the `Contex.result()` api.
-
-        The yielded ``Context`` instance further allows for opening
-        bidirectional streams, explicit cancellation and
-        structurred-concurrency-synchronized final result-msg
-        collection. See ``tractor.Context`` for more details.
+        Open an inter-actor task context.
+
+        This is a synchronous API which allows for deterministic
+        setup/teardown of a remote task. The yielded ``Context`` further
+        allows for opening bidirectional streams, explicit cancellation
+        and synchronized final result collection. See ``tractor.Context``.
 
         '''
# conduct target func method structural checks # conduct target func method structural checks
@ -456,52 +423,47 @@ class Portal:
) )
assert ctx._remote_func_type == 'context' assert ctx._remote_func_type == 'context'
msg: dict = await ctx._recv_chan.receive() msg = await ctx._recv_chan.receive()
try: try:
# the "first" value here is delivered by the callee's # the "first" value here is delivered by the callee's
# ``Context.started()`` call. # ``Context.started()`` call.
first = msg['started'] first = msg['started']
ctx._started_called: bool = True ctx._started_called = True
except KeyError: except KeyError:
if not (cid := msg.get('cid')): assert msg.get('cid'), ("Received internal error at context?")
raise MessagingError(
'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
if msg.get('error'): if msg.get('error'):
# NOTE: mask the key error with the remote one # raise kerr from unpack_error(msg, self.channel)
raise unpack_error(msg, self.channel) from None raise unpack_error(msg, self.channel) from None
else: else:
raise MessagingError( raise MessagingError(
f'Context for {cid} was expecting a `started` message' f'Context for {ctx.cid} was expecting a `started` message'
' but received a non-error msg:\n' f' but received a non-error msg:\n{pformat(msg)}'
f'{pformat(msg)}'
) )
_err: BaseException | None = None
ctx._portal: Portal = self ctx._portal: Portal = self
uid: tuple = self.channel.uid uid: tuple = self.channel.uid
cid: str = ctx.cid cid: str = ctx.cid
etype: Type[BaseException] | None = None
# placeholder for any exception raised in the runtime # deliver context instance and .started() msg value in enter
# or by user tasks which cause this context's closure. # tuple.
scope_err: BaseException | None = None
try: try:
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
ctx._scope_nursery: trio.Nursery = nurse ctx._scope_nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope ctx._scope = nurse.cancel_scope
# deliver context instance and .started() msg value
# in enter tuple.
yield ctx, first yield ctx, first
# when in allow_overruns mode there may be # when in allow_ovveruns mode there may be lingering
# lingering overflow sender tasks remaining? # overflow sender tasks remaining?
if nurse.child_tasks: if nurse.child_tasks:
# XXX: ensure we are in overrun state # ensure we are in overrun state with
# with ``._allow_overruns=True`` bc otherwise # ``._allow_overruns=True`` bc otherwise
# there should be no tasks in this nursery! # there should be no tasks in this nursery!
if ( if (
not ctx._allow_overruns not ctx._allow_overruns
@ -509,72 +471,47 @@ class Portal:
): ):
raise RuntimeError( raise RuntimeError(
'Context has sub-tasks but is ' 'Context has sub-tasks but is '
'not in `allow_overruns=True` mode!?' 'not in `allow_overruns=True` Mode!?'
) )
# ensure cancel of all overflow sender tasks
# started in the ctx nursery.
ctx._scope.cancel() ctx._scope.cancel()
# XXX: (maybe) shield/mask context-cancellations that were except ContextCancelled as err:
# initiated by any of the context's 2 tasks. There are _err = err
# subsequently 2 operating cases for a "graceful cancel"
# of a `Context`:
#
# 1.*this* side's task called `Context.cancel()`, in
# which case we mask the `ContextCancelled` from bubbling
# to the opener (much like how `trio.Nursery` swallows
# any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
#
# 2.*the other* side's (callee/spawned) task cancelled due
# to a self or peer cancellation request in which case we
# DO let the error bubble to the opener.
except ContextCancelled as ctxc:
scope_err = ctxc
# CASE 1: this context was never cancelled # swallow and mask cross-actor task context cancels that
# via a local task's call to `Context.cancel()`. # were initiated by *this* side's task.
if not ctx._cancel_called: if not ctx._cancel_called:
# XXX: this should NEVER happen! # XXX: this should NEVER happen!
# from ._debug import breakpoint # from ._debug import breakpoint
# await breakpoint() # await breakpoint()
raise raise
# CASE 2: context was cancelled by local task calling # if the context was cancelled by client code
# `.cancel()`, we don't raise and the exit block should # then we don't need to raise since user code
# exit silently. # is expecting this and the block should exit.
else: else:
log.debug( log.debug(f'Context {ctx} cancelled gracefully')
f'Context {ctx} cancelled gracefully with:\n'
f'{ctxc}'
)
except ( except (
# - a standard error in the caller/yieldee BaseException,
Exception,
# - a runtime teardown exception-group and/or # more specifically, we need to handle these but not
# cancellation request from a caller task. # sure it's worth being pedantic:
BaseExceptionGroup, # Exception,
trio.Cancelled, # trio.Cancelled,
KeyboardInterrupt, # KeyboardInterrupt,
) as err: ) as err:
scope_err = err etype = type(err)
# XXX: request cancel of this context on any error. # cancel ourselves on any error.
# NOTE: `Context.cancel()` is conversely NOT called in
# the `ContextCancelled` "cancellation requested" case
# above.
log.cancel( log.cancel(
'Context cancelled for task due to\n' 'Context cancelled for task, sending cancel request..\n'
f'{err}\n'
'Sending cancel request..\n'
f'task:{cid}\n' f'task:{cid}\n'
f'actor:{uid}' f'actor:{uid}'
) )
try: try:
await ctx.cancel() await ctx.cancel()
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.warning( log.warning(
@ -583,9 +520,8 @@ class Portal:
f'actor:{uid}' f'actor:{uid}'
) )
raise # duh raise
# no scope error case
else: else:
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.info(
@ -593,20 +529,10 @@ class Portal:
f'task: {cid}\n' f'task: {cid}\n'
f'actor: {uid}' f'actor: {uid}'
) )
# XXX NOTE XXX: the below call to
# `Context.result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`
# which IS SET any time the far end fails and
# causes "caller side" cancellation via
# a `ContextCancelled` here.
result = await ctx.result() result = await ctx.result()
log.runtime( log.runtime(
f'Context {fn_name} returned value from callee:\n' f'Context {fn_name} returned '
f'`{result}`' f'value from callee `{result}`'
) )
finally: finally:
@ -614,73 +540,22 @@ class Portal:
# operating *in* this scope to have survived # operating *in* this scope to have survived
# we tear down the runtime feeder chan last # we tear down the runtime feeder chan last
# to avoid premature stream clobbers. # to avoid premature stream clobbers.
rxchan: trio.ReceiveChannel = ctx._recv_chan if ctx._recv_chan is not None:
if ( # should we encapsulate this in the context api?
rxchan await ctx._recv_chan.aclose()
# maybe TODO: yes i know the below check is if etype:
# touching `trio` memchan internals..BUT, there are
# only a couple ways to avoid a `trio.Cancelled`
# bubbling from the `.aclose()` call below:
#
# - catch and mask it via the cancel-scope-shielded call
# as we are rn (manual and frowned upon) OR,
# - specially handle the case where `scope_err` is
# one of {`BaseExceptionGroup`, `trio.Cancelled`}
# and then presume that the `.aclose()` call will
# raise a `trio.Cancelled` and just don't call it
# in those cases..
#
# that latter approach is more logic, LOC, and more
# convoluted so for now stick with the first
# psuedo-hack-workaround where we just try to avoid
# the shielded call as much as we can detect from
# the memchan's `._closed` state..
#
# XXX MOTIVATION XXX-> we generally want to raise
# any underlying actor-runtime/internals error that
# surfaces from a bug in tractor itself so it can
# be easily detected/fixed AND, we also want to
# minimize noisy runtime tracebacks (normally due
# to the cross-actor linked task scope machinery
# teardown) displayed to user-code and instead only
# displaying `ContextCancelled` traces where the
# cause of crash/exit IS due to something in
# user/app code on either end of the context.
and not rxchan._closed
):
# XXX NOTE XXX: and again as per above, we mask any
# `trio.Cancelled` raised here so as to NOT mask
# out any exception group or legit (remote) ctx
# error that sourced from the remote task or its
# runtime.
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
# XXX: since we always (maybe) re-raise (and thus also
# mask runtime machinery related
# multi-`trio.Cancelled`s) any scope error which was
# the underlying cause of this context's exit, add
# different log msgs for each of the (2) cases.
if scope_err is not None:
etype: Type[BaseException] = type(scope_err)
# CASE 2
if ctx._cancel_called: if ctx._cancel_called:
log.cancel( log.cancel(
f'Context {fn_name} cancelled by caller with\n' f'Context {fn_name} cancelled by caller with\n{etype}'
f'{etype}'
) )
elif _err is not None:
# CASE 1
else:
log.cancel( log.cancel(
f'Context cancelled by callee with {etype}\n' f'Context for task cancelled by callee with {etype}\n'
f'target: `{fn_name}`\n' f'target: `{fn_name}`\n'
f'task:{cid}\n' f'task:{cid}\n'
f'actor:{uid}' f'actor:{uid}'
) )
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside
@ -689,9 +564,10 @@ class Portal:
# a "stop" msg for a stream), this can result in a deadlock # a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the # where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC. # child has already cleared it and clobbered IPC.
from ._debug import maybe_wait_for_debugger
await maybe_wait_for_debugger()
# FINALLY, remove the context from runtime tracking and # remove the context from runtime tracking
# exit Bo
self.actor._contexts.pop( self.actor._contexts.pop(
(self.channel.uid, ctx.cid), (self.channel.uid, ctx.cid),
None, None,
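Most of the hunks above touch the `Portal.open_context()` happy path: the opener blocks until the callee's `Context.started()` value arrives, then collects the final return via `Context.result()`. A minimal usage sketch of that flow, not taken from the diff (the `'echoer'` actor name and `echo_once` endpoint are illustrative only):

import trio
import tractor


@tractor.context
async def echo_once(
    ctx: tractor.Context,
    msg: str,
) -> str:
    # callee side: deliver a first value to the opener, then return
    await ctx.started('ready')
    return msg


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',
            enable_modules=[__name__],
        )
        async with portal.open_context(
            echo_once,
            msg='hello',
        ) as (ctx, first):
            # `first` is whatever the callee passed to `.started()`
            assert first == 'ready'
            # final return value of the callee task
            assert await ctx.result() == 'hello'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)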


@@ -347,13 +347,14 @@ async def _invoke(
                 and ctx._enter_debugger_on_cancel
             )
         ):
-            # XXX QUESTION XXX: is there any case where we'll
-            # want to debug IPC disconnects as a default?
-            # => I can't think of a reason that inspecting this
-            # type of failure will be useful for respawns or
-            # recovery logic - the only case is some kind of
-            # strange bug in our transport layer itself? Going
-            # to keep this open ended for now.
+            # XXX: is there any case where we'll want to debug IPC
+            # disconnects as a default?
+            #
+            # I can't think of a reason that inspecting
+            # this type of failure will be useful for respawns or
+            # recovery logic - the only case is some kind of strange bug
+            # in our transport layer itself? Going to keep this
+            # open ended for now.
 
             entered_debug = await _debug._maybe_enter_pm(err)
             if not entered_debug:
@@ -447,18 +448,17 @@ class Actor:
     (swappable) network protocols.
 
-    Each "actor" is ``trio.run()`` scheduled "runtime" composed of
-    many concurrent tasks in a single thread. The "runtime" tasks
-    conduct a slew of low(er) level functions to make it possible
-    for message passing between actors as well as the ability to
-    create new actors (aka new "runtimes" in new processes which
-    are supervised via a nursery construct). Each task which sends
-    messages to a task in a "peer" (not necessarily a parent-child,
-    depth hierarchy) is able to do so via an "address", which maps
-    IPC connections across memory boundaries, and a task request id
-    which allows for per-actor tasks to send and receive messages
-    to specific peer-actor tasks with which there is an ongoing
-    RPC/IPC dialog.
+    Each "actor" is ``trio.run()`` scheduled "runtime" composed of many
+    concurrent tasks in a single thread. The "runtime" tasks conduct
+    a slew of low(er) level functions to make it possible for message
+    passing between actors as well as the ability to create new actors
+    (aka new "runtimes" in new processes which are supervised via
+    a nursery construct). Each task which sends messages to a task in
+    a "peer" (not necessarily a parent-child, depth hierarchy)) is able
+    to do so via an "address", which maps IPC connections across memory
+    boundaries, and task request id which allows for per-actor
+    tasks to send and receive messages to specific peer-actor tasks with
+    which there is an ongoing RPC/IPC dialog.
 
     '''
 
     # ugh, we need to get rid of this and replace with a "registry" sys


@@ -199,10 +199,6 @@ async def do_hard_kill(
     proc: trio.Process,
     terminate_after: int = 3,
 
-    # NOTE: for mucking with `.pause()`-ing inside the runtime
-    # whilst also hacking on it XD
-    # terminate_after: int = 99999,
-
 ) -> None:
     # NOTE: this timeout used to do nothing since we were shielding
     # the ``.wait()`` inside ``new_proc()`` which will pretty much


@ -54,60 +54,6 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or stream._eoc:
log.debug(f"{stream} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error
# case is activated above.
raise src_err
# raise RuntimeError(
# 'Unknown non-yield stream msg?\n'
# f'{msg}'
# )
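The TODO above mentions a 3.10+ ``match`` based rewrite; a rough sketch of how that dispatch could look, assuming the same msg dict shape and that ``unpack_error`` is importable as the surrounding module already does (the ``._eoc`` short-circuit is dropped for brevity)::

    import trio
    from tractor._exceptions import unpack_error  # location assumed; same helper used above

    def translate_non_yield_msg(
        stream,          # the `MsgStream` instance
        msg: dict,
        src_err: KeyError,
    ) -> None:
        # internal error should never get here
        assert msg.get('cid'), 'Received internal error at portal?'

        if stream._closed:
            raise trio.ClosedResourceError('This stream was closed')

        match msg:
            case {'stop': _}:
                # remote end stopped the stream; mark local end-of-channel
                stream._eoc = True
                raise trio.EndOfChannel from src_err

            case {'error': _}:
                # re-raise the boxed remote exception locally
                raise unpack_error(msg, stream._ctx.chan)

            case _:
                raise src_err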
class MsgStream(trio.abc.Channel): class MsgStream(trio.abc.Channel):
''' '''
@ -145,20 +91,11 @@ class MsgStream(trio.abc.Channel):
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait(self): def receive_nowait(self):
msg = self._rx_chan.receive_nowait() msg = self._rx_chan.receive_nowait()
try: return msg['yield']
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
)
async def receive(self): async def receive(self):
''' '''Async receive a single msg from the IPC transport, the next
Receive a single msg from the IPC transport, the next in in sequence for this stream.
sequence sent by the far end task (possibly in order as
determined by the underlying protocol).
''' '''
# see ``.aclose()`` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
@ -173,12 +110,43 @@ class MsgStream(trio.abc.Channel):
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as err:
_raise_from_no_yield_msg( # internal error should never get here
stream=self, assert msg.get('cid'), ("Received internal error at portal?")
msg=msg,
src_err=kerr, # TODO: handle 2 cases with 3.10 match syntax
) # - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
if msg.get('stop') or self._eoc:
log.debug(f"{self} was stopped at remote end")
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
self._eoc = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await self.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel from err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, self._ctx.chan)
else:
raise
except ( except (
trio.ClosedResourceError, # by self._rx_chan trio.ClosedResourceError, # by self._rx_chan
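From a stream consumer's point of view the translation above is mostly invisible: a remote 'stop' simply ends iteration, a remote 'error' re-raises the boxed exception locally, and receiving on a locally closed stream raises ``trio.ClosedResourceError``. A small usage sketch (``count_to`` is a made-up peer task, not part of this diff)::

    import trio
    import tractor

    async def count_to(limit: int = 3):
        # hypothetical peer task: each yielded value arrives as a 'yield' msg
        for i in range(limit):
            yield i
            await trio.sleep(0)

    async def consume(portal: tractor.Portal) -> None:
        async with portal.open_stream_from(count_to, limit=3) as stream:
            async for value in stream:   # stops cleanly on the remote 'stop' msg
                print('received:', value)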

View File

@ -193,39 +193,15 @@ def get_logger(
''' '''
log = rlog = logging.getLogger(_root_name) log = rlog = logging.getLogger(_root_name)
if ( if name and name != _proj_name:
name
and name != _proj_name
):
# NOTE: for handling modules that use ``get_logger(__name__)`` # handling for modules that use ``get_logger(__name__)`` to
# we make the following stylistic choice: # avoid duplicate project-package token in msg output
# - always avoid duplicate project-package token rname, _, tail = name.partition('.')
# in msg output: i.e. tractor.tractor _ipc.py in header if rname == _root_name:
# looks ridiculous XD name = tail
# - never show the leaf module name in the {name} part
# since in python the {filename} is always this same
# module-file.
sub_name: None | str = None
rname, _, sub_name = name.partition('.')
pkgpath, _, modfilename = sub_name.rpartition('.')
# NOTE: for tractor itself never include the last level
# module key in the name such that something like: eg.
# 'tractor.trionics._broadcast` only includes the first
# 2 tokens in the (coloured) name part.
if rname == 'tractor':
sub_name = pkgpath
if _root_name in sub_name:
duplicate, _, sub_name = sub_name.partition('.')
if not sub_name:
log = rlog
else:
log = rlog.getChild(sub_name)
log = rlog.getChild(name)
log.level = rlog.level log.level = rlog.level
# add our actor-task aware adapter which will dynamically look up # add our actor-task aware adapter which will dynamically look up
@ -278,7 +254,3 @@ def get_console_log(
def get_loglevel() -> str: def get_loglevel() -> str:
return _default_loglevel return _default_loglevel
# global module logger for tractor itself
log = get_logger('tractor')
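A self-contained sketch of the name-trimming policy shown in the longer variant above, using only stdlib ``logging``; the ``_proj_name`` constant stands in for the module's real globals::

    import logging

    _proj_name = 'tractor'

    def trimmed_child_logger(name: str) -> logging.Logger:
        # - for project-internal modules, drop the leaf module name since the
        #   record's {filename} already shows it
        # - drop a duplicated leading project token so e.g. 'tractor.tractor.x'
        #   never shows up in headers
        rlog = logging.getLogger(_proj_name)
        rname, _, sub_name = name.partition('.')
        pkgpath, _, _modfilename = sub_name.rpartition('.')
        if rname == _proj_name:
            sub_name = pkgpath
        if _proj_name in sub_name:
            _dup, _, sub_name = sub_name.partition('.')
        return rlog.getChild(sub_name) if sub_name else rlog

    # only the first two tokens survive for project-internal modules:
    assert trimmed_child_logger('tractor.trionics._broadcast').name == 'tractor.trionics'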

View File

@ -43,62 +43,38 @@ Built-in messaging patterns, types, APIs and helpers.
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type # - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
from __future__ import annotations from __future__ import annotations
from inspect import isfunction
from pkgutil import resolve_name from pkgutil import resolve_name
class NamespacePath(str): class NamespacePath(str):
''' '''
A serializable description of a (function) Python object A serializable description of a (function) Python object location
location described by the target's module path and namespace described by the target's module path and namespace key meant as
key meant as a message-native "packet" to allow actors to a message-native "packet" to allow actors to point-and-load objects
point-and-load objects by an absolute ``str`` (and thus by absolute reference.
serializable) reference.
''' '''
_ref: object | type | None = None _ref: object = None
def load_ref(self) -> object | type: def load_ref(self) -> object:
if self._ref is None: if self._ref is None:
self._ref = resolve_name(self) self._ref = resolve_name(self)
return self._ref return self._ref
@staticmethod def to_tuple(
def _mk_fqnp(ref: type | object) -> tuple[str, str]: self,
'''
Generate a minimal ``str`` pair which describes a Python
object's namespace path and object/type name.
In more precise terms something like: ) -> tuple[str, str]:
- 'py.namespace.path:object_name', ref = self.load_ref()
- eg.'tractor.msg:NamespacePath' will be the ``str`` form return ref.__module__, getattr(ref, '__name__', '')
of THIS type XD
'''
if (
isinstance(ref, object)
and not isfunction(ref)
):
name: str = type(ref).__name__
else:
name: str = getattr(ref, '__name__')
# fully qualified namespace path, tuple.
fqnp: tuple[str, str] = (
ref.__module__,
name,
)
return fqnp
@classmethod @classmethod
def from_ref( def from_ref(
cls, cls,
ref: type | object, ref,
) -> NamespacePath: ) -> NamespacePath:
return cls(':'.join(
fqnp: tuple[str, str] = cls._mk_fqnp(ref) (ref.__module__,
return cls(':'.join(fqnp)) getattr(ref, '__name__', ''))
))
def to_tuple(self) -> tuple[str, str]:
return self._mk_fqnp(self.load_ref())
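A usage sketch for the API above (assuming ``NamespacePath`` is importable from ``tractor.msg`` as in this file); ``textwrap.dedent`` is just an arbitrary importable target::

    from textwrap import dedent
    from tractor.msg import NamespacePath

    nsp = NamespacePath.from_ref(dedent)
    assert str(nsp) == 'textwrap:dedent'              # 'module.path:object_name' form
    assert nsp.load_ref() is dedent                   # lazily resolved via pkgutil.resolve_name()
    assert nsp.to_tuple() == ('textwrap', 'dedent')   # the module/name pair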

View File

@ -70,7 +70,6 @@ async def _enter_and_wait(
unwrapped: dict[int, T], unwrapped: dict[int, T],
all_entered: trio.Event, all_entered: trio.Event,
parent_exit: trio.Event, parent_exit: trio.Event,
seed: int,
) -> None: ) -> None:
''' '''
@ -81,10 +80,7 @@ async def _enter_and_wait(
async with mngr as value: async with mngr as value:
unwrapped[id(mngr)] = value unwrapped[id(mngr)] = value
if all( if all(unwrapped.values()):
val != seed
for val in unwrapped.values()
):
all_entered.set() all_entered.set()
await parent_exit.wait() await parent_exit.wait()
@ -95,13 +91,7 @@ async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]], mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[ ) -> AsyncGenerator[tuple[Optional[T], ...], None]:
tuple[
T | None,
...
],
None,
]:
''' '''
Concurrently enter a sequence of async context managers, each in Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the a separate ``trio`` task and deliver the unwrapped values in the
@ -114,11 +104,7 @@ async def gather_contexts(
entered and exited, and cancellation just works. entered and exited, and cancellation just works.
''' '''
seed: int = id(mngrs) unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)
unwrapped: dict[int, T | None] = {}.fromkeys(
(id(mngr) for mngr in mngrs),
seed,
)
all_entered = trio.Event() all_entered = trio.Event()
parent_exit = trio.Event() parent_exit = trio.Event()
@ -130,9 +116,8 @@ async def gather_contexts(
if not mngrs: if not mngrs:
raise ValueError( raise ValueError(
'`.trionics.gather_contexts()` input mngrs is empty?\n' 'input mngrs is empty?\n'
'Did you try to use inline generator syntax?\n' 'Did you try to use inline generator syntax?'
'Use a non-lazy iterator or sequence type instead!'
) )
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
@ -143,7 +128,6 @@ async def gather_contexts(
unwrapped, unwrapped,
all_entered, all_entered,
parent_exit, parent_exit,
seed,
) )
# deliver control once all managers have started up # deliver control once all managers have started up
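Why the seed sentinel above matters: a context manager that legitimately yields a falsy value (``None``, ``0``, ``''``) must still count as "entered", which a plain truthiness check cannot express. A tiny illustration (the real code uses ``id(mngrs)`` as the sentinel)::

    # values yielded by two (hypothetical) successfully-entered acms
    mngr_values = [None, 0]

    # truthiness check: falsy-but-entered values keep this False forever
    naive = dict(enumerate(mngr_values))
    assert not all(naive.values())          # wrongly reads as "not all entered"

    # sentinel check: compare each slot against a unique seed instead
    seed = object()
    unwrapped = {0: seed, 1: seed}
    for key, val in enumerate(mngr_values):  # each task overwrites its slot on entry
        unwrapped[key] = val
    assert all(val is not seed for val in unwrapped.values())  # correctly "all entered"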