Compare commits
18 commits: 7507e269ec ... fdf0c43bfa

SHA1:
fdf0c43bfa
f895c96600
ca1a1476bb
a7c36a9cbe
22e4b324b1
89ed8b67ff
11bbf15817
a18663213a
d4d09b6071
6d10f0c516
fa9b57bae0
81776a6238
144d1f4d94
51fdf3524c
cff69d07fe
ee94d6d62c
89b84ed6c0
f33f689f34
@@ -65,21 +65,28 @@ async def aggregate(seed):
    print("AGGREGATOR COMPLETE!")


-# this is the main actor and *arbiter*
-async def main():
-    # a nursery which spawns "actors"
-    async with tractor.open_nursery(
-        arbiter_addr=('127.0.0.1', 1616)
-    ) as nursery:
+async def main() -> list[int]:
+    '''
+    This is the "root" actor's main task's entrypoint.
+
+    By default (and if not otherwise specified) that root process
+    also acts as a "registry actor" / "registrar" on the localhost
+    for the purposes of multi-actor "service discovery".
+
+    '''
+    # yes, a nursery which spawns `trio`-"actors" B)
+    nursery: tractor.ActorNursery
+    async with tractor.open_nursery() as nursery:

        seed = int(1e3)
        pre_start = time.time()

-        portal = await nursery.start_actor(
+        portal: tractor.Portal = await nursery.start_actor(
            name='aggregator',
            enable_modules=[__name__],
        )

+        stream: tractor.MsgStream
        async with portal.open_stream_from(
            aggregate,
            seed=seed,
@@ -1,8 +1,8 @@
 '''
 ``async with ():`` inlined context-stream cancellation testing.

-Verify the we raise errors when streams are opened prior to sync-opening
-a ``tractor.Context`` beforehand.
+Verify the we raise errors when streams are opened prior to
+sync-opening a ``tractor.Context`` beforehand.

 '''
 from contextlib import asynccontextmanager as acm
@@ -922,93 +922,3 @@ def test_maybe_allow_overruns_stream(
            # if this hits the logic blocks from above are not
            # exhaustive..
            pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
-
-
-@tractor.context
-async def sleep_forever(
-    ctx: tractor.Context,
-) -> None:
-    await ctx.started()
-    async with ctx.open_stream():
-        await trio.sleep_forever()
-
-
-@acm
-async def attach_to_sleep_forever():
-    '''
-    Cancel a context **before** any underlying error is raised in order
-    to trigger a local reception of a ``ContextCancelled`` which **should not**
-    be re-raised in the local surrounding ``Context`` *iff* the cancel was
-    requested by **this** side of the context.
-
-    '''
-    async with tractor.wait_for_actor('sleeper') as p2:
-        async with (
-            p2.open_context(sleep_forever) as (peer_ctx, first),
-            peer_ctx.open_stream(),
-        ):
-            try:
-                yield
-            finally:
-                # XXX: previously this would trigger local
-                # ``ContextCancelled`` to be received and raised in the
-                # local context overriding any local error due to logic
-                # inside ``_invoke()`` which checked for an error set on
-                # ``Context._error`` and raised it in a cancellation
-                # scenario.
-                # ------
-                # The problem is you can have a remote cancellation that
-                # is part of a local error and we shouldn't raise
-                # ``ContextCancelled`` **iff** we **were not** the side
-                # of the context to initiate it, i.e.
-                # ``Context._cancel_called`` should **NOT** have been
-                # set. The special logic to handle this case is now
-                # inside ``Context._maybe_raise_from_remote_msg()`` XD
-                await peer_ctx.cancel()
-
-
-@tractor.context
-async def error_before_started(
-    ctx: tractor.Context,
-) -> None:
-    '''
-    This simulates exactly an original bug discovered in:
-    https://github.com/pikers/piker/issues/244
-
-    '''
-    async with attach_to_sleep_forever():
-        # send an unserializable type which should raise a type error
-        # here and **NOT BE SWALLOWED** by the surrounding acm!!?!
-        await ctx.started(object())
-
-
-def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
-    '''
-    Verify that an error raised in a remote context which itself opens
-    another remote context, which it cancels, does not ovverride the
-    original error that caused the cancellation of the secondardy
-    context.
-
-    '''
-    async def main():
-        async with tractor.open_nursery() as n:
-            portal = await n.start_actor(
-                'errorer',
-                enable_modules=[__name__],
-            )
-            await n.start_actor(
-                'sleeper',
-                enable_modules=[__name__],
-            )
-
-            async with (
-                portal.open_context(
-                    error_before_started
-                ) as (ctx, sent),
-            ):
-                await trio.sleep_forever()
-
-    with pytest.raises(tractor.RemoteActorError) as excinfo:
-        trio.run(main)
-
-    assert excinfo.value.type == TypeError
@@ -0,0 +1,451 @@ (new file, all lines added)
'''
Codify the cancellation request semantics in terms
of one remote actor cancelling another.

'''
# from contextlib import asynccontextmanager as acm
import itertools

import pytest
import trio
import tractor
from tractor import (  # typing
    Portal,
    Context,
    ContextCancelled,
)


# def test_self_cancel():
#     '''
#     2 cases:
#     - calls `Actor.cancel()` locally in some task
#     - calls LocalPortal.cancel_actor()` ?

#     '''
#     ...


@tractor.context
async def sleep_forever(
    ctx: Context,
) -> None:
    '''
    Sync the context, open a stream then just sleep.

    '''
    await ctx.started()
    async with ctx.open_stream():
        await trio.sleep_forever()


@tractor.context
async def error_before_started(
    ctx: Context,
) -> None:
    '''
    This simulates exactly an original bug discovered in:
    https://github.com/pikers/piker/issues/244

    Cancel a context **before** any underlying error is raised so
    as to trigger a local reception of a ``ContextCancelled`` which
    SHOULD NOT be re-raised in the local surrounding ``Context``
    *iff* the cancel was requested by **this** (callee) side of
    the context.

    '''
    async with tractor.wait_for_actor('sleeper') as p2:
        async with (
            p2.open_context(sleep_forever) as (peer_ctx, first),
            peer_ctx.open_stream(),
        ):
            # NOTE: this WAS inside an @acm body but i factored it
            # out and just put it inline here since i don't think
            # the mngr part really matters, though maybe it could?
            try:
                # XXX NOTE XXX: THIS sends an UNSERIALIZABLE TYPE which
                # should raise a `TypeError` and **NOT BE SWALLOWED** by
                # the surrounding try/finally (normally inside the
                # body of some acm)..
                await ctx.started(object())
                # yield
            finally:
                # XXX: previously this would trigger local
                # ``ContextCancelled`` to be received and raised in the
                # local context overriding any local error due to logic
                # inside ``_invoke()`` which checked for an error set on
                # ``Context._error`` and raised it in a cancellation
                # scenario.
                # ------
                # The problem is you can have a remote cancellation that
                # is part of a local error and we shouldn't raise
                # ``ContextCancelled`` **iff** we **were not** the side
                # of the context to initiate it, i.e.
                # ``Context._cancel_called`` should **NOT** have been
                # set. The special logic to handle this case is now
                # inside ``Context._maybe_raise_from_remote_msg()`` XD
                await peer_ctx.cancel()


def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
    '''
    Verify that an error raised in a remote context which itself
    opens YET ANOTHER remote context, which it then cancels, does not
    override the original error that caused the cancellation of the
    secondary context.

    '''
    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor(
                'errorer',
                enable_modules=[__name__],
            )
            await n.start_actor(
                'sleeper',
                enable_modules=[__name__],
            )

            async with (
                portal.open_context(
                    error_before_started
                ) as (ctx, sent),
            ):
                await trio.sleep_forever()

    with pytest.raises(tractor.RemoteActorError) as excinfo:
        trio.run(main)

    assert excinfo.value.type == TypeError


@tractor.context
async def sleep_a_bit_then_cancel_peer(
    ctx: Context,
    peer_name: str = 'sleeper',
    cancel_after: float = .5,

) -> None:
    '''
    Connect to peer, sleep as per input delay, cancel the peer.

    '''
    peer: Portal
    async with tractor.wait_for_actor(peer_name) as peer:
        await ctx.started()
        await trio.sleep(cancel_after)
        await peer.cancel_actor()


@tractor.context
async def stream_ints(
    ctx: Context,
):
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in itertools.count():
            await stream.send(i)


@tractor.context
async def stream_from_peer(
    ctx: Context,
    peer_name: str = 'sleeper',
) -> None:

    peer: Portal
    try:
        async with (
            tractor.wait_for_actor(peer_name) as peer,
            peer.open_context(stream_ints) as (peer_ctx, first),
            peer_ctx.open_stream() as stream,
        ):
            await ctx.started()
            # XXX TODO: big set of questions for this
            # - should we raise `ContextCancelled` or `Cancelled` (rn
            #   it does that) here?!
            # - test the `ContextCancelled` OUTSIDE the
            #   `.open_context()` call?
            try:
                async for msg in stream:
                    print(msg)

            except trio.Cancelled:
                assert not ctx.cancel_called
                assert not ctx.cancelled_caught

                assert not peer_ctx.cancel_called
                assert not peer_ctx.cancelled_caught

                assert 'root' in ctx.cancel_called_remote

                raise  # XXX MUST NEVER MASK IT!!

            with trio.CancelScope(shield=True):
                await tractor.pause()
            # pass
            # pytest.fail(
            raise RuntimeError(
                'peer never triggered local `[Context]Cancelled`?!?'
            )

    # NOTE: cancellation of the (sleeper) peer should always
    # cause a `ContextCancelled` raise in this streaming
    # actor.
    except ContextCancelled as ctxerr:
        assert ctxerr.canceller == 'canceller'
        assert ctxerr._remote_error is ctxerr

        # CASE 1: we were cancelled by our parent, the root actor.
        # TODO: there are other cases depending on how the root
        # actor and it's caller side task are written:
        # - if the root does not req us to cancel then an
        #   IPC-transport related error should bubble from the async
        #   for loop and thus cause local cancellation both here
        #   and in the root (since in that case this task cancels the
        #   context with the root, not the other way around)
        assert ctx.cancel_called_remote[0] == 'root'
        raise

    # except BaseException as err:

    #     raise


# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
#   => all other connected peers should get that cancel requesting peer's
#      uid in the ctx-cancelled error msg.

# - peer spawned a sub-actor which (also) spawned a failing task
#   which was unhandled and propagated up to the immediate
#   parent, the peer to the actor that also spawned a remote task
#   task in that same peer-parent.

# - peer cancelled itself - so other peers should
#   get errors reflecting that the peer was itself the .canceller?

# - WE cancelled the peer and thus should not see any raised
#   `ContextCancelled` as it should be reaped silently?
#   => pretty sure `test_context_stream_semantics::test_caller_cancels()`
#      already covers this case?


@pytest.mark.parametrize(
    'error_during_ctxerr_handling',
    [False, True],
)
def test_peer_canceller(
    error_during_ctxerr_handling: bool,
):
    '''
    Verify that a cancellation triggered by an in-actor-tree peer
    results in a cancelled errors with all other actors which have
    opened contexts to that same actor.

    legend:
    name>
        a "play button" that indicates a new runtime instance,
        an individual actor with `name`.

    .subname>
        a subactor who's parent should be on some previous
        line and be less indented.

    .actor0> ()-> .actor1>
        a inter-actor task context opened (by `async with `Portal.open_context()`)
        from actor0 *into* actor1.

    .actor0> ()<=> .actor1>
        a inter-actor task context opened (as above)
        from actor0 *into* actor1 which INCLUDES an additional
        stream open using `async with Context.open_stream()`.


    ------ - ------
    supervision view
    ------ - ------
    root>
     .sleeper> TODO: SOME SYNTAX SHOWING JUST SLEEPING
     .just_caller> ()=> .sleeper>
     .canceller> ()-> .sleeper>
        TODO:  how define calling `Portal.cancel_actor()`

    In this case a `ContextCancelled` with `.errorer` set to the
    requesting actor, in this case 'canceller', should be relayed
    to all other actors who have also opened a (remote task)
    context with that now cancelled actor.

    ------ - ------
    task view
    ------ - ------
    So there are 5 context open in total with 3 from the root to
    its children and 2 from children to their peers:
    1. root> ()-> .sleeper>
    2. root> ()-> .streamer>
    3. root> ()-> .canceller>

    4. .streamer> ()<=> .sleep>
    5. .canceller> ()-> .sleeper>
        - calls `Portal.cancel_actor()`


    '''

    async def main():
        async with tractor.open_nursery() as an:
            canceller: Portal = await an.start_actor(
                'canceller',
                enable_modules=[__name__],
            )
            sleeper: Portal = await an.start_actor(
                'sleeper',
                enable_modules=[__name__],
            )
            just_caller: Portal = await an.start_actor(
                'just_caller',  # but i just met her?
                enable_modules=[__name__],
            )

            try:
                async with (
                    sleeper.open_context(
                        sleep_forever,
                    ) as (sleeper_ctx, sent),

                    just_caller.open_context(
                        stream_from_peer,
                    ) as (caller_ctx, sent),

                    canceller.open_context(
                        sleep_a_bit_then_cancel_peer,
                    ) as (canceller_ctx, sent),

                ):
                    ctxs: list[Context] = [
                        sleeper_ctx,
                        caller_ctx,
                        canceller_ctx,
                    ]

                    try:
                        print('PRE CONTEXT RESULT')
                        await sleeper_ctx.result()

                        # should never get here
                        pytest.fail(
                            'Context.result() did not raise ctx-cancelled?'
                        )

                    # TODO: not sure why this isn't catching
                    # but maybe we need an `ExceptionGroup` and
                    # the whole except *errs: thinger in 3.11?
                    except ContextCancelled as ctxerr:
                        print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')

                        # canceller and caller peers should not
                        # have been remotely cancelled.
                        assert canceller_ctx.cancel_called_remote is None
                        assert caller_ctx.cancel_called_remote is None

                        assert ctxerr.canceller[0] == 'canceller'

                        # XXX NOTE XXX: since THIS `ContextCancelled`
                        # HAS NOT YET bubbled up to the
                        # `sleeper.open_context().__aexit__()` this
                        # value is not yet set, however outside this
                        # block it should be.
                        assert not sleeper_ctx.cancelled_caught

                        # TODO: a test which ensures this error is
                        # bubbled and caught (NOT MASKED) by the
                        # runtime!!!
                        if error_during_ctxerr_handling:
                            raise RuntimeError('Simulated error during teardown')

                        raise

                    # SHOULD NEVER GET HERE!
                    except BaseException:
                        pytest.fail('did not rx ctx-cancelled error?')
                    else:
                        pytest.fail('did not rx ctx-cancelled error?')

            except (
                ContextCancelled,
                RuntimeError,
            )as ctxerr:
                _err = ctxerr

                if error_during_ctxerr_handling:
                    assert isinstance(ctxerr, RuntimeError)

                    # NOTE: this root actor task should have
                    # called `Context.cancel()` on the
                    # `.__aexit__()` to every opened ctx.
                    for ctx in ctxs:
                        assert ctx.cancel_called

                        # each context should have received
                        # a silently absorbed context cancellation
                        # from its peer actor's task.
                        assert ctx.chan.uid == ctx.cancel_called_remote

                        # this root actor task should have
                        # cancelled all opened contexts except
                        # the sleeper which is cancelled by its
                        # peer "canceller"
                        if ctx is not sleeper_ctx:
                            assert ctx._remote_error.canceller[0] == 'root'

                else:
                    assert ctxerr.canceller[0] == 'canceller'

                    # the sleeper's remote error is the error bubbled
                    # out of the context-stack above!
                    re = sleeper_ctx._remote_error
                    assert re is ctxerr

                    for ctx in ctxs:

                        if ctx is sleeper_ctx:
                            assert not ctx.cancel_called
                            assert ctx.cancelled_caught
                        else:
                            assert ctx.cancel_called
                            assert not ctx.cancelled_caught

                        # each context should have received
                        # a silently absorbed context cancellation
                        # from its peer actor's task.
                        assert ctx.chan.uid == ctx.cancel_called_remote

                    # NOTE: when an inter-peer cancellation
                    # occurred, we DO NOT expect this
                    # root-actor-task to have requested a cancel of
                    # the context since cancellation was caused by
                    # the "canceller" peer and thus
                    # `Context.cancel()` SHOULD NOT have been
                    # called inside
                    # `Portal.open_context().__aexit__()`.
                    assert not sleeper_ctx.cancel_called

                    # XXX NOTE XXX: and see matching comment above but,
                    # this flag is set only AFTER the `.open_context()`
                    # has exited and should be set in both outcomes
                    # including the case where ctx-cancel handling
                    # itself errors.
                    assert sleeper_ctx.cancelled_caught
                    assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'

                # await tractor.pause()
                raise  # always to ensure teardown

    if error_during_ctxerr_handling:
        with pytest.raises(RuntimeError) as excinfo:
            trio.run(main)
    else:

        with pytest.raises(ContextCancelled) as excinfo:
            trio.run(main)

        assert excinfo.value.type == ContextCancelled
        assert excinfo.value.canceller[0] == 'canceller'
@@ -23,8 +23,8 @@ from exceptiongroup import BaseExceptionGroup
 from ._clustering import open_actor_cluster
 from ._ipc import Channel
 from ._context import (
-    Context,
-    context,
+    Context,  # the type
+    context,  # a func-decorator
 )
 from ._streaming import (
     MsgStream,
@@ -86,26 +86,51 @@ class Context:

    '''
    chan: Channel
-    cid: str
+    cid: str  # "context id", more or less a unique linked-task-pair id

-    # these are the "feeder" channels for delivering
-    # message values to the local task from the runtime
-    # msg processing loop.
+    # the "feeder" channels for delivering message values to the
+    # local task from the runtime's msg processing loop.
    _recv_chan: trio.MemoryReceiveChannel
    _send_chan: trio.MemorySendChannel

+    # the "invocation type" of the far end task-entry-point
+    # function, normally matching a logic block inside
+    # `._runtime.invoke()`.
    _remote_func_type: str | None = None

-    # only set on the caller side
-    _portal: Portal | None = None  # type: ignore # noqa
+    # NOTE: (for now) only set (a portal) on the caller side since
+    # the callee doesn't generally need a ref to one and should
+    # normally need to explicitly ask for handle to its peer if
+    # more the the `Context` is needed?
+    _portal: Portal | None = None
+
+    # NOTE: each side of the context has its own cancel scope
+    # which is exactly the primitive that allows for
+    # cross-actor-task-supervision and thus SC.
+    _scope: trio.CancelScope | None = None
    _result: Any | int = None
    _remote_error: BaseException | None = None

    # cancellation state
-    _cancel_called: bool = False
-    _cancelled_remote: tuple | None = None
-    _cancel_msg: str | None = None
-    _scope: trio.CancelScope | None = None
+    _cancel_called: bool = False  # did WE cancel the far end?
+    _cancelled_remote: tuple[str, str] | None = None
+
+    # NOTE: we try to ensure assignment of a "cancel msg" since
+    # there's always going to be an "underlying reason" that any
+    # context was closed due to either a remote side error or
+    # a call to `.cancel()` which triggers `ContextCancelled`.
+    _cancel_msg: str | dict | None = None
+
+    # NOTE: this state var used by the runtime to determine if the
+    # `pdbp` REPL is allowed to engage on contexts terminated via
+    # a `ContextCancelled` due to a call to `.cancel()` triggering
+    # "graceful closure" on either side:
+    # - `._runtime._invoke()` will check this flag before engaging
+    #   the crash handler REPL in such cases where the "callee"
+    #   raises the cancellation,
+    # - `.devx._debug.lock_tty_for_child()` will set it to `False` if
+    #   the global tty-lock has been configured to filter out some
+    #   actors from being able to acquire the debugger lock.
    _enter_debugger_on_cancel: bool = True

    @property
@@ -173,41 +198,76 @@ class Context:

    async def _maybe_cancel_and_set_remote_error(
        self,
-        error_msg: dict[str, Any],
+        error: BaseException,

    ) -> None:
        '''
-        (Maybe) unpack and raise a msg error into the local scope
-        nursery for this context.
+        (Maybe) cancel this local scope due to a received remote
+        error (normally via an IPC msg) which the actor runtime
+        routes to this context.

-        Acts as a form of "relay" for a remote error raised
-        in the corresponding remote callee task.
+        Acts as a form of "relay" for a remote error raised in the
+        corresponding remote task's `Context` wherein the next time
+        the local task exectutes a checkpoint, a `trio.Cancelled`
+        will be raised and depending on the type and source of the
+        original remote error, and whether or not the local task
+        called `.cancel()` itself prior, an equivalent
+        `ContextCancelled` or `RemoteActorError` wrapping the
+        remote error may be raised here by any of,
+
+        - `Portal.open_context()`
+        - `Portal.result()`
+        - `Context.open_stream()`
+        - `Context.result()`
+
+        when called/closed by actor local task(s).
+
+        NOTEs & TODOs:
+          - It is expected that the caller has previously unwrapped
+            the remote error using a call to `unpack_error()` and
+            provides that output exception value as the input
+            `error` argument here.
+          - If this is an error message from a context opened by
+            `Portal.open_context()` we want to interrupt any
+            ongoing local tasks operating within that `Context`'s
+            cancel-scope so as to be notified ASAP of the remote
+            error and engage any caller handling (eg. for
+            cross-process task supervision).
+          - In some cases we may want to raise the remote error
+            immediately since there is no guarantee the locally
+            operating task(s) will attempt to execute a checkpoint
+            any time soon; in such cases there are 2 possible
+            approaches depending on the current task's work and
+            wrapping "thread" type:
+
+            - `trio`-native-and-graceful: only ever wait for tasks
+              to exec a next `trio.lowlevel.checkpoint()` assuming
+              that any such task must do so to interact with the
+              actor runtime and IPC interfaces.
+
+            - (NOT IMPLEMENTED) system-level-aggressive: maybe we
+              could eventually interrupt sync code (invoked using
+              `trio.to_thread` or some other adapter layer) with
+              a signal (a custom unix one for example?
+              https://stackoverflow.com/a/5744185) depending on the
+              task's wrapping thread-type such that long running
+              sync code should never cause the delay of actor
+              supervision tasks such as cancellation and respawn
+              logic.

        '''
-        # If this is an error message from a context opened by
-        # ``Portal.open_context()`` we want to interrupt any ongoing
-        # (child) tasks within that context to be notified of the remote
-        # error relayed here.
-        #
-        # The reason we may want to raise the remote error immediately
-        # is that there is no guarantee the associated local task(s)
-        # will attempt to read from any locally opened stream any time
-        # soon.
-        #
-        # NOTE: this only applies when
-        # ``Portal.open_context()`` has been called since it is assumed
-        # (currently) that other portal APIs (``Portal.run()``,
-        # ``.run_in_actor()``) do their own error checking at the point
-        # of the call and result processing.
-        error = unpack_error(
-            error_msg,
-            self.chan,
-        )
+        # XXX: currently this should only be used when
+        # `Portal.open_context()` has been opened since it's
+        # assumed that other portal APIs like,
+        #  - `Portal.run()`,
+        #  - `ActorNursery.run_in_actor()`
+        # do their own error checking at their own call points and
+        # result processing.

        # XXX: set the remote side's error so that after we cancel
        # whatever task is the opener of this context it can raise
        # that error as the reason.
-        self._remote_error = error
+        self._remote_error: BaseException = error

        # always record the remote actor's uid since its cancellation
        # state is directly linked to ours (the local one).
@@ -232,35 +292,25 @@ class Context:
        else:
            log.error(
                f'Remote context error for {self.chan.uid}:{self.cid}:\n'
-                f'{error_msg["error"]["tb_str"]}'
+                f'{error}'
            )
        # TODO: tempted to **not** do this by-reraising in a
        # nursery and instead cancel a surrounding scope, detect
        # the cancellation, then lookup the error that was set?
        # YES! this is way better and simpler!
-        if (
-            self._scope
-        ):
+        if self._scope:
            # from trio.testing import wait_all_tasks_blocked
            # await wait_all_tasks_blocked()
            # self._cancelled_remote = self.chan.uid
            self._scope.cancel()

-            # NOTE: this usage actually works here B)
-            # from ._debug import breakpoint
-            # await breakpoint()
-
-        # XXX: this will break early callee results sending
-        # since when `.result()` is finally called, this
-        # chan will be closed..
-        # if self._recv_chan:
-        #     await self._recv_chan.aclose()
+            # this REPL usage actually works here BD
+            # from .devx._debug import pause
+            # await pause()

    async def cancel(
        self,
-        msg: str | None = None,
        timeout: float = 0.616,
-        # timeout: float = 1000,

    ) -> None:
        '''
@@ -270,15 +320,12 @@ class Context:
        Timeout quickly in an attempt to sidestep 2-generals...

        '''
-        side = 'caller' if self._portal else 'callee'
-        if msg:
-            assert side == 'callee', 'Only callee side can provide cancel msg'
-
-        log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
-
-        self._cancel_called = True
-        # await _debug.breakpoint()
-        # breakpoint()
+        side: str = 'caller' if self._portal else 'callee'
+        log.cancel(
+            f'Cancelling {side} side of context to {self.chan.uid}'
+        )
+        self._cancel_called: bool = True

        if side == 'caller':
            if not self._portal:
@@ -286,12 +333,13 @@ class Context:
                    "No portal found, this is likely a callee side context"
                )

-            cid = self.cid
+            cid: str = self.cid
            with trio.move_on_after(timeout) as cs:
                cs.shield = True
                log.cancel(
-                    f"Cancelling stream {cid} to "
-                    f"{self._portal.channel.uid}")
+                    f'Cancelling stream {cid} to '
+                    f'{self._portal.channel.uid}'
+                )

                # NOTE: we're telling the far end actor to cancel a task
                # corresponding to *this actor*. The far end local channel
@@ -310,17 +358,17 @@ class Context:
                # if not self._portal.channel.connected():
                if not self.chan.connected():
                    log.cancel(
-                        "May have failed to cancel remote task "
-                        f"{cid} for {self._portal.channel.uid}")
+                        'May have failed to cancel remote task '
+                        f'{cid} for {self._portal.channel.uid}'
+                    )
                else:
                    log.cancel(
-                        "Timed out on cancelling remote task "
-                        f"{cid} for {self._portal.channel.uid}")
+                        'Timed out on cancel request of remote task '
+                        f'{cid} for {self._portal.channel.uid}'
+                    )

        # callee side remote task
        else:
-            self._cancel_msg = msg

            # TODO: should we have an explicit cancel message
            # or is relaying the local `trio.Cancelled` as an
            # {'error': trio.Cancelled, cid: "blah"} enough?
@@ -331,7 +379,6 @@ class Context:

    @acm
    async def open_stream(
-
        self,
        allow_overruns: bool | None = False,
        msg_buffer_size: int | None = None,
@@ -350,10 +397,10 @@ class Context:
        ``Portal.open_context()``. In the future this may change but
        currently there seems to be no obvious reason to support
        "re-opening":
          - pausing a stream can be done with a message.
          - task errors will normally require a restart of the entire
            scope of the inter-actor task context due to the nature of
            ``trio``'s cancellation system.

        '''
        actor = current_actor()
@@ -435,18 +482,19 @@ class Context:
        self,
        err: Exception,
    ) -> None:
+        '''
+        Maybe raise a remote error depending on who (which task from
+        which actor) requested a cancellation (if any).
+
+        '''
        # NOTE: whenever the context's "opener" side (task) **is**
        # the side which requested the cancellation (likekly via
        # ``Context.cancel()``), we don't want to re-raise that
        # cancellation signal locally (would be akin to
        # a ``trio.Nursery`` nursery raising ``trio.Cancelled``
-        # whenever ``CancelScope.cancel()`` was called) and instead
-        # silently reap the expected cancellation "error"-msg.
-        # if 'pikerd' in err.msgdata['tb_str']:
-        #     # from . import _debug
-        #     # await _debug.breakpoint()
-        #     # breakpoint()
+        # whenever ``CancelScope.cancel()`` was called) and
+        # instead silently reap the expected cancellation
+        # "error"-msg.

        if (
            isinstance(err, ContextCancelled)
            and (
@@ -457,7 +505,18 @@ class Context:
        ):
            return err

-        raise err  # from None
+        # NOTE: currently we are masking underlying runtime errors
+        # which are often superfluous to user handler code. not
+        # sure if this is still needed / desired for all operation?
+        # TODO: maybe we can only NOT mask if:
+        # - [ ] debug mode is enabled or,
+        # - [ ] a certain log level is set?
+        # - [ ] consider using `.with_traceback()` to filter out
+        #       runtime frames from the tb explicitly?
+        # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
+        # https://stackoverflow.com/a/24752607
+        __tracebackhide__: bool = True
+        raise err from None

    async def result(self) -> Any | Exception:
        '''
@@ -485,16 +544,12 @@ class Context:
        of the remote cancellation.

        '''
+        __tracebackhide__: bool = True
        assert self._portal, "Context.result() can not be called from callee!"
        assert self._recv_chan

-        # from . import _debug
-        # await _debug.breakpoint()
-
-        re = self._remote_error
-        if re:
-            self._maybe_raise_remote_err(re)
-            return re
+        if re := self._remote_error:
+            return self._maybe_raise_remote_err(re)

        if (
            self._result == id(self)
@@ -505,9 +560,9 @@ class Context:
            # and discarding any bi dir stream msgs still
            # in transit from the far end.
            while True:
-                msg = await self._recv_chan.receive()
                try:
-                    self._result = msg['return']
+                    msg = await self._recv_chan.receive()
+                    self._result: Any = msg['return']

                    # NOTE: we don't need to do this right?
                    # XXX: only close the rx mem chan AFTER
@@ -516,6 +571,26 @@ class Context:
                    # await self._recv_chan.aclose()

                    break

+                # NOTE: we get here if the far end was
+                # `ContextCancelled` in 2 cases:
+                #  1. we requested the cancellation and thus
+                #     SHOULD NOT raise that far end error,
+                #  2. WE DID NOT REQUEST that cancel and thus
+                #     SHOULD RAISE HERE!
+                except trio.Cancelled:
+
+                    # CASE 2: mask the local cancelled-error(s)
+                    # only when we are sure the remote error is the
+                    # (likely) source cause of this local runtime
+                    # task's cancellation.
+                    if re := self._remote_error:
+                        self._maybe_raise_remote_err(re)
+
+                    # CASE 1: we DID request the cancel we simply
+                    # continue to bubble up as normal.
+                    raise
+
                except KeyError:  # as msgerr:

                    if 'yield' in msg:
@@ -529,7 +604,8 @@ class Context:

                    # internal error should never get here
                    assert msg.get('cid'), (
-                        "Received internal error at portal?")
+                        "Received internal error at portal?"
+                    )

                    err = unpack_error(
                        msg,
@@ -537,9 +613,12 @@ class Context:
                    )  # from msgerr

                    err = self._maybe_raise_remote_err(err)
-                    self._remote_err = err
+                    self._remote_error = err

-        return self._remote_error or self._result
+        if re := self._remote_error:
+            return self._maybe_raise_remote_err(re)
+
+        return self._result

    async def started(
        self,
|
@ -548,7 +627,7 @@ class Context:
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Indicate to calling actor's task that this linked context
|
Indicate to calling actor's task that this linked context
|
||||||
has started and send ``value`` to the other side.
|
has started and send ``value`` to the other side via IPC.
|
||||||
|
|
||||||
On the calling side ``value`` is the second item delivered
|
On the calling side ``value`` is the second item delivered
|
||||||
in the tuple returned by ``Portal.open_context()``.
|
in the tuple returned by ``Portal.open_context()``.
|
||||||
|
@@ -556,19 +635,17 @@ class Context:
        '''
        if self._portal:
            raise RuntimeError(
-                f"Caller side context {self} can not call started!")
+                f'Caller side context {self} can not call started!'
+            )

        elif self._started_called:
            raise RuntimeError(
-                f"called 'started' twice on context with {self.chan.uid}")
+                f'called `.started()` twice on context with {self.chan.uid}'
+            )

        await self.chan.send({'started': value, 'cid': self.cid})
        self._started_called = True

-    # TODO: do we need a restart api?
-    # async def restart(self) -> None:
-    #     pass
-
    async def _drain_overflows(
        self,
    ) -> None:
@@ -623,10 +700,21 @@ class Context:
        self,
        msg: dict,

-        draining: bool = False,
+        # draining: bool = False,

    ) -> bool:
+        '''
+        Deliver an IPC msg received from a transport-channel to
+        this context's underlying mem chan for handling by
+        user operating tasks; deliver a bool indicating whether the
+        msg was immediately sent.
+
+        If `._allow_overruns == True` (maybe) append the msg to an
+        "overflow queue" and start a "drainer task" (inside the
+        `._scope_nursery: trio.Nursery`) which ensures that such
+        messages are eventually sent if possible.
+
+        '''
        cid = self.cid
        chan = self.chan
        uid = chan.uid
@@ -637,8 +725,12 @@ class Context:
        )

        error = msg.get('error')
-        if error:
-            await self._maybe_cancel_and_set_remote_error(msg)
+        if error := unpack_error(
+            msg,
+            self.chan,
+        ):
+            self._cancel_msg = msg
+            await self._maybe_cancel_and_set_remote_error(error)

        if (
            self._in_overrun
@@ -670,6 +762,7 @@ class Context:
        # the sender; the main motivation is that using bp can block the
        # msg handling loop which calls into this method!
        except trio.WouldBlock:
+
            # XXX: always push an error even if the local
            # receiver is in overrun state.
            # await self._maybe_cancel_and_set_remote_error(msg)
@@ -39,8 +39,11 @@ class ActorFailure(Exception):


class RemoteActorError(Exception):
+    '''
+    Remote actor exception bundled locally
+
+    '''
    # TODO: local recontruction of remote exception deats
-    "Remote actor exception bundled locally"
    def __init__(
        self,
        message: str,
@@ -110,18 +113,24 @@ class AsyncioCancelled(Exception):

def pack_error(
    exc: BaseException,
-    tb=None,
+    tb: str | None = None,

-) -> dict[str, Any]:
-    """Create an "error message" for tranmission over
-    a channel (aka the wire).
-    """
+) -> dict[str, dict]:
+    '''
+    Create an "error message" encoded for wire transport via an IPC
+    `Channel`; expected to be unpacked on the receiver side using
+    `unpack_error()` below.
+
+    '''
    if tb:
        tb_str = ''.join(traceback.format_tb(tb))
    else:
        tb_str = traceback.format_exc()

-    error_msg = {
+    error_msg: dict[
+        str,
+        str | tuple[str, str]
+    ] = {
        'tb_str': tb_str,
        'type_str': type(exc).__name__,
        'src_actor_uid': current_actor().uid,
@@ -139,23 +148,33 @@ def unpack_error(
    chan=None,
    err_type=RemoteActorError

-) -> Exception:
+) -> None | Exception:
    '''
    Unpack an 'error' message from the wire
-    into a local ``RemoteActorError``.
+    into a local `RemoteActorError` (subtype).
+
+    NOTE: this routine DOES not RAISE the embedded remote error,
+    which is the responsibilitiy of the caller.

    '''
-    __tracebackhide__ = True
-    error = msg['error']
+    __tracebackhide__: bool = True
+
+    error_dict: dict[str, dict] | None
+    if (
+        error_dict := msg.get('error')
+    ) is None:
+        # no error field, nothing to unpack.
+        return None

-    tb_str = error.get('tb_str', '')
-    message = f"{chan.uid}\n" + tb_str
-    type_name = error['type_str']
+    # retrieve the remote error's msg encoded details
+    tb_str: str = error_dict.get('tb_str', '')
+    message: str = f'{chan.uid}\n' + tb_str
+    type_name: str = error_dict['type_str']
    suberror_type: Type[BaseException] = Exception

    if type_name == 'ContextCancelled':
        err_type = ContextCancelled
-        suberror_type = RemoteActorError
+        suberror_type = err_type

    else:  # try to lookup a suitable local error type
        for ns in [
@@ -164,18 +183,19 @@ def unpack_error(
            eg,
            trio,
        ]:
-            try:
-                suberror_type = getattr(ns, type_name)
+            if suberror_type := getattr(
+                ns,
+                type_name,
+                False,
+            ):
                break
-            except AttributeError:
-                continue

    exc = err_type(
        message,
        suberror_type=suberror_type,

        # unpack other fields into error type init
-        **msg['error'],
+        **error_dict,
    )

    return exc
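For orientation, a minimal sketch (not part of this diff) of how the two helpers above are meant to round-trip an exception over IPC: `pack_error()` encodes it as a wire msg which `unpack_error()` rebuilds locally without raising it. The `_FakeChan` stand-in is hypothetical and only exists to satisfy the `chan.uid` access; the exact wrapping-msg layout is assumed from the hunks above.

from tractor._exceptions import pack_error, unpack_error, RemoteActorError


class _FakeChan:
    # hypothetical (name, uuid) actor id a real `Channel` would carry
    uid = ('errorer', 'some-uuid')


try:
    raise ValueError('boom')
except ValueError as exc:
    # assumed: returns a msg dict carrying an 'error' field as
    # consumed by `unpack_error()` above.
    wire_msg: dict = pack_error(exc)

err = unpack_error(wire_msg, _FakeChan())
assert isinstance(err, RemoteActorError)  # local wrapper, NOT raised here
assert 'ValueError' in str(err)           # remote tb/type info is embedded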
@@ -15,8 +15,12 @@
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
-Memory boundary "Portals": an API for structured
-concurrency linked tasks running in disparate memory domains.
+Memory "portal" contruct.
+
+"Memory portals" are both an API and set of IPC wrapping primitives
+for managing structured concurrency "cancel-scope linked" tasks
+running in disparate virtual memory domains - at least in different
+OS processes, possibly on different (hardware) hosts.

'''
from __future__ import annotations
@@ -66,20 +70,21 @@ def _unwrap_msg(
    raise unpack_error(msg, channel) from None


+# TODO: maybe move this to ._exceptions?
class MessagingError(Exception):
    'Some kind of unexpected SC messaging dialog issue'


class Portal:
    '''
-    A 'portal' to a(n) (remote) ``Actor``.
+    A 'portal' to a memory-domain-separated `Actor`.

    A portal is "opened" (and eventually closed) by one side of an
    inter-actor communication context. The side which opens the portal
    is equivalent to a "caller" in function parlance and usually is
    either the called actor's parent (in process tree hierarchy terms)
    or a client interested in scheduling work to be done remotely in a
-    far process.
+    process which has a separate (virtual) memory domain.

    The portal api allows the "caller" actor to invoke remote routines
    and receive results through an underlying ``tractor.Channel`` as
@@ -89,9 +94,9 @@ class Portal:
    like having a "portal" between the seperate actor memory spaces.

    '''
-    # the timeout for a remote cancel request sent to
-    # a(n) (peer) actor.
-    cancel_timeout = 0.5
+    # global timeout for remote cancel requests sent to
+    # connected (peer) actors.
+    cancel_timeout: float = 0.5

    def __init__(self, channel: Channel) -> None:
        self.channel = channel
@@ -191,7 +196,15 @@ class Portal:

    ) -> bool:
        '''
-        Cancel the actor on the other end of this portal.
+        Cancel the actor runtime (and thus process) on the far
+        end of this portal.
+
+        **NOTE** THIS CANCELS THE ENTIRE RUNTIME AND THE
+        SUBPROCESS, it DOES NOT just cancel the remote task. If you
+        want to have a handle to cancel a remote ``tri.Task`` look
+        at `.open_context()` and the definition of
+        `._context.Context.cancel()` which CAN be used for this
+        purpose.

        '''
        if not self.channel.connected():
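A hedged sketch (not from this diff) contrasting the two cancel surfaces the docstring above distinguishes: `Portal.cancel_actor()` tears down the whole peer runtime and process, while `Context.cancel()` only cancels the single remote task opened via `Portal.open_context()`. The actor and function names here are made up for illustration.

import trio
import tractor


@tractor.context
async def long_task(ctx: tractor.Context) -> None:
    await ctx.started()
    await trio.sleep_forever()


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor('worker', enable_modules=[__name__])

        async with portal.open_context(long_task) as (ctx, first):
            await trio.sleep(0.1)
            await ctx.cancel()        # cancel just the remote `long_task`

        await portal.cancel_actor()   # tear down the whole 'worker' runtime


if __name__ == '__main__':
    trio.run(main)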
@@ -385,12 +398,32 @@ class Portal:

    ) -> AsyncGenerator[tuple[Context, Any], None]:
        '''
-        Open an inter-actor task context.
+        Open an inter-actor "task context"; a remote task is
+        scheduled and cancel-scope-state-linked to a `trio.run()` across
+        memory boundaries in another actor's runtime.

-        This is a synchronous API which allows for deterministic
-        setup/teardown of a remote task. The yielded ``Context`` further
-        allows for opening bidirectional streams, explicit cancellation
-        and synchronized final result collection. See ``tractor.Context``.
+        This is an `@acm` API which allows for deterministic setup
+        and teardown of a remotely scheduled task in another remote
+        actor. Once opened, the 2 now "linked" tasks run completely
+        in parallel in each actor's runtime with their enclosing
+        `trio.CancelScope`s kept in a synced state wherein if
+        either side errors or cancels an equivalent error is
+        relayed to the other side via an SC-compat IPC protocol.
+
+        The yielded `tuple` is a pair delivering a `tractor.Context`
+        and any first value "sent" by the "callee" task via a call
+        to `Context.started(<value: Any>)`; this side of the
+        context does not unblock until the "callee" task calls
+        `.started()` in similar style to `trio.Nursery.start()`.
+        When the "callee" (side that is "called"/started by a call
+        to *this* method) returns, the caller side (this) unblocks
+        and any final value delivered from the other end can be
+        retrieved using the `Contex.result()` api.
+
+        The yielded ``Context`` instance further allows for opening
+        bidirectional streams, explicit cancellation and
+        structurred-concurrency-synchronized final result-msg
+        collection. See ``tractor.Context`` for more details.

        '''
        # conduct target func method structural checks
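A small usage sketch (not from this diff) of the `.open_context()` flow the docstring above describes: the callee calls `Context.started()` to unblock the caller, both tasks then run in parallel, and the callee's return value is collected via `Context.result()`. The 'adder' actor and function names are made up for illustration.

import trio
import tractor


@tractor.context
async def adder(ctx: tractor.Context, x: int) -> int:
    await ctx.started(x)   # the "first" value delivered to the caller
    return x + 1           # final result, collected via `Context.result()`


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor('adder', enable_modules=[__name__])

        async with portal.open_context(adder, x=41) as (ctx, first):
            assert first == 41
            assert await ctx.result() == 42

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)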
@@ -423,47 +456,52 @@ class Portal:
        )

        assert ctx._remote_func_type == 'context'
-        msg = await ctx._recv_chan.receive()
+        msg: dict = await ctx._recv_chan.receive()

        try:
            # the "first" value here is delivered by the callee's
            # ``Context.started()`` call.
            first = msg['started']
-            ctx._started_called = True
+            ctx._started_called: bool = True

        except KeyError:
-            assert msg.get('cid'), ("Received internal error at context?")
+            if not (cid := msg.get('cid')):
+                raise MessagingError(
+                    'Received internal error at context?\n'
+                    'No call-id (cid) in startup msg?'
+                )

            if msg.get('error'):
-                # raise kerr from unpack_error(msg, self.channel)
+                # NOTE: mask the key error with the remote one
                raise unpack_error(msg, self.channel) from None
            else:
                raise MessagingError(
-                    f'Context for {ctx.cid} was expecting a `started` message'
-                    f' but received a non-error msg:\n{pformat(msg)}'
+                    f'Context for {cid} was expecting a `started` message'
+                    ' but received a non-error msg:\n'
+                    f'{pformat(msg)}'
                )

-        _err: BaseException | None = None
        ctx._portal: Portal = self

        uid: tuple = self.channel.uid
        cid: str = ctx.cid
-        etype: Type[BaseException] | None = None

-        # deliver context instance and .started() msg value in enter
-        # tuple.
+        # placeholder for any exception raised in the runtime
+        # or by user tasks which cause this context's closure.
+        scope_err: BaseException | None = None
        try:
            async with trio.open_nursery() as nurse:
-                ctx._scope_nursery = nurse
-                ctx._scope = nurse.cancel_scope
+                ctx._scope_nursery: trio.Nursery = nurse
+                ctx._scope: trio.CancelScope = nurse.cancel_scope

+                # deliver context instance and .started() msg value
+                # in enter tuple.
                yield ctx, first

-                # when in allow_ovveruns mode there may be lingering
-                # overflow sender tasks remaining?
+                # when in allow_overruns mode there may be
+                # lingering overflow sender tasks remaining?
                if nurse.child_tasks:
-                    # ensure we are in overrun state with
-                    # ``._allow_overruns=True`` bc otherwise
+                    # XXX: ensure we are in overrun state
+                    # with ``._allow_overruns=True`` bc otherwise
                    # there should be no tasks in this nursery!
                    if (
                        not ctx._allow_overruns
@@ -471,47 +509,72 @@ Portal:
                     ):
                         raise RuntimeError(
                             'Context has sub-tasks but is '
-                            'not in `allow_overruns=True` Mode!?'
+                            'not in `allow_overruns=True` mode!?'
                         )

+                # ensure cancel of all overflow sender tasks
+                # started in the ctx nursery.
                 ctx._scope.cancel()

-        except ContextCancelled as err:
-            _err = err
+        # XXX: (maybe) shield/mask context-cancellations that were
+        # initiated by any of the context's 2 tasks. There are
+        # subsequently 2 operating cases for a "graceful cancel"
+        # of a `Context`:
+        #
+        # 1.*this* side's task called `Context.cancel()`, in
+        # which case we mask the `ContextCancelled` from bubbling
+        # to the opener (much like how `trio.Nursery` swallows
+        # any `trio.Cancelled` bubbled by a call to
+        # `Nursery.cancel_scope.cancel()`)
+        #
+        # 2.*the other* side's (callee/spawned) task cancelled due
+        # to a self or peer cancellation request in which case we
+        # DO let the error bubble to the opener.
+        except ContextCancelled as ctxc:
+            scope_err = ctxc

-            # swallow and mask cross-actor task context cancels that
-            # were initiated by *this* side's task.
+            # CASE 1: this context was never cancelled
+            # via a local task's call to `Context.cancel()`.
            if not ctx._cancel_called:
                 # XXX: this should NEVER happen!
                 # from ._debug import breakpoint
                 # await breakpoint()
                 raise

-            # if the context was cancelled by client code
-            # then we don't need to raise since user code
-            # is expecting this and the block should exit.
+            # CASE 2: context was cancelled by local task calling
+            # `.cancel()`, we don't raise and the exit block should
+            # exit silently.
            else:
-                log.debug(f'Context {ctx} cancelled gracefully')
+                log.debug(
+                    f'Context {ctx} cancelled gracefully with:\n'
+                    f'{ctxc}'
+                )

         except (
-            BaseException,
+            # - a standard error in the caller/yieldee
+            Exception,

-            # more specifically, we need to handle these but not
-            # sure it's worth being pedantic:
-            # Exception,
-            # trio.Cancelled,
-            # KeyboardInterrupt,
+            # - a runtime teardown exception-group and/or
+            # cancellation request from a caller task.
+            BaseExceptionGroup,
+            trio.Cancelled,
+            KeyboardInterrupt,

         ) as err:
-            etype = type(err)
+            scope_err = err

-            # cancel ourselves on any error.
+            # XXX: request cancel of this context on any error.
+            # NOTE: `Context.cancel()` is conversely NOT called in
+            # the `ContextCancelled` "cancellation requested" case
+            # above.
             log.cancel(
-                'Context cancelled for task, sending cancel request..\n'
+                'Context cancelled for task due to\n'
+                f'{err}\n'
+                'Sending cancel request..\n'
                 f'task:{cid}\n'
                 f'actor:{uid}'
             )
             try:

                 await ctx.cancel()
             except trio.BrokenResourceError:
                 log.warning(
@@ -520,8 +583,9 @@ Portal:
                     f'actor:{uid}'
                 )

-            raise
+            raise  # duh

+        # no scope error case
         else:
             if ctx.chan.connected():
                 log.info(

@@ -529,10 +593,20 @@
                     f'task: {cid}\n'
                     f'actor: {uid}'
                 )
+                # XXX NOTE XXX: the below call to
+                # `Context.result()` will ALWAYS raise
+                # a `ContextCancelled` (via an embedded call to
+                # `Context._maybe_raise_remote_err()`) IFF
+                # a `Context._remote_error` was set by the runtime
+                # via a call to
+                # `Context._maybe_cancel_and_set_remote_error()`
+                # which IS SET any time the far end fails and
+                # causes "caller side" cancellation via
+                # a `ContextCancelled` here.
                 result = await ctx.result()
                 log.runtime(
-                    f'Context {fn_name} returned '
-                    f'value from callee `{result}`'
+                    f'Context {fn_name} returned value from callee:\n'
+                    f'`{result}`'
                 )

         finally:

@@ -540,22 +614,73 @@
             # operating *in* this scope to have survived
             # we tear down the runtime feeder chan last
             # to avoid premature stream clobbers.
-            if ctx._recv_chan is not None:
-                # should we encapsulate this in the context api?
-                await ctx._recv_chan.aclose()
+            rxchan: trio.ReceiveChannel = ctx._recv_chan
+            if (
+                rxchan

-            if etype:
+                # maybe TODO: yes i know the below check is
+                # touching `trio` memchan internals..BUT, there are
+                # only a couple ways to avoid a `trio.Cancelled`
+                # bubbling from the `.aclose()` call below:
+                #
+                # - catch and mask it via the cancel-scope-shielded call
+                #   as we are rn (manual and frowned upon) OR,
+                # - specially handle the case where `scope_err` is
+                #   one of {`BaseExceptionGroup`, `trio.Cancelled`}
+                #   and then presume that the `.aclose()` call will
+                #   raise a `trio.Cancelled` and just don't call it
+                #   in those cases..
+                #
+                # that latter approach is more logic, LOC, and more
+                # convoluted so for now stick with the first
+                # psuedo-hack-workaround where we just try to avoid
+                # the shielded call as much as we can detect from
+                # the memchan's `._closed` state..
+                #
+                # XXX MOTIVATION XXX-> we generally want to raise
+                # any underlying actor-runtime/internals error that
+                # surfaces from a bug in tractor itself so it can
+                # be easily detected/fixed AND, we also want to
+                # minimize noisy runtime tracebacks (normally due
+                # to the cross-actor linked task scope machinery
+                # teardown) displayed to user-code and instead only
+                # displaying `ContextCancelled` traces where the
+                # cause of crash/exit IS due to something in
+                # user/app code on either end of the context.
+                and not rxchan._closed
+            ):
+                # XXX NOTE XXX: and again as per above, we mask any
+                # `trio.Cancelled` raised here so as to NOT mask
+                # out any exception group or legit (remote) ctx
+                # error that sourced from the remote task or its
+                # runtime.
+                with trio.CancelScope(shield=True):
+                    await ctx._recv_chan.aclose()

+            # XXX: since we always (maybe) re-raise (and thus also
+            # mask runtime machinery related
+            # multi-`trio.Cancelled`s) any scope error which was
+            # the underlying cause of this context's exit, add
+            # different log msgs for each of the (2) cases.
+            if scope_err is not None:
+                etype: Type[BaseException] = type(scope_err)
+
+                # CASE 2
                 if ctx._cancel_called:
                     log.cancel(
-                        f'Context {fn_name} cancelled by caller with\n{etype}'
+                        f'Context {fn_name} cancelled by caller with\n'
+                        f'{etype}'
                     )
-                elif _err is not None:
+
+                # CASE 1
+                else:
                     log.cancel(
-                        f'Context for task cancelled by callee with {etype}\n'
+                        f'Context cancelled by callee with {etype}\n'
                         f'target: `{fn_name}`\n'
                         f'task:{cid}\n'
                         f'actor:{uid}'
                     )

             # XXX: (MEGA IMPORTANT) if this is a root opened process we
             # wait for any immediate child in debug before popping the
             # context from the runtime msg loop otherwise inside

@@ -564,10 +689,9 @@
             # a "stop" msg for a stream), this can result in a deadlock
             # where the root is waiting on the lock to clear but the
             # child has already cleared it and clobbered IPC.
-            from ._debug import maybe_wait_for_debugger
-            await maybe_wait_for_debugger()

-            # remove the context from runtime tracking
+            # FINALLY, remove the context from runtime tracking and
+            # exit Bo
             self.actor._contexts.pop(
                 (self.channel.uid, ctx.cid),
                 None,
@@ -347,14 +347,13 @@ async def _invoke(
                 and ctx._enter_debugger_on_cancel
             )
         ):
-            # XXX: is there any case where we'll want to debug IPC
-            # disconnects as a default?
-            #
-            # I can't think of a reason that inspecting
-            # this type of failure will be useful for respawns or
-            # recovery logic - the only case is some kind of strange bug
-            # in our transport layer itself? Going to keep this
-            # open ended for now.
+            # XXX QUESTION XXX: is there any case where we'll
+            # want to debug IPC disconnects as a default?
+            # => I can't think of a reason that inspecting this
+            # type of failure will be useful for respawns or
+            # recovery logic - the only case is some kind of
+            # strange bug in our transport layer itself? Going
+            # to keep this open ended for now.
             entered_debug = await _debug._maybe_enter_pm(err)

             if not entered_debug:
@@ -448,17 +447,18 @@ class Actor:
     (swappable) network protocols.


-    Each "actor" is ``trio.run()`` scheduled "runtime" composed of many
-    concurrent tasks in a single thread. The "runtime" tasks conduct
-    a slew of low(er) level functions to make it possible for message
-    passing between actors as well as the ability to create new actors
-    (aka new "runtimes" in new processes which are supervised via
-    a nursery construct). Each task which sends messages to a task in
-    a "peer" (not necessarily a parent-child, depth hierarchy)) is able
-    to do so via an "address", which maps IPC connections across memory
-    boundaries, and task request id which allows for per-actor
-    tasks to send and receive messages to specific peer-actor tasks with
-    which there is an ongoing RPC/IPC dialog.
+    Each "actor" is ``trio.run()`` scheduled "runtime" composed of
+    many concurrent tasks in a single thread. The "runtime" tasks
+    conduct a slew of low(er) level functions to make it possible
+    for message passing between actors as well as the ability to
+    create new actors (aka new "runtimes" in new processes which
+    are supervised via a nursery construct). Each task which sends
+    messages to a task in a "peer" (not necessarily a parent-child,
+    depth hierarchy) is able to do so via an "address", which maps
+    IPC connections across memory boundaries, and a task request id
+    which allows for per-actor tasks to send and receive messages
+    to specific peer-actor tasks with which there is an ongoing
+    RPC/IPC dialog.

     '''
     # ugh, we need to get rid of this and replace with a "registry" sys
@@ -199,6 +199,10 @@ async def do_hard_kill(
     proc: trio.Process,
     terminate_after: int = 3,

+    # NOTE: for mucking with `.pause()`-ing inside the runtime
+    # whilst also hacking on it XD
+    # terminate_after: int = 99999,
+
 ) -> None:
     # NOTE: this timeout used to do nothing since we were shielding
     # the ``.wait()`` inside ``new_proc()`` which will pretty much
@@ -54,6 +54,60 @@ log = get_logger(__name__)
 # messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
 # - use __slots__ on ``Context``?


+def _raise_from_no_yield_msg(
+    stream: MsgStream,
+    msg: dict,
+    src_err: KeyError,
+
+) -> bool:
+    '''
+    Raise an appopriate local error when a `MsgStream` msg arrives
+    which does not contain the expected (under normal operation)
+    `'yield'` field.
+
+    '''
+    # internal error should never get here
+    assert msg.get('cid'), ("Received internal error at portal?")
+
+    # TODO: handle 2 cases with 3.10+ match syntax
+    # - 'stop'
+    # - 'error'
+    # possibly just handle msg['stop'] here!
+
+    if stream._closed:
+        raise trio.ClosedResourceError('This stream was closed')
+
+    if msg.get('stop') or stream._eoc:
+        log.debug(f"{stream} was stopped at remote end")
+
+        # XXX: important to set so that a new ``.receive()``
+        # call (likely by another task using a broadcast receiver)
+        # doesn't accidentally pull the ``return`` message
+        # value out of the underlying feed mem chan!
+        stream._eoc = True
+
+        # # when the send is closed we assume the stream has
+        # # terminated and signal this local iterator to stop
+        # await stream.aclose()
+
+        # XXX: this causes ``ReceiveChannel.__anext__()`` to
+        # raise a ``StopAsyncIteration`` **and** in our catch
+        # block below it will trigger ``.aclose()``.
+        raise trio.EndOfChannel from src_err
+
+    # TODO: test that shows stream raising an expected error!!!
+    elif msg.get('error'):
+        # raise the error message
+        raise unpack_error(msg, stream._ctx.chan)
+
+    # always re-raise the source error if no translation error
+    # case is activated above.
+    raise src_err
+    # raise RuntimeError(
+    #     'Unknown non-yield stream msg?\n'
+    #     f'{msg}'
+    # )
+
+
 class MsgStream(trio.abc.Channel):
     '''
@@ -91,11 +145,20 @@ class MsgStream(trio.abc.Channel):
     # delegate directly to underlying mem channel
     def receive_nowait(self):
         msg = self._rx_chan.receive_nowait()
-        return msg['yield']
+        try:
+            return msg['yield']
+        except KeyError as kerr:
+            _raise_from_no_yield_msg(
+                stream=self,
+                msg=msg,
+                src_err=kerr,
+            )

     async def receive(self):
-        '''Async receive a single msg from the IPC transport, the next
-        in sequence for this stream.
+        '''
+        Receive a single msg from the IPC transport, the next in
+        sequence sent by the far end task (possibly in order as
+        determined by the underlying protocol).
+
         '''
         # see ``.aclose()`` for notes on the old behaviour prior to
@@ -110,43 +173,12 @@ class MsgStream(trio.abc.Channel):
             msg = await self._rx_chan.receive()
             return msg['yield']

-        except KeyError as err:
-            # internal error should never get here
-            assert msg.get('cid'), ("Received internal error at portal?")
-
-            # TODO: handle 2 cases with 3.10 match syntax
-            # - 'stop'
-            # - 'error'
-            # possibly just handle msg['stop'] here!
-
-            if self._closed:
-                raise trio.ClosedResourceError('This stream was closed')
-
-            if msg.get('stop') or self._eoc:
-                log.debug(f"{self} was stopped at remote end")
-
-                # XXX: important to set so that a new ``.receive()``
-                # call (likely by another task using a broadcast receiver)
-                # doesn't accidentally pull the ``return`` message
-                # value out of the underlying feed mem chan!
-                self._eoc = True
-
-                # # when the send is closed we assume the stream has
-                # # terminated and signal this local iterator to stop
-                # await self.aclose()
-
-                # XXX: this causes ``ReceiveChannel.__anext__()`` to
-                # raise a ``StopAsyncIteration`` **and** in our catch
-                # block below it will trigger ``.aclose()``.
-                raise trio.EndOfChannel from err
-
-            # TODO: test that shows stream raising an expected error!!!
-            elif msg.get('error'):
-                # raise the error message
-                raise unpack_error(msg, self._ctx.chan)
-
-            else:
-                raise
+        except KeyError as kerr:
+            _raise_from_no_yield_msg(
+                stream=self,
+                msg=msg,
+                src_err=kerr,
+            )

         except (
             trio.ClosedResourceError,  # by self._rx_chan
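A hedged consumer-side sketch of what this consolidation buys: both the sync and async receive paths now surface the same `trio`-native exceptions, so plain async-iteration ends cleanly on a remote 'stop' instead of leaking a raw `KeyError`. The `counter` streaming task here is hypothetical:

    import tractor


    async def counter(limit: int):      # hypothetical remote streaming task
        for i in range(limit):
            yield i


    async def consume(portal: tractor.Portal) -> None:
        async with portal.open_stream_from(counter, limit=5) as stream:
            # a remote 'stop' msg is translated into `trio.EndOfChannel`
            # by the helper, which terminates this async-for cleanly.
            async for value in stream:
                print(value)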
@@ -193,15 +193,39 @@ def get_logger(
     '''
     log = rlog = logging.getLogger(_root_name)

-    if name and name != _proj_name:
+    if (
+        name
+        and name != _proj_name
+    ):

-        # handling for modules that use ``get_logger(__name__)`` to
-        # avoid duplicate project-package token in msg output
-        rname, _, tail = name.partition('.')
-        if rname == _root_name:
-            name = tail
+        # NOTE: for handling for modules that use ``get_logger(__name__)``
+        # we make the following stylistic choice:
+        # - always avoid duplicate project-package token
+        #   in msg output: i.e. tractor.tractor _ipc.py in header
+        #   looks ridiculous XD
+        # - never show the leaf module name in the {name} part
+        #   since in python the {filename} is always this same
+        #   module-file.
+
+        sub_name: None | str = None
+        rname, _, sub_name = name.partition('.')
+        pkgpath, _, modfilename = sub_name.rpartition('.')
+
+        # NOTE: for tractor itself never include the last level
+        # module key in the name such that something like: eg.
+        # 'tractor.trionics._broadcast` only includes the first
+        # 2 tokens in the (coloured) name part.
+        if rname == 'tractor':
+            sub_name = pkgpath
+
+        if _root_name in sub_name:
+            duplicate, _, sub_name = sub_name.partition('.')
+
+        if not sub_name:
+            log = rlog
+        else:
+            log = rlog.getChild(sub_name)

-        log = rlog.getChild(name)
     log.level = rlog.level

     # add our actor-task aware adapter which will dynamically look up
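To make the new naming rule concrete, here is what the partition/rpartition logic computes for a typical internal module name (a standalone sketch, not output captured from the library itself):

    # what the new naming logic computes for a tractor-internal module
    name = 'tractor.trionics._broadcast'

    rname, _, sub_name = name.partition('.')            # 'tractor', 'trionics._broadcast'
    pkgpath, _, modfilename = sub_name.rpartition('.')  # 'trionics', '_broadcast'

    if rname == 'tractor':
        # drop the leaf module token for tractor's own modules
        sub_name = pkgpath

    # the child logger name part ends up as 'trionics', so the full
    # logger reads 'tractor.trionics' rather than
    # 'tractor.trionics._broadcast'.
    assert sub_name == 'trionics'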
@@ -254,3 +278,7 @@ def get_console_log(

 def get_loglevel() -> str:
     return _default_loglevel
+
+
+# global module logger for tractor itself
+log = get_logger('tractor')
@@ -43,38 +43,62 @@ Built-in messaging patterns, types, APIs and helpers.
 # - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type

 from __future__ import annotations
+from inspect import isfunction
 from pkgutil import resolve_name


 class NamespacePath(str):
     '''
-    A serializeable description of a (function) Python object location
-    described by the target's module path and namespace key meant as
-    a message-native "packet" to allows actors to point-and-load objects
-    by absolute reference.
+    A serializeable description of a (function) Python object
+    location described by the target's module path and namespace
+    key meant as a message-native "packet" to allows actors to
+    point-and-load objects by an absolute ``str`` (and thus
+    serializable) reference.

     '''
-    _ref: object = None
+    _ref: object | type | None = None

-    def load_ref(self) -> object:
+    def load_ref(self) -> object | type:
         if self._ref is None:
             self._ref = resolve_name(self)
         return self._ref

-    def to_tuple(
-        self,
-
-    ) -> tuple[str, str]:
-        ref = self.load_ref()
-        return ref.__module__, getattr(ref, '__name__', '')
+    @staticmethod
+    def _mk_fqnp(ref: type | object) -> tuple[str, str]:
+        '''
+        Generate a minial ``str`` pair which describes a python
+        object's namespace path and object/type name.
+
+        In more precise terms something like:
+          - 'py.namespace.path:object_name',
+          - eg.'tractor.msg:NamespacePath' will be the ``str`` form
+            of THIS type XD
+
+        '''
+        if (
+            isinstance(ref, object)
+            and not isfunction(ref)
+        ):
+            name: str = type(ref).__name__
+        else:
+            name: str = getattr(ref, '__name__')
+
+        # fully qualified namespace path, tuple.
+        fqnp: tuple[str, str] = (
+            ref.__module__,
+            name,
+        )
+        return fqnp

     @classmethod
     def from_ref(
         cls,
-        ref,
+        ref: type | object,

     ) -> NamespacePath:
-        return cls(':'.join(
-            (ref.__module__,
-             getattr(ref, '__name__', ''))
-        ))
+
+        fqnp: tuple[str, str] = cls._mk_fqnp(ref)
+        return cls(':'.join(fqnp))
+
+    def to_tuple(self) -> tuple[str, str]:
+        return self._mk_fqnp(self.load_ref())
@@ -70,6 +70,7 @@ async def _enter_and_wait(
     unwrapped: dict[int, T],
     all_entered: trio.Event,
     parent_exit: trio.Event,
+    seed: int,

 ) -> None:
     '''

@@ -80,7 +81,10 @@ async def _enter_and_wait(
     async with mngr as value:
         unwrapped[id(mngr)] = value

-        if all(unwrapped.values()):
+        if all(
+            val != seed
+            for val in unwrapped.values()
+        ):
             all_entered.set()

         await parent_exit.wait()

@@ -91,7 +95,13 @@ async def gather_contexts(

     mngrs: Sequence[AsyncContextManager[T]],

-) -> AsyncGenerator[tuple[Optional[T], ...], None]:
+) -> AsyncGenerator[
+    tuple[
+        T | None,
+        ...
+    ],
+    None,
+]:
     '''
     Concurrently enter a sequence of async context managers, each in
     a separate ``trio`` task and deliver the unwrapped values in the

@@ -104,7 +114,11 @@ async def gather_contexts(
     entered and exited, and cancellation just works.

     '''
-    unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)
+    seed: int = id(mngrs)
+    unwrapped: dict[int, T | None] = {}.fromkeys(
+        (id(mngr) for mngr in mngrs),
+        seed,
+    )

     all_entered = trio.Event()
     parent_exit = trio.Event()

@@ -116,8 +130,9 @@ async def gather_contexts(

     if not mngrs:
         raise ValueError(
-            'input mngrs is empty?\n'
-            'Did try to use inline generator syntax?'
+            '`.trionics.gather_contexts()` input mngrs is empty?\n'
+            'Did try to use inline generator syntax?\n'
+            'Use a non-lazy iterator or sequence type intead!'
         )

     async with trio.open_nursery() as n:

@@ -128,6 +143,7 @@ async def gather_contexts(
             unwrapped,
             all_entered,
             parent_exit,
+            seed,
         )

     # deliver control once all managers have started up
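A hedged sketch of why the sentinel matters: an entered value that is legitimately falsy (e.g. `None`) used to be indistinguishable from a not-yet-entered slot under the old `all(unwrapped.values())` check, so `all_entered` could never fire. This assumes `gather_contexts` is importable as `tractor.trionics.gather_contexts` (as the new error message suggests); `null_mngr` is invented for illustration:

    from contextlib import asynccontextmanager as acm

    import trio
    import tractor


    @acm
    async def null_mngr():
        # a hypothetical manager whose entered value is `None`; the
        # `id(mngrs)` seed sentinel lets this count as "entered".
        yield None


    async def main():
        async with tractor.trionics.gather_contexts(
            [null_mngr(), null_mngr()],
        ) as values:
            assert values == (None, None)


    trio.run(main)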