Compare commits

10 Commits: d4ca1a15a5 ... 7e49ac678b

Author | SHA1 | Date
---|---|---
 | 7e49ac678b |
 | 7a075494f1 |
 | c3aa29e7fa |
 | 9f6acf9ac3 |
 | 2a69d179e6 |
 | c51a49b045 |
 | 6627a3bfda |
 | 285ebba4b1 |
 | 20628cc0b8 |
 | 2536c5b3d2 |
@@ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on):
         with trio.move_on_after(0.5):
             async with (
                 tractor.open_root_actor(),
-                trio.open_nursery() as n,
+                trio.open_nursery() as tn,
             ):

                 for i in range(10):
-                    n.start_soon(enter_cached_mngr, f'task_{i}')
+                    tn.start_soon(
+                        enter_cached_mngr,
+                        f'task_{i}',
+                    )
                     await trio.sleep(0.001)

     trio.run(main)
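For reference, a minimal sketch of the caching pattern this test drives: `tractor.trionics.maybe_open_context()` runs the wrapped acm once and hands every later entrant the same yielded value. The `costly_resource` acm below is hypothetical (the suite's real `enter_cached_mngr` body is not shown in this diff):

    # Hypothetical sketch only; names other than `maybe_open_context`
    # are illustrative.
    from contextlib import asynccontextmanager as acm

    import tractor


    @acm
    async def costly_resource():
        # stands in for an actor + portal + ctx + stream allocation
        yield object()


    async def enter_cached_mngr(name: str):
        async with tractor.trionics.maybe_open_context(
            acm_func=costly_resource,
        ) as (cache_hit, resource):
            # only the first entrant actually runs `costly_resource()`;
            # all subsequent tasks "cache hit" and share `resource`.
            print(f'{name}: cache_hit={cache_hit} -> {resource!r}')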
@@ -98,23 +100,34 @@ async def streamer(


 @acm
-async def open_stream() -> Awaitable[tractor.MsgStream]:
-
+async def open_stream() -> Awaitable[
+    tuple[
+        tractor.ActorNursery,
+        tractor.MsgStream,
+    ]
+]:
     try:
         async with tractor.open_nursery() as an:
             portal = await an.start_actor(
                 'streamer',
                 enable_modules=[__name__],
             )
-            async with (
-                portal.open_context(streamer) as (ctx, first),
-                ctx.open_stream() as stream,
-            ):
-                yield stream
-
-            print('Cancelling streamer')
-            await portal.cancel_actor()
-            print('Cancelled streamer')
+            try:
+                async with (
+                    portal.open_context(streamer) as (ctx, first),
+                    ctx.open_stream() as stream,
+                ):
+                    print('Entered open_stream() caller')
+                    yield an, stream
+                    print('Exited open_stream() caller')
+
+            finally:
+                print(
+                    'Cancelling streamer with,\n'
+                    '=> `Portal.cancel_actor()`'
+                )
+                await portal.cancel_actor()
+                print('Cancelled streamer')

     except Exception as err:
         print(
@@ -130,8 +143,12 @@ async def maybe_open_stream(taskname: str):
     async with tractor.trionics.maybe_open_context(
         # NOTE: all secondary tasks should cache hit on the same key
         acm_func=open_stream,
-    ) as (cache_hit, stream):
+    ) as (
+        cache_hit,
+        (an, stream)
+    ):

         # when the actor + portal + ctx + stream has already been
         # allocated we want to just bcast to this task.
         if cache_hit:
             print(f'{taskname} loaded from cache')
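Note the nested destructuring here: `maybe_open_context()` always yields a `(cache_hit, yielded)` pair, and since the new `open_stream()` itself yields a 2-tuple, `yielded` unpacks a second time. A fragment-level sketch (assuming `open_stream` from the hunk above):

    import tractor

    # `yielded` is exactly what `open_stream()` yields, i.e. the
    # `(an, stream)` pair from the previous hunk.
    async def some_task():
        async with tractor.trionics.maybe_open_context(
            acm_func=open_stream,
        ) as (
            cache_hit,       # bool: was the cached value re-used?
            (an, stream),    # nested-destructure the yielded 2-tuple
        ):
            ...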
@@ -139,10 +156,43 @@ async def maybe_open_stream(taskname: str):
             # if this feed is already allocated by the first
             # task that entered
             async with stream.subscribe() as bstream:
-                yield bstream
+                yield an, bstream
+                print(
+                    f'cached task exited\n'
+                    f')>\n'
+                    f' |_{taskname}\n'
+                )
+
+            # we should always unreg the "cloned" bcrx for this
+            # consumer-task
+            assert id(bstream) not in bstream._state.subs

         else:
             # yield the actual stream
-            yield stream
+            try:
+                yield an, stream
+            finally:
+                print(
+                    f'NON-cached task exited\n'
+                    f')>\n'
+                    f' |_{taskname}\n'
+                )
+
+        first_bstream = stream._broadcaster
+        bcrx_state = first_bstream._state
+        subs: dict[int, int] = bcrx_state.subs
+        if len(subs) == 1:
+            assert id(first_bstream) in subs
+            # ^^TODO! the bcrx should always de-allocate all subs,
+            # including the implicit first one allocated on entry
+            # by the first subscribing peer task, no?
+            #
+            # -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
+            #     |_ allows reverting `MsgStream.receive()` to the
+            #        non-bcaster method.
+            #     |_ we can decide whether to reset `._broadcaster`?
+            #
+            # await tractor.pause(shield=True)


 def test_open_local_sub_to_stream(
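The asserts above lean on the broadcaster's sub-(de)registration bookkeeping; a small runnable sketch of that invariant, assuming the `tractor.trionics` import path (and the private `_state.subs` map the tests themselves poke at):

    import trio
    from tractor.trionics import broadcast_receiver


    async def main():
        tx, rx = trio.open_memory_channel(8)
        async with broadcast_receiver(rx, 8) as brx:
            async with brx.subscribe() as sub:
                # the per-task clone is tracked in the shared state..
                assert sub.key in brx._state.subs
            # ..and must be unregistered again on scope exit
            assert sub.key not in brx._state.subs
        tx.close()


    trio.run(main)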
@@ -159,16 +209,24 @@ def test_open_local_sub_to_stream(

     if debug_mode:
         timeout = 999
+        print(f'IN debug_mode, setting large timeout={timeout!r}..')

     async def main():

         full = list(range(1000))
+        an: tractor.ActorNursery|None = None
+        num_tasks: int = 10

         async def get_sub_and_pull(taskname: str):
+
+            nonlocal an
+
             stream: tractor.MsgStream
             async with (
-                maybe_open_stream(taskname) as stream,
+                maybe_open_stream(taskname) as (
+                    an,
+                    stream,
+                ),
             ):
                 if '0' in taskname:
                     assert isinstance(stream, tractor.MsgStream)
@@ -180,34 +238,70 @@ def test_open_local_sub_to_stream(

                 first = await stream.receive()
                 print(f'{taskname} started with value {first}')
-                seq = []
+                seq: list[int] = []
                 async for msg in stream:
                     seq.append(msg)

                 assert set(seq).issubset(set(full))
+
+            # end of @acm block
             print(f'{taskname} finished')

+        root: tractor.Actor
         with trio.fail_after(timeout) as cs:
             # TODO: turns out this isn't multi-task entrant XD
             # We probably need an idempotent entry semantic?
             async with tractor.open_root_actor(
                 debug_mode=debug_mode,
-            ):
+                # maybe_enable_greenback=True,
+                #
+                # ^TODO? doesn't seem to make breakpoint() usage work
+                # bc each bg task needs to open a portal??
+                # - [ ] we should consider making this part of
+                #       our taskman defaults?
+                #   |_see https://github.com/goodboy/tractor/pull/363
+                #
+            ) as root:
+                assert root.is_registrar

                 async with (
                     trio.open_nursery() as tn,
                 ):
-                    for i in range(10):
+                    for i in range(num_tasks):
                         tn.start_soon(
                             get_sub_and_pull,
                             f'task_{i}',
                         )
                         await trio.sleep(0.001)

-                print('all consumer tasks finished')
+                print('all consumer tasks finished!')
+
+                # ?XXX, ensure actor-nursery is shutdown or we might
+                # hang here due to a minor task deadlock/race-condition?
+                #
+                # - seems that all we need is a checkpoint to ensure
+                #   the last suspended task, which is inside
+                #   `.maybe_open_context()`, can do the
+                #   `Portal.cancel_actor()` call?
+                #
+                # - if that bg task isn't resumed, then this block's
+                #   timeout might hit before that?
+                #
+                if root.ipc_server.has_peers():
+                    await trio.lowlevel.checkpoint()
+
+                # alt approach, cancel the entire `an`
+                # await tractor.pause()
+                # await an.cancel()
+
+            # end of runtime scope
+            print('root actor terminated.')

         if cs.cancelled_caught:
             pytest.fail(
                 'Should NOT time out in `open_root_actor()` ?'
             )

+        print('exiting main.')

     trio.run(main)
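The added `trio.lowlevel.checkpoint()` is just a scheduler yield; a tiny pure-`trio` illustration of why that matters when a peer task is already runnable but suspended:

    import trio


    async def bg(done: trio.Event):
        # stands in for the suspended `maybe_open_context()` caller
        # which still needs a scheduler slot to run its
        # `Portal.cancel_actor()` step.
        done.set()


    async def main():
        done = trio.Event()
        async with trio.open_nursery() as tn:
            tn.start_soon(bg, done)
            assert not done.is_set()  # spawned but not yet scheduled
            # a bare checkpoint yields to the scheduler so already
            # runnable peers can proceed before we continue teardown
            while not done.is_set():
                await trio.lowlevel.checkpoint()


    trio.run(main)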
@@ -67,7 +67,6 @@ async def ensure_sequence(

 @acm
 async def open_sequence_streamer(
-
     sequence: list[int],
     reg_addr: tuple[str, int],
     start_method: str,
@@ -96,39 +95,43 @@ async def open_sequence_streamer(


 def test_stream_fan_out_to_local_subscriptions(
-    reg_addr,
+    debug_mode: bool,
+    reg_addr: tuple,
     start_method,
 ):

     sequence = list(range(1000))

     async def main():
-        async with open_sequence_streamer(
-            sequence,
-            reg_addr,
-            start_method,
-        ) as stream:
+        with trio.fail_after(9):
+            async with open_sequence_streamer(
+                sequence,
+                reg_addr,
+                start_method,
+            ) as stream:
+                async with (
+                    collapse_eg(),
+                    trio.open_nursery() as tn,
+                ):
+                    for i in range(10):
+                        tn.start_soon(
+                            ensure_sequence,
+                            stream,
+                            sequence.copy(),
+                            name=f'consumer_{i}',
+                        )

-            async with trio.open_nursery() as n:
-                for i in range(10):
-                    n.start_soon(
-                        ensure_sequence,
-                        stream,
-                        sequence.copy(),
-                        name=f'consumer_{i}',
-                    )
+                    await stream.send(tuple(sequence))

-                await stream.send(tuple(sequence))
+                    async for value in stream:
+                        print(f'source stream rx: {value}')
+                        assert value == sequence[0]
+                        sequence.remove(value)

-                async for value in stream:
-                    print(f'source stream rx: {value}')
-                    assert value == sequence[0]
-                    sequence.remove(value)
-
-                    if not sequence:
-                        # fully consumed
-                        break
+                        if not sequence:
+                            # fully consumed
+                            break

     trio.run(main)
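As background for `ensure_sequence`'s guarantee, a small runnable sketch (assumed `tractor.trionics` import path) of the fan-out invariant the test relies on: every subscriber task sees every broadcast value.

    import trio
    from tractor.trionics import broadcast_receiver


    async def main():
        tx, rx = trio.open_memory_channel(16)
        async with broadcast_receiver(rx, 16) as brx:

            async def consumer(n: int):
                async with brx.subscribe() as sub:
                    # every subscriber gets its own copy of each value
                    assert await sub.receive() == 'ping'
                    print(f'consumer {n} got ping')

            async with trio.open_nursery() as tn:
                for i in range(3):
                    tn.start_soon(consumer, i)
                await trio.sleep(0.1)  # let all subs register first
                await tx.send('ping')


    trio.run(main)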
@@ -151,67 +154,69 @@ def test_consumer_and_parent_maybe_lag(
         sequence = list(range(300))
         parent_delay, sub_delay = task_delays

-        async with open_sequence_streamer(
-            sequence,
-            reg_addr,
-            start_method,
-        ) as stream:
+        # TODO, maybe make a cm-deco for main()s?
+        with trio.fail_after(3):
+            async with open_sequence_streamer(
+                sequence,
+                reg_addr,
+                start_method,
+            ) as stream:

-            try:
-                async with (
-                    collapse_eg(),
-                    trio.open_nursery() as tn,
-                ):
+                try:
+                    async with (
+                        collapse_eg(),
+                        trio.open_nursery() as tn,
+                    ):

-                    tn.start_soon(
-                        ensure_sequence,
-                        stream,
-                        sequence.copy(),
-                        sub_delay,
-                        name='consumer_task',
-                    )
+                        tn.start_soon(
+                            ensure_sequence,
+                            stream,
+                            sequence.copy(),
+                            sub_delay,
+                            name='consumer_task',
+                        )

-                    await stream.send(tuple(sequence))
+                        await stream.send(tuple(sequence))

-                    # async for value in stream:
-                    lagged = False
-                    lag_count = 0
+                        # async for value in stream:
+                        lagged = False
+                        lag_count = 0

-                    while True:
-                        try:
-                            value = await stream.receive()
-                            print(f'source stream rx: {value}')
+                        while True:
+                            try:
+                                value = await stream.receive()
+                                print(f'source stream rx: {value}')

-                            if lagged:
-                                # reset the sequence starting at our last
-                                # value
-                                sequence = sequence[sequence.index(value) + 1:]
-                            else:
-                                assert value == sequence[0]
-                                sequence.remove(value)
+                                if lagged:
+                                    # reset the sequence starting at our last
+                                    # value
+                                    sequence = sequence[sequence.index(value) + 1:]
+                                else:
+                                    assert value == sequence[0]
+                                    sequence.remove(value)

-                            lagged = False
+                                lagged = False

-                        except Lagged:
-                            lagged = True
-                            print(f'source stream lagged after {value}')
-                            lag_count += 1
-                            continue
+                            except Lagged:
+                                lagged = True
+                                print(f'source stream lagged after {value}')
+                                lag_count += 1
+                                continue

-                        # lag the parent
-                        await trio.sleep(parent_delay)
+                            # lag the parent
+                            await trio.sleep(parent_delay)

-                        if not sequence:
-                            # fully consumed
-                            break
-                    print(f'parent + source stream lagged: {lag_count}')
+                            if not sequence:
+                                # fully consumed
+                                break
+                        print(f'parent + source stream lagged: {lag_count}')

-                    if parent_delay > sub_delay:
-                        assert lag_count > 0
+                        if parent_delay > sub_delay:
+                            assert lag_count > 0

-            except Lagged:
-                # child was lagged
-                assert parent_delay < sub_delay
+                except Lagged:
+                    # child was lagged
+                    assert parent_delay < sub_delay

     trio.run(main)
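For intuition on the `Lagged` semantics exercised above, a compact sketch (import paths assumed; semantics per this module's tokio-style broadcast design): a subscriber that falls further behind than the buffer length gets `Lagged` raised on its next `.receive()`.

    import trio
    from tractor.trionics import broadcast_receiver, Lagged


    async def main():
        tx, rx = trio.open_memory_channel(4)
        async with broadcast_receiver(rx, 1) as brx:
            async with brx.subscribe() as slow:
                await tx.send(1)
                assert await brx.receive() == 1  # fast reader keeps up
                await tx.send(2)
                await tx.send(3)
                assert await brx.receive() == 2
                assert await brx.receive() == 3
                try:
                    # `slow` is now 3 values behind a 1-deep buffer
                    await slow.receive()
                except Lagged:
                    print('slow subscriber lagged as expected')


    trio.run(main)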
@@ -285,7 +290,11 @@ def test_faster_task_to_recv_is_cancelled_by_slower(


 def test_subscribe_errors_after_close():
+    '''
+    Verify after calling `BroadcastReceiver.aclose()` you can't
+    "re-open" it via `.subscribe()`.
+
+    '''
     async def main():

         size = 1
@@ -293,6 +302,8 @@ def test_subscribe_errors_after_close():
         async with broadcast_receiver(rx, size) as brx:
             pass

+        assert brx.key not in brx._state.subs
+
         try:
             # open and close
             async with brx.subscribe():
@@ -302,7 +313,7 @@ def test_subscribe_errors_after_close():
             assert brx.key not in brx._state.subs

         else:
-            assert 0
+            pytest.fail('brx.subscribe() never raised!?')

     trio.run(main)
@@ -300,7 +300,7 @@ class Portal:
         )

         # XXX the one spot we set it?
-        self.channel._cancel_called: bool = True
+        chan._cancel_called: bool = True
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with
@@ -552,6 +552,14 @@ class Actor:
             )
             raise

+    # ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
+    # - _get_rpc_func(),
+    # - _deliver_ctx_payload(),
+    # - get_context(),
+    # - start_remote_task(),
+    # - cancel_rpc_tasks(),
+    # - _cancel_task(),
+    #
     def _get_rpc_func(self, ns, funcname):
         '''
         Try to lookup and return a target RPC func from the
@@ -1119,14 +1127,6 @@ class Actor:
         self._cancel_complete.set()
         return True

-    # XXX: hard kill logic if needed?
-    # def _hard_mofo_kill(self):
-    #     # If we're the root actor or zombied kill everything
-    #     if self._parent_chan is None:  # TODO: more robust check
-    #         root = trio.lowlevel.current_root_task()
-    #         for n in root.child_nurseries:
-    #             n.cancel_scope.cancel()
-
     async def _cancel_task(
         self,
         cid: str,
@@ -1361,25 +1361,13 @@ class Actor:
         '''
         return self.accept_addrs[0]

-    def get_parent(self) -> Portal:
-        '''
-        Return a `Portal` to our parent.
-
-        '''
-        assert self._parent_chan, "No parent channel for this actor?"
-        return Portal(self._parent_chan)
-
-    def get_chans(
-        self,
-        uid: tuple[str, str],
-
-    ) -> list[Channel]:
-        '''
-        Return all IPC channels to the actor with provided `uid`.
-
-        '''
-        return self._ipc_server._peers[uid]
-
+    # TODO, this should delegate ONLY to the
+    # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
+    #
+    # XXX, AH RIGHT that's why..
+    # it's bc we pass this as a CLI flag to the child.py precisely
+    # bc we need the bootstrapping pre `async_main()`.. but maybe
+    # keep this as an impl detail and not part of the pub iface impl?
     def is_infected_aio(self) -> bool:
         '''
         If `True`, this actor is running `trio` in guest mode on
@@ -1390,6 +1378,23 @@ class Actor:
         '''
         return self._infected_aio

+    # ?TODO, is this the right type for this method?
+    def get_parent(self) -> Portal:
+        '''
+        Return a `Portal` to our parent.
+
+        '''
+        assert self._parent_chan, "No parent channel for this actor?"
+        return Portal(self._parent_chan)
+
+    # XXX: hard kill logic if needed?
+    # def _hard_mofo_kill(self):
+    #     # If we're the root actor or zombied kill everything
+    #     if self._parent_chan is None:  # TODO: more robust check
+    #         root = trio.lowlevel.current_root_task()
+    #         for n in root.child_nurseries:
+    #             n.cancel_scope.cancel()
+

 async def async_main(
     actor: Actor,
@@ -1755,9 +1760,7 @@ async def async_main(
             f'  {pformat(ipc_server._peers)}'
         )
         log.runtime(teardown_report)
-        await ipc_server.wait_for_no_more_peers(
-            shield=True,
-        )
+        await ipc_server.wait_for_no_more_peers()

         teardown_report += (
             '-]> all peer channels are complete.\n'
@@ -102,6 +102,9 @@ class MsgStream(trio.abc.Channel):
         self._eoc: bool|trio.EndOfChannel = False
         self._closed: bool|trio.ClosedResourceError = False

+    def is_eoc(self) -> bool|trio.EndOfChannel:
+        return self._eoc
+
     @property
     def ctx(self) -> Context:
         '''
@@ -188,7 +191,14 @@ class MsgStream(trio.abc.Channel):

         return pld

-    async def receive(
+    # XXX NOTE, this is left private because in `.subscribe()` usage
+    # we rebind the public `.receive()` to a `BroadcastReceiver` but
+    # on `.subscribe().__aexit__()`, for the first task which enters,
+    # we want to revert to this msg-stream-instance's method since
+    # multi-task-tracking provided by the b-caster is then no longer
+    # necessary.
+    #
+    async def _receive(
         self,
         hide_tb: bool = False,
     ):
@@ -313,6 +323,8 @@ class MsgStream(trio.abc.Channel):

         raise src_err

+    receive = _receive
+
     async def aclose(self) -> list[Exception|dict]:
         '''
         Cancel associated remote actor task and local memory channel on
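The `receive = _receive` class-level alias supports a per-instance rebind-then-revert dance; a generic, runnable sketch of the pattern (illustrative names only, not this module's API):

    # A class aliases its public method to a private impl; instances
    # can shadow the class attribute with a delegate and later revert
    # by re-assigning the private bound method.
    class Stream:
        def _receive(self) -> str:
            return 'direct'

        receive = _receive  # public name -> private impl


    class Fanout:
        def receive(self) -> str:
            return 'broadcast'


    s = Stream()
    assert s.receive() == 'direct'

    s.receive = Fanout().receive   # per-instance shadow: delegate
    assert s.receive() == 'broadcast'

    s.receive = s._receive         # revert (as `subscribe()` does on final exit)
    assert s.receive() == 'direct'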
@@ -528,10 +540,15 @@ class MsgStream(trio.abc.Channel):
         receiver wrapper.

         '''
-        # NOTE: This operation is idempotent and non-reversible, so be
-        # sure you can deal with any (theoretical) overhead of the
-        # allocated ``BroadcastReceiver`` before calling this method for
-        # the first time.
+        # XXX NOTE, This operation was originally implemented as
+        # idempotent and non-reversible, so you had to be **VERY**
+        # aware of any (theoretical) overhead from the allocated
+        # `BroadcastReceiver.receive()`.
+        #
+        # HOWEVER, now we do revert and de-alloc the `._broadcaster`
+        # when the final caller (task) exits.
+        #
+        bcast: BroadcastReceiver|None = None
         if self._broadcaster is None:

             bcast = self._broadcaster = broadcast_receiver(
@@ -541,29 +558,60 @@ class MsgStream(trio.abc.Channel):

             # TODO: can remove this kwarg right since
             # by default behaviour is to do this anyway?
-            receive_afunc=self.receive,
+            receive_afunc=self._receive,
         )

-        # NOTE: we override the original stream instance's receive
-        # method to now delegate to the broadcaster's ``.receive()``
-        # such that new subscribers will be copied received values
-        # and this stream doesn't have to expect its original
-        # consumer(s) to get a new broadcast rx handle.
+        # XXX NOTE, we override the original stream instance's
+        # receive method to instead delegate to the broadcaster's
+        # `.receive()` such that new subscribers (multiple
+        # `trio.Task`s) will be copied received values and the
+        # *first* task to enter here doesn't have to expect its
+        # original consumer(s) to get a new broadcast rx handle;
+        # everything happens underneath this iface seamlessly.
+        #
         self.receive = bcast.receive  # type: ignore
-        # seems there's no graceful way to type this with ``mypy``?
+        # seems there's no graceful way to type this with `mypy`?
         # https://github.com/python/mypy/issues/708

-        async with self._broadcaster.subscribe() as bstream:
-            assert bstream.key != self._broadcaster.key
-            assert bstream._recv == self._broadcaster._recv
+        # TODO, prevent re-entrant sub scope?
+        # if self._broadcaster._closed:
+        #     raise RuntimeError(
+        #         'This stream

-            # NOTE: we patch on a `.send()` to the bcaster so that the
-            # caller can still conduct 2-way streaming using this
-            # ``bstream`` handle transparently as though it was the msg
-            # stream instance.
-            bstream.send = self.send  # type: ignore
+        try:
+            aenter = self._broadcaster.subscribe()
+            async with aenter as bstream:
+                # ?TODO, move into test suite?
+                assert bstream.key != self._broadcaster.key
+                assert bstream._recv == self._broadcaster._recv

-            yield bstream
+                # NOTE: we patch on a `.send()` to the bcaster so that the
+                # caller can still conduct 2-way streaming using this
+                # ``bstream`` handle transparently as though it was the msg
+                # stream instance.
+                bstream.send = self.send  # type: ignore
+
+                # newly-allocated instance
+                yield bstream
+
+        finally:
+            # XXX, the first-enterer task should, like all other
+            # subs, close the first allocated bcrx, which adjusts the
+            # common `bcrx.state`
+            with trio.CancelScope(shield=True):
+                if bcast is not None:
+                    await bcast.aclose()
+
+            # XXX, when the bcrx.state reports there are no more subs
+            # we can revert to this obj's method, removing any
+            # delegation overhead!
+            if (
+                (orig_bcast := self._broadcaster)
+                and
+                not orig_bcast.state.subs
+            ):
+                self.receive = self._receive
+                # self._broadcaster = None

     async def send(
         self,
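The shielded `bcast.aclose()` above guards cleanup against a cancelled caller; a minimal pure-`trio` sketch of why the shield is needed:

    # Without the shield, an `await` in a `finally` inside an already
    # cancelled scope immediately re-raises `Cancelled` and skips the
    # cleanup; shielding lets the close complete first.
    import trio


    async def task():
        try:
            await trio.sleep_forever()
        finally:
            with trio.CancelScope(shield=True):
                # any teardown await (e.g. `bcast.aclose()`) survives
                # the surrounding cancellation here
                await trio.sleep(0)
                print('cleanup ran to completion')


    async def main():
        with trio.move_on_after(0.1):
            await task()


    trio.run(main)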
@@ -117,7 +117,6 @@ class ActorNursery:
             ]
         ] = {}

-        self.cancelled: bool = False
         self._join_procs = trio.Event()
         self._at_least_one_child_in_debug: bool = False
         self.errors = errors
@@ -135,10 +134,53 @@ class ActorNursery:
         # TODO: remove the `.run_in_actor()` API and thus this 2ndary
         # nursery when that API gets moved outside this primitive!
         self._ria_nursery = ria_nursery

+        # TODO, factor this into a .hilevel api!
+        #
         # portals spawned with ``run_in_actor()`` are
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()

+        # trio.Nursery-like cancel (request) statuses
+        self._cancelled_caught: bool = False
+        self._cancel_called: bool = False
+
+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Records whether cancellation has been requested for this
+        actor-nursery by a call to `.cancel()` either due to,
+        - an explicit call by some actor-local-task,
+        - an implicit call due to an error/cancel emitted inside
+          the `tractor.open_nursery()` block.
+
+        '''
+        return self._cancel_called
+
+    @property
+    def cancelled_caught(self) -> bool:
+        '''
+        Set when this nursery was able to cancel all spawned subactors
+        gracefully via an (implicit) call to `.cancel()`.
+
+        '''
+        return self._cancelled_caught
+
+    # TODO! remove internal/test-suite usage!
+    @property
+    def cancelled(self) -> bool:
+        warnings.warn(
+            "`ActorNursery.cancelled` is now deprecated, use "
+            " `.cancel_called` instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return (
+            self._cancel_called
+            # and
+            # self._cancelled_caught
+        )
+
     async def start_actor(
         self,
         name: str,
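The `.cancelled` shim above follows the standard property-deprecation recipe; a condensed, generic sketch (names illustrative) plus how a test can assert the warning actually fires:

    import warnings

    import pytest


    class Nursery:
        def __init__(self) -> None:
            self._cancel_called: bool = False

        @property
        def cancelled(self) -> bool:
            # old name keeps working but loudly points at the new one
            warnings.warn(
                '`.cancelled` is deprecated, use `.cancel_called`',
                DeprecationWarning,
                stacklevel=2,
            )
            return self._cancel_called


    def test_deprecation_warns():
        with pytest.warns(DeprecationWarning):
            assert Nursery().cancelled is False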
@@ -316,7 +358,7 @@ class ActorNursery:

         '''
         __runtimeframe__: int = 1  # noqa
-        self.cancelled = True
+        self._cancel_called = True

         # TODO: impl a repr for spawn more compact
         # than `._children`..
@@ -394,6 +436,8 @@ class ActorNursery:
             ) in children.values():
                 log.warning(f"Hard killing process {proc}")
                 proc.terminate()
+        else:
+            self._cancelled_caught

         # mark ourselves as having (tried to have) cancelled all subactors
         self._join_procs.set()
@@ -101,11 +101,27 @@ class Channel:
         # ^XXX! ONLY set if a remote actor sends an `Error`-msg
         self._closed: bool = False

-        # flag set by ``Portal.cancel_actor()`` indicating remote
-        # (possibly peer) cancellation of the far end actor
-        # runtime.
+        # flag set by `Portal.cancel_actor()` indicating remote
+        # (possibly peer) cancellation of the far end actor runtime.
         self._cancel_called: bool = False

+    @property
+    def closed(self) -> bool:
+        '''
+        Was `.aclose()` successfully called?
+
+        '''
+        return self._closed
+
+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Set when `Portal.cancel_actor()` is called on a portal which
+        wraps this IPC channel.
+
+        '''
+        return self._cancel_called
+
     @property
     def uid(self) -> tuple[str, str]:
         '''
@@ -814,10 +814,14 @@ class Server(Struct):

     async def wait_for_no_more_peers(
         self,
-        shield: bool = False,
+        # XXX, should this even be allowed?
+        # -> i've seen it cause hangs on teardown
+        #    in `test_resource_cache.py`
+        # _shield: bool = False,
     ) -> None:
-        with trio.CancelScope(shield=shield):
-            await self._no_more_peers.wait()
+        await self._no_more_peers.wait()
+        # with trio.CancelScope(shield=_shield):
+        #     await self._no_more_peers.wait()

     async def wait_for_peer(
         self,
@@ -100,6 +100,32 @@ class Lagged(trio.TooSlowError):
     '''


+def wrap_rx_for_eoc(
+    rx: AsyncReceiver,
+) -> AsyncReceiver:
+
+    match rx:
+        case trio.MemoryReceiveChannel():
+
+            # XXX, taken verbatim from .receive_nowait()
+            def is_eoc() -> bool:
+                if not rx._state.open_send_channels:
+                    return trio.EndOfChannel
+
+                return False
+
+            rx.is_eoc = is_eoc
+
+        case _:
+            # XXX, ensure we define a private field!
+            # case tractor.MsgStream:
+            assert (
+                getattr(rx, '_eoc', False) is not None
+            )
+
+    return rx
+
+
 class BroadcastState(Struct):
     '''
     Common state to all receivers of a broadcast.
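A sketch of consuming the probe added above (import path assumed; `is_eoc()` returns `False` while the sender side is alive, else the `trio.EndOfChannel` class, which doubles as a `raise .. from ..` cause as in the `receive_nowait()` hunk further down):

    import trio
    from tractor.trionics._broadcast import wrap_rx_for_eoc  # assumed path


    async def main():
        tx, rx = trio.open_memory_channel(1)
        rx = wrap_rx_for_eoc(rx)

        assert rx.is_eoc() is False
        tx.close()  # no more senders -> probe flips truthy
        if (eoc := rx.is_eoc()):
            try:
                raise trio.EndOfChannel(
                    'underlying already ended!'
                ) from eoc
            except trio.EndOfChannel as err:
                print(f'ended: {err!r}')


    trio.run(main)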
@@ -186,11 +212,23 @@ class BroadcastReceiver(ReceiveChannel):
         state.subs[self.key] = -1

         # underlying for this receiver
-        self._rx = rx_chan
+        self._rx = wrap_rx_for_eoc(rx_chan)
         self._recv = receive_afunc or rx_chan.receive
         self._closed: bool = False
         self._raise_on_lag = raise_on_lag

+    @property
+    def state(self) -> BroadcastState:
+        '''
+        Read-only access to this receiver's internal `._state`
+        instance ref.
+
+        If you just want to read the high-level state metrics,
+        use `.state.statistics()`.
+
+        '''
+        return self._state
+
     def receive_nowait(
         self,
         _key: int | None = None,
@@ -215,7 +253,23 @@ class BroadcastReceiver(ReceiveChannel):
         try:
             seq = state.subs[key]
         except KeyError:
+            # from tractor import pause_from_sync
+            # pause_from_sync(shield=True)
+            if (
+                (rx_eoc := self._rx.is_eoc())
+                or
+                self.state.eoc
+            ):
+                raise trio.EndOfChannel(
+                    'Broadcast-Rx underlying already ended!'
+                ) from rx_eoc
+
             if self._closed:
+                # if (rx_eoc := self._rx._eoc):
+                #     raise trio.EndOfChannel(
+                #         'Broadcast-Rx underlying already ended!'
+                #     ) from rx_eoc
+
                 raise trio.ClosedResourceError

             raise RuntimeError(
@@ -453,8 +507,9 @@ class BroadcastReceiver(ReceiveChannel):
         self._closed = True


+# NOTE, we can use this as an `@acm` since `BroadcastReceiver`
+# derives from `ReceiveChannel`.
 def broadcast_receiver(
-
     recv_chan: AsyncReceiver,
     max_buffer_size: int,
     receive_afunc: Callable[[], Awaitable[Any]]|None = None,
@@ -289,7 +290,10 @@ async def maybe_open_context(
             )
             _Cache.users += 1
             lock.release()
-            yield False, yielded
+            yield (
+                False,  # cache_hit = "no"
+                yielded,
+            )

     else:
         _Cache.users += 1

@@ -303,7 +306,10 @@ async def maybe_open_context(
             # f'{ctx_key!r} -> {yielded!r}\n'
         )
         lock.release()
-        yield True, yielded
+        yield (
+            True,  # cache_hit = "yes"
+            yielded,
+        )

     finally:
         _Cache.users -= 1
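The `lock.release()` just before each `yield`, together with the `finally: _Cache.users -= 1`, forms a refcounted single-allocation cache; a stripped-down sketch of that shape (generic names, not the real `_Cache`):

    # First entrant allocates under the lock; later entrants bump the
    # user count and share the same value; the last one out tears down.
    from contextlib import asynccontextmanager as acm

    import trio

    _lock = trio.Lock()
    _value: object | None = None
    _users: int = 0


    @acm
    async def maybe_open(factory):
        global _value, _users
        await _lock.acquire()
        try:
            if _value is None:
                _value = await factory()   # first entrant allocates
                cache_hit = False
            else:
                cache_hit = True
            _users += 1
        finally:
            _lock.release()  # released BEFORE yield so peers can enter
        try:
            yield cache_hit, _value
        finally:
            _users -= 1
            if _users == 0:
                _value = None  # last user out tears down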