Compare commits

...

7 Commits

Author SHA1 Message Date
Tyler Goodlet 338ea5529c .log: more multi-line styling 2025-03-14 16:41:08 -04:00
Tyler Goodlet 6bc67338cf Better subproc supervisor logging, todo for #320
Given I just similarly revamped a buncha `._runtime` log msg formatting,
might as well do something similar inside the spawning machinery such
that grokking teardown sequences of each supervising task is much more
sane XD

Mostly this includes doing similar `'<field>: <value>\n'` multi-line
formatting when reporting various subproc supervision steps as well as
showing a detailed `trio.Process.__repr__()` as appropriate.

Also adds a detailed #TODO per the needs of #320: we're going to need
some internal mechanism for intermediary parent actors to determine
whether a given debug tty locker (sub-actor) is one of *their*
(transitive) children, and if so stall the normal cancellation/teardown
sequence until that locker is complete.
2025-03-14 16:41:06 -04:00
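
For illustration, a minimal sketch of the multi-line `'<field>: <value>\n'` supervision-log style described in the commit above; the logger name, message wording and the `proc`/`actor_uid` parameters are made up for the example, not taken from the diff:

import logging

log = logging.getLogger('tractor-sketch')  # hypothetical logger, not tractor's own

def report_subproc_teardown(proc, actor_uid) -> None:
    # one '<field>: <value>' pair per line so supervisor teardown
    # sequences stay readable in console output
    log.info(
        'Hard reaping subactor process\n'
        f'uid: {actor_uid}\n'
        f'|_{proc!r}\n'  # a detailed Process.__repr__() as mentioned above
    )
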
Tyler Goodlet fd20004757 _supervise: iter nice expanded multi-line `._children` tups with typing 2025-03-14 16:34:17 -04:00
Tyler Goodlet ddc2e5f0f8 WIP: solved the modden client hang.. 2025-03-14 16:34:10 -04:00
Tyler Goodlet 4b0aa5e379 Baboso! fix `chan.send(None)` indent.. 2025-03-14 15:49:37 -04:00
Tyler Goodlet 6a303358df Improved log msg formatting in core
As part of solving some final edge cases to do with inter-peer remote
cancellation (particularly a remote cancel from a separate actor
tree-client hanging on the request side in `modden`..) I needed less
dense, more line-delimited log msg formats when understanding IPC
channel and context cancels from console logging; this adds a ton of
that to:
- `._invoke()` which now does,
  - better formatting of `Context`-task info as multi-line
    `'<field>: <value>\n'` messages,
  - use of `trio.Task` (from `.lowlevel.current_task()`) for full
    rpc-func namespace-path info,
  - better "msg flow annotations" with `<=` for understanding
    `ContextCancelled` flow.
- `Actor._stream_handler()` wherein we break down IPC peer reporting
  into multi-line `|_<Channel>` log msgs instead of it all being jammed
  on one line..
- `._ipc.Channel.send()` now uses `pformat()` for the packet repr.

Also tweak some optional deps imports for debug mode:
- add `maybe_import_gb()` for attempting to import `greenback`.
- maybe enable `stackscope` tree pprinter on `SIGUSR1` if installed.

Add a further stale-debugger-lock guard before removal:
- read the `._debug.Lock.global_actor_in_debug: tuple` uid and possibly
  call `maybe_wait_for_debugger()` when the locking child is known to
  have a live process in our tree.
- only cancel `Lock._root_local_task_cs_in_debug: CancelScope` when
  the disconnected channel maps to the `Lock.global_actor_in_debug`,
  though not sure this is correct yet?

Started adding missing type annots in sections that were modified.
2025-03-14 15:49:36 -04:00
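
As a rough sketch of the `<=`/`=>` "msg flow annotation" style plus `pformat()`-ed payloads described in this commit; the function name, logger and message wording here are illustrative only:

import logging
from pprint import pformat

log = logging.getLogger('tractor-sketch')  # hypothetical logger

def log_remote_cancel(peer_uid, msg: dict) -> None:
    # '<=' marks an inbound msg from the remote peer,
    # '=>' the local task it gets delivered to
    log.info(
        'ContextCancelled rxed from peer\n'
        f'<= {peer_uid}\n'
        '=> local ctx task\n\n'
        f'{pformat(msg)}\n'
    )
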
Tyler Goodlet c85757aee1 Let `pack_error()` take a msg injected `cid: str|None` 2025-03-14 15:31:16 -04:00
9 changed files with 1225 additions and 313 deletions

View File

@ -43,12 +43,17 @@ import warnings
import trio import trio
# from .devx import (
# maybe_wait_for_debugger,
# pause,
# )
from ._exceptions import ( from ._exceptions import (
# _raise_from_no_key_in_msg, # _raise_from_no_key_in_msg,
unpack_error, unpack_error,
pack_error, pack_error,
ContextCancelled, ContextCancelled,
# MessagingError, # MessagingError,
RemoteActorError,
StreamOverrun, StreamOverrun,
) )
from .log import get_logger from .log import get_logger
@ -64,6 +69,164 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
async def _drain_to_final_msg(
ctx: Context,
) -> list[dict]:
# ) -> tuple[
# Any|Exception,
# list[dict],
# ]:
raise_overrun: bool = not ctx._allow_overruns
# wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit
# from the far end.
pre_result_drained: list[dict] = []
while not ctx._remote_error:
try:
# NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause()
# if re := ctx._remote_error:
# ctx._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# raise_overrun_from_self=raise_overrun,
# )
# TODO: bad idea?
# with trio.CancelScope() as res_cs:
# ctx._res_scope = res_cs
# msg: dict = await ctx._recv_chan.receive()
# if res_cs.cancelled_caught:
# from .devx._debug import pause
# await pause()
msg: dict = await ctx._recv_chan.receive()
ctx._result: Any = msg['return']
log.runtime(
'Context delivered final result msg:\n'
f'{pformat(msg)}'
)
pre_result_drained.append(msg)
# NOTE: we don't need to do this right?
# XXX: only close the rx mem chan AFTER
# a final result is retrieved.
# if ctx._recv_chan:
# await ctx._recv_chan.aclose()
break
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is
# the source cause of this local task's
# cancellation.
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
except KeyError:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding std "yield"\n{msg}')
pre_result_drained.append(msg)
continue
# TODO: work out edge cases here where
# a stream is open but the task also calls
# this?
# -[ ] should be a runtime error if a stream is open
# right?
elif 'stop' in msg:
log.cancel(
'Remote stream terminated due to "stop" msg:\n'
f'{msg}'
)
pre_result_drained.append(msg)
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?"
)
# XXX fallthrough to handle expected error XXX
re: Exception|None = ctx._remote_error
if re:
log.critical(
'Remote ctx terminated due to "error" msg:\n'
f'{re}'
)
assert msg is ctx._cancel_msg
# NOTE: this solved a super dupe edge case XD
# this was THE super duper edge case of:
# - local task opens a remote task,
# - requests remote cancellation of far end
# ctx/tasks,
# - needs to wait for the cancel ack msg
# (ctxc) or some result in the race case
# where the other side's task returns
# before the cancel request msg is ever
# rxed and processed,
# - here this surrounding drain loop (which
# iterates all ipc msgs until the ack or
# an early result arrives) was NOT exiting
# since we are the edge case: local task
# does not re-raise any ctxc it receives
# IFF **it** was the cancellation
# requester..
# will raise if necessary, ow break from
# loop presuming any error terminates the
# context!
ctx._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=raise_overrun,
)
break # OOOOOF, yeah obvi we need this..
# XXX we should never really get here
# right! since `._deliver_msg()` should
# always have detected an {'error': ..}
# msg and already called this right!?!
elif error := unpack_error(
msg=msg,
chan=ctx._portal.channel,
hide_tb=False,
):
log.critical('SHOULD NEVER GET HERE!?')
assert msg is ctx._cancel_msg
assert error.msgdata == ctx._remote_error.msgdata
from .devx._debug import pause
await pause()
ctx._maybe_cancel_and_set_remote_error(error)
ctx._maybe_raise_remote_err(error)
else:
# bubble the original src key error
raise
return pre_result_drained
# TODO: make this a msgspec.Struct! # TODO: make this a msgspec.Struct!
@dataclass @dataclass
class Context: class Context:
@ -118,6 +281,7 @@ class Context:
# which is exactly the primitive that allows for # which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC. # cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None _scope: trio.CancelScope | None = None
# _res_scope: trio.CancelScope|None = None
# on a clean exit there should be a final value # on a clean exit there should be a final value
# delivered from the far end "callee" task, so # delivered from the far end "callee" task, so
@ -205,6 +369,10 @@ class Context:
) )
) )
# @property
# def is_waiting_result(self) -> bool:
# return bool(self._res_scope)
@property @property
def side(self) -> str: def side(self) -> str:
''' '''
@ -247,7 +415,11 @@ class Context:
await self.chan.send({'yield': data, 'cid': self.cid}) await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None: async def send_stop(self) -> None:
await self.chan.send({'stop': True, 'cid': self.cid}) # await pause()
await self.chan.send({
'stop': True,
'cid': self.cid
})
def _maybe_cancel_and_set_remote_error( def _maybe_cancel_and_set_remote_error(
self, self,
@ -320,27 +492,37 @@ class Context:
# XXX: set the remote side's error so that after we cancel # XXX: set the remote side's error so that after we cancel
# whatever task is the opener of this context it can raise # whatever task is the opener of this context it can raise
# that error as the reason. # that error as the reason.
# if self._remote_error:
# return
# breakpoint()
log.cancel(
'Setting remote error for ctx \n'
f'<= remote ctx uid: {self.chan.uid}\n'
f'=>\n{error}'
)
self._remote_error: BaseException = error self._remote_error: BaseException = error
if ( if (
isinstance(error, ContextCancelled) isinstance(error, ContextCancelled)
): ):
# always record the cancelling actor's uid since its cancellation
# state is linked and we want to know which process was
# the cause / requester of the cancellation.
self._canceller = error.canceller
log.cancel( log.cancel(
'Remote task-context was cancelled for ' 'Remote task-context was cancelled for '
f'actor: {self.chan.uid}\n' f'actor: {self.chan.uid}\n'
f'task: {self.cid}\n' f'task: {self.cid}\n'
f'canceller: {error.canceller}\n' f'canceller: {error.canceller}\n'
) )
# always record the cancelling actor's uid since its cancellation
# state is linked and we want to know which process was
# the cause / requester of the cancellation.
# if error.canceller is None:
# import pdbp; pdbp.set_trace()
# breakpoint()
self._canceller = error.canceller
if self._cancel_called: if self._cancel_called:
# from ._debug import breakpoint
# await breakpoint()
# this is an expected cancel request response message # this is an expected cancel request response message
# and we **don't need to raise it** in local cancel # and we **don't need to raise it** in local cancel
# scope since it will potentially override a real error. # scope since it will potentially override a real error.
@ -348,10 +530,11 @@ class Context:
else: else:
log.error( log.error(
f'Remote context error,\n' f'Remote context error:\n'
f'remote actor: {self.chan.uid}\n' f'{error}\n'
f'task: {self.cid}\n' f'{pformat(self)}\n'
f'{error}' # f'remote actor: {self.chan.uid}\n'
# f'cid: {self.cid}\n'
) )
self._canceller = self.chan.uid self._canceller = self.chan.uid
@ -376,9 +559,11 @@ class Context:
self._scope.cancel() self._scope.cancel()
# NOTE: this REPL usage actually works here dawg! Bo # NOTE: this REPL usage actually works here dawg! Bo
# from .devx._debug import pause
# await pause() # await pause()
# TODO: maybe we have to use `._res_scope.cancel()` if it
# exists?
async def cancel( async def cancel(
self, self,
timeout: float = 0.616, timeout: float = 0.616,
@ -395,6 +580,8 @@ class Context:
log.cancel( log.cancel(
f'Cancelling {side} side of context to {self.chan.uid}' f'Cancelling {side} side of context to {self.chan.uid}'
) )
# await pause()
self._cancel_called: bool = True self._cancel_called: bool = True
# caller side who entered `Portal.open_context()` # caller side who entered `Portal.open_context()`
@ -484,13 +671,11 @@ class Context:
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
# here we create a mem chan that corresponds to the # If the surrounding context has been cancelled by some
# far end caller / callee. # task with a handle to THIS, we error here immediately
# since it likely means the surrounding lexical-scope has
# Likewise if the surrounding context has been cancelled we error here # errored, been `trio.Cancelled` or at the least
# since it likely means the surrounding block was exited or # `Context.cancel()` was called by some task.
# killed
if self._cancel_called: if self._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if # XXX NOTE: ALWAYS RAISE any remote error here even if
@ -503,6 +688,11 @@ class Context:
# actually try to stream - a cancel msg was already # actually try to stream - a cancel msg was already
# sent to the other side! # sent to the other side!
if self._remote_error: if self._remote_error:
# NOTE: this is diff then calling
# `._maybe_raise_from_remote_msg()` specifically
# because any task entering this `.open_stream()`
# AFTER cancellation has already been requested,
# we DO NOT want to absorb any ctxc ACK silently!
raise self._remote_error raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded # XXX NOTE: if no `ContextCancelled` has been responded
@ -529,7 +719,7 @@ class Context:
# to send a stop from the caller to the callee in the # to send a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error # single-direction-stream case you'll get a lookup error
# currently. # currently.
ctx = actor.get_context( ctx: Context = actor.get_context(
self.chan, self.chan,
self.cid, self.cid,
msg_buffer_size=msg_buffer_size, msg_buffer_size=msg_buffer_size,
@ -548,6 +738,19 @@ class Context:
'The underlying channel for this stream was already closed!?' 'The underlying channel for this stream was already closed!?'
) )
# NOTE: implicitly this will call `MsgStream.aclose()` on
# `.__aexit__()` due to stream's parent `Channel` type!
#
# XXX NOTE XXX: ensures the stream is "one-shot use",
# which specifically means that on exit,
# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end indicating that the caller exited
# the streaming context purposefully by letting
# the exit block exec.
# - this is diff from the cancel/error case where
# a cancel request from this side or an error
# should be sent to the far end indicating the
# stream WAS NOT just closed normally/gracefully.
async with MsgStream( async with MsgStream(
ctx=self, ctx=self,
rx_chan=ctx._recv_chan, rx_chan=ctx._recv_chan,
@ -567,11 +770,37 @@ class Context:
# await trio.lowlevel.checkpoint() # await trio.lowlevel.checkpoint()
yield stream yield stream
# NOTE: Make the stream "one-shot use". On exit,
# signal # XXX: (MEGA IMPORTANT) if this is a root opened process we
# ``trio.EndOfChannel``/``StopAsyncIteration`` to # wait for any immediate child in debug before popping the
# the far end. # context from the runtime msg loop otherwise inside
await stream.aclose() # ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
#
# await maybe_wait_for_debugger()
# XXX TODO: pretty sure this isn't needed (see
# note above this block) AND will result in
# a double `.send_stop()` call. The only reason to
# put it here would be to due with "order" in
# terms of raising any remote error (as per
# directly below) or bc the stream's
# `.__aexit__()` block might not get run
# (doubtful)? Either way if we did put this back
# in we also need a state var to avoid the double
# stop-msg send..
#
# await stream.aclose()
# if re := ctx._remote_error:
# ctx._maybe_raise_remote_err(
# re,
# raise_ctxc_from_self_call=True,
# )
# await trio.lowlevel.checkpoint()
finally: finally:
if self._portal: if self._portal:
@ -587,7 +816,10 @@ class Context:
def _maybe_raise_remote_err( def _maybe_raise_remote_err(
self, self,
err: Exception, err: Exception,
) -> None: raise_ctxc_from_self_call: bool = False,
raise_overrun_from_self: bool = True,
) -> ContextCancelled|None:
''' '''
Maybe raise a remote error depending on who (which task from Maybe raise a remote error depending on who (which task from
which actor) requested a cancellation (if any). which actor) requested a cancellation (if any).
@ -603,13 +835,21 @@ class Context:
# "error"-msg. # "error"-msg.
our_uid: tuple[str, str] = current_actor().uid our_uid: tuple[str, str] = current_actor().uid
if ( if (
isinstance(err, ContextCancelled) (not raise_ctxc_from_self_call
and isinstance(err, ContextCancelled)
and ( and (
self._cancel_called self._cancel_called
or self.chan._cancel_called or self.chan._cancel_called
or self.canceller == our_uid or self.canceller == our_uid
or tuple(err.canceller) == our_uid or tuple(err.canceller) == our_uid)
) )
or
(not raise_overrun_from_self
and isinstance(err, RemoteActorError)
and err.msgdata['type_str'] == 'StreamOverrun'
and tuple(err.msgdata['sender']) == our_uid
)
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing" # cancellation" error-response thus "absorbing"
@ -661,77 +901,196 @@ class Context:
assert self._portal, "Context.result() can not be called from callee!" assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan assert self._recv_chan
if re := self._remote_error: raise_overrun: bool = not self._allow_overruns
return self._maybe_raise_remote_err(re) # if re := self._remote_error:
# return self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# raise_overrun_from_self=raise_overrun,
# )
res_placeholder: int = id(self)
if ( if (
self._result == id(self) self._result == res_placeholder
and not self._remote_error and not self._remote_error
and not self._recv_chan._closed # type: ignore and not self._recv_chan._closed # type: ignore
): ):
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
try:
msg = await self._recv_chan.receive()
self._result: Any = msg['return']
# NOTE: we don't need to do this right? # wait for a final context result by collecting (but
# XXX: only close the rx mem chan AFTER # basically ignoring) any bi-dir-stream msgs still in transit
# a final result is retreived. # from the far end.
# if self._recv_chan: drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self)
# await self._recv_chan.aclose() log.runtime(
'Ctx drained pre-result msgs:\n'
break f'{drained_msgs}'
# NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases:
# 1. we requested the cancellation and thus
# SHOULD NOT raise that far end error,
# 2. WE DID NOT REQUEST that cancel and thus
# SHOULD RAISE HERE!
except trio.Cancelled:
# CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is the
# (likely) source cause of this local runtime
# task's cancellation.
if re := self._remote_error:
self._maybe_raise_remote_err(re)
# CASE 1: we DID request the cancel we simply
# continue to bubble up as normal.
raise
except KeyError: # as msgerr:
if 'yield' in msg:
# far end task is still streaming to us so discard
log.warning(f'Discarding stream delivered {msg}')
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?"
) )
if err:= unpack_error( # TODO: implement via helper func ^^^^
msg, # pre_result_drained: list[dict] = []
self._portal.channel # while not self._remote_error:
): # from msgerr # try:
self._maybe_cancel_and_set_remote_error(err) # # NOTE: this REPL usage actually works here dawg! Bo
self._maybe_raise_remote_err(err) # # from .devx._debug import pause
# # await pause()
# # if re := self._remote_error:
# # self._maybe_raise_remote_err(
# # re,
# # # NOTE: obvi we don't care if we
# # # overran the far end if we're already
# # # waiting on a final result (msg).
# # raise_overrun_from_self=raise_overrun,
# # )
else: # # TODO: bad idea?
raise # # with trio.CancelScope() as res_cs:
# # self._res_scope = res_cs
# # msg: dict = await self._recv_chan.receive()
# # if res_cs.cancelled_caught:
if re := self._remote_error: # # from .devx._debug import pause
return self._maybe_raise_remote_err(re) # # await pause()
# msg: dict = await self._recv_chan.receive()
# self._result: Any = msg['return']
# log.runtime(
# 'Context delivered final result msg:\n'
# f'{pformat(msg)}'
# )
# # NOTE: we don't need to do this right?
# # XXX: only close the rx mem chan AFTER
# # a final result is retreived.
# # if self._recv_chan:
# # await self._recv_chan.aclose()
# break
# # NOTE: we get here if the far end was
# # `ContextCancelled` in 2 cases:
# # 1. we requested the cancellation and thus
# # SHOULD NOT raise that far end error,
# # 2. WE DID NOT REQUEST that cancel and thus
# # SHOULD RAISE HERE!
# except trio.Cancelled:
# # CASE 2: mask the local cancelled-error(s)
# # only when we are sure the remote error is
# # the source cause of this local task's
# # cancellation.
# if re := self._remote_error:
# self._maybe_raise_remote_err(re)
# # CASE 1: we DID request the cancel we simply
# # continue to bubble up as normal.
# raise
# except KeyError:
# if 'yield' in msg:
# # far end task is still streaming to us so discard
# log.warning(f'Discarding std "yield"\n{msg}')
# pre_result_drained.append(msg)
# continue
# # TODO: work out edge cases here where
# # a stream is open but the task also calls
# # this?
# # -[ ] should be a runtime error if a stream is open
# # right?
# elif 'stop' in msg:
# log.cancel(
# 'Remote stream terminated due to "stop" msg:\n'
# f'{msg}'
# )
# pre_result_drained.append(msg)
# continue
# # internal error should never get here
# assert msg.get('cid'), (
# "Received internal error at portal?"
# )
# # XXX fallthrough to handle expected error XXX
# re: Exception|None = self._remote_error
# if re:
# log.critical(
# 'Remote ctx terminated due to "error" msg:\n'
# f'{re}'
# )
# assert msg is self._cancel_msg
# # NOTE: this solved a super dupe edge case XD
# # this was THE super duper edge case of:
# # - local task opens a remote task,
# # - requests remote cancellation of far end
# # ctx/tasks,
# # - needs to wait for the cancel ack msg
# # (ctxc) or some result in the race case
# # where the other side's task returns
# # before the cancel request msg is ever
# # rxed and processed,
# # - here this surrounding drain loop (which
# # iterates all ipc msgs until the ack or
# # an early result arrives) was NOT exiting
# # since we are the edge case: local task
# # does not re-raise any ctxc it receives
# # IFF **it** was the cancellation
# # requester..
# # will raise if necessary, ow break from
# # loop presuming any error terminates the
# # context!
# self._maybe_raise_remote_err(
# re,
# # NOTE: obvi we don't care if we
# # overran the far end if we're already
# # waiting on a final result (msg).
# # raise_overrun_from_self=False,
# raise_overrun_from_self=raise_overrun,
# )
# break # OOOOOF, yeah obvi we need this..
# # XXX we should never really get here
# # right! since `._deliver_msg()` should
# # always have detected an {'error': ..}
# # msg and already called this right!?!
# elif error := unpack_error(
# msg=msg,
# chan=self._portal.channel,
# hide_tb=False,
# ):
# log.critical('SHOULD NEVER GET HERE!?')
# assert msg is self._cancel_msg
# assert error.msgdata == self._remote_error.msgdata
# from .devx._debug import pause
# await pause()
# self._maybe_cancel_and_set_remote_error(error)
# self._maybe_raise_remote_err(error)
# else:
# # bubble the original src key error
# raise
if (
(re := self._remote_error)
and self._result == res_placeholder
):
maybe_err: Exception|None = self._maybe_raise_remote_err(
re,
# NOTE: obvi we don't care if we
# overran the far end if we're already
# waiting on a final result (msg).
# raise_overrun_from_self=False,
raise_overrun_from_self=(
raise_overrun
and
# only when we ARE NOT the canceller
# should we raise overruns, bc ow we're
# raising something we know might happen
# during cancellation ;)
(not self._cancel_called)
),
)
if maybe_err:
self._result = maybe_err
return self._result return self._result
@ -779,7 +1138,7 @@ class Context:
while self._overflow_q: while self._overflow_q:
# NOTE: these msgs should never be errors since we always do # NOTE: these msgs should never be errors since we always do
# the check prior to checking if we're in an overrun state # the check prior to checking if we're in an overrun state
# inside ``.deliver_msg()``. # inside ``._deliver_msg()``.
msg = self._overflow_q.popleft() msg = self._overflow_q.popleft()
try: try:
await self._send_chan.send(msg) await self._send_chan.send(msg)
@ -830,34 +1189,50 @@ class Context:
messages are eventually sent if possible. messages are eventually sent if possible.
''' '''
cid = self.cid cid: str = self.cid
chan = self.chan chan: Channel = self.chan
uid = chan.uid from_uid: tuple[str, str] = chan.uid
send_chan: trio.MemorySendChannel = self._send_chan send_chan: trio.MemorySendChannel = self._send_chan
log.runtime( if re := unpack_error(
f"Delivering {msg} from {uid} to caller {cid}"
)
if (
msg.get('error') # check for field
and (
error := unpack_error(
msg, msg,
self.chan, self.chan,
)
)
): ):
log.error(
f'Delivering error-msg from {from_uid} to caller {cid}'
f'{re}'
)
self._cancel_msg = msg self._cancel_msg = msg
self._maybe_cancel_and_set_remote_error(error) self._maybe_cancel_and_set_remote_error(re)
if ( # XXX NEVER do this XXX..!!
self._in_overrun # bc if the error is a ctxc and there is a task
): # waiting on `.result()` we need the msg to be sent
# over the `send_chan`/`._recv_chan` so that the error
# is relayed to that waiter task..
# return True
#
# XXX ALSO NO!! XXX
# if self._remote_error:
# self._maybe_raise_remote_err(error)
if self._in_overrun:
log.warning(
f'Capturing overrun-msg from {from_uid} to caller {cid}'
f'{msg}'
)
self._overflow_q.append(msg) self._overflow_q.append(msg)
return False return False
try: try:
log.runtime(
f'Delivering IPC `Context` msg:\n'
f'<= {from_uid}\n'
f'=> caller: {cid}\n'
f'{msg}'
)
# from .devx._debug import pause
# await pause()
send_chan.send_nowait(msg) send_chan.send_nowait(msg)
return True return True
# if an error is deteced we should always # if an error is deteced we should always
@ -890,7 +1265,8 @@ class Context:
lines = [ lines = [
f'OVERRUN on actor-task context {cid}@{local_uid}!\n' f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
# TODO: put remote task name here if possible? # TODO: put remote task name here if possible?
f'remote sender actor: {uid}', f'sender: {from_uid}',
f'msg: {msg}',
# TODO: put task func name here and maybe an arrow # TODO: put task func name here and maybe an arrow
# from sender to overrunner? # from sender to overrunner?
# f'local task {self.func_name}' # f'local task {self.func_name}'
@ -926,11 +1302,19 @@ class Context:
# anything different. # anything different.
return False return False
else: else:
# raise local overrun and immediately pack as IPC
# msg for far end.
try: try:
raise StreamOverrun(text) raise StreamOverrun(
text,
sender=from_uid,
)
except StreamOverrun as err: except StreamOverrun as err:
err_msg = pack_error(err) err_msg: dict[str, dict] = pack_error(
err_msg['cid'] = cid err,
cid=cid,
)
# err_msg['cid']: str = cid
try: try:
await chan.send(err_msg) await chan.send(err_msg)
except trio.BrokenResourceError: except trio.BrokenResourceError:
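
To condense the new absorption logic added to `_maybe_raise_remote_err()` above, a standalone sketch of the predicate over a boxed error's `msgdata` dict; the helper name and the plain-dict input are assumptions, the real code operates on `RemoteActorError`/`StreamOverrun` instances:

def should_absorb_remote_err(
    msgdata: dict,                 # boxed error metadata as shipped over IPC
    our_uid: tuple,                # (name, uuid) of the local actor
    self_cancel_called: bool,      # did *we* call `Context.cancel()`?
    raise_ctxc_from_self_call: bool = False,
    raise_overrun_from_self: bool = True,
) -> bool:
    # case 1: a ctxc ACK for a cancellation *we* requested is absorbed,
    # not re-raised locally
    if (
        not raise_ctxc_from_self_call
        and msgdata.get('type_str') == 'ContextCancelled'
        and (
            self_cancel_called
            or tuple(msgdata.get('canceller') or ()) == our_uid
        )
    ):
        return True
    # case 2: an overrun *we* caused is absorbed unless the caller
    # explicitly wants it raised
    if (
        not raise_overrun_from_self
        and msgdata.get('type_str') == 'StreamOverrun'
        and tuple(msgdata.get('sender') or ()) == our_uid
    ):
        return True
    return False
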

View File

@ -163,13 +163,15 @@ class MessagingError(Exception):
def pack_error( def pack_error(
exc: BaseException, exc: BaseException,
tb: str | None = None, tb: str|None = None,
cid: str|None = None,
) -> dict[str, dict]: ) -> dict[str, dict]:
''' '''
Create an "error message" encoded for wire transport via an IPC Create an "error message" which boxes a locally caught
`Channel`; expected to be unpacked on the receiver side using exception's meta-data and encodes it for wire transport via an
`unpack_error()` below. IPC `Channel`; expected to be unpacked (and thus unboxed) on
the receiver side using `unpack_error()` below.
''' '''
if tb: if tb:
@ -197,7 +199,12 @@ def pack_error(
): ):
error_msg.update(exc.msgdata) error_msg.update(exc.msgdata)
return {'error': error_msg}
pkt: dict = {'error': error_msg}
if cid:
pkt['cid'] = cid
return pkt
def unpack_error( def unpack_error(
@ -207,7 +214,7 @@ def unpack_error(
err_type=RemoteActorError, err_type=RemoteActorError,
hide_tb: bool = True, hide_tb: bool = True,
) -> None | Exception: ) -> None|Exception:
''' '''
Unpack an 'error' message from the wire Unpack an 'error' message from the wire
into a local `RemoteActorError` (subtype). into a local `RemoteActorError` (subtype).
@ -358,8 +365,7 @@ def _raise_from_no_key_in_msg(
# is activated above. # is activated above.
_type: str = 'Stream' if stream else 'Context' _type: str = 'Stream' if stream else 'Context'
raise MessagingError( raise MessagingError(
f'{_type} was expecting a `{expect_key}` message' f"{_type} was expecting a '{expect_key}' message"
' BUT received a non-`error` msg:\n' " BUT received a non-error msg:\n"
f'cid: {cid}\n' f'{pformat(msg)}'
'{pformat(msg)}'
) from src_err ) from src_err
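
A quick usage sketch of the updated `pack_error()` signature shown above; the import path and the 'deadbeef' cid are assumptions for the example, and on the receiving side `unpack_error()` rebuilds a `RemoteActorError` from the same packet:

import traceback
from tractor._exceptions import pack_error  # assumed module path

try:
    raise RuntimeError('boom')
except RuntimeError as exc:
    # box the error for wire transport, tagging it with the caller's
    # context id so the receiver can route it to the right `Context`
    pkt = pack_error(
        exc,
        tb=traceback.format_exc(),
        cid='deadbeef',  # made-up cid, purely illustrative
    )

assert 'error' in pkt
assert pkt['cid'] == 'deadbeef'
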

View File

@ -19,13 +19,14 @@ Inter-process comms abstractions
""" """
from __future__ import annotations from __future__ import annotations
import platform
import struct import struct
import typing import platform
from pprint import pformat
from collections.abc import ( from collections.abc import (
AsyncGenerator, AsyncGenerator,
AsyncIterator, AsyncIterator,
) )
import typing
from typing import ( from typing import (
Any, Any,
runtime_checkable, runtime_checkable,
@ -370,7 +371,10 @@ class Channel:
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
log.transport(f"send `{item}`") # type: ignore log.transport(
'=> send IPC msg:\n\n'
f'{pformat(item)}\n'
) # type: ignore
assert self.msgstream assert self.msgstream
await self.msgstream.send(item) await self.msgstream.send(item)
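
Since `pformat()`-ing every outbound packet as above has a cost, a small sketch of gating the pretty-print on the effective log level (stdlib `logging` shown; `tractor`'s custom transport level would work the same way):

import logging
from pprint import pformat

log = logging.getLogger('tractor-sketch')  # hypothetical logger

def log_send(item) -> None:
    # only pay the pformat() cost when the record would actually be emitted
    if log.isEnabledFor(logging.DEBUG):
        log.debug(
            '=> send IPC msg:\n\n'
            f'{pformat(item)}\n'
        )
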

View File

@ -39,7 +39,15 @@ import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from .trionics import maybe_open_nursery from .trionics import maybe_open_nursery
from ._state import current_actor from .devx import (
# acquire_debug_lock,
# pause,
maybe_wait_for_debugger,
)
from ._state import (
current_actor,
debug_mode,
)
from ._ipc import Channel from ._ipc import Channel
from .log import get_logger from .log import get_logger
from .msg import NamespacePath from .msg import NamespacePath
@ -48,6 +56,7 @@ from ._exceptions import (
unpack_error, unpack_error,
NoResult, NoResult,
ContextCancelled, ContextCancelled,
RemoteActorError,
) )
from ._context import ( from ._context import (
Context, Context,
@ -468,7 +477,6 @@ class Portal:
ctx._started_called: bool = True ctx._started_called: bool = True
except KeyError as src_error: except KeyError as src_error:
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=ctx, ctx=ctx,
msg=msg, msg=msg,
@ -493,6 +501,33 @@ class Portal:
# in enter tuple. # in enter tuple.
yield ctx, first yield ctx, first
# between the caller exiting and arriving here the
# far end may have sent a ctxc-msg or other error,
# so check for it here immediately and maybe raise
# so as to engage the ctxc handling block below!
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# assert maybe_ctxc
# when in allow_overruns mode there may be # when in allow_overruns mode there may be
# lingering overflow sender tasks remaining? # lingering overflow sender tasks remaining?
if nurse.child_tasks: if nurse.child_tasks:
@ -538,7 +573,7 @@ class Portal:
# `.canceller: tuple[str, str]` to be same value as # `.canceller: tuple[str, str]` to be same value as
# caught here in a `ContextCancelled.canceller`. # caught here in a `ContextCancelled.canceller`.
# #
# Again, there are 2 cases: # AGAIN to restate the above, there are 2 cases:
# #
# 1-some other context opened in this `.open_context()` # 1-some other context opened in this `.open_context()`
# block cancelled due to a self or peer cancellation # block cancelled due to a self or peer cancellation
@ -554,6 +589,16 @@ class Portal:
except ContextCancelled as ctxc: except ContextCancelled as ctxc:
scope_err = ctxc scope_err = ctxc
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
# using this code and then resuming the REPL will
# cause a SIGINT-ignoring HANG!
# -> prolly due to a stale debug lock entry..
# -[ ] USE `.stackscope` to demonstrate that (possibly
# documenting it as a definitive example of
# debugging the tractor-runtime itself using it's
# own `.devx.` tooling!
# await pause()
# CASE 2: context was cancelled by local task calling # CASE 2: context was cancelled by local task calling
# `.cancel()`, we don't raise and the exit block should # `.cancel()`, we don't raise and the exit block should
# exit silently. # exit silently.
@ -561,18 +606,23 @@ class Portal:
ctx._cancel_called ctx._cancel_called
and ( and (
ctxc is ctx._remote_error ctxc is ctx._remote_error
or # ctxc.msgdata == ctx._remote_error.msgdata
ctxc.canceller is self.canceller
# TODO: uhh `Portal.canceller` ain't a thangg
# dawg? (was `self.canceller` before?!?)
and
ctxc.canceller == self.actor.uid
) )
): ):
log.debug( log.cancel(
f'Context {ctx} cancelled gracefully with:\n' f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n'
f'{ctxc}' f'{ctxc}'
) )
# CASE 1: this context was never cancelled via a local # CASE 1: this context was never cancelled via a local
# task (tree) having called `Context.cancel()`, raise # task (tree) having called `Context.cancel()`, raise
# the error since it was caused by someone else! # the error since it was caused by someone else!
else: else:
# await pause()
raise raise
# the above `._scope` can be cancelled due to: # the above `._scope` can be cancelled due to:
@ -601,8 +651,8 @@ class Portal:
trio.Cancelled, # NOTE: NOT from inside the ctx._scope trio.Cancelled, # NOTE: NOT from inside the ctx._scope
KeyboardInterrupt, KeyboardInterrupt,
) as err: ) as caller_err:
scope_err = err scope_err = caller_err
# XXX: ALWAYS request the context to CANCEL ON any ERROR. # XXX: ALWAYS request the context to CANCEL ON any ERROR.
# NOTE: `Context.cancel()` is conversely NEVER CALLED in # NOTE: `Context.cancel()` is conversely NEVER CALLED in
@ -610,11 +660,26 @@ class Portal:
# handled in the block above! # handled in the block above!
log.cancel( log.cancel(
'Context cancelled for task due to\n' 'Context cancelled for task due to\n'
f'{err}\n' f'{caller_err}\n'
'Sending cancel request..\n' 'Sending cancel request..\n'
f'task:{cid}\n' f'task:{cid}\n'
f'actor:{uid}' f'actor:{uid}'
) )
if debug_mode():
log.pdb(
'Delaying `ctx.cancel()` until debug lock '
'acquired..'
)
# async with acquire_debug_lock(self.actor.uid):
# pass
# TODO: factor ^ into below for non-root cases?
await maybe_wait_for_debugger()
log.pdb(
'Acquired debug lock! '
'Calling `ctx.cancel()`!'
)
try: try:
await ctx.cancel() await ctx.cancel()
except trio.BrokenResourceError: except trio.BrokenResourceError:
@ -628,6 +693,33 @@ class Portal:
# no local scope error, the "clean exit with a result" case. # no local scope error, the "clean exit with a result" case.
else: else:
# between the caller exiting and arriving here the
# far end may have sent a ctxc-msg or other error,
# so check for it here immediately and maybe raise
# so as to engage the ctxc handling block below!
# if re := ctx._remote_error:
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
# re,
# # TODO: do we want this to always raise?
# # - means that on self-ctxc, if/when the
# # block is exited before the msg arrives
# # but then the msg during __exit__
# # calling we may not activate the
# # ctxc-handler block below? should we
# # be?
# # - if there's a remote error that arrives
# # after the child has exited, we won't
# # handle until the `finally:` block
# # where `.result()` is always called,
# # again in which case we handle it
# # differently then in the handler block
# # that would normally engage from THIS
# # block?
# raise_ctxc_from_self_call=True,
# )
# assert maybe_ctxc
if ctx.chan.connected(): if ctx.chan.connected():
log.info( log.info(
'Waiting on final context-task result for\n' 'Waiting on final context-task result for\n'
@ -644,13 +736,8 @@ class Portal:
# As per `Context._deliver_msg()`, that error IS # As per `Context._deliver_msg()`, that error IS
# ALWAYS SET any time "callee" side fails and causes "caller # ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here. # side" cancellation via a `ContextCancelled` here.
# result = await ctx.result()
try: try:
result = await ctx.result() result_or_err: Exception|Any = await ctx.result()
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result}`'
)
except BaseException as berr: except BaseException as berr:
# on normal teardown, if we get some error # on normal teardown, if we get some error
# raised in `Context.result()` we still want to # raised in `Context.result()` we still want to
@ -662,7 +749,48 @@ class Portal:
scope_err = berr scope_err = berr
raise raise
# an exception type boxed in a `RemoteActorError`
# is returned (meaning it was obvi not raised).
msgdata: str|None = getattr(
result_or_err,
'msgdata',
None
)
# yes! this worx Bp
# from .devx import _debug
# await _debug.pause()
match (msgdata, result_or_err):
case (
{'tb_str': tbstr},
ContextCancelled(),
):
log.cancel(tbstr)
case (
{'tb_str': tbstr},
RemoteActorError(),
):
log.exception(
f'Context `{fn_name}` remotely errored:\n'
f'`{tbstr}`'
)
case (None, _):
log.runtime(
f'Context {fn_name} returned value from callee:\n'
f'`{result_or_err}`'
)
finally: finally:
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
await maybe_wait_for_debugger()
# though it should be impossible for any tasks # though it should be impossible for any tasks
# operating *in* this scope to have survived # operating *in* this scope to have survived
# we tear down the runtime feeder chan last # we tear down the runtime feeder chan last
@ -707,6 +835,10 @@ class Portal:
# out any exception group or legit (remote) ctx # out any exception group or legit (remote) ctx
# error that sourced from the remote task or its # error that sourced from the remote task or its
# runtime. # runtime.
#
# NOTE: further, this should be the only place the
# underlying feeder channel is
# once-and-only-CLOSED!
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await ctx._recv_chan.aclose() await ctx._recv_chan.aclose()
@ -747,6 +879,9 @@ class Portal:
# FINALLY, remove the context from runtime tracking and # FINALLY, remove the context from runtime tracking and
# exit! # exit!
log.runtime(
f'Exiting context opened with {ctx.chan.uid}'
)
self.actor._contexts.pop( self.actor._contexts.pop(
(self.channel.uid, ctx.cid), (self.channel.uid, ctx.cid),
None, None,
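
The `match (msgdata, result_or_err)` block above classifies whatever `Context.result()` hands back; a self-contained sketch of the same pattern using stand-in exception types (the real code matches `tractor`'s `ContextCancelled`/`RemoteActorError` and logs instead of returning strings):

class RemoteActorError(Exception):
    def __init__(self, message: str, **msgdata):
        super().__init__(message)
        self.msgdata = msgdata

class ContextCancelled(RemoteActorError):
    pass

def classify(result_or_err) -> str:
    # a boxed remote error carries `.msgdata`; a plain return value doesn't
    msgdata = getattr(result_or_err, 'msgdata', None)
    match (msgdata, result_or_err):
        case ({'tb_str': tbstr}, ContextCancelled()):
            return f'ctx cancelled:\n{tbstr}'
        case ({'tb_str': tbstr}, RemoteActorError()):
            return f'ctx remotely errored:\n{tbstr}'
        case (None, _):
            return f'ctx returned: {result_or_err!r}'
        case _:
            return f'unexpected: {result_or_err!r}'

print(classify(ContextCancelled('ctxc', tb_str='<remote tb>')))
print(classify(42))
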

View File

@ -28,6 +28,7 @@ from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import inspect import inspect
from pprint import pformat
import signal import signal
import sys import sys
from typing import ( from typing import (
@ -48,6 +49,10 @@ import trio
from trio import ( from trio import (
CancelScope, CancelScope,
) )
from trio.lowlevel import (
current_task,
Task,
)
from trio_typing import ( from trio_typing import (
Nursery, Nursery,
TaskStatus, TaskStatus,
@ -80,6 +85,26 @@ if TYPE_CHECKING:
log = get_logger('tractor') log = get_logger('tractor')
_gb_mod: ModuleType|None|False = None
async def maybe_import_gb():
global _gb_mod
if _gb_mod is False:
return
try:
import greenback
_gb_mod = greenback
await greenback.ensure_portal()
except ModuleNotFoundError:
log.warning(
'`greenback` is not installed.\n'
'No sync debug support!'
)
_gb_mod = False
async def _invoke( async def _invoke(
@ -227,15 +252,27 @@ async def _invoke(
# wrapper that calls `Context.started()` and then does # wrapper that calls `Context.started()` and then does
# the `await coro()`? # the `await coro()`?
elif context: elif context:
# context func with support for bi-dir streaming
await chan.send({'functype': 'context', 'cid': cid}) # a "context" endpoint type is the most general and
# "least sugary" type of RPC ep with support for
# bi-dir streaming B)
await chan.send({
'functype': 'context',
'cid': cid
})
try: try:
async with trio.open_nursery() as nurse: async with trio.open_nursery() as nurse:
ctx._scope_nursery = nurse ctx._scope_nursery = nurse
ctx._scope = nurse.cancel_scope ctx._scope = nurse.cancel_scope
task_status.started(ctx) task_status.started(ctx)
# TODO: should would be nice to have our
# `TaskMngr` nursery here!
# res: Any = await coro
res = await coro res = await coro
# deliver final result to caller side.
await chan.send({ await chan.send({
'return': res, 'return': res,
'cid': cid 'cid': cid
@ -271,9 +308,10 @@ async def _invoke(
# associated child isn't in debug any more # associated child isn't in debug any more
await _debug.maybe_wait_for_debugger() await _debug.maybe_wait_for_debugger()
ctx: Context = actor._contexts.pop((chan.uid, cid)) ctx: Context = actor._contexts.pop((chan.uid, cid))
log.runtime( log.cancel(
f'Context entrypoint {func} was terminated:\n' f'Context task was terminated:\n'
f'{ctx}' f'func: {func}\n'
f'ctx: {pformat(ctx)}'
) )
if ctx.cancelled_caught: if ctx.cancelled_caught:
@ -285,13 +323,14 @@ async def _invoke(
if re := ctx._remote_error: if re := ctx._remote_error:
ctx._maybe_raise_remote_err(re) ctx._maybe_raise_remote_err(re)
fname: str = func.__name__ # fname: str = func.__name__
task: Task = current_task()
cs: CancelScope = ctx._scope cs: CancelScope = ctx._scope
if cs.cancel_called: if cs.cancel_called:
our_uid: tuple = actor.uid our_uid: tuple = actor.uid
canceller: tuple = ctx.canceller canceller: tuple = ctx.canceller
msg: str = ( msg: str = (
f'`{fname}()`@{our_uid} cancelled by ' 'actor was cancelled by '
) )
# NOTE / TODO: if we end up having # NOTE / TODO: if we end up having
@ -310,16 +349,37 @@ async def _invoke(
# some actor who calls `Portal.cancel_actor()` # some actor who calls `Portal.cancel_actor()`
# and by side-effect cancels this ctx. # and by side-effect cancels this ctx.
elif canceller == ctx.chan.uid: elif canceller == ctx.chan.uid:
msg += f'its caller {canceller} ' msg += 'its caller'
else: else:
msg += f'remote actor {canceller}' msg += 'a remote peer'
div_chars: str = '------ - ------'
div_offset: int = (
round(len(msg)/2)+1
+
round(len(div_chars)/2)+1
)
div_str: str = (
'\n'
+
' '*div_offset
+
f'{div_chars}\n'
)
msg += (
div_str +
f'<= canceller: {canceller}\n'
f'=> uid: {our_uid}\n'
f' |_ task: `{task.name}()`'
)
# TODO: does this ever get set any more or can # TODO: does this ever get set any more or can
# we remove it? # we remove it?
if ctx._cancel_msg: if ctx._cancel_msg:
msg += ( msg += (
' with msg:\n' '------ - ------\n'
'IPC msg:\n'
f'{ctx._cancel_msg}' f'{ctx._cancel_msg}'
) )
@ -439,9 +499,9 @@ async def _invoke(
# always ship errors back to caller # always ship errors back to caller
err_msg: dict[str, dict] = pack_error( err_msg: dict[str, dict] = pack_error(
err, err,
tb=tb, # tb=tb, # TODO: special tb fmting?
cid=cid,
) )
err_msg['cid'] = cid
if is_rpc: if is_rpc:
try: try:
@ -508,19 +568,28 @@ async def try_ship_error_to_parent(
err: Union[Exception, BaseExceptionGroup], err: Union[Exception, BaseExceptionGroup],
) -> None: ) -> None:
'''
Box, pack and encode a local runtime(-internal) exception for
an IPC channel `.send()` with transport/network failures and
local cancellation ignored but logged as critical(ly bad).
'''
with CancelScope(shield=True): with CancelScope(shield=True):
try: try:
# internal error so ship to parent without cid await channel.send(
await channel.send(pack_error(err)) # NOTE: normally only used for internal runtime errors
# so ship to peer actor without a cid.
pack_error(err)
)
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
# in SC terms this is one of the worst things that can # in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma. # happen and provides for a 2-general's dilemma..
log.critical( log.critical(
f"Failed to ship error to parent " f'Failed to ship error to parent '
f"{channel.uid}, channel was closed" f'{channel.uid}, IPC transport failure!'
) )
@ -573,6 +642,11 @@ class Actor:
# if started on ``asycio`` running ``trio`` in guest mode # if started on ``asycio`` running ``trio`` in guest mode
_infected_aio: bool = False _infected_aio: bool = False
# _ans: dict[
# tuple[str, str],
# list[ActorNursery],
# ] = {}
# Process-global stack closed at end on actor runtime teardown. # Process-global stack closed at end on actor runtime teardown.
# NOTE: this is currently an undocumented public api. # NOTE: this is currently an undocumented public api.
lifetime_stack: ExitStack = ExitStack() lifetime_stack: ExitStack = ExitStack()
@ -593,7 +667,10 @@ class Actor:
''' '''
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid4())) self.uid = (
name,
uid or str(uuid.uuid4())
)
self._cancel_complete = trio.Event() self._cancel_complete = trio.Event()
self._cancel_called_by_remote: tuple[str, tuple] | None = None self._cancel_called_by_remote: tuple[str, tuple] | None = None
@ -762,7 +839,10 @@ class Actor:
return return
# channel tracking # channel tracking
event = self._peer_connected.pop(uid, None) event: trio.Event|None = self._peer_connected.pop(
uid,
None,
)
if event: if event:
# Instructing connection: this is likely a new channel to # Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via # a recently spawned actor which we'd like to control via
@ -771,46 +851,43 @@ class Actor:
# Alert any task waiting on this connection to come up # Alert any task waiting on this connection to come up
event.set() event.set()
chans = self._peers[uid] chans: list[Channel] = self._peers[uid]
# TODO: re-use channels for new connections instead
# of always new ones; will require changing all the
# discovery funcs
if chans: if chans:
# TODO: re-use channels for new connections instead
# of always new ones?
# => will require changing all the discovery funcs..
log.runtime( log.runtime(
f"already have channel(s) for {uid}:{chans}?" f"already have channel(s) for {uid}:{chans}?"
) )
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# append new channel # append new channel
log.runtime(f"Registered {chan} for {uid}") # type: ignore
# TODO: can we just use list-ref directly?
# chans.append(chan)
self._peers[uid].append(chan) self._peers[uid].append(chan)
local_nursery: ActorNursery | None = None # noqa
disconnected: bool = False
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
disconnected: bool = False
try: try:
disconnected = await process_messages(self, chan) disconnected: bool = await process_messages(self, chan)
except trio.Cancelled:
except ( log.cancel(f'Msg loop was cancelled for {chan}')
trio.Cancelled,
):
log.cancel(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery) local_nursery: (
ActorNursery|None
) = self._actoruid2nursery.get(uid)
# This is set in ``Portal.cancel_actor()``. So if # This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them # the peer was cancelled we try to wait for them
# to tear down their side of the connection before # to tear down their side of the connection before
# moving on with closing our own side. # moving on with closing our own side.
if ( if local_nursery:
local_nursery
):
if chan._cancel_called: if chan._cancel_called:
log.cancel(f"Waiting on cancel request to peer {chan.uid}") log.cancel(f'Waiting on cancel request to peer {chan.uid}')
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the # underlying transport protocol) to close from the
# remote peer side since we presume that any channel # remote peer side since we presume that any channel
@ -853,26 +930,48 @@ class Actor:
# the cause of other downstream errors. # the cause of other downstream errors.
entry = local_nursery._children.get(uid) entry = local_nursery._children.get(uid)
if entry: if entry:
proc: trio.Process
_, proc, _ = entry _, proc, _ = entry
poll = getattr(proc, 'poll', None) poll = getattr(proc, 'poll', None)
if poll and poll() is None: if poll and poll() is None:
log.cancel( log.cancel(
f'Actor {uid} IPC broke but proc is alive?' f'Peer actor IPC broke but proc is alive?\n'
f'uid: {uid}\n'
f'|_{proc}\n'
) )
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
log.runtime(f"Releasing channel {chan} from {chan.uid}") log.runtime(
f'Disconnected IPC channel:\n'
f'uid: {chan.uid}\n'
f'|_{pformat(chan)}\n'
)
chans = self._peers.get(chan.uid) chans = self._peers.get(chan.uid)
chans.remove(chan) chans.remove(chan)
if not chans: if not chans:
log.runtime(f"No more channels for {chan.uid}") log.runtime(
f'No more channels with {chan.uid}'
)
self._peers.pop(uid, None) self._peers.pop(uid, None)
log.runtime(f"Peers is {self._peers}") peers_str: str = ''
for uid, chans in self._peers.items():
peers_str += (
f'- uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
log.runtime(
f'Remaining IPC {len(self._peers)} peers:\n'
+ peers_str
)
# No more channels to other actors (at all) registered # No more channels to other actors (at all) registered
# as connected. # as connected.
@ -888,15 +987,58 @@ class Actor:
if _state.is_root_process(): if _state.is_root_process():
pdb_lock = _debug.Lock pdb_lock = _debug.Lock
pdb_lock._blocked.add(uid) pdb_lock._blocked.add(uid)
log.runtime(f"{uid} blocked from pdb locking")
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
if (
pdb_user_uid
and local_nursery
):
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT '
'a DISCONNECTED child still has the debug '
'lock!\n'
f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
await _debug.maybe_wait_for_debugger(
child_in_debug=True
)
# TODO: just bc a child's transport dropped
# doesn't mean it's not still using the pdb
# REPL! so,
# -[ ] ideally we can check out child proc
# tree to ensure that its alive (and
# actually using the REPL) before we cancel
# it's lock acquire by doing the below!
# -[ ] create a way to read the tree of each actor's
# grandchildren such that when an
# intermediary parent is cancelled but their
# child has locked the tty, the grandparent
# will not allow the parent to cancel or
# zombie reap the child! see open issue:
# - https://github.com/goodboy/tractor/issues/320
# ------ - ------
# if a now stale local task has the TTY lock still # if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for # we cancel it to allow servicing other requests for
# the lock. # the lock.
db_cs = pdb_lock._root_local_task_cs_in_debug db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
if ( if (
db_cs db_cs
and not db_cs.cancel_called and not db_cs.cancel_called
and uid == pdb_user_uid
): ):
log.warning( log.warning(
f'STALE DEBUG LOCK DETECTED FOR {uid}' f'STALE DEBUG LOCK DETECTED FOR {uid}'
@ -905,13 +1047,22 @@ class Actor:
db_cs.cancel() db_cs.cancel()
# XXX: is this necessary (GC should do it)? # XXX: is this necessary (GC should do it)?
if chan.connected(): # XXX WARNING XXX
# Be AWARE OF THE INDENT LEVEL HERE
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
# EMPTY!!!!
if (
not self._peers
and chan.connected()
):
# if the channel is still connected it may mean the far # if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to # end has not closed and we may have gotten here due to
# an error and so we should at least try to terminate # an error and so we should at least try to terminate
# the channel from this end gracefully. # the channel from this end gracefully.
log.runtime(
log.runtime(f"Disconnecting channel {chan}") 'Terminating channel with `None` setinel msg\n'
f'|_{chan}\n'
)
try: try:
# send a msg loop terminate sentinel # send a msg loop terminate sentinel
await chan.send(None) await chan.send(None)
@ -928,15 +1079,16 @@ class Actor:
chan: Channel, chan: Channel,
cid: str, cid: str,
msg: dict[str, Any], msg: dict[str, Any],
) -> None:
) -> None|bool:
''' '''
Push an RPC result to the local consumer's queue. Push an RPC result to the local consumer's queue.
''' '''
uid = chan.uid uid: tuple[str, str] = chan.uid
assert uid, f"`chan.uid` can't be {uid}" assert uid, f"`chan.uid` can't be {uid}"
try: try:
ctx = self._contexts[(uid, cid)] ctx: Context = self._contexts[(uid, cid)]
except KeyError: except KeyError:
log.warning( log.warning(
f'Ignoring msg from [no-longer/un]known context {uid}:' f'Ignoring msg from [no-longer/un]known context {uid}:'
@ -1066,6 +1218,16 @@ class Actor:
parent_data.pop('bind_port'), parent_data.pop('bind_port'),
) )
rvs = parent_data.pop('_runtime_vars') rvs = parent_data.pop('_runtime_vars')
if rvs['_debug_mode']:
try:
from .devx import enable_stack_on_sig
enable_stack_on_sig()
except ImportError:
log.warning(
'`stackscope` not installed for use in debug mode!'
)
log.runtime(f"Runtime vars are: {rvs}") log.runtime(f"Runtime vars are: {rvs}")
rvs['_is_root'] = False rvs['_is_root'] = False
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
@ -1284,9 +1446,15 @@ class Actor:
''' '''
tasks: dict = self._rpc_tasks tasks: dict = self._rpc_tasks
if tasks: if tasks:
tasks_str: str = ''
for (ctx, func, _) in tasks.values():
tasks_str += (
f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n'
)
log.cancel( log.cancel(
f'Cancelling all {len(tasks)} rpc tasks:\n' f'Cancelling all {len(tasks)} rpc tasks:\n'
f'{tasks}' f'{tasks_str}'
) )
for ( for (
(chan, cid), (chan, cid),
@ -1511,7 +1679,10 @@ async def async_main(
) )
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err) await try_ship_error_to_parent(
actor._parent_chan,
err,
)
# always! # always!
match err: match err:
@ -1595,43 +1766,53 @@ async def process_messages(
or boxed errors back to the remote caller (task). or boxed errors back to the remote caller (task).
''' '''
# TODO: once https://github.com/python-trio/trio/issues/467 gets # TODO: once `trio` get's an "obvious way" for req/resp we
# worked out we'll likely want to use that! # should use it?
msg: dict | None = None # https://github.com/python-trio/trio/issues/467
log.runtime(
'Entering IPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}'
)
nursery_cancelled_before_task: bool = False nursery_cancelled_before_task: bool = False
msg: dict | None = None
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
try: try:
# NOTE: this internal scope allows for keeping this
# message loop running despite the current task having
# been cancelled (eg. `open_portal()` may call this method
# from a locally spawned task) and receive this scope
# using ``scope = Nursery.start()``
with CancelScope(shield=shield) as loop_cs: with CancelScope(shield=shield) as loop_cs:
# this internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task) and recieve this scope using
# ``scope = Nursery.start()``
task_status.started(loop_cs) task_status.started(loop_cs)
async for msg in chan: async for msg in chan:
if msg is None: # loop terminate sentinel # dedicated loop terminate sentinel
if msg is None:
tasks: dict[
tuple[Channel, str],
tuple[Context, Callable, trio.Event]
] = actor._rpc_tasks.copy()
log.cancel( log.cancel(
f"Channel to {chan.uid} terminated?\n" f'Peer IPC channel terminated via `None` setinel msg?\n'
"Cancelling all associated tasks..") f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
f'peer: {chan.uid}\n'
for (channel, cid) in actor._rpc_tasks.copy(): f'|_{chan}\n'
)
for (channel, cid) in tasks:
if channel is chan: if channel is chan:
await actor._cancel_task( await actor._cancel_task(
cid, cid,
channel, channel,
) )
log.runtime(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break break
log.transport( # type: ignore log.transport( # type: ignore
f"Received msg {msg} from {chan.uid}") f'<= IPC msg from peer: {chan.uid}\n\n'
# TODO: conditionally avoid fmting depending
# on log level (for perf)?
f'{pformat(msg)}\n'
)
cid = msg.get('cid') cid = msg.get('cid')
if cid: if cid:
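
Regarding the TODO above about conditionally skipping the `pformat()` call for perf: with stdlib `logging` the usual trick is to guard the expensive repr behind `isEnabledFor()` so it is only built when the record would actually be emitted. A sketch using plain `DEBUG` as a stand-in for the custom transport level:

import logging
from pprint import pformat

log = logging.getLogger('ipc')

def log_msg_payload(msg: dict) -> None:
    # only pay the `pformat()` cost when the msg will actually be emitted
    if log.isEnabledFor(logging.DEBUG):
        log.debug(
            '<= IPC msg from peer\n\n'
            f'{pformat(msg)}\n'
        )
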
@ -1640,7 +1821,10 @@ async def process_messages(
await actor._push_result(chan, cid, msg) await actor._push_result(chan, cid, msg)
log.runtime( log.runtime(
f"Waiting on next msg for {chan} from {chan.uid}") f'Waiting on next IPC msg from {chan.uid}:\n'
# f'last msg: {msg}\n'
f'|_{chan}'
)
continue continue
# TODO: implement with ``match:`` syntax? # TODO: implement with ``match:`` syntax?
@ -1693,7 +1877,7 @@ async def process_messages(
) )
log.cancel( log.cancel(
f'Cancelling msg loop for {chan.uid}' f'Cancelling IPC msg-loop with {chan.uid}'
) )
loop_cs.cancel() loop_cs.cancel()
break break
@ -1735,8 +1919,10 @@ async def process_messages(
try: try:
func = actor._get_rpc_func(ns, funcname) func = actor._get_rpc_func(ns, funcname)
except (ModuleNotExposed, AttributeError) as err: except (ModuleNotExposed, AttributeError) as err:
err_msg = pack_error(err) err_msg: dict[str, dict] = pack_error(
err_msg['cid'] = cid err,
cid=cid,
)
await chan.send(err_msg) await chan.send(err_msg)
continue continue
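
The `pack_error()` change above just moves the `cid` tagging inside the helper instead of having callers mutate the returned dict. The real wire-format keys are tractor internals, but the general "box an exception plus traceback into a msg dict" shape looks roughly like the following sketch (field names are assumptions for illustration):

import traceback

def pack_error_sketch(
    exc: BaseException,
    cid: str | None = None,
) -> dict:
    # box the exception type, message and traceback text so the far end
    # can re-raise it as a "boxed" remote error; optionally tag the msg
    # with the originating context/stream id so it can be routed.
    msg: dict = {
        'error': {
            'type_str': type(exc).__name__,
            'str': str(exc),
            # single-exception form requires py3.10+
            'tb_str': ''.join(traceback.format_exception(exc)),
        },
    }
    if cid:
        msg['cid'] = cid
    return msg

# usage mirrors the call above: await chan.send(pack_error_sketch(err, cid=cid))
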
@ -1838,7 +2024,10 @@ async def process_messages(
log.exception("Actor errored:") log.exception("Actor errored:")
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err) await try_ship_error_to_parent(
actor._parent_chan,
err,
)
# if this is the `MainProcess` we expect the error broadcasting # if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints" # above to trigger an error at consuming portal "checkpoints"
@ -1847,8 +2036,9 @@ async def process_messages(
finally: finally:
# msg debugging for when the machinery is brokey # msg debugging for when the machinery is brokey
log.runtime( log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} " f'Exiting IPC msg loop with {chan.uid} '
f"with last msg:\n{msg}" f'final msg: {msg}\n'
f'|_{chan}'
) )
# transport **was not** disconnected # transport **was not** disconnected
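
Most of the reworked log calls in this file converge on the same `'<field>: <value>\n'` body plus a `'|_<obj>'` trailer. If that layout keeps spreading it could be factored into a tiny formatter along these lines (purely a sketch, no such helper exists in `.log` as of this diff):

def fmt_report(
    fields: dict[str, object],
    obj: object | None = None,
    indent: str = '',
) -> str:
    # render a '<field>: <value>\n' block followed by an optional
    # '|_<obj>' trailer, matching the multi-line style used above.
    lines: list[str] = [
        f'{indent}{key}: {value}\n'
        for key, value in fields.items()
    ]
    if obj is not None:
        lines.append(f'{indent}|_{obj}\n')
    return ''.join(lines)

# e.g. log.cancel('Cancelling subactor due to error:\n' + fmt_report({'uid': uid}, obj=chan))
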


@ -144,7 +144,7 @@ async def exhaust_portal(
# XXX: streams should never be reaped here since they should # XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api # always be established and shutdown using a context manager api
final = await portal.result() final: Any = await portal.result()
except ( except (
Exception, Exception,
@ -152,13 +152,23 @@ async def exhaust_portal(
) as err: ) as err:
# we reraise in the parent task via a ``BaseExceptionGroup`` # we reraise in the parent task via a ``BaseExceptionGroup``
return err return err
except trio.Cancelled as err: except trio.Cancelled as err:
# lol, of course we need this too ;P # lol, of course we need this too ;P
# TODO: merge with above? # TODO: merge with above?
log.warning(f"Cancelled result waiter for {portal.actor.uid}") log.warning(
'Cancelled portal result waiter task:\n'
f'uid: {portal.channel.uid}\n'
f'error: {err}\n'
)
return err return err
else: else:
log.debug(f"Returning final result: {final}") log.debug(
f'Returning final result from portal:\n'
f'uid: {portal.channel.uid}\n'
f'result: {final}\n'
)
return final return final
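
`exhaust_portal()` keeps its existing contract of returning the exception rather than raising it, so the supervising task can aggregate failures (eventually into a `BaseExceptionGroup`) instead of blowing up mid-collection. Stripped of portal specifics the pattern is just:

from typing import Any, Awaitable, Callable

async def result_or_exc(
    waiter: Callable[[], Awaitable[Any]],
) -> Any | Exception:
    # capture and *return* failures instead of raising so the caller
    # (a supervising nursery task) decides how to group/re-raise them.
    try:
        return await waiter()
    except (
        Exception,
        BaseExceptionGroup,  # py3.11+ builtin (or the `exceptiongroup` backport)
    ) as err:
        return err
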
@ -170,26 +180,34 @@ async def cancel_on_completion(
) -> None: ) -> None:
''' '''
Cancel actor gracefully once it's "main" portal's Cancel actor gracefully once its "main" portal's
result arrives. result arrives.
Should only be called for actors spawned with `run_in_actor()`. Should only be called for actors spawned via the
`Portal.run_in_actor()` API.
=> and really this API will be deprecated and should be
re-implemented as a `.hilevel.one_shot_task_nursery()`..
''' '''
# if this call errors we store the exception for later # if this call errors we store the exception for later
# in ``errors`` which will be reraised inside # in ``errors`` which will be reraised inside
# an exception group and we still send out a cancel request # an exception group and we still send out a cancel request
result = await exhaust_portal(portal, actor) result: Any|Exception = await exhaust_portal(portal, actor)
if isinstance(result, Exception): if isinstance(result, Exception):
errors[actor.uid] = result errors[actor.uid]: Exception = result
log.warning( log.warning(
f"Cancelling {portal.channel.uid} after error {result}" 'Cancelling subactor due to error:\n'
f'uid: {portal.channel.uid}\n'
f'error: {result}\n'
) )
else: else:
log.runtime( log.runtime(
f"Cancelling {portal.channel.uid} gracefully " 'Cancelling subactor gracefully:\n'
f"after result {result}") f'uid: {portal.channel.uid}\n'
f'result: {result}\n'
)
# cancel the process now that we have a final result # cancel the process now that we have a final result
await portal.cancel_actor() await portal.cancel_actor()
@ -219,11 +237,14 @@ async def do_hard_kill(
to be handled. to be handled.
''' '''
log.cancel(
'Terminating sub-proc:\n'
f'|_{proc}\n'
)
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
# a hard-kill time ultimatum. # a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs: with trio.move_on_after(terminate_after) as cs:
# NOTE: code below was copied verbatim from the now deprecated # NOTE: code below was copied verbatim from the now deprecated
@ -260,7 +281,10 @@ async def do_hard_kill(
# zombies (as a feature) we ask the OS to send in the # zombies (as a feature) we ask the OS to send in the
# removal squad as the last resort. # removal squad as the last resort.
if cs.cancelled_caught: if cs.cancelled_caught:
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}") log.critical(
'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
f'|_{proc}\n'
)
proc.kill() proc.kill()
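
In isolation the hard-kill flow above is: bound a graceful `.wait()` with `trio.move_on_after()`, then escalate to SIGKILL if the deadline hits. A condensed sketch (assuming a `trio.Process`-like object; this is not the actual `do_hard_kill()` body):

import trio

async def hard_kill_sketch(
    proc,  # `trio.Process`-like: sync .terminate()/.kill(), async .wait()
    terminate_after: float = 3.0,
) -> None:
    proc.terminate()  # ask politely first (SIGTERM)
    with trio.move_on_after(terminate_after) as cs:
        await proc.wait()
    if cs.cancelled_caught:
        # "zombie lord" path: the proc ignored SIGTERM, so SIGKILL it
        # and reap it under a shield so teardown can't be interrupted.
        proc.kill()
        with trio.CancelScope(shield=True):
            await proc.wait()
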
@ -281,10 +305,16 @@ async def soft_wait(
join/reap on an actor-runtime-in-process. join/reap on an actor-runtime-in-process.
''' '''
uid = portal.channel.uid uid: tuple[str, str] = portal.channel.uid
try: try:
log.cancel(f'Soft waiting on actor:\n{uid}') log.cancel(
'Soft waiting on sub-actor proc:\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
# wait on sub-proc to signal termination
await wait_func(proc) await wait_func(proc)
except trio.Cancelled: except trio.Cancelled:
# if cancelled during a soft wait, cancel the child # if cancelled during a soft wait, cancel the child
# actor before entering the hard reap sequence # actor before entering the hard reap sequence
@ -296,8 +326,8 @@ async def soft_wait(
async def cancel_on_proc_deth(): async def cancel_on_proc_deth():
''' '''
Cancel the actor cancel request if we detect that "Cancel the (actor) cancel" request if we detect
that the process terminated. that the underlying sub-process terminated.
''' '''
await wait_func(proc) await wait_func(proc)
@ -314,10 +344,10 @@ async def soft_wait(
if proc.poll() is None: # type: ignore if proc.poll() is None: # type: ignore
log.warning( log.warning(
'Actor still alive after cancel request:\n' 'Subactor still alive after cancel request?\n\n'
f'{uid}' f'uid: {uid}\n'
f'|_{proc}\n'
) )
n.cancel_scope.cancel() n.cancel_scope.cancel()
raise raise
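
The `soft_wait()` guard above races a graceful cancel request against the child process dying on its own, and drops ("cancels") the cancel request in the latter case since nobody is left to answer it. The same race expressed with a bare nursery and cancel scope (a sketch assuming generic async callables, not the real portal API):

import trio
from typing import Awaitable, Callable

async def cancel_unless_dead(
    wait_on_proc: Callable[[], Awaitable[None]],    # e.g. an async proc.wait
    request_cancel: Callable[[], Awaitable[None]],  # graceful far-end cancel req
) -> None:
    async with trio.open_nursery() as n:

        async def cancel_on_proc_death() -> None:
            # if the proc dies first there's no one left to answer the
            # cancel request, so tear down the whole scope instead.
            await wait_on_proc()
            n.cancel_scope.cancel()

        n.start_soon(cancel_on_proc_death)
        await request_cancel()
        n.cancel_scope.cancel()  # request answered, watcher no longer needed
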
@ -341,7 +371,7 @@ async def new_proc(
) -> None: ) -> None:
# lookup backend spawning target # lookup backend spawning target
target = _methods[_spawn_method] target: Callable = _methods[_spawn_method]
# mark the new actor with the global spawn method # mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method subactor._spawn_method = _spawn_method
@ -492,8 +522,9 @@ async def trio_proc(
# cancel result waiter that may have been spawned in # cancel result waiter that may have been spawned in
# tandem if not done already # tandem if not done already
log.cancel( log.cancel(
"Cancelling existing result waiter task for " 'Cancelling existing result waiter task for '
f"{subactor.uid}") f'{subactor.uid}'
)
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
finally: finally:
@ -511,7 +542,16 @@ async def trio_proc(
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
if is_root_process(): log.pdb(
'Delaying subproc reaper while debugger locked..'
)
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
# TODO: need a diff value then default?
# poll_steps=9999999,
)
# TODO: solve the following issue where we need # TODO: solve the following issue where we need
# to do a similar wait like this but in an # to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't # "intermediary" parent actor that itself isn't
@ -519,10 +559,18 @@ async def trio_proc(
# to hold off on relaying SIGINT until that child # to hold off on relaying SIGINT until that child
# is complete. # is complete.
# https://github.com/goodboy/tractor/issues/320 # https://github.com/goodboy/tractor/issues/320
await maybe_wait_for_debugger( # -[ ] we need to handle non-root parent-actors specially
child_in_debug=_runtime_vars.get( # by somehow determining if a child is in debug and then
'_debug_mode', False), # avoiding cancel/kill of said child by this
) # (intermediary) parent until such a time as the root says
# the pdb lock is released and we are good to tear down
# (our children)..
#
# -[ ] so maybe something like this where we try to
# acquire the lock and get notified of who has it,
# check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid
# await acquire_debug_lock(this_uid)
if proc.poll() is None: if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}") log.cancel(f"Attempting to hard kill {proc}")


@ -21,8 +21,9 @@ The machinery and types behind ``Context.open_stream()``
''' '''
from __future__ import annotations from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
import inspect
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -35,6 +36,7 @@ import trio
from ._exceptions import ( from ._exceptions import (
_raise_from_no_key_in_msg, _raise_from_no_key_in_msg,
ContextCancelled,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -84,8 +86,8 @@ class MsgStream(trio.abc.Channel):
self._broadcaster = _broadcaster self._broadcaster = _broadcaster
# flag to denote end of stream # flag to denote end of stream
self._eoc: bool = False self._eoc: bool|trio.EndOfChannel = False
self._closed: bool = False self._closed: bool|trio.ClosedResourceError = False
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait(self): def receive_nowait(self):
@ -93,6 +95,9 @@ class MsgStream(trio.abc.Channel):
try: try:
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
# if 'return' in msg:
# return msg
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=self._ctx, ctx=self._ctx,
msg=msg, msg=msg,
@ -122,16 +127,26 @@ class MsgStream(trio.abc.Channel):
# see ``.aclose()`` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
# introducing this # introducing this
if self._eoc: if self._eoc:
raise trio.EndOfChannel raise self._eoc
# raise trio.EndOfChannel
if self._closed: if self._closed:
raise trio.ClosedResourceError('This stream was closed') raise self._closed
# raise trio.ClosedResourceError(
# 'This stream was already closed'
# )
src_err: Exception|None = None
try:
try: try:
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
src_err = kerr
# NOTE: may raise any of the below error types
# includg EoC when a 'stop' msg is found.
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=self._ctx, ctx=self._ctx,
msg=msg, msg=msg,
@ -141,11 +156,14 @@ class MsgStream(trio.abc.Channel):
stream=self, stream=self,
) )
except (
trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
):
# XXX: we close the stream on any of these error conditions: # XXX: we close the stream on any of these error conditions:
except (
# trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
) as eoc:
src_err = eoc
self._eoc = eoc
# await trio.sleep(1)
# a ``ClosedResourceError`` indicates that the internal # a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the # feeder memory receive channel was closed likely by the
@ -168,14 +186,53 @@ class MsgStream(trio.abc.Channel):
# closing this stream and not flushing a final value to # closing this stream and not flushing a final value to
# remaining (clone) consumers who may not have been # remaining (clone) consumers who may not have been
# scheduled to receive it yet. # scheduled to receive it yet.
# try:
# maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait()
# if maybe_err_msg_or_res:
# log.warning(
# 'Discarding un-processed msg:\n'
# f'{maybe_err_msg_or_res}'
# )
# except trio.WouldBlock:
# # no queued msgs that might be another remote
# # error, so just raise the original EoC
# pass
# raise eoc
except trio.ClosedResourceError as cre: # by self._rx_chan
src_err = cre
log.warning(
'`Context._rx_chan` was already closed?'
)
self._closed = cre
# when the send is closed we assume the stream has # when the send is closed we assume the stream has
# terminated and signal this local iterator to stop # terminated and signal this local iterator to stop
await self.aclose() drained: list[Exception|dict] = await self.aclose()
if drained:
log.warning(
'Drained context msgs during closure:\n'
f'{drained}'
)
# TODO: pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.result()` call?
raise # propagate # NOTE XXX: if the context was cancelled or remote-errored
# but we received the stream close msg first, we
# probably want to instead raise the remote error
# over the end-of-stream connection error since likely
# the remote error was the source cause?
ctx: Context = self._ctx
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(
re,
raise_ctxc_from_self_call=True,
)
async def aclose(self): raise src_err # propagate
async def aclose(self) -> list[Exception|dict]:
''' '''
Cancel associated remote actor task and local memory channel on Cancel associated remote actor task and local memory channel on
close. close.
@ -185,15 +242,55 @@ class MsgStream(trio.abc.Channel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan rx_chan = self._rx_chan
if rx_chan._closed: if (
log.cancel(f"{self} is already closed") rx_chan._closed
or
self._closed
):
log.cancel(
f'`MsgStream` is already closed\n'
f'.cid: {self._ctx.cid}\n'
f'._rx_chan`: {rx_chan}\n'
f'._eoc: {self._eoc}\n'
f'._closed: {self._closed}\n'
)
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics. # per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return return []
self._eoc = True ctx: Context = self._ctx
# caught_eoc: bool = False
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait()
if maybe_final_msg:
log.cancel(
'Drained un-processed stream msg:\n'
f'{pformat(maybe_final_msg)}'
)
# TODO: inject into parent `Context` buf?
drained.append(maybe_final_msg)
except trio.WouldBlock as be:
drained.append(be)
break
except trio.EndOfChannel as eoc:
drained.append(eoc)
# caught_eoc = True
self._eoc: bool = eoc
break
except ContextCancelled as ctxc:
log.cancel(
'Context was cancelled during stream closure:\n'
f'canceller: {ctxc.canceller}\n'
f'{pformat(ctxc.msgdata)}'
)
break
# NOTE: this is super subtle IPC messaging stuff: # NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're # Relay stop iteration to far end **iff** we're
@ -224,26 +321,33 @@ class MsgStream(trio.abc.Channel):
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
): ) as re:
# the underlying channel may already have been pulled # the underlying channel may already have been pulled
# in which case our stop message is meaningless since # in which case our stop message is meaningless since
# it can't traverse the transport. # it can't traverse the transport.
ctx = self._ctx
log.warning( log.warning(
f'Stream was already destroyed?\n' f'Stream was already destroyed?\n'
f'actor: {ctx.chan.uid}\n' f'actor: {ctx.chan.uid}\n'
f'ctx id: {ctx.cid}' f'ctx id: {ctx.cid}'
) )
drained.append(re)
self._closed = re
self._closed = True # if caught_eoc:
# # from .devx import _debug
# # await _debug.pause()
# with trio.CancelScope(shield=True):
# await rx_chan.aclose()
# Do we close the local mem chan ``self._rx_chan`` ??!? # self._eoc: bool = caught_eoc
# NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``! # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver # => NO, DEFINITELY NOT! <=
# the potential final result from the surrounding inter-actor # if we're a bi-dir ``MsgStream`` BECAUSE this same
# `Context` so we don't want to close it until that context has # core-msg-loop mem recv-chan is used to deliver the
# run to completion. # potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that
# context has run to completion.
# XXX: Notes on old behaviour: # XXX: Notes on old behaviour:
# await rx_chan.aclose() # await rx_chan.aclose()
@ -272,6 +376,8 @@ class MsgStream(trio.abc.Channel):
# runtime's closure of ``rx_chan`` in the case where we may # runtime's closure of ``rx_chan`` in the case where we may
# still need to consume msgs that are "in transit" from the far # still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``). # end (eg. for ``Context.result()``).
# self._closed = True
return drained
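
The core of the new drain-on-close behaviour is a non-blocking receive loop that collects whatever is already queued and records the terminal condition instead of raising it. Against a plain memory channel the idea looks like this (a sketch of the technique, not the `MsgStream.aclose()` code itself):

import trio

def drain_pending(
    rx: trio.MemoryReceiveChannel,
) -> list[object]:
    # pull whatever is already buffered without blocking; remember the
    # terminal condition (closed/ended) instead of letting it propagate.
    drained: list[object] = []
    while True:
        try:
            drained.append(rx.receive_nowait())
        except trio.WouldBlock:
            break  # nothing else buffered right now
        except trio.EndOfChannel as eoc:
            drained.append(eoc)  # stream already terminated by the sender
            break
    return drained
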
@acm @acm
async def subscribe( async def subscribe(
@ -337,9 +443,13 @@ class MsgStream(trio.abc.Channel):
raise self._ctx._remote_error # from None raise self._ctx._remote_error # from None
if self._closed: if self._closed:
raise trio.ClosedResourceError('This stream was already closed') raise self._closed
# raise trio.ClosedResourceError('This stream was already closed')
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) await self._ctx.chan.send({
'yield': data,
'cid': self._ctx.cid,
})
def stream(func: Callable) -> Callable: def stream(func: Callable) -> Callable:
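
A recurring change through this file is caching the terminal exception instance on `._eoc`/`._closed` and re-raising that same object on later calls, rather than minting a fresh `trio.EndOfChannel`/`ClosedResourceError` each time; re-raising the original preserves its traceback and `__context__` pointing at the first failure site. Boiled down to a toy wrapper (illustrative only, not the real `MsgStream`):

import trio

class CachedEndRx:
    '''
    Receive wrapper that remembers *why* it ended and re-raises that
    exact exception instance on any further receive attempts.
    '''
    def __init__(self, rx: trio.MemoryReceiveChannel):
        self._rx = rx
        self._eoc: trio.EndOfChannel | None = None
        self._closed: trio.ClosedResourceError | None = None

    async def receive(self):
        if self._eoc:
            raise self._eoc
        if self._closed:
            raise self._closed
        try:
            return await self._rx.receive()
        except trio.EndOfChannel as eoc:
            self._eoc = eoc
            raise
        except trio.ClosedResourceError as cre:
            self._closed = cre
            raise
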


@ -157,7 +157,7 @@ class ActorNursery:
# start a task to spawn a process # start a task to spawn a process
# blocks until process has been started and a portal setup # blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery nursery: trio.Nursery = nursery or self._da_nursery
# XXX: the type ignore is actually due to a `mypy` bug # XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore return await nursery.start( # type: ignore
@ -233,12 +233,14 @@ class ActorNursery:
return portal return portal
async def cancel(self, hard_kill: bool = False) -> None: async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel '''
Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate. itself and wait for all subactors to terminate.
If ``hard_kill`` is set to ``True`` then kill the processes If ``hard_kill`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation. directly without any far end graceful ``trio`` cancellation.
"""
'''
self.cancelled = True self.cancelled = True
log.cancel(f"Cancelling nursery in {self._actor.uid}") log.cancel(f"Cancelling nursery in {self._actor.uid}")
@ -246,7 +248,14 @@ class ActorNursery:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values(): subactor: Actor
proc: trio.Process
portal: Portal
for (
subactor,
proc,
portal,
) in self._children.values():
# TODO: are we ever even going to use this or # TODO: are we ever even going to use this or
# is the spawning backend responsible for such # is the spawning backend responsible for such
@ -286,8 +295,16 @@ class ActorNursery:
# then hard kill all sub-processes # then hard kill all sub-processes
if cs.cancelled_caught: if cs.cancelled_caught:
log.error( log.error(
f"Failed to cancel {self}\nHard killing process tree!") f'Failed to cancel {self}\nHard killing process tree!'
for subactor, proc, portal in self._children.values(): )
subactor: Actor
proc: trio.Process
portal: Portal
for (
subactor,
proc,
portal,
) in self._children.values():
log.warning(f"Hard killing process {proc}") log.warning(f"Hard killing process {proc}")
proc.terminate() proc.terminate()
@ -384,7 +401,17 @@ async def _open_and_supervise_one_cancels_all_nursery(
else: else:
log.exception( log.exception(
f"Nursery for {current_actor().uid} " f"Nursery for {current_actor().uid} "
f"errored with") "errored with\n"
# TODO: same thing as in
# `._invoke()` to compute how to
# place this div-line in the
# middle of the above msg
# content..
# -[ ] prolly helper-func it too
# in our `.log` module..
# '------ - ------'
)
# cancel all subactors # cancel all subactors
await anursery.cancel() await anursery.cancel()
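
Calling `anursery.cancel()` from the error path above is the "one cancels all" strategy: any child failure (or an error in the nursery block itself) tears down every remaining child before the error propagates. With portals abstracted away the skeleton is roughly:

import trio
from typing import Awaitable, Callable

async def one_cancels_all(
    children: list[Callable[[], Awaitable[None]]],
    cancel_children: Callable[[], Awaitable[None]],
) -> None:
    try:
        async with trio.open_nursery() as n:
            for child in children:
                n.start_soon(child)
    except BaseException:
        # any child (or local) error -> cancel all peers; shield the
        # teardown so it can't itself be cancelled halfway through.
        with trio.CancelScope(shield=True):
            await cancel_children()
        raise
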


@ -289,11 +289,19 @@ def get_console_log(
if not level: if not level:
return log return log
log.setLevel(level.upper() if not isinstance(level, int) else level) log.setLevel(
level.upper()
if not isinstance(level, int)
else level
)
if not any( if not any(
handler.stream == sys.stderr # type: ignore handler.stream == sys.stderr # type: ignore
for handler in logger.handlers if getattr(handler, 'stream', None) for handler in logger.handlers if getattr(
handler,
'stream',
None,
)
): ):
handler = logging.StreamHandler() handler = logging.StreamHandler()
formatter = colorlog.ColoredFormatter( formatter = colorlog.ColoredFormatter(