forked from goodboy/tractor

Merge pull request #392 from goodboy/introspect_ipc

Introspect-ipc: some `.ipc` subpkg iface refinements for reading cancel statuses and `Address.__repr__()`
Bd 2025-08-18 22:15:40 -04:00 committed by GitHub
commit a9aa5ec04e
9 changed files with 392 additions and 129 deletions
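
The user-facing gist: cancel outcomes are now readable off a `Context` (e.g. the new 'actor-cancelled' status) without digging into runtime internals. A rough usage sketch (simplified, not part of this commit; assumes tractor's standard `open_nursery()`/`open_context()` API):

    import tractor
    import trio

    @tractor.context
    async def sleep_forever(ctx: tractor.Context) -> None:
        await ctx.started()
        await trio.sleep_forever()

    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'child',
                enable_modules=[__name__],
            )
            async with portal.open_context(sleep_forever) as (ctx, _first):
                # graceful self-cancel; the resulting ctxc is masked
                # on this (parent) side per the `_context.py` logic below.
                await ctx.cancel()

            # read the cancel status off the (now closed) ctx
            print(f'ctx status: {ctx.repr_state!r}')
            await portal.cancel_actor()

    trio.run(main)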

View File

@@ -0,0 +1,114 @@
+'''
+Unit-ish tests for specific IPC transport protocol backends.
+
+'''
+from __future__ import annotations
+from pathlib import Path
+
+import pytest
+import trio
+import tractor
+from tractor import (
+    Actor,
+    _state,
+    _addr,
+)
+
+
+@pytest.fixture
+def bindspace_dir_str() -> str:
+
+    rt_dir: Path = tractor._state.get_rt_dir()
+    bs_dir: Path = rt_dir / 'doggy'
+    bs_dir_str: str = str(bs_dir)
+    assert not bs_dir.is_dir()
+
+    yield bs_dir_str
+
+    # delete it on suite teardown.
+    # ?TODO? should we support this internally
+    # or is leaking it ok?
+    if bs_dir.is_dir():
+        bs_dir.rmdir()
+
+
+def test_uds_bindspace_created_implicitly(
+    debug_mode: bool,
+    bindspace_dir_str: str,
+):
+    registry_addr: tuple = (
+        f'{bindspace_dir_str}',
+        'registry@doggy.sock',
+    )
+    bs_dir_str: str = registry_addr[0]
+
+    # XXX, ensure bindspace-dir DNE beforehand!
+    assert not Path(bs_dir_str).is_dir()
+
+    async def main():
+        async with tractor.open_nursery(
+            enable_transports=['uds'],
+            registry_addrs=[registry_addr],
+            debug_mode=debug_mode,
+        ) as _an:
+
+            # XXX MUST be created implicitly by
+            # `.ipc._uds.start_listener()`!
+            assert Path(bs_dir_str).is_dir()
+
+            root: Actor = tractor.current_actor()
+            assert root.is_registrar
+            assert registry_addr in root.reg_addrs
+            assert (
+                registry_addr
+                in
+                _state._runtime_vars['_registry_addrs']
+            )
+            assert (
+                _addr.wrap_address(registry_addr)
+                in
+                root.registry_addrs
+            )
+
+    trio.run(main)
+
+
+def test_uds_double_listen_raises_connerr(
+    debug_mode: bool,
+    bindspace_dir_str: str,
+):
+    registry_addr: tuple = (
+        f'{bindspace_dir_str}',
+        'registry@doggy.sock',
+    )
+
+    async def main():
+        async with tractor.open_nursery(
+            enable_transports=['uds'],
+            registry_addrs=[registry_addr],
+            debug_mode=debug_mode,
+        ) as _an:  # runtime up
+            root: Actor = tractor.current_actor()
+
+            from tractor.ipc._uds import (
+                start_listener,
+                UDSAddress,
+            )
+            ya_bound_addr: UDSAddress = root.registry_addrs[0]
+            try:
+                await start_listener(
+                    addr=ya_bound_addr,
+                )
+            except ConnectionError as connerr:
+                assert type(src_exc := connerr.__context__) is OSError
+                assert 'Address already in use' in src_exc.args
+                # complete, exit test.
+            else:
+                pytest.fail('It dint raise a connerr !?')

+    trio.run(main)

View File

@@ -154,7 +154,7 @@ class Context:
     2 cancel-scope-linked, communicating and parallel executing
     `Task`s. Contexts are allocated on each side of any task
     RPC-linked msg dialog, i.e. for every request to a remote
-    actor from a `Portal`. On the "callee" side a context is
+    actor from a `Portal`. On the "child" side a context is
     always allocated inside `._rpc._invoke()`.

     TODO: more detailed writeup on cancellation, error and
@@ -222,8 +222,8 @@ class Context:
     # `._runtime.invoke()`.
     _remote_func_type: str | None = None

-    # NOTE: (for now) only set (a portal) on the caller side since
-    # the callee doesn't generally need a ref to one and should
+    # NOTE: (for now) only set (a portal) on the parent side since
+    # the child doesn't generally need a ref to one and should
     # normally need to explicitly ask for handle to its peer if
     # more the the `Context` is needed?
     _portal: Portal | None = None
@@ -252,12 +252,12 @@ class Context:
     _outcome_msg: Return|Error|ContextCancelled = Unresolved

     # on a clean exit there should be a final value
-    # delivered from the far end "callee" task, so
+    # delivered from the far end "child" task, so
     # this value is only set on one side.
     # _result: Any | int = None
     _result: PayloadT|Unresolved = Unresolved

-    # if the local "caller" task errors this value is always set
+    # if the local "parent" task errors this value is always set
     # to the error that was captured in the
     # `Portal.open_context().__aexit__()` teardown block OR, in
     # 2 special cases when an (maybe) expected remote error
@@ -293,7 +293,7 @@ class Context:
     # a `ContextCancelled` due to a call to `.cancel()` triggering
     # "graceful closure" on either side:
     # - `._runtime._invoke()` will check this flag before engaging
-    #   the crash handler REPL in such cases where the "callee"
+    #   the crash handler REPL in such cases where the "child"
     #   raises the cancellation,
     # - `.devx.debug.lock_stdio_for_peer()` will set it to `False` if
     #   the global tty-lock has been configured to filter out some
@@ -307,8 +307,8 @@ class Context:
     _stream_opened: bool = False
     _stream: MsgStream|None = None

-    # caller of `Portal.open_context()` for
-    # logging purposes mostly
+    # the parent-task's calling-fn's frame-info, the frame above
+    # `Portal.open_context()`, for introspection/logging.
     _caller_info: CallerInfo|None = None

     # overrun handling machinery
@@ -529,11 +529,11 @@ class Context:
         '''
         Exactly the value of `self._scope.cancelled_caught`
         (delegation) and should only be (able to be read as)
-        `True` for a `.side == "caller"` ctx wherein the
+        `True` for a `.side == "parent"` ctx wherein the
         `Portal.open_context()` block was exited due to a call to
         `._scope.cancel()` - which should only ocurr in 2 cases:

-        - a caller side calls `.cancel()`, the far side cancels
+        - a parent side calls `.cancel()`, the far side cancels
          and delivers back a `ContextCancelled` (making
          `.cancel_acked == True`) and `._scope.cancel()` is
          called by `._maybe_cancel_and_set_remote_error()` which
@@ -542,20 +542,20 @@ class Context:
          => `._scope.cancelled_caught == True` by normal `trio`
          cs semantics.

-        - a caller side is delivered a `._remote_error:
+        - a parent side is delivered a `._remote_error:
          RemoteActorError` via `._deliver_msg()` and a transitive
          call to `_maybe_cancel_and_set_remote_error()` calls
          `._scope.cancel()` and that cancellation eventually
          results in `trio.Cancelled`(s) caught in the
          `.open_context()` handling around the @acm's `yield`.

-        Only as an FYI, in the "callee" side case it can also be
+        Only as an FYI, in the "child" side case it can also be
         set but never is readable by any task outside the RPC
         machinery in `._invoke()` since,:

-        - when a callee side calls `.cancel()`, `._scope.cancel()`
+        - when a child side calls `.cancel()`, `._scope.cancel()`
          is called immediately and handled specially inside
          `._invoke()` to raise a `ContextCancelled` which is then
-         sent to the caller side.
+         sent to the parent side.

         However, `._scope.cancelled_caught` can NEVER be
         accessed/read as `True` by any RPC invoked task since it
@@ -666,7 +666,7 @@ class Context:
         when called/closed by actor local task(s).

         NOTEs:
-          - It is expected that the caller has previously unwrapped
+          - It is expected that the parent has previously unwrapped
            the remote error using a call to `unpack_error()` and
            provides that output exception value as the input
            `error` argument *here*.
@@ -676,7 +676,7 @@ class Context:
            `Portal.open_context()` (ideally) we want to interrupt
            any ongoing local tasks operating within that
            `Context`'s cancel-scope so as to be notified ASAP of
-           the remote error and engage any caller handling (eg.
+           the remote error and engage any parent handling (eg.
            for cross-process task supervision).

          - In some cases we may want to raise the remote error
@@ -886,6 +886,11 @@ class Context:

     @property
     def repr_caller(self) -> str:
+        '''
+        Render a "namespace-path" style representation of the calling
+        task-fn.
+
+        '''
         ci: CallerInfo|None = self._caller_info
         if ci:
             return (
@@ -899,7 +904,7 @@ class Context:
     def repr_api(self) -> str:
         return 'Portal.open_context()'

-        # TODO: use `.dev._frame_stack` scanning to find caller!
+        # TODO: use `.dev._frame_stack` scanning to find caller fn!
         # ci: CallerInfo|None = self._caller_info
         # if ci:
         #     return (
@@ -934,7 +939,7 @@ class Context:
         => That is, an IPC `Context` (this) **does not**
          have the same semantics as a `trio.CancelScope`.

-        If the caller (who entered the `Portal.open_context()`)
+        If the parent (who entered the `Portal.open_context()`)
         desires that the internal block's cancel-scope be
         cancelled it should open its own `trio.CancelScope` and
         manage it as needed.
@@ -1006,7 +1011,6 @@ class Context:
             else:
                 log.cancel(
                     f'Timed out on cancel request of remote task?\n'
-                    f'\n'
                     f'{reminfo}'
                 )
@@ -1017,7 +1021,7 @@ class Context:
         # `_invoke()` RPC task.
         #
         # NOTE: on this side we ALWAYS cancel the local scope
-        # since the caller expects a `ContextCancelled` to be sent
+        # since the parent expects a `ContextCancelled` to be sent
         # from `._runtime._invoke()` back to the other side. The
         # logic for catching the result of the below
         # `._scope.cancel()` is inside the `._runtime._invoke()`
@@ -1190,8 +1194,8 @@ class Context:
     ) -> Any|Exception:
         '''
-        From some (caller) side task, wait for and return the final
-        result from the remote (callee) side's task.
+        From some (parent) side task, wait for and return the final
+        result from the remote (child) side's task.

         This provides a mechanism for one task running in some actor to wait
         on another task at the other side, in some other actor, to terminate.
@@ -1487,6 +1491,12 @@ class Context:
             ):
                 status = 'peer-cancelled'

+            case (
+                Unresolved,
+                trio.Cancelled(),  # any error-type
+            ) if self.canceller:
+                status = 'actor-cancelled'
+
             # (remote) error condition
             case (
                 Unresolved,
@@ -1600,7 +1610,7 @@ class Context:
             raise err

         # TODO: maybe a flag to by-pass encode op if already done
-        # here in caller?
+        # here in parent?
         await self.chan.send(started_msg)

         # set msg-related internal runtime-state
@@ -1676,7 +1686,7 @@ class Context:
         XXX RULES XXX
         ------ - ------
-        - NEVER raise remote errors from this method; a runtime task caller.
+        - NEVER raise remote errors from this method; a calling runtime-task.
          An error "delivered" to a ctx should always be raised by
          the corresponding local task operating on the
          `Portal`/`Context` APIs.
@@ -1752,7 +1762,7 @@ class Context:
             else:
                 report = (
-                    'Queueing OVERRUN msg on caller task:\n\n'
+                    'Queueing OVERRUN msg on parent task:\n\n'
                     + report
                 )
                 log.debug(report)
@@ -1948,12 +1958,12 @@ async def open_context_from_portal(
     IPC protocol.

     The yielded `tuple` is a pair delivering a `tractor.Context`
-    and any first value "sent" by the "callee" task via a call
+    and any first value "sent" by the "child" task via a call
     to `Context.started(<value: Any>)`; this side of the
-    context does not unblock until the "callee" task calls
+    context does not unblock until the "child" task calls
     `.started()` in similar style to `trio.Nursery.start()`.

-    When the "callee" (side that is "called"/started by a call
-    to *this* method) returns, the caller side (this) unblocks
+    When the "child" (side that is "called"/started by a call
+    to *this* method) returns, the parent side (this) unblocks
     and any final value delivered from the other end can be
     retrieved using the `Contex.wait_for_result()` api.
@@ -1966,7 +1976,7 @@ async def open_context_from_portal(
     __tracebackhide__: bool = hide_tb

     # denote this frame as a "runtime frame" for stack
-    # introspection where we report the caller code in logging
+    # introspection where we report the parent code in logging
     # and error message content.
     # NOTE: 2 bc of the wrapping `@acm`
     __runtimeframe__: int = 2  # noqa
@@ -2025,7 +2035,7 @@ async def open_context_from_portal(
     # placeholder for any exception raised in the runtime
     # or by user tasks which cause this context's closure.
     scope_err: BaseException|None = None
-    ctxc_from_callee: ContextCancelled|None = None
+    ctxc_from_child: ContextCancelled|None = None
     try:
         async with (
             collapse_eg(),
@@ -2104,7 +2114,7 @@ async def open_context_from_portal(
             # that we can re-use it around the `yield` ^ here
             # or vice versa?
             #
-            # maybe TODO NOTE: between the caller exiting and
+            # maybe TODO NOTE: between the parent exiting and
             # arriving here the far end may have sent a ctxc-msg or
             # other error, so the quetion is whether we should check
             # for it here immediately and maybe raise so as to engage
@@ -2170,16 +2180,16 @@ async def open_context_from_portal(
    # request in which case we DO let the error bubble to the
    # opener.
    #
-   # 2-THIS "caller" task somewhere invoked `Context.cancel()`
-   #   and received a `ContextCanclled` from the "callee"
+   # 2-THIS "parent" task somewhere invoked `Context.cancel()`
+   #   and received a `ContextCanclled` from the "child"
    #   task, in which case we mask the `ContextCancelled` from
-   #   bubbling to this "caller" (much like how `trio.Nursery`
+   #   bubbling to this "parent" (much like how `trio.Nursery`
    #   swallows any `trio.Cancelled` bubbled by a call to
    #   `Nursery.cancel_scope.cancel()`)
    except ContextCancelled as ctxc:
        scope_err = ctxc
        ctx._local_error: BaseException = scope_err
-       ctxc_from_callee = ctxc
+       ctxc_from_child = ctxc

        # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
        # using this code and then resuming the REPL will
@@ -2216,11 +2226,11 @@ async def open_context_from_portal(
    # the above `._scope` can be cancelled due to:
    # 1. an explicit self cancel via `Context.cancel()` or
    #    `Actor.cancel()`,
-   # 2. any "callee"-side remote error, possibly also a cancellation
+   # 2. any "child"-side remote error, possibly also a cancellation
    #    request by some peer,
-   # 3. any "caller" (aka THIS scope's) local error raised in the above `yield`
+   # 3. any "parent" (aka THIS scope's) local error raised in the above `yield`
    except (
-       # CASE 3: standard local error in this caller/yieldee
+       # CASE 3: standard local error in this parent/yieldee
        Exception,

        # CASES 1 & 2: can manifest as a `ctx._scope_nursery`
@@ -2234,9 +2244,9 @@ async def open_context_from_portal(
        #    any `Context._maybe_raise_remote_err()` call.
        #
        # 2.-`BaseExceptionGroup[ContextCancelled | RemoteActorError]`
-       #    from any error delivered from the "callee" side
+       #    from any error delivered from the "child" side
        #    AND a group-exc is only raised if there was > 1
-       #    tasks started *here* in the "caller" / opener
+       #    tasks started *here* in the "parent" / opener
        #    block. If any one of those tasks calls
        #    `.wait_for_result()` or `MsgStream.receive()`
        #    `._maybe_raise_remote_err()` will be transitively
@@ -2249,8 +2259,8 @@ async def open_context_from_portal(
        trio.Cancelled,  # NOTE: NOT from inside the ctx._scope
        KeyboardInterrupt,
-   ) as caller_err:
-       scope_err = caller_err
+   ) as rent_err:
+       scope_err = rent_err
        ctx._local_error: BaseException = scope_err

        # XXX: ALWAYS request the context to CANCEL ON any ERROR.
@@ -2268,7 +2278,7 @@ async def open_context_from_portal(
            logmeth = log.exception

        logmeth(
-           f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
+           f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
        )

        if debug_mode():
@@ -2289,9 +2299,9 @@ async def open_context_from_portal(
                'Calling `ctx.cancel()`!\n'
            )

-           # we don't need to cancel the callee if it already
+           # we don't need to cancel the child if it already
            # told us it's cancelled ;p
-           if ctxc_from_callee is None:
+           if ctxc_from_child is None:
                try:
                    await ctx.cancel()
                except (
@@ -2322,8 +2332,8 @@ async def open_context_from_portal(
        # via a call to
        # `Context._maybe_cancel_and_set_remote_error()`.
        # As per `Context._deliver_msg()`, that error IS
-       # ALWAYS SET any time "callee" side fails and causes "caller
-       # side" cancellation via a `ContextCancelled` here.
+       # ALWAYS SET any time "child" side fails and causes
+       # "parent side" cancellation via a `ContextCancelled` here.
        try:
            result_or_err: Exception|Any = await ctx.wait_for_result()
        except BaseException as berr:
@@ -2359,7 +2369,7 @@ async def open_context_from_portal(
                )
            case (None, _):
                log.runtime(
-                   'Context returned final result from callee task:\n'
+                   'Context returned final result from child task:\n'
                    f'<= peer: {uid}\n'
                    f'  |_ {nsf}()\n\n'
@@ -2454,7 +2464,7 @@ async def open_context_from_portal(
        )

        # TODO: should we add a `._cancel_req_received`
-       # flag to determine if the callee manually called
+       # flag to determine if the child manually called
        # `ctx.cancel()`?
        # -[ ] going to need a cid check no?
@@ -2510,7 +2520,7 @@ def mk_context(
    recv_chan: trio.MemoryReceiveChannel
    send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)

-   # TODO: only scan caller-info if log level so high!
+   # TODO: only scan parent-info if log level so high!
    from .devx._frame_stack import find_caller_info
    caller_info: CallerInfo|None = find_caller_info()

View File

@@ -300,7 +300,7 @@ class Portal:
        )

        # XXX the one spot we set it?
-       self.channel._cancel_called: bool = True
+       chan._cancel_called: bool = True
        try:
            # send cancel cmd - might not get response
            # XXX: sure would be nice to make this work with

View File

@@ -642,7 +642,7 @@ async def _invoke(
    tn: Nursery
    rpc_ctx_cs: CancelScope
    async with (
-       collapse_eg(),
+       collapse_eg(hide_tb=False),
        trio.open_nursery() as tn,
        msgops.maybe_limit_plds(
            ctx=ctx,
@@ -823,24 +823,44 @@ async def _invoke(
            f'after having {ctx.repr_state!r}\n'
        )
        if merr:
            logmeth: Callable = log.error
-           if isinstance(merr, ContextCancelled):
-               logmeth: Callable = log.runtime
-
-           if not isinstance(merr, RemoteActorError):
-               tb_str: str = ''.join(traceback.format_exception(merr))
+           if (
+               # ctxc: by `Context.cancel()`
+               isinstance(merr, ContextCancelled)
+
+               # out-of-layer cancellation, one of:
+               # - actorc: by `Portal.cancel_actor()`
+               # - OSc: by SIGINT or `Process.signal()`
+               or (
+                   isinstance(merr, trio.Cancelled)
+                   and
+                   ctx.canceller
+               )
+           ):
+               logmeth: Callable = log.cancel
+               descr_str += (
+                   f' with {merr!r}\n'
+               )
+           elif (
+               not isinstance(merr, RemoteActorError)
+           ):
+               tb_str: str = ''.join(
+                   traceback.format_exception(merr)
+               )
                descr_str += (
                    f'\n{merr!r}\n'  # needed?
                    f'{tb_str}\n'
+                   f'\n'
+                   f'scope_error:\n'
+                   f'{scope_err!r}\n'
                )
            else:
-               descr_str += f'\n{merr!r}\n'
+               descr_str += (
+                   f'{merr!r}\n'
+               )
        else:
-           descr_str += f'\nwith final result {ctx.outcome!r}\n'
+           descr_str += (
+               f'\n'
+               f'with final result {ctx.outcome!r}\n'
+           )

        logmeth(
            f'{message}\n'
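
Distilled, the new report logic in `_invoke()` just picks a log-severity from the error's provenance: graceful ctxc's and out-of-layer cancels (actor- or OS-initiated, detectable via `ctx.canceller`) downgrade to the `.cancel()` level. A standalone sketch of that rule (helper name invented here, not from the lib):

    import trio
    import tractor
    from typing import Callable

    def pick_logmeth(
        merr: BaseException|None,
        canceller: tuple|None,  # peer uid iff the ctx was actor-cancelled
        log,  # a `tractor.log` style module-logger (assumed iface)
    ) -> Callable:
        if (
            isinstance(merr, tractor.ContextCancelled)
            or (
                isinstance(merr, trio.Cancelled)
                and canceller
            )
        ):
            return log.cancel  # expected/graceful teardown
        return log.error  # anything else is a real crash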

View File

@@ -101,11 +101,27 @@ class Channel:
        # ^XXX! ONLY set if a remote actor sends an `Error`-msg
        self._closed: bool = False

-       # flag set by ``Portal.cancel_actor()`` indicating remote
-       # (possibly peer) cancellation of the far end actor
-       # runtime.
+       # flag set by `Portal.cancel_actor()` indicating remote
+       # (possibly peer) cancellation of the far end actor runtime.
        self._cancel_called: bool = False

+   @property
+   def closed(self) -> bool:
+       '''
+       Was `.aclose()` successfully called?
+
+       '''
+       return self._closed
+
+   @property
+   def cancel_called(self) -> bool:
+       '''
+       Set when `Portal.cancel_actor()` is called on a portal which
+       wraps this IPC channel.
+
+       '''
+       return self._cancel_called
+
    @property
    def uid(self) -> tuple[str, str]:
        '''
@@ -169,7 +185,9 @@ class Channel:
            addr,
            **kwargs,
        )
-       assert transport.raddr == addr
+       # XXX, for UDS *no!* since we recv the peer-pid and build out
+       # a new addr..
+       # assert transport.raddr == addr
        chan = Channel(transport=transport)

        # ?TODO, compact this into adapter level-methods?
@@ -285,7 +303,7 @@ class Channel:
        self,
        payload: Any,

-       hide_tb: bool = True,
+       hide_tb: bool = False,

    ) -> None:
        '''
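
Both new props are read-only views on flags the runtime already kept private; a tiny peek-helper shows the intended introspection use (helper name is ours, not the lib's):

    import tractor

    def chan_status(portal: tractor.Portal) -> str:
        # cancel/closed status via the new `Channel` properties
        chan = portal.channel  # the portal's underlying IPC channel
        return (
            f'cancel_called={chan.cancel_called}, '
            f'closed={chan.closed}'
        )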

View File

@@ -17,29 +17,59 @@
 Utils to tame mp non-SC madeness

 '''
+import platform


 def disable_mantracker():
     '''
-    Disable all ``multiprocessing``` "resource tracking" machinery since
+    Disable all `multiprocessing` "resource tracking" machinery since
     it's an absolute multi-threaded mess of non-SC madness.

     '''
-    from multiprocessing import resource_tracker as mantracker
-
-    # Tell the "resource tracker" thing to fuck off.
-    class ManTracker(mantracker.ResourceTracker):
-        def register(self, name, rtype):
-            pass
-
-        def unregister(self, name, rtype):
-            pass
-
-        def ensure_running(self):
-            pass
-
-    # "know your land and know your prey"
-    # https://www.dailymotion.com/video/x6ozzco
-    mantracker._resource_tracker = ManTracker()
-    mantracker.register = mantracker._resource_tracker.register
-    mantracker.ensure_running = mantracker._resource_tracker.ensure_running
-    mantracker.unregister = mantracker._resource_tracker.unregister
-    mantracker.getfd = mantracker._resource_tracker.getfd
+    from multiprocessing.shared_memory import SharedMemory
+
+    # 3.13+ only.. can pass `track=False` to disable
+    # all the resource tracker bs.
+    # https://docs.python.org/3/library/multiprocessing.shared_memory.html
+    if (_py_313 := (
+        platform.python_version_tuple()[:-1]
+        >=
+        ('3', '13')
+        )
+    ):
+        from functools import partial
+        return partial(
+            SharedMemory,
+            track=False,
+        )
+
+    # !TODO, once we drop 3.12- we can obvi remove all this!
+    else:
+        from multiprocessing import (
+            resource_tracker as mantracker,
+        )
+
+        # Tell the "resource tracker" thing to fuck off.
+        class ManTracker(mantracker.ResourceTracker):
+            def register(self, name, rtype):
+                pass
+
+            def unregister(self, name, rtype):
+                pass
+
+            def ensure_running(self):
+                pass
+
+        # "know your land and know your prey"
+        # https://www.dailymotion.com/video/x6ozzco
+        mantracker._resource_tracker = ManTracker()
+        mantracker.register = mantracker._resource_tracker.register
+        mantracker.ensure_running = mantracker._resource_tracker.ensure_running
+        mantracker.unregister = mantracker._resource_tracker.unregister
+        mantracker.getfd = mantracker._resource_tracker.getfd
+
+        # use std type verbatim
+        shmT = SharedMemory
+        return shmT
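
The 3.13 branch works because `SharedMemory` grew a `track` keyword (3.13+ only) that opts a segment out of the resource tracker entirely; a minimal standalone demo of that knob:

    # Python 3.13+ only!
    from multiprocessing.shared_memory import SharedMemory

    shm = SharedMemory(create=True, size=64, track=False)
    shm.buf[:4] = b'data'
    shm.close()
    shm.unlink()  # manual cleanup; nothing is tracking this segment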

View File

@@ -23,14 +23,15 @@ considered optional within the context of this runtime-library.

 """
 from __future__ import annotations
+from multiprocessing import shared_memory as shm
+from multiprocessing.shared_memory import (
+    # SharedMemory,
+    ShareableList,
+)
+import platform
 from sys import byteorder
 import time
 from typing import Optional
-from multiprocessing import shared_memory as shm
-from multiprocessing.shared_memory import (
-    SharedMemory,
-    ShareableList,
-)

 from msgspec import (
     Struct,
@@ -61,7 +62,7 @@ except ImportError:

 log = get_logger(__name__)

-disable_mantracker()
+SharedMemory = disable_mantracker()


 class SharedInt:
@@ -789,11 +790,23 @@ def open_shm_list(
        readonly=readonly,
    )

+   # TODO, factor into a @actor_fixture acm-API?
+   # -[ ] also `@maybe_actor_fixture()` which inludes
+   #     the .current_actor() convenience check?
+   #     |_ orr can that just be in the sin-maybe-version?
+   #
    # "close" attached shm on actor teardown
    try:
        actor = tractor.current_actor()
        actor.lifetime_stack.callback(shml.shm.close)
-       actor.lifetime_stack.callback(shml.shm.unlink)
+
+       # XXX on 3.13+ we don't need to call this?
+       # -> bc we pass `track=False` for `SharedMemeory` orr?
+       if (
+           platform.python_version_tuple()[:-1] < ('3', '13')
+       ):
+           actor.lifetime_stack.callback(shml.shm.unlink)
+
    except RuntimeError:
        log.warning('tractor runtime not active, skipping teardown steps')

View File

@@ -430,20 +430,25 @@ class MsgpackTransport(MsgTransport):
            return await self.stream.send_all(size + bytes_data)
        except (
            trio.BrokenResourceError,
-       ) as bre:
-           trans_err = bre
+           trio.ClosedResourceError,
+       ) as _re:
+           trans_err = _re
            tpt_name: str = f'{type(self).__name__!r}'

            match trans_err:
+
+               # XXX, specifc to UDS transport and its,
+               # well, "speediness".. XD
+               # |_ likely todo with races related to how fast
+               #    the socket is setup/torn-down on linux
+               #    as it pertains to rando pings from the
+               #    `.discovery` subsys and protos.
                case trio.BrokenResourceError() if (
-                   '[Errno 32] Broken pipe' in trans_err.args[0]
-                   # ^XXX, specifc to UDS transport and its,
-                   # well, "speediness".. XD
-                   # |_ likely todo with races related to how fast
-                   #    the socket is setup/torn-down on linux
-                   #    as it pertains to rando pings from the
-                   #    `.discovery` subsys and protos.
+                   '[Errno 32] Broken pipe'
+                   in
+                   trans_err.args[0]
                ):
-                   raise TransportClosed.from_src_exc(
+                   tpt_closed = TransportClosed.from_src_exc(
                        message=(
                            f'{tpt_name} already closed by peer\n'
                        ),
@@ -451,14 +456,31 @@ class MsgpackTransport(MsgTransport):
                        src_exc=trans_err,
                        raise_on_report=True,
                        loglevel='transport',
-                   ) from bre
+                   )
+                   raise tpt_closed from trans_err
+
+               # case trio.ClosedResourceError() if (
+               #     'this socket was already closed'
+               #     in
+               #     trans_err.args[0]
+               # ):
+               #     tpt_closed = TransportClosed.from_src_exc(
+               #         message=(
+               #             f'{tpt_name} already closed by peer\n'
+               #         ),
+               #         body=f'{self}\n',
+               #         src_exc=trans_err,
+               #         raise_on_report=True,
+               #         loglevel='transport',
+               #     )
+               #     raise tpt_closed from trans_err

                # unless the disconnect condition falls under "a
                # normal operation breakage" we usualy console warn
                # about it.
                case _:
                    log.exception(
-                       '{tpt_name} layer failed pre-send ??\n'
+                       f'{tpt_name} layer failed pre-send ??\n'
                    )
                    raise trans_err
@@ -503,7 +525,7 @@ class MsgpackTransport(MsgTransport):
    def pformat(self) -> str:
        return (
            f'<{type(self).__name__}(\n'
-           f' |_peers: 2\n'
+           f' |_peers: 1\n'
            f' laddr: {self._laddr}\n'
            f' raddr: {self._raddr}\n'
            # f'\n'
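
Adding `trio.ClosedResourceError` to the pre-send except-tuple matters since trio raises it (not `BrokenResourceError`) when the *local* side already closed the stream; a quick standalone repro of that behaviour:

    import trio
    from trio.testing import memory_stream_pair

    async def main():
        tx, rx = memory_stream_pair()
        await tx.aclose()
        try:
            await tx.send_all(b'ping')
        except trio.ClosedResourceError as err:
            # the case `MsgpackTransport.send()` now also catches
            print(f'local-close detected: {err!r}')

    trio.run(main)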

View File

@@ -18,6 +18,9 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco

 '''
 from __future__ import annotations
+from contextlib import (
+    contextmanager as cm,
+)
 from pathlib import Path
 import os
 from socket import (
@@ -29,6 +32,7 @@ from socket import (
 )
 import struct
 from typing import (
+    Type,
     TYPE_CHECKING,
     ClassVar,
 )
@@ -99,8 +103,6 @@ class UDSAddress(
            self.filedir
            or
            self.def_bindspace
-           # or
-           # get_rt_dir()
        )

    @property
@@ -205,12 +207,35 @@ class UDSAddress(
            f']'
        )

+
+@cm
+def _reraise_as_connerr(
+    src_excs: tuple[Type[Exception]],
+    addr: UDSAddress,
+):
+    try:
+        yield
+    except src_excs as src_exc:
+        raise ConnectionError(
+            f'Bad UDS socket-filepath-as-address ??\n'
+            f'{addr}\n'
+            f' |_sockpath: {addr.sockpath}\n'
+            f'\n'
+            f'from src: {src_exc!r}\n'
+        ) from src_exc
+
+
 async def start_listener(
     addr: UDSAddress,
     **kwargs,
 ) -> SocketListener:
-    # sock = addr._sock = socket.socket(
+    '''
+    Start listening for inbound connections via
+    a `trio.SocketListener` (task) which `socket.bind()`s on `addr`.
+
+    Note, if the `UDSAddress.bindspace: Path` directory dne it is
+    implicitly created.
+
+    '''
     sock = socket.socket(
         socket.AF_UNIX,
         socket.SOCK_STREAM
@@ -221,17 +246,25 @@ async def start_listener(
        f'|_{addr}\n'
    )

+   # ?TODO? should we use the `actor.lifetime_stack`
+   # to rm on shutdown?
    bindpath: Path = addr.sockpath
-   try:
+   if not (bs := addr.bindspace).is_dir():
+       log.info(
+           'Creating bindspace dir in file-sys\n'
+           f'>{{\n'
+           f'|_{bs!r}\n'
+       )
+       bs.mkdir()
+
+   with _reraise_as_connerr(
+       src_excs=(
+           FileNotFoundError,
+           OSError,
+       ),
+       addr=addr
+   ):
        await sock.bind(str(bindpath))
-   except (
-       FileNotFoundError,
-   ) as fdne:
-       raise ConnectionError(
-           f'Bad UDS socket-filepath-as-address ??\n'
-           f'{addr}\n'
-           f' |_sockpath: {addr.sockpath}\n'
-       ) from fdne

    sock.listen(1)
    log.info(
@@ -356,27 +389,30 @@ class MsgpackUDSStream(MsgpackTransport):
        # `.setsockopt()` call tells the OS provide it; the client
        # pid can then be read on server/listen() side via
        # `get_peer_info()` above.
-       try:
+       with _reraise_as_connerr(
+           src_excs=(
+               FileNotFoundError,
+           ),
+           addr=addr
+       ):
            stream = await open_unix_socket_w_passcred(
                str(sockpath),
                **kwargs
            )
-       except (
-           FileNotFoundError,
-       ) as fdne:
-           raise ConnectionError(
-               f'Bad UDS socket-filepath-as-address ??\n'
-               f'{addr}\n'
-               f' |_sockpath: {sockpath}\n'
-           ) from fdne

-       stream = MsgpackUDSStream(
+       tpt_stream = MsgpackUDSStream(
            stream,
            prefix_size=prefix_size,
            codec=codec
        )
-       stream._raddr = addr
-       return stream
+       # XXX assign from new addrs after peer-PID extract!
+       (
+           tpt_stream._laddr,
+           tpt_stream._raddr,
+       ) = cls.get_stream_addrs(stream)
+       return tpt_stream

    @classmethod
    def get_stream_addrs(
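
For reference, `_reraise_as_connerr` is just a `@contextmanager` that chains the low-level FS/socket error into a `ConnectionError`; the same pattern in miniature (generic names, no `UDSAddress` dep):

    from contextlib import contextmanager

    @contextmanager
    def reraise_as_connerr(src_excs: tuple, detail: str):
        # re-raise any of `src_excs` as a `ConnectionError`,
        # chaining the source exc via `from`.
        try:
            yield
        except src_excs as src_exc:
            raise ConnectionError(
                f'Bad UDS socket-filepath-as-address ??\n'
                f'{detail}\n'
            ) from src_exc

    try:
        with reraise_as_connerr(
            (FileNotFoundError,),
            '/no/such/dir/sock',
        ):
            open('/no/such/dir/sock', 'rb')
    except ConnectionError as ce:
        assert isinstance(ce.__cause__, FileNotFoundError)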