Compare commits

..

No commits in common. "e696caf8105770a6e294a504deecc5616f1b7177" and "10adf34be582a92f55587259845b09a5054f2265" have entirely different histories.

18 changed files with 1314 additions and 3013 deletions

View File

@ -1,8 +0,0 @@
# vim: ft=ini
# pytest.ini for tractor
[pytest]
# don't show frickin captured logs AGAIN in the report..
addopts = --show-capture='no'
log_cli = false
; minversion = 6.0

View File

@ -298,69 +298,44 @@ async def inf_streamer(
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
trio.open_nursery() as tn, trio.open_nursery() as n,
): ):
async def close_stream_on_sentinel(): async def bail_on_sentinel():
async for msg in stream: async for msg in stream:
if msg == 'done': if msg == 'done':
print(
'streamer RXed "done" sentinel msg!\n'
'CLOSING `MsgStream`!'
)
await stream.aclose() await stream.aclose()
else: else:
print(f'streamer received {msg}') print(f'streamer received {msg}')
else:
print('streamer exited recv loop')
# start termination detector # start termination detector
tn.start_soon(close_stream_on_sentinel) n.start_soon(bail_on_sentinel)
cap: int = 10000 # so that we don't spin forever when bug.. for val in itertools.count():
for val in range(cap):
try: try:
print(f'streamer sending {val}')
await stream.send(val) await stream.send(val)
if val > cap:
raise RuntimeError(
'Streamer never cancelled by setinel?'
)
await trio.sleep(0.001)
# close out the stream gracefully
except trio.ClosedResourceError: except trio.ClosedResourceError:
print('msgstream closed on streamer side!') # close out the stream gracefully
assert stream.closed
break break
else:
raise RuntimeError(
'Streamer not cancelled before finished sending?'
)
print('streamer exited .open_streamer() block') print('terminating streamer')
def test_local_task_fanout_from_stream( def test_local_task_fanout_from_stream():
debug_mode: bool,
):
''' '''
Single stream with multiple local consumer tasks using the Single stream with multiple local consumer tasks using the
``MsgStream.subscribe()` api. ``MsgStream.subscribe()` api.
Ensure all tasks receive all values after stream completes Ensure all tasks receive all values after stream completes sending.
sending.
''' '''
consumers: int = 22 consumers = 22
async def main(): async def main():
counts = Counter() counts = Counter()
async with tractor.open_nursery( async with tractor.open_nursery() as tn:
debug_mode=debug_mode, p = await tn.start_actor(
) as tn:
p: tractor.Portal = await tn.start_actor(
'inf_streamer', 'inf_streamer',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -368,6 +343,7 @@ def test_local_task_fanout_from_stream(
p.open_context(inf_streamer) as (ctx, _), p.open_context(inf_streamer) as (ctx, _),
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
async def pull_and_count(name: str): async def pull_and_count(name: str):
# name = trio.lowlevel.current_task().name # name = trio.lowlevel.current_task().name
async with stream.subscribe() as recver: async with stream.subscribe() as recver:
@ -376,7 +352,7 @@ def test_local_task_fanout_from_stream(
tractor.trionics.BroadcastReceiver tractor.trionics.BroadcastReceiver
) )
async for val in recver: async for val in recver:
print(f'bx {name} rx: {val}') # print(f'{name}: {val}')
counts[name] += 1 counts[name] += 1
print(f'{name} bcaster ended') print(f'{name} bcaster ended')
@ -386,14 +362,10 @@ def test_local_task_fanout_from_stream(
with trio.fail_after(3): with trio.fail_after(3):
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
for i in range(consumers): for i in range(consumers):
nurse.start_soon( nurse.start_soon(pull_and_count, i)
pull_and_count,
i,
)
# delay to let bcast consumers pull msgs
await trio.sleep(0.5) await trio.sleep(0.5)
print('terminating nursery of bcast rxer consumers!') print('\nterminating')
await stream.send('done') await stream.send('done')
print('closed stream connection') print('closed stream connection')

View File

@ -48,13 +48,11 @@ async def do_nuthin():
ids=['no_args', 'unexpected_args'], ids=['no_args', 'unexpected_args'],
) )
def test_remote_error(reg_addr, args_err): def test_remote_error(reg_addr, args_err):
''' """Verify an error raised in a subactor that is propagated
Verify an error raised in a subactor that is propagated
to the parent nursery, contains the underlying boxed builtin to the parent nursery, contains the underlying boxed builtin
error type info and causes cancellation and reraising all the error type info and causes cancellation and reraising all the
way up the stack. way up the stack.
"""
'''
args, errtype = args_err args, errtype = args_err
async def main(): async def main():
@ -67,9 +65,7 @@ def test_remote_error(reg_addr, args_err):
# an exception group outside the nursery since the error # an exception group outside the nursery since the error
# here and the far end task error are one in the same? # here and the far end task error are one in the same?
portal = await nursery.run_in_actor( portal = await nursery.run_in_actor(
assert_err, assert_err, name='errorer', **args
name='errorer',
**args
) )
# get result(s) from main task # get result(s) from main task

View File

@ -5,10 +5,9 @@ Verify the we raise errors when streams are opened prior to
sync-opening a ``tractor.Context`` beforehand. sync-opening a ``tractor.Context`` beforehand.
''' '''
from contextlib import asynccontextmanager as acm # from contextlib import asynccontextmanager as acm
from itertools import count from itertools import count
import platform import platform
from pprint import pformat
from typing import ( from typing import (
Callable, Callable,
) )
@ -250,17 +249,6 @@ def test_simple_context(
trio.run(main) trio.run(main)
@acm
async def expect_ctxc(yay: bool) -> None:
if yay:
try:
yield
except ContextCancelled:
return
else:
yield
@pytest.mark.parametrize( @pytest.mark.parametrize(
'callee_returns_early', 'callee_returns_early',
[True, False], [True, False],
@ -291,60 +279,23 @@ def test_caller_cancels(
async def check_canceller( async def check_canceller(
ctx: Context, ctx: Context,
) -> None: ) -> None:
actor: Actor = current_actor() # should not raise yet return the remote
uid: tuple = actor.uid # context cancelled error.
if (
cancel_method == 'portal'
and not callee_returns_early
):
try:
res = await ctx.result()
assert 0, 'Portal cancel should raise!'
except ContextCancelled as ctxc:
assert ctx.chan._cancel_called
assert ctxc.canceller == uid
assert ctxc is ctx.maybe_error
# NOTE: should not ever raise even in the `ctx`
# case since self-cancellation should swallow the ctxc
# silently!
else:
res = await ctx.result() res = await ctx.result()
# we actually get a result
if callee_returns_early: if callee_returns_early:
assert res == 'yo' assert res == 'yo'
assert ctx.outcome is res
assert ctx.maybe_error is None
else: else:
err: Exception = ctx.outcome err = res
assert isinstance(err, ContextCancelled) assert isinstance(err, ContextCancelled)
assert ( assert (
tuple(err.canceller) tuple(err.canceller)
== ==
uid current_actor().uid
) )
assert (
err
is ctx.maybe_error
is ctx._remote_error
)
if le := ctx._local_error:
assert err is le
# else:
# TODO: what should this be then?
# not defined until block closes right?
#
# await tractor.pause()
# assert ctx._local_error is None
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
@ -354,16 +305,11 @@ def test_caller_cancels(
) )
timeout = 0.5 if not callee_returns_early else 2 timeout = 0.5 if not callee_returns_early else 2
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with ( async with portal.open_context(
expect_ctxc(yay=cancel_method == 'portal'),
portal.open_context(
simple_setup_teardown, simple_setup_teardown,
data=10, data=10,
block_forever=not callee_returns_early, block_forever=not callee_returns_early,
) as (ctx, sent), ) as (ctx, sent):
):
if callee_returns_early: if callee_returns_early:
# ensure we block long enough before sending # ensure we block long enough before sending
@ -385,16 +331,6 @@ def test_caller_cancels(
if cancel_method != 'portal': if cancel_method != 'portal':
await portal.cancel_actor() await portal.cancel_actor()
# since the `.cancel_actor()` call just above
# will cause the `.open_context().__aexit__()` raise
# a ctxc which should in turn cause `ctx._scope` to
# catch any cancellation?
if (
not callee_returns_early
and cancel_method == 'portal'
):
assert ctx._scope.cancelled_caught
trio.run(main) trio.run(main)
@ -497,6 +433,7 @@ async def test_callee_closes_ctx_after_stream_open(
@tractor.context @tractor.context
async def expect_cancelled( async def expect_cancelled(
ctx: Context, ctx: Context,
) -> None: ) -> None:
@ -516,7 +453,7 @@ async def expect_cancelled(
raise raise
else: else:
assert 0, "callee wasn't cancelled !?" assert 0, "Wasn't cancelled!?"
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -535,8 +472,8 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
root: Actor = current_actor() root: Actor = current_actor()
portal = await an.start_actor( portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
enable_modules=[__name__], enable_modules=[__name__],
@ -549,13 +486,11 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
await portal.run(assert_state, value=True) await portal.run(assert_state, value=True)
# call `ctx.cancel()` explicitly # call cancel explicitly
if use_ctx_cancel_method: if use_ctx_cancel_method:
await ctx.cancel() await ctx.cancel()
# NOTE: means the local side `ctx._scope` will
# have been cancelled by an ctxc ack and thus
# `._scope.cancelled_caught` should be set.
try: try:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for msg in stream: async for msg in stream:
@ -584,35 +519,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
assert portal.channel.connected() assert portal.channel.connected()
# ctx is closed here # ctx is closed here
await portal.run( await portal.run(assert_state, value=False)
assert_state,
value=False,
)
else: else:
try: try:
with trio.fail_after(0.2): with trio.fail_after(0.2):
await ctx.result() await ctx.result()
assert 0, "Callee should have blocked!?" assert 0, "Callee should have blocked!?"
except trio.TooSlowError: except trio.TooSlowError:
# NO-OP -> since already called above # NO-OP -> since already called above
await ctx.cancel() await ctx.cancel()
# NOTE: local scope should have absorbed the cancellation since # local scope should have absorbed the cancellation
# in this case we call `ctx.cancel()` and the local assert ctx.cancelled_caught
# `._scope` gets `.cancel_called` on the ctxc ack. assert ctx._remote_error is ctx._local_error
if use_ctx_cancel_method:
assert ctx._scope.cancelled_caught
# rxed ctxc response from far end
assert ctx.cancel_acked
assert (
ctx._remote_error
is ctx._local_error
is ctx.maybe_error
is ctx.outcome
)
try: try:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
@ -895,10 +815,7 @@ async def echo_back_sequence(
# NOTE: ensure that if the caller is expecting to cancel this task # NOTE: ensure that if the caller is expecting to cancel this task
# that we stay echoing much longer then they are so we don't # that we stay echoing much longer then they are so we don't
# return early instead of receive the cancel msg. # return early instead of receive the cancel msg.
total_batches: int = ( total_batches: int = 1000 if wait_for_cancel else 6
1000 if wait_for_cancel
else 6
)
await ctx.started() await ctx.started()
# await tractor.breakpoint() # await tractor.breakpoint()
@ -917,23 +834,8 @@ async def echo_back_sequence(
) )
seq = list(seq) # bleh, msgpack sometimes ain't decoded right seq = list(seq) # bleh, msgpack sometimes ain't decoded right
for i in range(total_batches): for _ in range(total_batches):
print(f'starting new stream batch {i} iter in child')
batch = [] batch = []
# EoC case, delay a little instead of hot
# iter-stopping (since apparently py3.11+ can do that
# faster then a ctxc can be sent) on the async for
# loop when child was requested to ctxc.
if (
stream.closed
or
ctx.cancel_called
):
print('child stream already closed!?!')
await trio.sleep(0.05)
continue
async for msg in stream: async for msg in stream:
batch.append(msg) batch.append(msg)
if batch == seq: if batch == seq:
@ -944,18 +846,15 @@ async def echo_back_sequence(
print('callee waiting on next') print('callee waiting on next')
print(f'callee echoing back latest batch\n{batch}')
for msg in batch: for msg in batch:
print(f'callee sending msg\n{msg}') print(f'callee sending {msg}')
await stream.send(msg) await stream.send(msg)
try:
return 'yo'
finally:
print( print(
'exiting callee with context:\n' 'EXITING CALLEEE:\n'
f'{pformat(ctx)}\n' f'{ctx.canceller}'
) )
return 'yo'
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -1017,8 +916,8 @@ def test_maybe_allow_overruns_stream(
wait_for_cancel=cancel_ctx, wait_for_cancel=cancel_ctx,
be_slow=(slow_side == 'child'), be_slow=(slow_side == 'child'),
allow_overruns_side=allow_overruns_side, allow_overruns_side=allow_overruns_side,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
async with ctx.open_stream( async with ctx.open_stream(
@ -1046,10 +945,10 @@ def test_maybe_allow_overruns_stream(
if cancel_ctx: if cancel_ctx:
# cancel the remote task # cancel the remote task
print('Requesting `ctx.cancel()` in parent!') print('sending root side cancel')
await ctx.cancel() await ctx.cancel()
res: str|ContextCancelled = await ctx.result() res = await ctx.result()
if cancel_ctx: if cancel_ctx:
assert isinstance(res, ContextCancelled) assert isinstance(res, ContextCancelled)

View File

@ -10,9 +10,6 @@ import pytest
import trio import trio
import tractor import tractor
from tractor import ( # typing from tractor import ( # typing
Actor,
current_actor,
open_nursery,
Portal, Portal,
Context, Context,
ContextCancelled, ContextCancelled,
@ -241,23 +238,19 @@ async def stream_from_peer(
# caller peer should not be the cancel requester # caller peer should not be the cancel requester
assert not ctx.cancel_called assert not ctx.cancel_called
assert not ctx.cancel_acked
# XXX can NEVER BE TRUE since `._invoke` only # XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task # sets this AFTER the nursery block this task
# was started in, exits. # was started in, exits.
assert not ctx._scope.cancelled_caught assert not ctx.cancelled_caught
# we never requested cancellation, it was the 'canceller' # we never requested cancellation
# peer.
assert not peer_ctx.cancel_called assert not peer_ctx.cancel_called
assert not peer_ctx.cancel_acked
# the `.open_context()` exit definitely caught # the `.open_context()` exit definitely caught
# a cancellation in the internal `Context._scope` since # a cancellation in the internal `Context._scope` since
# likely the runtime called `_deliver_msg()` after # likely the runtime called `_deliver_msg()` after
# receiving the remote error from the streaming task. # receiving the remote error from the streaming task.
assert not peer_ctx._scope.cancelled_caught assert peer_ctx.cancelled_caught
# TODO / NOTE `.canceller` won't have been set yet # TODO / NOTE `.canceller` won't have been set yet
# here because that machinery is inside # here because that machinery is inside
@ -266,8 +259,6 @@ async def stream_from_peer(
# checkpoint) that cancellation was due to # checkpoint) that cancellation was due to
# a remote, we COULD assert this here..see, # a remote, we COULD assert this here..see,
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
#
# assert 'canceller' in ctx.canceller
# root/parent actor task should NEVER HAVE cancelled us! # root/parent actor task should NEVER HAVE cancelled us!
assert not ctx.canceller assert not ctx.canceller
@ -365,7 +356,8 @@ def test_peer_canceller(
'just_caller', # but i just met her? 'just_caller', # but i just met her?
enable_modules=[__name__], enable_modules=[__name__],
) )
root: Actor = current_actor()
root = tractor.current_actor()
try: try:
async with ( async with (
@ -403,8 +395,8 @@ def test_peer_canceller(
# not request the sleeper cancellation ;) # not request the sleeper cancellation ;)
except ContextCancelled as ctxerr: except ContextCancelled as ctxerr:
print( print(
'CAUGHT REMOTE CONTEXT CANCEL\n\n' 'CAUGHT REMOTE CONTEXT CANCEL FOM\n'
f'{ctxerr}\n' f'{ctxerr}'
) )
# canceller and caller peers should not # canceller and caller peers should not
@ -412,9 +404,6 @@ def test_peer_canceller(
assert canceller_ctx.canceller is None assert canceller_ctx.canceller is None
assert caller_ctx.canceller is None assert caller_ctx.canceller is None
# we were not the actor, our peer was
assert not sleeper_ctx.cancel_acked
assert ctxerr.canceller[0] == 'canceller' assert ctxerr.canceller[0] == 'canceller'
# XXX NOTE XXX: since THIS `ContextCancelled` # XXX NOTE XXX: since THIS `ContextCancelled`
@ -422,13 +411,11 @@ def test_peer_canceller(
# `sleeper.open_context().__aexit__()` this # `sleeper.open_context().__aexit__()` this
# value is not yet set, however outside this # value is not yet set, however outside this
# block it should be. # block it should be.
assert not sleeper_ctx._scope.cancelled_caught assert not sleeper_ctx.cancelled_caught
# CASE_1: error-during-ctxc-handling,
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
raise RuntimeError('Simulated error during teardown') raise RuntimeError('Simulated error during teardown')
# CASE_2: standard teardown inside in `.open_context()` block
raise raise
# XXX SHOULD NEVER EVER GET HERE XXX # XXX SHOULD NEVER EVER GET HERE XXX
@ -449,6 +436,7 @@ def test_peer_canceller(
else: else:
pytest.fail( pytest.fail(
'did not rx ctxc ?!?\n\n' 'did not rx ctxc ?!?\n\n'
f'{ctxs}\n' f'{ctxs}\n'
) )
@ -459,48 +447,21 @@ def test_peer_canceller(
_loc_err = loc_err _loc_err = loc_err
# NOTE: the main state to check on `Context` is: # NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs)
# - `.cancel_called` (bool of whether this side # - `.cancel_called` (bool of whether this side
# requested) # requested)
# - `.cancel_acked` (bool of whether a ctxc
# response was received due to cancel req).
# - `.maybe_error` (highest prio error to raise
# locally)
# - `.outcome` (final error or result value)
# - `.canceller` (uid of cancel-causing actor-task) # - `.canceller` (uid of cancel-causing actor-task)
# - `._remote_error` (any `RemoteActorError` # - `._remote_error` (any `RemoteActorError`
# instance from other side of context) # instance from other side of context)
# - `._local_error` (any error caught inside the
# `.open_context()` block).
#
# XXX: Deprecated and internal only
# - `.cancelled_caught` (maps to nursery cs)
# - now just use `._scope.cancelled_caught`
# since it maps to the internal (maps to nursery cs)
#
# TODO: are we really planning to use this tho? # TODO: are we really planning to use this tho?
# - `._cancel_msg` (any msg that caused the # - `._cancel_msg` (any msg that caused the
# cancel) # cancel)
# CASE_1: error-during-ctxc-handling, # CASE: error raised during handling of
# - far end cancels due to peer 'canceller', # `ContextCancelled` inside `.open_context()`
# - `ContextCancelled` relayed to this scope, # block
# - inside `.open_context()` ctxc is caught and
# a rte raised instead
#
# => block should raise the rte but all peers
# should be cancelled by US.
#
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
assert isinstance(loc_err, RuntimeError) assert isinstance(loc_err, RuntimeError)
print(f'_loc_err: {_loc_err}\n')
# assert sleeper_ctx._local_error is _loc_err
# assert sleeper_ctx._local_error is _loc_err
assert not (
loc_err
is sleeper_ctx.maybe_error
is sleeper_ctx.outcome
is sleeper_ctx._remote_error
)
# NOTE: this root actor task should have # NOTE: this root actor task should have
# called `Context.cancel()` on the # called `Context.cancel()` on the
@ -534,25 +495,7 @@ def test_peer_canceller(
root.uid root.uid
) )
# since the sleeper errors while handling a # CASE: standard teardown inside in `.open_context()` block
# peer-cancelled (by ctxc) scenario, we expect
# that the `.open_context()` block DOES call
# `.cancel() (despite in this test case it
# being unecessary).
assert (
sleeper_ctx.cancel_called
and
not sleeper_ctx.cancel_acked
)
# CASE_2: standard teardown inside in `.open_context()` block
# - far end cancels due to peer 'canceller',
# - `ContextCancelled` relayed to this scope and
# raised locally without any raise-during-handle,
#
# => inside `.open_context()` ctxc is raised and
# propagated
#
else: else:
assert isinstance(loc_err, ContextCancelled) assert isinstance(loc_err, ContextCancelled)
assert loc_err.canceller == sleeper_ctx.canceller assert loc_err.canceller == sleeper_ctx.canceller
@ -566,42 +509,24 @@ def test_peer_canceller(
# the sleeper's remote error is the error bubbled # the sleeper's remote error is the error bubbled
# out of the context-stack above! # out of the context-stack above!
re = sleeper_ctx.outcome re = sleeper_ctx._remote_error
assert ( assert re is loc_err
re is loc_err
is sleeper_ctx.maybe_error
is sleeper_ctx._remote_error
)
for ctx in ctxs: for ctx in ctxs:
re: BaseException | None = ctx._remote_error re: BaseException | None = ctx._remote_error
re: BaseException|None = ctx.outcome assert re
assert (
re and
(
re is ctx.maybe_error
is ctx._remote_error
)
)
le: trio.MultiError = ctx._local_error
assert (
le
and ctx._local_error
)
# root doesn't cancel sleeper since it's # root doesn't cancel sleeper since it's
# cancelled by its peer. # cancelled by its peer.
if ctx is sleeper_ctx: if ctx is sleeper_ctx:
assert not ctx.cancel_called assert not ctx.cancel_called
assert not ctx.cancel_acked
# since sleeper_ctx.result() IS called # since sleeper_ctx.result() IS called
# above we should have (silently) # above we should have (silently)
# absorbed the corresponding # absorbed the corresponding
# `ContextCancelled` for it and thus # `ContextCancelled` for it and thus
# the logic inside `.cancelled_caught` # the logic inside `.cancelled_caught`
# should trigger! # should trigger!
assert ctx._scope.cancelled_caught assert ctx.cancelled_caught
elif ctx is caller_ctx: elif ctx is caller_ctx:
# since its context was remotely # since its context was remotely
@ -610,33 +535,15 @@ def test_peer_canceller(
# done by the peer and also we never # done by the peer and also we never
assert ctx.cancel_called assert ctx.cancel_called
# TODO: figure out the details of this..? # TODO: figure out the details of
# this..
# if you look the `._local_error` here # if you look the `._local_error` here
# is a multi of ctxc + 2 Cancelleds? # is a multi of ctxc + 2 Cancelleds?
# assert not ctx.cancelled_caught # assert not ctx.cancelled_caught
elif ctx is canceller_ctx:
# XXX NOTE XXX: ONLY the canceller
# will get a self-cancelled outcome
# whilst everyone else gets
# a peer-caused cancellation!
#
# TODO: really we should avoid calling
# .cancel() whenever an interpeer
# cancel takes place since each
# reception of a ctxc
assert (
ctx.cancel_called
and ctx.cancel_acked
)
assert not ctx._scope.cancelled_caught
else: else:
pytest.fail( assert ctx.cancel_called
'Uhh wut ctx is this?\n' assert not ctx.cancelled_caught
f'{ctx}\n'
)
# TODO: do we even need this flag? # TODO: do we even need this flag?
# -> each context should have received # -> each context should have received
@ -652,24 +559,14 @@ def test_peer_canceller(
# `Context.cancel()` SHOULD NOT have been # `Context.cancel()` SHOULD NOT have been
# called inside # called inside
# `Portal.open_context().__aexit__()`. # `Portal.open_context().__aexit__()`.
assert not ( assert not sleeper_ctx.cancel_called
sleeper_ctx.cancel_called
or
sleeper_ctx.cancel_acked
)
# XXX NOTE XXX: and see matching comment above but, # XXX NOTE XXX: and see matching comment above but,
# the `._scope` is only set by `trio` AFTER the # this flag is set only AFTER the `.open_context()`
# `.open_context()` block has exited and should be # has exited and should be set in both outcomes
# set in both outcomes including the case where # including the case where ctx-cancel handling
# ctx-cancel handling itself errors. # itself errors.
assert sleeper_ctx._scope.cancelled_caught assert sleeper_ctx.cancelled_caught
assert _loc_err is sleeper_ctx._local_error
assert (
sleeper_ctx.outcome
is sleeper_ctx.maybe_error
is sleeper_ctx._remote_error
)
raise # always to ensure teardown raise # always to ensure teardown
@ -685,315 +582,12 @@ def test_peer_canceller(
assert excinfo.value.canceller[0] == 'canceller' assert excinfo.value.canceller[0] == 'canceller'
@tractor.context def test_client_tree_spawns_and_cancels_service_subactor():
async def basic_echo_server( ...
ctx: Context, # TODO: test for the modden `mod wks open piker` bug!
peer_name: str = 'stepbro',
) -> None:
'''
Just the simplest `MsgStream` echo server which resays what
you told it but with its uid in front ;)
'''
actor: Actor = tractor.current_actor()
uid: tuple = actor.uid
await ctx.started(uid)
async with ctx.open_stream() as ipc:
async for msg in ipc:
# repack msg pair with our uid
# as first element.
(
client_uid,
i,
) = msg
resp: tuple = (
uid,
i,
)
# OOF! looks like my runtime-error is causing a lockup
# assert 0
await ipc.send(resp)
@tractor.context
async def serve_subactors(
ctx: Context,
peer_name: str,
) -> None:
async with open_nursery() as an:
await ctx.started(peer_name)
async with ctx.open_stream() as reqs:
async for msg in reqs:
peer_name: str = msg
peer: Portal = await an.start_actor(
name=peer_name,
enable_modules=[__name__],
)
print(
'Spawning new subactor\n'
f'{peer_name}\n'
f'|_{peer}\n'
)
await reqs.send((
peer.chan.uid,
peer.chan.raddr,
))
print('Spawner exiting spawn serve loop!')
@tractor.context
async def client_req_subactor(
ctx: Context,
peer_name: str,
# used to simulate a user causing an error to be raised
# directly in thread (like a KBI) to better replicate the
# case where a `modden` CLI client would hang afer requesting
# a `Context.cancel()` to `bigd`'s wks spawner.
reraise_on_cancel: str|None = None,
) -> None:
# TODO: other cases to do with sub lifetimes:
# -[ ] test that we can have the server spawn a sub
# that lives longer then ctx with this client.
# -[ ] test that
# open ctx with peer spawn server and ask it to spawn a little
# bro which we'll then connect and stream with.
async with (
tractor.find_actor(
name='spawn_server',
raise_on_none=True,
# TODO: we should be isolating this from other runs!
# => ideally so we can eventually use something like
# `pytest-xdist` Bo
# registry_addrs=bigd._reg_addrs,
) as spawner,
spawner.open_context(
serve_subactors,
peer_name=peer_name,
) as (spawner_ctx, first),
):
assert first == peer_name
await ctx.started(
'yup i had brudder',
)
async with spawner_ctx.open_stream() as reqs:
# send single spawn request to the server
await reqs.send(peer_name)
with trio.fail_after(3):
(
sub_uid,
sub_raddr,
) = await reqs.receive()
await tell_little_bro(
actor_name=sub_uid[0],
caller='client',
)
# TODO: test different scope-layers of
# cancellation?
# with trio.CancelScope() as cs:
try:
await trio.sleep_forever()
# TODO: would be super nice to have a special injected
# cancel type here (maybe just our ctxc) but using
# some native mechanism in `trio` :p
except (
trio.Cancelled
) as err:
_err = err
if reraise_on_cancel:
errtype = globals()['__builtins__'][reraise_on_cancel]
assert errtype
to_reraise: BaseException = errtype()
print(f'client re-raising on cancel: {repr(to_reraise)}')
raise err
raise
# if cs.cancelled_caught:
# print('client handling expected KBI!')
# await ctx.
# await trio.sleep(
# await tractor.pause()
# await spawner_ctx.cancel()
# cancel spawned sub-actor directly?
# await sub_ctx.cancel()
# maybe cancel runtime?
# await sub.cancel_actor()
async def tell_little_bro(
actor_name: str,
caller: str = ''
):
# contact target actor, do a stream dialog.
async with (
tractor.wait_for_actor(
name=actor_name
) as lb,
lb.open_context(
basic_echo_server,
) as (sub_ctx, first),
sub_ctx.open_stream(
basic_echo_server,
) as echo_ipc,
):
actor: Actor = current_actor()
uid: tuple = actor.uid
for i in range(100):
msg: tuple = (
uid,
i,
)
await echo_ipc.send(msg)
resp = await echo_ipc.receive()
print(
f'{caller} => {actor_name}: {msg}\n'
f'{caller} <= {actor_name}: {resp}\n'
)
(
sub_uid,
_i,
) = resp
assert sub_uid != uid
assert _i == i
@pytest.mark.parametrize(
'raise_client_error',
[None, 'KeyboardInterrupt'],
)
def test_peer_spawns_and_cancels_service_subactor(
debug_mode: bool,
raise_client_error: str,
):
# NOTE: this tests for the modden `mod wks open piker` bug
# discovered as part of implementing workspace ctx
# open-.pause()-ctx.cancel() as part of the CLI..
# -> start actor-tree (server) that offers sub-actor spawns via # -> start actor-tree (server) that offers sub-actor spawns via
# context API # context API
# -> start another full actor-tree (client) which requests to the first to # -> start another full actor-tree (client) which requests to the first to
# spawn over its `@context` ep / api. # spawn over its `@context` ep / api.
# -> client actor cancels the context and should exit gracefully # -> client actor cancels the context and should exit gracefully
# and the server's spawned child should cancel and terminate! # and the server's spawned child should cancel and terminate!
peer_name: str = 'little_bro'
async def main():
async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode,
) as an:
server: Portal = await an.start_actor(
(server_name := 'spawn_server'),
enable_modules=[__name__],
)
print(f'Spawned `{server_name}`')
client: Portal = await an.start_actor(
client_name := 'client',
enable_modules=[__name__],
)
print(f'Spawned `{client_name}`')
try:
async with (
server.open_context(
serve_subactors,
peer_name=peer_name,
) as (spawn_ctx, first),
client.open_context(
client_req_subactor,
peer_name=peer_name,
reraise_on_cancel=raise_client_error,
) as (client_ctx, client_says),
):
print(
f'Server says: {first}\n'
f'Client says: {client_says}\n'
)
# attach to client-requested-to-spawn
# (grandchild of this root actor) "little_bro"
# and ensure we can also use it as an echo
# server.
async with tractor.wait_for_actor(
name=peer_name,
) as sub:
assert sub
print(
'Sub-spawn came online\n'
f'portal: {sub}\n'
f'.uid: {sub.actor.uid}\n'
f'chan.raddr: {sub.chan.raddr}\n'
)
await tell_little_bro(
actor_name=peer_name,
caller='root',
)
# signal client to raise a KBI
await client_ctx.cancel()
print('root cancelled client, checking that sub-spawn is down')
async with tractor.find_actor(
name=peer_name,
) as sub:
assert not sub
print('root cancelling server/client sub-actors')
# await tractor.pause()
res = await client_ctx.result(hide_tb=False)
assert isinstance(res, ContextCancelled)
assert client_ctx.cancel_acked
assert res.canceller == current_actor().uid
await spawn_ctx.cancel()
# await server.cancel_actor()
# since we called `.cancel_actor()`, `.cancel_ack`
# will not be set on the ctx bc `ctx.cancel()` was not
# called directly fot this confext.
except ContextCancelled as ctxc:
print('caught ctxc from contexts!')
assert ctxc.canceller == current_actor().uid
assert ctxc is spawn_ctx.outcome
assert ctxc is spawn_ctx.maybe_error
raise
# assert spawn_ctx.cancel_acked
assert spawn_ctx.cancel_acked
assert client_ctx.cancel_acked
await client.cancel_actor()
await server.cancel_actor()
# WOA WOA WOA! we need this to close..!!!??
# that's super bad XD
# TODO: why isn't this working!?!?
# we're now outside the `.open_context()` block so
# the internal `Context._scope: CancelScope` should be
# gracefully "closed" ;)
# assert spawn_ctx.cancelled_caught
trio.run(main)

File diff suppressed because it is too large Load Diff

View File

@ -106,25 +106,25 @@ def _trio_main(
Entry point for a `trio_run_in_process` subactor. Entry point for a `trio_run_in_process` subactor.
''' '''
log.info(f"Started new trio process for {actor.uid}")
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
get_console_log(actor.loglevel)
log.info(
f"Started {actor.uid}")
_state._current_actor = actor _state._current_actor = actor
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial( trio_main = partial(
async_main, async_main,
actor, actor,
parent_addr=parent_addr parent_addr=parent_addr
) )
if actor.loglevel is not None:
get_console_log(actor.loglevel)
import os
log.info(
'Started new trio process:\n'
f'|_{actor}\n'
f' uid: {actor.uid}\n'
f' pid: {os.getpid()}\n'
f' parent_addr: {parent_addr}\n'
f' loglevel: {actor.loglevel}\n'
)
try: try:
if infect_asyncio: if infect_asyncio:
actor._infected_aio = True actor._infected_aio = True
@ -133,7 +133,7 @@ def _trio_main(
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
log.cancel( log.cancel(
f'@{actor.uid} received KBI' f'Actor@{actor.uid} received KBI'
) )
finally: finally:

View File

@ -27,7 +27,6 @@ from typing import (
Type, Type,
TYPE_CHECKING, TYPE_CHECKING,
) )
import textwrap
import traceback import traceback
import exceptiongroup as eg import exceptiongroup as eg
@ -38,9 +37,8 @@ from .log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from ._context import Context from ._context import Context
from .log import StackLevelAdapter
from ._stream import MsgStream from ._stream import MsgStream
from ._ipc import Channel from .log import StackLevelAdapter
log = get_logger('tractor') log = get_logger('tractor')
@ -51,25 +49,6 @@ class ActorFailure(Exception):
"General actor failure" "General actor failure"
class InternalError(RuntimeError):
'''
Entirely unexpected internal machinery error indicating
a completely invalid state or interface.
'''
_body_fields: list[str] = [
'src_actor_uid',
'canceller',
'sender',
]
_msgdata_keys: list[str] = [
'type_str',
] + _body_fields
# TODO: rename to just `RemoteError`? # TODO: rename to just `RemoteError`?
class RemoteActorError(Exception): class RemoteActorError(Exception):
''' '''
@ -81,10 +60,6 @@ class RemoteActorError(Exception):
a special "error" IPC msg sent by some remote actor-runtime. a special "error" IPC msg sent by some remote actor-runtime.
''' '''
reprol_fields: list[str] = [
'src_actor_uid',
]
def __init__( def __init__(
self, self,
message: str, message: str,
@ -102,83 +77,24 @@ class RemoteActorError(Exception):
# - .remote_type # - .remote_type
# also pertains to our long long oustanding issue XD # also pertains to our long long oustanding issue XD
# https://github.com/goodboy/tractor/issues/5 # https://github.com/goodboy/tractor/issues/5
self.boxed_type: str = suberror_type self.type: str = suberror_type
self.msgdata: dict[str, Any] = msgdata self.msgdata: dict[str, Any] = msgdata
@property
def type(self) -> str:
return self.boxed_type
@property
def type_str(self) -> str:
return str(type(self.boxed_type).__name__)
@property @property
def src_actor_uid(self) -> tuple[str, str] | None: def src_actor_uid(self) -> tuple[str, str] | None:
return self.msgdata.get('src_actor_uid') return self.msgdata.get('src_actor_uid')
@property
def tb_str(
self,
indent: str = ' '*3,
) -> str:
if remote_tb := self.msgdata.get('tb_str'):
return textwrap.indent(
remote_tb,
prefix=indent,
)
return ''
def reprol(self) -> str:
'''
Represent this error for "one line" display, like in
a field of our `Context.__repr__()` output.
'''
_repr: str = f'{type(self).__name__}('
for key in self.reprol_fields:
val: Any|None = self.msgdata.get(key)
if val:
_repr += f'{key}={repr(val)} '
return _repr
def __repr__(self) -> str: def __repr__(self) -> str:
if remote_tb := self.msgdata.get('tb_str'):
fields: str = '' pformat(remote_tb)
for key in _body_fields:
val: str|None = self.msgdata.get(key)
if val:
fields += f'{key}={val}\n'
fields: str = textwrap.indent(
fields,
# prefix=' '*2,
prefix=' |_',
)
indent: str = ''*1
body: str = (
f'{fields}'
f' |\n'
f' ------ - ------\n\n'
f'{self.tb_str}\n'
f' ------ - ------\n'
f' _|\n'
)
# f'|\n'
# f' |\n'
if indent:
body: str = textwrap.indent(
body,
prefix=indent,
)
return ( return (
f'<{type(self).__name__}(\n' f'{type(self).__name__}(\n'
f'{body}' f'msgdata={pformat(self.msgdata)}\n'
')>' ')'
) )
return super().__repr__()
# TODO: local recontruction of remote exception deats # TODO: local recontruction of remote exception deats
# def unbox(self) -> BaseException: # def unbox(self) -> BaseException:
# ... # ...
@ -186,9 +102,8 @@ class RemoteActorError(Exception):
class InternalActorError(RemoteActorError): class InternalActorError(RemoteActorError):
''' '''
(Remote) internal `tractor` error indicating failure of some Remote internal ``tractor`` error indicating
primitive, machinery state or lowlevel task that should never failure of some primitive or machinery.
occur.
''' '''
@ -199,9 +114,6 @@ class ContextCancelled(RemoteActorError):
``Portal.cancel_actor()`` or ``Context.cancel()``. ``Portal.cancel_actor()`` or ``Context.cancel()``.
''' '''
reprol_fields: list[str] = [
'canceller',
]
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
''' '''
@ -233,9 +145,6 @@ class ContextCancelled(RemoteActorError):
f'{self}' f'{self}'
) )
# to make `.__repr__()` work uniformly
# src_actor_uid = canceller
class TransportClosed(trio.ClosedResourceError): class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use" "Underlying channel transport was closed prior to use"
@ -257,9 +166,6 @@ class StreamOverrun(
RemoteActorError, RemoteActorError,
trio.TooSlowError, trio.TooSlowError,
): ):
reprol_fields: list[str] = [
'sender',
]
''' '''
This stream was overrun by sender This stream was overrun by sender
@ -307,7 +213,6 @@ def pack_error(
] = { ] = {
'tb_str': tb_str, 'tb_str': tb_str,
'type_str': type(exc).__name__, 'type_str': type(exc).__name__,
'boxed_type': type(exc).__name__,
'src_actor_uid': current_actor().uid, 'src_actor_uid': current_actor().uid,
} }
@ -333,8 +238,8 @@ def unpack_error(
msg: dict[str, Any], msg: dict[str, Any],
chan: Channel|None = None, chan=None,
box_type: RemoteActorError = RemoteActorError, err_type=RemoteActorError,
hide_tb: bool = True, hide_tb: bool = True,
@ -359,15 +264,12 @@ def unpack_error(
# retrieve the remote error's msg encoded details # retrieve the remote error's msg encoded details
tb_str: str = error_dict.get('tb_str', '') tb_str: str = error_dict.get('tb_str', '')
message: str = f'{chan.uid}\n' + tb_str message: str = f'{chan.uid}\n' + tb_str
type_name: str = ( type_name: str = error_dict['type_str']
error_dict.get('type_str')
or error_dict['boxed_type']
)
suberror_type: Type[BaseException] = Exception suberror_type: Type[BaseException] = Exception
if type_name == 'ContextCancelled': if type_name == 'ContextCancelled':
box_type = ContextCancelled err_type = ContextCancelled
suberror_type = box_type suberror_type = err_type
else: # try to lookup a suitable local error type else: # try to lookup a suitable local error type
for ns in [ for ns in [
@ -383,7 +285,7 @@ def unpack_error(
): ):
break break
exc = box_type( exc = err_type(
message, message,
suberror_type=suberror_type, suberror_type=suberror_type,
@ -469,8 +371,6 @@ def _raise_from_no_key_in_msg(
) from None ) from None
# `MsgStream` termination msg. # `MsgStream` termination msg.
# TODO: does it make more sense to pack
# the stream._eoc outside this in the calleer always?
elif ( elif (
msg.get('stop') msg.get('stop')
or ( or (

View File

@ -30,6 +30,7 @@ import typing
from typing import ( from typing import (
Any, Any,
runtime_checkable, runtime_checkable,
Optional,
Protocol, Protocol,
Type, Type,
TypeVar, TypeVar,
@ -112,13 +113,6 @@ class MsgpackTCPStream(MsgTransport):
using the ``msgspec`` codec lib. using the ``msgspec`` codec lib.
''' '''
layer_key: int = 4
name_key: str = 'tcp'
# TODO: better naming for this?
# -[ ] check how libp2p does naming for such things?
codec_key: str = 'msgpack'
def __init__( def __init__(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
@ -274,7 +268,7 @@ class Channel:
def __init__( def __init__(
self, self,
destaddr: tuple[str, int]|None, destaddr: Optional[tuple[str, int]],
msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'),
@ -292,14 +286,14 @@ class Channel:
# Either created in ``.connect()`` or passed in by # Either created in ``.connect()`` or passed in by
# user in ``.from_stream()``. # user in ``.from_stream()``.
self._stream: trio.SocketStream|None = None self._stream: Optional[trio.SocketStream] = None
self._transport: MsgTransport|None = None self.msgstream: Optional[MsgTransport] = None
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None self.uid: Optional[tuple[str, str]] = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
self._exc: Exception|None = None # set if far end actor errors self._exc: Optional[Exception] = None # set if far end actor errors
self._closed: bool = False self._closed: bool = False
# flag set by ``Portal.cancel_actor()`` indicating remote # flag set by ``Portal.cancel_actor()`` indicating remote
@ -307,15 +301,6 @@ class Channel:
# runtime. # runtime.
self._cancel_called: bool = False self._cancel_called: bool = False
@property
def msgstream(self) -> MsgTransport:
log.info('`Channel.msgstream` is an old name, use `._transport`')
return self._transport
@property
def transport(self) -> MsgTransport:
return self._transport
@classmethod @classmethod
def from_stream( def from_stream(
cls, cls,
@ -325,44 +310,40 @@ class Channel:
) -> Channel: ) -> Channel:
src, dst = get_stream_addrs(stream) src, dst = get_stream_addrs(stream)
chan = Channel( chan = Channel(destaddr=dst, **kwargs)
destaddr=dst,
**kwargs,
)
# set immediately here from provided instance # set immediately here from provided instance
chan._stream: trio.SocketStream = stream chan._stream = stream
chan.set_msg_transport(stream) chan.set_msg_transport(stream)
return chan return chan
def set_msg_transport( def set_msg_transport(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
type_key: tuple[str, str]|None = None, type_key: Optional[tuple[str, str]] = None,
) -> MsgTransport: ) -> MsgTransport:
type_key = type_key or self._transport_key type_key = type_key or self._transport_key
self._transport = get_msg_transport(type_key)(stream) self.msgstream = get_msg_transport(type_key)(stream)
return self._transport return self.msgstream
def __repr__(self) -> str: def __repr__(self) -> str:
if not self._transport: if self.msgstream:
return '<Channel with inactive transport?>'
return repr( return repr(
self._transport.stream.socket._sock self.msgstream.stream.socket._sock
).replace( # type: ignore ).replace( # type: ignore
"socket.socket", "socket.socket",
"Channel", "Channel",
) )
return object.__repr__(self)
@property @property
def laddr(self) -> tuple[str, int]|None: def laddr(self) -> Optional[tuple[str, int]]:
return self._transport.laddr if self._transport else None return self.msgstream.laddr if self.msgstream else None
@property @property
def raddr(self) -> tuple[str, int]|None: def raddr(self) -> Optional[tuple[str, int]]:
return self._transport.raddr if self._transport else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, self,
@ -381,12 +362,12 @@ class Channel:
*destaddr, *destaddr,
**kwargs **kwargs
) )
transport = self.set_msg_transport(stream) msgstream = self.set_msg_transport(stream)
log.transport( log.transport(
f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}' f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}'
) )
return transport return msgstream
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
@ -394,16 +375,16 @@ class Channel:
'=> send IPC msg:\n\n' '=> send IPC msg:\n\n'
f'{pformat(item)}\n' f'{pformat(item)}\n'
) # type: ignore ) # type: ignore
assert self._transport assert self.msgstream
await self._transport.send(item) await self.msgstream.send(item)
async def recv(self) -> Any: async def recv(self) -> Any:
assert self._transport assert self.msgstream
return await self._transport.recv() return await self.msgstream.recv()
# try: # try:
# return await self._transport.recv() # return await self.msgstream.recv()
# except trio.BrokenResourceError: # except trio.BrokenResourceError:
# if self._autorecon: # if self._autorecon:
# await self._reconnect() # await self._reconnect()
@ -416,8 +397,8 @@ class Channel:
f'Closing channel to {self.uid} ' f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}' f'{self.laddr} -> {self.raddr}'
) )
assert self._transport assert self.msgstream
await self._transport.stream.aclose() await self.msgstream.stream.aclose()
self._closed = True self._closed = True
async def __aenter__(self): async def __aenter__(self):
@ -468,16 +449,16 @@ class Channel:
Async iterate items from underlying stream. Async iterate items from underlying stream.
''' '''
assert self._transport assert self.msgstream
while True: while True:
try: try:
async for item in self._transport: async for item in self.msgstream:
yield item yield item
# sent = yield item # sent = yield item
# if sent is not None: # if sent is not None:
# # optimization, passing None through all the # # optimization, passing None through all the
# # time is pointless # # time is pointless
# await self._transport.send(sent) # await self.msgstream.send(sent)
except trio.BrokenResourceError: except trio.BrokenResourceError:
# if not self._autorecon: # if not self._autorecon:
@ -490,7 +471,7 @@ class Channel:
# continue # continue
def connected(self) -> bool: def connected(self) -> bool:
return self._transport.connected() if self._transport else False return self.msgstream.connected() if self.msgstream else False
@asynccontextmanager @asynccontextmanager

View File

@ -27,9 +27,8 @@ from __future__ import annotations
import importlib import importlib
import inspect import inspect
from typing import ( from typing import (
Any, Any, Optional,
Callable, Callable, AsyncGenerator,
AsyncGenerator,
Type, Type,
) )
from functools import partial from functools import partial
@ -53,7 +52,6 @@ from ._ipc import Channel
from .log import get_logger from .log import get_logger
from .msg import NamespacePath from .msg import NamespacePath
from ._exceptions import ( from ._exceptions import (
InternalError,
_raise_from_no_key_in_msg, _raise_from_no_key_in_msg,
unpack_error, unpack_error,
NoResult, NoResult,
@ -71,35 +69,18 @@ from ._streaming import (
log = get_logger(__name__) log = get_logger(__name__)
# TODO: rename to `unwrap_result()` and use
# `._raise_from_no_key_in_msg()` (after tweak to
# accept a `chan: Channel` arg) in key block!
def _unwrap_msg( def _unwrap_msg(
msg: dict[str, Any], msg: dict[str, Any],
channel: Channel, channel: Channel
hide_tb: bool = True,
) -> Any: ) -> Any:
''' __tracebackhide__ = True
Unwrap a final result from a `{return: <Any>}` IPC msg.
'''
__tracebackhide__: bool = hide_tb
try: try:
return msg['return'] return msg['return']
except KeyError as ke: except KeyError as ke:
# internal error should never get here # internal error should never get here
assert msg.get('cid'), ( assert msg.get('cid'), "Received internal error at portal?"
"Received internal error at portal?" raise unpack_error(msg, channel) from ke
)
raise unpack_error(
msg,
channel
) from ke
class Portal: class Portal:
@ -126,9 +107,9 @@ class Portal:
cancel_timeout: float = 0.5 cancel_timeout: float = 0.5
def __init__(self, channel: Channel) -> None: def __init__(self, channel: Channel) -> None:
self.chan = channel self.channel = channel
# during the portal's lifetime # during the portal's lifetime
self._result_msg: dict|None = None self._result_msg: Optional[dict] = None
# When set to a ``Context`` (when _submit_for_result is called) # When set to a ``Context`` (when _submit_for_result is called)
# it is expected that ``result()`` will be awaited at some # it is expected that ``result()`` will be awaited at some
@ -137,18 +118,6 @@ class Portal:
self._streams: set[MsgStream] = set() self._streams: set[MsgStream] = set()
self.actor = current_actor() self.actor = current_actor()
@property
def channel(self) -> Channel:
'''
Proxy to legacy attr name..
Consider the shorter `Portal.chan` instead of `.channel` ;)
'''
log.debug(
'Consider the shorter `Portal.chan` instead of `.channel` ;)'
)
return self.chan
async def _submit_for_result( async def _submit_for_result(
self, self,
ns: str, ns: str,
@ -156,14 +125,14 @@ class Portal:
**kwargs **kwargs
) -> None: ) -> None:
assert self._expect_result is None, ( assert self._expect_result is None, \
"A pending main result has already been submitted" "A pending main result has already been submitted"
)
self._expect_result = await self.actor.start_remote_task( self._expect_result = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=NamespacePath(f'{ns}:{func}'), ns,
kwargs=kwargs func,
kwargs
) )
async def _return_once( async def _return_once(
@ -173,7 +142,7 @@ class Portal:
) -> dict[str, Any]: ) -> dict[str, Any]:
assert ctx._remote_func_type == 'asyncfunc' # single response assert ctx._remote_func_type == 'asyncfunc' # single response
msg: dict = await ctx._recv_chan.receive() msg = await ctx._recv_chan.receive()
return msg return msg
async def result(self) -> Any: async def result(self) -> Any:
@ -204,10 +173,7 @@ class Portal:
self._expect_result self._expect_result
) )
return _unwrap_msg( return _unwrap_msg(self._result_msg, self.channel)
self._result_msg,
self.channel,
)
async def _cancel_streams(self): async def _cancel_streams(self):
# terminate all locally running async generator # terminate all locally running async generator
@ -249,33 +215,26 @@ class Portal:
purpose. purpose.
''' '''
chan: Channel = self.channel if not self.channel.connected():
if not chan.connected(): log.cancel("This channel is already closed can't cancel")
log.runtime(
'This channel is already closed, skipping cancel request..'
)
return False return False
reminfo: str = (
f'{self.channel.uid}\n'
f' |_{chan}\n'
)
log.cancel( log.cancel(
f'Sending runtime `.cancel()` request to peer\n\n' f"Sending actor cancel request to {self.channel.uid} on "
f'{reminfo}' f"{self.channel}")
)
self.channel._cancel_called = True
self.channel._cancel_called: bool = True
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with # XXX: sure would be nice to make this work with
# a proper shield # a proper shield
with trio.move_on_after( with trio.move_on_after(
timeout timeout
or or self.cancel_timeout
self.cancel_timeout
) as cs: ) as cs:
cs.shield: bool = True cs.shield = True
await self.run_from_ns( await self.run_from_ns(
'self', 'self',
'cancel', 'cancel',
@ -283,12 +242,7 @@ class Portal:
return True return True
if cs.cancelled_caught: if cs.cancelled_caught:
# may timeout and we never get an ack (obvi racy) log.cancel(f"May have failed to cancel {self.channel.uid}")
# but that doesn't mean it wasn't cancelled.
log.debug(
'May have failed to cancel peer?\n'
f'{reminfo}'
)
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
return False return False
@ -297,11 +251,9 @@ class Portal:
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.debug( log.cancel(
'IPC chan for actor already closed or broken?\n\n' f"{self.channel} for {self.channel.uid} was already "
f'{self.channel.uid}\n' "closed or broken?")
f' |_{self.channel}\n'
)
return False return False
async def run_from_ns( async def run_from_ns(
@ -322,31 +274,25 @@ class Portal:
A special namespace `self` can be used to invoke `Actor` A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this instance methods in the remote runtime. Currently this
should only ever be used for `Actor` (method) runtime should only be used solely for ``tractor`` runtime
internals! internals.
''' '''
nsf = NamespacePath(
f'{namespace_path}:{function_name}'
)
ctx = await self.actor.start_remote_task( ctx = await self.actor.start_remote_task(
chan=self.channel, self.channel,
nsf=nsf, namespace_path,
kwargs=kwargs, function_name,
kwargs,
) )
ctx._portal = self ctx._portal = self
msg = await self._return_once(ctx) msg = await self._return_once(ctx)
return _unwrap_msg( return _unwrap_msg(msg, self.channel)
msg,
self.channel,
)
async def run( async def run(
self, self,
func: str, func: str,
fn_name: str|None = None, fn_name: Optional[str] = None,
**kwargs **kwargs
) -> Any: ) -> Any:
''' '''
Submit a remote function to be scheduled and run by actor, in Submit a remote function to be scheduled and run by actor, in
@ -365,9 +311,8 @@ class Portal:
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
fn_mod_path: str = func fn_mod_path = func
assert isinstance(fn_name, str) assert isinstance(fn_name, str)
nsf = NamespacePath(f'{fn_mod_path}:{fn_name}')
else: # function reference was passed directly else: # function reference was passed directly
if ( if (
@ -380,12 +325,13 @@ class Portal:
raise TypeError( raise TypeError(
f'{func} must be a non-streaming async function!') f'{func} must be a non-streaming async function!')
nsf = NamespacePath.from_ref(func) fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx = await self.actor.start_remote_task( ctx = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=nsf, fn_mod_path,
kwargs=kwargs, fn_name,
kwargs,
) )
ctx._portal = self ctx._portal = self
return _unwrap_msg( return _unwrap_msg(
@ -409,10 +355,15 @@ class Portal:
raise TypeError( raise TypeError(
f'{async_gen_func} must be an async generator function!') f'{async_gen_func} must be an async generator function!')
ctx: Context = await self.actor.start_remote_task( fn_mod_path, fn_name = NamespacePath.from_ref(
async_gen_func
).to_tuple()
ctx = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=NamespacePath.from_ref(async_gen_func), fn_mod_path,
kwargs=kwargs, fn_name,
kwargs
) )
ctx._portal = self ctx._portal = self
@ -422,8 +373,7 @@ class Portal:
try: try:
# deliver receive only stream # deliver receive only stream
async with MsgStream( async with MsgStream(
ctx=ctx, ctx, ctx._recv_chan,
rx_chan=ctx._recv_chan,
) as rchan: ) as rchan:
self._streams.add(rchan) self._streams.add(rchan)
yield rchan yield rchan
@ -450,25 +400,12 @@ class Portal:
# await recv_chan.aclose() # await recv_chan.aclose()
self._streams.remove(rchan) self._streams.remove(rchan)
# TODO: move this impl to `._context` mod and
# instead just bind it here as a method so that the logic
# for ctx stuff stays all in one place (instead of frickin
# having to open this file in tandem every gd time!!! XD)
#
@asynccontextmanager @asynccontextmanager
async def open_context( async def open_context(
self, self,
func: Callable, func: Callable,
allow_overruns: bool = False, allow_overruns: bool = False,
# TODO: if we set this the wrapping `@acm` body will
# still be shown (awkwardly) on pdb REPL entry. Ideally
# we can similarly annotate that frame to NOT show?
hide_tb: bool = False,
# proxied to RPC
**kwargs, **kwargs,
) -> AsyncGenerator[tuple[Context, Any], None]: ) -> AsyncGenerator[tuple[Context, Any], None]:
@ -501,8 +438,6 @@ class Portal:
collection. See ``tractor.Context`` for more details. collection. See ``tractor.Context`` for more details.
''' '''
__tracebackhide__: bool = hide_tb
# conduct target func method structural checks # conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and ( if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False) getattr(func, '_tractor_contex_function', False)
@ -513,12 +448,13 @@ class Portal:
# TODO: i think from here onward should probably # TODO: i think from here onward should probably
# just be factored into an `@acm` inside a new # just be factored into an `@acm` inside a new
# a new `_context.py` mod. # a new `_context.py` mod.
nsf = NamespacePath.from_ref(func) fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
ctx: Context = await self.actor.start_remote_task( ctx = await self.actor.start_remote_task(
self.channel, self.channel,
nsf=nsf, fn_mod_path,
kwargs=kwargs, fn_name,
kwargs,
# NOTE: it's imporant to expose this since you might # NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does # get the case where the parent who opened the context does
@ -556,11 +492,8 @@ class Portal:
# placeholder for any exception raised in the runtime # placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure. # or by user tasks which cause this context's closure.
scope_err: BaseException | None = None scope_err: BaseException | None = None
ctxc_from_callee: ContextCancelled|None = None
try: try:
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
# NOTE: used to start overrun queuing tasks
ctx._scope_nursery: trio.Nursery = nurse ctx._scope_nursery: trio.Nursery = nurse
ctx._scope: trio.CancelScope = nurse.cancel_scope ctx._scope: trio.CancelScope = nurse.cancel_scope
@ -568,26 +501,14 @@ class Portal:
# in enter tuple. # in enter tuple.
yield ctx, first yield ctx, first
# ??TODO??: do we still want to consider this or is # between the caller exiting and arriving here the
# the `else:` block handling via a `.result()` # far end may have sent a ctxc-msg or other error,
# call below enough?? # so check for it here immediately and maybe raise
# -[ ] pretty sure `.result()` internals do the # so as to engage the ctxc handling block below!
# same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we
# wouldn't have that duplication either by somehow
# factoring the `.result()` handler impl in a way
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
# NOTE: between the caller exiting and arriving
# here the far end may have sent a ctxc-msg or
# other error, so check for it here immediately
# and maybe raise so as to engage the ctxc
# handling block below!
#
# if re := ctx._remote_error: # if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err( # maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re, # re,
# # TODO: do we want this to always raise? # # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the # # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives # # block is exited before the msg arrives
@ -605,7 +526,7 @@ class Portal:
# # block? # # block?
# raise_ctxc_from_self_call=True, # raise_ctxc_from_self_call=True,
# ) # )
# ctxc_from_callee = maybe_ctxc # assert maybe_ctxc
# when in allow_overruns mode there may be # when in allow_overruns mode there may be
# lingering overflow sender tasks remaining? # lingering overflow sender tasks remaining?
@ -617,18 +538,13 @@ class Portal:
not ctx._allow_overruns not ctx._allow_overruns
or len(nurse.child_tasks) > 1 or len(nurse.child_tasks) > 1
): ):
raise InternalError( raise RuntimeError(
'Context has sub-tasks but is ' 'Context has sub-tasks but is '
'not in `allow_overruns=True` mode!?' 'not in `allow_overruns=True` mode!?'
) )
# ensure we cancel all overflow sender # ensure cancel of all overflow sender tasks
# tasks started in the nursery when # started in the ctx nursery.
# `._allow_overruns == True`.
#
# NOTE: this means `._scope.cancelled_caught`
# will prolly be set! not sure if that's
# non-ideal or not ???
ctx._scope.cancel() ctx._scope.cancel()
# XXX NOTE XXX: maybe shield against # XXX NOTE XXX: maybe shield against
@ -641,15 +557,14 @@ class Portal:
# of a `Context`. In both cases any `ContextCancelled` # of a `Context`. In both cases any `ContextCancelled`
# raised in this scope-block came from a transport msg # raised in this scope-block came from a transport msg
# relayed from some remote-actor-task which our runtime set # relayed from some remote-actor-task which our runtime set
# as to `Context._remote_error` # as a `Context._remote_error`
# #
# the CASES: # the CASES:
# #
# - if that context IS THE SAME ONE that called # - if that context IS THE SAME ONE that called
# `Context.cancel()`, we want to absorb the error # `Context.cancel()`, we want to absorb the error
# silently and let this `.open_context()` block to exit # silently and let this `.open_context()` block to exit
# without raising, ideally eventually receiving the ctxc # without raising.
# ack msg thus resulting in `ctx.cancel_acked == True`.
# #
# - if it is from some OTHER context (we did NOT call # - if it is from some OTHER context (we did NOT call
# `.cancel()`), we want to re-RAISE IT whilst also # `.cancel()`), we want to re-RAISE IT whilst also
@ -673,7 +588,6 @@ class Portal:
# `Nursery.cancel_scope.cancel()`) # `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
scope_err = ctxc scope_err = ctxc
ctxc_from_callee = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!! # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will # using this code and then resuming the REPL will
@ -683,7 +597,6 @@ class Portal:
# documenting it as a definittive example of # documenting it as a definittive example of
# debugging the tractor-runtime itself using it's # debugging the tractor-runtime itself using it's
# own `.devx.` tooling! # own `.devx.` tooling!
#
# await pause() # await pause()
# CASE 2: context was cancelled by local task calling # CASE 2: context was cancelled by local task calling
@ -691,10 +604,15 @@ class Portal:
# exit silently. # exit silently.
if ( if (
ctx._cancel_called ctx._cancel_called
and and (
ctxc is ctx._remote_error ctxc is ctx._remote_error
# ctxc.msgdata == ctx._remote_error.msgdata
# TODO: uhh `Portal.canceller` ain't a thangg
# dawg? (was `self.canceller` before?!?)
and and
ctxc.canceller == self.actor.uid ctxc.canceller == self.actor.uid
)
): ):
log.cancel( log.cancel(
f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n'
@ -702,9 +620,9 @@ class Portal:
) )
# CASE 1: this context was never cancelled via a local # CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise # task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else # the error since it was caused by someone else!
# -> probably a remote peer!
else: else:
# await pause()
raise raise
# the above `._scope` can be cancelled due to: # the above `._scope` can be cancelled due to:
@ -717,29 +635,19 @@ class Portal:
# CASE 3: standard local error in this caller/yieldee # CASE 3: standard local error in this caller/yieldee
Exception, Exception,
# CASES 1 & 2: can manifest as a `ctx._scope_nursery` # CASES 1 & 2: normally manifested as
# a `Context._scope_nursery` raised
# exception-group of, # exception-group of,
#
# 1.-`trio.Cancelled`s, since # 1.-`trio.Cancelled`s, since
# `._scope.cancel()` will have been called # `._scope.cancel()` will have been called and any
# (transitively by the runtime calling # `ContextCancelled` absorbed and thus NOT RAISED in
# `._deliver_msg()`) and any `ContextCancelled` # any `Context._maybe_raise_remote_err()`,
# eventually absorbed and thus absorbed/supressed in
# any `Context._maybe_raise_remote_err()` call.
#
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]` # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error delivered from the "callee" side # from any error raised in the "callee" side with
# AND a group-exc is only raised if there was > 1 # a group only raised if there was any more then one
# tasks started *here* in the "caller" / opener # task started here in the "caller" in the
# block. If any one of those tasks calls # `yield`-ed to task.
# `.result()` or `MsgStream.receive()` BaseExceptionGroup, # since overrun handler tasks may have been spawned
# `._maybe_raise_remote_err()` will be transitively
# called and the remote error raised causing all
# tasks to be cancelled.
# NOTE: ^ this case always can happen if any
# overrun handler tasks were spawned!
BaseExceptionGroup,
trio.Cancelled, # NOTE: NOT from inside the ctx._scope trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt, KeyboardInterrupt,
@ -749,38 +657,32 @@ class Portal:
# XXX: ALWAYS request the context to CANCEL ON any ERROR. # XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in # NOTE: `Context.cancel()` is conversely NEVER CALLED in
# the `ContextCancelled` "self cancellation absorbed" case # the `ContextCancelled` "self cancellation absorbed" case
# handled in the block above ^^^ !! # handled in the block above!
log.cancel( log.cancel(
'Context terminated due to\n\n' 'Context cancelled for task due to\n'
f'{caller_err}\n' f'{caller_err}\n'
'Sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
) )
if debug_mode(): if debug_mode():
log.pdb(
'Delaying `ctx.cancel()` until debug lock '
'acquired..'
)
# async with acquire_debug_lock(self.actor.uid): # async with acquire_debug_lock(self.actor.uid):
# pass # pass
# TODO: factor ^ into below for non-root cases? # TODO: factor ^ into below for non-root cases?
was_acquired: bool = await maybe_wait_for_debugger( await maybe_wait_for_debugger()
header_msg=(
'Delaying `ctx.cancel()` until debug lock '
'acquired..\n'
),
)
if was_acquired:
log.pdb( log.pdb(
'Acquired debug lock! ' 'Acquired debug lock! '
'Calling `ctx.cancel()`!\n' 'Calling `ctx.cancel()`!'
) )
# we don't need to cancel the callee if it already
# told us it's cancelled ;p
if ctxc_from_callee is None:
try: try:
await ctx.cancel() await ctx.cancel()
except ( except trio.BrokenResourceError:
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning( log.warning(
'IPC connection for context is broken?\n' 'IPC connection for context is broken?\n'
f'task:{cid}\n' f'task:{cid}\n'
@ -791,11 +693,38 @@ class Portal:
# no local scope error, the "clean exit with a result" case. # no local scope error, the "clean exit with a result" case.
else: else:
# between the caller exiting and arriving here the
# far end may have sent a ctxc-msg or other error,
# so check for it here immediately and maybe raise
# so as to engage the ctxc handling block below!
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# assert maybe_ctxc
if ctx.chan.connected(): if ctx.chan.connected():
log.runtime( log.info(
'Waiting on final context result for\n' 'Waiting on final context-task result for\n'
f'peer: {uid}\n' f'task: {cid}\n'
f'|_{ctx._task}\n' f'actor: {uid}'
) )
# XXX NOTE XXX: the below call to # XXX NOTE XXX: the below call to
# `Context.result()` will ALWAYS raise # `Context.result()` will ALWAYS raise
@ -820,18 +749,16 @@ class Portal:
scope_err = berr scope_err = berr
raise raise
# yes! this worx Bp
# from .devx import _debug
# await _debug.pause()
# an exception type boxed in a `RemoteActorError` # an exception type boxed in a `RemoteActorError`
# is returned (meaning it was obvi not raised) # is returned (meaning it was obvi not raised).
# that we want to log-report on.
msgdata: str|None = getattr( msgdata: str|None = getattr(
result_or_err, result_or_err,
'msgdata', 'msgdata',
None None
) )
# yes! this worx Bp
# from .devx import _debug
# await _debug.pause()
match (msgdata, result_or_err): match (msgdata, result_or_err):
case ( case (
{'tb_str': tbstr}, {'tb_str': tbstr},
@ -844,19 +771,13 @@ class Portal:
RemoteActorError(), RemoteActorError(),
): ):
log.exception( log.exception(
'Context remotely errored!\n' f'Context `{fn_name}` remotely errored:\n'
f'<= peer: {uid}\n' f'`{tbstr}`'
f' |_ {nsf}()\n\n'
f'{tbstr}'
) )
case (None, _): case (None, _):
log.runtime( log.runtime(
'Context returned final result from callee task:\n' f'Context {fn_name} returned value from callee:\n'
f'<= peer: {uid}\n' f'`{result_or_err}`'
f' |_ {nsf}()\n\n'
f'`{result_or_err}`\n'
) )
finally: finally:
@ -874,8 +795,9 @@ class Portal:
# operating *in* this scope to have survived # operating *in* this scope to have survived
# we tear down the runtime feeder chan last # we tear down the runtime feeder chan last
# to avoid premature stream clobbers. # to avoid premature stream clobbers.
rxchan: trio.ReceiveChannel = ctx._recv_chan
if ( if (
(rxchan := ctx._recv_chan) rxchan
# maybe TODO: yes i know the below check is # maybe TODO: yes i know the below check is
# touching `trio` memchan internals..BUT, there are # touching `trio` memchan internals..BUT, there are
@ -931,38 +853,28 @@ class Portal:
etype: Type[BaseException] = type(scope_err) etype: Type[BaseException] = type(scope_err)
# CASE 2 # CASE 2
if ( if ctx._cancel_called:
ctx._cancel_called
and ctx.cancel_acked
):
log.cancel( log.cancel(
'Context cancelled by caller task\n' f'Context {fn_name} cancelled by caller with\n'
f'|_{ctx._task}\n\n' f'{etype}'
f'{repr(scope_err)}\n'
) )
# TODO: should we add a `._cancel_req_received`
# flag to determine if the callee manually called
# `ctx.cancel()`?
# -[ ] going to need a cid check no?
# CASE 1 # CASE 1
else: else:
log.cancel( log.cancel(
f'Context terminated due to local scope error:\n' f'Context cancelled by callee with {etype}\n'
f'{etype.__name__}\n' f'target: `{fn_name}`\n'
f'task:{cid}\n'
f'actor:{uid}'
) )
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit! # exit!
log.runtime( log.runtime(
'Removing IPC ctx opened with peer\n' f'Exiting context opened with {ctx.chan.uid}'
f'{uid}\n'
f'|_{ctx}\n'
) )
self.actor._contexts.pop( self.actor._contexts.pop(
(uid, cid), (self.channel.uid, ctx.cid),
None, None,
) )
@ -999,7 +911,7 @@ class LocalPortal:
async def open_portal( async def open_portal(
channel: Channel, channel: Channel,
nursery: trio.Nursery|None = None, nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True, start_msg_loop: bool = True,
shield: bool = False, shield: bool = False,
@ -1024,7 +936,7 @@ async def open_portal(
if channel.uid is None: if channel.uid is None:
await actor._do_handshake(channel) await actor._do_handshake(channel)
msg_loop_cs: trio.CancelScope|None = None msg_loop_cs: Optional[trio.CancelScope] = None
if start_msg_loop: if start_msg_loop:
from ._runtime import process_messages from ._runtime import process_messages
msg_loop_cs = await nursery.start( msg_loop_cs = await nursery.start(

View File

@ -326,7 +326,7 @@ async def open_root_actor(
not entered not entered
and not is_multi_cancelled(err) and not is_multi_cancelled(err)
): ):
logger.exception('Root actor crashed:\n') logger.exception("Root actor crashed:")
# ALWAYS re-raise any error bubbled up from the # ALWAYS re-raise any error bubbled up from the
# runtime! # runtime!
@ -343,7 +343,9 @@ async def open_root_actor(
# tempn.start_soon(an.exited.wait) # tempn.start_soon(an.exited.wait)
logger.cancel("Shutting down root actor") logger.cancel("Shutting down root actor")
await actor.cancel(None) # self cancel await actor.cancel(
requesting_uid=actor.uid,
)
finally: finally:
_state._current_actor = None _state._current_actor = None

File diff suppressed because it is too large Load Diff

View File

@ -196,16 +196,16 @@ async def cancel_on_completion(
result: Any|Exception = await exhaust_portal(portal, actor) result: Any|Exception = await exhaust_portal(portal, actor)
if isinstance(result, Exception): if isinstance(result, Exception):
errors[actor.uid]: Exception = result errors[actor.uid]: Exception = result
log.cancel( log.warning(
'Cancelling subactor runtime due to error:\n\n' 'Cancelling subactor due to error:\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n' f'uid: {portal.channel.uid}\n'
f'error: {result}\n' f'error: {result}\n'
) )
else: else:
log.runtime( log.runtime(
'Cancelling subactor gracefully:\n\n' 'Cancelling subactor gracefully:\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n' f'uid: {portal.channel.uid}\n'
f'result: {result}\n' f'result: {result}\n'
) )
@ -213,7 +213,7 @@ async def cancel_on_completion(
await portal.cancel_actor() await portal.cancel_actor()
async def hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
terminate_after: int = 3, terminate_after: int = 3,
@ -288,7 +288,7 @@ async def hard_kill(
proc.kill() proc.kill()
async def soft_kill( async def soft_wait(
proc: ProcessType, proc: ProcessType,
wait_func: Callable[ wait_func: Callable[
@ -299,20 +299,17 @@ async def soft_kill(
) -> None: ) -> None:
''' '''
Wait for proc termination but **don't yet** teardown Wait for proc termination but **dont' yet** teardown
std-streams since it will clobber any ongoing pdb REPL std-streams (since it will clobber any ongoing pdb REPL
session. session). This is our "soft" (and thus itself cancellable)
join/reap on an actor-runtime-in-process.
This is our "soft"/graceful, and thus itself also cancellable,
join/reap on an actor-runtime-in-process shutdown; it is
**not** the same as a "hard kill" via an OS signal (for that
see `.hard_kill()`).
''' '''
uid: tuple[str, str] = portal.channel.uid uid: tuple[str, str] = portal.channel.uid
try: try:
log.cancel( log.cancel(
'Soft killing sub-actor via `Portal.cancel_actor()`\n' 'Soft waiting on sub-actor proc:\n'
f'uid: {uid}\n'
f'|_{proc}\n' f'|_{proc}\n'
) )
# wait on sub-proc to signal termination # wait on sub-proc to signal termination
@ -329,9 +326,8 @@ async def soft_kill(
async def cancel_on_proc_deth(): async def cancel_on_proc_deth():
''' '''
"Cancel-the-cancel" request: if we detect that the "Cancel the (actor) cancel" request if we detect
underlying sub-process exited prior to that that the underlying sub-process terminated.
a `Portal.cancel_actor()` call completing .
''' '''
await wait_func(proc) await wait_func(proc)
@ -448,17 +444,14 @@ async def trio_proc(
try: try:
# TODO: needs ``trio_typing`` patch? # TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process(spawn_cmd) proc = await trio.lowlevel.open_process(spawn_cmd)
log.runtime(
'Started new sub-proc\n' log.runtime(f"Started {proc}")
f'|_{proc}\n'
)
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid subactor.uid)
)
except trio.Cancelled: except trio.Cancelled:
cancelled_during_spawn = True cancelled_during_spawn = True
@ -519,7 +512,7 @@ async def trio_proc(
# This is a "soft" (cancellable) join/reap which # This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled`` # will remote cancel the actor on a ``trio.Cancelled``
# condition. # condition.
await soft_kill( await soft_wait(
proc, proc,
trio.Process.wait, trio.Process.wait,
portal portal
@ -548,14 +541,13 @@ async def trio_proc(
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
log.pdb(
'Delaying subproc reaper while debugger locked..'
)
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get( child_in_debug=_runtime_vars.get(
'_debug_mode', False '_debug_mode', False
), ),
header_msg=(
'Delaying subproc reaper while debugger locked..\n'
),
# TODO: need a diff value then default? # TODO: need a diff value then default?
# poll_steps=9999999, # poll_steps=9999999,
) )
@ -581,7 +573,7 @@ async def trio_proc(
if proc.poll() is None: if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}") log.cancel(f"Attempting to hard kill {proc}")
await hard_kill(proc) await do_hard_kill(proc)
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
else: else:
@ -725,7 +717,7 @@ async def mp_proc(
# This is a "soft" (cancellable) join/reap which # This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled`` # will remote cancel the actor on a ``trio.Cancelled``
# condition. # condition.
await soft_kill( await soft_wait(
proc, proc,
proc_waiter, proc_waiter,
portal portal

View File

@ -95,6 +95,9 @@ class MsgStream(trio.abc.Channel):
try: try:
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
# if 'return' in msg:
# return msg
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=self._ctx, ctx=self._ctx,
msg=msg, msg=msg,
@ -125,9 +128,13 @@ class MsgStream(trio.abc.Channel):
# introducing this # introducing this
if self._eoc: if self._eoc:
raise self._eoc raise self._eoc
# raise trio.EndOfChannel
if self._closed: if self._closed:
raise self._closed raise self._closed
# raise trio.ClosedResourceError(
# 'This stream was already closed'
# )
src_err: Exception|None = None src_err: Exception|None = None
try: try:
@ -136,7 +143,6 @@ class MsgStream(trio.abc.Channel):
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
# log.exception('GOT KEYERROR')
src_err = kerr src_err = kerr
# NOTE: may raise any of the below error types # NOTE: may raise any of the below error types
@ -155,9 +161,9 @@ class MsgStream(trio.abc.Channel):
# trio.ClosedResourceError, # by self._rx_chan # trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
) as eoc: ) as eoc:
# log.exception('GOT EOC')
src_err = eoc src_err = eoc
self._eoc = eoc self._eoc = eoc
# await trio.sleep(1)
# a ``ClosedResourceError`` indicates that the internal # a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the # feeder memory receive channel was closed likely by the
@ -195,7 +201,6 @@ class MsgStream(trio.abc.Channel):
# raise eoc # raise eoc
except trio.ClosedResourceError as cre: # by self._rx_chan except trio.ClosedResourceError as cre: # by self._rx_chan
# log.exception('GOT CRE')
src_err = cre src_err = cre
log.warning( log.warning(
'`Context._rx_chan` was already closed?' '`Context._rx_chan` was already closed?'
@ -206,8 +211,6 @@ class MsgStream(trio.abc.Channel):
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
drained: list[Exception|dict] = await self.aclose() drained: list[Exception|dict] = await self.aclose()
if drained: if drained:
# from .devx import pause
# await pause()
log.warning( log.warning(
'Drained context msgs during closure:\n' 'Drained context msgs during closure:\n'
f'{drained}' f'{drained}'
@ -234,64 +237,54 @@ class MsgStream(trio.abc.Channel):
Cancel associated remote actor task and local memory channel on Cancel associated remote actor task and local memory channel on
close. close.
Notes:
- REMEMBER that this is also called by `.__aexit__()` so
careful consideration must be made to handle whatever
internal stsate is mutated, particuarly in terms of
draining IPC msgs!
- more or less we try to maintain adherance to trio's `.aclose()` semantics:
https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
''' '''
# XXX: keep proper adherance to trio's `.aclose()` semantics:
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan
# rx_chan = self._rx_chan if (
rx_chan._closed
or
self._closed
):
log.cancel(
f'`MsgStream` is already closed\n'
f'.cid: {self._ctx.cid}\n'
f'._rx_chan`: {rx_chan}\n'
f'._eoc: {self._eoc}\n'
f'._closed: {self._eoc}\n'
)
# XXX NOTE XXX
# it's SUPER IMPORTANT that we ensure we don't DOUBLE
# DRAIN msgs on closure so avoid getting stuck handing on
# the `._rx_chan` since we call this method on
# `.__aexit__()` as well!!!
# => SO ENSURE WE CATCH ALL TERMINATION STATES in this
# block including the EoC..
if self.closed:
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics. # per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return [] return []
ctx: Context = self._ctx ctx: Context = self._ctx
# caught_eoc: bool = False
drained: list[Exception|dict] = [] drained: list[Exception|dict] = []
while not drained: while not drained:
try: try:
maybe_final_msg = self.receive_nowait() maybe_final_msg = self.receive_nowait()
if maybe_final_msg: if maybe_final_msg:
log.debug( log.cancel(
'Drained un-processed stream msg:\n' 'Drained un-processed stream msg:\n'
f'{pformat(maybe_final_msg)}' f'{pformat(maybe_final_msg)}'
) )
# TODO: inject into parent `Context` buf? # TODO: inject into parent `Context` buf?
drained.append(maybe_final_msg) drained.append(maybe_final_msg)
# NOTE: we only need these handlers due to the
# `.receive_nowait()` call above which may re-raise
# one of these errors on a msg key error!
except trio.WouldBlock as be: except trio.WouldBlock as be:
drained.append(be) drained.append(be)
break break
except trio.EndOfChannel as eoc: except trio.EndOfChannel as eoc:
self._eoc: Exception = eoc
drained.append(eoc) drained.append(eoc)
break # caught_eoc = True
self._eoc: bool = eoc
except trio.ClosedResourceError as cre:
self._closed = cre
drained.append(cre)
break break
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
# log.exception('GOT CTXC')
log.cancel( log.cancel(
'Context was cancelled during stream closure:\n' 'Context was cancelled during stream closure:\n'
f'canceller: {ctxc.canceller}\n' f'canceller: {ctxc.canceller}\n'
@ -346,11 +339,8 @@ class MsgStream(trio.abc.Channel):
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
# await rx_chan.aclose() # await rx_chan.aclose()
if not self._eoc: # self._eoc: bool = caught_eoc
self._eoc: bool = trio.EndOfChannel(
f'Context stream closed by {self._ctx.side}\n'
f'|_{self}\n'
)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX? # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <= # => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same # if we're a bi-dir ``MsgStream`` BECAUSE this same
@ -389,26 +379,6 @@ class MsgStream(trio.abc.Channel):
# self._closed = True # self._closed = True
return drained return drained
@property
def closed(self) -> bool:
if (
(rxc := self._rx_chan._closed)
or
(_closed := self._closed)
or
(_eoc := self._eoc)
):
log.runtime(
f'`MsgStream` is already closed\n'
f'{self}\n'
f' |_cid: {self._ctx.cid}\n'
f' |_rx_chan._closed: {type(rxc)} = {rxc}\n'
f' |_closed: {type(_closed)} = {_closed}\n'
f' |_eoc: {type(_eoc)} = {_eoc}'
)
return True
return False
@acm @acm
async def subscribe( async def subscribe(
self, self,

View File

@ -21,7 +21,6 @@
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect import inspect
from pprint import pformat
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
import typing import typing
import warnings import warnings
@ -34,10 +33,7 @@ from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._runtime import Actor from ._runtime import Actor
from ._portal import Portal from ._portal import Portal
from ._exceptions import ( from ._exceptions import is_multi_cancelled
is_multi_cancelled,
ContextCancelled,
)
from ._root import open_root_actor from ._root import open_root_actor
from . import _state from . import _state
from . import _spawn from . import _spawn
@ -107,14 +103,6 @@ class ActorNursery:
self.errors = errors self.errors = errors
self.exited = trio.Event() self.exited = trio.Event()
# NOTE: when no explicit call is made to
# `.open_root_actor()` by application code,
# `.open_nursery()` will implicitly call it to start the
# actor-tree runtime. In this case we mark ourselves as
# such so that runtime components can be aware for logging
# and syncing purposes to any actor opened nurseries.
self._implicit_runtime_started: bool = False
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
@ -201,16 +189,14 @@ class ActorNursery:
**kwargs, # explicit args to ``fn`` **kwargs, # explicit args to ``fn``
) -> Portal: ) -> Portal:
''' """Spawn a new actor, run a lone task, then terminate the actor and
Spawn a new actor, run a lone task, then terminate the actor and
return its result. return its result.
Actors spawned using this method are kept alive at nursery teardown Actors spawned using this method are kept alive at nursery teardown
until the task spawned by executing ``fn`` completes at which point until the task spawned by executing ``fn`` completes at which point
the actor is terminated. the actor is terminated.
"""
''' mod_path = fn.__module__
mod_path: str = fn.__module__
if name is None: if name is None:
# use the explicit function name if not provided # use the explicit function name if not provided
@ -245,11 +231,7 @@ class ActorNursery:
) )
return portal return portal
async def cancel( async def cancel(self, hard_kill: bool = False) -> None:
self,
hard_kill: bool = False,
) -> None:
''' '''
Cancel this nursery by instructing each subactor to cancel Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate. itself and wait for all subactors to terminate.
@ -260,13 +242,10 @@ class ActorNursery:
''' '''
self.cancelled = True self.cancelled = True
# TODO: impl a repr for spawn more compact log.cancel(f"Cancelling nursery in {self._actor.uid}")
# then `._children`..
children: dict = self._children
child_count: int = len(children)
msg: str = f'Cancelling actor nursery with {child_count} children\n'
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as tn:
async with trio.open_nursery() as nursery:
subactor: Actor subactor: Actor
proc: trio.Process proc: trio.Process
@ -275,7 +254,7 @@ class ActorNursery:
subactor, subactor,
proc, proc,
portal, portal,
) in children.values(): ) in self._children.values():
# TODO: are we ever even going to use this or # TODO: are we ever even going to use this or
# is the spawning backend responsible for such # is the spawning backend responsible for such
@ -287,13 +266,12 @@ class ActorNursery:
if portal is None: # actor hasn't fully spawned yet if portal is None: # actor hasn't fully spawned yet
event = self._actor._peer_connected[subactor.uid] event = self._actor._peer_connected[subactor.uid]
log.warning( log.warning(
f"{subactor.uid} never 't finished spawning?" f"{subactor.uid} wasn't finished spawning?")
)
await event.wait() await event.wait()
# channel/portal should now be up # channel/portal should now be up
_, _, portal = children[subactor.uid] _, _, portal = self._children[subactor.uid]
# XXX should be impossible to get here # XXX should be impossible to get here
# unless method was called from within # unless method was called from within
@ -310,15 +288,13 @@ class ActorNursery:
# spawn cancel tasks for each sub-actor # spawn cancel tasks for each sub-actor
assert portal assert portal
if portal.channel.connected(): if portal.channel.connected():
tn.start_soon(portal.cancel_actor) nursery.start_soon(portal.cancel_actor)
log.cancel(msg)
# if we cancelled the cancel (we hung cancelling remote actors) # if we cancelled the cancel (we hung cancelling remote actors)
# then hard kill all sub-processes # then hard kill all sub-processes
if cs.cancelled_caught: if cs.cancelled_caught:
log.error( log.error(
f'Failed to cancel {self}?\n' f'Failed to cancel {self}\nHard killing process tree!'
'Hard killing underlying subprocess tree!\n'
) )
subactor: Actor subactor: Actor
proc: trio.Process proc: trio.Process
@ -327,7 +303,7 @@ class ActorNursery:
subactor, subactor,
proc, proc,
portal, portal,
) in children.values(): ) in self._children.values():
log.warning(f"Hard killing process {proc}") log.warning(f"Hard killing process {proc}")
proc.terminate() proc.terminate()
@ -367,7 +343,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# the above "daemon actor" nursery will be notified. # the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery: async with trio.open_nursery() as ria_nursery:
an = ActorNursery( anursery = ActorNursery(
actor, actor,
ria_nursery, ria_nursery,
da_nursery, da_nursery,
@ -376,16 +352,16 @@ async def _open_and_supervise_one_cancels_all_nursery(
try: try:
# spawning of actors happens in the caller's scope # spawning of actors happens in the caller's scope
# after we yield upwards # after we yield upwards
yield an yield anursery
# When we didn't error in the caller's scope, # When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct # signal all process-monitor-tasks to conduct
# the "hard join phase". # the "hard join phase".
log.runtime( log.runtime(
'Waiting on subactors to complete:\n' f"Waiting on subactors {anursery._children} "
f'{pformat(an._children)}\n' "to complete"
) )
an._join_procs.set() anursery._join_procs.set()
except BaseException as inner_err: except BaseException as inner_err:
errors[actor.uid] = inner_err errors[actor.uid] = inner_err
@ -397,47 +373,34 @@ async def _open_and_supervise_one_cancels_all_nursery(
# Instead try to wait for pdb to be released before # Instead try to wait for pdb to be released before
# tearing down. # tearing down.
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug child_in_debug=anursery._at_least_one_child_in_debug
) )
# if the caller's scope errored then we activate our # if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't # one-cancels-all supervisor strategy (don't
# worry more are coming). # worry more are coming).
an._join_procs.set() anursery._join_procs.set()
# XXX NOTE XXX: hypothetically an error could # XXX: hypothetically an error could be
# be raised and then a cancel signal shows up # raised and then a cancel signal shows up
# slightly after in which case the `else:` # slightly after in which case the `else:`
# block here might not complete? For now, # block here might not complete? For now,
# shield both. # shield both.
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
etype: type = type(inner_err) etype = type(inner_err)
if etype in ( if etype in (
trio.Cancelled, trio.Cancelled,
KeyboardInterrupt, KeyboardInterrupt
) or ( ) or (
is_multi_cancelled(inner_err) is_multi_cancelled(inner_err)
): ):
log.cancel( log.cancel(
f'Actor-nursery cancelled by {etype}\n\n' f"Nursery for {current_actor().uid} "
f"was cancelled with {etype}")
f'{current_actor().uid}\n'
f' |_{an}\n\n'
# TODO: show tb str?
# f'{tb_str}'
)
elif etype in {
ContextCancelled,
}:
log.cancel(
'Actor-nursery caught remote cancellation\n\n'
f'{inner_err.tb_str}'
)
else: else:
log.exception( log.exception(
'Nursery errored with:\n' f"Nursery for {current_actor().uid} "
"errored with:"
# TODO: same thing as in # TODO: same thing as in
# `._invoke()` to compute how to # `._invoke()` to compute how to
@ -450,7 +413,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) )
# cancel all subactors # cancel all subactors
await an.cancel() await anursery.cancel()
# ria_nursery scope end # ria_nursery scope end
@ -471,22 +434,18 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: yet another guard before allowing the cancel # XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug. # sequence in case a (single) child is in debug.
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug child_in_debug=anursery._at_least_one_child_in_debug
) )
# If actor-local error was raised while waiting on # If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all # ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy: # remaining sub-actors (due to our lone strategy:
# one-cancels-all). # one-cancels-all).
if an._children: log.cancel(f"Nursery cancelling due to {err}")
log.cancel( if anursery._children:
'Actor-nursery cancelling due error type:\n'
f'{err}\n'
)
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await an.cancel() await anursery.cancel()
raise raise
finally: finally:
# No errors were raised while awaiting ".run_in_actor()" # No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as # actors but those actors may have returned remote errors as
@ -495,9 +454,9 @@ async def _open_and_supervise_one_cancels_all_nursery(
# collected in ``errors`` so cancel all actors, summarize # collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise. # all errors and re-raise.
if errors: if errors:
if an._children: if anursery._children:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await an.cancel() await anursery.cancel()
# use `BaseExceptionGroup` as needed # use `BaseExceptionGroup` as needed
if len(errors) > 1: if len(errors) > 1:
@ -532,7 +491,7 @@ async def open_nursery(
which cancellation scopes correspond to each spawned subactor set. which cancellation scopes correspond to each spawned subactor set.
''' '''
implicit_runtime: bool = False implicit_runtime = False
actor = current_actor(err_on_no_runtime=False) actor = current_actor(err_on_no_runtime=False)
@ -544,7 +503,7 @@ async def open_nursery(
log.info("Starting actor runtime!") log.info("Starting actor runtime!")
# mark us for teardown on exit # mark us for teardown on exit
implicit_runtime: bool = True implicit_runtime = True
async with open_root_actor(**kwargs) as actor: async with open_root_actor(**kwargs) as actor:
assert actor is current_actor() assert actor is current_actor()
@ -552,42 +511,24 @@ async def open_nursery(
try: try:
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as an: ) as anursery:
yield anursery
# NOTE: mark this nursery as having
# implicitly started the root actor so
# that `._runtime` machinery can avoid
# certain teardown synchronization
# blocking/waits and any associated (warn)
# logging when it's known that this
# nursery shouldn't be exited before the
# root actor is.
an._implicit_runtime_started = True
yield an
finally: finally:
# XXX: this event will be set after the root actor anursery.exited.set()
# runtime is already torn down, so we want to
# avoid any blocking on it.
an.exited.set()
else: # sub-nursery case else: # sub-nursery case
try: try:
async with _open_and_supervise_one_cancels_all_nursery( async with _open_and_supervise_one_cancels_all_nursery(
actor actor
) as an: ) as anursery:
yield an yield anursery
finally: finally:
an.exited.set() anursery.exited.set()
finally: finally:
msg: str = ( log.debug("Nursery teardown complete")
'Actor-nursery exited\n'
f'|_{an}\n\n'
)
# shutdown runtime if it was started # shutdown runtime if it was started
if implicit_runtime: if implicit_runtime:
msg += '=> Shutting down actor runtime <=\n' log.info("Shutting down actor tree")
log.info(msg)

View File

@ -999,8 +999,6 @@ async def maybe_wait_for_debugger(
poll_delay: float = 0.1, poll_delay: float = 0.1,
child_in_debug: bool = False, child_in_debug: bool = False,
header_msg: str = '',
) -> None: ) -> None:
if ( if (
@ -1009,8 +1007,6 @@ async def maybe_wait_for_debugger(
): ):
return return
msg: str = header_msg
if ( if (
is_root_process() is_root_process()
): ):
@ -1020,13 +1016,13 @@ async def maybe_wait_for_debugger(
# will make the pdb repl unusable. # will make the pdb repl unusable.
# Instead try to wait for pdb to be released before # Instead try to wait for pdb to be released before
# tearing down. # tearing down.
sub_in_debug: tuple[str, str]|None = Lock.global_actor_in_debug sub_in_debug: tuple[str, str] | None = None
debug_complete: trio.Event|None = Lock.no_remote_has_tty
for istep in range(poll_steps):
if sub_in_debug := Lock.global_actor_in_debug: if sub_in_debug := Lock.global_actor_in_debug:
msg += ( log.pdb(
'Debug `Lock` in use by subactor\n' f'Lock in use by {sub_in_debug}'
f'|_{sub_in_debug}\n'
) )
# TODO: could this make things more deterministic? # TODO: could this make things more deterministic?
# wait to see if a sub-actor task will be # wait to see if a sub-actor task will be
@ -1034,45 +1030,34 @@ async def maybe_wait_for_debugger(
# tick? # tick?
# XXX => but it doesn't seem to work.. # XXX => but it doesn't seem to work..
# await trio.testing.wait_all_tasks_blocked(cushion=0) # await trio.testing.wait_all_tasks_blocked(cushion=0)
else:
log.pdb(
msg
+
'Root immediately acquired debug TTY LOCK'
)
return
for istep in range(poll_steps):
debug_complete: trio.Event|None = Lock.no_remote_has_tty
if ( if (
debug_complete debug_complete
and not debug_complete.is_set() and not debug_complete.is_set()
and sub_in_debug is not None and sub_in_debug is not None
): ):
log.pdb( log.pdb(
msg 'Root has errored but pdb is in use by child\n'
+ 'Waiting on tty lock to release..\n'
'Root is waiting on tty lock to release..\n' f'uid: {sub_in_debug}\n'
) )
await debug_complete.wait() await debug_complete.wait()
log.pdb( log.pdb(
f'Child subactor released debug lock:' f'Child subactor released debug lock!\n'
f'|_{sub_in_debug}\n' f'uid: {sub_in_debug}\n'
) )
if debug_complete.is_set():
break
# is no subactor locking debugger currently? # is no subactor locking debugger currently?
if ( elif (
sub_in_debug is None
and (
debug_complete is None debug_complete is None
or debug_complete.is_set() or sub_in_debug is None
)
): ):
log.pdb( log.pdb(
msg 'Root acquired debug TTY LOCK from child\n'
+ f'uid: {sub_in_debug}'
'Root acquired tty lock!'
) )
break break
@ -1088,14 +1073,8 @@ async def maybe_wait_for_debugger(
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(poll_delay) await trio.sleep(poll_delay)
continue continue
# fallthrough on failure to acquire..
else: else:
raise RuntimeError( log.pdb('Root acquired debug TTY LOCK')
msg
+
'Root actor failed to acquire debug lock?'
)
# else: # else:
# # TODO: non-root call for #320? # # TODO: non-root call for #320?

View File

@ -43,24 +43,17 @@ IPC-compat cross-mem-boundary object pointer.
# - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type # - https://github.com/msgpack/msgpack-python#packingunpacking-of-custom-data-type
from __future__ import annotations from __future__ import annotations
from inspect import ( from inspect import isfunction
isfunction,
ismethod,
)
from pkgutil import resolve_name from pkgutil import resolve_name
class NamespacePath(str): class NamespacePath(str):
''' '''
A serializeable `str`-subtype implementing a "namespace A serializeable description of a (function) Python object
pointer" to any Python object reference (like a function) location described by the target's module path and namespace
using the same format as the built-in `pkgutil.resolve_name()` key meant as a message-native "packet" to allows actors to
system. point-and-load objects by an absolute ``str`` (and thus
serializable) reference.
A value describes a target's module-path and namespace-key
separated by a ':' and thus can be easily used as
a IPC-message-native reference-type allowing memory isolated
actors to point-and-load objects via a minimal `str` value.
''' '''
_ref: object | type | None = None _ref: object | type | None = None
@ -88,22 +81,12 @@ class NamespacePath(str):
''' '''
if ( if (
isfunction(ref) isinstance(ref, object)
and not isfunction(ref)
): ):
name: str = getattr(ref, '__name__')
elif ismethod(ref):
# build out the path manually i guess..?
# TODO: better way?
name: str = '.'.join([
type(ref.__self__).__name__,
ref.__func__.__name__,
])
else: # object or other?
# isinstance(ref, object)
# and not isfunction(ref)
name: str = type(ref).__name__ name: str = type(ref).__name__
else:
name: str = getattr(ref, '__name__')
# fully qualified namespace path, tuple. # fully qualified namespace path, tuple.
fqnp: tuple[str, str] = ( fqnp: tuple[str, str] = (

View File

@ -35,24 +35,6 @@ from msgspec import (
structs, structs,
) )
# TODO: auto-gen type sig for input func both for
# type-msgs and logging of RPC tasks?
# taken and modified from:
# https://stackoverflow.com/a/57110117
# import inspect
# from typing import List
# def my_function(input_1: str, input_2: int) -> list[int]:
# pass
# def types_of(func):
# specs = inspect.getfullargspec(func)
# return_type = specs.annotations['return']
# input_types = [t.__name__ for s, t in specs.annotations.items() if s != 'return']
# return f'{func.__name__}({": ".join(input_types)}) -> {return_type}'
# types_of(my_function)
class DiffDump(UserList): class DiffDump(UserList):
''' '''
@ -179,7 +161,6 @@ class Struct(
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v) val_str: str = saferepr(v)
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n') obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return ( return (