Pass a `use_greenback: bool` runtime var to subs
Such that the top level `maybe_enable_greenback` from `open_root_actor()` can toggle the entire actor tree's usage. Read the rtv in `._rpc` tasks and only enable if set. Also, rigor up the `._rpc.process_messages()` loop to handle `Error()` and `case _:` separately such that we now raise an explicit rte for unknown / invalid msgs. Use "parent" / "child" for side descriptions in loop comments and put a fat comment before the `StartAck` in `_invoke()`.runtime_to_msgspec
parent
eca2c02f8b
commit
60aa16adf6
|
@ -116,6 +116,8 @@ async def open_root_actor(
|
||||||
os.environ['PYTHONBREAKPOINT'] = (
|
os.environ['PYTHONBREAKPOINT'] = (
|
||||||
'tractor.devx._debug.pause_from_sync'
|
'tractor.devx._debug.pause_from_sync'
|
||||||
)
|
)
|
||||||
|
_state._runtime_vars['use_greenback'] = True
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# TODO: disable `breakpoint()` by default (without
|
# TODO: disable `breakpoint()` by default (without
|
||||||
# `greenback`) since it will break any multi-actor
|
# `greenback`) since it will break any multi-actor
|
||||||
|
@ -386,14 +388,20 @@ async def open_root_actor(
|
||||||
_state._last_actor_terminated = actor
|
_state._last_actor_terminated = actor
|
||||||
|
|
||||||
# restore built-in `breakpoint()` hook state
|
# restore built-in `breakpoint()` hook state
|
||||||
if debug_mode:
|
if (
|
||||||
|
debug_mode
|
||||||
|
and
|
||||||
|
maybe_enable_greenback
|
||||||
|
):
|
||||||
if builtin_bp_handler is not None:
|
if builtin_bp_handler is not None:
|
||||||
sys.breakpointhook = builtin_bp_handler
|
sys.breakpointhook = builtin_bp_handler
|
||||||
|
|
||||||
if orig_bp_path is not None:
|
if orig_bp_path is not None:
|
||||||
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# clear env back to having no entry
|
# clear env back to having no entry
|
||||||
os.environ.pop('PYTHONBREAKPOINT')
|
os.environ.pop('PYTHONBREAKPOINT', None)
|
||||||
|
|
||||||
logger.runtime("Root actor terminated")
|
logger.runtime("Root actor terminated")
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,6 @@ from trio import (
|
||||||
TaskStatus,
|
TaskStatus,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .msg import NamespacePath
|
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._context import (
|
from ._context import (
|
||||||
Context,
|
Context,
|
||||||
|
@ -61,6 +60,11 @@ from .devx import (
|
||||||
)
|
)
|
||||||
from . import _state
|
from . import _state
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
from .msg import (
|
||||||
|
current_codec,
|
||||||
|
MsgCodec,
|
||||||
|
NamespacePath,
|
||||||
|
)
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
CancelAck,
|
CancelAck,
|
||||||
Error,
|
Error,
|
||||||
|
@ -98,6 +102,7 @@ async def _invoke_non_context(
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
|
__tracebackhide__: bool = True
|
||||||
|
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
# TODO: can we unify this with the `context=True` impl below?
|
||||||
if inspect.isasyncgen(coro):
|
if inspect.isasyncgen(coro):
|
||||||
|
@ -398,7 +403,11 @@ async def _invoke(
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
treat_as_gen: bool = False
|
treat_as_gen: bool = False
|
||||||
|
|
||||||
if _state.debug_mode():
|
if (
|
||||||
|
_state.debug_mode()
|
||||||
|
and
|
||||||
|
_state._runtime_vars['use_greenback']
|
||||||
|
):
|
||||||
# XXX for .pause_from_sync()` usage we need to make sure
|
# XXX for .pause_from_sync()` usage we need to make sure
|
||||||
# `greenback` is boostrapped in the subactor!
|
# `greenback` is boostrapped in the subactor!
|
||||||
await _debug.maybe_init_greenback()
|
await _debug.maybe_init_greenback()
|
||||||
|
@ -512,10 +521,22 @@ async def _invoke(
|
||||||
# wrapper that calls `Context.started()` and then does
|
# wrapper that calls `Context.started()` and then does
|
||||||
# the `await coro()`?
|
# the `await coro()`?
|
||||||
|
|
||||||
# a "context" endpoint type is the most general and
|
# ------ - ------
|
||||||
# "least sugary" type of RPC ep with support for
|
# a "context" endpoint is the most general and
|
||||||
|
# "least sugary" type of RPC with support for
|
||||||
# bi-dir streaming B)
|
# bi-dir streaming B)
|
||||||
# StartAck
|
#
|
||||||
|
# the concurrency relation is simlar to a task nursery
|
||||||
|
# wherein a "parent" task (the one that enters
|
||||||
|
# `trio.open_nursery()` in some actor "opens" (via
|
||||||
|
# `Portal.open_context()`) an IPC ctx to another peer
|
||||||
|
# (which is maybe a sub-) actor who then schedules (aka
|
||||||
|
# `trio.Nursery.start()`s) a new "child" task to execute
|
||||||
|
# the `@context` annotated func; that is this func we're
|
||||||
|
# running directly below!
|
||||||
|
# ------ - ------
|
||||||
|
#
|
||||||
|
# StartAck: respond immediately with endpoint info
|
||||||
await chan.send(
|
await chan.send(
|
||||||
StartAck(
|
StartAck(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -524,11 +545,11 @@ async def _invoke(
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: should we also use an `.open_context()` equiv
|
# TODO: should we also use an `.open_context()` equiv
|
||||||
# for this callee side by factoring the impl from
|
# for this child side by factoring the impl from
|
||||||
# `Portal.open_context()` into a common helper?
|
# `Portal.open_context()` into a common helper?
|
||||||
#
|
#
|
||||||
# NOTE: there are many different ctx state details
|
# NOTE: there are many different ctx state details
|
||||||
# in a callee side instance according to current impl:
|
# in a child side instance according to current impl:
|
||||||
# - `.cancelled_caught` can never be `True`.
|
# - `.cancelled_caught` can never be `True`.
|
||||||
# -> the below scope is never exposed to the
|
# -> the below scope is never exposed to the
|
||||||
# `@context` marked RPC function.
|
# `@context` marked RPC function.
|
||||||
|
@ -554,7 +575,7 @@ async def _invoke(
|
||||||
|
|
||||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||||
# called by any of,
|
# called by any of,
|
||||||
# - *this* callee task manually calling `ctx.cancel()`.
|
# - *this* child task manually calling `ctx.cancel()`.
|
||||||
# - the runtime calling `ctx._deliver_msg()` which
|
# - the runtime calling `ctx._deliver_msg()` which
|
||||||
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
||||||
# which cancels the scope presuming the input error
|
# which cancels the scope presuming the input error
|
||||||
|
@ -631,10 +652,11 @@ async def _invoke(
|
||||||
# f' |_{ctx}'
|
# f' |_{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# task-contex was either cancelled by request using
|
# task-contex was either cancelled by request
|
||||||
# ``Portal.cancel_actor()`` or ``Context.cancel()``
|
# using ``Portal.cancel_actor()`` or
|
||||||
# on the far end, or it was cancelled by the local
|
# ``Context.cancel()`` on the far end, or it
|
||||||
# (callee) task, so relay this cancel signal to the
|
# was cancelled by the local child (or callee)
|
||||||
|
# task, so relay this cancel signal to the
|
||||||
# other side.
|
# other side.
|
||||||
ctxc = ContextCancelled(
|
ctxc = ContextCancelled(
|
||||||
message=msg,
|
message=msg,
|
||||||
|
@ -655,7 +677,7 @@ async def _invoke(
|
||||||
|
|
||||||
) as scope_error:
|
) as scope_error:
|
||||||
|
|
||||||
# always set this (callee) side's exception as the
|
# always set this (child) side's exception as the
|
||||||
# local error on the context
|
# local error on the context
|
||||||
ctx._local_error: BaseException = scope_error
|
ctx._local_error: BaseException = scope_error
|
||||||
|
|
||||||
|
@ -1024,9 +1046,8 @@ async def process_messages(
|
||||||
trio.Event(),
|
trio.Event(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX remote (runtime scoped) error or uknown
|
# runtime-scoped remote error (since no `.cid`)
|
||||||
# msg (type).
|
case Error():
|
||||||
case Error() | _:
|
|
||||||
# NOTE: this is the non-rpc error case,
|
# NOTE: this is the non-rpc error case,
|
||||||
# that is, an error **not** raised inside
|
# that is, an error **not** raised inside
|
||||||
# a call to ``_invoke()`` (i.e. no cid was
|
# a call to ``_invoke()`` (i.e. no cid was
|
||||||
|
@ -1034,10 +1055,6 @@ async def process_messages(
|
||||||
# this error to all local channel
|
# this error to all local channel
|
||||||
# consumers (normally portals) by marking
|
# consumers (normally portals) by marking
|
||||||
# the channel as errored
|
# the channel as errored
|
||||||
log.exception(
|
|
||||||
f'Unhandled IPC msg:\n\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
# assert chan.uid
|
# assert chan.uid
|
||||||
chan._exc: Exception = unpack_error(
|
chan._exc: Exception = unpack_error(
|
||||||
msg,
|
msg,
|
||||||
|
@ -1045,6 +1062,17 @@ async def process_messages(
|
||||||
)
|
)
|
||||||
raise chan._exc
|
raise chan._exc
|
||||||
|
|
||||||
|
# unknown/invalid msg type?
|
||||||
|
case _:
|
||||||
|
codec: MsgCodec = current_codec()
|
||||||
|
message: str = (
|
||||||
|
f'Unhandled IPC msg for codec?\n\n'
|
||||||
|
f'|_{codec}\n\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
|
log.exception(message)
|
||||||
|
raise RuntimeError(message)
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Waiting on next IPC msg from\n'
|
'Waiting on next IPC msg from\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
|
|
Loading…
Reference in New Issue