Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 7e49ac678b WIP, "revertible" or "dynamic" multicast streams
TODO, write up the details, probably by distilling the (todo) notes from
`tests/test_resource_cache.py::test_open_local_sub_to_stream` comments!
2025-07-15 22:15:19 -04:00
Tyler Goodlet 7a075494f1 Well then, I guess it just needed a checkpoint XD
Here I was thinking the bcaster (usage) maybe required a rework but
NOPE, it's just because a checkpoint was needed in the parent task owning
the `tn` which spawns `get_sub_and_pull()` tasks, to ensure the
bg-allocated `an`/portal is eventually cancel-called..

Ah well, at least I started a patch for `MsgStream.subscribe()` to make
it multicast-revertible.. XD

Anyway, I tossed in some checks & notes related to all that unnecessary
effort since I do think I'll move forward implementing it:
- for the `cache_hit` case always verify that the `bcast` clone is
  unregistered from the common state subs after
  `.subscribe().__aexit__()`.
- do a light check that the implicit `MsgStream._broadcaster` is always
  the only bcrx instance left-leaked into that state.. that is until
  I get the proper de-allocation/reversion from multicast -> unicast
  working.
- put in mega detailed note about the required parent-task checkpoint.
2025-07-15 21:59:42 -04:00
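
A minimal sketch of the checkpoint fix being described (it assumes the
test's `get_sub_and_pull()` consumer helper, shown in the test diff
further below, here reduced to a stub):

```python
import trio
import tractor

async def get_sub_and_pull(taskname: str) -> None:
    # stand-in for the test's consumer task which enters
    # `maybe_open_stream()` (and thus `.trionics.maybe_open_context()`)
    # and pulls from the (possibly broadcast) stream until it ends.
    ...

async def main():
    async with tractor.open_root_actor() as root:
        async with trio.open_nursery() as tn:
            for i in range(10):
                tn.start_soon(get_sub_and_pull, f'task_{i}')
            await trio.sleep(0.001)

        # the fix: without (at least) one more checkpoint here, the bg
        # task which allocated the `an`/portal inside
        # `maybe_open_context()` may never be rescheduled to run its
        # `Portal.cancel_actor()` teardown, and the test's timeout-guard
        # hangs/fires instead.
        if root.ipc_server.has_peers():
            await trio.lowlevel.checkpoint()

trio.run(main)
```
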
Tyler Goodlet c3aa29e7fa TOSQUASH 285ebba: whoops, still use `bcrx._state` for now.. 2025-07-15 19:59:03 -04:00
Tyler Goodlet 9f6acf9ac3 Switch nursery to `CancelScope`-status properties
Been meaning to do this forever and a recent test hang finally drove me
to it Bp

Like it sounds, adopt the "cancel-status" properties on `ActorNursery`
already in use on our `Context` and derived from `trio.CancelScope`:

- add new private `._cancel_called` (set in the head of `.cancel()`)
  & `._cancelled_caught` (set in the tail) instance vars with matching
  read-only `@properties`.

- drop the instance-var and instead delegate a `.cancelled: bool`
  property to `._cancel_called` and add a usage deprecation warning
  (since removing it breaks a buncha tests).
2025-07-15 19:29:38 -04:00
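
In condensed form, the property pattern being adopted (a sketch distilled
from the `ActorNursery` hunk further below; the rest of the class body and
the real `.cancel()` internals are elided):

```python
import warnings

class ActorNursery:
    def __init__(self) -> None:
        # `trio.Nursery`-like cancel (request) statuses
        self._cancel_called: bool = False
        self._cancelled_caught: bool = False

    @property
    def cancel_called(self) -> bool:
        # set in the head of `.cancel()`
        return self._cancel_called

    @property
    def cancelled_caught(self) -> bool:
        # set in the tail of `.cancel()` once subactor teardown completes
        return self._cancelled_caught

    # deprecated alias kept (for now) since removing it breaks tests
    @property
    def cancelled(self) -> bool:
        warnings.warn(
            '`ActorNursery.cancelled` is now deprecated, use '
            '`.cancel_called` instead.',
            DeprecationWarning,
            stacklevel=2,
        )
        return self._cancel_called

    async def cancel(self) -> None:
        self._cancel_called = True       # head
        ...                              # request + reap all subactors
        self._cancelled_caught = True    # tail
```
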
Tyler Goodlet 2a69d179e6 Add `Channel.closed/.cancel_called`
I.e. the public properties for the private instance-var equivalents;
improves expected introspection usage.
2025-07-15 17:32:42 -04:00
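
For instance (a hypothetical introspection snippet, not part of the diff),
callers can now read channel state without poking at privates:

```python
def describe_chan(chan) -> str:
    # `chan` is a tractor IPC `Channel`; both properties are added in
    # the `Channel` hunk further below.
    if chan.cancel_called and not chan.closed:
        return 'remote cancel requested, channel not yet closed'
    return 'closed' if chan.closed else 'active'
```
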
Tyler Goodlet c51a49b045 Set `Channel._cancel_called` via `chan` var
In `Portal.cancel_actor()` that is, at the least to make it easier to
ref search from an editor Bp
2025-07-15 17:31:08 -04:00
Tyler Goodlet 6627a3bfda Never shield-wait `ipc_server.wait_for_no_more_peers()`
As mentioned in the prior testing commit, it can cause the worst kind of
hangs, the SIGINT-ignoring kind.. Pretty sure there was never any reason
for it outside some esoteric multi-actor debugging case, and pretty sure
that case was already solved?
2025-07-15 17:28:48 -04:00
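
Roughly why the shielded wait is dangerous, as a generic `trio` sketch
(not the runtime's actual shutdown code): a shield masks cancellation, so
if the awaited event never fires then nothing, not even a SIGINT-driven
cancel, can unwind the task.

```python
import trio

async def wait_peers_gone_shielded(no_more_peers: trio.Event) -> None:
    # OLD behaviour: the shield means a surrounding cancel (e.g. one
    # triggered by SIGINT handling) can NOT interrupt this wait, so if
    # the event is never set the task hangs "un-interruptibly".
    with trio.CancelScope(shield=True):
        await no_more_peers.wait()

async def wait_peers_gone(no_more_peers: trio.Event) -> None:
    # NEW behaviour: a plain wait which any timeout/cancel can abort.
    await no_more_peers.wait()
```
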
Tyler Goodlet 285ebba4b1 Tool-up `test_resource_cache.test_open_local_sub_to_stream`
I recently discovered a very subtle race-case that can sometimes
cause the suite to hang, seemingly due to the `an: ActorNursery` being
allocated *behind* the `.trionics.maybe_open_context()` usage; this can
result in never cancelling the 'streamer' subactor despite the `main()`
timeout-guard.

This led me to dig in and find that the underlying issue was two-fold:

- our `BroadcastReceiver` termination-mgmt semantics in
  `MsgStream.subscribe()` can result in the first subscribing task
  always keeping the `MsgStream._broadcaster` instance allocated; it's
  never `.aclose()`ed, which makes it tough to determine (and thus
  trace) when all subscriber-tasks are actually complete and have
  exited `.subscribe()`..

- I was shield-waiting `.ipc._server.Server.wait_for_no_more_peers()` in
  `._runtime.async_main()`'s shutdown sequence, which would then compound
  the issue, resulting in a SIGINT-shielded hang.. the worst kind XD

Actual changes here are just styling, printing, and some mucking with
passing the `an`-ref up to the parent task in the root-actor where I was
doing a conditional `ActorNursery.cancel()` to make sure that was actually
the problem. Presuming this is fixed, the `.pause()` I left unmasked
should never hit.
2025-07-15 16:48:46 -04:00
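
To make the first bullet concrete, a simplified fragment of the (pre-fix)
`MsgStream.subscribe()` semantics in question; argument details are
elided and the import path for `broadcast_receiver` is assumed, the real
code is in the `MsgStream` hunk below:

```python
from contextlib import asynccontextmanager as acm
from tractor.trionics import broadcast_receiver  # path assumed

@acm
async def subscribe(self):
    # fragment of a `MsgStream` method, simplified for illustration
    if self._broadcaster is None:
        # the first subscribing task implicitly allocates the bcaster
        # and rebinds this stream's `.receive()` to delegate to it.
        self._broadcaster = broadcast_receiver(
            self,
            max_buffer_size,            # elided in this sketch
            receive_afunc=self.receive,
        )
        self.receive = self._broadcaster.receive

    async with self._broadcaster.subscribe() as bstream:
        bstream.send = self.send  # keep 2-way streaming transparent
        yield bstream

    # the issue: that first, implicitly allocated bcrx is never
    # `.aclose()`ed, so the shared `BroadcastState.subs` table never
    # fully empties and there's no clean way to tell when every
    # subscriber-task has actually exited `.subscribe()`.
```
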
Tyler Goodlet 20628cc0b8 Go multi-line-style tuples in `maybe_open_context()`
Allows for an inline comment on the first "cache hit" bool element.
2025-07-15 16:12:08 -04:00
Tyler Goodlet 2536c5b3d2 More prep to reduce the `Actor` method-iface
- drop the (never/un)used `.get_chans()`.
- add #TODO for factoring many methods into a new `.rpc`-subsys/pkg
  primitive, like an `RPCMngr/Server` type eventually.
- add todo to maybe mv `.get_parent()` elsewhere?
- move masked `._hard_mofo_kill()` to bottom.
2025-07-15 07:21:11 -04:00
10 changed files with 441 additions and 160 deletions

View File

@ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on):
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
async with ( async with (
tractor.open_root_actor(), tractor.open_root_actor(),
trio.open_nursery() as n, trio.open_nursery() as tn,
): ):
for i in range(10): for i in range(10):
n.start_soon(enter_cached_mngr, f'task_{i}') tn.start_soon(
enter_cached_mngr,
f'task_{i}',
)
await trio.sleep(0.001) await trio.sleep(0.001)
trio.run(main) trio.run(main)
@ -98,23 +100,34 @@ async def streamer(
@acm @acm
async def open_stream() -> Awaitable[tractor.MsgStream]: async def open_stream() -> Awaitable[
tuple[
tractor.ActorNursery,
tractor.MsgStream,
]
]:
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
portal = await an.start_actor( portal = await an.start_actor(
'streamer', 'streamer',
enable_modules=[__name__], enable_modules=[__name__],
) )
async with ( try:
portal.open_context(streamer) as (ctx, first), async with (
ctx.open_stream() as stream, portal.open_context(streamer) as (ctx, first),
): ctx.open_stream() as stream,
yield stream ):
print('Entered open_stream() caller')
yield an, stream
print('Exited open_stream() caller')
print('Cancelling streamer') finally:
await portal.cancel_actor() print(
print('Cancelled streamer') 'Cancelling streamer with,\n'
'=> `Portal.cancel_actor()`'
)
await portal.cancel_actor()
print('Cancelled streamer')
except Exception as err: except Exception as err:
print( print(
@ -130,8 +143,12 @@ async def maybe_open_stream(taskname: str):
async with tractor.trionics.maybe_open_context( async with tractor.trionics.maybe_open_context(
# NOTE: all secondary tasks should cache hit on the same key # NOTE: all secondary tasks should cache hit on the same key
acm_func=open_stream, acm_func=open_stream,
) as (cache_hit, stream): ) as (
cache_hit,
(an, stream)
):
# when the actor + portal + ctx + stream has already been
# allocated we want to just bcast to this task.
if cache_hit: if cache_hit:
print(f'{taskname} loaded from cache') print(f'{taskname} loaded from cache')
@ -139,10 +156,43 @@ async def maybe_open_stream(taskname: str):
# if this feed is already allocated by the first # if this feed is already allocated by the first
# task that entereed # task that entereed
async with stream.subscribe() as bstream: async with stream.subscribe() as bstream:
yield bstream yield an, bstream
print(
f'cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
# we should always unreg the "cloned" bcrc for this
# consumer-task
assert id(bstream) not in bstream._state.subs
else: else:
# yield the actual stream # yield the actual stream
yield stream try:
yield an, stream
finally:
print(
f'NON-cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
first_bstream = stream._broadcaster
bcrx_state = first_bstream._state
subs: dict[int, int] = bcrx_state.subs
if len(subs) == 1:
assert id(first_bstream) in subs
# ^^TODO! the bcrx should always de-allocate all subs,
# including the implicit first one allocated on entry
# by the first subscribing peer task, no?
#
# -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
# |_ allows reverting `MsgStream.receive()` to the
# non-bcaster method.
# |_ we can decide whether to reset `._broadcaster`?
#
# await tractor.pause(shield=True)
def test_open_local_sub_to_stream( def test_open_local_sub_to_stream(
@ -159,16 +209,24 @@ def test_open_local_sub_to_stream(
if debug_mode: if debug_mode:
timeout = 999 timeout = 999
print(f'IN debug_mode, setting large timeout={timeout!r}..')
async def main(): async def main():
full = list(range(1000)) full = list(range(1000))
an: tractor.ActorNursery|None = None
num_tasks: int = 10
async def get_sub_and_pull(taskname: str): async def get_sub_and_pull(taskname: str):
nonlocal an
stream: tractor.MsgStream stream: tractor.MsgStream
async with ( async with (
maybe_open_stream(taskname) as stream, maybe_open_stream(taskname) as (
an,
stream,
),
): ):
if '0' in taskname: if '0' in taskname:
assert isinstance(stream, tractor.MsgStream) assert isinstance(stream, tractor.MsgStream)
@ -180,34 +238,70 @@ def test_open_local_sub_to_stream(
first = await stream.receive() first = await stream.receive()
print(f'{taskname} started with value {first}') print(f'{taskname} started with value {first}')
seq = [] seq: list[int] = []
async for msg in stream: async for msg in stream:
seq.append(msg) seq.append(msg)
assert set(seq).issubset(set(full)) assert set(seq).issubset(set(full))
# end of @acm block
print(f'{taskname} finished') print(f'{taskname} finished')
root: tractor.Actor
with trio.fail_after(timeout) as cs: with trio.fail_after(timeout) as cs:
# TODO: turns out this isn't multi-task entrant XD # TODO: turns out this isn't multi-task entrant XD
# We probably need an indepotent entry semantic? # We probably need an indepotent entry semantic?
async with tractor.open_root_actor( async with tractor.open_root_actor(
debug_mode=debug_mode, debug_mode=debug_mode,
): # maybe_enable_greenback=True,
#
# ^TODO? doesn't seem to mk breakpoint() usage work
# bc each bg task needs to open a portal??
# - [ ] we should consider making this part of
# our taskman defaults?
# |_see https://github.com/goodboy/tractor/pull/363
#
) as root:
assert root.is_registrar
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
for i in range(10): for i in range(num_tasks):
tn.start_soon( tn.start_soon(
get_sub_and_pull, get_sub_and_pull,
f'task_{i}', f'task_{i}',
) )
await trio.sleep(0.001) await trio.sleep(0.001)
print('all consumer tasks finished') print('all consumer tasks finished!')
# ?XXX, ensure actor-nursery is shutdown or we might
# hang here due to a minor task deadlock/race-condition?
#
# - seems that all we need is a checkpoint to ensure
# the last suspended task, which is inside
# `.maybe_open_context()`, can do the
# `Portal.cancel_actor()` call?
#
# - if that bg task isn't resumed, then this blocks
# timeout might hit before that?
#
if root.ipc_server.has_peers():
await trio.lowlevel.checkpoint()
# alt approach, cancel the entire `an`
# await tractor.pause()
# await an.cancel()
# end of runtime scope
print('root actor terminated.')
if cs.cancelled_caught: if cs.cancelled_caught:
pytest.fail( pytest.fail(
'Should NOT time out in `open_root_actor()` ?' 'Should NOT time out in `open_root_actor()` ?'
) )
print('exiting main.')
trio.run(main) trio.run(main)

View File

@ -67,7 +67,6 @@ async def ensure_sequence(
@acm @acm
async def open_sequence_streamer( async def open_sequence_streamer(
sequence: list[int], sequence: list[int],
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
start_method: str, start_method: str,
@ -96,39 +95,43 @@ async def open_sequence_streamer(
def test_stream_fan_out_to_local_subscriptions( def test_stream_fan_out_to_local_subscriptions(
reg_addr, debug_mode: bool,
reg_addr: tuple,
start_method, start_method,
): ):
sequence = list(range(1000)) sequence = list(range(1000))
async def main(): async def main():
with trio.fail_after(9):
async with open_sequence_streamer(
sequence,
reg_addr,
start_method,
) as stream:
async with open_sequence_streamer( async with (
sequence, collapse_eg(),
reg_addr, trio.open_nursery() as tn,
start_method, ):
) as stream: for i in range(10):
tn.start_soon(
ensure_sequence,
stream,
sequence.copy(),
name=f'consumer_{i}',
)
async with trio.open_nursery() as n: await stream.send(tuple(sequence))
for i in range(10):
n.start_soon(
ensure_sequence,
stream,
sequence.copy(),
name=f'consumer_{i}',
)
await stream.send(tuple(sequence)) async for value in stream:
print(f'source stream rx: {value}')
assert value == sequence[0]
sequence.remove(value)
async for value in stream: if not sequence:
print(f'source stream rx: {value}') # fully consumed
assert value == sequence[0] break
sequence.remove(value)
if not sequence:
# fully consumed
break
trio.run(main) trio.run(main)
@ -151,67 +154,69 @@ def test_consumer_and_parent_maybe_lag(
sequence = list(range(300)) sequence = list(range(300))
parent_delay, sub_delay = task_delays parent_delay, sub_delay = task_delays
async with open_sequence_streamer( # TODO, maybe mak a cm-deco for main()s?
sequence, with trio.fail_after(3):
reg_addr, async with open_sequence_streamer(
start_method, sequence,
) as stream: reg_addr,
start_method,
) as stream:
try: try:
async with ( async with (
collapse_eg(), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
tn.start_soon( tn.start_soon(
ensure_sequence, ensure_sequence,
stream, stream,
sequence.copy(), sequence.copy(),
sub_delay, sub_delay,
name='consumer_task', name='consumer_task',
) )
await stream.send(tuple(sequence)) await stream.send(tuple(sequence))
# async for value in stream: # async for value in stream:
lagged = False lagged = False
lag_count = 0 lag_count = 0
while True: while True:
try: try:
value = await stream.receive() value = await stream.receive()
print(f'source stream rx: {value}') print(f'source stream rx: {value}')
if lagged: if lagged:
# re set the sequence starting at our last # re set the sequence starting at our last
# value # value
sequence = sequence[sequence.index(value) + 1:] sequence = sequence[sequence.index(value) + 1:]
else: else:
assert value == sequence[0] assert value == sequence[0]
sequence.remove(value) sequence.remove(value)
lagged = False lagged = False
except Lagged: except Lagged:
lagged = True lagged = True
print(f'source stream lagged after {value}') print(f'source stream lagged after {value}')
lag_count += 1 lag_count += 1
continue continue
# lag the parent # lag the parent
await trio.sleep(parent_delay) await trio.sleep(parent_delay)
if not sequence: if not sequence:
# fully consumed # fully consumed
break break
print(f'parent + source stream lagged: {lag_count}') print(f'parent + source stream lagged: {lag_count}')
if parent_delay > sub_delay: if parent_delay > sub_delay:
assert lag_count > 0 assert lag_count > 0
except Lagged: except Lagged:
# child was lagged # child was lagged
assert parent_delay < sub_delay assert parent_delay < sub_delay
trio.run(main) trio.run(main)
@ -285,7 +290,11 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
def test_subscribe_errors_after_close(): def test_subscribe_errors_after_close():
'''
Verify after calling `BroadcastReceiver.aclose()` you can't
"re-open" it via `.subscribe()`.
'''
async def main(): async def main():
size = 1 size = 1
@ -293,6 +302,8 @@ def test_subscribe_errors_after_close():
async with broadcast_receiver(rx, size) as brx: async with broadcast_receiver(rx, size) as brx:
pass pass
assert brx.key not in brx._state.subs
try: try:
# open and close # open and close
async with brx.subscribe(): async with brx.subscribe():
@ -302,7 +313,7 @@ def test_subscribe_errors_after_close():
assert brx.key not in brx._state.subs assert brx.key not in brx._state.subs
else: else:
assert 0 pytest.fail('brx.subscribe() never raised!?')
trio.run(main) trio.run(main)

View File

@ -300,7 +300,7 @@ class Portal:
) )
# XXX the one spot we set it? # XXX the one spot we set it?
self.channel._cancel_called: bool = True chan._cancel_called: bool = True
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with # XXX: sure would be nice to make this work with

View File

@ -552,6 +552,14 @@ class Actor:
) )
raise raise
# ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
# - _get_rpc_func(),
# - _deliver_ctx_payload(),
# - get_context(),
# - start_remote_task(),
# - cancel_rpc_tasks(),
# - _cancel_task(),
#
def _get_rpc_func(self, ns, funcname): def _get_rpc_func(self, ns, funcname):
''' '''
Try to lookup and return a target RPC func from the Try to lookup and return a target RPC func from the
@ -1119,14 +1127,6 @@ class Actor:
self._cancel_complete.set() self._cancel_complete.set()
return True return True
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def _cancel_task( async def _cancel_task(
self, self,
cid: str, cid: str,
@ -1361,25 +1361,13 @@ class Actor:
''' '''
return self.accept_addrs[0] return self.accept_addrs[0]
def get_parent(self) -> Portal: # TODO, this should delegate ONLY to the
''' # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
Return a `Portal` to our parent. #
# XXX, AH RIGHT that's why..
''' # it's bc we pass this as a CLI flag to the child.py precisely
assert self._parent_chan, "No parent channel for this actor?" # bc we need the bootstrapping pre `async_main()`.. but maybe
return Portal(self._parent_chan) # keep this as an impl deat and not part of the pub iface impl?
def get_chans(
self,
uid: tuple[str, str],
) -> list[Channel]:
'''
Return all IPC channels to the actor with provided `uid`.
'''
return self._ipc_server._peers[uid]
def is_infected_aio(self) -> bool: def is_infected_aio(self) -> bool:
''' '''
If `True`, this actor is running `trio` in guest mode on If `True`, this actor is running `trio` in guest mode on
@ -1390,6 +1378,23 @@ class Actor:
''' '''
return self._infected_aio return self._infected_aio
# ?TODO, is this the right type for this method?
def get_parent(self) -> Portal:
'''
Return a `Portal` to our parent.
'''
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
# XXX: hard kill logic if needed?
# def _hard_mofo_kill(self):
# # If we're the root actor or zombied kill everything
# if self._parent_chan is None: # TODO: more robust check
# root = trio.lowlevel.current_root_task()
# for n in root.child_nurseries:
# n.cancel_scope.cancel()
async def async_main( async def async_main(
actor: Actor, actor: Actor,
@ -1755,9 +1760,7 @@ async def async_main(
f' {pformat(ipc_server._peers)}' f' {pformat(ipc_server._peers)}'
) )
log.runtime(teardown_report) log.runtime(teardown_report)
await ipc_server.wait_for_no_more_peers( await ipc_server.wait_for_no_more_peers()
shield=True,
)
teardown_report += ( teardown_report += (
'-]> all peer channels are complete.\n' '-]> all peer channels are complete.\n'

View File

@ -102,6 +102,9 @@ class MsgStream(trio.abc.Channel):
self._eoc: bool|trio.EndOfChannel = False self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False self._closed: bool|trio.ClosedResourceError = False
def is_eoc(self) -> bool|trio.EndOfChannel:
return self._eoc
@property @property
def ctx(self) -> Context: def ctx(self) -> Context:
''' '''
@ -188,7 +191,14 @@ class MsgStream(trio.abc.Channel):
return pld return pld
async def receive( # XXX NOTE, this is left private because in `.subscribe()` usage
# we rebind the public `.recieve()` to a `BroadcastReceiver` but
# on `.subscribe().__aexit__()`, for the first task which enters,
# we want to revert to this msg-stream-instance's method since
# mult-task-tracking provided by the b-caster is then no longer
# necessary.
#
async def _receive(
self, self,
hide_tb: bool = False, hide_tb: bool = False,
): ):
@ -313,6 +323,8 @@ class MsgStream(trio.abc.Channel):
raise src_err raise src_err
receive = _receive
async def aclose(self) -> list[Exception|dict]: async def aclose(self) -> list[Exception|dict]:
''' '''
Cancel associated remote actor task and local memory channel on Cancel associated remote actor task and local memory channel on
@ -528,10 +540,15 @@ class MsgStream(trio.abc.Channel):
receiver wrapper. receiver wrapper.
''' '''
# NOTE: This operation is indempotent and non-reversible, so be # XXX NOTE, This operation was originally implemented as
# sure you can deal with any (theoretical) overhead of the the # indempotent and non-reversible, so you had to be **VERY**
# allocated ``BroadcastReceiver`` before calling this method for # aware of any (theoretical) overhead from the allocated
# the first time. # `BroadcastReceiver.receive()`.
#
# HOWEVER, NOw we do revert and de-alloc the ._broadcaster
# when the final caller (task) exits.
#
bcast: BroadcastReceiver|None = None
if self._broadcaster is None: if self._broadcaster is None:
bcast = self._broadcaster = broadcast_receiver( bcast = self._broadcaster = broadcast_receiver(
@ -541,29 +558,60 @@ class MsgStream(trio.abc.Channel):
# TODO: can remove this kwarg right since # TODO: can remove this kwarg right since
# by default behaviour is to do this anyway? # by default behaviour is to do this anyway?
receive_afunc=self.receive, receive_afunc=self._receive,
) )
# NOTE: we override the original stream instance's receive # XXX NOTE, we override the original stream instance's
# method to now delegate to the broadcaster's ``.receive()`` # receive method to instead delegate to the broadcaster's
# such that new subscribers will be copied received values # `.receive()` such that new subscribers (multiple
# and this stream doesn't have to expect it's original # `trio.Task`s) will be copied received values and the
# consumer(s) to get a new broadcast rx handle. # *first* task to enter here doesn't have to expect its original consumer(s)
# to get a new broadcast rx handle; everything happens
# underneath this iface seemlessly.
#
self.receive = bcast.receive # type: ignore self.receive = bcast.receive # type: ignore
# seems there's no graceful way to type this with ``mypy``? # seems there's no graceful way to type this with `mypy`?
# https://github.com/python/mypy/issues/708 # https://github.com/python/mypy/issues/708
async with self._broadcaster.subscribe() as bstream: # TODO, prevent re-entrant sub scope?
assert bstream.key != self._broadcaster.key # if self._broadcaster._closed:
assert bstream._recv == self._broadcaster._recv # raise RuntimeError(
# 'This stream
# NOTE: we patch on a `.send()` to the bcaster so that the try:
# caller can still conduct 2-way streaming using this aenter = self._broadcaster.subscribe()
# ``bstream`` handle transparently as though it was the msg async with aenter as bstream:
# stream instance. # ?TODO, move into test suite?
bstream.send = self.send # type: ignore assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
yield bstream # NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send # type: ignore
# newly-allocated instance
yield bstream
finally:
# XXX, the first-enterer task should, like all other
# subs, close the first allocated bcrx, which adjusts the
# common `bcrx.state`
with trio.CancelScope(shield=True):
if bcast is not None:
await bcast.aclose()
# XXX, when the bcrx.state reports there are no more subs
# we can revert to this obj's method, removing any
# delegation overhead!
if (
(orig_bcast := self._broadcaster)
and
not orig_bcast.state.subs
):
self.receive = self._receive
# self._broadcaster = None
async def send( async def send(
self, self,

View File

@ -117,7 +117,6 @@ class ActorNursery:
] ]
] = {} ] = {}
self.cancelled: bool = False
self._join_procs = trio.Event() self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False self._at_least_one_child_in_debug: bool = False
self.errors = errors self.errors = errors
@ -135,10 +134,53 @@ class ActorNursery:
# TODO: remove the `.run_in_actor()` API and thus this 2ndary # TODO: remove the `.run_in_actor()` API and thus this 2ndary
# nursery when that API get's moved outside this primitive! # nursery when that API get's moved outside this primitive!
self._ria_nursery = ria_nursery self._ria_nursery = ria_nursery
# TODO, factor this into a .hilevel api!
#
# portals spawned with ``run_in_actor()`` are # portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives # cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set() self._cancel_after_result_on_exit: set = set()
# trio.Nursery-like cancel (request) statuses
self._cancelled_caught: bool = False
self._cancel_called: bool = False
@property
def cancel_called(self) -> bool:
'''
Records whether cancellation has been requested for this
actor-nursery by a call to `.cancel()` either due to,
- an explicit call by some actor-local-task,
- an implicit call due to an error/cancel emited inside
the `tractor.open_nursery()` block.
'''
return self._cancel_called
@property
def cancelled_caught(self) -> bool:
'''
Set when this nursery was able to cance all spawned subactors
gracefully via an (implicit) call to `.cancel()`.
'''
return self._cancelled_caught
# TODO! remove internal/test-suite usage!
@property
def cancelled(self) -> bool:
warnings.warn(
"`ActorNursery.cancelled` is now deprecated, use "
" `.cancel_called` instead.",
DeprecationWarning,
stacklevel=2,
)
return (
self._cancel_called
# and
# self._cancelled_caught
)
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
@ -316,7 +358,7 @@ class ActorNursery:
''' '''
__runtimeframe__: int = 1 # noqa __runtimeframe__: int = 1 # noqa
self.cancelled = True self._cancel_called = True
# TODO: impl a repr for spawn more compact # TODO: impl a repr for spawn more compact
# then `._children`.. # then `._children`..
@ -394,6 +436,8 @@ class ActorNursery:
) in children.values(): ) in children.values():
log.warning(f"Hard killing process {proc}") log.warning(f"Hard killing process {proc}")
proc.terminate() proc.terminate()
else:
self._cancelled_caught
# mark ourselves as having (tried to have) cancelled all subactors # mark ourselves as having (tried to have) cancelled all subactors
self._join_procs.set() self._join_procs.set()

View File

@ -101,11 +101,27 @@ class Channel:
# ^XXX! ONLY set if a remote actor sends an `Error`-msg # ^XXX! ONLY set if a remote actor sends an `Error`-msg
self._closed: bool = False self._closed: bool = False
# flag set by ``Portal.cancel_actor()`` indicating remote # flag set by `Portal.cancel_actor()` indicating remote
# (possibly peer) cancellation of the far end actor # (possibly peer) cancellation of the far end actor runtime.
# runtime.
self._cancel_called: bool = False self._cancel_called: bool = False
@property
def closed(self) -> bool:
'''
Was `.aclose()` successfully called?
'''
return self._closed
@property
def cancel_called(self) -> bool:
'''
Set when `Portal.cancel_actor()` is called on a portal which
wraps this IPC channel.
'''
return self._cancel_called
@property @property
def uid(self) -> tuple[str, str]: def uid(self) -> tuple[str, str]:
''' '''

View File

@ -814,10 +814,14 @@ class Server(Struct):
async def wait_for_no_more_peers( async def wait_for_no_more_peers(
self, self,
shield: bool = False, # XXX, should this even be allowed?
# -> i've seen it cause hangs on teardown
# in `test_resource_cache.py`
# _shield: bool = False,
) -> None: ) -> None:
with trio.CancelScope(shield=shield): await self._no_more_peers.wait()
await self._no_more_peers.wait() # with trio.CancelScope(shield=_shield):
# await self._no_more_peers.wait()
async def wait_for_peer( async def wait_for_peer(
self, self,

View File

@ -100,6 +100,32 @@ class Lagged(trio.TooSlowError):
''' '''
def wrap_rx_for_eoc(
rx: AsyncReceiver,
) -> AsyncReceiver:
match rx:
case trio.MemoryReceiveChannel():
# XXX, taken verbatim from .receive_nowait()
def is_eoc() -> bool:
if not rx._state.open_send_channels:
return trio.EndOfChannel
return False
rx.is_eoc = is_eoc
case _:
# XXX, ensure we define a private field!
# case tractor.MsgStream:
assert (
getattr(rx, '_eoc', False) is not None
)
return rx
class BroadcastState(Struct): class BroadcastState(Struct):
''' '''
Common state to all receivers of a broadcast. Common state to all receivers of a broadcast.
@ -186,11 +212,23 @@ class BroadcastReceiver(ReceiveChannel):
state.subs[self.key] = -1 state.subs[self.key] = -1
# underlying for this receiver # underlying for this receiver
self._rx = rx_chan self._rx = wrap_rx_for_eoc(rx_chan)
self._recv = receive_afunc or rx_chan.receive self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False self._closed: bool = False
self._raise_on_lag = raise_on_lag self._raise_on_lag = raise_on_lag
@property
def state(self) -> BroadcastState:
'''
Read-only access to this receivers internal `._state`
instance ref.
If you just want to read the high-level state metrics,
use `.state.statistics()`.
'''
return self._state
def receive_nowait( def receive_nowait(
self, self,
_key: int | None = None, _key: int | None = None,
@ -215,7 +253,23 @@ class BroadcastReceiver(ReceiveChannel):
try: try:
seq = state.subs[key] seq = state.subs[key]
except KeyError: except KeyError:
# from tractor import pause_from_sync
# pause_from_sync(shield=True)
if (
(rx_eoc := self._rx.is_eoc())
or
self.state.eoc
):
raise trio.EndOfChannel(
'Broadcast-Rx underlying already ended!'
) from rx_eoc
if self._closed: if self._closed:
# if (rx_eoc := self._rx._eoc):
# raise trio.EndOfChannel(
# 'Broadcast-Rx underlying already ended!'
# ) from rx_eoc
raise trio.ClosedResourceError raise trio.ClosedResourceError
raise RuntimeError( raise RuntimeError(
@ -453,8 +507,9 @@ class BroadcastReceiver(ReceiveChannel):
self._closed = True self._closed = True
# NOTE, this can we use as an `@acm` since `BroadcastReceiver`
# derives from `ReceiveChannel`.
def broadcast_receiver( def broadcast_receiver(
recv_chan: AsyncReceiver, recv_chan: AsyncReceiver,
max_buffer_size: int, max_buffer_size: int,
receive_afunc: Callable[[], Awaitable[Any]]|None = None, receive_afunc: Callable[[], Awaitable[Any]]|None = None,

View File

@ -289,7 +289,10 @@ async def maybe_open_context(
) )
_Cache.users += 1 _Cache.users += 1
lock.release() lock.release()
yield False, yielded yield (
False, # cache_hit = "no"
yielded,
)
else: else:
_Cache.users += 1 _Cache.users += 1
@ -303,7 +306,10 @@ async def maybe_open_context(
# f'{ctx_key!r} -> {yielded!r}\n' # f'{ctx_key!r} -> {yielded!r}\n'
) )
lock.release() lock.release()
yield True, yielded yield (
True, # cache_hit = "yes"
yielded,
)
finally: finally:
_Cache.users -= 1 _Cache.users -= 1