Compare commits

10 commits (d4ca1a15a5 ... 7e49ac678b):

SHA1:
- 7e49ac678b
- 7a075494f1
- c3aa29e7fa
- 9f6acf9ac3
- 2a69d179e6
- c51a49b045
- 6627a3bfda
- 285ebba4b1
- 20628cc0b8
- 2536c5b3d2
@@ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on):
         with trio.move_on_after(0.5):
             async with (
                 tractor.open_root_actor(),
-                trio.open_nursery() as n,
+                trio.open_nursery() as tn,
             ):

                 for i in range(10):
-                    n.start_soon(enter_cached_mngr, f'task_{i}')
+                    tn.start_soon(
+                        enter_cached_mngr,
+                        f'task_{i}',
+                    )
                     await trio.sleep(0.001)

     trio.run(main)
@@ -98,21 +100,32 @@ async def streamer(


 @acm
-async def open_stream() -> Awaitable[tractor.MsgStream]:
+async def open_stream() -> Awaitable[
+    tuple[
+        tractor.ActorNursery,
+        tractor.MsgStream,
+    ]
+]:
     try:
         async with tractor.open_nursery() as an:
             portal = await an.start_actor(
                 'streamer',
                 enable_modules=[__name__],
             )
-            async with (
-                portal.open_context(streamer) as (ctx, first),
-                ctx.open_stream() as stream,
-            ):
-                yield stream
-
-            print('Cancelling streamer')
-            await portal.cancel_actor()
-            print('Cancelled streamer')
+            try:
+                async with (
+                    portal.open_context(streamer) as (ctx, first),
+                    ctx.open_stream() as stream,
+                ):
+                    print('Entered open_stream() caller')
+                    yield an, stream
+                    print('Exited open_stream() caller')
+
+            finally:
+                print(
+                    'Cancelling streamer with,\n'
+                    '=> `Portal.cancel_actor()`'
+                )
+                await portal.cancel_actor()
+                print('Cancelled streamer')
@@ -130,8 +143,12 @@ async def maybe_open_stream(taskname: str):
     async with tractor.trionics.maybe_open_context(
         # NOTE: all secondary tasks should cache hit on the same key
         acm_func=open_stream,
-    ) as (cache_hit, stream):
+    ) as (
+        cache_hit,
+        (an, stream)
+    ):
+        # when the actor + portal + ctx + stream has already been
+        # allocated we want to just bcast to this task.
         if cache_hit:
             print(f'{taskname} loaded from cache')

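The `(cache_hit, (an, stream))` unpack above follows the two-element convention that `maybe_open_context()` yields (see its own hunks at the bottom of this compare). A minimal, tractor-free sketch of that convention, assuming a simplified string-keyed cache and no inter-task locking (the real implementation uses a richer context key and a lock, note the `lock.release()` calls in its hunks):

```python
from contextlib import asynccontextmanager as acm

# hypothetical module-level cache, keyed by plain strings
_cache: dict[str, object] = {}

@acm
async def maybe_open_cached(key: str, factory):
    '''
    Yield `(cache_hit, value)`: the first entrant allocates via
    `factory()` and reports `cache_hit=False`; any later entrant
    reuses the live value and reports `cache_hit=True`.

    '''
    if key in _cache:
        yield True, _cache[key]
        return

    async with factory() as value:
        _cache[key] = value
        try:
            yield False, value
        finally:
            _cache.pop(key, None)
```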
@@ -139,10 +156,43 @@ async def maybe_open_stream(taskname: str):
             # if this feed is already allocated by the first
             # task that entereed
             async with stream.subscribe() as bstream:
-                yield bstream
+                yield an, bstream
+                print(
+                    f'cached task exited\n'
+                    f')>\n'
+                    f' |_{taskname}\n'
+                )
+
+            # we should always unreg the "cloned" bcrc for this
+            # consumer-task
+            assert id(bstream) not in bstream._state.subs

         else:
             # yield the actual stream
-            yield stream
+            try:
+                yield an, stream
+            finally:
+                print(
+                    f'NON-cached task exited\n'
+                    f')>\n'
+                    f' |_{taskname}\n'
+                )
+
+        first_bstream = stream._broadcaster
+        bcrx_state = first_bstream._state
+        subs: dict[int, int] = bcrx_state.subs
+        if len(subs) == 1:
+            assert id(first_bstream) in subs
+
+        # ^^TODO! the bcrx should always de-allocate all subs,
+        # including the implicit first one allocated on entry
+        # by the first subscribing peer task, no?
+        #
+        # -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
+        #     |_ allows reverting `MsgStream.receive()` to the
+        #        non-bcaster method.
+        #     |_ we can decide whether to reset `._broadcaster`?
+        #
+        # await tractor.pause(shield=True)


 def test_open_local_sub_to_stream(
@@ -159,16 +209,24 @@ def test_open_local_sub_to_stream(

     if debug_mode:
         timeout = 999
+        print(f'IN debug_mode, setting large timeout={timeout!r}..')

     async def main():

         full = list(range(1000))
+        an: tractor.ActorNursery|None = None
+        num_tasks: int = 10

         async def get_sub_and_pull(taskname: str):

+            nonlocal an
+
             stream: tractor.MsgStream
             async with (
-                maybe_open_stream(taskname) as stream,
+                maybe_open_stream(taskname) as (
+                    an,
+                    stream,
+                ),
             ):
                 if '0' in taskname:
                     assert isinstance(stream, tractor.MsgStream)
@@ -180,34 +238,70 @@ def test_open_local_sub_to_stream(

                 first = await stream.receive()
                 print(f'{taskname} started with value {first}')
-                seq = []
+                seq: list[int] = []
                 async for msg in stream:
                     seq.append(msg)

                 assert set(seq).issubset(set(full))

+            # end of @acm block
             print(f'{taskname} finished')

+        root: tractor.Actor
         with trio.fail_after(timeout) as cs:
             # TODO: turns out this isn't multi-task entrant XD
             # We probably need an indepotent entry semantic?
             async with tractor.open_root_actor(
                 debug_mode=debug_mode,
-            ):
+                # maybe_enable_greenback=True,
+                #
+                # ^TODO? doesn't seem to mk breakpoint() usage work
+                # bc each bg task needs to open a portal??
+                # - [ ] we should consider making this part of
+                #      our taskman defaults?
+                #   |_see https://github.com/goodboy/tractor/pull/363
+                #
+            ) as root:
+                assert root.is_registrar

                 async with (
                     trio.open_nursery() as tn,
                 ):
-                    for i in range(10):
+                    for i in range(num_tasks):
                         tn.start_soon(
                             get_sub_and_pull,
                             f'task_{i}',
                         )
                         await trio.sleep(0.001)

-                    print('all consumer tasks finished')
+                    print('all consumer tasks finished!')
+
+                    # ?XXX, ensure actor-nursery is shutdown or we might
+                    # hang here due to a minor task deadlock/race-condition?
+                    #
+                    # - seems that all we need is a checkpoint to ensure
+                    #   the last suspended task, which is inside
+                    #   `.maybe_open_context()`, can do the
+                    #   `Portal.cancel_actor()` call?
+                    #
+                    # - if that bg task isn't resumed, then this blocks
+                    #   timeout might hit before that?
+                    #
+                    if root.ipc_server.has_peers():
+                        await trio.lowlevel.checkpoint()
+
+                        # alt approach, cancel the entire `an`
+                        # await tractor.pause()
+                        # await an.cancel()
+
+            # end of runtime scope
+            print('root actor terminated.')

         if cs.cancelled_caught:
             pytest.fail(
                 'Should NOT time out in `open_root_actor()` ?'
             )

+        print('exiting main.')

     trio.run(main)
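The test above fans `num_tasks` consumers into a single IPC stream via `.subscribe()`. The same fan-out primitive can be poked at without any subactor by layering `tractor.trionics.broadcast_receiver` over a plain `trio` memory channel; a hedged sketch (the buffer size, task count, and pre-subscribe sleep are arbitrary choices, and exact delivery interleaving isn't guaranteed):

```python
import trio
from tractor.trionics import broadcast_receiver

async def demo():
    tx, rx = trio.open_memory_channel(8)

    # wrap the lone receiver so multiple tasks can subscribe to it
    async with broadcast_receiver(rx, 8) as brx:

        async def consumer(name: str):
            # each task gets its own cloned subscription handle
            async with brx.subscribe() as sub:
                async for msg in sub:
                    print(f'{name} got {msg}')

        async with trio.open_nursery() as tn:
            for i in range(3):
                tn.start_soon(consumer, f'task_{i}')

            await trio.sleep(0.1)  # let all consumers subscribe first
            for i in range(4):
                await tx.send(i)

            # closing the sender end-of-channels every consumer's async-for
            await tx.aclose()

trio.run(demo)
```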
@@ -67,7 +67,6 @@ async def ensure_sequence(

 @acm
 async def open_sequence_streamer(
-
     sequence: list[int],
     reg_addr: tuple[str, int],
     start_method: str,
@@ -96,23 +95,27 @@ async def open_sequence_streamer(


 def test_stream_fan_out_to_local_subscriptions(
-    reg_addr,
+    debug_mode: bool,
+    reg_addr: tuple,
     start_method,
 ):

     sequence = list(range(1000))

     async def main():
+        with trio.fail_after(9):
             async with open_sequence_streamer(
                 sequence,
                 reg_addr,
                 start_method,
             ) as stream:

-                async with trio.open_nursery() as n:
+                async with (
+                    collapse_eg(),
+                    trio.open_nursery() as tn,
+                ):
                     for i in range(10):
-                        n.start_soon(
+                        tn.start_soon(
                             ensure_sequence,
                             stream,
                             sequence.copy(),
@@ -151,6 +154,8 @@ def test_consumer_and_parent_maybe_lag(
         sequence = list(range(300))
         parent_delay, sub_delay = task_delays

+        # TODO, maybe mak a cm-deco for main()s?
+        with trio.fail_after(3):
             async with open_sequence_streamer(
                 sequence,
                 reg_addr,
@@ -285,7 +290,11 @@ def test_faster_task_to_recv_is_cancelled_by_slower(


 def test_subscribe_errors_after_close():
+    '''
+    Verify after calling `BroadcastReceiver.aclose()` you can't
+    "re-open" it via `.subscribe()`.
+
+    '''
     async def main():

         size = 1
@@ -293,6 +302,8 @@ def test_subscribe_errors_after_close():
         async with broadcast_receiver(rx, size) as brx:
             pass

+        assert brx.key not in brx._state.subs
+
         try:
             # open and close
             async with brx.subscribe():
@@ -302,7 +313,7 @@ def test_subscribe_errors_after_close():
             assert brx.key not in brx._state.subs

         else:
-            assert 0
+            pytest.fail('brx.subscribe() never raised!?')

     trio.run(main)
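For reference, the `try`/`except`/`else` idiom the new `pytest.fail()` call sits in: the `else:` branch only runs when the expected exception never fired, which is exactly the failure mode worth flagging. A standalone toy version:

```python
import pytest

def test_expected_exception():
    try:
        int('not-a-number')  # should raise
    except ValueError:
        pass  # expected path
    else:
        # only reached if no exception was raised at all
        pytest.fail("int() never raised!?")
```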
@@ -300,7 +300,7 @@ class Portal:
         )

         # XXX the one spot we set it?
-        self.channel._cancel_called: bool = True
+        chan._cancel_called: bool = True
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with
@@ -552,6 +552,14 @@ class Actor:
             )
             raise

+    # ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
+    # - _get_rpc_func(),
+    # - _deliver_ctx_payload(),
+    # - get_context(),
+    # - start_remote_task(),
+    # - cancel_rpc_tasks(),
+    # - _cancel_task(),
+    #
     def _get_rpc_func(self, ns, funcname):
         '''
         Try to lookup and return a target RPC func from the
@@ -1119,14 +1127,6 @@ class Actor:
         self._cancel_complete.set()
         return True

-    # XXX: hard kill logic if needed?
-    # def _hard_mofo_kill(self):
-    #     # If we're the root actor or zombied kill everything
-    #     if self._parent_chan is None:  # TODO: more robust check
-    #         root = trio.lowlevel.current_root_task()
-    #         for n in root.child_nurseries:
-    #             n.cancel_scope.cancel()
-
     async def _cancel_task(
         self,
         cid: str,
@@ -1361,25 +1361,13 @@ class Actor:
         '''
         return self.accept_addrs[0]

-    def get_parent(self) -> Portal:
-        '''
-        Return a `Portal` to our parent.
-
-        '''
-        assert self._parent_chan, "No parent channel for this actor?"
-        return Portal(self._parent_chan)
-
-    def get_chans(
-        self,
-        uid: tuple[str, str],
-
-    ) -> list[Channel]:
-        '''
-        Return all IPC channels to the actor with provided `uid`.
-
-        '''
-        return self._ipc_server._peers[uid]
+    # TODO, this should delegate ONLY to the
+    # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
+    #
+    # XXX, AH RIGHT that's why..
+    # it's bc we pass this as a CLI flag to the child.py precisely
+    # bc we need the bootstrapping pre `async_main()`.. but maybe
+    # keep this as an impl deat and not part of the pub iface impl?

     def is_infected_aio(self) -> bool:
         '''
         If `True`, this actor is running `trio` in guest mode on
@@ -1390,6 +1378,23 @@ class Actor:
         '''
         return self._infected_aio

+    # ?TODO, is this the right type for this method?
+    def get_parent(self) -> Portal:
+        '''
+        Return a `Portal` to our parent.
+
+        '''
+        assert self._parent_chan, "No parent channel for this actor?"
+        return Portal(self._parent_chan)
+
+    # XXX: hard kill logic if needed?
+    # def _hard_mofo_kill(self):
+    #     # If we're the root actor or zombied kill everything
+    #     if self._parent_chan is None:  # TODO: more robust check
+    #         root = trio.lowlevel.current_root_task()
+    #         for n in root.child_nurseries:
+    #             n.cancel_scope.cancel()
+

 async def async_main(
     actor: Actor,
@@ -1755,9 +1760,7 @@ async def async_main(
             f' {pformat(ipc_server._peers)}'
         )
         log.runtime(teardown_report)
-        await ipc_server.wait_for_no_more_peers(
-            shield=True,
-        )
+        await ipc_server.wait_for_no_more_peers()

         teardown_report += (
             '-]> all peer channels are complete.\n'
@@ -102,6 +102,9 @@ class MsgStream(trio.abc.Channel):
         self._eoc: bool|trio.EndOfChannel = False
         self._closed: bool|trio.ClosedResourceError = False

+    def is_eoc(self) -> bool|trio.EndOfChannel:
+        return self._eoc
+
     @property
     def ctx(self) -> Context:
         '''
|
||||||
|
|
||||||
return pld
|
return pld
|
||||||
|
|
||||||
async def receive(
|
# XXX NOTE, this is left private because in `.subscribe()` usage
|
||||||
|
# we rebind the public `.recieve()` to a `BroadcastReceiver` but
|
||||||
|
# on `.subscribe().__aexit__()`, for the first task which enters,
|
||||||
|
# we want to revert to this msg-stream-instance's method since
|
||||||
|
# mult-task-tracking provided by the b-caster is then no longer
|
||||||
|
# necessary.
|
||||||
|
#
|
||||||
|
async def _receive(
|
||||||
self,
|
self,
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = False,
|
||||||
):
|
):
|
||||||
|
@ -313,6 +323,8 @@ class MsgStream(trio.abc.Channel):
|
||||||
|
|
||||||
raise src_err
|
raise src_err
|
||||||
|
|
||||||
|
receive = _receive
|
||||||
|
|
||||||
async def aclose(self) -> list[Exception|dict]:
|
async def aclose(self) -> list[Exception|dict]:
|
||||||
'''
|
'''
|
||||||
Cancel associated remote actor task and local memory channel on
|
Cancel associated remote actor task and local memory channel on
|
||||||
|
@ -528,10 +540,15 @@ class MsgStream(trio.abc.Channel):
|
||||||
receiver wrapper.
|
receiver wrapper.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# NOTE: This operation is indempotent and non-reversible, so be
|
# XXX NOTE, This operation was originally implemented as
|
||||||
# sure you can deal with any (theoretical) overhead of the the
|
# indempotent and non-reversible, so you had to be **VERY**
|
||||||
# allocated ``BroadcastReceiver`` before calling this method for
|
# aware of any (theoretical) overhead from the allocated
|
||||||
# the first time.
|
# `BroadcastReceiver.receive()`.
|
||||||
|
#
|
||||||
|
# HOWEVER, NOw we do revert and de-alloc the ._broadcaster
|
||||||
|
# when the final caller (task) exits.
|
||||||
|
#
|
||||||
|
bcast: BroadcastReceiver|None = None
|
||||||
if self._broadcaster is None:
|
if self._broadcaster is None:
|
||||||
|
|
||||||
bcast = self._broadcaster = broadcast_receiver(
|
bcast = self._broadcaster = broadcast_receiver(
|
||||||
|
@ -541,19 +558,30 @@ class MsgStream(trio.abc.Channel):
|
||||||
|
|
||||||
# TODO: can remove this kwarg right since
|
# TODO: can remove this kwarg right since
|
||||||
# by default behaviour is to do this anyway?
|
# by default behaviour is to do this anyway?
|
||||||
receive_afunc=self.receive,
|
receive_afunc=self._receive,
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE: we override the original stream instance's receive
|
# XXX NOTE, we override the original stream instance's
|
||||||
# method to now delegate to the broadcaster's ``.receive()``
|
# receive method to instead delegate to the broadcaster's
|
||||||
# such that new subscribers will be copied received values
|
# `.receive()` such that new subscribers (multiple
|
||||||
# and this stream doesn't have to expect it's original
|
# `trio.Task`s) will be copied received values and the
|
||||||
# consumer(s) to get a new broadcast rx handle.
|
# *first* task to enter here doesn't have to expect its original consumer(s)
|
||||||
|
# to get a new broadcast rx handle; everything happens
|
||||||
|
# underneath this iface seemlessly.
|
||||||
|
#
|
||||||
self.receive = bcast.receive # type: ignore
|
self.receive = bcast.receive # type: ignore
|
||||||
# seems there's no graceful way to type this with ``mypy``?
|
# seems there's no graceful way to type this with `mypy`?
|
||||||
# https://github.com/python/mypy/issues/708
|
# https://github.com/python/mypy/issues/708
|
||||||
|
|
||||||
async with self._broadcaster.subscribe() as bstream:
|
# TODO, prevent re-entrant sub scope?
|
||||||
|
# if self._broadcaster._closed:
|
||||||
|
# raise RuntimeError(
|
||||||
|
# 'This stream
|
||||||
|
|
||||||
|
try:
|
||||||
|
aenter = self._broadcaster.subscribe()
|
||||||
|
async with aenter as bstream:
|
||||||
|
# ?TODO, move into test suite?
|
||||||
assert bstream.key != self._broadcaster.key
|
assert bstream.key != self._broadcaster.key
|
||||||
assert bstream._recv == self._broadcaster._recv
|
assert bstream._recv == self._broadcaster._recv
|
||||||
|
|
||||||
|
@@ -563,8 +591,28 @@ class MsgStream(trio.abc.Channel):
                 # stream instance.
                 bstream.send = self.send  # type: ignore

+                # newly-allocated instance
                 yield bstream

+        finally:
+            # XXX, the first-enterer task should, like all other
+            # subs, close the first allocated bcrx, which adjusts the
+            # common `bcrx.state`
+            with trio.CancelScope(shield=True):
+                if bcast is not None:
+                    await bcast.aclose()
+
+            # XXX, when the bcrx.state reports there are no more subs
+            # we can revert to this obj's method, removing any
+            # delegation overhead!
+            if (
+                (orig_bcast := self._broadcaster)
+                and
+                not orig_bcast.state.subs
+            ):
+                self.receive = self._receive
+                # self._broadcaster = None
+
     async def send(
         self,
         data: Any,
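The net effect of the two hunks above is a "delegate then revert" pattern: while any subscriber task is inside `.subscribe()`, the instance's public `.receive` is rebound to the broadcaster's; when the last subscriber leaves, the original method is restored. A tractor-free sketch of just that rebinding mechanic (the `DelegatingStream` name and the plain set of sub keys are illustrative stand-ins, not the real fan-out logic):

```python
from contextlib import asynccontextmanager as acm

import trio

class DelegatingStream:
    def __init__(self, rx: trio.MemoryReceiveChannel):
        self._rx = rx
        self._subs: set[str] = set()

    async def _receive(self):
        return await self._rx.receive()

    # public alias, rebound while subscribers exist
    receive = _receive

    @acm
    async def subscribe(self, key: str):
        self._subs.add(key)
        # delegate: receives now go through the (stand-in) fan-out
        self.receive = self._rx.receive
        try:
            yield self
        finally:
            self._subs.discard(key)
            if not self._subs:
                # last task out: revert to the original method
                self.receive = self._receive
```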
@@ -117,7 +117,6 @@ class ActorNursery:
             ]
         ] = {}

-        self.cancelled: bool = False
         self._join_procs = trio.Event()
         self._at_least_one_child_in_debug: bool = False
         self.errors = errors
@@ -135,10 +134,53 @@ class ActorNursery:
         # TODO: remove the `.run_in_actor()` API and thus this 2ndary
         # nursery when that API get's moved outside this primitive!
         self._ria_nursery = ria_nursery

+        # TODO, factor this into a .hilevel api!
+        #
         # portals spawned with ``run_in_actor()`` are
         # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
+
+        # trio.Nursery-like cancel (request) statuses
+        self._cancelled_caught: bool = False
+        self._cancel_called: bool = False
+
+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Records whether cancellation has been requested for this
+        actor-nursery by a call to `.cancel()` either due to,
+        - an explicit call by some actor-local-task,
+        - an implicit call due to an error/cancel emited inside
+          the `tractor.open_nursery()` block.
+
+        '''
+        return self._cancel_called
+
+    @property
+    def cancelled_caught(self) -> bool:
+        '''
+        Set when this nursery was able to cance all spawned subactors
+        gracefully via an (implicit) call to `.cancel()`.
+
+        '''
+        return self._cancelled_caught
+
+    # TODO! remove internal/test-suite usage!
+    @property
+    def cancelled(self) -> bool:
+        warnings.warn(
+            "`ActorNursery.cancelled` is now deprecated, use "
+            " `.cancel_called` instead.",
+            DeprecationWarning,
+            stacklevel=2,
+        )
+        return (
+            self._cancel_called
+            # and
+            # self._cancelled_caught
+        )

     async def start_actor(
         self,
         name: str,
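The `.cancelled` shim above follows the standard deprecation-property recipe; a self-contained version of just that recipe (the `Nursery` class name here is illustrative):

```python
import warnings

class Nursery:
    def __init__(self) -> None:
        self._cancel_called: bool = False

    @property
    def cancel_called(self) -> bool:
        # the new, preferred flag
        return self._cancel_called

    @property
    def cancelled(self) -> bool:
        # old name kept as a warning shim; `stacklevel=2` points the
        # warning at the caller's line, not this property body
        warnings.warn(
            '`Nursery.cancelled` is now deprecated, use '
            '`.cancel_called` instead.',
            DeprecationWarning,
            stacklevel=2,
        )
        return self._cancel_called
```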
@@ -316,7 +358,7 @@ class ActorNursery:

         '''
         __runtimeframe__: int = 1  # noqa
-        self.cancelled = True
+        self._cancel_called = True

         # TODO: impl a repr for spawn more compact
         # then `._children`..
@@ -394,6 +436,8 @@ class ActorNursery:
             ) in children.values():
                 log.warning(f"Hard killing process {proc}")
                 proc.terminate()
+        else:
+            self._cancelled_caught

         # mark ourselves as having (tried to have) cancelled all subactors
         self._join_procs.set()
@@ -101,11 +101,27 @@ class Channel:
         # ^XXX! ONLY set if a remote actor sends an `Error`-msg
         self._closed: bool = False

-        # flag set by ``Portal.cancel_actor()`` indicating remote
-        # (possibly peer) cancellation of the far end actor
-        # runtime.
+        # flag set by `Portal.cancel_actor()` indicating remote
+        # (possibly peer) cancellation of the far end actor runtime.
         self._cancel_called: bool = False

+    @property
+    def closed(self) -> bool:
+        '''
+        Was `.aclose()` successfully called?
+
+        '''
+        return self._closed
+
+    @property
+    def cancel_called(self) -> bool:
+        '''
+        Set when `Portal.cancel_actor()` is called on a portal which
+        wraps this IPC channel.
+
+        '''
+        return self._cancel_called
+
     @property
     def uid(self) -> tuple[str, str]:
         '''
@@ -814,10 +814,14 @@ class Server(Struct):

     async def wait_for_no_more_peers(
         self,
-        shield: bool = False,
+        # XXX, should this even be allowed?
+        # -> i've seen it cause hangs on teardown
+        #    in `test_resource_cache.py`
+        # _shield: bool = False,
     ) -> None:
-        with trio.CancelScope(shield=shield):
-            await self._no_more_peers.wait()
+        await self._no_more_peers.wait()
+        # with trio.CancelScope(shield=_shield):
+        #     await self._no_more_peers.wait()

     async def wait_for_peer(
         self,
@@ -100,6 +100,32 @@ class Lagged(trio.TooSlowError):
     '''


+def wrap_rx_for_eoc(
+    rx: AsyncReceiver,
+) -> AsyncReceiver:
+
+    match rx:
+        case trio.MemoryReceiveChannel():
+
+            # XXX, taken verbatim from .receive_nowait()
+            def is_eoc() -> bool:
+                if not rx._state.open_send_channels:
+                    return trio.EndOfChannel
+
+                return False
+
+            rx.is_eoc = is_eoc
+
+        case _:
+            # XXX, ensure we define a private field!
+            # case tractor.MsgStream:
+            assert (
+                getattr(rx, '_eoc', False) is not None
+            )
+
+    return rx
+
+
 class BroadcastState(Struct):
     '''
     Common state to all receivers of a broadcast.
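The `MemoryReceiveChannel` branch above keys end-of-channel detection off trio's internal `_state.open_send_channels` counter (the same check `receive_nowait()` performs). A quick, hedged demonstration of that underlying condition on a bare channel pair (relying on a private trio field, exactly as the wrapper does):

```python
import trio

async def demo():
    tx, rx = trio.open_memory_channel(1)

    # a sender handle is still open => not end-of-channel yet
    assert rx._state.open_send_channels == 1

    await tx.aclose()

    # all senders closed => the wrapper's is_eoc() would now
    # return (the truthy) trio.EndOfChannel instead of False
    assert rx._state.open_send_channels == 0

trio.run(demo)
```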
@@ -186,11 +212,23 @@ class BroadcastReceiver(ReceiveChannel):
         state.subs[self.key] = -1

         # underlying for this receiver
-        self._rx = rx_chan
+        self._rx = wrap_rx_for_eoc(rx_chan)
         self._recv = receive_afunc or rx_chan.receive
         self._closed: bool = False
         self._raise_on_lag = raise_on_lag

+    @property
+    def state(self) -> BroadcastState:
+        '''
+        Read-only access to this receivers internal `._state`
+        instance ref.
+
+        If you just want to read the high-level state metrics,
+        use `.state.statistics()`.
+
+        '''
+        return self._state
+
     def receive_nowait(
         self,
         _key: int | None = None,
@@ -215,7 +253,23 @@ class BroadcastReceiver(ReceiveChannel):
         try:
             seq = state.subs[key]
         except KeyError:
+            # from tractor import pause_from_sync
+            # pause_from_sync(shield=True)
+            if (
+                (rx_eoc := self._rx.is_eoc())
+                or
+                self.state.eoc
+            ):
+                raise trio.EndOfChannel(
+                    'Broadcast-Rx underlying already ended!'
+                ) from rx_eoc
+
             if self._closed:
+                # if (rx_eoc := self._rx._eoc):
+                #     raise trio.EndOfChannel(
+                #         'Broadcast-Rx underlying already ended!'
+                #     ) from rx_eoc
+
                 raise trio.ClosedResourceError

             raise RuntimeError(
@@ -453,8 +507,9 @@ class BroadcastReceiver(ReceiveChannel):
         self._closed = True


+# NOTE, this can we use as an `@acm` since `BroadcastReceiver`
+# derives from `ReceiveChannel`.
 def broadcast_receiver(

     recv_chan: AsyncReceiver,
     max_buffer_size: int,
     receive_afunc: Callable[[], Awaitable[Any]]|None = None,
@@ -289,7 +290,10 @@ async def maybe_open_context(
             )
             _Cache.users += 1
             lock.release()
-            yield False, yielded
+            yield (
+                False,  # cache_hit = "no"
+                yielded,
+            )

         else:
             _Cache.users += 1
@@ -303,7 +306,10 @@ async def maybe_open_context(
                 # f'{ctx_key!r} -> {yielded!r}\n'
             )
             lock.release()
-            yield True, yielded
+            yield (
+                True,  # cache_hit = "yes"
+                yielded,
+            )

     finally:
         _Cache.users -= 1
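Caller-side, the tuple now unpacks positionally just like in the test-suite hunks earlier in this compare; a hedged usage sketch (the `open_resource` acm is illustrative, and this assumes, as the tests do, that `maybe_open_context()` is entered inside an open root actor):

```python
from contextlib import asynccontextmanager as acm

import trio
import tractor

@acm
async def open_resource():
    # stand-in for any expensive-to-allocate async resource
    yield {'connected': True}

async def task(name: str):
    async with tractor.trionics.maybe_open_context(
        acm_func=open_resource,
    ) as (cache_hit, resource):
        # first entrant should see cache_hit=False, overlapping
        # tasks should see cache_hit=True
        print(f'{name}: cache_hit={cache_hit}, {resource}')
        await trio.sleep(0.1)

async def main():
    async with tractor.open_root_actor():
        async with trio.open_nursery() as tn:
            for i in range(3):
                tn.start_soon(task, f'task_{i}')

trio.run(main)
```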