Compare commits


No commits in common. "main" and "strict_egs_everywhere" have entirely different histories.

23 changed files with 308 additions and 1247 deletions

View File

@ -1,35 +0,0 @@
import trio
import tractor
async def main():
async with tractor.open_root_actor(
debug_mode=True,
loglevel='cancel',
) as _root:
# manually trigger self-cancellation and wait
# for it to fully trigger.
_root.cancel_soon()
await _root._cancel_complete.wait()
print('root cancelled')
# now ensure we can still use the REPL
try:
await tractor.pause()
except trio.Cancelled as _taskc:
assert (root_cs := _root._root_tn.cancel_scope).cancel_called
# NOTE^^ above logic but inside `open_root_actor()` and
# passed to the `shield=` expression is effectively what
# we're testing here!
await tractor.pause(shield=root_cs.cancel_called)
# XXX, if shield logic *is wrong* inside `open_root_actor()`'s
# crash-handler block this should never be interacted,
# instead `trio.Cancelled` would be bubbled up: the original
# BUG.
assert 0
if __name__ == '__main__':
trio.run(main)
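
A hedged reduction of what the example exercises; `enter_repl()` is an illustrative stand-in for `debug._maybe_enter_pm()`, not a `tractor` API:

import trio

async def enter_repl() -> None:
    # stand-in for the crash-handler REPL entry
    await trio.lowlevel.checkpoint()

async def crash_handler(root_tn: trio.Nursery) -> None:
    # shield iff the root scope was already cancelled, otherwise the
    # pending `trio.Cancelled` tears down the REPL task: the original BUG.
    with trio.CancelScope(shield=root_tn.cancel_scope.cancel_called):
        await enter_repl()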

View File

@ -1,13 +1,13 @@
"""
That "native" debug mode better work!
All these tests can be understood (somewhat) by running the
equivalent `examples/debugging/` scripts manually.
All these tests can be understood (somewhat) by running the equivalent
`examples/debugging/` scripts manually.
TODO:
- none of these tests have been run successfully on windows yet but
there's been manual testing that verified it works.
- wonder if any of it'll work on OS X?
- none of these tests have been run successfully on windows yet but
there's been manual testing that verified it works.
- wonder if any of it'll work on OS X?
"""
from __future__ import annotations
@ -925,7 +925,6 @@ def test_post_mortem_api(
"<Task 'name_error'",
"NameError",
"('child'",
'getattr(doggypants)', # exc-LoC
]
)
if ctlc:
@ -942,8 +941,8 @@ def test_post_mortem_api(
"<Task '__main__.main'",
"('root'",
"NameError",
"tractor.post_mortem()",
"src_uid=('child'",
"tractor.post_mortem()", # in `main()`-LoC
]
)
if ctlc:
@ -961,10 +960,6 @@ def test_post_mortem_api(
"('root'",
"NameError",
"src_uid=('child'",
# raising line in `main()` but from crash-handling
# in `tractor.open_nursery()`.
'async with p.open_context(name_error) as (ctx, first):',
]
)
if ctlc:
@ -1156,54 +1151,6 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
)
def test_crash_handling_within_cancelled_root_actor(
spawn: PexpectSpawner,
):
'''
Ensure that when only a root-actor is started via `open_root_actor()`
we can crash-handle in debug-mode despite self-cancellation.
More-or-less ensures we conditionally shield the pause in
`._root.open_root_actor()`'s `await debug._maybe_enter_pm()`
call.
'''
child = spawn('root_self_cancelled_w_error')
child.expect(PROMPT)
assert_before(
child,
[
"Actor.cancel_soon()` was called!",
"root cancelled",
_pause_msg,
"('root'", # actor name
]
)
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,
[
_crash_msg,
"('root'", # actor name
"AssertionError",
"assert 0",
]
)
child.sendline('c')
child.expect(EOF)
assert_before(
child,
[
"AssertionError",
"assert 0",
]
)
# TODO: better error for "non-ideal" usage from the root actor.
# -[ ] if called from an async scope emit a message that suggests
# using `await tractor.pause()` instead since it's less overhead

View File

@ -1,114 +0,0 @@
'''
Unit-ish tests for specific IPC transport protocol backends.
'''
from __future__ import annotations
from pathlib import Path
import pytest
import trio
import tractor
from tractor import (
Actor,
_state,
_addr,
)
@pytest.fixture
def bindspace_dir_str() -> str:
rt_dir: Path = tractor._state.get_rt_dir()
bs_dir: Path = rt_dir / 'doggy'
bs_dir_str: str = str(bs_dir)
assert not bs_dir.is_dir()
yield bs_dir_str
# delete it on suite teardown.
# ?TODO? should we support this internally
# or is leaking it ok?
if bs_dir.is_dir():
bs_dir.rmdir()
def test_uds_bindspace_created_implicitly(
debug_mode: bool,
bindspace_dir_str: str,
):
registry_addr: tuple = (
f'{bindspace_dir_str}',
'registry@doggy.sock',
)
bs_dir_str: str = registry_addr[0]
# XXX, ensure bindspace-dir DNE beforehand!
assert not Path(bs_dir_str).is_dir()
async def main():
async with tractor.open_nursery(
enable_transports=['uds'],
registry_addrs=[registry_addr],
debug_mode=debug_mode,
) as _an:
# XXX MUST be created implicitly by
# `.ipc._uds.start_listener()`!
assert Path(bs_dir_str).is_dir()
root: Actor = tractor.current_actor()
assert root.is_registrar
assert registry_addr in root.reg_addrs
assert (
registry_addr
in
_state._runtime_vars['_registry_addrs']
)
assert (
_addr.wrap_address(registry_addr)
in
root.registry_addrs
)
trio.run(main)
def test_uds_double_listen_raises_connerr(
debug_mode: bool,
bindspace_dir_str: str,
):
registry_addr: tuple = (
f'{bindspace_dir_str}',
'registry@doggy.sock',
)
async def main():
async with tractor.open_nursery(
enable_transports=['uds'],
registry_addrs=[registry_addr],
debug_mode=debug_mode,
) as _an:
# runtime up
root: Actor = tractor.current_actor()
from tractor.ipc._uds import (
start_listener,
UDSAddress,
)
ya_bound_addr: UDSAddress = root.registry_addrs[0]
try:
await start_listener(
addr=ya_bound_addr,
)
except ConnectionError as connerr:
assert type(src_exc := connerr.__context__) is OSError
assert 'Address already in use' in src_exc.args
# complete, exit test.
else:
pytest.fail('It dint raise a connerr !?')
trio.run(main)
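
The `connerr.__context__` assertion above leans on implicit exception chaining; a self-contained stdlib reduction of the pattern:

def check_chaining() -> None:
    try:
        try:
            raise OSError(98, 'Address already in use')
        except OSError:
            raise ConnectionError('Bad UDS socket-filepath-as-address ??')
    except ConnectionError as connerr:
        # the bind-time OSError rides along as the implicit context
        assert type(src_exc := connerr.__context__) is OSError
        assert 'Address already in use' in src_exc.args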

View File

@ -236,10 +236,7 @@ async def stream_forever():
async def test_cancel_infinite_streamer(start_method):
# stream for at most 1 seconds
with (
trio.fail_after(4),
trio.move_on_after(1) as cancel_scope
):
with trio.move_on_after(1) as cancel_scope:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'donny',
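
The hunk above wraps the soft `move_on_after()` timeout in a hard `fail_after()` deadline; a hedged sketch of that combined-timeout idiom:

import trio

async def consume_with_deadlines() -> None:
    # hard outer deadline: raises `trio.TooSlowError` if teardown wedges
    with trio.fail_after(4):
        # soft inner deadline: just stop consuming after ~1s
        with trio.move_on_after(1) as cancel_scope:
            while True:
                await trio.sleep(0.1)  # stand-in for stream iteration
        assert cancel_scope.cancelled_caught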

View File

@ -571,16 +571,14 @@ def test_basic_interloop_channel_stream(
fan_out: bool,
):
async def main():
# TODO, figure out min timeout here!
with trio.fail_after(6):
async with tractor.open_nursery() as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
fan_out=fan_out,
)
# should raise RAE directly
await portal.result()
async with tractor.open_nursery() as an:
portal = await an.run_in_actor(
stream_from_aio,
infect_asyncio=True,
fan_out=fan_out,
)
# should raise RAE directly
await portal.result()
trio.run(main)
@ -1088,108 +1086,6 @@ def test_sigint_closes_lifetime_stack(
trio.run(main)
# ?TODO asyncio.Task fn-deco?
# -[ ] do sig checking at import time like @context?
# -[ ] maybe name it @aio_task ??
# -[ ] chan: to_asyncio.InterloopChannel ??
async def raise_before_started(
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel,
) -> None:
'''
`asyncio.Task` entry point which RTEs before calling
`to_trio.send_nowait()`.
'''
await asyncio.sleep(0.2)
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
# to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
await asyncio.sleep(float('inf'))
@tractor.context
async def caching_ep(
ctx: tractor.Context,
):
log = tractor.log.get_logger('caching_ep')
log.info('syncing via `ctx.started()`')
await ctx.started()
# XXX, allocate the `open_channel_from()` inside
# a `.trionics.maybe_open_context()`.
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from,
kwargs={
'target': raise_before_started,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access
key=tractor.current_actor().uid,
) as (cache_hit, (clients, chan)),
):
if cache_hit:
log.error(
'Re-using cached `.open_from_channel()` call!\n'
)
else:
log.info(
'Allocating SHOULD-FAIL `.open_from_channel()`\n'
)
await trio.sleep_forever()
def test_aio_side_raises_before_started(
reg_addr: tuple[str, int],
debug_mode: bool,
loglevel: str,
):
'''
Simulates connection-err from `piker.brokers.ib.api`..
Ensure any error raised by the child-`asyncio.Task` BEFORE
`chan.started()` is bubbled up to the `trio` parent side.
'''
# delay = 999 if debug_mode else 1
async def main():
with trio.fail_after(3):
an: tractor.ActorNursery
async with tractor.open_nursery(
debug_mode=debug_mode,
loglevel=loglevel,
) as an:
p: tractor.Portal = await an.start_actor(
'lchan_cacher_that_raises_fast',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
caching_ep,
) as (ctx, first):
assert not first
with pytest.raises(
expected_exception=(RemoteActorError),
) as excinfo:
trio.run(main)
# ensure `asyncio.Task` exception is bubbled
# allll the way erp!!
rae = excinfo.value
assert rae.boxed_type is RuntimeError
# TODO: debug_mode tests once we get support for `asyncio`!
#
# -[ ] need tests to wrap both scripts:

View File

@ -1,6 +1,5 @@
'''
Suites for our `.trionics.maybe_open_context()` multi-task
shared-cached `@acm` API.
Async context manager cache api testing: ``trionics.maybe_open_context():``
'''
from contextlib import asynccontextmanager as acm
@ -10,15 +9,6 @@ from typing import Awaitable
import pytest
import trio
import tractor
from tractor.trionics import (
maybe_open_context,
)
from tractor.log import (
get_console_log,
get_logger,
)
log = get_logger(__name__)
_resource: int = 0
@ -62,7 +52,7 @@ def test_resource_only_entered_once(key_on):
# different task names per task will be used
kwargs = {'task_name': name}
async with maybe_open_context(
async with tractor.trionics.maybe_open_context(
maybe_increment_counter,
kwargs=kwargs,
key=key,
@ -82,13 +72,11 @@ def test_resource_only_entered_once(key_on):
with trio.move_on_after(0.5):
async with (
tractor.open_root_actor(),
trio.open_nursery() as tn,
trio.open_nursery() as n,
):
for i in range(10):
tn.start_soon(
enter_cached_mngr,
f'task_{i}',
)
n.start_soon(enter_cached_mngr, f'task_{i}')
await trio.sleep(0.001)
trio.run(main)
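
A hedged usage sketch of the `maybe_open_context()` contract exercised above, with an illustrative `expensive_resource` acm: the first entrant allocates, concurrent peers get `cache_hit=True` and the same yielded value:

from contextlib import asynccontextmanager as acm
import tractor

@acm
async def expensive_resource():
    yield object()  # allocated at most once per cache key

async def worker(name: str):
    async with tractor.trionics.maybe_open_context(
        acm_func=expensive_resource,
    ) as (cache_hit, resource):
        print(f'{name}: cache_hit={cache_hit} id={id(resource)}')
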
@ -110,34 +98,23 @@ async def streamer(
@acm
async def open_stream() -> Awaitable[
tuple[
tractor.ActorNursery,
tractor.MsgStream,
]
]:
async def open_stream() -> Awaitable[tractor.MsgStream]:
try:
async with tractor.open_nursery() as an:
portal = await an.start_actor(
'streamer',
enable_modules=[__name__],
)
try:
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
print('Entered open_stream() caller')
yield an, stream
print('Exited open_stream() caller')
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
yield stream
finally:
print(
'Cancelling streamer with,\n'
'=> `Portal.cancel_actor()`'
)
await portal.cancel_actor()
print('Cancelled streamer')
print('Cancelling streamer')
await portal.cancel_actor()
print('Cancelled streamer')
except Exception as err:
print(
@ -150,15 +127,11 @@ async def open_stream() -> Awaitable[
@acm
async def maybe_open_stream(taskname: str):
async with maybe_open_context(
async with tractor.trionics.maybe_open_context(
# NOTE: all secondary tasks should cache hit on the same key
acm_func=open_stream,
) as (
cache_hit,
(an, stream)
):
# when the actor + portal + ctx + stream has already been
# allocated we want to just bcast to this task.
) as (cache_hit, stream):
if cache_hit:
print(f'{taskname} loaded from cache')
@ -166,43 +139,10 @@ async def maybe_open_stream(taskname: str):
# if this feed is already allocated by the first
# task that entered
async with stream.subscribe() as bstream:
yield an, bstream
print(
f'cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
# we should always unreg the "cloned" bcrc for this
# consumer-task
assert id(bstream) not in bstream._state.subs
yield bstream
else:
# yield the actual stream
try:
yield an, stream
finally:
print(
f'NON-cached task exited\n'
f')>\n'
f' |_{taskname}\n'
)
first_bstream = stream._broadcaster
bcrx_state = first_bstream._state
subs: dict[int, int] = bcrx_state.subs
if len(subs) == 1:
assert id(first_bstream) in subs
# ^^TODO! the bcrx should always de-allocate all subs,
# including the implicit first one allocated on entry
# by the first subscribing peer task, no?
#
# -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
# |_ allows reverting `MsgStream.receive()` to the
# non-bcaster method.
# |_ we can decide whether to reset `._broadcaster`?
#
# await tractor.pause(shield=True)
yield stream
def test_open_local_sub_to_stream(
@ -219,24 +159,16 @@ def test_open_local_sub_to_stream(
if debug_mode:
timeout = 999
print(f'IN debug_mode, setting large timeout={timeout!r}..')
async def main():
full = list(range(1000))
an: tractor.ActorNursery|None = None
num_tasks: int = 10
async def get_sub_and_pull(taskname: str):
nonlocal an
stream: tractor.MsgStream
async with (
maybe_open_stream(taskname) as (
an,
stream,
),
maybe_open_stream(taskname) as stream,
):
if '0' in taskname:
assert isinstance(stream, tractor.MsgStream)
@ -248,159 +180,34 @@ def test_open_local_sub_to_stream(
first = await stream.receive()
print(f'{taskname} started with value {first}')
seq: list[int] = []
seq = []
async for msg in stream:
seq.append(msg)
assert set(seq).issubset(set(full))
# end of @acm block
print(f'{taskname} finished')
root: tractor.Actor
with trio.fail_after(timeout) as cs:
# TODO: turns out this isn't multi-task entrant XD
# We probably need an idempotent entry semantic?
async with tractor.open_root_actor(
debug_mode=debug_mode,
# maybe_enable_greenback=True,
#
# ^TODO? doesn't seem to mk breakpoint() usage work
# bc each bg task needs to open a portal??
# - [ ] we should consider making this part of
# our taskman defaults?
# |_see https://github.com/goodboy/tractor/pull/363
#
) as root:
assert root.is_registrar
):
async with (
trio.open_nursery() as tn,
):
for i in range(num_tasks):
for i in range(10):
tn.start_soon(
get_sub_and_pull,
f'task_{i}',
)
await trio.sleep(0.001)
print('all consumer tasks finished!')
# ?XXX, ensure actor-nursery is shutdown or we might
# hang here due to a minor task deadlock/race-condition?
#
# - seems that all we need is a checkpoint to ensure
# the last suspended task, which is inside
# `.maybe_open_context()`, can do the
# `Portal.cancel_actor()` call?
#
# - if that bg task isn't resumed, then this blocks
# timeout might hit before that?
#
if root.ipc_server.has_peers():
await trio.lowlevel.checkpoint()
# alt approach, cancel the entire `an`
# await tractor.pause()
# await an.cancel()
# end of runtime scope
print('root actor terminated.')
print('all consumer tasks finished')
if cs.cancelled_caught:
pytest.fail(
'Should NOT time out in `open_root_actor()` ?'
)
print('exiting main.')
trio.run(main)
@acm
async def cancel_outer_cs(
cs: trio.CancelScope|None = None,
delay: float = 0,
):
# on first task delay this enough to block
# the 2nd task but then cancel it mid sleep
# so that the tn.start() inside the key-err handler block
# is cancelled and would previously corrupt the
# mutex state.
log.info(f'task entering sleep({delay})')
await trio.sleep(delay)
if cs:
log.info('task calling cs.cancel()')
cs.cancel()
await trio.lowlevel.checkpoint()
yield
await trio.sleep_forever()
def test_lock_not_corrupted_on_fast_cancel(
debug_mode: bool,
loglevel: str,
):
'''
Verify that if the caching-task (the first to enter
`maybe_open_context()`) is cancelled mid-cache-miss, the embedded
mutex can never be left in a corrupted state.
That is, the lock is always eventually released ensuring a peer
(cache-hitting) task will never,
- be left to inf-block/hang on the `lock.acquire()`.
- try to release the lock when still owned by the caching-task
due to it having erroneously exited without calling
`lock.release()`.
'''
delay: float = 1.
async def use_moc(
cs: trio.CancelScope|None,
delay: float,
):
log.info('task entering moc')
async with maybe_open_context(
cancel_outer_cs,
kwargs={
'cs': cs,
'delay': delay,
},
) as (cache_hit, _null):
if cache_hit:
log.info('2nd task entered')
else:
log.info('1st task entered')
await trio.sleep_forever()
async def main():
with trio.fail_after(delay + 2):
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
trio.open_nursery() as tn,
):
get_console_log('info')
log.info('yo starting')
cs = tn.cancel_scope
tn.start_soon(
use_moc,
cs,
delay,
name='child',
)
with trio.CancelScope() as rent_cs:
await use_moc(
cs=rent_cs,
delay=delay,
)
trio.run(main)
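
The invariant under test reduced to a hedged `trio`-only toy: even if the first (cache-missing) entrant is cancelled mid-setup, the mutex must be released so peers never hang in `acquire()`:

import trio

async def first_entrant(lock: trio.Lock) -> None:
    await lock.acquire()
    try:
        # cache-miss setup; a cancellation may land right here
        await trio.sleep(1)
    finally:
        # always release, else a peer inf-blocks on `lock.acquire()`
        lock.release()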

View File

@ -154,7 +154,7 @@ class Context:
2 cancel-scope-linked, communicating and parallel executing
`Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "child" side a context is
actor from a `Portal`. On the "callee" side a context is
always allocated inside `._rpc._invoke()`.
TODO: more detailed writeup on cancellation, error and
@ -222,8 +222,8 @@ class Context:
# `._runtime.invoke()`.
_remote_func_type: str | None = None
# NOTE: (for now) only set (a portal) on the parent side since
# the child doesn't generally need a ref to one and should
# NOTE: (for now) only set (a portal) on the caller side since
# the callee doesn't generally need a ref to one and should
# normally need to explicitly ask for handle to its peer if
# more than the `Context` is needed?
_portal: Portal | None = None
@ -252,12 +252,12 @@ class Context:
_outcome_msg: Return|Error|ContextCancelled = Unresolved
# on a clean exit there should be a final value
# delivered from the far end "child" task, so
# delivered from the far end "callee" task, so
# this value is only set on one side.
# _result: Any | int = None
_result: PayloadT|Unresolved = Unresolved
# if the local "parent" task errors this value is always set
# if the local "caller" task errors this value is always set
# to the error that was captured in the
# `Portal.open_context().__aexit__()` teardown block OR, in
# 2 special cases when an (maybe) expected remote error
@ -293,7 +293,7 @@ class Context:
# a `ContextCancelled` due to a call to `.cancel()` triggering
# "graceful closure" on either side:
# - `._runtime._invoke()` will check this flag before engaging
# the crash handler REPL in such cases where the "child"
# the crash handler REPL in such cases where the "callee"
# raises the cancellation,
# - `.devx.debug.lock_stdio_for_peer()` will set it to `False` if
# the global tty-lock has been configured to filter out some
@ -307,8 +307,8 @@ class Context:
_stream_opened: bool = False
_stream: MsgStream|None = None
# the parent-task's calling-fn's frame-info, the frame above
# `Portal.open_context()`, for introspection/logging.
# caller of `Portal.open_context()` for
# logging purposes mostly
_caller_info: CallerInfo|None = None
# overrun handling machinery
@ -529,11 +529,11 @@ class Context:
'''
Exactly the value of `self._scope.cancelled_caught`
(delegation) and should only be (able to be read as)
`True` for a `.side == "parent"` ctx wherein the
`True` for a `.side == "caller"` ctx wherein the
`Portal.open_context()` block was exited due to a call to
`._scope.cancel()` - which should only occur in 2 cases:
- a parent side calls `.cancel()`, the far side cancels
- a caller side calls `.cancel()`, the far side cancels
and delivers back a `ContextCancelled` (making
`.cancel_acked == True`) and `._scope.cancel()` is
called by `._maybe_cancel_and_set_remote_error()` which
@ -542,20 +542,20 @@ class Context:
=> `._scope.cancelled_caught == True` by normal `trio`
cs semantics.
- a parent side is delivered a `._remote_error:
- a caller side is delivered a `._remote_error:
RemoteActorError` via `._deliver_msg()` and a transitive
call to `_maybe_cancel_and_set_remote_error()` calls
`._scope.cancel()` and that cancellation eventually
results in `trio.Cancelled`(s) caught in the
`.open_context()` handling around the @acm's `yield`.
Only as an FYI, in the "child" side case it can also be
Only as an FYI, in the "callee" side case it can also be
set but never is readable by any task outside the RPC
machinery in `._invoke()` since,:
- when a child side calls `.cancel()`, `._scope.cancel()`
- when a callee side calls `.cancel()`, `._scope.cancel()`
is called immediately and handled specially inside
`._invoke()` to raise a `ContextCancelled` which is then
sent to the parent side.
sent to the caller side.
However, `._scope.cancelled_caught` can NEVER be
accessed/read as `True` by any RPC invoked task since it
@ -666,7 +666,7 @@ class Context:
when called/closed by actor local task(s).
NOTEs:
- It is expected that the parent has previously unwrapped
- It is expected that the caller has previously unwrapped
the remote error using a call to `unpack_error()` and
provides that output exception value as the input
`error` argument *here*.
@ -676,7 +676,7 @@ class Context:
`Portal.open_context()` (ideally) we want to interrupt
any ongoing local tasks operating within that
`Context`'s cancel-scope so as to be notified ASAP of
the remote error and engage any parent handling (eg.
the remote error and engage any caller handling (eg.
for cross-process task supervision).
- In some cases we may want to raise the remote error
@ -886,11 +886,6 @@ class Context:
@property
def repr_caller(self) -> str:
'''
Render a "namespace-path" style representation of the calling
task-fn.
'''
ci: CallerInfo|None = self._caller_info
if ci:
return (
@ -904,7 +899,7 @@ class Context:
def repr_api(self) -> str:
return 'Portal.open_context()'
# TODO: use `.dev._frame_stack` scanning to find caller fn!
# TODO: use `.dev._frame_stack` scanning to find caller!
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
@ -939,7 +934,7 @@ class Context:
=> That is, an IPC `Context` (this) **does not**
have the same semantics as a `trio.CancelScope`.
If the parent (who entered the `Portal.open_context()`)
If the caller (who entered the `Portal.open_context()`)
desires that the internal block's cancel-scope be
cancelled it should open its own `trio.CancelScope` and
manage it as needed.
@ -1011,6 +1006,7 @@ class Context:
else:
log.cancel(
f'Timed out on cancel request of remote task?\n'
f'\n'
f'{reminfo}'
)
@ -1021,7 +1017,7 @@ class Context:
# `_invoke()` RPC task.
#
# NOTE: on this side we ALWAYS cancel the local scope
# since the parent expects a `ContextCancelled` to be sent
# since the caller expects a `ContextCancelled` to be sent
# from `._runtime._invoke()` back to the other side. The
# logic for catching the result of the below
# `._scope.cancel()` is inside the `._runtime._invoke()`
@ -1194,8 +1190,8 @@ class Context:
) -> Any|Exception:
'''
From some (parent) side task, wait for and return the final
result from the remote (child) side's task.
From some (caller) side task, wait for and return the final
result from the remote (callee) side's task.
This provides a mechanism for one task running in some actor to wait
on another task at the other side, in some other actor, to terminate.
@ -1491,12 +1487,6 @@ class Context:
):
status = 'peer-cancelled'
case (
Unresolved,
trio.Cancelled(), # any error-type
) if self.canceller:
status = 'actor-cancelled'
# (remote) error condition
case (
Unresolved,
@ -1610,7 +1600,7 @@ class Context:
raise err
# TODO: maybe a flag to by-pass encode op if already done
# here in parent?
# here in caller?
await self.chan.send(started_msg)
# set msg-related internal runtime-state
@ -1686,7 +1676,7 @@ class Context:
XXX RULES XXX
------ - ------
- NEVER raise remote errors from this method; a calling runtime-task.
- NEVER raise remote errors from this method; a runtime task caller.
An error "delivered" to a ctx should always be raised by
the corresponding local task operating on the
`Portal`/`Context` APIs.
@ -1762,7 +1752,7 @@ class Context:
else:
report = (
'Queueing OVERRUN msg on parent task:\n\n'
'Queueing OVERRUN msg on caller task:\n\n'
+ report
)
log.debug(report)
@ -1958,12 +1948,12 @@ async def open_context_from_portal(
IPC protocol.
The yielded `tuple` is a pair delivering a `tractor.Context`
and any first value "sent" by the "child" task via a call
and any first value "sent" by the "callee" task via a call
to `Context.started(<value: Any>)`; this side of the
context does not unblock until the "child" task calls
context does not unblock until the "callee" task calls
`.started()` in similar style to `trio.Nursery.start()`.
When the "child" (side that is "called"/started by a call
to *this* method) returns, the parent side (this) unblocks
When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be
retrieved using the `Context.wait_for_result()` api.
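
A hedged usage sketch of that parent/child handshake, assuming the `@tractor.context` endpoint style shown elsewhere in this diff:

import tractor

@tractor.context
async def child_ep(ctx: tractor.Context) -> str:
    # unblocks the parent's `open_context()` enter, ala `Nursery.start()`
    await ctx.started('first-value')
    return 'final-result'

async def parent_side(portal: tractor.Portal) -> None:
    async with portal.open_context(child_ep) as (ctx, first):
        assert first == 'first-value'
        # blocks until the child task returns its final value
        assert await ctx.wait_for_result() == 'final-result'
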
@ -1976,7 +1966,7 @@ async def open_context_from_portal(
__tracebackhide__: bool = hide_tb
# denote this frame as a "runtime frame" for stack
# introspection where we report the parent code in logging
# introspection where we report the caller code in logging
# and error message content.
# NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa
@ -2035,7 +2025,7 @@ async def open_context_from_portal(
# placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure.
scope_err: BaseException|None = None
ctxc_from_child: ContextCancelled|None = None
ctxc_from_callee: ContextCancelled|None = None
try:
async with (
collapse_eg(),
@ -2114,7 +2104,7 @@ async def open_context_from_portal(
# that we can re-use it around the `yield` ^ here
# or vice versa?
#
# maybe TODO NOTE: between the parent exiting and
# maybe TODO NOTE: between the caller exiting and
# arriving here the far end may have sent a ctxc-msg or
# other error, so the question is whether we should check
# for it here immediately and maybe raise so as to engage
@ -2180,16 +2170,16 @@ async def open_context_from_portal(
# request in which case we DO let the error bubble to the
# opener.
#
# 2-THIS "parent" task somewhere invoked `Context.cancel()`
# and received a `ContextCancelled` from the "child"
# 2-THIS "caller" task somewhere invoked `Context.cancel()`
# and received a `ContextCancelled` from the "callee"
# task, in which case we mask the `ContextCancelled` from
# bubbling to this "parent" (much like how `trio.Nursery`
# bubbling to this "caller" (much like how `trio.Nursery`
# swallows any `trio.Cancelled` bubbled by a call to
# `Nursery.cancel_scope.cancel()`)
except ContextCancelled as ctxc:
scope_err = ctxc
ctx._local_error: BaseException = scope_err
ctxc_from_child = ctxc
ctxc_from_callee = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will
@ -2226,11 +2216,11 @@ async def open_context_from_portal(
# the above `._scope` can be cancelled due to:
# 1. an explicit self cancel via `Context.cancel()` or
# `Actor.cancel()`,
# 2. any "child"-side remote error, possibly also a cancellation
# 2. any "callee"-side remote error, possibly also a cancellation
# request by some peer,
# 3. any "parent" (aka THIS scope's) local error raised in the above `yield`
# 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
except (
# CASE 3: standard local error in this parent/yieldee
# CASE 3: standard local error in this caller/yieldee
Exception,
# CASES 1 & 2: can manifest as a `ctx._scope_nursery`
@ -2244,9 +2234,9 @@ async def open_context_from_portal(
# any `Context._maybe_raise_remote_err()` call.
#
# 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
# from any error delivered from the "child" side
# from any error delivered from the "callee" side
# AND a group-exc is only raised if there was > 1
# tasks started *here* in the "parent" / opener
# tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls
# `.wait_for_result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively
@ -2259,8 +2249,8 @@ async def open_context_from_portal(
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt,
) as rent_err:
scope_err = rent_err
) as caller_err:
scope_err = caller_err
ctx._local_error: BaseException = scope_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
@ -2278,7 +2268,7 @@ async def open_context_from_portal(
logmeth = log.exception
logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
)
if debug_mode():
@ -2299,9 +2289,9 @@ async def open_context_from_portal(
'Calling `ctx.cancel()`!\n'
)
# we don't need to cancel the child if it already
# we don't need to cancel the callee if it already
# told us it's cancelled ;p
if ctxc_from_child is None:
if ctxc_from_callee is None:
try:
await ctx.cancel()
except (
@ -2332,8 +2322,8 @@ async def open_context_from_portal(
# via a call to
# `Context._maybe_cancel_and_set_remote_error()`.
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "child" side fails and causes
# "parent side" cancellation via a `ContextCancelled` here.
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
try:
result_or_err: Exception|Any = await ctx.wait_for_result()
except BaseException as berr:
@ -2369,7 +2359,7 @@ async def open_context_from_portal(
)
case (None, _):
log.runtime(
'Context returned final result from child task:\n'
'Context returned final result from callee task:\n'
f'<= peer: {uid}\n'
f' |_ {nsf}()\n\n'
@ -2464,7 +2454,7 @@ async def open_context_from_portal(
)
# TODO: should we add a `._cancel_req_received`
# flag to determine if the child manually called
# flag to determine if the callee manually called
# `ctx.cancel()`?
# -[ ] going to need a cid check no?
@ -2520,7 +2510,7 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan parent-info if log level so high!
# TODO: only scan caller-info if log level so high!
from .devx._frame_stack import find_caller_info
caller_info: CallerInfo|None = find_caller_info()

View File

@ -300,7 +300,7 @@ class Portal:
)
# XXX the one spot we set it?
chan._cancel_called: bool = True
self.channel._cancel_called: bool = True
try:
# send cancel cmd - might not get response
# XXX: sure would be nice to make this work with

View File

@ -481,11 +481,10 @@ async def open_root_actor(
collapse_eg(),
trio.open_nursery() as root_tn,
# ?TODO? finally-footgun below?
# XXX, finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
actor._root_tn = root_tn
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
@ -524,11 +523,6 @@ async def open_root_actor(
err,
api_frame=inspect.currentframe(),
debug_filter=debug_filter,
# XXX NOTE, required to debug root-actor
# crashes under cancellation conditions; so
# most of them!
shield=root_tn.cancel_scope.cancel_called,
)
if (
@ -568,7 +562,6 @@ async def open_root_actor(
f'{op_nested_actor_repr}'
)
# XXX, THIS IS A *finally-footgun*!
# (also mentioned in with-block above)
# -> though already shields iternally it can
# taskc here and mask underlying errors raised in
# the try-block above?

View File

@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
# RPC task bookkeeping.
# since RPC tasks are scheduled inside a flat
# `Actor._service_tn`, we add "handles" to each such that
# `Actor._service_n`, we add "handles" to each such that
# they can be individually cancelled.
finally:
@ -462,7 +462,7 @@ async def _invoke(
connected IPC channel.
This is the core "RPC" `trio.Task` scheduling machinery used to start every
remotely invoked function, normally in `Actor._service_tn: Nursery`.
remotely invoked function, normally in `Actor._service_n: Nursery`.
'''
__tracebackhide__: bool = hide_tb
@ -642,7 +642,7 @@ async def _invoke(
tn: Nursery
rpc_ctx_cs: CancelScope
async with (
collapse_eg(hide_tb=False),
collapse_eg(),
trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
@ -823,44 +823,24 @@ async def _invoke(
f'after having {ctx.repr_state!r}\n'
)
if merr:
logmeth: Callable = log.error
if (
# ctxc: by `Context.cancel()`
isinstance(merr, ContextCancelled)
if isinstance(merr, ContextCancelled):
logmeth: Callable = log.runtime
# out-of-layer cancellation, one of:
# - actorc: by `Portal.cancel_actor()`
# - OSc: by SIGINT or `Process.signal()`
or (
isinstance(merr, trio.Cancelled)
and
ctx.canceller
)
):
logmeth: Callable = log.cancel
descr_str += (
f' with {merr!r}\n'
)
elif (
not isinstance(merr, RemoteActorError)
):
tb_str: str = ''.join(
traceback.format_exception(merr)
)
if not isinstance(merr, RemoteActorError):
tb_str: str = ''.join(traceback.format_exception(merr))
descr_str += (
f'\n{merr!r}\n' # needed?
f'{tb_str}\n'
f'\n'
f'scope_error:\n'
f'{scope_err!r}\n'
)
else:
descr_str += (
f'{merr!r}\n'
)
descr_str += f'\n{merr!r}\n'
else:
descr_str += (
f'\n'
f'with final result {ctx.outcome!r}\n'
)
descr_str += f'\nwith final result {ctx.outcome!r}\n'
logmeth(
f'{message}\n'
@ -936,7 +916,7 @@ async def process_messages(
Receive (multiplexed) per-`Channel` RPC requests as msgs from
remote processes; schedule target async funcs as local
`trio.Task`s inside the `Actor._service_tn: Nursery`.
`trio.Task`s inside the `Actor._service_n: Nursery`.
Depending on msg type, non-`cmd` (task spawning/starting)
request payloads (eg. `started`, `yield`, `return`, `error`)
@ -961,7 +941,7 @@ async def process_messages(
'''
actor: Actor = _state.current_actor()
assert actor._service_tn # runtime state sanity
assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
@ -1172,7 +1152,7 @@ async def process_messages(
start_status += '->( scheduling new task..\n'
log.runtime(start_status)
try:
ctx: Context = await actor._service_tn.start(
ctx: Context = await actor._service_n.start(
partial(
_invoke,
actor,
@ -1312,7 +1292,7 @@ async def process_messages(
) as err:
if nursery_cancelled_before_task:
sn: Nursery = actor._service_tn
sn: Nursery = actor._service_n
assert sn and sn.cancel_scope.cancel_called # sanity
log.cancel(
f'Service nursery cancelled before it handled {funcname}'

View File

@ -35,15 +35,6 @@ for running all lower level spawning, supervision and msging layers:
SC-transitive RPC via scheduling of `trio` tasks.
- registration of newly spawned actors with the discovery sys.
Glossary:
--------
- tn: a `trio.Nursery` or "task nursery".
- an: an `ActorNursery` or "actor nursery".
- root: top/parent-most scope/task/process/actor (or other runtime
primitive) in a hierarchical tree.
- parent-ish: "higher-up" in the runtime-primitive hierarchy.
- child-ish: "lower-down" in the runtime-primitive hierarchy.
'''
from __future__ import annotations
from contextlib import (
@ -85,7 +76,6 @@ from tractor.msg import (
)
from .trionics import (
collapse_eg,
maybe_open_nursery,
)
from .ipc import (
Channel,
@ -183,11 +173,10 @@ class Actor:
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`,
# - after fork for subactors.
# - during boot for the root actor.
_root_tn: Nursery|None = None
_service_tn: Nursery|None = None
# nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery|None = None
_service_n: Nursery|None = None
_ipc_server: _server.IPCServer|None = None
@property
@ -1021,48 +1010,12 @@ class Actor:
the RPC service nursery.
'''
actor_repr: str = _pformat.nest_from_op(
input_op='>c(',
text=self.pformat(),
nest_indent=1,
)
log.cancel(
'Actor.cancel_soon()` was called!\n'
f'>> scheduling `Actor.cancel()`\n'
f'{actor_repr}'
)
assert self._service_tn
self._service_tn.start_soon(
assert self._service_n
self._service_n.start_soon(
self.cancel,
None, # self cancel all rpc tasks
)
# schedule a "canceller task" in the `._root_tn` once the
# `._service_tn` is fully shutdown; task waits for child-ish
# scopes to fully exit then finally cancels its parent,
# root-most, scope.
async def cancel_root_tn_after_services():
log.runtime(
'Waiting on service-tn to cancel..\n'
f'c>)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
)
await self._cancel_complete.wait()
log.cancel(
f'`._service_tn` cancelled\n'
f'>c)\n'
f'|_{self._service_tn.cancel_scope!r}\n'
f'\n'
f'>> cancelling `._root_tn`\n'
f'c>(\n'
f' |_{self._root_tn.cancel_scope!r}\n'
)
self._root_tn.cancel_scope.cancel()
self._root_tn.start_soon(
cancel_root_tn_after_services
)
@property
def cancel_complete(self) -> bool:
return self._cancel_complete.is_set()
@ -1167,8 +1120,8 @@ class Actor:
await ipc_server.wait_for_shutdown()
# cancel all rpc tasks permanently
if self._service_tn:
self._service_tn.cancel_scope.cancel()
if self._service_n:
self._service_n.cancel_scope.cancel()
log_meth(msg)
self._cancel_complete.set()
@ -1305,7 +1258,7 @@ class Actor:
'''
Cancel all ongoing RPC tasks owned/spawned for a given
`parent_chan: Channel` or simply all tasks (inside
`._service_tn`) when `parent_chan=None`.
`._service_n`) when `parent_chan=None`.
'''
tasks: dict = self._rpc_tasks
@ -1517,55 +1470,46 @@ async def async_main(
accept_addrs.append(addr.unwrap())
assert accept_addrs
ya_root_tn: bool = bool(actor._root_tn)
ya_service_tn: bool = bool(actor._service_tn)
# NOTE, a top-most "root" nursery in each actor-process
# enables a lifetime priority for the IPC-channel connection
# with a sub-actor's immediate parent. I.e. this connection
# is kept alive as a resilient service connection until all
# other machinery has exited, cancellation of all
# embedded/child scopes have completed. This helps ensure
# a deterministic (and thus "graceful")
# first-class-supervision style teardown where a parent actor
# (vs. say peers) is always the last to be contacted before
# disconnect.
# The "root" nursery ensures the channel with the immediate
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
# a deterministic way.
root_tn: trio.Nursery
async with (
collapse_eg(),
maybe_open_nursery(
nursery=actor._root_tn,
) as root_tn,
trio.open_nursery() as root_tn,
):
if ya_root_tn:
assert root_tn is actor._root_tn
else:
actor._root_tn = root_tn
actor._root_n = root_tn
assert actor._root_n
ipc_server: _server.IPCServer
async with (
collapse_eg(),
maybe_open_nursery(
nursery=actor._service_tn,
) as service_tn,
trio.open_nursery() as service_nursery,
_server.open_ipc_server(
parent_tn=service_tn, # ?TODO, why can't this be the root-tn
stream_handler_tn=service_tn,
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
) as ipc_server,
# ) as actor._ipc_server,
# ^TODO? prettier?
):
if ya_service_tn:
assert service_tn is actor._service_tn
else:
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_tn = service_tn
# set after allocate
# This nursery is used to handle all inbound
# connections to us such that if the TCP server
# is killed, connections can continue to process
# in the background until this nursery is cancelled.
actor._service_n = service_nursery
actor._ipc_server = ipc_server
assert (
actor._service_n
and (
actor._service_n
is
actor._ipc_server._parent_tn
is
ipc_server._stream_handler_tn
)
)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing a channel to the parent
@ -1591,11 +1535,10 @@ async def async_main(
# - root actor: the ``accept_addr`` passed to this method
# TODO: why is this not with the root nursery?
# - see above that the `._service_tn` is what's used?
try:
eps: list = await ipc_server.listen_on(
accept_addrs=accept_addrs,
stream_handler_nursery=service_tn,
stream_handler_nursery=service_nursery,
)
log.runtime(
f'Booted IPC server\n'
@ -1603,7 +1546,7 @@ async def async_main(
)
assert (
(eps[0].listen_tn)
is not service_tn
is not service_nursery
)
except OSError as oserr:
@ -1765,7 +1708,7 @@ async def async_main(
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_tn to spawn the lock task, BUT, in theory if we had
# _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?
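
A hedged sketch of the `maybe_open_nursery()` helper relied on above, its semantics assumed from usage here: re-enter a nursery the caller already owns, else allocate a fresh one:

from contextlib import asynccontextmanager as acm
import trio

@acm
async def maybe_open_nursery(
    nursery: trio.Nursery|None = None,
):
    if nursery is not None:
        # caller-provided (eg. `actor._root_tn`): just pass it through
        yield nursery
    else:
        async with trio.open_nursery() as tn:
            yield tn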

View File

@ -297,23 +297,6 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
# TODO? attempt at intermediary-rent-sub
# with child in debug lock?
# |_https://github.com/goodboy/tractor/issues/320
#
# if not is_root_process():
# log.warning(
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
# )
# with trio.CancelScope(shield=True):
# async with debug.acquire_debug_lock(
# subactor_uid=current_actor().uid,
# ) as _ctx:
# log.warning(
# 'Acquired debug lock, child ready to be killed ??\n'
# )
# TODO: toss in the skynet-logo face as ascii art?
log.critical(
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'

View File

@ -117,6 +117,7 @@ class ActorNursery:
]
] = {}
self.cancelled: bool = False
self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False
self.errors = errors
@ -134,53 +135,10 @@ class ActorNursery:
# TODO: remove the `.run_in_actor()` API and thus this 2ndary
# nursery when that API get's moved outside this primitive!
self._ria_nursery = ria_nursery
# TODO, factor this into a .hilevel api!
#
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
# trio.Nursery-like cancel (request) statuses
self._cancelled_caught: bool = False
self._cancel_called: bool = False
@property
def cancel_called(self) -> bool:
'''
Records whether cancellation has been requested for this
actor-nursery by a call to `.cancel()` either due to,
- an explicit call by some actor-local-task,
- an implicit call due to an error/cancel emited inside
the `tractor.open_nursery()` block.
'''
return self._cancel_called
@property
def cancelled_caught(self) -> bool:
'''
Set when this nursery was able to cancel all spawned subactors
gracefully via an (implicit) call to `.cancel()`.
'''
return self._cancelled_caught
# TODO! remove internal/test-suite usage!
@property
def cancelled(self) -> bool:
warnings.warn(
"`ActorNursery.cancelled` is now deprecated, use "
" `.cancel_called` instead.",
DeprecationWarning,
stacklevel=2,
)
return (
self._cancel_called
# and
# self._cancelled_caught
)
async def start_actor(
self,
name: str,
@ -358,7 +316,7 @@ class ActorNursery:
'''
__runtimeframe__: int = 1 # noqa
self._cancel_called = True
self.cancelled = True
# TODO: impl a repr for spawn more compact
# then `._children`..
@ -436,8 +394,6 @@ class ActorNursery:
) in children.values():
log.warning(f"Hard killing process {proc}")
proc.terminate()
else:
self._cancelled_caught
# mark ourselves as having (tried to have) cancelled all subactors
self._join_procs.set()
@ -647,7 +603,6 @@ _shutdown_msg: str = (
@acm
# @api_frame
async def open_nursery(
*, # named params only!
hide_tb: bool = True,
**kwargs,
# ^TODO, paramspec for `open_root_actor()`
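
The hunk above removes a deprecation-shimmed `cancelled` property; a hedged stdlib reduction of that shim pattern:

import warnings

class ExampleNursery:
    _cancel_called: bool = False

    @property
    def cancelled(self) -> bool:
        warnings.warn(
            '`.cancelled` is now deprecated, use `.cancel_called` instead.',
            DeprecationWarning,
            stacklevel=2,
        )
        return self._cancel_called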

View File

@ -250,7 +250,7 @@ async def _maybe_enter_pm(
*,
tb: TracebackType|None = None,
api_frame: FrameType|None = None,
hide_tb: bool = True,
hide_tb: bool = False,
# only enter debugger REPL when returns `True`
debug_filter: Callable[

View File

@ -58,7 +58,6 @@ from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
NoRuntime,
InternalError,
)
from tractor._state import (
current_actor,
@ -80,9 +79,6 @@ from ._sigint import (
sigint_shield as sigint_shield,
_ctlc_ignore_header as _ctlc_ignore_header
)
from ..pformat import (
ppfmt,
)
if TYPE_CHECKING:
from trio.lowlevel import Task
@ -481,12 +477,12 @@ async def _pause(
# we have to figure out how to avoid having the service nursery
# cancel on this task start? I *think* this works below:
# ```python
# actor._service_tn.cancel_scope.shield = shield
# actor._service_n.cancel_scope.shield = shield
# ```
# but not entirely sure if that's a sane way to implement it?
# NOTE currently we spawn the lock request task inside this
# subactor's global `Actor._service_tn` so that the
# subactor's global `Actor._service_n` so that the
# lifetime of the lock-request can outlive the current
# `._pause()` scope while the user steps through their
# application code and when they finally exit the
@ -510,7 +506,7 @@ async def _pause(
f'|_{task}\n'
)
with trio.CancelScope(shield=shield):
req_ctx: Context = await actor._service_tn.start(
req_ctx: Context = await actor._service_n.start(
partial(
request_root_stdio_lock,
actor_uid=actor.uid,
@ -544,7 +540,7 @@ async def _pause(
_repl_fail_report = None
# when the actor is mid-runtime cancellation the
# `Actor._service_tn` might get closed before we can spawn
# `Actor._service_n` might get closed before we can spawn
# the request task, so just ignore expected RTE.
elif (
isinstance(pause_err, RuntimeError)
@ -989,7 +985,7 @@ def pause_from_sync(
# that output and assign the `repl` created above!
bg_task, _ = trio.from_thread.run(
afn=partial(
actor._service_tn.start,
actor._service_n.start,
partial(
_pause_from_bg_root_thread,
behalf_of_thread=thread,
@ -1157,10 +1153,9 @@ def pause_from_sync(
'use_greenback',
False,
):
raise InternalError(
f'`greenback` was never initialized in this actor?\n'
f'\n'
f'{ppfmt(_state._runtime_vars)}\n'
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise

View File

@ -101,27 +101,11 @@ class Channel:
# ^XXX! ONLY set if a remote actor sends an `Error`-msg
self._closed: bool = False
# flag set by `Portal.cancel_actor()` indicating remote
# (possibly peer) cancellation of the far end actor runtime.
# flag set by ``Portal.cancel_actor()`` indicating remote
# (possibly peer) cancellation of the far end actor
# runtime.
self._cancel_called: bool = False
@property
def closed(self) -> bool:
'''
Was `.aclose()` successfully called?
'''
return self._closed
@property
def cancel_called(self) -> bool:
'''
Set when `Portal.cancel_actor()` is called on a portal which
wraps this IPC channel.
'''
return self._cancel_called
@property
def uid(self) -> tuple[str, str]:
'''
@ -185,9 +169,7 @@ class Channel:
addr,
**kwargs,
)
# XXX, for UDS *no!* since we recv the peer-pid and build out
# a new addr..
# assert transport.raddr == addr
assert transport.raddr == addr
chan = Channel(transport=transport)
# ?TODO, compact this into adapter level-methods?
@ -303,7 +285,7 @@ class Channel:
self,
payload: Any,
hide_tb: bool = False,
hide_tb: bool = True,
) -> None:
'''

View File

@ -17,59 +17,29 @@
Utils to tame mp non-SC madness
'''
import platform
def disable_mantracker():
'''
Disable all `multiprocessing` "resource tracking" machinery since
Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing.shared_memory import SharedMemory
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
# 3.13+ only.. can pass `track=False` to disable
# all the resource tracker bs.
# https://docs.python.org/3/library/multiprocessing.shared_memory.html
if (_py_313 := (
platform.python_version_tuple()[:-1]
>=
('3', '13')
)
):
from functools import partial
return partial(
SharedMemory,
track=False,
)
def unregister(self, name, rtype):
pass
# !TODO, once we drop 3.12- we can obvi remove all this!
else:
from multiprocessing import (
resource_tracker as mantracker,
)
def ensure_running(self):
pass
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
# use std type verbatim
shmT = SharedMemory
return shmT
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
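
A hedged usage sketch of the returned ctor, matching the `SharedMemory = disable_mantracker()` assignment shown further below:

SharedMemory = disable_mantracker()

shm = SharedMemory(create=True, size=4096)
try:
    shm.buf[:5] = b'doggy'
finally:
    shm.close()
    shm.unlink()  # safe either way; no resource-tracker is involved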

View File

@ -1001,11 +1001,7 @@ class Server(Struct):
partial(
_serve_ipc_eps,
server=self,
stream_handler_tn=(
stream_handler_nursery
or
self._stream_handler_tn
),
stream_handler_tn=stream_handler_nursery,
listen_addrs=accept_addrs,
)
)
@ -1149,17 +1145,13 @@ async def open_ipc_server(
async with maybe_open_nursery(
nursery=parent_tn,
) as parent_tn:
) as rent_tn:
no_more_peers = trio.Event()
no_more_peers.set()
ipc_server = IPCServer(
_parent_tn=parent_tn,
_stream_handler_tn=(
stream_handler_tn
or
parent_tn
),
_parent_tn=rent_tn,
_stream_handler_tn=stream_handler_tn or rent_tn,
_no_more_peers=no_more_peers,
)
try:

View File

@ -23,15 +23,14 @@ considered optional within the context of this runtime-library.
"""
from __future__ import annotations
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
# SharedMemory,
ShareableList,
)
import platform
from sys import byteorder
import time
from typing import Optional
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
)
from msgspec import (
Struct,
@ -62,7 +61,7 @@ except ImportError:
log = get_logger(__name__)
SharedMemory = disable_mantracker()
disable_mantracker()
class SharedInt:
@ -790,23 +789,11 @@ def open_shm_list(
readonly=readonly,
)
# TODO, factor into a @actor_fixture acm-API?
# -[ ] also `@maybe_actor_fixture()` which inludes
# the .current_actor() convenience check?
# |_ orr can that just be in the sin-maybe-version?
#
# "close" attached shm on actor teardown
try:
actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close)
# XXX on 3.13+ we don't need to call this?
# -> bc we pass `track=False` for `SharedMemeory` orr?
if (
platform.python_version_tuple()[:-1] < ('3', '13')
):
actor.lifetime_stack.callback(shml.shm.unlink)
actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps')

View File

@ -430,25 +430,20 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
) as _re:
trans_err = _re
) as bre:
trans_err = bre
tpt_name: str = f'{type(self).__name__!r}'
match trans_err:
# XXX, specific to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err.args[0]
'[Errno 32] Broken pipe' in trans_err.args[0]
# ^XXX, specific to UDS transport and its,
# well, "speediness".. XD
# |_ likely todo with races related to how fast
# the socket is setup/torn-down on linux
# as it pertains to rando pings from the
# `.discovery` subsys and protos.
):
tpt_closed = TransportClosed.from_src_exc(
raise TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
@ -456,31 +451,14 @@ class MsgpackTransport(MsgTransport):
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
)
raise tpt_closed from trans_err
# case trio.ClosedResourceError() if (
# 'this socket was already closed'
# in
# trans_err.args[0]
# ):
# tpt_closed = TransportClosed.from_src_exc(
# message=(
# f'{tpt_name} already closed by peer\n'
# ),
# body=f'{self}\n',
# src_exc=trans_err,
# raise_on_report=True,
# loglevel='transport',
# )
# raise tpt_closed from trans_err
) from bre
# unless the disconnect condition falls under "a
# normal operation breakage" we usually console warn
# about it.
case _:
log.exception(
f'{tpt_name} layer failed pre-send ??\n'
'{tpt_name} layer failed pre-send ??\n'
)
raise trans_err
@ -525,7 +503,7 @@ class MsgpackTransport(MsgTransport):
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' |_peers: 1\n'
f' |_peers: 2\n'
f' laddr: {self._laddr}\n'
f' raddr: {self._raddr}\n'
# f'\n'

View File

@ -18,9 +18,6 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
'''
from __future__ import annotations
from contextlib import (
contextmanager as cm,
)
from pathlib import Path
import os
from socket import (
@ -32,7 +29,6 @@ from socket import (
)
import struct
from typing import (
Type,
TYPE_CHECKING,
ClassVar,
)
@ -103,6 +99,8 @@ class UDSAddress(
self.filedir
or
self.def_bindspace
# or
# get_rt_dir()
)
@property
@ -207,35 +205,12 @@ class UDSAddress(
f']'
)
@cm
def _reraise_as_connerr(
src_excs: tuple[Type[Exception]],
addr: UDSAddress,
):
try:
yield
except src_excs as src_exc:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
f'\n'
f'from src: {src_exc!r}\n'
) from src_exc
async def start_listener(
addr: UDSAddress,
**kwargs,
) -> SocketListener:
'''
Start listening for inbound connections via
a `trio.SocketListener` (task) which `socket.bind()`s on `addr`.
Note, if the `UDSAddress.bindspace: Path` directory dne it is
implicitly created.
'''
# sock = addr._sock = socket.socket(
sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
@ -246,25 +221,17 @@ async def start_listener(
f'|_{addr}\n'
)
# ?TODO? should we use the `actor.lifetime_stack`
# to rm on shutdown?
bindpath: Path = addr.sockpath
if not (bs := addr.bindspace).is_dir():
log.info(
'Creating bindspace dir in file-sys\n'
f'>{{\n'
f'|_{bs!r}\n'
)
bs.mkdir()
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
OSError,
),
addr=addr
):
try:
await sock.bind(str(bindpath))
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
) from fdne
sock.listen(1)
log.info(
@ -389,30 +356,27 @@ class MsgpackUDSStream(MsgpackTransport):
# `.setsockopt()` call tells the OS provide it; the client
# pid can then be read on server/listen() side via
# `get_peer_info()` above.
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
),
addr=addr
):
try:
stream = await open_unix_socket_w_passcred(
str(sockpath),
**kwargs
)
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
tpt_stream = MsgpackUDSStream(
stream = MsgpackUDSStream(
stream,
prefix_size=prefix_size,
codec=codec
)
# XXX assign from new addrs after peer-PID extract!
(
tpt_stream._laddr,
tpt_stream._raddr,
) = cls.get_stream_addrs(stream)
return tpt_stream
stream._raddr = addr
return stream
@classmethod
def get_stream_addrs(

View File

@ -130,7 +130,6 @@ class LinkedTaskChannel(
_trio_task: trio.Task
_aio_task_complete: trio.Event
_closed_by_aio_task: bool = False
_suppress_graceful_exits: bool = True
_trio_err: BaseException|None = None
@ -209,15 +208,10 @@ class LinkedTaskChannel(
async def aclose(self) -> None:
await self._from_aio.aclose()
# ?TODO? async version of this?
def started_nowait(
def started(
self,
val: Any = None,
) -> None:
'''
Synchronize aio-side with its trio-parent.
'''
self._aio_started_val = val
return self._to_trio.send_nowait(val)
@ -248,7 +242,6 @@ class LinkedTaskChannel(
# cycle on the trio side?
# await trio.lowlevel.checkpoint()
return await self._from_aio.receive()
except BaseException as err:
async with translate_aio_errors(
chan=self,
@ -326,7 +319,7 @@ def _run_asyncio_task(
qsize: int = 1,
provide_channels: bool = False,
suppress_graceful_exits: bool = True,
hide_tb: bool = True,
hide_tb: bool = False,
**kwargs,
) -> LinkedTaskChannel:
@ -354,6 +347,18 @@ def _run_asyncio_task(
# value otherwise it would just return ;P
assert qsize > 1
if provide_channels:
assert 'to_trio' in args
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
coro = func(**kwargs)
trio_task: trio.Task = trio.lowlevel.current_task()
trio_cs = trio.CancelScope()
aio_task_complete = trio.Event()
@ -368,25 +373,6 @@ def _run_asyncio_task(
_suppress_graceful_exits=suppress_graceful_exits,
)
# allow target func to accept/stream results manually by name
if 'to_trio' in args:
kwargs['to_trio'] = to_trio
if 'from_trio' in args:
kwargs['from_trio'] = from_trio
if 'chan' in args:
kwargs['chan'] = chan
if provide_channels:
assert (
'to_trio' in args
or
'chan' in args
)
coro = func(**kwargs)
async def wait_on_coro_final_result(
to_trio: trio.MemorySendChannel,
coro: Awaitable,
@ -459,23 +445,9 @@ def _run_asyncio_task(
f'Task exited with final result: {result!r}\n'
)
# XXX ALWAYS close the child-`asyncio`-task-side's
# `to_trio` handle which will in turn relay
# a `trio.EndOfChannel` to the `trio`-parent.
# Consequently the parent `trio` task MUST ALWAYS
# check for any `chan._aio_err` to be raised when it
# receives an EoC.
#
# NOTE, there are 2 EoC cases,
# - normal/graceful EoC due to the aio-side actually
# terminating its "streaming", but the task did not
# error and is not yet complete.
#
# - the aio-task terminated and we specially mark the
# closure as due to the `asyncio.Task`'s exit.
#
# only close the sender side which will relay
# a `trio.EndOfChannel` to the trio (consumer) side.
to_trio.close()
chan._closed_by_aio_task = True
aio_task_complete.set()
log.runtime(
@ -673,9 +645,8 @@ def _run_asyncio_task(
not trio_cs.cancel_called
):
log.cancel(
f'Cancelling trio-side due to aio-side src exc\n'
f'\n'
f'{curr_aio_err!r}\n'
f'Cancelling `trio` side due to aio-side src exc\n'
f'{curr_aio_err}\n'
f'\n'
f'(c>\n'
f' |_{trio_task}\n'
@ -787,7 +758,6 @@ async def translate_aio_errors(
aio_done_before_trio: bool = aio_task.done()
assert aio_task
trio_err: BaseException|None = None
eoc: trio.EndOfChannel|None = None
try:
yield # back to one of the cross-loop apis
except trio.Cancelled as taskc:
@ -819,48 +789,12 @@ async def translate_aio_errors(
# )
# raise
# XXX EoC is a special SIGNAL from the aio-side here!
# There are 2 cases to handle:
# 1. the "EoC passthrough" case.
# - the aio-task actually closed the channel "gracefully" and
# the trio-task should unwind any ongoing channel
# iteration/receiving,
# |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
# in which case we want to relay the actual "end-of-chan" for
# iteration purposes.
#
# 2. relaying the "asyncio.Task termination" case.
# - if the aio-task terminates, maybe with an error, AND the
# `open_channel_from()` API was used, it will always signal
# that termination.
# |_`wait_on_coro_final_result()` always calls
# `to_trio.close()` when `provide_channels=True` so we need to
# always check if there is an aio-side exc which needs to be
# relayed to the parent trio side!
# |_in this case the special `chan._closed_by_aio_task` is
# ALWAYS set.
#
except trio.EndOfChannel as _eoc:
eoc = _eoc
if (
chan._closed_by_aio_task
and
aio_err
):
log.cancel(
f'The asyncio-child task terminated due to error\n'
f'{aio_err!r}\n'
)
chan._trio_to_raise = aio_err
trio_err = chan._trio_err = eoc
#
# ?TODO?, raise something like a,
# chan._trio_to_raise = AsyncioErrored()
# BUT, with the tb rewritten to reflect the underlying
# call stack?
else:
trio_err = chan._trio_err = eoc
# XXX always pass through EoC since this translator is often
# called from `LinkedTaskChannel.receive()` which we want to
# pass through; further we have no special meaning for it in
# terms of relaying errors or signals from the aio side!
except trio.EndOfChannel as eoc:
trio_err = chan._trio_err = eoc
raise eoc
# NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
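Per the 2 cases above, a trio-side consumer should treat EoC as a possible error-relay signal; a minimal sketch of that pattern (the `._closed_by_aio_task`/`._trio_to_raise` attrs are per this diff's `LinkedTaskChannel` changes):

import trio

async def consume(chan) -> None:
    try:
        while True:
            print(await chan.receive())
    except trio.EndOfChannel:
        # the aio-task may have terminated with an error which
        # the translator relays alongside the graceful EoC.
        if (
            chan._closed_by_aio_task
            and chan._trio_to_raise
        ):
            raise chan._trio_to_raise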
@ -1113,7 +1047,7 @@ async def translate_aio_errors(
#
if wait_on_aio_task:
await chan._aio_task_complete.wait()
log.debug(
log.info(
'asyncio-task is done and unblocked trio-side!\n'
)
@ -1130,17 +1064,11 @@ async def translate_aio_errors(
trio_to_raise: (
AsyncioCancelled|
AsyncioTaskExited|
Exception| # relayed from aio-task
None
) = chan._trio_to_raise
raise_from: Exception = (
trio_err if (aio_err is trio_to_raise)
else aio_err
)
if not suppress_graceful_exits:
raise trio_to_raise from raise_from
raise trio_to_raise from (aio_err or trio_err)
if trio_to_raise:
match (
@ -1173,7 +1101,7 @@ async def translate_aio_errors(
)
return
case _:
raise trio_to_raise from raise_from
raise trio_to_raise from (aio_err or trio_err)
# Check if the asyncio-side is the cause of the trio-side
# error.
@ -1239,6 +1167,7 @@ async def run_task(
@acm
async def open_channel_from(
target: Callable[..., Any],
suppress_graceful_exits: bool = True,
**target_kwargs,
@ -1272,6 +1201,7 @@ async def open_channel_from(
# deliver stream handle upward
yield first, chan
except trio.Cancelled as taskc:
# await tractor.pause(shield=True) # ya it worx ;)
if cs.cancel_called:
if isinstance(chan._trio_to_raise, AsyncioCancelled):
log.cancel(
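Presumably the new `suppress_graceful_exits` flag lets callers opt in to seeing early-but-graceful aio-task exits as errors; a hedged usage sketch:

async with tractor.to_asyncio.open_channel_from(
    aio_child,
    suppress_graceful_exits=False,
) as (first, chan):
    # an early (non-errored) aio-side exit will now surface
    # on this side, likely as an `AsyncioTaskExited`.
    ...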

View File

@ -41,9 +41,6 @@ import trio
from tractor._state import current_actor
from tractor.log import get_logger
# from ._beg import collapse_eg
# from ._taskc import (
# maybe_raise_from_masking_exc,
# )
if TYPE_CHECKING:
@ -109,9 +106,6 @@ async def _enter_and_wait(
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncGenerator[
tuple[
T | None,
@ -154,45 +148,39 @@ async def gather_contexts(
'`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did you try to use inline generator syntax?\n'
f'Check that list({mngrs}) works!\n'
# 'or sequence-type instead!\n'
# 'Use a non-lazy iterator or sequence-type instead!\n'
'Use a non-lazy iterator or sequence-type instead!\n'
)
try:
async with (
#
# ?TODO, does including these (eg-collapsing,
# taskc-unmasking) improve tb noise-reduction/legibility?
#
# collapse_eg(),
maybe_open_nursery(
nursery=tn,
) as tn,
# maybe_raise_from_masking_exc(),
):
for mngr in mngrs:
tn.start_soon(
_enter_and_wait,
mngr,
unwrapped,
all_entered,
parent_exit,
seed,
)
async with (
# collapse_eg(),
trio.open_nursery(
strict_exception_groups=False,
# ^XXX^ TODO? soo roll our own then ??
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn,
):
for mngr in mngrs:
tn.start_soon(
_enter_and_wait,
mngr,
unwrapped,
all_entered,
parent_exit,
seed,
)
# deliver control to caller once all ctx-managers have
# started (yielded back to us).
await all_entered.wait()
# deliver control once all managers have started up
await all_entered.wait()
try:
yield tuple(unwrapped.values())
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
parent_exit.set()
finally:
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
parent_exit.set()
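Usage-wise (and per the error message above) the input must be a materialized, non-lazy sequence of acms; a minimal sketch:

from contextlib import asynccontextmanager as acm
import trio
from tractor.trionics import gather_contexts

@acm
async def open_thing(i: int):
    yield i * 10

async def main() -> None:
    # NB: a list, NOT an inline generator expression!
    async with gather_contexts(
        mngrs=[open_thing(i) for i in range(3)],
    ) as vals:
        # values are expected in input order
        assert vals == (0, 10, 20)

trio.run(main)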
# Per actor task caching helpers.
# Further potential examples of interest:
@ -204,7 +192,7 @@ class _Cache:
a kept-alive-while-in-use async resource.
'''
service_tn: Optional[trio.Nursery] = None
service_n: Optional[trio.Nursery] = None
locks: dict[Hashable, trio.Lock] = {}
users: int = 0
values: dict[Any, Any] = {}
@ -245,9 +233,6 @@ async def maybe_open_context(
kwargs: dict = {},
key: Hashable | Callable[..., Hashable] = None,
# caller can provide their own scope
tn: trio.Nursery|None = None,
) -> AsyncIterator[tuple[bool, T]]:
'''
Maybe open an async-context-manager (acm) if there is not already
@ -280,94 +265,40 @@ async def maybe_open_context(
# have it not be closed until all consumers have exited (which is
# currently difficult to implement any other way besides using our
# pre-allocated runtime instance..)
if tn:
# TODO, assert tn is eventual parent of this task!
task: trio.Task = trio.lowlevel.current_task()
task_tn: trio.Nursery = task.parent_nursery
if not tn._cancel_status.encloses(
task_tn._cancel_status
):
raise RuntimeError(
f'Mis-nesting of task under provided {tn} !?\n'
f'Current task is NOT a child(-ish)!!\n'
f'\n'
f'task: {task}\n'
f'task_tn: {task_tn}\n'
)
service_tn = tn
else:
service_tn: trio.Nursery = current_actor()._service_tn
service_n: trio.Nursery = current_actor()._service_n
# TODO: is there any way to allocate
# a 'stays-open-till-last-task-finished' nursery?
# service_tn: trio.Nursery
# async with maybe_open_nursery(_Cache.service_tn) as service_tn:
# _Cache.service_tn = service_tn
# service_n: trio.Nursery
# async with maybe_open_nursery(_Cache.service_n) as service_n:
# _Cache.service_n = service_n
cache_miss_ke: KeyError|None = None
maybe_taskc: trio.Cancelled|None = None
try:
# **critical section** that should prevent other tasks from
# checking the _Cache until complete otherwise the scheduler
# may switch and by accident we create more than one resource.
yielded = _Cache.values[ctx_key]
except KeyError as _ke:
# XXX, stay mutexed up to cache-miss yield
try:
cache_miss_ke = _ke
log.debug(
f'Allocating new @acm-func entry\n'
f'ctx_key={ctx_key}\n'
f'acm_func={acm_func}\n'
)
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_tn, trio.Event())
yielded: Any = await service_tn.start(
_Cache.run_ctx,
mngr,
ctx_key,
)
_Cache.users += 1
finally:
# XXX, since this runs from an `except` it's a checkpoint
# which can be `trio.Cancelled`-masked.
#
# NOTE, in that case the mutex is never released by the
# (first and) caching task and **we can't** simply shield
# bc that will inf-block on the `await
# no_more_users.wait()`.
#
# SO just always unlock!
lock.release()
except KeyError:
log.debug(f'Allocating new {acm_func} for {ctx_key}')
mngr = acm_func(**kwargs)
resources = _Cache.resources
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
resources[ctx_key] = (service_n, trio.Event())
try:
yield (
False, # cache_hit = "no"
yielded,
)
except trio.Cancelled as taskc:
maybe_taskc = taskc
log.cancel(
f'Cancelled from cache-miss entry\n'
f'\n'
f'ctx_key: {ctx_key!r}\n'
f'mngr: {mngr!r}\n'
)
# XXX, always unset ke from cancelled context
# since we never consider it a masked exc case!
# - bc this can be called directly by `._rpc._invoke()`?
#
if maybe_taskc.__context__ is cache_miss_ke:
maybe_taskc.__context__ = None
raise taskc
# sync up to the mngr's yielded value
yielded = await service_n.start(
_Cache.run_ctx,
mngr,
ctx_key,
)
_Cache.users += 1
lock.release()
yield False, yielded
else:
_Cache.users += 1
log.debug(
log.runtime(
f'Re-using cached resource for user {_Cache.users}\n\n'
f'{ctx_key!r} -> {type(yielded)}\n'
@ -377,19 +308,9 @@ async def maybe_open_context(
# f'{ctx_key!r} -> {yielded!r}\n'
)
lock.release()
yield (
True, # cache_hit = "yes"
yielded,
)
yield True, yielded
finally:
if lock.locked():
stats: trio.LockStatistics = lock.statistics()
log.error(
f'Lock left locked by last owner !?\n'
f'{stats}\n'
)
_Cache.users -= 1
if yielded is not None:
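To close out, a hedged usage sketch of `maybe_open_context()` showing the (cache_hit, value) tuple it yields; `open_client()`/`connect()` here are hypothetical:

from contextlib import asynccontextmanager as acm
from tractor.trionics import maybe_open_context

@acm
async def open_client(url: str):
    client = await connect(url)  # hypothetical connector
    try:
        yield client
    finally:
        await client.aclose()

async def task() -> None:
    async with maybe_open_context(
        acm_func=open_client,
        kwargs={'url': 'uds:///tmp/sock'},
    ) as (cache_hit, client):
        if cache_hit:
            print('re-using already-cached client!')
        ...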