Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet 90bfdaf58c Drop extra newline from log msg 2024-03-25 15:03:33 -04:00
Tyler Goodlet 507cd96904 Change all `| None` -> `|None` in `._runtime` 2024-03-25 14:15:36 -04:00
Tyler Goodlet 2588e54867 Add todo-notes for hiding `@acm` frames
In the particular case of the `Portal.open_context().__aexit__()` frame,
due to usage of `contextlib.asynccontextmanager`, we can't easily hook
into monkeypatching a `__tracebackhide__` set nor catch-n-reraise around
the block exit without defining our own `.__aexit__()` impl. Thus, it's
prolly most sane to do something with an override of
`contextlib._AsyncGeneratorContextManager` or the public exposed
`AsyncContextDecorator` (which uses the former internally right?).

Also fixup some old `._invoke` mod paths in comments and just show
`str(eoc)` in `.open_stream().__aexit__()` terminated-by-EoC log msg
since the `repr()` form won't pprint the IPC msg nicely..
2024-03-24 16:49:07 -04:00
3 changed files with 42 additions and 27 deletions

View File

@ -351,7 +351,7 @@ class Context:
by the runtime in 2 ways: by the runtime in 2 ways:
- by entering ``Portal.open_context()`` which is the primary - by entering ``Portal.open_context()`` which is the primary
public API for any "caller" task or, public API for any "caller" task or,
- by the RPC machinery's `._runtime._invoke()` as a `ctx` arg - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
to a remotely scheduled "callee" function. to a remotely scheduled "callee" function.
AND is always constructed using the below ``mk_context()``. AND is always constructed using the below ``mk_context()``.
@ -361,10 +361,10 @@ class Context:
`trio.Task`s. Contexts are allocated on each side of any task `trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is actor from a `Portal`. On the "callee" side a context is
always allocated inside ``._runtime._invoke()``. always allocated inside ``._rpc._invoke()``.
# TODO: more detailed writeup on cancellation, error and TODO: more detailed writeup on cancellation, error and
# streaming semantics.. streaming semantics..
A context can be cancelled and (possibly eventually restarted) from A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, it can also open task either side of the underlying IPC channel, it can also open task
@ -1206,7 +1206,9 @@ class Context:
# await pause() # await pause()
log.warning( log.warning(
'Stream was terminated by EoC\n\n' 'Stream was terminated by EoC\n\n'
f'{repr(eoc)}\n' # NOTE: won't show the error <Type> but
# does show txt followed by IPC msg.
f'{str(eoc)}\n'
) )
finally: finally:
@ -1303,7 +1305,7 @@ class Context:
# `._cancel_called == True`. # `._cancel_called == True`.
not raise_overrun_from_self not raise_overrun_from_self
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.msgdata['type_str'] == 'StreamOverrun' and remote_error.msgdata['boxed_type_str'] == 'StreamOverrun'
and tuple(remote_error.msgdata['sender']) == our_uid and tuple(remote_error.msgdata['sender']) == our_uid
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
@ -1880,6 +1882,19 @@ class Context:
return False return False
# TODO: exception tb masking by using a manual
# `.__aexit__()`/.__aenter__()` pair on a type?
# => currently this is one of the few places we can't easily
# mask errors - on the exit side of a `Portal.open_context()`..
# there's # => currently this is one of the few places we can't
# there's 2 ways to approach it:
# - manually write an @acm type as per above
# - use `contextlib.AsyncContextDecorator` to override the default
# impl to suppress traceback frames:
# * https://docs.python.org/3/library/contextlib.html#contextlib.AsyncContextDecorator
# * https://docs.python.org/3/library/contextlib.html#contextlib.ContextDecorator
# - also we could just override directly the underlying
# `contextlib._AsyncGeneratorContextManager`?
@acm @acm
async def open_context_from_portal( async def open_context_from_portal(
portal: Portal, portal: Portal,

View File

@ -140,16 +140,16 @@ class Actor:
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()` after fork # nursery placeholders filled in by `async_main()` after fork
_root_n: Nursery | None = None _root_n: Nursery|None = None
_service_n: Nursery | None = None _service_n: Nursery|None = None
_server_n: Nursery | None = None _server_n: Nursery|None = None
# Information about `__main__` from parent # Information about `__main__` from parent
_parent_main_data: dict[str, str] _parent_main_data: dict[str, str]
_parent_chan_cs: CancelScope | None = None _parent_chan_cs: CancelScope|None = None
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
_server_down: trio.Event | None = None _server_down: trio.Event|None = None
# user toggled crash handling (including monkey-patched in # user toggled crash handling (including monkey-patched in
# `trio.open_nursery()` via `.trionics._supervisor` B) # `trio.open_nursery()` via `.trionics._supervisor` B)
@ -178,7 +178,7 @@ class Actor:
spawn_method: str|None = None, spawn_method: str|None = None,
# TODO: remove! # TODO: remove!
arbiter_addr: tuple[str, int] | None = None, arbiter_addr: tuple[str, int]|None = None,
) -> None: ) -> None:
''' '''
@ -193,7 +193,7 @@ class Actor:
) )
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple] | None = None self._cancel_called_by_remote: tuple[str, tuple]|None = None
self._cancel_called: bool = False self._cancel_called: bool = False
# retreive and store parent `__main__` data which # retreive and store parent `__main__` data which
@ -249,11 +249,11 @@ class Actor:
] = {} ] = {}
self._listeners: list[trio.abc.Listener] = [] self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Channel | None = None self._parent_chan: Channel|None = None
self._forkserver_info: tuple | None = None self._forkserver_info: tuple|None = None
self._actoruid2nursery: dict[ self._actoruid2nursery: dict[
tuple[str, str], tuple[str, str],
ActorNursery | None, ActorNursery|None,
] = {} # type: ignore # noqa ] = {} # type: ignore # noqa
# when provided, init the registry addresses property from # when provided, init the registry addresses property from
@ -779,7 +779,7 @@ class Actor:
# #
# side: str|None = None, # side: str|None = None,
msg_buffer_size: int | None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
) -> Context: ) -> Context:
@ -844,7 +844,7 @@ class Actor:
kwargs: dict, kwargs: dict,
# IPC channel config # IPC channel config
msg_buffer_size: int | None = None, msg_buffer_size: int|None = None,
allow_overruns: bool = False, allow_overruns: bool = False,
load_nsf: bool = False, load_nsf: bool = False,
@ -918,11 +918,11 @@ class Actor:
async def _from_parent( async def _from_parent(
self, self,
parent_addr: tuple[str, int] | None, parent_addr: tuple[str, int]|None,
) -> tuple[ ) -> tuple[
Channel, Channel,
list[tuple[str, int]] | None, list[tuple[str, int]]|None,
]: ]:
''' '''
Bootstrap this local actor's runtime config from its parent by Bootstrap this local actor's runtime config from its parent by
@ -943,7 +943,7 @@ class Actor:
# Initial handshake: swap names. # Initial handshake: swap names.
await self._do_handshake(chan) await self._do_handshake(chan)
accept_addrs: list[tuple[str, int]] | None = None accept_addrs: list[tuple[str, int]]|None = None
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive runtime state from our parent # Receive runtime state from our parent
parent_data: dict[str, Any] parent_data: dict[str, Any]
@ -1007,7 +1007,7 @@ class Actor:
handler_nursery: Nursery, handler_nursery: Nursery,
*, *,
# (host, port) to bind for channel server # (host, port) to bind for channel server
listen_sockaddrs: list[tuple[str, int]] | None = None, listen_sockaddrs: list[tuple[str, int]]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1464,7 +1464,7 @@ class Actor:
async def async_main( async def async_main(
actor: Actor, actor: Actor,
accept_addrs: tuple[str, int] | None = None, accept_addrs: tuple[str, int]|None = None,
# XXX: currently ``parent_addr`` is only needed for the # XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to # ``multiprocessing`` backend (which pickles state sent to
@ -1473,7 +1473,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will # change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as # be False when running as root actor and True when as
# a subactor. # a subactor.
parent_addr: tuple[str, int] | None = None, parent_addr: tuple[str, int]|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -1496,7 +1496,7 @@ async def async_main(
try: try:
# establish primary connection with immediate parent # establish primary connection with immediate parent
actor._parent_chan: Channel | None = None actor._parent_chan: Channel|None = None
if parent_addr is not None: if parent_addr is not None:
( (
@ -1795,7 +1795,7 @@ class Arbiter(Actor):
self, self,
name: str, name: str,
) -> tuple[str, int] | None: ) -> tuple[str, int]|None:
for uid, sockaddr in self._registry.items(): for uid, sockaddr in self._registry.items():
if name in uid: if name in uid:

View File

@ -583,7 +583,7 @@ async def open_nursery(
finally: finally:
msg: str = ( msg: str = (
'Actor-nursery exited\n' 'Actor-nursery exited\n'
f'|_{an}\n\n' f'|_{an}\n'
) )
# shutdown runtime if it was started # shutdown runtime if it was started