Compare commits

..

7 Commits

Author SHA1 Message Date
Tyler Goodlet 338ea5529c .log: more multi-line styling 2025-03-14 16:41:08 -04:00
Tyler Goodlet 6bc67338cf Better subproc supervisor logging, todo for #320
Given I just similarly revamped a buncha `._runtime` log msg formatting,
might as well do something similar inside the spawning machinery such
that grokking teardown sequences of each supervising task is much more
sane XD

Mostly this includes doing similar `'<field>: <value>\n'` multi-line
formatting when reporting various subproc supervision steps as well as
showing a detailed `trio.Process.__repr__()` as appropriate.

Also adds a detailed #TODO according to the needs of #320 for which
we're going to need some internal mechanism for intermediary parent
actors to determine if a given debug tty locker (sub-actor) is one of
*their* (transitive) children and thus stall the normal
cancellation/teardown sequence until that locker is complete.
2025-03-14 16:41:06 -04:00
Tyler Goodlet fd20004757 _supervise: iter nice expanded multi-line `._children` tups with typing 2025-03-14 16:34:17 -04:00
Tyler Goodlet ddc2e5f0f8 WIP: solved the modden client hang.. 2025-03-14 16:34:10 -04:00
Tyler Goodlet 4b0aa5e379 Baboso! fix `chan.send(None)` indent.. 2025-03-14 15:49:37 -04:00
Tyler Goodlet 6a303358df Improved log msg formatting in core
As part of solving some final edge cases to do with inter-peer remote
cancellation (particularly a remote cancel from a separate actor
tree-client hanging on the request side in `modden`..) I needed less
dense, more line-delimited log msg formats when understanding ipc
channel and context cancels from console logging; this adds a ton of
that to:
- `._invoke()` which now does,
  - better formatting of `Context`-task info as multi-line
    `'<field>: <value>\n'` messages,
- use of `trio.Task` (from `.lowlevel.current_task()`) for full
  rpc-func namespace-path info,
  - better "msg flow annotations" with `<=` for understanding
    `ContextCancelled` flow.
- `Actor._stream_handler()` wherein we break down IPC peer reporting
  into multi-line `|_<Channel>` log msgs instead of jamming it all on
  one line..
- `._ipc.Channel.send()` use `pformat()` for repr of packet.

Also tweak some optional deps imports for debug mode:
- add `maybe_import_gb()` for attempting to import `greenback`.
- maybe enable `stackscope` tree pprinter on `SIGUSR1` if installed.

Add a further stale-debugger-lock guard before removal:
- read the `._debug.Lock.global_actor_in_debug: tuple` uid and possibly
  `maybe_wait_for_debugger()` when the child-user is known to have
  a live process in our tree.
- only cancel `Lock._root_local_task_cs_in_debug: CancelScope` when
  the disconnected channel maps to the `Lock.global_actor_in_debug`,
  though not sure this is correct yet?

Started adding missing type annots in sections that were modified.
2025-03-14 15:49:36 -04:00
Tyler Goodlet c85757aee1 Let `pack_error()` take a msg injected `cid: str|None` 2025-03-14 15:31:16 -04:00
9 changed files with 1225 additions and 313 deletions
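Several of the commits above reference the same `'<field>: <value>\n'` multi-line log-msg styling; below is a minimal standalone sketch of the idea (the helper name and sample fields are illustrative only, not tractor's actual API):

# emulate the '<field>: <value>\n' pairs plus the '|_<sub-item>'
# tree-marker styling described in the commit msgs above
def fmt_report(header: str, fields: dict, sub_items: list[str]) -> str:
    lines: list[str] = [header]
    lines += [f'{k}: {v}' for k, v in fields.items()]
    lines += [f'|_{item}' for item in sub_items]
    return '\n'.join(lines)

print(fmt_report(
    'Disconnected IPC channel:',
    {'uid': ('child', '1234-beef-...')},
    ["Channel(laddr='127.0.0.1:51337')"],
))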

View File

@ -43,12 +43,17 @@ import warnings
import trio
# from .devx import (
# maybe_wait_for_debugger,
# pause,
# )
from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
# MessagingError,
RemoteActorError,
StreamOverrun,
)
from .log import get_logger
@ -64,6 +69,164 @@ if TYPE_CHECKING:
log = get_logger(__name__)
async def _drain_to_final_msg(
ctx: Context,
) -> list[dict]:
# ) -> tuple[
# Any|Exception,
# list[dict],
# ]:
raise_overrun: bool = not ctx._allow_overruns
# wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit
# from the far end.
pre_result_drained: list[dict] = []
while not ctx._remote_error:
try:
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
# if re := ctx._remote_error:
# ctx._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# raise_overrun_from_self=raise_overrun,
# )
# TODO: bad idea?
# with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._recv_chan.receive()
# if res_cs.cancelled_caught:
# from .devx._debug import pause
# await pause()
msg: dict = await ctx._recv_chan.receive()
ctx._result: Any = msg['return']
log.runtime(
'Context delivered final result msg:\n'
f'{pformat(msg)}'
)
pre_result_drained.append(msg)
# NOTE: we don't need to do this right?
# XXX: only close the rx mem chan AFTER
# a final result is retrieved.
# if ctx._recv_chan:
# await ctx._recv_chan.aclose()
break
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's
# cancellation.
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
except KeyError:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding std "yield"\n{msg}')
pre_result_drained.append(msg)
continue
# TODO: work out edge cases here where
# a stream is open but the task also calls
# this?
# -[ ] should be a runtime error if a stream is open
# right?
elif 'stop' in msg:
log.cancel(
'Remote stream terminated due to "stop" msg:\n'
f'{msg}'
)
pre_result_drained.append(msg)
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?"
)
# XXX fallthrough to handle expected error XXX
re: Exception|None = ctx._remote_error
if re:
log.critical(
'Remote ctx terminated due to "error" msg:\n'
f'{re}'
)
assert msg is ctx._cancel_msg
# NOTE: this solved a super dupe edge case XD
# this was THE super duper edge case of:
# - local task opens a remote task,
# - requests remote cancellation of far end
# ctx/tasks,
# - needs to wait for the cancel ack msg
# (ctxc) or some result in the race case
# where the other side's task returns
# before the cancel request msg is ever
# rxed and processed,
# - here this surrounding drain loop (which
# iterates all ipc msgs until the ack or
# an early result arrives) was NOT exiting
# since we are the edge case: local task
# does not re-raise any ctxc it receives
# IFF **it** was the cancellation
# requester..
# will raise if necessary, ow break from
# loop presuming any error terminates the
# context!
ctx._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun,
)
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
elif error := unpack_error(
msg=msg,
chan=ctx._portal.channel,
hide_tb=False,
):
log.critical('SHOULD NEVER GET HERE!?')
assert msg is ctx._cancel_msg
assert error.msgdata == ctx._remote_error.msgdata
from .devx._debug import pause
await pause()
ctx._maybe_cancel_and_set_remote_error(error)
ctx._maybe_raise_remote_err(error)
else:
# bubble the original src key error
raise
return pre_result_drained
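# For reference, runnable toy versions of the dict-keyed IPC msgs the
# drain loop above inspects; shapes are inferred from elsewhere in this
# diff (the `msgspec.Struct` versions are still a TODO below) and all
# values here are illustrative:
_example_msgs: list[dict] = [
    {'yield': 42, 'cid': '<ctx-uuid>'},        # in-progress stream value
    {'stop': True, 'cid': '<ctx-uuid>'},       # stream terminated
    {'return': 'final', 'cid': '<ctx-uuid>'},  # final ctx result
    {'error': {'tb_str': '...', 'type_str': 'RuntimeError'},
     'cid': '<ctx-uuid>'},                     # boxed remote error
]
# the loop keys off these via `msg['return']` + the `KeyError` fallthrough
assert all('cid' in m for m in _example_msgs)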
# TODO: make this a msgspec.Struct!
@dataclass
class Context:
@ -118,6 +281,7 @@ class Context:
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None
# _res_scope: trio.CancelScope|None = None
# on a clean exit there should be a final value
# delivered from the far end "callee" task, so
@ -205,6 +369,10 @@ class Context:
)
)
# @property
# def is_waiting_result(self) -> bool:
# return bool(self._res_scope)
@property
def side(self) -> str:
'''
@ -247,7 +415,11 @@ class Context:
await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid})
# await pause()
await self.chan.send({
'stop': True,
'cid': self.cid
})
def _maybe_cancel_and_set_remote_error(
self,
@ -320,27 +492,37 @@ class Context:
# XXX: set the remote side's error so that after we cancel
# whatever task is the opener of this context it can raise
# that error as the reason.
# if self._remote_error:
# return
# breakpoint()
log.cancel(
'Setting remote error for ctx \n'
f'<= remote ctx uid: {self.chan.uid}\n'
f'=>\n{error}'
)
self._remote_error: BaseException = error
if (
isinstance(error, ContextCancelled)
):
# always record the cancelling actor's uid since its cancellation
# state is linked and we want to know which process was
# the cause / requester of the cancellation.
self._canceller = error.canceller
log.cancel(
'Remote task-context was cancelled for '
f'actor: {self.chan.uid}\n'
f'task: {self.cid}\n'
f'canceller: {error.canceller}\n'
)
# always record the cancelling actor's uid since its cancellation
# state is linked and we want to know which process was
# the cause / requester of the cancellation.
# if error.canceller is None:
# import pdbp; pdbp.set_trace()
# breakpoint()
self._canceller = error.canceller
if self._cancel_called:
# from ._debug import breakpoint
# await breakpoint()
# this is an expected cancel request response message
# and we **don't need to raise it** in local cancel
# scope since it will potentially override a real error.
@ -348,10 +530,11 @@ class Context:
else:
log.error(
f'Remote context error,\n'
f'remote actor: {self.chan.uid}\n'
f'task: {self.cid}\n'
f'{error}'
f'Remote context error:\n'
f'{error}\n'
f'{pformat(self)}\n'
# f'remote actor: {self.chan.uid}\n'
# f'cid: {self.cid}\n'
)
self._canceller = self.chan.uid
@ -376,9 +559,11 @@ class Context:
self._scope.cancel()
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
# TODO: maybe we have to use `._res_scope.cancel()` if it
# exists?
async def cancel(
self,
timeout: float = 0.616,
@ -395,6 +580,8 @@ class Context:
log.cancel(
f'Cancelling {side} side of context to {self.chan.uid}'
)
# await pause()
self._cancel_called: bool = True
# caller side who entered `Portal.open_context()`
@ -484,13 +671,11 @@ class Context:
'''
actor: Actor = current_actor()
# here we create a mem chan that corresponds to the
# far end caller / callee.
# Likewise if the surrounding context has been cancelled we error here
# since it likely means the surrounding block was exited or
# killed
# If the surrounding context has been cancelled by some
# task with a handle to THIS, we error here immediately
# since it likely means the surrounding lexical-scope has
# errored, been `trio.Cancelled` or at the least
# `Context.cancel()` was called by some task.
if self._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if
@ -503,6 +688,11 @@ class Context:
# actually try to stream - a cancel msg was already
# sent to the other side!
if self._remote_error:
# NOTE: this is diff then calling
# `._maybe_raise_from_remote_msg()` specifically
# because any task entering this `.open_stream()`
# AFTER cancellation has already been requested,
# we DO NOT want to absorb any ctxc ACK silently!
raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded
@ -529,7 +719,7 @@ class Context:
# to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx = actor.get_context(
ctx: Context = actor.get_context(
self.chan,
self.cid,
msg_buffer_size=msg_buffer_size,
@ -548,6 +738,19 @@ class Context:
'The underlying channel for this stream was already closed!?'
)
# NOTE: implicitly this will call `MsgStream.aclose()` on
# `.__aexit__()` due to stream's parent `Channel` type!
#
# XXX NOTE XXX: ensures the stream is "one-shot use",
# which specifically means that on exit,
# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end indicating that the caller exited
# the streaming context purposefully by letting
# the exit block exec.
# - this is diff from the cancel/error case where
# a cancel request from this side or an error
# should be sent to the far end indicating the
# stream WAS NOT just closed normally/gracefully.
async with MsgStream(
ctx=self,
rx_chan=ctx._recv_chan,
@ -567,11 +770,37 @@ class Context:
# await trio.lowlevel.checkpoint()
yield stream
# NOTE: Make the stream "one-shot use". On exit,
# signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end.
await stream.aclose()
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
#
# await maybe_wait_for_debugger()
# XXX TODO: pretty sure this isn't needed (see
# note above this block) AND will result in
# a double `.send_stop()` call. The only reason to
# put it here would be to do with "order" in
# terms of raising any remote error (as per
# directly below) or bc the stream's
# `.__aexit__()` block might not get run
# (doubtful)? Either way if we did put this back
# in we also need a state var to avoid the double
# stop-msg send..
#
# await stream.aclose()
# if re := ctx._remote_error:
# ctx._maybe_raise_remote_err(
# re,
# raise_ctxc_from_self_call=True,
# )
# await trio.lowlevel.checkpoint()
finally:
if self._portal:
@ -587,7 +816,10 @@ class Context:
def _maybe_raise_remote_err(
self,
err: Exception,
) -> None:
raise_ctxc_from_self_call: bool = False,
raise_overrun_from_self: bool = True,
) -> ContextCancelled|None:
'''
Maybe raise a remote error depending on who (which task from
which actor) requested a cancellation (if any).
@ -603,13 +835,21 @@ class Context:
# "error"-msg.
our_uid: tuple[str, str] = current_actor().uid
if (
isinstance(err, ContextCancelled)
(not raise_ctxc_from_self_call
and isinstance(err, ContextCancelled)
and (
self._cancel_called
or self.chan._cancel_called
or self.canceller == our_uid
or tuple(err.canceller) == our_uid
or tuple(err.canceller) == our_uid)
)
or
(not raise_overrun_from_self
and isinstance(err, RemoteActorError)
and err.msgdata['type_str'] == 'StreamOverrun'
and tuple(err.msgdata['sender']) == our_uid
)
):
# NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing"
@ -661,77 +901,196 @@ class Context:
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
if re := self._remote_error:
return self._maybe_raise_remote_err(re)
raise_overrun: bool = not self._allow_overruns
# if re := self._remote_error:
# return self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# raise_overrun_from_self=raise_overrun,
# )
res_placeholder: int = id(self)
if (
self._result == id(self)
self._result == res_placeholder
and not self._remote_error
and not self._recv_chan._closed # type: ignore
):
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
try:
msg = await self._recv_chan.receive()
self._result: Any = msg['return']
# NOTE: we don't need to do this right?
# XXX: only close the rx mem chan AFTER
# a final result is retrieved.
# if self._recv_chan:
# await self._recv_chan.aclose()
break
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is the
# (likely) source cause of this local runtime
# task's cancellation.
if re := self._remote_error:
self._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
except KeyError: # as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?"
# wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit
# from the far end.
drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self)
log.runtime(
'Ctx drained pre-result msgs:\n'
f'{drained_msgs}'
)
if err := unpack_error(
msg,
self._portal.channel
): # from msgerr
self._maybe_cancel_and_set_remote_error(err)
self._maybe_raise_remote_err(err)
# TODO: implement via helper func ^^^^
# pre_result_drained: list[dict] = []
# while not self._remote_error:
# try:
# # NOTE: this REPL usage actually works here dawg! Bo
# # from .devx._debug import pause
# # await pause()
# # if re := self._remote_error:
# # self._maybe_raise_remote_err(
# # re,
# # # NOTE: obvi we don't care if we
# # # overran the far end if we're already
# # # waiting on a final result (msg).
# # raise_overrun_from_self=raise_overrun,
# # )
else:
raise
# # TODO: bad idea?
# # with trio.CancelScope() as res_cs:
# # self._res_scope = res_cs
# # msg: dict = await self._recv_chan.receive()
# # if res_cs.cancelled_caught:
if re := self._remote_error:
return self._maybe_raise_remote_err(re)
# # from .devx._debug import pause
# # await pause()
# msg: dict = await self._recv_chan.receive()
# self._result: Any = msg['return']
# log.runtime(
# 'Context delivered final result msg:\n'
# f'{pformat(msg)}'
# )
# # NOTE: we don't need to do this right?
# # XXX: only close the rx mem chan AFTER
# # a final result is retrieved.
# # if self._recv_chan:
# # await self._recv_chan.aclose()
# break
# # NOTE: we get here if the far end was
# # `ContextCancelled` in 2 cases:
# # 1. we requested the cancellation and thus
# # SHOULD NOT raise that far end error,
# # 2. WE DID NOT REQUEST that cancel and thus
# # SHOULD RAISE HERE!
# except trio.Cancelled:
# # CASE 2: mask the local cancelled-error(s)
# # only when we are sure the remote error is
# # the source cause of this local task's
# # cancellation.
# if re := self._remote_error:
# self._maybe_raise_remote_err(re)
# # CASE 1: we DID request the cancel we simply
# # continue to bubble up as normal.
# raise
# except KeyError:
# if 'yield' in msg:
# # far end task is still streaming to us so discard
# log.warning(f'Discarding std "yield"\n{msg}')
# pre_result_drained.append(msg)
# continue
# # TODO: work out edge cases here where
# # a stream is open but the task also calls
# # this?
# # -[ ] should be a runtime error if a stream is open
# # right?
# elif 'stop' in msg:
# log.cancel(
# 'Remote stream terminated due to "stop" msg:\n'
# f'{msg}'
# )
# pre_result_drained.append(msg)
# continue
# # internal error should never get here
# assert msg.get('cid'), (
# "Received internal error at portal?"
# )
# # XXX fallthrough to handle expected error XXX
# re: Exception|None = self._remote_error
# if re:
# log.critical(
# 'Remote ctx terminated due to "error" msg:\n'
# f'{re}'
# )
# assert msg is self._cancel_msg
# # NOTE: this solved a super dupe edge case XD
# # this was THE super duper edge case of:
# # - local task opens a remote task,
# # - requests remote cancellation of far end
# # ctx/tasks,
# # - needs to wait for the cancel ack msg
# # (ctxc) or some result in the race case
# # where the other side's task returns
# # before the cancel request msg is ever
# # rxed and processed,
# # - here this surrounding drain loop (which
# # iterates all ipc msgs until the ack or
# # an early result arrives) was NOT exiting
# # since we are the edge case: local task
# # does not re-raise any ctxc it receives
# # IFF **it** was the cancellation
# # requester..
# # will raise if necessary, ow break from
# # loop presuming any error terminates the
# # context!
# self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# # raise_overrun_from_self=False,
# raise_overrun_from_self=raise_overrun,
# )
# break # OOOOOF, yeah obvi we need this..
# # XXX we should never really get here
# # right! since `._deliver_msg()` should
# # always have detected an {'error': ..}
# # msg and already called this right!?!
# elif error := unpack_error(
# msg=msg,
# chan=self._portal.channel,
# hide_tb=False,
# ):
# log.critical('SHOULD NEVER GET HERE!?')
# assert msg is self._cancel_msg
# assert error.msgdata == self._remote_error.msgdata
# from .devx._debug import pause
# await pause()
# self._maybe_cancel_and_set_remote_error(error)
# self._maybe_raise_remote_err(error)
# else:
# # bubble the original src key error
# raise
if (
(re := self._remote_error)
and self._result == res_placeholder
):
maybe_err: Exception|None = self._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=(
raise_overrun
and
# only when we ARE NOT the canceller
# should we raise overruns, bc ow we're
# raising something we know might happen
# during cancellation ;)
(not self._cancel_called)
),
)
if maybe_err:
self._result = maybe_err
return self._result
@ -779,7 +1138,7 @@ class Context:
while self._overflow_q:
# NOTE: these msgs should never be errors since we always do
# the check prior to checking if we're in an overrun state
# inside ``.deliver_msg()``.
# inside ``._deliver_msg()``.
msg = self._overflow_q.popleft()
try:
await self._send_chan.send(msg)
@ -830,34 +1189,50 @@ class Context:
messages are eventually sent if possible.
'''
cid = self.cid
chan = self.chan
uid = chan.uid
cid: str = self.cid
chan: Channel = self.chan
from_uid: tuple[str, str] = chan.uid
send_chan: trio.MemorySendChannel = self._send_chan
log.runtime(
f"Delivering {msg} from {uid} to caller {cid}"
)
if (
msg.get('error') # check for field
and (
error := unpack_error(
if re := unpack_error(
msg,
self.chan,
)
)
):
log.error(
f'Delivering error-msg from {from_uid} to caller {cid}\n'
f'{re}'
)
self._cancel_msg = msg
self._maybe_cancel_and_set_remote_error(error)
self._maybe_cancel_and_set_remote_error(re)
if (
self._in_overrun
):
# XXX NEVER do this XXX..!!
# bc if the error is a ctxc and there is a task
# waiting on `.result()` we need the msg to be sent
# over the `send_chan`/`._recv_chan` so that the error
# is relayed to that waiter task..
# return True
#
# XXX ALSO NO!! XXX
# if self._remote_error:
# self._maybe_raise_remote_err(error)
if self._in_overrun:
log.warning(
f'Capturing overrun-msg from {from_uid} to caller {cid}\n'
f'{msg}'
)
self._overflow_q.append(msg)
return False
try:
log.runtime(
f'Delivering IPC `Context` msg:\n'
f'<= {from_uid}\n'
f'=> caller: {cid}\n'
f'{msg}'
)
# from .devx._debug import pause
# await pause()
send_chan.send_nowait(msg)
return True
# if an error is detected we should always
@ -890,7 +1265,8 @@ class Context:
lines = [
f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
# TODO: put remote task name here if possible?
f'remote sender actor: {uid}',
f'sender: {from_uid}',
f'msg: {msg}',
# TODO: put task func name here and maybe an arrow
# from sender to overrunner?
# f'local task {self.func_name}'
@ -926,11 +1302,19 @@ class Context:
# anything different.
return False
else:
# raise local overrun and immediately pack as IPC
# msg for far end.
try:
raise StreamOverrun(text)
raise StreamOverrun(
text,
sender=from_uid,
)
except StreamOverrun as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
# err_msg['cid']: str = cid
try:
await chan.send(err_msg)
except trio.BrokenResourceError:

View File

@ -163,13 +163,15 @@ class MessagingError(Exception):
def pack_error(
exc: BaseException,
tb: str | None = None,
tb: str|None = None,
cid: str|None = None,
) -> dict[str, dict]:
'''
Create an "error message" encoded for wire transport via an IPC
`Channel`; expected to be unpacked on the receiver side using
`unpack_error()` below.
Create an "error message" which boxes a locally caught
exception's meta-data and encodes it for wire transport via an
IPC `Channel`; expected to be unpacked (and thus unboxed) on
the receiver side using `unpack_error()` below.
'''
if tb:
@ -197,7 +199,12 @@ def pack_error(
):
error_msg.update(exc.msgdata)
return {'error': error_msg}
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
return pkt
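# A rough, self-contained sketch of the boxed-error round trip these
# helpers implement; the field set is simplified and `_pack_error_sketch`
# is a hypothetical stand-in, though 'tb_str'/'type_str' mirror keys
# referenced elsewhere in this diff:
import traceback

def _pack_error_sketch(
    exc: BaseException,
    cid: str|None = None,
) -> dict[str, dict]:
    error_msg: dict = {
        'type_str': type(exc).__name__,
        # py3.10+ accepts just the exc instance here
        'tb_str': ''.join(traceback.format_exception(exc)),
    }
    pkt: dict = {'error': error_msg}
    if cid:
        # tag the boxed error with its originating context id
        pkt['cid'] = cid
    return pkt

try:
    raise RuntimeError('boom')
except RuntimeError as _err:
    _msg = _pack_error_sketch(_err, cid='deadbeef')
    assert _msg['cid'] == 'deadbeef'
    assert _msg['error']['type_str'] == 'RuntimeError'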
def unpack_error(
@ -207,7 +214,7 @@ def unpack_error(
err_type=RemoteActorError,
hide_tb: bool = True,
) -> None | Exception:
) -> None|Exception:
'''
Unpack an 'error' message from the wire
into a local `RemoteActorError` (subtype).
@ -358,8 +365,7 @@ def _raise_from_no_key_in_msg(
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
f"{_type} was expecting a '{expect_key}' message"
" BUT received a non-error msg:\n"
f'{pformat(msg)}'
) from src_err

View File

@ -19,13 +19,14 @@ Inter-process comms abstractions
"""
from __future__ import annotations
import platform
import struct
import typing
import platform
from pprint import pformat
from collections.abc import (
AsyncGenerator,
AsyncIterator,
)
import typing
from typing import (
Any,
runtime_checkable,
@ -370,7 +371,10 @@ class Channel:
async def send(self, item: Any) -> None:
log.transport(f"send `{item}`") # type: ignore
log.transport(
'=> send IPC msg:\n\n'
f'{pformat(item)}\n'
) # type: ignore
assert self.msgstream
await self.msgstream.send(item)

View File

@ -39,7 +39,15 @@ import trio
from async_generator import asynccontextmanager
from .trionics import maybe_open_nursery
from ._state import current_actor
from .devx import (
# acquire_debug_lock,
# pause,
maybe_wait_for_debugger,
)
from ._state import (
current_actor,
debug_mode,
)
from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
@ -48,6 +56,7 @@ from ._exceptions import (
unpack_error,
NoResult,
ContextCancelled,
RemoteActorError,
)
from ._context import (
Context,
@ -468,7 +477,6 @@ class Portal:
ctx._started_called: bool = True
except KeyError as src_error:
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
@ -493,6 +501,33 @@ class Portal:
# in enter tuple.
yield ctx, first
# between the caller exiting and arriving here the
# far end may have sent a ctxc-msg or other error,
# so check for it here immediately and maybe raise
# so as to engage the ctxc handling block below!
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# assert maybe_ctxc
# when in allow_overruns mode there may be
# lingering overflow sender tasks remaining?
if nurse.child_tasks:
@ -538,7 +573,7 @@ class Portal:
# `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`.
#
# Again, there are 2 cases:
# AGAIN to restate the above, there are 2 cases:
#
# 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation
@ -554,6 +589,16 @@ class Portal:
except ContextCancelled as ctxc:
scope_err = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will
# cause a SIGINT-ignoring HANG!
# -> prolly due to a stale debug lock entry..
# -[ ] USE `.stackscope` to demonstrate that (possibly
# documenting it as a definitive example of
# debugging the tractor-runtime itself using its
# own `.devx.` tooling!
# await pause()
# CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should
# exit silently.
@ -561,18 +606,23 @@ class Portal:
ctx._cancel_called
and (
ctxc is ctx._remote_error
or
ctxc.canceller is self.canceller
# ctxc.msgdata == ctx._remote_error.msgdata
# TODO: uhh `Portal.canceller` ain't a thangg
# dawg? (was `self.canceller` before?!?)
and
ctxc.canceller == self.actor.uid
)
):
log.debug(
f'Context {ctx} cancelled gracefully with:\n'
log.cancel(
f'Context (cid=[{ctx.cid[-6:]}..]) cancelled gracefully with:\n'
f'{ctxc}'
)
# CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else!
else:
# await pause()
raise
# the above `._scope` can be cancelled due to:
@ -601,8 +651,8 @@ class Portal:
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt,
) as err:
scope_err = err
) as caller_err:
scope_err = caller_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
@ -610,11 +660,26 @@ class Portal:
# handled in the block above!
log.cancel(
'Context cancelled for task due to\n'
f'{err}\n'
f'{caller_err}\n'
'Sending cancel request..\n'
f'task:{cid}\n'
f'actor:{uid}'
)
if debug_mode():
log.pdb(
'Delaying `ctx.cancel()` until debug lock '
'acquired..'
)
# async with acquire_debug_lock(self.actor.uid):
# pass
# TODO: factor ^ into below for non-root cases?
await maybe_wait_for_debugger()
log.pdb(
'Acquired debug lock! '
'Calling `ctx.cancel()`!'
)
try:
await ctx.cancel()
except trio.BrokenResourceError:
@ -628,6 +693,33 @@ class Portal:
# no local scope error, the "clean exit with a result" case.
else:
# between the caller exiting and arriving here the
# far end may have sent a ctxc-msg or other error,
# so check for it here immediately and maybe raise
# so as to engage the ctxc handling block below!
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# assert maybe_ctxc
if ctx.chan.connected():
log.info(
'Waiting on final context-task result for\n'
@ -644,13 +736,8 @@ class Portal:
# As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here.
# result = await ctx.result()
try:
result = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
result_or_err: Exception|Any = await ctx.result()
except BaseException as berr:
# on normal teardown, if we get some error
# raised in `Context.result()` we still want to
@ -662,7 +749,48 @@ class Portal:
scope_err = berr
raise
# an exception type boxed in a `RemoteActorError`
# is returned (meaning it was obvi not raised).
msgdata: str|None = getattr(
result_or_err,
'msgdata',
None
)
# yes! this worx Bp
# from .devx import _debug
# await _debug.pause()
match (msgdata, result_or_err):
case (
{'tb_str': tbstr},
ContextCancelled(),
):
log.cancel(tbstr)
case (
{'tb_str': tbstr},
RemoteActorError(),
):
log.exception(
f'Context `{fn_name}` remotely errored:\n'
f'`{tbstr}`'
)
case (None, _):
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result_or_err}`'
)
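# aside: the `match` dispatch above reduced to a runnable toy; class
# patterns like `ValueError()` match any *instance* of the type while
# the mapping pattern captures 'tb_str' whenever that key is present
# (`_describe` is a hypothetical name):
def _describe(msgdata: dict|None, result) -> str:
    match (msgdata, result):
        case ({'tb_str': tbstr}, ValueError()):
            return f'boxed remote error: {tbstr}'
        case (None, _):
            return f'plain result: {result}'
    return 'unmatched'

assert _describe(None, 42) == 'plain result: 42'
assert _describe({'tb_str': 'tb..'}, ValueError()) == 'boxed remote error: tb..'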
finally:
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
await maybe_wait_for_debugger()
# though it should be impossible for any tasks
# operating *in* this scope to have survived
# we tear down the runtime feeder chan last
@ -707,6 +835,10 @@ class Portal:
# out any exception group or legit (remote) ctx
# error that sourced from the remote task or its
# runtime.
#
# NOTE: further, this should be the only place the
# underlying feeder channel is
# once-and-only-CLOSED!
with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose()
@ -747,6 +879,9 @@ class Portal:
# FINALLY, remove the context from runtime tracking and
# exit!
log.runtime(
f'Exiting context opened with {ctx.chan.uid}'
)
self.actor._contexts.pop(
(self.channel.uid, ctx.cid),
None,

View File

@ -28,6 +28,7 @@ from itertools import chain
import importlib
import importlib.util
import inspect
from pprint import pformat
import signal
import sys
from typing import (
@ -48,6 +49,10 @@ import trio
from trio import (
CancelScope,
)
from trio.lowlevel import (
current_task,
Task,
)
from trio_typing import (
Nursery,
TaskStatus,
@ -80,6 +85,26 @@ if TYPE_CHECKING:
log = get_logger('tractor')
_gb_mod: ModuleType|None|False = None
async def maybe_import_gb():
global _gb_mod
if _gb_mod is False:
return
try:
import greenback
_gb_mod = greenback
await greenback.ensure_portal()
except ModuleNotFoundError:
log.warning(
'`greenback` is not installed.\n'
'No sync debug support!'
)
_gb_mod = False
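# the tri-state module cache above, generalized: `None` means "never
# tried", `False` means "tried and unavailable", otherwise the imported
# module itself is cached (a sketch for any optional dep, not actual
# tractor API):
import importlib
from types import ModuleType

_opt_mod: ModuleType|None|bool = None

def maybe_import(name: str) -> ModuleType|None:
    global _opt_mod
    if _opt_mod is None:
        try:
            _opt_mod = importlib.import_module(name)
        except ModuleNotFoundError:
            _opt_mod = False  # don't retry on every call
    return _opt_mod or None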
async def _invoke(
@ -227,15 +252,27 @@ async def _invoke(
# wrapper that calls `Context.started()` and then does
# the `await coro()`?
elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid})
# a "context" endpoint type is the most general and
# "least sugary" type of RPC ep with support for
# bi-dir streaming B)
await chan.send({
'functype': 'context',
'cid': cid
})
try:
async with trio.open_nursery() as nurse:
ctx._scope_nursery = nurse
ctx._scope = nurse.cancel_scope
task_status.started(ctx)
# TODO: would be nice to have our
# `TaskMngr` nursery here!
# res: Any = await coro
res = await coro
# deliver final result to caller side.
await chan.send({
'return': res,
'cid': cid
@ -271,9 +308,10 @@ async def _invoke(
# associated child isn't in debug any more
await _debug.maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((chan.uid, cid))
log.runtime(
f'Context entrypoint {func} was terminated:\n'
f'{ctx}'
log.cancel(
f'Context task was terminated:\n'
f'func: {func}\n'
f'ctx: {pformat(ctx)}'
)
if ctx.cancelled_caught:
@ -285,13 +323,14 @@ async def _invoke(
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
fname: str = func.__name__
# fname: str = func.__name__
task: Task = current_task()
cs: CancelScope = ctx._scope
if cs.cancel_called:
our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller
msg: str = (
f'`{fname}()`@{our_uid} cancelled by '
'actor was cancelled by '
)
# NOTE / TODO: if we end up having
@ -310,16 +349,37 @@ async def _invoke(
# some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx.
elif canceller == ctx.chan.uid:
msg += f'its caller {canceller} '
msg += 'its caller'
else:
msg += f'remote actor {canceller}'
msg += 'a remote peer'
div_chars: str = '------ - ------'
div_offset: int = (
round(len(msg)/2)+1
+
round(len(div_chars)/2)+1
)
div_str: str = (
'\n'
+
' '*div_offset
+
f'{div_chars}\n'
)
msg += (
div_str +
f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n'
f' |_ task: `{task.name}()`'
)
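# quick runnable check of the divider arithmetic above (sample values
# only): the computed offset indents the `div_chars` rule so it hangs
# centered off the end of the first msg line
_msg = 'actor was cancelled by a remote peer'
_div_chars = '------ - ------'
_div_offset = round(len(_msg)/2) + 1 + round(len(_div_chars)/2) + 1
print(_msg + '\n' + ' '*_div_offset + _div_chars)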
# TODO: does this ever get set any more or can
# we remove it?
if ctx._cancel_msg:
msg += (
' with msg:\n'
'------ - ------\n'
'IPC msg:\n'
f'{ctx._cancel_msg}'
)
@ -439,9 +499,9 @@ async def _invoke(
# always ship errors back to caller
err_msg: dict[str, dict] = pack_error(
err,
tb=tb,
# tb=tb, # TODO: special tb fmting?
cid=cid,
)
err_msg['cid'] = cid
if is_rpc:
try:
@ -508,19 +568,28 @@ async def try_ship_error_to_parent(
err: Union[Exception, BaseExceptionGroup],
) -> None:
'''
Box, pack and encode a local runtime(-internal) exception for
an IPC channel `.send()` with transport/network failures and
local cancellation ignored but logged as critical(ly bad).
'''
with CancelScope(shield=True):
try:
# internal error so ship to parent without cid
await channel.send(pack_error(err))
await channel.send(
# NOTE: normally only used for internal runtime errors
# so ship to peer actor without a cid.
pack_error(err)
)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
):
# in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma.
# happen and provides for a 2-general's dilemma..
log.critical(
f"Failed to ship error to parent "
f"{channel.uid}, channel was closed"
f'Failed to ship error to parent '
f'{channel.uid}, IPC transport failure!'
)
@ -573,6 +642,11 @@ class Actor:
# if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False
# _ans: dict[
# tuple[str, str],
# list[ActorNursery],
# ] = {}
# Process-global stack closed at end on actor runtime teardown.
# NOTE: this is currently an undocumented public api.
lifetime_stack: ExitStack = ExitStack()
@ -593,7 +667,10 @@ class Actor:
'''
self.name = name
self.uid = (name, uid or str(uuid.uuid4()))
self.uid = (
name,
uid or str(uuid.uuid4())
)
self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple] | None = None
@ -762,7 +839,10 @@ class Actor:
return
# channel tracking
event = self._peer_connected.pop(uid, None)
event: trio.Event|None = self._peer_connected.pop(
uid,
None,
)
if event:
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
@ -771,46 +851,43 @@ class Actor:
# Alert any task waiting on this connection to come up
event.set()
chans = self._peers[uid]
# TODO: re-use channels for new connections instead
# of always new ones; will require changing all the
# discovery funcs
chans: list[Channel] = self._peers[uid]
if chans:
# TODO: re-use channels for new connections instead
# of always new ones?
# => will require changing all the discovery funcs..
log.runtime(
f"already have channel(s) for {uid}:{chans}?"
)
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# TODO: can we just use list-ref directly?
# chans.append(chan)
self._peers[uid].append(chan)
local_nursery: ActorNursery | None = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and
# process received responses.
disconnected: bool = False
try:
disconnected = await process_messages(self, chan)
except (
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}")
disconnected: bool = await process_messages(self, chan)
except trio.Cancelled:
log.cancel(f'Msg loop was cancelled for {chan}')
raise
finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
local_nursery: (
ActorNursery|None
) = self._actoruid2nursery.get(uid)
# This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
):
if local_nursery:
if chan._cancel_called:
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
log.cancel(f'Waiting on cancel request to peer {chan.uid}')
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
@ -853,26 +930,48 @@ class Actor:
# the cause of other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
poll = getattr(proc, 'poll', None)
if poll and poll() is None:
log.cancel(
f'Actor {uid} IPC broke but proc is alive?'
f'Peer actor IPC broke but proc is alive?\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
# ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected
log.runtime(f"Releasing channel {chan} from {chan.uid}")
log.runtime(
f'Disconnected IPC channel:\n'
f'uid: {chan.uid}\n'
f'|_{pformat(chan)}\n'
)
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.runtime(f"No more channels for {chan.uid}")
log.runtime(
f'No more channels with {chan.uid}'
)
self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}")
peers_str: str = ''
for uid, chans in self._peers.items():
peers_str += (
f'- uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
log.runtime(
f'Remaining {len(self._peers)} IPC peers:\n'
+ peers_str
)
# No more channels to other actors (at all) registered
# as connected.
@ -888,15 +987,58 @@ class Actor:
if _state.is_root_process():
pdb_lock = _debug.Lock
pdb_lock._blocked.add(uid)
log.runtime(f"{uid} blocked from pdb locking")
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
pdb_user_uid
and local_nursery
):
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT '
'a DISCONNECTED child still has the debug '
'lock!\n'
f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
await _debug.maybe_wait_for_debugger(
child_in_debug=True
)
# TODO: just bc a child's transport dropped
# doesn't mean it's not still using the pdb
# REPL! so,
# -[ ] ideally we can check out child proc
# tree to ensure that its alive (and
# actually using the REPL) before we cancel
# it's lock acquire by doing the below!
# -[ ] create a way to read the tree of each actor's
# grandchildren such that when an
# intermediary parent is cancelled but their
# child has locked the tty, the grandparent
# will not allow the parent to cancel or
# zombie reap the child! see open issue:
# - https://github.com/goodboy/tractor/issues/320
# ------ - ------
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if (
db_cs
and not db_cs.cancel_called
and uid == pdb_user_uid
):
log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
@ -905,13 +1047,22 @@ class Actor:
db_cs.cancel()
# XXX: is this necessary (GC should do it)?
if chan.connected():
# XXX WARNING XXX
# Be AWARE OF THE INDENT LEVEL HERE
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
# EMPTY!!!!
if (
not self._peers
and chan.connected()
):
# if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to
# an error and so we should at least try to terminate
# the channel from this end gracefully.
log.runtime(f"Disconnecting channel {chan}")
log.runtime(
'Terminating channel with `None` sentinel msg\n'
f'|_{chan}\n'
)
try:
# send a msg loop terminate sentinel
await chan.send(None)
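# aside: the `None`-as-loop-terminate-sentinel idiom in runnable toy
# form (a bare trio mem-chan stands in for the IPC `Channel` here and
# all names are hypothetical):
import trio

async def _toy_msg_loop(recv: trio.MemoryReceiveChannel) -> None:
    async for msg in recv:
        if msg is None:  # dedicated loop-terminate sentinel
            break
        # otherwise: dispatch the rpc request / push the ctx result..

async def _main() -> None:
    send, recv = trio.open_memory_channel(1)
    async with trio.open_nursery() as n:
        n.start_soon(_toy_msg_loop, recv)
        await send.send(None)  # graceful msg-loop shutdown

trio.run(_main)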
@ -928,15 +1079,16 @@ class Actor:
chan: Channel,
cid: str,
msg: dict[str, Any],
) -> None:
) -> None|bool:
'''
Push an RPC result to the local consumer's queue.
'''
uid = chan.uid
uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}"
try:
ctx = self._contexts[(uid, cid)]
ctx: Context = self._contexts[(uid, cid)]
except KeyError:
log.warning(
f'Ignoring msg from [no-longer/un]known context {uid}:'
@ -1066,6 +1218,16 @@ class Actor:
parent_data.pop('bind_port'),
)
rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
try:
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
log.runtime(f"Runtime vars are: {rvs}")
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -1284,9 +1446,15 @@ class Actor:
'''
tasks: dict = self._rpc_tasks
if tasks:
tasks_str: str = ''
for (ctx, func, _) in tasks.values():
tasks_str += (
f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n'
)
log.cancel(
f'Cancelling all {len(tasks)} rpc tasks:\n'
f'{tasks}'
f'{tasks_str}'
)
for (
(chan, cid),
@ -1511,7 +1679,10 @@ async def async_main(
)
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
await try_ship_error_to_parent(
actor._parent_chan,
err,
)
# always!
match err:
@ -1595,43 +1766,53 @@ async def process_messages(
or boxed errors back to the remote caller (task).
'''
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
msg: dict | None = None
# TODO: once `trio` get's an "obvious way" for req/resp we
# should use it?
# https://github.com/python-trio/trio/issues/467
log.runtime(
'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}'
)
nursery_cancelled_before_task: bool = False
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
msg: dict | None = None
try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having
# been cancelled (eg. `open_portal()` may call this method
# from a locally spawned task) and receive this scope
# using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task) and receive this scope using
# ``scope = Nursery.start()``
task_status.started(loop_cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
# dedicated loop terminate sentinel
if msg is None:
tasks: dict[
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = actor._rpc_tasks.copy()
log.cancel(
f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..")
for (channel, cid) in actor._rpc_tasks.copy():
f'Peer IPC channel terminated via `None` sentinel msg?\n'
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
for (channel, cid) in tasks:
if channel is chan:
await actor._cancel_task(
cid,
channel,
)
log.runtime(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}")
f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: conditionally avoid fmting depending
# on log level (for perf)?
f'{pformat(msg)}\n'
)
cid = msg.get('cid')
if cid:
@ -1640,7 +1821,10 @@ async def process_messages(
await actor._push_result(chan, cid, msg)
log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}")
f'Waiting on next IPC msg from {chan.uid}:\n'
# f'last msg: {msg}\n'
f'|_{chan}'
)
continue
# TODO: implement with ``match:`` syntax?
@ -1693,7 +1877,7 @@ async def process_messages(
)
log.cancel(
f'Cancelling msg loop for {chan.uid}'
f'Cancelling IPC msg-loop with {chan.uid}'
)
loop_cs.cancel()
break
@ -1735,8 +1919,10 @@ async def process_messages(
try:
func = actor._get_rpc_func(ns, funcname)
except (ModuleNotExposed, AttributeError) as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
await chan.send(err_msg)
continue
@ -1838,7 +2024,10 @@ async def process_messages(
log.exception("Actor errored:")
if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
await try_ship_error_to_parent(
actor._parent_chan,
err,
)
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
@ -1847,8 +2036,9 @@ async def process_messages(
finally:
# msg debugging for when the machinery is brokey
log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}"
f'Exiting IPC msg loop with {chan.uid} '
f'final msg: {msg}\n'
f'|_{chan}'
)
# transport **was not** disconnected

View File

@ -144,7 +144,7 @@ async def exhaust_portal(
# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
final = await portal.result()
final: Any = await portal.result()
except (
Exception,
@ -152,13 +152,23 @@ async def exhaust_portal(
) as err:
# we reraise in the parent task via a ``BaseExceptionGroup``
return err
except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled result waiter for {portal.actor.uid}")
log.warning(
'Cancelled portal result waiter task:\n'
f'uid: {portal.channel.uid}\n'
f'error: {err}\n'
)
return err
else:
log.debug(f"Returning final result: {final}")
log.debug(
f'Returning final result from portal:\n'
f'uid: {portal.channel.uid}\n'
f'result: {final}\n'
)
return final
@ -170,26 +180,34 @@ async def cancel_on_completion(
) -> None:
'''
Cancel actor gracefully once it's "main" portal's
Cancel actor gracefully once its "main" portal's
result arrives.
Should only be called for actors spawned with `run_in_actor()`.
Should only be called for actors spawned via the
`Portal.run_in_actor()` API.
=> and really this API will be deprecated and should be
re-implemented as a `.hilevel.one_shot_task_nursery()`..
'''
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# an exception group and we still send out a cancel request
result = await exhaust_portal(portal, actor)
result: Any|Exception = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
errors[actor.uid]: Exception = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
'Cancelling subactor due to error:\n'
f'uid: {portal.channel.uid}\n'
f'error: {result}\n'
)
else:
log.runtime(
f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}")
'Cancelling subactor gracefully:\n'
f'uid: {portal.channel.uid}\n'
f'result: {result}\n'
)
# cancel the process now that we have a final result
await portal.cancel_actor()
@ -219,11 +237,14 @@ async def do_hard_kill(
to be handled.
'''
log.cancel(
'Terminating sub-proc:\n'
f'|_{proc}\n'
)
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:
# NOTE: code below was copied verbatim from the now deprecated
@ -260,7 +281,10 @@ async def do_hard_kill(
# zombies (as a feature) we ask the OS to send in the
# removal squad as the last resort.
if cs.cancelled_caught:
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
log.critical(
'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
f'|_{proc}\n'
)
proc.kill()
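# aside: the graceful-timeout-then-SIGKILL pattern above, reduced to
# its core (a sketch; the default `terminate_after` is a stand-in):
import trio

async def _reap(proc: trio.Process, terminate_after: float = 3.0) -> None:
    with trio.move_on_after(terminate_after) as cs:
        proc.terminate()   # polite SIGTERM first
        await proc.wait()  # let it exit cleanly if it can
    if cs.cancelled_caught:
        # the hard-kill ultimatum (aka the zombie-lord case above)
        proc.kill()
        await proc.wait()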
@ -281,10 +305,16 @@ async def soft_wait(
join/reap on an actor-runtime-in-process.
'''
uid = portal.channel.uid
uid: tuple[str, str] = portal.channel.uid
try:
log.cancel(f'Soft waiting on actor:\n{uid}')
log.cancel(
'Soft waiting on sub-actor proc:\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
# wait on sub-proc to signal termination
await wait_func(proc)
except trio.Cancelled:
# if cancelled during a soft wait, cancel the child
# actor before entering the hard reap sequence
@ -296,8 +326,8 @@ async def soft_wait(
async def cancel_on_proc_deth():
'''
Cancel the actor cancel request if we detect that
that the process terminated.
"Cancel the (actor) cancel" request if we detect
that that the underlying sub-process terminated.
'''
await wait_func(proc)
@ -314,10 +344,10 @@ async def soft_wait(
if proc.poll() is None: # type: ignore
log.warning(
'Actor still alive after cancel request:\n'
f'{uid}'
'Subactor still alive after cancel request?\n\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
n.cancel_scope.cancel()
raise
@ -341,7 +371,7 @@ async def new_proc(
) -> None:
# lookup backend spawning target
target = _methods[_spawn_method]
target: Callable = _methods[_spawn_method]
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
@ -492,8 +522,9 @@ async def trio_proc(
# cancel result waiter that may have been spawned in
# tandem if not done already
log.cancel(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
'Cancelling existing result waiter task for '
f'{subactor.uid}'
)
nursery.cancel_scope.cancel()
finally:
@ -511,7 +542,16 @@ async def trio_proc(
with trio.move_on_after(0.5):
await proc.wait()
if is_root_process():
log.pdb(
'Delaying subproc reaper while debugger locked..'
)
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
# TODO: need a diff value then default?
# poll_steps=9999999,
)
# TODO: solve the following issue where we need
# to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't
@ -519,10 +559,18 @@ async def trio_proc(
# to hold off on relaying SIGINT until that child
# is complete.
# https://github.com/goodboy/tractor/issues/320
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False),
)
# -[ ] we need to handle non-root parent-actors specially
# by somehow determining if a child is in debug and then
# avoiding cancel/kill of said child by this
# (intermediary) parent until such a time as the root says
# the pdb lock is released and we are good to tear down
# (our children)..
#
# -[ ] so maybe something like this where we try to
# acquire the lock and get notified of who has it,
# check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid
# await acquire_debug_lock(this_uid)
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")

View File

@ -21,8 +21,9 @@ The machinery and types behind ``Context.open_stream()``
'''
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm
import inspect
from pprint import pformat
from typing import (
Any,
Callable,
@ -35,6 +36,7 @@ import trio
from ._exceptions import (
_raise_from_no_key_in_msg,
ContextCancelled,
)
from .log import get_logger
from .trionics import (
@ -84,8 +86,8 @@ class MsgStream(trio.abc.Channel):
self._broadcaster = _broadcaster
# flag to denote end of stream
self._eoc: bool = False
self._closed: bool = False
self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False
# delegate directly to underlying mem channel
def receive_nowait(self):
@ -93,6 +95,9 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
# if 'return' in msg:
# return msg
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
@ -122,16 +127,26 @@ class MsgStream(trio.abc.Channel):
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise trio.EndOfChannel
raise self._eoc
# raise trio.EndOfChannel
if self._closed:
raise trio.ClosedResourceError('This stream was closed')
raise self._closed
# raise trio.ClosedResourceError(
# 'This stream was already closed'
# )
src_err: Exception|None = None
try:
try:
msg = await self._rx_chan.receive()
return msg['yield']
except KeyError as kerr:
src_err = kerr
# NOTE: may raise any of the below error types
# including EoC when a 'stop' msg is found.
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
@ -141,11 +156,14 @@ class MsgStream(trio.abc.Channel):
stream=self,
)
except (
trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
):
# XXX: we close the stream on any of these error conditions:
except (
# trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
) as eoc:
src_err = eoc
self._eoc = eoc
# await trio.sleep(1)
# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the
@ -168,14 +186,53 @@ class MsgStream(trio.abc.Channel):
# closing this stream and not flushing a final value to
# remaining (clone) consumers who may not have been
# scheduled to receive it yet.
# try:
# maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait()
# if maybe_err_msg_or_res:
# log.warning(
# 'Discarding un-processed msg:\n'
# f'{maybe_err_msg_or_res}'
# )
# except trio.WouldBlock:
# # no queued msgs that might be another remote
# # error, so just raise the original EoC
# pass
# raise eoc
except trio.ClosedResourceError as cre: # by self._rx_chan
src_err = cre
log.warning(
'`Context._rx_chan` was already closed?'
)
self._closed = cre
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
drained: list[Exception|dict] = await self.aclose()
if drained:
log.warning(
'Drained context msgs during closure:\n'
f'{drained}'
)
# TODO: pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.result()` call?
raise # propagate
# NOTE XXX: if the context was cancelled or remote-errored
# but we received the stream close msg first, we
# probably want to instead raise the remote error
# over the end-of-stream connection error since likely
# the remote error was the source cause?
ctx: Context = self._ctx
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(
re,
raise_ctxc_from_self_call=True,
)
raise src_err  # propagate
async def aclose(self):
async def aclose(self) -> list[Exception|dict]:
'''
Cancel associated remote actor task and local memory channel on
close.
@ -185,15 +242,55 @@ class MsgStream(trio.abc.Channel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan
if rx_chan._closed:
log.cancel(f"{self} is already closed")
if (
rx_chan._closed
or
self._closed
):
log.cancel(
f'`MsgStream` is already closed\n'
f'.cid: {self._ctx.cid}\n'
f'._rx_chan: {rx_chan}\n'
f'._eoc: {self._eoc}\n'
f'._closed: {self._closed}\n'
)
# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return
return []
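
The early `return []` honors the `trio.AsyncResource` contract linked above: repeated `.aclose()` calls must silently succeed rather than raise. A standalone illustration of that contract:

    import trio

    class OnceClosable(trio.abc.AsyncResource):
        def __init__(self) -> None:
            self._closed: bool = False

        async def aclose(self) -> None:
            if self._closed:
                return  # repeated closes are a no-op per trio semantics
            self._closed = True
            # ... release underlying resources here ...

    async def main() -> None:
        res = OnceClosable()
        await res.aclose()
        await res.aclose()  # must not raise

    trio.run(main)
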
self._eoc = True
ctx: Context = self._ctx
# caught_eoc: bool = False
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait()
if maybe_final_msg:
log.cancel(
'Drained un-processed stream msg:\n'
f'{pformat(maybe_final_msg)}'
)
# TODO: inject into parent `Context` buf?
drained.append(maybe_final_msg)
except trio.WouldBlock as be:
drained.append(be)
break
except trio.EndOfChannel as eoc:
drained.append(eoc)
# caught_eoc = True
self._eoc: bool|trio.EndOfChannel = eoc
break
except ContextCancelled as ctxc:
log.cancel(
'Context was cancelled during stream closure:\n'
f'canceller: {ctxc.canceller}\n'
f'{pformat(ctxc.msgdata)}'
)
break
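
The loop above drains with the non-blocking `receive_nowait()` so closure never waits on the far end: `trio.WouldBlock` means the buffer is empty and `trio.EndOfChannel` means a terminal msg was already hit. The same pattern against a plain `trio` memory channel, as a rough sketch:

    import trio

    def drain_pending(
        rx: trio.MemoryReceiveChannel,
    ) -> list:
        '''
        Pull any already-buffered msgs without blocking.

        '''
        drained: list = []
        while True:
            try:
                drained.append(rx.receive_nowait())
            except trio.WouldBlock:
                break  # nothing buffered
            except trio.EndOfChannel:
                break  # sender closed; nothing more will arrive
        return drained
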
# NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're
@ -224,26 +321,33 @@ class MsgStream(trio.abc.Channel):
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
) as re:
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
# it can't traverse the transport.
ctx = self._ctx
log.warning(
f'Stream was already destroyed?\n'
f'actor: {ctx.chan.uid}\n'
f'ctx id: {ctx.cid}'
)
drained.append(re)
self._closed = re
self._closed = True
# if caught_eoc:
# # from .devx import _debug
# # await _debug.pause()
# with trio.CancelScope(shield=True):
# await rx_chan.aclose()
# Do we close the local mem chan ``self._rx_chan`` ??!?
# self._eoc: bool = caught_eoc
# NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
# the potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that context has
# run to completion.
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same
# core-msg-loop mem recv-chan is used to deliver the
# potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that
# context has run to completion.
# XXX: Notes on old behaviour:
# await rx_chan.aclose()
@ -272,6 +376,8 @@ class MsgStream(trio.abc.Channel):
# runtime's closure of ``rx_chan`` in the case where we may
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).
# self._closed = True
return drained
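
The comment above is the key design constraint: the stream's receive channel doubles as the delivery path for the context's final result, so closing it early can drop that result. A toy reproduction of the hazard using a bare memory channel (the msg shapes are simplified stand-ins):

    import trio

    async def main() -> None:
        tx, rx = trio.open_memory_channel(10)
        await tx.send({'yield': 1})
        await tx.send({'yield': 2})
        await tx.send({'return': 'final result'})  # rides the same chan

        # consume the stream portion, then the final result..
        async for msg in rx:
            if 'return' in msg:
                print('got final result:', msg['return'])
                break
            print('stream msg:', msg)
        # had we done `await rx.aclose()` after the 'yield's, the
        # 'return' msg would have been unreachable.

    trio.run(main)
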
@acm
async def subscribe(
@ -337,9 +443,13 @@ class MsgStream(trio.abc.Channel):
raise self._ctx._remote_error # from None
if self._closed:
raise trio.ClosedResourceError('This stream was already closed')
raise self._closed
# raise trio.ClosedResourceError('This stream was already closed')
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
await self._ctx.chan.send({
'yield': data,
'cid': self._ctx.cid,
})
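
For reference, the packets sent here are plain dicts keyed by msg type with the context id threaded through each one. A rough, non-normative sketch of two shapes visible in this file ('yield' and the far end's 'stop'):

    from typing import Any, TypedDict

    # functional syntax since 'yield' is a Python keyword
    YieldMsg = TypedDict('YieldMsg', {'yield': Any, 'cid': str})
    StopMsg = TypedDict('StopMsg', {'stop': bool, 'cid': str})

    def pack_yield(data: Any, cid: str) -> YieldMsg:
        # mirrors the `.send()` call above
        return {'yield': data, 'cid': cid}
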
def stream(func: Callable) -> Callable:


@ -157,7 +157,7 @@ class ActorNursery:
# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery
nursery: trio.Nursery = nursery or self._da_nursery
# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore
@ -233,12 +233,14 @@ class ActorNursery:
return portal
async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel
'''
Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
If ``hard_kill`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
"""
'''
self.cancelled = True
log.cancel(f"Cancelling nursery in {self._actor.uid}")
@ -246,7 +248,14 @@ class ActorNursery:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values():
subactor: Actor
proc: trio.Process
portal: Portal
for (
subactor,
proc,
portal,
) in self._children.values():
# TODO: are we ever even going to use this or
# is the spawning backend responsible for such
@ -286,8 +295,16 @@ class ActorNursery:
# then hard kill all sub-processes
if cs.cancelled_caught:
log.error(
f"Failed to cancel {self}\nHard killing process tree!")
for subactor, proc, portal in self._children.values():
f'Failed to cancel {self}\nHard killing process tree!'
)
subactor: Actor
proc: trio.Process
portal: Portal
for (
subactor,
proc,
portal,
) in self._children.values():
log.warning(f"Hard killing process {proc}")
proc.terminate()
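
Hard-kill logic like this usually escalates: `terminate()` (SIGTERM) first, then `kill()` (SIGKILL) if the proc hasn't exited within a grace period. A generic sketch for a `trio.Process`; the grace value is arbitrary:

    import trio

    async def hard_kill(proc: trio.Process, grace: float = 3) -> None:
        proc.terminate()  # polite SIGTERM first
        with trio.move_on_after(grace):
            await proc.wait()
            return
        proc.kill()  # no luck; SIGKILL
        await proc.wait()
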
@ -384,7 +401,17 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with")
"errored with\n"
# TODO: same thing as in
# `._invoke()` to compute how to
# place this div-line in the
# middle of the above msg
# content..
# -[ ] prolly helper-func it too
# in our `.log` module..
# '------ - ------'
)
# cancel all subactors
await anursery.cancel()
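
The TODO just above asks for a `.log` helper that drops a divider line into the middle of a multi-line msg; one speculative shape for it:

    def mid_div(
        msg: str,
        div: str = '------ - ------',
    ) -> str:
        '''
        Insert a divider line at the (line-wise) middle of `msg`.

        '''
        lines: list[str] = msg.splitlines()
        mid: int = len(lines) // 2
        return '\n'.join(lines[:mid] + [div] + lines[mid:])
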


@ -289,11 +289,19 @@ def get_console_log(
if not level:
return log
log.setLevel(level.upper() if not isinstance(level, int) else level)
log.setLevel(
level.upper()
if not isinstance(level, int)
else level
)
if not any(
handler.stream == sys.stderr # type: ignore
for handler in logger.handlers if getattr(handler, 'stream', None)
for handler in logger.handlers if getattr(
handler,
'stream',
None,
)
):
handler = logging.StreamHandler()
formatter = colorlog.ColoredFormatter(