Compare commits
18 commits: 7507e269ec ... fdf0c43bfa
SHA1:
fdf0c43bfa
f895c96600
ca1a1476bb
a7c36a9cbe
22e4b324b1
89ed8b67ff
11bbf15817
a18663213a
d4d09b6071
6d10f0c516
fa9b57bae0
81776a6238
144d1f4d94
51fdf3524c
cff69d07fe
ee94d6d62c
89b84ed6c0
f33f689f34
@@ -65,21 +65,28 @@ async def aggregate(seed):
     print("AGGREGATOR COMPLETE!")
 
 
-# this is the main actor and *arbiter*
-async def main():
-    # a nursery which spawns "actors"
-    async with tractor.open_nursery(
-        arbiter_addr=('127.0.0.1', 1616)
-    ) as nursery:
+async def main() -> list[int]:
+    '''
+    This is the "root" actor's main task's entrypoint.
+
+    By default (and if not otherwise specified) that root process
+    also acts as a "registry actor" / "registrar" on the localhost
+    for the purposes of multi-actor "service discovery".
+
+    '''
+    # yes, a nursery which spawns `trio`-"actors" B)
+    nursery: tractor.ActorNursery
+    async with tractor.open_nursery() as nursery:
 
         seed = int(1e3)
         pre_start = time.time()
 
-        portal = await nursery.start_actor(
+        portal: tractor.Portal = await nursery.start_actor(
             name='aggregator',
             enable_modules=[__name__],
         )
 
+        stream: tractor.MsgStream
         async with portal.open_stream_from(
             aggregate,
             seed=seed,
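For orientation, here is a minimal, self-contained sketch of the nursery -> portal -> stream pattern this hunk moves the example toward; the `double_ints` function and the 'doubler' actor name are hypothetical stand-ins for illustration, not part of the diff:

# minimal sketch (assumes a current `tractor` install); `double_ints`
# and the 'doubler' actor name are hypothetical, illustration only.
import trio
import tractor


async def double_ints(seed: int = 3):
    # async-generator style target consumed via `open_stream_from()`
    for i in range(seed):
        yield i * 2


async def main() -> None:
    async with tractor.open_nursery() as nursery:
        portal = await nursery.start_actor(
            'doubler',
            enable_modules=[__name__],
        )
        async with portal.open_stream_from(
            double_ints,
            seed=3,
        ) as stream:
            async for value in stream:
                print(value)

        # tear the subactor down once done
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)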
@@ -1,8 +1,8 @@
 '''
 ``async with ():`` inlined context-stream cancellation testing.
 
-Verify the we raise errors when streams are opened prior to sync-opening
-a ``tractor.Context`` beforehand.
+Verify that we raise errors when streams are opened prior to
+sync-opening a ``tractor.Context`` beforehand.
 
 '''
 from contextlib import asynccontextmanager as acm
@@ -922,93 +922,3 @@ def test_maybe_allow_overruns_stream(
         # if this hits the logic blocks from above are not
         # exhaustive..
         pytest.fail('PARAMETRIZED CASE GEN PROBLEM YO')
-
-
-@tractor.context
-async def sleep_forever(
-    ctx: tractor.Context,
-) -> None:
-    await ctx.started()
-    async with ctx.open_stream():
-        await trio.sleep_forever()
-
-
-@acm
-async def attach_to_sleep_forever():
-    '''
-    Cancel a context **before** any underlying error is raised in order
-    to trigger a local reception of a ``ContextCancelled`` which **should not**
-    be re-raised in the local surrounding ``Context`` *iff* the cancel was
-    requested by **this** side of the context.
-
-    '''
-    async with tractor.wait_for_actor('sleeper') as p2:
-        async with (
-            p2.open_context(sleep_forever) as (peer_ctx, first),
-            peer_ctx.open_stream(),
-        ):
-            try:
-                yield
-            finally:
-                # XXX: previously this would trigger local
-                # ``ContextCancelled`` to be received and raised in the
-                # local context overriding any local error due to logic
-                # inside ``_invoke()`` which checked for an error set on
-                # ``Context._error`` and raised it in a cancellation
-                # scenario.
-                # ------
-                # The problem is you can have a remote cancellation that
-                # is part of a local error and we shouldn't raise
-                # ``ContextCancelled`` **iff** we **were not** the side
-                # of the context to initiate it, i.e.
-                # ``Context._cancel_called`` should **NOT** have been
-                # set. The special logic to handle this case is now
-                # inside ``Context._maybe_raise_from_remote_msg()`` XD
-                await peer_ctx.cancel()
-
-
-@tractor.context
-async def error_before_started(
-    ctx: tractor.Context,
-) -> None:
-    '''
-    This simulates exactly an original bug discovered in:
-    https://github.com/pikers/piker/issues/244
-
-    '''
-    async with attach_to_sleep_forever():
-        # send an unserializable type which should raise a type error
-        # here and **NOT BE SWALLOWED** by the surrounding acm!!?!
-        await ctx.started(object())
-
-
-def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
-    '''
-    Verify that an error raised in a remote context which itself opens
-    another remote context, which it cancels, does not ovverride the
-    original error that caused the cancellation of the secondardy
-    context.
-
-    '''
-    async def main():
-        async with tractor.open_nursery() as n:
-            portal = await n.start_actor(
-                'errorer',
-                enable_modules=[__name__],
-            )
-            await n.start_actor(
-                'sleeper',
-                enable_modules=[__name__],
-            )
-
-            async with (
-                portal.open_context(
-                    error_before_started
-                ) as (ctx, sent),
-            ):
-                await trio.sleep_forever()
-
-    with pytest.raises(tractor.RemoteActorError) as excinfo:
-        trio.run(main)
-
-    assert excinfo.value.type == TypeError
@@ -0,0 +1,451 @@
'''
Codify the cancellation request semantics in terms
of one remote actor cancelling another.

'''
# from contextlib import asynccontextmanager as acm
import itertools

import pytest
import trio
import tractor
from tractor import (  # typing
    Portal,
    Context,
    ContextCancelled,
)


# def test_self_cancel():
#     '''
#     2 cases:
#     - calls `Actor.cancel()` locally in some task
#     - calls LocalPortal.cancel_actor()` ?

#     '''
#     ...


@tractor.context
async def sleep_forever(
    ctx: Context,
) -> None:
    '''
    Sync the context, open a stream then just sleep.

    '''
    await ctx.started()
    async with ctx.open_stream():
        await trio.sleep_forever()


@tractor.context
async def error_before_started(
    ctx: Context,
) -> None:
    '''
    This simulates exactly an original bug discovered in:
    https://github.com/pikers/piker/issues/244

    Cancel a context **before** any underlying error is raised so
    as to trigger a local reception of a ``ContextCancelled`` which
    SHOULD NOT be re-raised in the local surrounding ``Context``
    *iff* the cancel was requested by **this** (callee) side of
    the context.

    '''
    async with tractor.wait_for_actor('sleeper') as p2:
        async with (
            p2.open_context(sleep_forever) as (peer_ctx, first),
            peer_ctx.open_stream(),
        ):
            # NOTE: this WAS inside an @acm body but i factored it
            # out and just put it inline here since i don't think
            # the mngr part really matters, though maybe it could?
            try:
                # XXX NOTE XXX: THIS sends an UNSERIALIZABLE TYPE which
                # should raise a `TypeError` and **NOT BE SWALLOWED** by
                # the surrounding try/finally (normally inside the
                # body of some acm)..
                await ctx.started(object())
                # yield
            finally:
                # XXX: previously this would trigger local
                # ``ContextCancelled`` to be received and raised in the
                # local context overriding any local error due to logic
                # inside ``_invoke()`` which checked for an error set on
                # ``Context._error`` and raised it in a cancellation
                # scenario.
                # ------
                # The problem is you can have a remote cancellation that
                # is part of a local error and we shouldn't raise
                # ``ContextCancelled`` **iff** we **were not** the side
                # of the context to initiate it, i.e.
                # ``Context._cancel_called`` should **NOT** have been
                # set. The special logic to handle this case is now
                # inside ``Context._maybe_raise_from_remote_msg()`` XD
                await peer_ctx.cancel()


def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
    '''
    Verify that an error raised in a remote context which itself
    opens YET ANOTHER remote context, which it then cancels, does not
    override the original error that caused the cancellation of the
    secondary context.

    '''
    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor(
                'errorer',
                enable_modules=[__name__],
            )
            await n.start_actor(
                'sleeper',
                enable_modules=[__name__],
            )

            async with (
                portal.open_context(
                    error_before_started
                ) as (ctx, sent),
            ):
                await trio.sleep_forever()

    with pytest.raises(tractor.RemoteActorError) as excinfo:
        trio.run(main)

    assert excinfo.value.type == TypeError


@tractor.context
async def sleep_a_bit_then_cancel_peer(
    ctx: Context,
    peer_name: str = 'sleeper',
    cancel_after: float = .5,

) -> None:
    '''
    Connect to peer, sleep as per input delay, cancel the peer.

    '''
    peer: Portal
    async with tractor.wait_for_actor(peer_name) as peer:
        await ctx.started()
        await trio.sleep(cancel_after)
        await peer.cancel_actor()


@tractor.context
async def stream_ints(
    ctx: Context,
):
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in itertools.count():
            await stream.send(i)


@tractor.context
async def stream_from_peer(
    ctx: Context,
    peer_name: str = 'sleeper',
) -> None:

    peer: Portal
    try:
        async with (
            tractor.wait_for_actor(peer_name) as peer,
            peer.open_context(stream_ints) as (peer_ctx, first),
            peer_ctx.open_stream() as stream,
        ):
            await ctx.started()
            # XXX TODO: big set of questions for this
            # - should we raise `ContextCancelled` or `Cancelled` (rn
            #   it does that) here?!
            # - test the `ContextCancelled` OUTSIDE the
            #   `.open_context()` call?
            try:
                async for msg in stream:
                    print(msg)

            except trio.Cancelled:
                assert not ctx.cancel_called
                assert not ctx.cancelled_caught

                assert not peer_ctx.cancel_called
                assert not peer_ctx.cancelled_caught

                assert 'root' in ctx.cancel_called_remote

                raise  # XXX MUST NEVER MASK IT!!

            with trio.CancelScope(shield=True):
                await tractor.pause()
            # pass
            # pytest.fail(
            raise RuntimeError(
                'peer never triggered local `[Context]Cancelled`?!?'
            )

    # NOTE: cancellation of the (sleeper) peer should always
    # cause a `ContextCancelled` raise in this streaming
    # actor.
    except ContextCancelled as ctxerr:
        assert ctxerr.canceller == 'canceller'
        assert ctxerr._remote_error is ctxerr

        # CASE 1: we were cancelled by our parent, the root actor.
        # TODO: there are other cases depending on how the root
        # actor and its caller side task are written:
        # - if the root does not req us to cancel then an
        #   IPC-transport related error should bubble from the async
        #   for loop and thus cause local cancellation both here
        #   and in the root (since in that case this task cancels the
        #   context with the root, not the other way around)
        assert ctx.cancel_called_remote[0] == 'root'
        raise

    # except BaseException as err:

    #     raise


# cases:
# - some arbitrary remote peer cancels via Portal.cancel_actor().
#   => all other connected peers should get that cancel requesting peer's
#      uid in the ctx-cancelled error msg.

# - peer spawned a sub-actor which (also) spawned a failing task
#   which was unhandled and propagated up to the immediate
#   parent - the peer to the actor that also spawned a remote task
#   in that same peer-parent.

# - peer cancelled itself - so other peers should
#   get errors reflecting that the peer was itself the .canceller?

# - WE cancelled the peer and thus should not see any raised
#   `ContextCancelled` as it should be reaped silently?
#   => pretty sure `test_context_stream_semantics::test_caller_cancels()`
#      already covers this case?

@pytest.mark.parametrize(
    'error_during_ctxerr_handling',
    [False, True],
)
def test_peer_canceller(
    error_during_ctxerr_handling: bool,
):
    '''
    Verify that a cancellation triggered by an in-actor-tree peer
    results in cancelled errors in all other actors which have
    opened contexts to that same actor.

    legend:
    name>
        a "play button" that indicates a new runtime instance,
        an individual actor with `name`.

    .subname>
        a subactor whose parent should be on some previous
        line and be less indented.

    .actor0> ()-> .actor1>
        an inter-actor task context opened (by `async with
        Portal.open_context()`) from actor0 *into* actor1.

    .actor0> ()<=> .actor1>
        an inter-actor task context opened (as above)
        from actor0 *into* actor1 which INCLUDES an additional
        stream open using `async with Context.open_stream()`.


    ------ - ------
    supervision view
    ------ - ------
    root>
     .sleeper> TODO: SOME SYNTAX SHOWING JUST SLEEPING
     .just_caller> ()=> .sleeper>
     .canceller> ()-> .sleeper>
        TODO: how to define calling `Portal.cancel_actor()`

    In this case a `ContextCancelled` with `.errorer` set to the
    requesting actor, in this case 'canceller', should be relayed
    to all other actors who have also opened a (remote task)
    context with that now cancelled actor.

    ------ - ------
    task view
    ------ - ------
    So there are 5 contexts open in total with 3 from the root to
    its children and 2 from children to their peers:
    1. root> ()-> .sleeper>
    2. root> ()-> .streamer>
    3. root> ()-> .canceller>

    4. .streamer> ()<=> .sleep>
    5. .canceller> ()-> .sleeper>
        - calls `Portal.cancel_actor()`

    '''

    async def main():
        async with tractor.open_nursery() as an:
            canceller: Portal = await an.start_actor(
                'canceller',
                enable_modules=[__name__],
            )
            sleeper: Portal = await an.start_actor(
                'sleeper',
                enable_modules=[__name__],
            )
            just_caller: Portal = await an.start_actor(
                'just_caller',  # but i just met her?
                enable_modules=[__name__],
            )

            try:
                async with (
                    sleeper.open_context(
                        sleep_forever,
                    ) as (sleeper_ctx, sent),

                    just_caller.open_context(
                        stream_from_peer,
                    ) as (caller_ctx, sent),

                    canceller.open_context(
                        sleep_a_bit_then_cancel_peer,
                    ) as (canceller_ctx, sent),

                ):
                    ctxs: list[Context] = [
                        sleeper_ctx,
                        caller_ctx,
                        canceller_ctx,
                    ]

                    try:
                        print('PRE CONTEXT RESULT')
                        await sleeper_ctx.result()

                        # should never get here
                        pytest.fail(
                            'Context.result() did not raise ctx-cancelled?'
                        )

                    # TODO: not sure why this isn't catching
                    # but maybe we need an `ExceptionGroup` and
                    # the whole except *errs: thinger in 3.11?
                    except ContextCancelled as ctxerr:
                        print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')

                        # canceller and caller peers should not
                        # have been remotely cancelled.
                        assert canceller_ctx.cancel_called_remote is None
                        assert caller_ctx.cancel_called_remote is None

                        assert ctxerr.canceller[0] == 'canceller'

                        # XXX NOTE XXX: since THIS `ContextCancelled`
                        # HAS NOT YET bubbled up to the
                        # `sleeper.open_context().__aexit__()` this
                        # value is not yet set, however outside this
                        # block it should be.
                        assert not sleeper_ctx.cancelled_caught

                        # TODO: a test which ensures this error is
                        # bubbled and caught (NOT MASKED) by the
                        # runtime!!!
                        if error_during_ctxerr_handling:
                            raise RuntimeError('Simulated error during teardown')

                        raise

                    # SHOULD NEVER GET HERE!
                    except BaseException:
                        pytest.fail('did not rx ctx-cancelled error?')
                    else:
                        pytest.fail('did not rx ctx-cancelled error?')

            except (
                ContextCancelled,
                RuntimeError,
            ) as ctxerr:
                _err = ctxerr

                if error_during_ctxerr_handling:
                    assert isinstance(ctxerr, RuntimeError)

                    # NOTE: this root actor task should have
                    # called `Context.cancel()` on the
                    # `.__aexit__()` to every opened ctx.
                    for ctx in ctxs:
                        assert ctx.cancel_called

                        # each context should have received
                        # a silently absorbed context cancellation
                        # from its peer actor's task.
                        assert ctx.chan.uid == ctx.cancel_called_remote

                        # this root actor task should have
                        # cancelled all opened contexts except
                        # the sleeper which is cancelled by its
                        # peer "canceller"
                        if ctx is not sleeper_ctx:
                            assert ctx._remote_error.canceller[0] == 'root'

                else:
                    assert ctxerr.canceller[0] == 'canceller'

                    # the sleeper's remote error is the error bubbled
                    # out of the context-stack above!
                    re = sleeper_ctx._remote_error
                    assert re is ctxerr

                    for ctx in ctxs:

                        if ctx is sleeper_ctx:
                            assert not ctx.cancel_called
                            assert ctx.cancelled_caught
                        else:
                            assert ctx.cancel_called
                            assert not ctx.cancelled_caught

                        # each context should have received
                        # a silently absorbed context cancellation
                        # from its peer actor's task.
                        assert ctx.chan.uid == ctx.cancel_called_remote

                    # NOTE: when an inter-peer cancellation
                    # occurred, we DO NOT expect this
                    # root-actor-task to have requested a cancel of
                    # the context since cancellation was caused by
                    # the "canceller" peer and thus
                    # `Context.cancel()` SHOULD NOT have been
                    # called inside
                    # `Portal.open_context().__aexit__()`.
                    assert not sleeper_ctx.cancel_called

                # XXX NOTE XXX: and see matching comment above but,
                # this flag is set only AFTER the `.open_context()`
                # has exited and should be set in both outcomes
                # including the case where ctx-cancel handling
                # itself errors.
                assert sleeper_ctx.cancelled_caught
                assert sleeper_ctx.cancel_called_remote[0] == 'sleeper'

                # await tractor.pause()
                raise  # always to ensure teardown

    if error_during_ctxerr_handling:
        with pytest.raises(RuntimeError) as excinfo:
            trio.run(main)
    else:
        with pytest.raises(ContextCancelled) as excinfo:
            trio.run(main)

        assert excinfo.value.type == ContextCancelled
        assert excinfo.value.canceller[0] == 'canceller'
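Reduced to its core, the inter-peer semantics this new module codifies: when one peer requests cancellation of an actor, every *other* actor holding a context to the cancelled actor should see a `ContextCancelled` whose `.canceller` names the requester. A sketch reusing only names defined above (illustrative distillation, not part of the suite):

# illustrative distillation of the semantics under test (not part
# of the suite): 'canceller' kills 'sleeper', so the root's open
# context to 'sleeper' raises `ContextCancelled` naming 'canceller'.
async def demo() -> None:
    async with tractor.open_nursery() as an:
        sleeper = await an.start_actor(
            'sleeper', enable_modules=[__name__],
        )
        canceller = await an.start_actor(
            'canceller', enable_modules=[__name__],
        )
        try:
            async with (
                sleeper.open_context(sleep_forever) as (sleeper_ctx, _),
                canceller.open_context(
                    sleep_a_bit_then_cancel_peer,
                ) as (canceller_ctx, _),
            ):
                await sleeper_ctx.result()

        except ContextCancelled as ctxerr:
            # the relayed error names the peer which requested it,
            # NOT the root actor running this task.
            assert ctxerr.canceller[0] == 'canceller'
            raise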
@@ -23,8 +23,8 @@ from exceptiongroup import BaseExceptionGroup
 from ._clustering import open_actor_cluster
 from ._ipc import Channel
 from ._context import (
-    Context,
-    context,
+    Context,  # the type
+    context,  # a func-decorator
 )
 from ._streaming import (
     MsgStream,
@@ -86,26 +86,51 @@ class Context:
 
     '''
     chan: Channel
-    cid: str
+    cid: str  # "context id", more or less a unique linked-task-pair id
 
-    # these are the "feeder" channels for delivering
-    # message values to the local task from the runtime
-    # msg processing loop.
+    # the "feeder" channels for delivering message values to the
+    # local task from the runtime's msg processing loop.
     _recv_chan: trio.MemoryReceiveChannel
     _send_chan: trio.MemorySendChannel
 
     # the "invocation type" of the far end task-entry-point
     # function, normally matching a logic block inside
     # `._runtime.invoke()`.
     _remote_func_type: str | None = None
 
-    # only set on the caller side
-    _portal: Portal | None = None  # type: ignore # noqa
+    # NOTE: (for now) only set (a portal) on the caller side since
+    # the callee doesn't generally need a ref to one and should
+    # normally need to explicitly ask for a handle to its peer if
+    # more than the `Context` is needed?
+    _portal: Portal | None = None
+
+    # NOTE: each side of the context has its own cancel scope
+    # which is exactly the primitive that allows for
+    # cross-actor-task-supervision and thus SC.
+    _scope: trio.CancelScope | None = None
     _result: Any | int = None
     _remote_error: BaseException | None = None
 
-    # cancellation state
-    _cancel_called: bool = False
-    _cancelled_remote: tuple | None = None
-    _cancel_msg: str | None = None
-    _scope: trio.CancelScope | None = None
+    _cancel_called: bool = False  # did WE cancel the far end?
+    _cancelled_remote: tuple[str, str] | None = None
+
+    # NOTE: we try to ensure assignment of a "cancel msg" since
+    # there's always going to be an "underlying reason" that any
+    # context was closed due to either a remote side error or
+    # a call to `.cancel()` which triggers `ContextCancelled`.
+    _cancel_msg: str | dict | None = None
+
+    # NOTE: this state var is used by the runtime to determine if the
+    # `pdbp` REPL is allowed to engage on contexts terminated via
+    # a `ContextCancelled` due to a call to `.cancel()` triggering
+    # "graceful closure" on either side:
+    # - `._runtime._invoke()` will check this flag before engaging
+    #   the crash handler REPL in such cases where the "callee"
+    #   raises the cancellation,
+    # - `.devx._debug.lock_tty_for_child()` will set it to `False` if
+    #   the global tty-lock has been configured to filter out some
+    #   actors from being able to acquire the debugger lock.
     _enter_debugger_on_cancel: bool = True
 
     @property
@@ -173,41 +198,76 @@ class Context:
 
     async def _maybe_cancel_and_set_remote_error(
         self,
-        error_msg: dict[str, Any],
+        error: BaseException,
 
     ) -> None:
         '''
-        (Maybe) unpack and raise a msg error into the local scope
-        nursery for this context.
+        (Maybe) cancel this local scope due to a received remote
+        error (normally via an IPC msg) which the actor runtime
+        routes to this context.
 
-        Acts as a form of "relay" for a remote error raised
-        in the corresponding remote callee task.
+        Acts as a form of "relay" for a remote error raised in the
+        corresponding remote task's `Context` wherein the next time
+        the local task executes a checkpoint, a `trio.Cancelled`
+        will be raised and depending on the type and source of the
+        original remote error, and whether or not the local task
+        called `.cancel()` itself prior, an equivalent
+        `ContextCancelled` or `RemoteActorError` wrapping the
+        remote error may be raised here by any of,
+
+        - `Portal.open_context()`
+        - `Portal.result()`
+        - `Context.open_stream()`
+        - `Context.result()`
+
+        when called/closed by actor local task(s).
+
+        NOTEs & TODOs:
+        - It is expected that the caller has previously unwrapped
+          the remote error using a call to `unpack_error()` and
+          provides that output exception value as the input
+          `error` argument here.
+        - If this is an error message from a context opened by
+          `Portal.open_context()` we want to interrupt any
+          ongoing local tasks operating within that `Context`'s
+          cancel-scope so as to be notified ASAP of the remote
+          error and engage any caller handling (eg. for
+          cross-process task supervision).
+        - In some cases we may want to raise the remote error
+          immediately since there is no guarantee the locally
+          operating task(s) will attempt to execute a checkpoint
+          any time soon; in such cases there are 2 possible
+          approaches depending on the current task's work and
+          wrapping "thread" type:
+
+          - `trio`-native-and-graceful: only ever wait for tasks
+            to exec a next `trio.lowlevel.checkpoint()` assuming
+            that any such task must do so to interact with the
+            actor runtime and IPC interfaces.
+
+          - (NOT IMPLEMENTED) system-level-aggressive: maybe we
+            could eventually interrupt sync code (invoked using
+            `trio.to_thread` or some other adapter layer) with
+            a signal (a custom unix one for example?
+            https://stackoverflow.com/a/5744185) depending on the
+            task's wrapping thread-type such that long running
+            sync code should never cause the delay of actor
+            supervision tasks such as cancellation and respawn
+            logic.
 
         '''
-        # If this is an error message from a context opened by
-        # ``Portal.open_context()`` we want to interrupt any ongoing
-        # (child) tasks within that context to be notified of the remote
-        # error relayed here.
-        #
-        # The reason we may want to raise the remote error immediately
-        # is that there is no guarantee the associated local task(s)
-        # will attempt to read from any locally opened stream any time
-        # soon.
-        #
-        # NOTE: this only applies when
-        # ``Portal.open_context()`` has been called since it is assumed
-        # (currently) that other portal APIs (``Portal.run()``,
-        # ``.run_in_actor()``) do their own error checking at the point
-        # of the call and result processing.
-        error = unpack_error(
-            error_msg,
-            self.chan,
-        )
+        # XXX: currently this should only be used when
+        # `Portal.open_context()` has been opened since it's
+        # assumed that other portal APIs like,
+        # - `Portal.run()`,
+        # - `ActorNursery.run_in_actor()`
+        # do their own error checking at their own call points and
+        # result processing.
 
         # XXX: set the remote side's error so that after we cancel
         # whatever task is the opener of this context it can raise
         # that error as the reason.
-        self._remote_error = error
+        self._remote_error: BaseException = error
 
         # always record the remote actor's uid since its cancellation
         # state is directly linked to ours (the local one).
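The "trio-native-and-graceful" option described above leans on stock `trio` semantics: a cancelled scope only interrupts a task at its next checkpoint. A standalone illustration (pure `trio`, not `tractor` code):

# pure-`trio` illustration: cancellation is only delivered at a
# checkpoint, which is why the runtime can simply cancel the
# context's scope and wait for the local task to hit one.
import trio


async def main() -> None:
    with trio.CancelScope() as cs:
        cs.cancel()
        # no checkpoint yet: this pure-CPU line still runs
        total = sum(range(10))
        try:
            # first checkpoint after the cancel request:
            # `trio.Cancelled` is raised right here.
            await trio.lowlevel.checkpoint()
        except trio.Cancelled:
            print(f'cancelled after computing {total}')
            raise

    assert cs.cancelled_caught


trio.run(main)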
@@ -232,35 +292,25 @@ class Context:
         else:
             log.error(
                 f'Remote context error for {self.chan.uid}:{self.cid}:\n'
-                f'{error_msg["error"]["tb_str"]}'
+                f'{error}'
             )
         # TODO: tempted to **not** do this by-reraising in a
         # nursery and instead cancel a surrounding scope, detect
         # the cancellation, then lookup the error that was set?
         # YES! this is way better and simpler!
-        if (
-            self._scope
-        ):
+        if self._scope:
             # from trio.testing import wait_all_tasks_blocked
             # await wait_all_tasks_blocked()
             # self._cancelled_remote = self.chan.uid
             self._scope.cancel()
 
-            # NOTE: this usage actually works here B)
-            # from ._debug import breakpoint
-            # await breakpoint()
-
-            # XXX: this will break early callee results sending
-            # since when `.result()` is finally called, this
-            # chan will be closed..
-            # if self._recv_chan:
-            #     await self._recv_chan.aclose()
+            # this REPL usage actually works here BD
+            # from .devx._debug import pause
+            # await pause()
 
     async def cancel(
         self,
         msg: str | None = None,
         timeout: float = 0.616,
         # timeout: float = 1000,
 
     ) -> None:
         '''
@@ -270,15 +320,12 @@ class Context:
         Timeout quickly in an attempt to sidestep 2-generals...
 
         '''
-        side = 'caller' if self._portal else 'callee'
-        if msg:
-            assert side == 'callee', 'Only callee side can provide cancel msg'
-
-        log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')
+        side: str = 'caller' if self._portal else 'callee'
+        log.cancel(
+            f'Cancelling {side} side of context to {self.chan.uid}'
+        )
 
-        self._cancel_called = True
-        # await _debug.breakpoint()
-        # breakpoint()
+        self._cancel_called: bool = True
 
         if side == 'caller':
             if not self._portal:
@@ -286,12 +333,13 @@ class Context:
                     "No portal found, this is likely a callee side context"
                 )
 
-            cid = self.cid
+            cid: str = self.cid
             with trio.move_on_after(timeout) as cs:
                 cs.shield = True
                 log.cancel(
-                    f"Cancelling stream {cid} to "
-                    f"{self._portal.channel.uid}")
+                    f'Cancelling stream {cid} to '
+                    f'{self._portal.channel.uid}'
+                )
 
                 # NOTE: we're telling the far end actor to cancel a task
                 # corresponding to *this actor*. The far end local channel
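The `move_on_after()` + `cs.shield` combination used here is a general `trio` pattern for best-effort teardown: give a (possibly already-cancelled) finalization step a bounded time budget. Schematically, with an illustrative stand-in for the IPC round trip:

# pure-`trio` sketch of the bounded, shielded teardown pattern:
# even if the caller is already cancelled, the request gets
# `timeout` seconds to complete before we give up (sidestepping
# the 2-generals problem of waiting forever on an ack).
import trio


async def send_cancel_request() -> None:
    await trio.sleep(1)  # stand-in for an IPC round trip


async def teardown(timeout: float = 0.616) -> None:
    with trio.move_on_after(timeout) as cs:
        cs.shield = True  # survive surrounding cancellation
        await send_cancel_request()

    if cs.cancelled_caught:
        print('timed out on cancel request')


trio.run(teardown)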
@@ -310,17 +358,17 @@ class Context:
             # if not self._portal.channel.connected():
             if not self.chan.connected():
                 log.cancel(
-                    "May have failed to cancel remote task "
-                    f"{cid} for {self._portal.channel.uid}")
+                    'May have failed to cancel remote task '
+                    f'{cid} for {self._portal.channel.uid}'
+                )
             else:
                 log.cancel(
-                    "Timed out on cancelling remote task "
-                    f"{cid} for {self._portal.channel.uid}")
+                    'Timed out on cancel request of remote task '
+                    f'{cid} for {self._portal.channel.uid}'
+                )
 
         # callee side remote task
         else:
             self._cancel_msg = msg
 
             # TODO: should we have an explicit cancel message
             # or is relaying the local `trio.Cancelled` as an
             # {'error': trio.Cancelled, cid: "blah"} enough?
@@ -331,7 +379,6 @@ class Context:
 
     @acm
     async def open_stream(
-
         self,
         allow_overruns: bool | None = False,
         msg_buffer_size: int | None = None,
@@ -350,10 +397,10 @@ class Context:
         ``Portal.open_context()``. In the future this may change but
         currently there seems to be no obvious reason to support
         "re-opening":
-        - pausing a stream can be done with a message.
-        - task errors will normally require a restart of the entire
-        scope of the inter-actor task context due to the nature of
-        ``trio``'s cancellation system.
+          - pausing a stream can be done with a message.
+          - task errors will normally require a restart of the entire
+            scope of the inter-actor task context due to the nature of
+            ``trio``'s cancellation system.
 
         '''
         actor = current_actor()
@@ -435,18 +482,19 @@ class Context:
         self,
         err: Exception,
     ) -> None:
         '''
         Maybe raise a remote error depending on who (which task from
         which actor) requested a cancellation (if any).
 
         '''
         # NOTE: whenever the context's "opener" side (task) **is**
         # the side which requested the cancellation (likely via
         # ``Context.cancel()``), we don't want to re-raise that
         # cancellation signal locally (would be akin to
         # a ``trio.Nursery`` nursery raising ``trio.Cancelled``
-        # whenever ``CancelScope.cancel()`` was called) and instead
-        # silently reap the expected cancellation "error"-msg.
-        # if 'pikerd' in err.msgdata['tb_str']:
-        #     # from . import _debug
-        #     # await _debug.breakpoint()
-        #     breakpoint()
-
+        # whenever ``CancelScope.cancel()`` was called) and
+        # instead silently reap the expected cancellation
+        # "error"-msg.
         if (
             isinstance(err, ContextCancelled)
             and (
@@ -457,7 +505,18 @@ class Context:
         ):
             return err
 
-        raise err  # from None
+        # NOTE: currently we are masking underlying runtime errors
+        # which are often superfluous to user handler code. not
+        # sure if this is still needed / desired for all operation?
+        # TODO: maybe we can only NOT mask if:
+        # - [ ] debug mode is enabled or,
+        # - [ ] a certain log level is set?
+        # - [ ] consider using `.with_traceback()` to filter out
+        #       runtime frames from the tb explicitly?
+        # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
+        # https://stackoverflow.com/a/24752607
+        __tracebackhide__: bool = True
+        raise err from None
 
     async def result(self) -> Any | Exception:
         '''
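Both traceback-trimming tools introduced here are standard Python/pytest idioms: `raise ... from None` suppresses the implicit exception-chaining context, and a frame-local `__tracebackhide__` hides that frame from pytest reports. A tiny standalone demo (illustrative only):

# standalone illustration of the two traceback-trimming tools
# used above; nothing here is `tractor` code.
def reraise_clean(err: Exception) -> None:
    __tracebackhide__: bool = True  # hide this frame in pytest tbs
    try:
        raise KeyError('internal detail')
    except KeyError:
        # without `from None` this would print "During handling of
        # the above exception, another exception occurred..."
        raise err from None


try:
    reraise_clean(ValueError('user-facing error'))
except ValueError as exc:
    assert exc.__suppress_context__  # set by `from None`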
@@ -485,16 +544,12 @@ class Context:
         of the remote cancellation.
 
         '''
+        __tracebackhide__: bool = True
         assert self._portal, "Context.result() can not be called from callee!"
         assert self._recv_chan
 
-        # from . import _debug
-        # await _debug.breakpoint()
-
-        re = self._remote_error
-        if re:
-            self._maybe_raise_remote_err(re)
-            return re
+        if re := self._remote_error:
+            return self._maybe_raise_remote_err(re)
 
         if (
             self._result == id(self)
@@ -505,9 +560,9 @@ class Context:
             # and discarding any bi dir stream msgs still
             # in transit from the far end.
             while True:
-                msg = await self._recv_chan.receive()
                 try:
-                    self._result = msg['return']
+                    msg = await self._recv_chan.receive()
+                    self._result: Any = msg['return']
 
                     # NOTE: we don't need to do this right?
                     # XXX: only close the rx mem chan AFTER
@@ -516,6 +571,26 @@ class Context:
                     #     await self._recv_chan.aclose()
 
                     break
 
+                # NOTE: we get here if the far end was
+                # `ContextCancelled` in 2 cases:
+                # 1. we requested the cancellation and thus
+                #    SHOULD NOT raise that far end error,
+                # 2. WE DID NOT REQUEST that cancel and thus
+                #    SHOULD RAISE HERE!
+                except trio.Cancelled:
+
+                    # CASE 2: mask the local cancelled-error(s)
+                    # only when we are sure the remote error is the
+                    # (likely) source cause of this local runtime
+                    # task's cancellation.
+                    if re := self._remote_error:
+                        self._maybe_raise_remote_err(re)
+
+                    # CASE 1: we DID request the cancel we simply
+                    # continue to bubble up as normal.
+                    raise
+
                 except KeyError:  # as msgerr:
 
                     if 'yield' in msg:
@@ -529,7 +604,8 @@ class Context:
 
                     # internal error should never get here
                     assert msg.get('cid'), (
-                        "Received internal error at portal?")
+                        "Received internal error at portal?"
+                    )
 
                     err = unpack_error(
                         msg,
@@ -537,9 +613,12 @@ class Context:
                     )  # from msgerr
 
-                    err = self._maybe_raise_remote_err(err)
-                    self._remote_err = err
+                    self._remote_error = err
 
-        return self._remote_error or self._result
+        if re := self._remote_error:
+            return self._maybe_raise_remote_err(re)
+
+        return self._result
 
     async def started(
         self,
@@ -548,7 +627,7 @@ class Context:
     ) -> None:
         '''
         Indicate to calling actor's task that this linked context
-        has started and send ``value`` to the other side.
+        has started and send ``value`` to the other side via IPC.
 
         On the calling side ``value`` is the second item delivered
         in the tuple returned by ``Portal.open_context()``.
@@ -556,19 +635,17 @@ class Context:
         '''
         if self._portal:
             raise RuntimeError(
-                f"Caller side context {self} can not call started!")
+                f'Caller side context {self} can not call started!'
+            )
 
         elif self._started_called:
             raise RuntimeError(
-                f"called 'started' twice on context with {self.chan.uid}")
+                f'called `.started()` twice on context with {self.chan.uid}'
+            )
 
         await self.chan.send({'started': value, 'cid': self.cid})
         self._started_called = True
 
     # TODO: do we need a restart api?
     # async def restart(self) -> None:
     #     pass
 
     async def _drain_overflows(
         self,
     ) -> None:
@@ -623,10 +700,21 @@ class Context:
         self,
         msg: dict,
 
-        draining: bool = False,
+        # draining: bool = False,
 
     ) -> bool:
         '''
+        Deliver an IPC msg received from a transport-channel to
+        this context's underlying mem chan for handling by
+        user operating tasks; deliver a bool indicating whether the
+        msg was immediately sent.
+
+        If `._allow_overruns == True` (maybe) append the msg to an
+        "overflow queue" and start a "drainer task" (inside the
+        `._scope_nursery: trio.Nursery`) which ensures that such
+        messages are eventually sent if possible.
 
         '''
         cid = self.cid
         chan = self.chan
         uid = chan.uid
@@ -637,8 +725,12 @@ class Context:
         )
 
-        error = msg.get('error')
-        if error:
-            await self._maybe_cancel_and_set_remote_error(msg)
+        if error := unpack_error(
+            msg,
+            self.chan,
+        ):
+            self._cancel_msg = msg
+            await self._maybe_cancel_and_set_remote_error(error)
 
         if (
             self._in_overrun
@@ -670,6 +762,7 @@ class Context:
         # the sender; the main motivation is that using bp can block the
         # msg handling loop which calls into this method!
         except trio.WouldBlock:
 
             # XXX: always push an error even if the local
             # receiver is in overrun state.
+            # await self._maybe_cancel_and_set_remote_error(msg)
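The overrun handling above hinges on `trio`'s non-blocking memory-channel send; a toy version of the "queue on overrun" idea (pure `trio`, heavily simplified relative to the real `_deliver_msg()`):

# toy sketch of the overrun-queueing idea: try a non-blocking
# send into the feeder mem-chan and fall back to an overflow
# queue when the receiver is backed up (`trio.WouldBlock`).
from collections import deque

import trio


async def main() -> None:
    send_chan, recv_chan = trio.open_memory_channel(2)  # small buffer
    overflow: deque[int] = deque()

    for msg in range(5):
        try:
            send_chan.send_nowait(msg)
        except trio.WouldBlock:
            # receiver is in "overrun": stash for a drainer task
            overflow.append(msg)

    print(f'queued overruns: {list(overflow)}')  # -> [2, 3, 4]


trio.run(main)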
@@ -39,8 +39,11 @@ class ActorFailure(Exception):
 
 
 class RemoteActorError(Exception):
+    '''
+    Remote actor exception bundled locally
+
+    '''
-    "Remote actor exception bundled locally"
     # TODO: local reconstruction of remote exception deats
     def __init__(
         self,
         message: str,
@@ -110,18 +113,24 @@ class AsyncioCancelled(Exception):
 
 def pack_error(
     exc: BaseException,
-    tb=None,
+    tb: str | None = None,
 
-) -> dict[str, Any]:
-    """Create an "error message" for tranmission over
-    a channel (aka the wire).
-    """
+) -> dict[str, dict]:
+    '''
+    Create an "error message" encoded for wire transport via an IPC
+    `Channel`; expected to be unpacked on the receiver side using
+    `unpack_error()` below.
+
+    '''
     if tb:
         tb_str = ''.join(traceback.format_tb(tb))
     else:
         tb_str = traceback.format_exc()
 
-    error_msg = {
+    error_msg: dict[
+        str,
+        str | tuple[str, str]
+    ] = {
         'tb_str': tb_str,
         'type_str': type(exc).__name__,
         'src_actor_uid': current_actor().uid,
@@ -139,23 +148,33 @@ def unpack_error(
     chan=None,
     err_type=RemoteActorError
 
-) -> Exception:
+) -> None | Exception:
     '''
     Unpack an 'error' message from the wire
-    into a local ``RemoteActorError``.
+    into a local `RemoteActorError` (subtype).
+
+    NOTE: this routine DOES not RAISE the embedded remote error,
+    which is the responsibility of the caller.
 
     '''
-    __tracebackhide__ = True
-    error = msg['error']
+    __tracebackhide__: bool = True
 
-    tb_str = error.get('tb_str', '')
-    message = f"{chan.uid}\n" + tb_str
-    type_name = error['type_str']
+    error_dict: dict[str, dict] | None
+    if (
+        error_dict := msg.get('error')
+    ) is None:
+        # no error field, nothing to unpack.
+        return None
+
+    # retrieve the remote error's msg encoded details
+    tb_str: str = error_dict.get('tb_str', '')
+    message: str = f'{chan.uid}\n' + tb_str
+    type_name: str = error_dict['type_str']
+
     suberror_type: Type[BaseException] = Exception
 
     if type_name == 'ContextCancelled':
         err_type = ContextCancelled
-        suberror_type = RemoteActorError
+        suberror_type = err_type
 
     else:  # try to lookup a suitable local error type
         for ns in [
@@ -164,18 +183,19 @@ def unpack_error(
             eg,
             trio,
         ]:
-            try:
-                suberror_type = getattr(ns, type_name)
+            if suberror_type := getattr(
+                ns,
+                type_name,
+                False,
+            ):
                 break
-            except AttributeError:
-                continue
 
     exc = err_type(
         message,
         suberror_type=suberror_type,
 
         # unpack other fields into error type init
-        **msg['error'],
+        **error_dict,
     )
 
     return exc
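Stripped of the actor machinery, the `pack_error()`/`unpack_error()` pair reduces to a simple wire protocol for exceptions. The following standalone sketch mirrors the shape of the msg dict shown in the hunks above but is not the actual `tractor` implementation (it records no `src_actor_uid` and only resolves builtin exception types):

# simplified standalone sketch of the error-msg round trip;
# illustrative only, not the real `tractor` code.
import builtins
import traceback


def pack_error(exc: BaseException) -> dict[str, dict]:
    return {
        'error': {
            'tb_str': ''.join(traceback.format_exception(exc)),
            'type_str': type(exc).__name__,
        }
    }


def unpack_error(msg: dict) -> None | Exception:
    if (error_dict := msg.get('error')) is None:
        return None  # not an error msg, nothing to unpack

    # best-effort lookup of a matching builtin exception type
    suberror_type = getattr(
        builtins,
        error_dict['type_str'],
        Exception,
    )
    return suberror_type(error_dict['tb_str'])


try:
    raise TypeError('unserializable startup value')
except TypeError as exc:
    msg = pack_error(exc)

err = unpack_error(msg)
assert type(err) is TypeError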
@@ -15,8 +15,12 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 '''
-Memory boundary "Portals": an API for structured
-concurrency linked tasks running in disparate memory domains.
+Memory "portal" construct.
+
+"Memory portals" are both an API and set of IPC wrapping primitives
+for managing structured concurrency "cancel-scope linked" tasks
+running in disparate virtual memory domains - at least in different
+OS processes, possibly on different (hardware) hosts.
 
 '''
 from __future__ import annotations
@@ -66,20 +70,21 @@ def _unwrap_msg(
     raise unpack_error(msg, channel) from None
 
 
+# TODO: maybe move this to ._exceptions?
 class MessagingError(Exception):
     'Some kind of unexpected SC messaging dialog issue'
 
 
 class Portal:
     '''
-    A 'portal' to a(n) (remote) ``Actor``.
+    A 'portal' to a memory-domain-separated `Actor`.
 
     A portal is "opened" (and eventually closed) by one side of an
     inter-actor communication context. The side which opens the portal
     is equivalent to a "caller" in function parlance and usually is
     either the called actor's parent (in process tree hierarchy terms)
     or a client interested in scheduling work to be done remotely in a
-    far process.
+    process which has a separate (virtual) memory domain.
 
     The portal api allows the "caller" actor to invoke remote routines
     and receive results through an underlying ``tractor.Channel`` as
@@ -89,9 +94,9 @@ class Portal:
     like having a "portal" between the separate actor memory spaces.
 
     '''
-    # the timeout for a remote cancel request sent to
-    # a(n) (peer) actor.
-    cancel_timeout = 0.5
+    # global timeout for remote cancel requests sent to
+    # connected (peer) actors.
+    cancel_timeout: float = 0.5
 
     def __init__(self, channel: Channel) -> None:
         self.channel = channel
@@ -191,7 +196,15 @@ class Portal:
 
     ) -> bool:
         '''
-        Cancel the actor on the other end of this portal.
+        Cancel the actor runtime (and thus process) on the far
+        end of this portal.
+
+        **NOTE** THIS CANCELS THE ENTIRE RUNTIME AND THE
+        SUBPROCESS, it DOES NOT just cancel the remote task. If you
+        want to have a handle to cancel a remote ``trio.Task`` look
+        at `.open_context()` and the definition of
+        `._context.Context.cancel()` which CAN be used for this
+        purpose.
 
         '''
         if not self.channel.connected():
@@ -385,12 +398,32 @@ class Portal:
 
     ) -> AsyncGenerator[tuple[Context, Any], None]:
         '''
-        Open an inter-actor task context.
+        Open an inter-actor "task context"; a remote task is
+        scheduled and cancel-scope-state-linked to a `trio.run()` across
+        memory boundaries in another actor's runtime.
 
-        This is a synchronous API which allows for deterministic
-        setup/teardown of a remote task. The yielded ``Context`` further
-        allows for opening bidirectional streams, explicit cancellation
-        and synchronized final result collection. See ``tractor.Context``.
+        This is an `@acm` API which allows for deterministic setup
+        and teardown of a remotely scheduled task in another remote
+        actor. Once opened, the 2 now "linked" tasks run completely
+        in parallel in each actor's runtime with their enclosing
+        `trio.CancelScope`s kept in a synced state wherein if
+        either side errors or cancels an equivalent error is
+        relayed to the other side via an SC-compat IPC protocol.
+
+        The yielded `tuple` is a pair delivering a `tractor.Context`
+        and any first value "sent" by the "callee" task via a call
+        to `Context.started(<value: Any>)`; this side of the
+        context does not unblock until the "callee" task calls
+        `.started()` in similar style to `trio.Nursery.start()`.
+        When the "callee" (side that is "called"/started by a call
+        to *this* method) returns, the caller side (this) unblocks
+        and any final value delivered from the other end can be
+        retrieved using the `Context.result()` api.
+
+        The yielded ``Context`` instance further allows for opening
+        bidirectional streams, explicit cancellation and
+        structured-concurrency-synchronized final result-msg
+        collection. See ``tractor.Context`` for more details.
 
         '''
         # conduct target func method structural checks
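In practice the handshake this docstring describes pairs an `@tractor.context` callee with a caller-side `open_context()` block; a minimal hypothetical pairing (the `echo` function and 'echoer' actor name are illustrative, not from this diff):

# minimal sketch of the `.started()`/`.result()` handshake the
# docstring describes; `echo` and the actor name are illustrative.
import trio
import tractor


@tractor.context
async def echo(
    ctx: tractor.Context,
) -> str:
    # unblocks the caller's `open_context()` enter
    await ctx.started('ready')
    return 'final value'


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',
            enable_modules=[__name__],
        )
        async with portal.open_context(echo) as (ctx, first):
            assert first == 'ready'
            # wait for the callee task's final return value
            assert await ctx.result() == 'final value'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)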
@@ -423,47 +456,52 @@ class Portal:
         )
 
         assert ctx._remote_func_type == 'context'
-        msg = await ctx._recv_chan.receive()
+        msg: dict = await ctx._recv_chan.receive()
 
         try:
             # the "first" value here is delivered by the callee's
             # ``Context.started()`` call.
             first = msg['started']
-            ctx._started_called = True
+            ctx._started_called: bool = True
 
         except KeyError:
-            assert msg.get('cid'), ("Received internal error at context?")
+            if not (cid := msg.get('cid')):
+                raise MessagingError(
+                    'Received internal error at context?\n'
+                    'No call-id (cid) in startup msg?'
+                )
 
             if msg.get('error'):
                 # raise kerr from unpack_error(msg, self.channel)
                 # NOTE: mask the key error with the remote one
                 raise unpack_error(msg, self.channel) from None
             else:
                 raise MessagingError(
-                    f'Context for {ctx.cid} was expecting a `started` message'
-                    f' but received a non-error msg:\n{pformat(msg)}'
+                    f'Context for {cid} was expecting a `started` message'
+                    ' but received a non-error msg:\n'
+                    f'{pformat(msg)}'
                 )
 
-        _err: BaseException | None = None
         ctx._portal: Portal = self
 
         uid: tuple = self.channel.uid
         cid: str = ctx.cid
-        etype: Type[BaseException] | None = None
 
-        # deliver context instance and .started() msg value in enter
-        # tuple.
+        # placeholder for any exception raised in the runtime
+        # or by user tasks which cause this context's closure.
+        scope_err: BaseException | None = None
         try:
             async with trio.open_nursery() as nurse:
-                ctx._scope_nursery = nurse
-                ctx._scope = nurse.cancel_scope
+                ctx._scope_nursery: trio.Nursery = nurse
+                ctx._scope: trio.CancelScope = nurse.cancel_scope
 
+                # deliver context instance and .started() msg value
+                # in enter tuple.
                 yield ctx, first
 
-                # when in allow_ovveruns mode there may be lingering
-                # overflow sender tasks remaining?
+                # when in allow_overruns mode there may be
+                # lingering overflow sender tasks remaining?
                 if nurse.child_tasks:
-                    # ensure we are in overrun state with
-                    # ``._allow_overruns=True`` bc otherwise
+                    # XXX: ensure we are in overrun state
+                    # with ``._allow_overruns=True`` bc otherwise
                     # there should be no tasks in this nursery!
                     if (
                         not ctx._allow_overruns
@@ -471,47 +509,72 @@ class Portal:
                     ):
                         raise RuntimeError(
                             'Context has sub-tasks but is '
-                            'not in `allow_overruns=True` Mode!?'
+                            'not in `allow_overruns=True` mode!?'
                         )
+
+                    # ensure cancel of all overflow sender tasks
+                    # started in the ctx nursery.
                     ctx._scope.cancel()
 
-        except ContextCancelled as err:
-            _err = err
+        # XXX: (maybe) shield/mask context-cancellations that were
+        # initiated by any of the context's 2 tasks. There are
+        # subsequently 2 operating cases for a "graceful cancel"
+        # of a `Context`:
+        #
+        # 1.*this* side's task called `Context.cancel()`, in
+        #   which case we mask the `ContextCancelled` from bubbling
+        #   to the opener (much like how `trio.Nursery` swallows
+        #   any `trio.Cancelled` bubbled by a call to
+        #   `Nursery.cancel_scope.cancel()`)
+        #
+        # 2.*the other* side's (callee/spawned) task cancelled due
+        #   to a self or peer cancellation request in which case we
+        #   DO let the error bubble to the opener.
+        except ContextCancelled as ctxc:
+            scope_err = ctxc
 
-            # swallow and mask cross-actor task context cancels that
-            # were initiated by *this* side's task.
+            # CASE 1: this context was never cancelled
+            # via a local task's call to `Context.cancel()`.
             if not ctx._cancel_called:
                 # XXX: this should NEVER happen!
                 # from ._debug import breakpoint
                 # await breakpoint()
                 raise
 
-            # if the context was cancelled by client code
-            # then we don't need to raise since user code
-            # is expecting this and the block should exit.
+            # CASE 2: context was cancelled by local task calling
+            # `.cancel()`, we don't raise and the exit block should
+            # exit silently.
             else:
-                log.debug(f'Context {ctx} cancelled gracefully')
+                log.debug(
+                    f'Context {ctx} cancelled gracefully with:\n'
+                    f'{ctxc}'
+                )
 
         except (
-            BaseException,
+            # - a standard error in the caller/yieldee
+            Exception,
 
-            # more specifically, we need to handle these but not
-            # sure it's worth being pedantic:
-            # Exception,
-            # trio.Cancelled,
-            # KeyboardInterrupt,
+            # - a runtime teardown exception-group and/or
+            #   cancellation request from a caller task.
+            BaseExceptionGroup,
+            trio.Cancelled,
+            KeyboardInterrupt,
 
         ) as err:
-            etype = type(err)
+            scope_err = err
 
-            # cancel ourselves on any error.
+            # XXX: request cancel of this context on any error.
+            # NOTE: `Context.cancel()` is conversely NOT called in
+            # the `ContextCancelled` "cancellation requested" case
+            # above.
             log.cancel(
-                'Context cancelled for task, sending cancel request..\n'
+                'Context cancelled for task due to\n'
+                f'{err}\n'
+                'Sending cancel request..\n'
                 f'task:{cid}\n'
                 f'actor:{uid}'
             )
             try:
 
                 await ctx.cancel()
             except trio.BrokenResourceError:
                 log.warning(
@@ -520,8 +583,9 @@ class Portal:
                     f'actor:{uid}'
                 )
 
-            raise
+            raise  # duh
 
+        # no scope error case
         else:
             if ctx.chan.connected():
                 log.info(
@@ -529,10 +593,20 @@ class Portal:
                     f'task: {cid}\n'
                     f'actor: {uid}'
                 )
+                # XXX NOTE XXX: the below call to
+                # `Context.result()` will ALWAYS raise
+                # a `ContextCancelled` (via an embedded call to
+                # `Context._maybe_raise_remote_err()`) IFF
+                # a `Context._remote_error` was set by the runtime
+                # via a call to
+                # `Context._maybe_cancel_and_set_remote_error()`
+                # which IS SET any time the far end fails and
+                # causes "caller side" cancellation via
+                # a `ContextCancelled` here.
                 result = await ctx.result()
                 log.runtime(
-                    f'Context {fn_name} returned '
-                    f'value from callee `{result}`'
+                    f'Context {fn_name} returned value from callee:\n'
+                    f'`{result}`'
                 )
 
         finally:
@@ -540,22 +614,73 @@ class Portal:
             # operating *in* this scope to have survived
             # we tear down the runtime feeder chan last
             # to avoid premature stream clobbers.
-            if ctx._recv_chan is not None:
-                # should we encapsulate this in the context api?
-                await ctx._recv_chan.aclose()
+            rxchan: trio.ReceiveChannel = ctx._recv_chan
+            if (
+                rxchan
 
+                # maybe TODO: yes i know the below check is
+                # touching `trio` memchan internals..BUT, there are
+                # only a couple ways to avoid a `trio.Cancelled`
+                # bubbling from the `.aclose()` call below:
+                #
+                # - catch and mask it via the cancel-scope-shielded call
+                #   as we are rn (manual and frowned upon) OR,
+                # - specially handle the case where `scope_err` is
+                #   one of {`BaseExceptionGroup`, `trio.Cancelled`}
+                #   and then presume that the `.aclose()` call will
+                #   raise a `trio.Cancelled` and just don't call it
+                #   in those cases..
+                #
+                # that latter approach is more logic, LOC, and more
+                # convoluted so for now stick with the first
+                # pseudo-hack-workaround where we just try to avoid
+                # the shielded call as much as we can detect from
+                # the memchan's `._closed` state..
+                #
+                # XXX MOTIVATION XXX-> we generally want to raise
+                # any underlying actor-runtime/internals error that
+                # surfaces from a bug in tractor itself so it can
+                # be easily detected/fixed AND, we also want to
+                # minimize noisy runtime tracebacks (normally due
+                # to the cross-actor linked task scope machinery
+                # teardown) displayed to user-code and instead only
+                # displaying `ContextCancelled` traces where the
+                # cause of crash/exit IS due to something in
+                # user/app code on either end of the context.
+                and not rxchan._closed
+            ):
+                # XXX NOTE XXX: and again as per above, we mask any
+                # `trio.Cancelled` raised here so as to NOT mask
+                # out any exception group or legit (remote) ctx
+                # error that sourced from the remote task or its
+                # runtime.
+                with trio.CancelScope(shield=True):
+                    await ctx._recv_chan.aclose()
+
+            # XXX: since we always (maybe) re-raise (and thus also
+            # mask runtime machinery related
+            # multi-`trio.Cancelled`s) any scope error which was
+            # the underlying cause of this context's exit, add
+            # different log msgs for each of the (2) cases.
+            if scope_err is not None:
+                etype: Type[BaseException] = type(scope_err)
+
+                # CASE 2
-            if etype:
                 if ctx._cancel_called:
                     log.cancel(
-                        f'Context {fn_name} cancelled by caller with\n{etype}'
+                        f'Context {fn_name} cancelled by caller with\n'
+                        f'{etype}'
                     )
-                elif _err is not None:
+
+                # CASE 1
+                else:
                     log.cancel(
-                        f'Context for task cancelled by callee with {etype}\n'
+                        f'Context cancelled by callee with {etype}\n'
                        f'target: `{fn_name}`\n'
                        f'task:{cid}\n'
                        f'actor:{uid}'
                    )
 
         # XXX: (MEGA IMPORTANT) if this is a root opened process we
         # wait for any immediate child in debug before popping the
         # context from the runtime msg loop otherwise inside
@@ -564,10 +689,9 @@ class Portal:
         # a "stop" msg for a stream), this can result in a deadlock
         # where the root is waiting on the lock to clear but the
         # child has already cleared it and clobbered IPC.
-        from ._debug import maybe_wait_for_debugger
-        await maybe_wait_for_debugger()
 
-        # remove the context from runtime tracking
+        # FINALLY, remove the context from runtime tracking and
+        # exit Bo
         self.actor._contexts.pop(
             (self.channel.uid, ctx.cid),
             None,
@@ -347,14 +347,13 @@ async def _invoke(
             and ctx._enter_debugger_on_cancel
         )
     ):
-        # XXX: is there any case where we'll want to debug IPC
-        # disconnects as a default?
-        #
-        # I can't think of a reason that inspecting
-        # this type of failure will be useful for respawns or
-        # recovery logic - the only case is some kind of strange bug
-        # in our transport layer itself? Going to keep this
-        # open ended for now.
+        # XXX QUESTION XXX: is there any case where we'll
+        # want to debug IPC disconnects as a default?
+        # => I can't think of a reason that inspecting this
+        #    type of failure will be useful for respawns or
+        #    recovery logic - the only case is some kind of
+        #    strange bug in our transport layer itself? Going
+        #    to keep this open ended for now.
         entered_debug = await _debug._maybe_enter_pm(err)
 
         if not entered_debug:
@ -448,17 +447,18 @@ class Actor:
    (swappable) network protocols.


    Each "actor" is ``trio.run()`` scheduled "runtime" composed of many
    concurrent tasks in a single thread. The "runtime" tasks conduct
    a slew of low(er) level functions to make it possible for message
    passing between actors as well as the ability to create new actors
    (aka new "runtimes" in new processes which are supervised via
    a nursery construct). Each task which sends messages to a task in
    a "peer" (not necessarily a parent-child, depth hierarchy)) is able
    to do so via an "address", which maps IPC connections across memory
    boundaries, and task request id which allows for per-actor
    tasks to send and receive messages to specific peer-actor tasks with
    which there is an ongoing RPC/IPC dialog.
    Each "actor" is ``trio.run()`` scheduled "runtime" composed of
    many concurrent tasks in a single thread. The "runtime" tasks
    conduct a slew of low(er) level functions to make it possible
    for message passing between actors as well as the ability to
    create new actors (aka new "runtimes" in new processes which
    are supervised via a nursery construct). Each task which sends
    messages to a task in a "peer" (not necessarily a parent-child,
    depth hierarchy) is able to do so via an "address", which maps
    IPC connections across memory boundaries, and a task request id
    which allows for per-actor tasks to send and receive messages
    to specific peer-actor tasks with which there is an ongoing
    RPC/IPC dialog.

    '''
    # ugh, we need to get rid of this and replace with a "registry" sys
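Aside: a minimal sketch of the per-task "addressing" this docstring describes, using the spawn pattern from the project's own examples plus the public `Portal.run()` RPC call; the `echo`/`echoer` names are illustrative.

    import trio
    import tractor

    async def echo(msg: str) -> str:
        # a plain async func invoked as a remote task in the peer actor
        return msg * 2

    async def main() -> None:
        async with tractor.open_nursery() as an:
            # spawn a peer "runtime" (a new supervised process) and
            # get back a portal wrapping its IPC "address"
            portal = await an.start_actor(
                'echoer',
                enable_modules=[__name__],
            )
            # each call runs as a distinct request-id'd task in the peer
            assert await portal.run(echo, msg='hi') == 'hihi'
            await portal.cancel_actor()

    if __name__ == '__main__':
        trio.run(main)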
@ -199,6 +199,10 @@ async def do_hard_kill(
    proc: trio.Process,
    terminate_after: int = 3,

    # NOTE: for mucking with `.pause()`-ing inside the runtime
    # whilst also hacking on it XD
    # terminate_after: int = 99999,

) -> None:
    # NOTE: this timeout used to do nothing since we were shielding
    # the ``.wait()`` inside ``new_proc()`` which will pretty much
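Aside: a sketch of the grace-then-SIGKILL semantics the (now effective) `terminate_after` timeout implies, reduced to public `trio.Process` calls; this is an assumed distillation, not the function's actual body.

    import trio

    async def graceful_then_hard_kill(
        proc: trio.Process,
        terminate_after: float = 3,
    ) -> None:
        # politely ask the child to exit first ...
        proc.terminate()

        # ... then wait up to `terminate_after` seconds before
        # escalating to an uncatchable SIGKILL.
        with trio.move_on_after(terminate_after) as cs:
            await proc.wait()

        if cs.cancelled_caught:
            proc.kill()
            await proc.wait()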
@ -54,6 +54,60 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?

def _raise_from_no_yield_msg(
    stream: MsgStream,
    msg: dict,
    src_err: KeyError,

) -> bool:
    '''
    Raise an appropriate local error when a `MsgStream` msg arrives
    which does not contain the expected (under normal operation)
    `'yield'` field.

    '''
    # internal error should never get here
    assert msg.get('cid'), ("Received internal error at portal?")

    # TODO: handle 2 cases with 3.10+ match syntax
    # - 'stop'
    # - 'error'
    # possibly just handle msg['stop'] here!

    if stream._closed:
        raise trio.ClosedResourceError('This stream was closed')

    if msg.get('stop') or stream._eoc:
        log.debug(f"{stream} was stopped at remote end")

        # XXX: important to set so that a new ``.receive()``
        # call (likely by another task using a broadcast receiver)
        # doesn't accidentally pull the ``return`` message
        # value out of the underlying feed mem chan!
        stream._eoc = True

        # # when the send is closed we assume the stream has
        # # terminated and signal this local iterator to stop
        # await stream.aclose()

        # XXX: this causes ``ReceiveChannel.__anext__()`` to
        # raise a ``StopAsyncIteration`` **and** in our catch
        # block below it will trigger ``.aclose()``.
        raise trio.EndOfChannel from src_err

    # TODO: test that shows stream raising an expected error!!!
    elif msg.get('error'):
        # raise the error message
        raise unpack_error(msg, stream._ctx.chan)

    # always re-raise the source error if no translation error
    # case is activated above.
    raise src_err
    # raise RuntimeError(
    #   'Unknown non-yield stream msg?\n'
    #   f'{msg}'
    # )


class MsgStream(trio.abc.Channel):
    '''
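Aside: the TODO above mentions handling these cases with 3.10+ `match` syntax; a self-contained sketch of that dispatch over the (assumed) wire-msg shapes implied by the `.get()` checks:

    def classify_msg(msg: dict) -> str:
        # msg shapes inferred from the checks in `_raise_from_no_yield_msg()`
        match msg:
            case {'stop': _}:
                return 'stop'   # remote end gracefully stopped the stream
            case {'error': _}:
                return 'error'  # remote error to be unpacked and raised
            case {'yield': _}:
                return 'yield'  # a normal in-sequence stream item
            case _:
                return 'unknown'

    assert classify_msg({'cid': '1', 'stop': True}) == 'stop'
    assert classify_msg({'cid': '1', 'error': {}}) == 'error'
    assert classify_msg({'cid': '1', 'yield': 42}) == 'yield'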
@ -91,11 +145,20 @@ class MsgStream(trio.abc.Channel):
    # delegate directly to underlying mem channel
    def receive_nowait(self):
        msg = self._rx_chan.receive_nowait()
        return msg['yield']
        try:
            return msg['yield']
        except KeyError as kerr:
            _raise_from_no_yield_msg(
                stream=self,
                msg=msg,
                src_err=kerr,
            )

    async def receive(self):
        '''Async receive a single msg from the IPC transport, the next
        in sequence for this stream.
        '''
        '''
        Receive a single msg from the IPC transport, the next in
        sequence sent by the far end task (possibly in order as
        determined by the underlying protocol).

        '''
        # see ``.aclose()`` for notes on the old behaviour prior to
@ -110,43 +173,12 @@ class MsgStream(trio.abc.Channel):
            msg = await self._rx_chan.receive()
            return msg['yield']

        except KeyError as err:
            # internal error should never get here
            assert msg.get('cid'), ("Received internal error at portal?")

            # TODO: handle 2 cases with 3.10 match syntax
            # - 'stop'
            # - 'error'
            # possibly just handle msg['stop'] here!

            if self._closed:
                raise trio.ClosedResourceError('This stream was closed')

            if msg.get('stop') or self._eoc:
                log.debug(f"{self} was stopped at remote end")

                # XXX: important to set so that a new ``.receive()``
                # call (likely by another task using a broadcast receiver)
                # doesn't accidentally pull the ``return`` message
                # value out of the underlying feed mem chan!
                self._eoc = True

                # # when the send is closed we assume the stream has
                # # terminated and signal this local iterator to stop
                # await self.aclose()

                # XXX: this causes ``ReceiveChannel.__anext__()`` to
                # raise a ``StopAsyncIteration`` **and** in our catch
                # block below it will trigger ``.aclose()``.
                raise trio.EndOfChannel from err

            # TODO: test that shows stream raising an expected error!!!
            elif msg.get('error'):
                # raise the error message
                raise unpack_error(msg, self._ctx.chan)

            else:
                raise
        except KeyError as kerr:
            _raise_from_no_yield_msg(
                stream=self,
                msg=msg,
                src_err=kerr,
            )

        except (
            trio.ClosedResourceError,  # by self._rx_chan
@ -193,15 +193,39 @@ def get_logger(
    '''
    log = rlog = logging.getLogger(_root_name)

    if name and name != _proj_name:
    if (
        name
        and name != _proj_name
    ):

        # handling for modules that use ``get_logger(__name__)`` to
        # avoid duplicate project-package token in msg output
        rname, _, tail = name.partition('.')
        if rname == _root_name:
            name = tail
        # NOTE: for handling for modules that use ``get_logger(__name__)``
        # we make the following stylistic choice:
        # - always avoid duplicate project-package token
        #   in msg output: i.e. tractor.tractor _ipc.py in header
        #   looks ridiculous XD
        # - never show the leaf module name in the {name} part
        #   since in python the {filename} is always this same
        #   module-file.

        sub_name: None | str = None
        rname, _, sub_name = name.partition('.')
        pkgpath, _, modfilename = sub_name.rpartition('.')

        # NOTE: for tractor itself never include the last level
        # module key in the name such that something like: eg.
        # 'tractor.trionics._broadcast` only includes the first
        # 2 tokens in the (coloured) name part.
        if rname == 'tractor':
            sub_name = pkgpath

        if _root_name in sub_name:
            duplicate, _, sub_name = sub_name.partition('.')

        if not sub_name:
            log = rlog
        else:
            log = rlog.getChild(sub_name)

        log = rlog.getChild(name)
        log.level = rlog.level

    # add our actor-task aware adapter which will dynamically look up
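Aside: a worked, standalone example of the new name-trimming using the `'tractor.trionics._broadcast'` case from the comment; pure string ops, no logging machinery required.

    name = 'tractor.trionics._broadcast'

    rname, _, sub_name = name.partition('.')
    # rname == 'tractor', sub_name == 'trionics._broadcast'

    pkgpath, _, modfilename = sub_name.rpartition('.')
    # pkgpath == 'trionics', modfilename == '_broadcast'

    if rname == 'tractor':
        # drop the leaf module token so only the first 2 tokens
        # ('tractor' + 'trionics') show in the (coloured) name part
        sub_name = pkgpath

    assert sub_name == 'trionics'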
@ -254,3 +278,7 @@ def get_console_log(

def get_loglevel() -> str:
    return _default_loglevel


# global module logger for tractor itself
log = get_logger('tractor')
@ -43,38 +43,62 @@ Built-in messaging patterns, types, APIs and helpers.
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type

from __future__ import annotations
from inspect import isfunction
from pkgutil import resolve_name


class NamespacePath(str):
    '''
    A serializable description of a (function) Python object location
    described by the target's module path and namespace key meant as
    a message-native "packet" to allow actors to point-and-load objects
    by absolute reference.
    A serializable description of a (function) Python object
    location described by the target's module path and namespace
    key meant as a message-native "packet" to allow actors to
    point-and-load objects by an absolute ``str`` (and thus
    serializable) reference.

    '''
    _ref: object = None
    _ref: object | type | None = None

    def load_ref(self) -> object:
    def load_ref(self) -> object | type:
        if self._ref is None:
            self._ref = resolve_name(self)
        return self._ref

    def to_tuple(
        self,

    ) -> tuple[str, str]:
        ref = self.load_ref()
        return ref.__module__, getattr(ref, '__name__', '')
    @staticmethod
    def _mk_fqnp(ref: type | object) -> tuple[str, str]:
        '''
        Generate a minimal ``str`` pair which describes a python
        object's namespace path and object/type name.

        In more precise terms something like:
          - 'py.namespace.path:object_name',
          - eg. 'tractor.msg:NamespacePath' will be the ``str`` form
            of THIS type XD

        '''
        if (
            isinstance(ref, object)
            and not isfunction(ref)
        ):
            name: str = type(ref).__name__
        else:
            name: str = getattr(ref, '__name__')

        # fully qualified namespace path, tuple.
        fqnp: tuple[str, str] = (
            ref.__module__,
            name,
        )
        return fqnp

    @classmethod
    def from_ref(
        cls,
        ref,
        ref: type | object,

    ) -> NamespacePath:
        return cls(':'.join(
            (ref.__module__,
             getattr(ref, '__name__', ''))
        ))

        fqnp: tuple[str, str] = cls._mk_fqnp(ref)
        return cls(':'.join(fqnp))

    def to_tuple(self) -> tuple[str, str]:
        return self._mk_fqnp(self.load_ref())
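Aside: a minimal round-trip sketch of the API above, restricted to function refs (the instance/type branch of `_mk_fqnp()` behaves differently); the `'__main__'` module name assumes the sketch is run directly as a script.

    from tractor.msg import NamespacePath

    def add(x: int, y: int) -> int:
        return x + y

    nsp = NamespacePath.from_ref(add)       # the str '__main__:add'
    assert nsp.to_tuple() == ('__main__', 'add')
    assert nsp.load_ref() is add            # via pkgutil.resolve_name()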
@ -70,6 +70,7 @@ async def _enter_and_wait(
    unwrapped: dict[int, T],
    all_entered: trio.Event,
    parent_exit: trio.Event,
    seed: int,

) -> None:
    '''

@ -80,7 +81,10 @@ async def _enter_and_wait(
    async with mngr as value:
        unwrapped[id(mngr)] = value

        if all(unwrapped.values()):
        if all(
            val != seed
            for val in unwrapped.values()
        ):
            all_entered.set()

        await parent_exit.wait()

@ -91,7 +95,13 @@ async def gather_contexts(

    mngrs: Sequence[AsyncContextManager[T]],

) -> AsyncGenerator[tuple[Optional[T], ...], None]:
) -> AsyncGenerator[
    tuple[
        T | None,
        ...
    ],
    None,
]:
    '''
    Concurrently enter a sequence of async context managers, each in
    a separate ``trio`` task and deliver the unwrapped values in the

@ -104,7 +114,11 @@ async def gather_contexts(
    entered and exited, and cancellation just works.

    '''
    unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)
    seed: int = id(mngrs)
    unwrapped: dict[int, T | None] = {}.fromkeys(
        (id(mngr) for mngr in mngrs),
        seed,
    )

    all_entered = trio.Event()
    parent_exit = trio.Event()

@ -116,8 +130,9 @@ async def gather_contexts(

    if not mngrs:
        raise ValueError(
            'input mngrs is empty?\n'
            'Did try to use inline generator syntax?'
            '`.trionics.gather_contexts()` input mngrs is empty?\n'
            'Did try to use inline generator syntax?\n'
            'Use a non-lazy iterator or sequence type instead!'
        )

    async with trio.open_nursery() as n:

@ -128,6 +143,7 @@ async def gather_contexts(
            unwrapped,
            all_entered,
            parent_exit,
            seed,
        )

        # deliver control once all managers have started up
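Aside: why the `seed` sentinel matters; an acm may legitimately yield a falsy value (`None`, `0`, `''`) which the old `all(unwrapped.values())` check reads as "not yet entered", hanging `all_entered` forever. A standalone sketch of the fixed check:

    mngrs = (object(), object())   # stand-ins for the acm sequence
    seed = id(mngrs)               # unique: can't equal any yielded value
    unwrapped = {}.fromkeys((id(m) for m in mngrs), seed)

    # both managers have now "entered", yielding falsy values:
    unwrapped[id(mngrs[0])] = None
    unwrapped[id(mngrs[1])] = 0

    assert all(val != seed for val in unwrapped.values())  # new check: entered
    assert not all(unwrapped.values())                     # old check: would hang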