Compare commits
7 Commits (9fc9b10b53...338ea5529c)

Author | SHA1 | Date
---|---|---
| 338ea5529c |
| 6bc67338cf |
| fd20004757 |
| ddc2e5f0f8 |
| 4b0aa5e379 |
| 6a303358df |
| c85757aee1 |

@@ -43,12 +43,17 @@ import warnings

import trio

# from .devx import (
#     maybe_wait_for_debugger,
#     pause,
# )
from ._exceptions import (
    # _raise_from_no_key_in_msg,
    unpack_error,
    pack_error,
    ContextCancelled,
    # MessagingError,
    RemoteActorError,
    StreamOverrun,
)
from .log import get_logger

@@ -64,6 +69,164 @@ if TYPE_CHECKING:

log = get_logger(__name__)


async def _drain_to_final_msg(
    ctx: Context,
) -> list[dict]:

    # ) -> tuple[
    #     Any|Exception,
    #     list[dict],
    # ]:
    raise_overrun: bool = not ctx._allow_overruns

    # wait for a final context result by collecting (but
    # basically ignoring) any bi-dir-stream msgs still in transit
    # from the far end.
    pre_result_drained: list[dict] = []
    while not ctx._remote_error:
        try:
            # NOTE: this REPL usage actually works here dawg! Bo
            # from .devx._debug import pause
            # await pause()
            # if re := ctx._remote_error:
            #     ctx._maybe_raise_remote_err(
            #         re,
            #         # NOTE: obvi we don't care if we
            #         # overran the far end if we're already
            #         # waiting on a final result (msg).
            #         raise_overrun_from_self=raise_overrun,
            #     )

            # TODO: bad idea?
            # with trio.CancelScope() as res_cs:
            #     ctx._res_scope = res_cs
            #     msg: dict = await ctx._recv_chan.receive()
            # if res_cs.cancelled_caught:

            # from .devx._debug import pause
            # await pause()
            msg: dict = await ctx._recv_chan.receive()
            ctx._result: Any = msg['return']
            log.runtime(
                'Context delivered final result msg:\n'
                f'{pformat(msg)}'
            )
            pre_result_drained.append(msg)
            # NOTE: we don't need to do this right?
            # XXX: only close the rx mem chan AFTER
            # a final result is retreived.
            # if ctx._recv_chan:
            #     await ctx._recv_chan.aclose()
            break

        # NOTE: we get here if the far end was
        # `ContextCancelled` in 2 cases:
        # 1. we requested the cancellation and thus
        #    SHOULD NOT raise that far end error,
        # 2. WE DID NOT REQUEST that cancel and thus
        #    SHOULD RAISE HERE!
        except trio.Cancelled:

            # CASE 2: mask the local cancelled-error(s)
            # only when we are sure the remote error is
            # the source cause of this local task's
            # cancellation.
            if re := ctx._remote_error:
                ctx._maybe_raise_remote_err(re)

            # CASE 1: we DID request the cancel we simply
            # continue to bubble up as normal.
            raise

        except KeyError:

            if 'yield' in msg:
                # far end task is still streaming to us so discard
                log.warning(f'Discarding std "yield"\n{msg}')
                pre_result_drained.append(msg)
                continue

            # TODO: work out edge cases here where
            # a stream is open but the task also calls
            # this?
            # -[ ] should be a runtime error if a stream is open
            #   right?
            elif 'stop' in msg:
                log.cancel(
                    'Remote stream terminated due to "stop" msg:\n'
                    f'{msg}'
                )
                pre_result_drained.append(msg)
                continue

            # internal error should never get here
            assert msg.get('cid'), (
                "Received internal error at portal?"
            )

            # XXX fallthrough to handle expected error XXX
            re: Exception|None = ctx._remote_error
            if re:
                log.critical(
                    'Remote ctx terminated due to "error" msg:\n'
                    f'{re}'
                )
                assert msg is ctx._cancel_msg
                # NOTE: this solved a super dupe edge case XD
                # this was THE super duper edge case of:
                # - local task opens a remote task,
                # - requests remote cancellation of far end
                #   ctx/tasks,
                # - needs to wait for the cancel ack msg
                #   (ctxc) or some result in the race case
                #   where the other side's task returns
                #   before the cancel request msg is ever
                #   rxed and processed,
                # - here this surrounding drain loop (which
                #   iterates all ipc msgs until the ack or
                #   an early result arrives) was NOT exiting
                #   since we are the edge case: local task
                #   does not re-raise any ctxc it receives
                #   IFF **it** was the cancellation
                #   requester..
                # will raise if necessary, ow break from
                # loop presuming any error terminates the
                # context!
                ctx._maybe_raise_remote_err(
                    re,
                    # NOTE: obvi we don't care if we
                    # overran the far end if we're already
                    # waiting on a final result (msg).
                    # raise_overrun_from_self=False,
                    raise_overrun_from_self=raise_overrun,
                )

                break  # OOOOOF, yeah obvi we need this..

            # XXX we should never really get here
            # right! since `._deliver_msg()` should
            # always have detected an {'error': ..}
            # msg and already called this right!?!
            elif error := unpack_error(
                msg=msg,
                chan=ctx._portal.channel,
                hide_tb=False,
            ):
                log.critical('SHOULD NEVER GET HERE!?')
                assert msg is ctx._cancel_msg
                assert error.msgdata == ctx._remote_error.msgdata
                from .devx._debug import pause
                await pause()
                ctx._maybe_cancel_and_set_remote_error(error)
                ctx._maybe_raise_remote_err(error)

            else:
                # bubble the original src key error
                raise

    return pre_result_drained


# TODO: make this a msgspec.Struct!
@dataclass
class Context:
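
The new `_drain_to_final_msg()` helper above is awaited from `Context.result()` later in this diff. As a rough, self-contained illustration of the drain-loop idea it implements (toy code, not tractor's API; a plain `trio` memory channel stands in for the IPC receive channel):

# --- illustration only, not part of the diff ---
import trio

async def demo() -> None:
    send, recv = trio.open_memory_channel(8)
    # pretend the far end already sent a few in-transit msgs
    for msg in (
        {'yield': 1, 'cid': 'abc'},
        {'stop': True, 'cid': 'abc'},
        {'return': 'final', 'cid': 'abc'},
    ):
        send.send_nowait(msg)

    drained: list[dict] = []
    result = None
    async for msg in recv:
        if 'return' in msg:
            result = msg['return']  # final ctx result ends the drain
            break
        drained.append(msg)         # a stray 'yield' or 'stop' is collected

    print(result, drained)

trio.run(demo)
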
@@ -118,6 +281,7 @@ class Context:
    # which is exactly the primitive that allows for
    # cross-actor-task-supervision and thus SC.
    _scope: trio.CancelScope | None = None
    # _res_scope: trio.CancelScope|None = None

    # on a clean exit there should be a final value
    # delivered from the far end "callee" task, so

@@ -205,6 +369,10 @@ class Context:
            )
        )

    # @property
    # def is_waiting_result(self) -> bool:
    #     return bool(self._res_scope)

    @property
    def side(self) -> str:
        '''

@@ -247,7 +415,11 @@ class Context:
        await self.chan.send({'yield': data, 'cid': self.cid})

    async def send_stop(self) -> None:
        await self.chan.send({'stop': True, 'cid': self.cid})
        # await pause()
        await self.chan.send({
            'stop': True,
            'cid': self.cid
        })

    def _maybe_cancel_and_set_remote_error(
        self,

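
For orientation, the bidirectional-context wire messages handled throughout these hunks are plain dicts keyed by message type, always tagged with the caller's context id; the shapes below are taken directly from this diff (values are placeholders):

# --- illustration only, not part of the diff ---
cid = 'deadbeef'                                        # context/task id placeholder
started_msg = {'functype': 'context', 'cid': cid}       # RPC-ep handshake (see `_invoke()`)
yield_msg   = {'yield': 42, 'cid': cid}                 # stream item (see `_send()`)
stop_msg    = {'stop': True, 'cid': cid}                # graceful stream end (see `send_stop()`)
return_msg  = {'return': 'final', 'cid': cid}           # final ctx result (see `_invoke()`)
error_msg   = {'error': {'tb_str': '...'}, 'cid': cid}  # boxed remote error (see `pack_error()`)
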
@@ -320,27 +492,37 @@ class Context:
        # XXX: set the remote side's error so that after we cancel
        # whatever task is the opener of this context it can raise
        # that error as the reason.
        # if self._remote_error:
        #     return

        # breakpoint()
        log.cancel(
            'Setting remote error for ctx \n'
            f'<= remote ctx uid: {self.chan.uid}\n'
            f'=>\n{error}'
        )
        self._remote_error: BaseException = error

        if (
            isinstance(error, ContextCancelled)
        ):
            # always record the cancelling actor's uid since its cancellation
            # state is linked and we want to know which process was
            # the cause / requester of the cancellation.
            self._canceller = error.canceller

            log.cancel(
                'Remote task-context was cancelled for '
                f'actor: {self.chan.uid}\n'
                f'task: {self.cid}\n'
                f'canceller: {error.canceller}\n'
            )
            # always record the cancelling actor's uid since its cancellation
            # state is linked and we want to know which process was
            # the cause / requester of the cancellation.
            # if error.canceller is None:
            #     import pdbp; pdbp.set_trace()

            # breakpoint()
            self._canceller = error.canceller

            if self._cancel_called:
                # from ._debug import breakpoint
                # await breakpoint()

                # this is an expected cancel request response message
                # and we **don't need to raise it** in local cancel
                # scope since it will potentially override a real error.

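
A purely illustrative sketch of the canceller bookkeeping shown above: a `ContextCancelled` carries the requesting actor's uid, while any other remote error is attributed to the peer channel's uid (`FakeCtxc` below is a stand-in class, not tractor's exception type):

# --- illustration only, not part of the diff ---
class FakeCtxc(Exception):
    def __init__(self, canceller: tuple[str, str]):
        super().__init__(canceller)
        self.canceller = canceller

def record_canceller(
    error: Exception,
    peer_uid: tuple[str, str],
) -> tuple[str, str]:
    # mirror of the two branches in `_maybe_cancel_and_set_remote_error()`
    if isinstance(error, FakeCtxc):
        return error.canceller
    return peer_uid

assert record_canceller(FakeCtxc(('root', 'uuid-1')), ('peer', 'uuid-2')) == ('root', 'uuid-1')
assert record_canceller(RuntimeError('boom'), ('peer', 'uuid-2')) == ('peer', 'uuid-2')
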
@@ -348,10 +530,11 @@ class Context:

        else:
            log.error(
                f'Remote context error,\n'
                f'remote actor: {self.chan.uid}\n'
                f'task: {self.cid}\n'
                f'{error}'
                f'Remote context error:\n'
                f'{error}\n'
                f'{pformat(self)}\n'
                # f'remote actor: {self.chan.uid}\n'
                # f'cid: {self.cid}\n'
            )
            self._canceller = self.chan.uid

@@ -376,9 +559,11 @@ class Context:
            self._scope.cancel()

            # NOTE: this REPL usage actually works here dawg! Bo
            # from .devx._debug import pause
            # await pause()

        # TODO: maybe we have to use `._res_scope.cancel()` if it
        # exists?

    async def cancel(
        self,
        timeout: float = 0.616,

@@ -395,6 +580,8 @@ class Context:
        log.cancel(
            f'Cancelling {side} side of context to {self.chan.uid}'
        )

        # await pause()
        self._cancel_called: bool = True

        # caller side who entered `Portal.open_context()`

@@ -484,13 +671,11 @@ class Context:
        '''
        actor: Actor = current_actor()

        # here we create a mem chan that corresponds to the
        # far end caller / callee.

        # Likewise if the surrounding context has been cancelled we error here
        # since it likely means the surrounding block was exited or
        # killed

        # If the surrounding context has been cancelled by some
        # task with a handle to THIS, we error here immediately
        # since it likely means the surrounding lexical-scope has
        # errored, been `trio.Cancelled` or at the least
        # `Context.cancel()` was called by some task.
        if self._cancel_called:

            # XXX NOTE: ALWAYS RAISE any remote error here even if

@@ -503,6 +688,11 @@ class Context:
            # actually try to stream - a cancel msg was already
            # sent to the other side!
            if self._remote_error:
                # NOTE: this is diff then calling
                # `._maybe_raise_from_remote_msg()` specifically
                # because any task entering this `.open_stream()`
                # AFTER cancellation has already been requested,
                # we DO NOT want to absorb any ctxc ACK silently!
                raise self._remote_error

            # XXX NOTE: if no `ContextCancelled` has been responded

@@ -529,7 +719,7 @@ class Context:
        # to send a stop from the caller to the callee in the
        # single-direction-stream case you'll get a lookup error
        # currently.
        ctx = actor.get_context(
        ctx: Context = actor.get_context(
            self.chan,
            self.cid,
            msg_buffer_size=msg_buffer_size,

@ -548,6 +738,19 @@ class Context:
|
|||
'The underlying channel for this stream was already closed!?'
|
||||
)
|
||||
|
||||
# NOTE: implicitly this will call `MsgStream.aclose()` on
|
||||
# `.__aexit__()` due to stream's parent `Channel` type!
|
||||
#
|
||||
# XXX NOTE XXX: ensures the stream is "one-shot use",
|
||||
# which specifically means that on exit,
|
||||
# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
|
||||
# the far end indicating that the caller exited
|
||||
# the streaming context purposefully by letting
|
||||
# the exit block exec.
|
||||
# - this is diff from the cancel/error case where
|
||||
# a cancel request from this side or an error
|
||||
# should be sent to the far end indicating the
|
||||
# stream WAS NOT just closed normally/gracefully.
|
||||
async with MsgStream(
|
||||
ctx=self,
|
||||
rx_chan=ctx._recv_chan,
|
||||
|
@ -567,11 +770,37 @@ class Context:
|
|||
# await trio.lowlevel.checkpoint()
|
||||
yield stream
|
||||
|
||||
# NOTE: Make the stream "one-shot use". On exit,
|
||||
# signal
|
||||
# ``trio.EndOfChannel``/``StopAsyncIteration`` to
|
||||
# the far end.
|
||||
await stream.aclose()
|
||||
|
||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||
# wait for any immediate child in debug before popping the
|
||||
# context from the runtime msg loop otherwise inside
|
||||
# ``Actor._push_result()`` the msg will be discarded and in
|
||||
# the case where that msg is global debugger unlock (via
|
||||
# a "stop" msg for a stream), this can result in a deadlock
|
||||
# where the root is waiting on the lock to clear but the
|
||||
# child has already cleared it and clobbered IPC.
|
||||
#
|
||||
# await maybe_wait_for_debugger()
|
||||
|
||||
# XXX TODO: pretty sure this isn't needed (see
|
||||
# note above this block) AND will result in
|
||||
# a double `.send_stop()` call. The only reason to
|
||||
# put it here would be to due with "order" in
|
||||
# terms of raising any remote error (as per
|
||||
# directly below) or bc the stream's
|
||||
# `.__aexit__()` block might not get run
|
||||
# (doubtful)? Either way if we did put this back
|
||||
# in we also need a state var to avoid the double
|
||||
# stop-msg send..
|
||||
#
|
||||
# await stream.aclose()
|
||||
|
||||
# if re := ctx._remote_error:
|
||||
# ctx._maybe_raise_remote_err(
|
||||
# re,
|
||||
# raise_ctxc_from_self_call=True,
|
||||
# )
|
||||
# await trio.lowlevel.checkpoint()
|
||||
|
||||
finally:
|
||||
if self._portal:
|
||||
|
@ -587,7 +816,10 @@ class Context:
|
|||
def _maybe_raise_remote_err(
|
||||
self,
|
||||
err: Exception,
|
||||
) -> None:
|
||||
raise_ctxc_from_self_call: bool = False,
|
||||
raise_overrun_from_self: bool = True,
|
||||
|
||||
) -> ContextCancelled|None:
|
||||
'''
|
||||
Maybe raise a remote error depending on who (which task from
|
||||
which actor) requested a cancellation (if any).
|
||||
|
@ -603,13 +835,21 @@ class Context:
|
|||
# "error"-msg.
|
||||
our_uid: tuple[str, str] = current_actor().uid
|
||||
if (
|
||||
isinstance(err, ContextCancelled)
|
||||
(not raise_ctxc_from_self_call
|
||||
and isinstance(err, ContextCancelled)
|
||||
and (
|
||||
self._cancel_called
|
||||
or self.chan._cancel_called
|
||||
or self.canceller == our_uid
|
||||
or tuple(err.canceller) == our_uid
|
||||
or tuple(err.canceller) == our_uid)
|
||||
)
|
||||
or
|
||||
(not raise_overrun_from_self
|
||||
and isinstance(err, RemoteActorError)
|
||||
and err.msgdata['type_str'] == 'StreamOverrun'
|
||||
and tuple(err.msgdata['sender']) == our_uid
|
||||
)
|
||||
|
||||
):
|
||||
# NOTE: we set the local scope error to any "self
|
||||
# cancellation" error-response thus "absorbing"
|
||||
|
@ -661,77 +901,196 @@ class Context:
|
|||
assert self._portal, "Context.result() can not be called from callee!"
|
||||
assert self._recv_chan
|
||||
|
||||
if re := self._remote_error:
|
||||
return self._maybe_raise_remote_err(re)
|
||||
raise_overrun: bool = not self._allow_overruns
|
||||
# if re := self._remote_error:
|
||||
# return self._maybe_raise_remote_err(
|
||||
# re,
|
||||
# # NOTE: obvi we don't care if we
|
||||
# # overran the far end if we're already
|
||||
# # waiting on a final result (msg).
|
||||
# raise_overrun_from_self=raise_overrun,
|
||||
# )
|
||||
|
||||
res_placeholder: int = id(self)
|
||||
if (
|
||||
self._result == id(self)
|
||||
self._result == res_placeholder
|
||||
and not self._remote_error
|
||||
and not self._recv_chan._closed # type: ignore
|
||||
):
|
||||
# wait for a final context result consuming
|
||||
# and discarding any bi dir stream msgs still
|
||||
# in transit from the far end.
|
||||
while True:
|
||||
try:
|
||||
msg = await self._recv_chan.receive()
|
||||
self._result: Any = msg['return']
|
||||
|
||||
# NOTE: we don't need to do this right?
|
||||
# XXX: only close the rx mem chan AFTER
|
||||
# a final result is retreived.
|
||||
# if self._recv_chan:
|
||||
# await self._recv_chan.aclose()
|
||||
|
||||
break
|
||||
|
||||
# NOTE: we get here if the far end was
|
||||
# `ContextCancelled` in 2 cases:
|
||||
# 1. we requested the cancellation and thus
|
||||
# SHOULD NOT raise that far end error,
|
||||
# 2. WE DID NOT REQUEST that cancel and thus
|
||||
# SHOULD RAISE HERE!
|
||||
except trio.Cancelled:
|
||||
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is the
|
||||
# (likely) source cause of this local runtime
|
||||
# task's cancellation.
|
||||
if re := self._remote_error:
|
||||
self._maybe_raise_remote_err(re)
|
||||
|
||||
# CASE 1: we DID request the cancel we simply
|
||||
# continue to bubble up as normal.
|
||||
raise
|
||||
|
||||
except KeyError: # as msgerr:
|
||||
|
||||
if 'yield' in msg:
|
||||
# far end task is still streaming to us so discard
|
||||
log.warning(f'Discarding stream delivered {msg}')
|
||||
continue
|
||||
|
||||
elif 'stop' in msg:
|
||||
log.debug('Remote stream terminated')
|
||||
continue
|
||||
|
||||
# internal error should never get here
|
||||
assert msg.get('cid'), (
|
||||
"Received internal error at portal?"
|
||||
# wait for a final context result by collecting (but
|
||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||
# from the far end.
|
||||
drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self)
|
||||
log.runtime(
|
||||
'Ctx drained pre-result msgs:\n'
|
||||
f'{drained_msgs}'
|
||||
)
|
||||
|
||||
if err:= unpack_error(
|
||||
msg,
|
||||
self._portal.channel
|
||||
): # from msgerr
|
||||
self._maybe_cancel_and_set_remote_error(err)
|
||||
self._maybe_raise_remote_err(err)
|
||||
# TODO: implement via helper func ^^^^
|
||||
# pre_result_drained: list[dict] = []
|
||||
# while not self._remote_error:
|
||||
# try:
|
||||
# # NOTE: this REPL usage actually works here dawg! Bo
|
||||
# # from .devx._debug import pause
|
||||
# # await pause()
|
||||
# # if re := self._remote_error:
|
||||
# # self._maybe_raise_remote_err(
|
||||
# # re,
|
||||
# # # NOTE: obvi we don't care if we
|
||||
# # # overran the far end if we're already
|
||||
# # # waiting on a final result (msg).
|
||||
# # raise_overrun_from_self=raise_overrun,
|
||||
# # )
|
||||
|
||||
else:
|
||||
raise
|
||||
# # TODO: bad idea?
|
||||
# # with trio.CancelScope() as res_cs:
|
||||
# # self._res_scope = res_cs
|
||||
# # msg: dict = await self._recv_chan.receive()
|
||||
# # if res_cs.cancelled_caught:
|
||||
|
||||
if re := self._remote_error:
|
||||
return self._maybe_raise_remote_err(re)
|
||||
# # from .devx._debug import pause
|
||||
# # await pause()
|
||||
# msg: dict = await self._recv_chan.receive()
|
||||
# self._result: Any = msg['return']
|
||||
# log.runtime(
|
||||
# 'Context delivered final result msg:\n'
|
||||
# f'{pformat(msg)}'
|
||||
# )
|
||||
# # NOTE: we don't need to do this right?
|
||||
# # XXX: only close the rx mem chan AFTER
|
||||
# # a final result is retreived.
|
||||
# # if self._recv_chan:
|
||||
# # await self._recv_chan.aclose()
|
||||
# break
|
||||
|
||||
# # NOTE: we get here if the far end was
|
||||
# # `ContextCancelled` in 2 cases:
|
||||
# # 1. we requested the cancellation and thus
|
||||
# # SHOULD NOT raise that far end error,
|
||||
# # 2. WE DID NOT REQUEST that cancel and thus
|
||||
# # SHOULD RAISE HERE!
|
||||
# except trio.Cancelled:
|
||||
|
||||
# # CASE 2: mask the local cancelled-error(s)
|
||||
# # only when we are sure the remote error is
|
||||
# # the source cause of this local task's
|
||||
# # cancellation.
|
||||
# if re := self._remote_error:
|
||||
# self._maybe_raise_remote_err(re)
|
||||
|
||||
# # CASE 1: we DID request the cancel we simply
|
||||
# # continue to bubble up as normal.
|
||||
# raise
|
||||
|
||||
# except KeyError:
|
||||
|
||||
# if 'yield' in msg:
|
||||
# # far end task is still streaming to us so discard
|
||||
# log.warning(f'Discarding std "yield"\n{msg}')
|
||||
# pre_result_drained.append(msg)
|
||||
# continue
|
||||
|
||||
# # TODO: work out edge cases here where
|
||||
# # a stream is open but the task also calls
|
||||
# # this?
|
||||
# # -[ ] should be a runtime error if a stream is open
|
||||
# # right?
|
||||
# elif 'stop' in msg:
|
||||
# log.cancel(
|
||||
# 'Remote stream terminated due to "stop" msg:\n'
|
||||
# f'{msg}'
|
||||
# )
|
||||
# pre_result_drained.append(msg)
|
||||
# continue
|
||||
|
||||
# # internal error should never get here
|
||||
# assert msg.get('cid'), (
|
||||
# "Received internal error at portal?"
|
||||
# )
|
||||
|
||||
# # XXX fallthrough to handle expected error XXX
|
||||
# re: Exception|None = self._remote_error
|
||||
# if re:
|
||||
# log.critical(
|
||||
# 'Remote ctx terminated due to "error" msg:\n'
|
||||
# f'{re}'
|
||||
# )
|
||||
# assert msg is self._cancel_msg
|
||||
# # NOTE: this solved a super dupe edge case XD
|
||||
# # this was THE super duper edge case of:
|
||||
# # - local task opens a remote task,
|
||||
# # - requests remote cancellation of far end
|
||||
# # ctx/tasks,
|
||||
# # - needs to wait for the cancel ack msg
|
||||
# # (ctxc) or some result in the race case
|
||||
# # where the other side's task returns
|
||||
# # before the cancel request msg is ever
|
||||
# # rxed and processed,
|
||||
# # - here this surrounding drain loop (which
|
||||
# # iterates all ipc msgs until the ack or
|
||||
# # an early result arrives) was NOT exiting
|
||||
# # since we are the edge case: local task
|
||||
# # does not re-raise any ctxc it receives
|
||||
# # IFF **it** was the cancellation
|
||||
# # requester..
|
||||
# # will raise if necessary, ow break from
|
||||
# # loop presuming any error terminates the
|
||||
# # context!
|
||||
# self._maybe_raise_remote_err(
|
||||
# re,
|
||||
# # NOTE: obvi we don't care if we
|
||||
# # overran the far end if we're already
|
||||
# # waiting on a final result (msg).
|
||||
# # raise_overrun_from_self=False,
|
||||
# raise_overrun_from_self=raise_overrun,
|
||||
# )
|
||||
|
||||
# break # OOOOOF, yeah obvi we need this..
|
||||
|
||||
# # XXX we should never really get here
|
||||
# # right! since `._deliver_msg()` should
|
||||
# # always have detected an {'error': ..}
|
||||
# # msg and already called this right!?!
|
||||
# elif error := unpack_error(
|
||||
# msg=msg,
|
||||
# chan=self._portal.channel,
|
||||
# hide_tb=False,
|
||||
# ):
|
||||
# log.critical('SHOULD NEVER GET HERE!?')
|
||||
# assert msg is self._cancel_msg
|
||||
# assert error.msgdata == self._remote_error.msgdata
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
# self._maybe_cancel_and_set_remote_error(error)
|
||||
# self._maybe_raise_remote_err(error)
|
||||
|
||||
# else:
|
||||
# # bubble the original src key error
|
||||
# raise
|
||||
|
||||
if (
|
||||
(re := self._remote_error)
|
||||
and self._result == res_placeholder
|
||||
):
|
||||
maybe_err: Exception|None = self._maybe_raise_remote_err(
|
||||
re,
|
||||
# NOTE: obvi we don't care if we
|
||||
# overran the far end if we're already
|
||||
# waiting on a final result (msg).
|
||||
# raise_overrun_from_self=False,
|
||||
raise_overrun_from_self=(
|
||||
raise_overrun
|
||||
and
|
||||
# only when we ARE NOT the canceller
|
||||
# should we raise overruns, bc ow we're
|
||||
# raising something we know might happen
|
||||
# during cancellation ;)
|
||||
(not self._cancel_called)
|
||||
),
|
||||
)
|
||||
if maybe_err:
|
||||
self._result = maybe_err
|
||||
|
||||
return self._result
|
||||
|
||||
|
@ -779,7 +1138,7 @@ class Context:
|
|||
while self._overflow_q:
|
||||
# NOTE: these msgs should never be errors since we always do
|
||||
# the check prior to checking if we're in an overrun state
|
||||
# inside ``.deliver_msg()``.
|
||||
# inside ``._deliver_msg()``.
|
||||
msg = self._overflow_q.popleft()
|
||||
try:
|
||||
await self._send_chan.send(msg)
|
||||
|
@ -830,34 +1189,50 @@ class Context:
|
|||
messages are eventually sent if possible.
|
||||
|
||||
'''
|
||||
cid = self.cid
|
||||
chan = self.chan
|
||||
uid = chan.uid
|
||||
cid: str = self.cid
|
||||
chan: Channel = self.chan
|
||||
from_uid: tuple[str, str] = chan.uid
|
||||
send_chan: trio.MemorySendChannel = self._send_chan
|
||||
|
||||
log.runtime(
|
||||
f"Delivering {msg} from {uid} to caller {cid}"
|
||||
)
|
||||
|
||||
if (
|
||||
msg.get('error') # check for field
|
||||
and (
|
||||
error := unpack_error(
|
||||
if re := unpack_error(
|
||||
msg,
|
||||
self.chan,
|
||||
)
|
||||
)
|
||||
):
|
||||
log.error(
|
||||
f'Delivering error-msg from {from_uid} to caller {cid}'
|
||||
f'{re}'
|
||||
)
|
||||
self._cancel_msg = msg
|
||||
self._maybe_cancel_and_set_remote_error(error)
|
||||
self._maybe_cancel_and_set_remote_error(re)
|
||||
|
||||
if (
|
||||
self._in_overrun
|
||||
):
|
||||
# XXX NEVER do this XXX..!!
|
||||
# bc if the error is a ctxc and there is a task
|
||||
# waiting on `.result()` we need the msg to be sent
|
||||
# over the `send_chan`/`._recv_chan` so that the error
|
||||
# is relayed to that waiter task..
|
||||
# return True
|
||||
#
|
||||
# XXX ALSO NO!! XXX
|
||||
# if self._remote_error:
|
||||
# self._maybe_raise_remote_err(error)
|
||||
|
||||
if self._in_overrun:
|
||||
log.warning(
|
||||
f'Capturing overrun-msg from {from_uid} to caller {cid}'
|
||||
f'{msg}'
|
||||
)
|
||||
self._overflow_q.append(msg)
|
||||
return False
|
||||
|
||||
try:
|
||||
log.runtime(
|
||||
f'Delivering IPC `Context` msg:\n'
|
||||
f'<= {from_uid}\n'
|
||||
f'=> caller: {cid}\n'
|
||||
f'{msg}'
|
||||
)
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
send_chan.send_nowait(msg)
|
||||
return True
|
||||
# if an error is deteced we should always
|
||||
|
@ -890,7 +1265,8 @@ class Context:
|
|||
lines = [
|
||||
f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
|
||||
# TODO: put remote task name here if possible?
|
||||
f'remote sender actor: {uid}',
|
||||
f'sender: {from_uid}',
|
||||
f'msg: {msg}',
|
||||
# TODO: put task func name here and maybe an arrow
|
||||
# from sender to overrunner?
|
||||
# f'local task {self.func_name}'
|
||||
|
@@ -926,11 +1302,19 @@ class Context:
            # anything different.
            return False
        else:
            # raise local overrun and immediately pack as IPC
            # msg for far end.
            try:
                raise StreamOverrun(text)
                raise StreamOverrun(
                    text,
                    sender=from_uid,
                )
            except StreamOverrun as err:
                err_msg = pack_error(err)
                err_msg['cid'] = cid
                err_msg: dict[str, dict] = pack_error(
                    err,
                    cid=cid,
                )
                # err_msg['cid']: str = cid
                try:
                    await chan.send(err_msg)
                except trio.BrokenResourceError:

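With `sender=from_uid` now packed into the `StreamOverrun`, the side that receives the boxed error back can recognize overruns that it itself triggered and choose not to re-raise them, mirroring the check added to `Context._maybe_raise_remote_err()` earlier in this diff. A self-contained sketch of that check (illustrative helper name):

# --- illustration only, not part of the diff ---
def is_self_overrun(
    err_msgdata: dict,
    our_uid: tuple[str, str],
) -> bool:
    # same predicate shape as the `RemoteActorError` branch in
    # `_maybe_raise_remote_err()`: type name plus sender uid match
    return (
        err_msgdata.get('type_str') == 'StreamOverrun'
        and tuple(err_msgdata.get('sender', ())) == our_uid
    )

assert is_self_overrun(
    {'type_str': 'StreamOverrun', 'sender': ('root', '1')},
    ('root', '1'),
)
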
@@ -164,12 +164,14 @@ class MessagingError(Exception):

def pack_error(
    exc: BaseException,
    tb: str|None = None,
    cid: str|None = None,

) -> dict[str, dict]:
    '''
    Create an "error message" encoded for wire transport via an IPC
    `Channel`; expected to be unpacked on the receiver side using
    `unpack_error()` below.
    Create an "error message" which boxes a locally caught
    exception's meta-data and encodes it for wire transport via an
    IPC `Channel`; expected to be unpacked (and thus unboxed) on
    the receiver side using `unpack_error()` below.

    '''
    if tb:

@@ -197,7 +199,12 @@ def pack_error(
    ):
        error_msg.update(exc.msgdata)

    return {'error': error_msg}

    pkt: dict = {'error': error_msg}
    if cid:
        pkt['cid'] = cid

    return pkt


def unpack_error(

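With the new `cid` kwarg, the boxed-error packet carries the target context id alongside the error payload, so callers no longer have to patch `err_msg['cid']` after the fact. A hedged, standalone sketch of the packet shape this produces (the helper below is a simplified re-derivation, not tractor's `pack_error()`; requires Python 3.10+ for single-arg `format_exception`):

# --- illustration only, not part of the diff ---
import traceback

def pack_error_sketch(exc: BaseException, cid: str | None = None) -> dict:
    error_msg = {
        'tb_str': ''.join(traceback.format_exception(exc)),
        'type_str': type(exc).__name__,
    }
    pkt: dict = {'error': error_msg}
    if cid:
        pkt['cid'] = cid
    return pkt

try:
    raise RuntimeError('boom')
except RuntimeError as err:
    msg = pack_error_sketch(err, cid='deadbeef')

assert msg['cid'] == 'deadbeef'
assert msg['error']['type_str'] == 'RuntimeError'
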
@ -358,8 +365,7 @@ def _raise_from_no_key_in_msg(
|
|||
# is activated above.
|
||||
_type: str = 'Stream' if stream else 'Context'
|
||||
raise MessagingError(
|
||||
f'{_type} was expecting a `{expect_key}` message'
|
||||
' BUT received a non-`error` msg:\n'
|
||||
f'cid: {cid}\n'
|
||||
'{pformat(msg)}'
|
||||
f"{_type} was expecting a '{expect_key}' message"
|
||||
" BUT received a non-error msg:\n"
|
||||
f'{pformat(msg)}'
|
||||
) from src_err
|
||||
|
|
|
@@ -19,13 +19,14 @@ Inter-process comms abstractions

"""
from __future__ import annotations
import platform
import struct
import typing
import platform
from pprint import pformat
from collections.abc import (
    AsyncGenerator,
    AsyncIterator,
)
import typing
from typing import (
    Any,
    runtime_checkable,

@@ -370,7 +371,10 @@ class Channel:

    async def send(self, item: Any) -> None:

        log.transport(f"send `{item}`")  # type: ignore
        log.transport(
            '=> send IPC msg:\n\n'
            f'{pformat(item)}\n'
        )  # type: ignore
        assert self.msgstream

        await self.msgstream.send(item)

@ -39,7 +39,15 @@ import trio
|
|||
from async_generator import asynccontextmanager
|
||||
|
||||
from .trionics import maybe_open_nursery
|
||||
from ._state import current_actor
|
||||
from .devx import (
|
||||
# acquire_debug_lock,
|
||||
# pause,
|
||||
maybe_wait_for_debugger,
|
||||
)
|
||||
from ._state import (
|
||||
current_actor,
|
||||
debug_mode,
|
||||
)
|
||||
from ._ipc import Channel
|
||||
from .log import get_logger
|
||||
from .msg import NamespacePath
|
||||
|
@ -48,6 +56,7 @@ from ._exceptions import (
|
|||
unpack_error,
|
||||
NoResult,
|
||||
ContextCancelled,
|
||||
RemoteActorError,
|
||||
)
|
||||
from ._context import (
|
||||
Context,
|
||||
|
@ -468,7 +477,6 @@ class Portal:
|
|||
ctx._started_called: bool = True
|
||||
|
||||
except KeyError as src_error:
|
||||
|
||||
_raise_from_no_key_in_msg(
|
||||
ctx=ctx,
|
||||
msg=msg,
|
||||
|
@ -493,6 +501,33 @@ class Portal:
|
|||
# in enter tuple.
|
||||
yield ctx, first
|
||||
|
||||
# between the caller exiting and arriving here the
|
||||
# far end may have sent a ctxc-msg or other error,
|
||||
# so check for it here immediately and maybe raise
|
||||
# so as to engage the ctxc handling block below!
|
||||
# if re := ctx._remote_error:
|
||||
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
|
||||
# re,
|
||||
|
||||
# # TODO: do we want this to always raise?
|
||||
# # - means that on self-ctxc, if/when the
|
||||
# # block is exited before the msg arrives
|
||||
# # but then the msg during __exit__
|
||||
# # calling we may not activate the
|
||||
# # ctxc-handler block below? should we
|
||||
# # be?
|
||||
# # - if there's a remote error that arrives
|
||||
# # after the child has exited, we won't
|
||||
# # handle until the `finally:` block
|
||||
# # where `.result()` is always called,
|
||||
# # again in which case we handle it
|
||||
# # differently then in the handler block
|
||||
# # that would normally engage from THIS
|
||||
# # block?
|
||||
# raise_ctxc_from_self_call=True,
|
||||
# )
|
||||
# assert maybe_ctxc
|
||||
|
||||
# when in allow_overruns mode there may be
|
||||
# lingering overflow sender tasks remaining?
|
||||
if nurse.child_tasks:
|
||||
|
@ -538,7 +573,7 @@ class Portal:
|
|||
# `.canceller: tuple[str, str]` to be same value as
|
||||
# caught here in a `ContextCancelled.canceller`.
|
||||
#
|
||||
# Again, there are 2 cases:
|
||||
# AGAIN to restate the above, there are 2 cases:
|
||||
#
|
||||
# 1-some other context opened in this `.open_context()`
|
||||
# block cancelled due to a self or peer cancellation
|
||||
|
@ -554,6 +589,16 @@ class Portal:
|
|||
except ContextCancelled as ctxc:
|
||||
scope_err = ctxc
|
||||
|
||||
# XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
|
||||
# using this code and then resuming the REPL will
|
||||
# cause a SIGINT-ignoring HANG!
|
||||
# -> prolly due to a stale debug lock entry..
|
||||
# -[ ] USE `.stackscope` to demonstrate that (possibly
|
||||
# documenting it as a definittive example of
|
||||
# debugging the tractor-runtime itself using it's
|
||||
# own `.devx.` tooling!
|
||||
# await pause()
|
||||
|
||||
# CASE 2: context was cancelled by local task calling
|
||||
# `.cancel()`, we don't raise and the exit block should
|
||||
# exit silently.
|
||||
|
@ -561,18 +606,23 @@ class Portal:
|
|||
ctx._cancel_called
|
||||
and (
|
||||
ctxc is ctx._remote_error
|
||||
or
|
||||
ctxc.canceller is self.canceller
|
||||
# ctxc.msgdata == ctx._remote_error.msgdata
|
||||
|
||||
# TODO: uhh `Portal.canceller` ain't a thangg
|
||||
# dawg? (was `self.canceller` before?!?)
|
||||
and
|
||||
ctxc.canceller == self.actor.uid
|
||||
)
|
||||
):
|
||||
log.debug(
|
||||
f'Context {ctx} cancelled gracefully with:\n'
|
||||
log.cancel(
|
||||
f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n'
|
||||
f'{ctxc}'
|
||||
)
|
||||
# CASE 1: this context was never cancelled via a local
|
||||
# task (tree) having called `Context.cancel()`, raise
|
||||
# the error since it was caused by someone else!
|
||||
else:
|
||||
# await pause()
|
||||
raise
|
||||
|
||||
# the above `._scope` can be cancelled due to:
|
||||
|
@ -601,8 +651,8 @@ class Portal:
|
|||
trio.Cancelled, # NOTE: NOT from inside the ctx._scope
|
||||
KeyboardInterrupt,
|
||||
|
||||
) as err:
|
||||
scope_err = err
|
||||
) as caller_err:
|
||||
scope_err = caller_err
|
||||
|
||||
# XXX: ALWAYS request the context to CANCEL ON any ERROR.
|
||||
# NOTE: `Context.cancel()` is conversely NEVER CALLED in
|
||||
|
@ -610,11 +660,26 @@ class Portal:
|
|||
# handled in the block above!
|
||||
log.cancel(
|
||||
'Context cancelled for task due to\n'
|
||||
f'{err}\n'
|
||||
f'{caller_err}\n'
|
||||
'Sending cancel request..\n'
|
||||
f'task:{cid}\n'
|
||||
f'actor:{uid}'
|
||||
)
|
||||
|
||||
if debug_mode():
|
||||
log.pdb(
|
||||
'Delaying `ctx.cancel()` until debug lock '
|
||||
'acquired..'
|
||||
)
|
||||
# async with acquire_debug_lock(self.actor.uid):
|
||||
# pass
|
||||
# TODO: factor ^ into below for non-root cases?
|
||||
await maybe_wait_for_debugger()
|
||||
log.pdb(
|
||||
'Acquired debug lock! '
|
||||
'Calling `ctx.cancel()`!'
|
||||
)
|
||||
|
||||
try:
|
||||
await ctx.cancel()
|
||||
except trio.BrokenResourceError:
|
||||
|
@ -628,6 +693,33 @@ class Portal:
|
|||
|
||||
# no local scope error, the "clean exit with a result" case.
|
||||
else:
|
||||
# between the caller exiting and arriving here the
|
||||
# far end may have sent a ctxc-msg or other error,
|
||||
# so check for it here immediately and maybe raise
|
||||
# so as to engage the ctxc handling block below!
|
||||
# if re := ctx._remote_error:
|
||||
# maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
|
||||
# re,
|
||||
|
||||
# # TODO: do we want this to always raise?
|
||||
# # - means that on self-ctxc, if/when the
|
||||
# # block is exited before the msg arrives
|
||||
# # but then the msg during __exit__
|
||||
# # calling we may not activate the
|
||||
# # ctxc-handler block below? should we
|
||||
# # be?
|
||||
# # - if there's a remote error that arrives
|
||||
# # after the child has exited, we won't
|
||||
# # handle until the `finally:` block
|
||||
# # where `.result()` is always called,
|
||||
# # again in which case we handle it
|
||||
# # differently then in the handler block
|
||||
# # that would normally engage from THIS
|
||||
# # block?
|
||||
# raise_ctxc_from_self_call=True,
|
||||
# )
|
||||
# assert maybe_ctxc
|
||||
|
||||
if ctx.chan.connected():
|
||||
log.info(
|
||||
'Waiting on final context-task result for\n'
|
||||
|
@ -644,13 +736,8 @@ class Portal:
|
|||
# As per `Context._deliver_msg()`, that error IS
|
||||
# ALWAYS SET any time "callee" side fails and causes "caller
|
||||
# side" cancellation via a `ContextCancelled` here.
|
||||
# result = await ctx.result()
|
||||
try:
|
||||
result = await ctx.result()
|
||||
log.runtime(
|
||||
f'Context {fn_name} returned value from callee:\n'
|
||||
f'`{result}`'
|
||||
)
|
||||
result_or_err: Exception|Any = await ctx.result()
|
||||
except BaseException as berr:
|
||||
# on normal teardown, if we get some error
|
||||
# raised in `Context.result()` we still want to
|
||||
|
@ -662,7 +749,48 @@ class Portal:
|
|||
scope_err = berr
|
||||
raise
|
||||
|
||||
# an exception type boxed in a `RemoteActorError`
|
||||
# is returned (meaning it was obvi not raised).
|
||||
msgdata: str|None = getattr(
|
||||
result_or_err,
|
||||
'msgdata',
|
||||
None
|
||||
)
|
||||
# yes! this worx Bp
|
||||
# from .devx import _debug
|
||||
# await _debug.pause()
|
||||
match (msgdata, result_or_err):
|
||||
case (
|
||||
{'tb_str': tbstr},
|
||||
ContextCancelled(),
|
||||
):
|
||||
log.cancel(tbstr)
|
||||
|
||||
case (
|
||||
{'tb_str': tbstr},
|
||||
RemoteActorError(),
|
||||
):
|
||||
log.exception(
|
||||
f'Context `{fn_name}` remotely errored:\n'
|
||||
f'`{tbstr}`'
|
||||
)
|
||||
case (None, _):
|
||||
log.runtime(
|
||||
f'Context {fn_name} returned value from callee:\n'
|
||||
f'`{result_or_err}`'
|
||||
)
|
||||
|
||||
finally:
|
||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||
# wait for any immediate child in debug before popping the
|
||||
# context from the runtime msg loop otherwise inside
|
||||
# ``Actor._push_result()`` the msg will be discarded and in
|
||||
# the case where that msg is global debugger unlock (via
|
||||
# a "stop" msg for a stream), this can result in a deadlock
|
||||
# where the root is waiting on the lock to clear but the
|
||||
# child has already cleared it and clobbered IPC.
|
||||
await maybe_wait_for_debugger()
|
||||
|
||||
# though it should be impossible for any tasks
|
||||
# operating *in* this scope to have survived
|
||||
# we tear down the runtime feeder chan last
|
||||
|
@ -707,6 +835,10 @@ class Portal:
|
|||
# out any exception group or legit (remote) ctx
|
||||
# error that sourced from the remote task or its
|
||||
# runtime.
|
||||
#
|
||||
# NOTE: further, this should be the only place the
|
||||
# underlying feeder channel is
|
||||
# once-and-only-CLOSED!
|
||||
with trio.CancelScope(shield=True):
|
||||
await ctx._recv_chan.aclose()
|
||||
|
||||
|
@ -747,6 +879,9 @@ class Portal:
|
|||
|
||||
# FINALLY, remove the context from runtime tracking and
|
||||
# exit!
|
||||
log.runtime(
|
||||
f'Exiting context opened with {ctx.chan.uid}'
|
||||
)
|
||||
self.actor._contexts.pop(
|
||||
(self.channel.uid, ctx.cid),
|
||||
None,
|
||||
|
|
|
@ -28,6 +28,7 @@ from itertools import chain
|
|||
import importlib
|
||||
import importlib.util
|
||||
import inspect
|
||||
from pprint import pformat
|
||||
import signal
|
||||
import sys
|
||||
from typing import (
|
||||
|
@ -48,6 +49,10 @@ import trio
|
|||
from trio import (
|
||||
CancelScope,
|
||||
)
|
||||
from trio.lowlevel import (
|
||||
current_task,
|
||||
Task,
|
||||
)
|
||||
from trio_typing import (
|
||||
Nursery,
|
||||
TaskStatus,
|
||||
|
@@ -80,6 +85,26 @@ if TYPE_CHECKING:

log = get_logger('tractor')

_gb_mod: ModuleType|None|False = None


async def maybe_import_gb():
    global _gb_mod
    if _gb_mod is False:
        return

    try:
        import greenback
        _gb_mod = greenback
        await greenback.ensure_portal()

    except ModuleNotFoundError:
        log.warning(
            '`greenback` is not installed.\n'
            'No sync debug support!'
        )
        _gb_mod = False


async def _invoke(

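The new `maybe_import_gb()` guard caches the optional `greenback` dependency at module scope so the import is attempted once and the "not installed" warning fires once. The same soft-import-once pattern in isolation, with placeholder names (in the diff the guard additionally awaits `greenback.ensure_portal()` so later sync-debug entrypoints can re-enter the trio loop):

# --- illustration only, not part of the diff ---
from types import ModuleType
import importlib
import logging

log = logging.getLogger(__name__)
_mod: ModuleType | None | bool = None   # None = untried, False = known-missing

def maybe_import(name: str) -> ModuleType | None:
    global _mod
    if _mod is False:
        return None     # already warned once; stay quiet
    if _mod is None:
        try:
            _mod = importlib.import_module(name)
        except ModuleNotFoundError:
            log.warning('%s is not installed; feature disabled', name)
            _mod = False
            return None
    return _mod
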
@ -227,15 +252,27 @@ async def _invoke(
|
|||
# wrapper that calls `Context.started()` and then does
|
||||
# the `await coro()`?
|
||||
elif context:
|
||||
# context func with support for bi-dir streaming
|
||||
await chan.send({'functype': 'context', 'cid': cid})
|
||||
|
||||
# a "context" endpoint type is the most general and
|
||||
# "least sugary" type of RPC ep with support for
|
||||
# bi-dir streaming B)
|
||||
await chan.send({
|
||||
'functype': 'context',
|
||||
'cid': cid
|
||||
})
|
||||
|
||||
try:
|
||||
async with trio.open_nursery() as nurse:
|
||||
ctx._scope_nursery = nurse
|
||||
ctx._scope = nurse.cancel_scope
|
||||
task_status.started(ctx)
|
||||
|
||||
# TODO: should would be nice to have our
|
||||
# `TaskMngr` nursery here!
|
||||
# res: Any = await coro
|
||||
res = await coro
|
||||
|
||||
# deliver final result to caller side.
|
||||
await chan.send({
|
||||
'return': res,
|
||||
'cid': cid
|
||||
|
@ -271,9 +308,10 @@ async def _invoke(
|
|||
# associated child isn't in debug any more
|
||||
await _debug.maybe_wait_for_debugger()
|
||||
ctx: Context = actor._contexts.pop((chan.uid, cid))
|
||||
log.runtime(
|
||||
f'Context entrypoint {func} was terminated:\n'
|
||||
f'{ctx}'
|
||||
log.cancel(
|
||||
f'Context task was terminated:\n'
|
||||
f'func: {func}\n'
|
||||
f'ctx: {pformat(ctx)}'
|
||||
)
|
||||
|
||||
if ctx.cancelled_caught:
|
||||
|
@ -285,13 +323,14 @@ async def _invoke(
|
|||
if re := ctx._remote_error:
|
||||
ctx._maybe_raise_remote_err(re)
|
||||
|
||||
fname: str = func.__name__
|
||||
# fname: str = func.__name__
|
||||
task: Task = current_task()
|
||||
cs: CancelScope = ctx._scope
|
||||
if cs.cancel_called:
|
||||
our_uid: tuple = actor.uid
|
||||
canceller: tuple = ctx.canceller
|
||||
msg: str = (
|
||||
f'`{fname}()`@{our_uid} cancelled by '
|
||||
'actor was cancelled by '
|
||||
)
|
||||
|
||||
# NOTE / TODO: if we end up having
|
||||
|
@ -310,16 +349,37 @@ async def _invoke(
|
|||
# some actor who calls `Portal.cancel_actor()`
|
||||
# and by side-effect cancels this ctx.
|
||||
elif canceller == ctx.chan.uid:
|
||||
msg += f'its caller {canceller} '
|
||||
msg += 'its caller'
|
||||
|
||||
else:
|
||||
msg += f'remote actor {canceller}'
|
||||
msg += 'a remote peer'
|
||||
|
||||
div_chars: str = '------ - ------'
|
||||
div_offset: int = (
|
||||
round(len(msg)/2)+1
|
||||
+
|
||||
round(len(div_chars)/2)+1
|
||||
)
|
||||
div_str: str = (
|
||||
'\n'
|
||||
+
|
||||
' '*div_offset
|
||||
+
|
||||
f'{div_chars}\n'
|
||||
)
|
||||
msg += (
|
||||
div_str +
|
||||
f'<= canceller: {canceller}\n'
|
||||
f'=> uid: {our_uid}\n'
|
||||
f' |_ task: `{task.name}()`'
|
||||
)
|
||||
|
||||
# TODO: does this ever get set any more or can
|
||||
# we remove it?
|
||||
if ctx._cancel_msg:
|
||||
msg += (
|
||||
' with msg:\n'
|
||||
'------ - ------\n'
|
||||
'IPC msg:\n'
|
||||
f'{ctx._cancel_msg}'
|
||||
)
|
||||
|
||||
|
@ -439,9 +499,9 @@ async def _invoke(
|
|||
# always ship errors back to caller
|
||||
err_msg: dict[str, dict] = pack_error(
|
||||
err,
|
||||
tb=tb,
|
||||
# tb=tb, # TODO: special tb fmting?
|
||||
cid=cid,
|
||||
)
|
||||
err_msg['cid'] = cid
|
||||
|
||||
if is_rpc:
|
||||
try:
|
||||
|
@@ -508,19 +568,28 @@ async def try_ship_error_to_parent(
    err: Union[Exception, BaseExceptionGroup],

) -> None:
    '''
    Box, pack and encode a local runtime(-internal) exception for
    an IPC channel `.send()` with transport/network failures and
    local cancellation ignored but logged as critical(ly bad).

    '''
    with CancelScope(shield=True):
        try:
            # internal error so ship to parent without cid
            await channel.send(pack_error(err))
            await channel.send(
                # NOTE: normally only used for internal runtime errors
                # so ship to peer actor without a cid.
                pack_error(err)
            )
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
        ):
            # in SC terms this is one of the worst things that can
            # happen and creates the 2-general's dilemma.
            # happen and provides for a 2-general's dilemma..
            log.critical(
                f"Failed to ship error to parent "
                f"{channel.uid}, channel was closed"
                f'Failed to ship error to parent '
                f'{channel.uid}, IPC transport failure!'
            )

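The `CancelScope(shield=True)` above matters because this helper typically runs while the surrounding task is already being torn down: without the shield, the error-shipping `.send()` would itself be interrupted by the pending cancellation. A standalone demo of that shielding behaviour (pure `trio`, not tractor code; `trio.sleep(0.01)` stands in for the real `channel.send(...)`):

# --- illustration only, not part of the diff ---
import trio

async def main() -> None:
    ran_shielded = False

    async def task() -> None:
        nonlocal ran_shielded
        try:
            await trio.sleep_forever()
        finally:
            # without the shield, this checkpoint would re-raise Cancelled
            # before the "send" could complete
            with trio.CancelScope(shield=True):
                await trio.sleep(0.01)
                ran_shielded = True

    async with trio.open_nursery() as n:
        n.start_soon(task)
        await trio.sleep(0.05)
        n.cancel_scope.cancel()

    assert ran_shielded

trio.run(main)
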
@ -573,6 +642,11 @@ class Actor:
|
|||
# if started on ``asycio`` running ``trio`` in guest mode
|
||||
_infected_aio: bool = False
|
||||
|
||||
# _ans: dict[
|
||||
# tuple[str, str],
|
||||
# list[ActorNursery],
|
||||
# ] = {}
|
||||
|
||||
# Process-global stack closed at end on actor runtime teardown.
|
||||
# NOTE: this is currently an undocumented public api.
|
||||
lifetime_stack: ExitStack = ExitStack()
|
||||
|
@ -593,7 +667,10 @@ class Actor:
|
|||
|
||||
'''
|
||||
self.name = name
|
||||
self.uid = (name, uid or str(uuid.uuid4()))
|
||||
self.uid = (
|
||||
name,
|
||||
uid or str(uuid.uuid4())
|
||||
)
|
||||
|
||||
self._cancel_complete = trio.Event()
|
||||
self._cancel_called_by_remote: tuple[str, tuple] | None = None
|
||||
|
@ -762,7 +839,10 @@ class Actor:
|
|||
return
|
||||
|
||||
# channel tracking
|
||||
event = self._peer_connected.pop(uid, None)
|
||||
event: trio.Event|None = self._peer_connected.pop(
|
||||
uid,
|
||||
None,
|
||||
)
|
||||
if event:
|
||||
# Instructing connection: this is likely a new channel to
|
||||
# a recently spawned actor which we'd like to control via
|
||||
|
@ -771,46 +851,43 @@ class Actor:
|
|||
# Alert any task waiting on this connection to come up
|
||||
event.set()
|
||||
|
||||
chans = self._peers[uid]
|
||||
|
||||
# TODO: re-use channels for new connections instead
|
||||
# of always new ones; will require changing all the
|
||||
# discovery funcs
|
||||
chans: list[Channel] = self._peers[uid]
|
||||
if chans:
|
||||
# TODO: re-use channels for new connections instead
|
||||
# of always new ones?
|
||||
# => will require changing all the discovery funcs..
|
||||
log.runtime(
|
||||
f"already have channel(s) for {uid}:{chans}?"
|
||||
)
|
||||
|
||||
log.runtime(f"Registered {chan} for {uid}") # type: ignore
|
||||
# append new channel
|
||||
log.runtime(f"Registered {chan} for {uid}") # type: ignore
|
||||
# TODO: can we just use list-ref directly?
|
||||
# chans.append(chan)
|
||||
self._peers[uid].append(chan)
|
||||
|
||||
local_nursery: ActorNursery | None = None # noqa
|
||||
disconnected: bool = False
|
||||
|
||||
# Begin channel management - respond to remote requests and
|
||||
# process received reponses.
|
||||
disconnected: bool = False
|
||||
try:
|
||||
disconnected = await process_messages(self, chan)
|
||||
|
||||
except (
|
||||
trio.Cancelled,
|
||||
):
|
||||
log.cancel(f"Msg loop was cancelled for {chan}")
|
||||
disconnected: bool = await process_messages(self, chan)
|
||||
except trio.Cancelled:
|
||||
log.cancel(f'Msg loop was cancelled for {chan}')
|
||||
raise
|
||||
|
||||
finally:
|
||||
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
|
||||
local_nursery: (
|
||||
ActorNursery|None
|
||||
) = self._actoruid2nursery.get(uid)
|
||||
|
||||
# This is set in ``Portal.cancel_actor()``. So if
|
||||
# the peer was cancelled we try to wait for them
|
||||
# to tear down their side of the connection before
|
||||
# moving on with closing our own side.
|
||||
if (
|
||||
local_nursery
|
||||
):
|
||||
if local_nursery:
|
||||
if chan._cancel_called:
|
||||
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
|
||||
log.cancel(f'Waiting on cancel request to peer {chan.uid}')
|
||||
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
# underlying transport protocol) to close from the
|
||||
# remote peer side since we presume that any channel
|
||||
|
@ -853,26 +930,48 @@ class Actor:
|
|||
# the cause of other downstream errors.
|
||||
entry = local_nursery._children.get(uid)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
||||
poll = getattr(proc, 'poll', None)
|
||||
if poll and poll() is None:
|
||||
log.cancel(
|
||||
f'Actor {uid} IPC broke but proc is alive?'
|
||||
f'Peer actor IPC broke but proc is alive?\n'
|
||||
f'uid: {uid}\n'
|
||||
f'|_{proc}\n'
|
||||
)
|
||||
|
||||
# ``Channel`` teardown and closure sequence
|
||||
|
||||
# Drop ref to channel so it can be gc-ed and disconnected
|
||||
log.runtime(f"Releasing channel {chan} from {chan.uid}")
|
||||
log.runtime(
|
||||
f'Disconnected IPC channel:\n'
|
||||
f'uid: {chan.uid}\n'
|
||||
f'|_{pformat(chan)}\n'
|
||||
)
|
||||
chans = self._peers.get(chan.uid)
|
||||
chans.remove(chan)
|
||||
|
||||
if not chans:
|
||||
log.runtime(f"No more channels for {chan.uid}")
|
||||
log.runtime(
|
||||
f'No more channels with {chan.uid}'
|
||||
)
|
||||
self._peers.pop(uid, None)
|
||||
|
||||
log.runtime(f"Peers is {self._peers}")
|
||||
peers_str: str = ''
|
||||
for uid, chans in self._peers.items():
|
||||
peers_str += (
|
||||
f'- uid: {uid}\n'
|
||||
)
|
||||
for i, chan in enumerate(chans):
|
||||
peers_str += (
|
||||
f' |_[{i}] {pformat(chan)}\n'
|
||||
)
|
||||
|
||||
log.runtime(
|
||||
f'Remaining IPC {len(self._peers)} peers:\n'
|
||||
+ peers_str
|
||||
)
|
||||
|
||||
# No more channels to other actors (at all) registered
|
||||
# as connected.
|
||||
|
@ -888,15 +987,58 @@ class Actor:
|
|||
if _state.is_root_process():
|
||||
pdb_lock = _debug.Lock
|
||||
pdb_lock._blocked.add(uid)
|
||||
log.runtime(f"{uid} blocked from pdb locking")
|
||||
|
||||
# TODO: NEEEDS TO BE TESTED!
|
||||
# actually, no idea if this ever even enters.. XD
|
||||
pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
|
||||
if (
|
||||
pdb_user_uid
|
||||
and local_nursery
|
||||
):
|
||||
entry: tuple|None = local_nursery._children.get(pdb_user_uid)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
||||
if (
|
||||
(poll := getattr(proc, 'poll', None))
|
||||
and poll() is None
|
||||
):
|
||||
log.cancel(
|
||||
'Root actor reports no-more-peers, BUT '
|
||||
'a DISCONNECTED child still has the debug '
|
||||
'lock!\n'
|
||||
f'root uid: {self.uid}\n'
|
||||
f'last disconnected child uid: {uid}\n'
|
||||
f'locking child uid: {pdb_user_uid}\n'
|
||||
)
|
||||
await _debug.maybe_wait_for_debugger(
|
||||
child_in_debug=True
|
||||
)
|
||||
|
||||
# TODO: just bc a child's transport dropped
|
||||
# doesn't mean it's not still using the pdb
|
||||
# REPL! so,
|
||||
# -[ ] ideally we can check out child proc
|
||||
# tree to ensure that its alive (and
|
||||
# actually using the REPL) before we cancel
|
||||
# it's lock acquire by doing the below!
|
||||
# -[ ] create a way to read the tree of each actor's
|
||||
# grandchildren such that when an
|
||||
# intermediary parent is cancelled but their
|
||||
# child has locked the tty, the grandparent
|
||||
# will not allow the parent to cancel or
|
||||
# zombie reap the child! see open issue:
|
||||
# - https://github.com/goodboy/tractor/issues/320
|
||||
# ------ - ------
|
||||
# if a now stale local task has the TTY lock still
|
||||
# we cancel it to allow servicing other requests for
|
||||
# the lock.
|
||||
db_cs = pdb_lock._root_local_task_cs_in_debug
|
||||
db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
|
||||
if (
|
||||
db_cs
|
||||
and not db_cs.cancel_called
|
||||
and uid == pdb_user_uid
|
||||
):
|
||||
log.warning(
|
||||
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
||||
|
@ -905,13 +1047,22 @@ class Actor:
|
|||
db_cs.cancel()
|
||||
|
||||
# XXX: is this necessary (GC should do it)?
|
||||
if chan.connected():
|
||||
# XXX WARNING XXX
|
||||
# Be AWARE OF THE INDENT LEVEL HERE
|
||||
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
|
||||
# EMPTY!!!!
|
||||
if (
|
||||
not self._peers
|
||||
and chan.connected()
|
||||
):
|
||||
# if the channel is still connected it may mean the far
|
||||
# end has not closed and we may have gotten here due to
|
||||
# an error and so we should at least try to terminate
|
||||
# the channel from this end gracefully.
|
||||
|
||||
log.runtime(f"Disconnecting channel {chan}")
|
||||
log.runtime(
|
||||
'Terminating channel with `None` setinel msg\n'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
try:
|
||||
# send a msg loop terminate sentinel
|
||||
await chan.send(None)
|
||||
|
@ -928,15 +1079,16 @@ class Actor:
|
|||
chan: Channel,
|
||||
cid: str,
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
|
||||
) -> None|bool:
|
||||
'''
|
||||
Push an RPC result to the local consumer's queue.
|
||||
|
||||
'''
|
||||
uid = chan.uid
|
||||
uid: tuple[str, str] = chan.uid
|
||||
assert uid, f"`chan.uid` can't be {uid}"
|
||||
try:
|
||||
ctx = self._contexts[(uid, cid)]
|
||||
ctx: Context = self._contexts[(uid, cid)]
|
||||
except KeyError:
|
||||
log.warning(
|
||||
f'Ignoring msg from [no-longer/un]known context {uid}:'
|
||||
|
@ -1066,6 +1218,16 @@ class Actor:
|
|||
parent_data.pop('bind_port'),
|
||||
)
|
||||
rvs = parent_data.pop('_runtime_vars')
|
||||
|
||||
if rvs['_debug_mode']:
|
||||
try:
|
||||
from .devx import enable_stack_on_sig
|
||||
enable_stack_on_sig()
|
||||
except ImportError:
|
||||
log.warning(
|
||||
'`stackscope` not installed for use in debug mode!'
|
||||
)
|
||||
|
||||
log.runtime(f"Runtime vars are: {rvs}")
|
||||
rvs['_is_root'] = False
|
||||
_state._runtime_vars.update(rvs)
|
||||
|
@ -1284,9 +1446,15 @@ class Actor:
|
|||
'''
|
||||
tasks: dict = self._rpc_tasks
|
||||
if tasks:
|
||||
tasks_str: str = ''
|
||||
for (ctx, func, _) in tasks.values():
|
||||
tasks_str += (
|
||||
f' |_{func.__name__}() [cid={ctx.cid[-6:]}..]\n'
|
||||
)
|
||||
|
||||
log.cancel(
|
||||
f'Cancelling all {len(tasks)} rpc tasks:\n'
|
||||
f'{tasks}'
|
||||
f'{tasks_str}'
|
||||
)
|
||||
for (
|
||||
(chan, cid),
|
||||
|
@ -1511,7 +1679,10 @@ async def async_main(
|
|||
)
|
||||
|
||||
if actor._parent_chan:
|
||||
await try_ship_error_to_parent(actor._parent_chan, err)
|
||||
await try_ship_error_to_parent(
|
||||
actor._parent_chan,
|
||||
err,
|
||||
)
|
||||
|
||||
# always!
|
||||
match err:
|
||||
|
@ -1595,43 +1766,53 @@ async def process_messages(
|
|||
or boxed errors back to the remote caller (task).
|
||||
|
||||
'''
|
||||
# TODO: once https://github.com/python-trio/trio/issues/467 gets
|
||||
# worked out we'll likely want to use that!
|
||||
msg: dict | None = None
|
||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||
# should use it?
|
||||
# https://github.com/python-trio/trio/issues/467
|
||||
log.runtime(
|
||||
'Entering IPC msg loop:\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'|_{chan}'
|
||||
)
|
||||
nursery_cancelled_before_task: bool = False
|
||||
|
||||
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
||||
msg: dict | None = None
|
||||
try:
|
||||
# NOTE: this internal scope allows for keeping this
|
||||
# message loop running despite the current task having
|
||||
# been cancelled (eg. `open_portal()` may call this method
|
||||
# from a locally spawned task) and recieve this scope
|
||||
# using ``scope = Nursery.start()``
|
||||
with CancelScope(shield=shield) as loop_cs:
|
||||
# this internal scope allows for keeping this message
|
||||
# loop running despite the current task having been
|
||||
# cancelled (eg. `open_portal()` may call this method from
|
||||
# a locally spawned task) and recieve this scope using
|
||||
# ``scope = Nursery.start()``
|
||||
task_status.started(loop_cs)
|
||||
async for msg in chan:
|
||||
|
||||
if msg is None: # loop terminate sentinel
|
||||
# dedicated loop terminate sentinel
|
||||
if msg is None:
|
||||
|
||||
tasks: dict[
|
||||
tuple[Channel, str],
|
||||
tuple[Context, Callable, trio.Event]
|
||||
] = actor._rpc_tasks.copy()
|
||||
log.cancel(
|
||||
f"Channel to {chan.uid} terminated?\n"
|
||||
"Cancelling all associated tasks..")
|
||||
|
||||
for (channel, cid) in actor._rpc_tasks.copy():
|
||||
f'Peer IPC channel terminated via `None` setinel msg?\n'
|
||||
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
for (channel, cid) in tasks:
|
||||
if channel is chan:
|
||||
await actor._cancel_task(
|
||||
cid,
|
||||
channel,
|
||||
)
|
||||
|
||||
log.runtime(
|
||||
f"Msg loop signalled to terminate for"
|
||||
f" {chan} from {chan.uid}")
|
||||
|
||||
break
|
||||
|
||||
log.transport( # type: ignore
|
||||
f"Received msg {msg} from {chan.uid}")
|
||||
f'<= IPC msg from peer: {chan.uid}\n\n'
|
||||
# TODO: conditionally avoid fmting depending
|
||||
# on log level (for perf)?
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
|
||||
cid = msg.get('cid')
|
||||
if cid:
|
||||
|
@ -1640,7 +1821,10 @@ async def process_messages(
|
|||
await actor._push_result(chan, cid, msg)
|
||||
|
||||
log.runtime(
|
||||
f"Waiting on next msg for {chan} from {chan.uid}")
|
||||
f'Waiting on next IPC msg from {chan.uid}:\n'
|
||||
# f'last msg: {msg}\n'
|
||||
f'|_{chan}'
|
||||
)
|
||||
continue
|
||||
|
||||
# TODO: implement with ``match:`` syntax?
|
||||
|
@ -1693,7 +1877,7 @@ async def process_messages(
)

log.cancel(
f'Cancelling msg loop for {chan.uid}'
f'Cancelling IPC msg-loop with {chan.uid}'
)
loop_cs.cancel()
break

@ -1735,8 +1919,10 @@ async def process_messages(
try:
func = actor._get_rpc_func(ns, funcname)
except (ModuleNotExposed, AttributeError) as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
err_msg: dict[str, dict] = pack_error(
err,
cid=cid,
)
await chan.send(err_msg)
continue
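
The second hunk above folds the `cid` into the `pack_error()` call instead of mutating the packed msg afterwards. A rough guess at the shape of such an error-boxing helper, for orientation only (tractor's real `pack_error()` lives in `._exceptions` and may differ in fields and signature; `pack_error_sketch` is a made-up name):

    import traceback
    from typing import Any


    def pack_error_sketch(
        exc: BaseException,
        cid: str | None = None,
    ) -> dict[str, Any]:
        # box an exception as a plain dict so it can cross the IPC transport
        error: dict[str, Any] = {
            'tb_str': ''.join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
            'type_str': type(exc).__name__,
        }
        msg: dict[str, Any] = {'error': error}
        if cid:
            # tag the boxed error with the originating context/stream id
            msg['cid'] = cid
        return msg
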
@ -1838,7 +2024,10 @@ async def process_messages(
log.exception("Actor errored:")

if actor._parent_chan:
await try_ship_error_to_parent(actor._parent_chan, err)
await try_ship_error_to_parent(
actor._parent_chan,
err,
)

# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"

@ -1847,8 +2036,9 @@ async def process_messages(
finally:
# msg debugging for when he machinery is brokey
log.runtime(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}"
f'Exiting IPC msg loop with {chan.uid} '
f'final msg: {msg}\n'
f'|_{chan}'
)

# transport **was not** disconnected

@ -144,7 +144,7 @@ async def exhaust_portal(

# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
final = await portal.result()
final: Any = await portal.result()

except (
Exception,

@ -152,13 +152,23 @@ async def exhaust_portal(
) as err:
# we reraise in the parent task via a ``BaseExceptionGroup``
return err

except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled result waiter for {portal.actor.uid}")
log.warning(
'Cancelled portal result waiter task:\n'
f'uid: {portal.channel.uid}\n'
f'error: {err}\n'
)
return err

else:
log.debug(f"Returning final result: {final}")
log.debug(
f'Returning final result from portal:\n'
f'uid: {portal.channel.uid}\n'
f'result: {final}\n'
)
return final
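
`exhaust_portal()` keeps returning either the final result or the caught exception object itself, so the caller can branch on type. A condensed sketch of that result-or-error-as-a-value pattern, with an invented `wait_for_result()` wrapper (the real hunk also special-cases `trio.Cancelled` in the same way):

    from typing import Any, Awaitable, Callable


    async def wait_for_result(
        get_result: Callable[[], Awaitable[Any]],
    ) -> Any | Exception:
        # hand failures back as values; the caller decides whether to
        # raise, log, or map them into an error table keyed by actor uid
        try:
            return await get_result()
        except Exception as err:
            return err
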
@ -170,26 +180,34 @@ async def cancel_on_completion(

) -> None:
'''
Cancel actor gracefully once it's "main" portal's
Cancel actor gracefully once its "main" portal's
result arrives.

Should only be called for actors spawned with `run_in_actor()`.
Should only be called for actors spawned via the
`Portal.run_in_actor()` API.

=> and really this API will be deprecated and should be
re-implemented as a `.hilevel.one_shot_task_nursery()`..)

'''
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# an exception group and we still send out a cancel request
result = await exhaust_portal(portal, actor)
result: Any|Exception = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
errors[actor.uid]: Exception = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
'Cancelling subactor due to error:\n'
f'uid: {portal.channel.uid}\n'
f'error: {result}\n'
)

else:
log.runtime(
f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}")
'Cancelling subactor gracefully:\n'
f'uid: {portal.channel.uid}\n'
f'result: {result}\n'
)

# cancel the process now that we have a final result
await portal.cancel_actor()
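
`cancel_on_completion()` keeps the same flow: record any error keyed by the subactor uid, then always request cancellation once a final outcome is known. A compressed restatement of that flow with illustrative names only (`cancel_after_result` and the `cancel_actor` callable stand in for the portal API):

    from typing import Any, Awaitable, Callable


    async def cancel_after_result(
        uid: tuple[str, str],
        result: Any | Exception,
        errors: dict[tuple[str, str], Exception],
        cancel_actor: Callable[[], Awaitable[None]],
    ) -> None:
        # stash any error keyed by the subactor's uid so the nursery can
        # re-raise it later inside an exception group ...
        if isinstance(result, Exception):
            errors[uid] = result

        # ... then always request cancellation now that a final outcome
        # (result or error) is known
        await cancel_actor()
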
@ -219,11 +237,14 @@ async def do_hard_kill(
to be handled.

'''
log.cancel(
'Terminating sub-proc:\n'
f'|_{proc}\n'
)
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:

# NOTE: code below was copied verbatim from the now deprecated

@ -260,7 +281,10 @@ async def do_hard_kill(
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
log.critical(
'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
f'|_{proc}\n'
)
proc.kill()
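
`do_hard_kill()` keeps its bounded-wait-then-SIGKILL escalation, now with clearer logging. The core pattern, sketched against the public `trio` process API (the `terminate_after` default here is a placeholder, not the value tractor uses):

    import trio


    async def reap_with_ultimatum(
        proc: trio.Process,
        terminate_after: float = 3.0,
    ) -> None:
        # give the already cancelled/terminated process a bounded window
        # to exit on its own ...
        with trio.move_on_after(terminate_after) as cs:
            await proc.wait()

        # ... and only if that window expired, escalate to SIGKILL
        if cs.cancelled_caught:
            proc.kill()
            await proc.wait()
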
@ -281,10 +305,16 @@ async def soft_wait(
join/reap on an actor-runtime-in-process.

'''
uid = portal.channel.uid
uid: tuple[str, str] = portal.channel.uid
try:
log.cancel(f'Soft waiting on actor:\n{uid}')
log.cancel(
'Soft waiting on sub-actor proc:\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
# wait on sub-proc to signal termination
await wait_func(proc)

except trio.Cancelled:
# if cancelled during a soft wait, cancel the child
# actor before entering the hard reap sequence

@ -296,8 +326,8 @@ async def soft_wait(

async def cancel_on_proc_deth():
'''
Cancel the actor cancel request if we detect that
that the process terminated.
"Cancel the (actor) cancel" request if we detect
that that the underlying sub-process terminated.

'''
await wait_func(proc)

@ -314,10 +344,10 @@ async def soft_wait(

if proc.poll() is None: # type: ignore
log.warning(
'Actor still alive after cancel request:\n'
f'{uid}'
'Subactor still alive after cancel request?\n\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)

n.cancel_scope.cancel()
raise
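
`soft_wait()` still races a graceful child shutdown request against the process dying on its own; the hunks mostly rename and re-document the nested `cancel_on_proc_deth()` task. A self-contained approximation of that race, with an explicit shield added so the cleanup can run inside the already-cancelled scope (all names here are illustrative, not tractor's API):

    import trio


    async def soft_reap(
        proc: trio.Process,
        request_child_cancel,  # async callable asking the child runtime to exit
    ) -> None:
        # wait for the child process to exit on its own ("soft" reap)
        try:
            await proc.wait()
        except trio.Cancelled:
            # we got cancelled while waiting: ask the child to cancel itself,
            # but abandon that request as soon as the process is seen to die.
            # the shield lets this cleanup run despite the outer cancellation.
            with trio.CancelScope(shield=True):
                async with trio.open_nursery() as n:

                    async def drop_request_on_proc_death() -> None:
                        await proc.wait()
                        n.cancel_scope.cancel()

                    n.start_soon(drop_request_on_proc_death)
                    await request_child_cancel()
                    n.cancel_scope.cancel()
            raise
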
@ -341,7 +371,7 @@ async def new_proc(
) -> None:

# lookup backend spawning target
target = _methods[_spawn_method]
target: Callable = _methods[_spawn_method]

# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
@ -492,8 +522,9 @@ async def trio_proc(
# cancel result waiter that may have been spawned in
# tandem if not done already
log.cancel(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
'Cancelling existing result waiter task for '
f'{subactor.uid}'
)
nursery.cancel_scope.cancel()

finally:

@ -511,7 +542,16 @@ async def trio_proc(
with trio.move_on_after(0.5):
await proc.wait()

if is_root_process():
log.pdb(
'Delaying subproc reaper while debugger locked..'
)
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False
),
# TODO: need a diff value then default?
# poll_steps=9999999,
)
# TODO: solve the following issue where we need
# to do a similar wait like this but in an
# "intermediary" parent actor that itself isn't

@ -519,10 +559,18 @@ async def trio_proc(
# to hold off on relaying SIGINT until that child
# is complete.
# https://github.com/goodboy/tractor/issues/320
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False),
)
# -[ ] we need to handle non-root parent-actors specially
# by somehow determining if a child is in debug and then
# avoiding cancel/kill of said child by this
# (intermediary) parent until such a time as the root says
# the pdb lock is released and we are good to tear down
# (our children)..
#
# -[ ] so maybe something like this where we try to
# acquire the lock and get notified of who has it,
# check that uid against our known children?
# this_uid: tuple[str, str] = current_actor().uid
# await acquire_debug_lock(this_uid)

if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")

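
These hunks delay the sub-process reaper while a child may still hold the root actor's debugger/tty lock, via the `maybe_wait_for_debugger()` helper shown in the diff. As a loose approximation of the idea only, a poll-until-released guard might look like the sketch below; the callback, delay, and timeout names are all invented, and tractor's real helper lives in its own debug machinery:

    from typing import Callable

    import trio


    async def wait_for_debug_release(
        child_in_debug: Callable[[], bool],
        poll_delay: float = 0.25,
        timeout: float = 10.0,
    ) -> None:
        # poll until no child is reported to hold the debugger/tty lock,
        # or give up after `timeout` seconds and let reaping proceed
        with trio.move_on_after(timeout):
            while child_in_debug():
                await trio.sleep(poll_delay)
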
@ -21,8 +21,9 @@ The machinery and types behind ``Context.open_stream()``

'''
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm
import inspect
from pprint import pformat
from typing import (
Any,
Callable,

@ -35,6 +36,7 @@ import trio

from ._exceptions import (
_raise_from_no_key_in_msg,
ContextCancelled,
)
from .log import get_logger
from .trionics import (

@ -84,8 +86,8 @@ class MsgStream(trio.abc.Channel):
self._broadcaster = _broadcaster

# flag to denote end of stream
self._eoc: bool = False
self._closed: bool = False
self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False

# delegate directly to underlying mem channel
def receive_nowait(self):
@ -93,6 +95,9 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
# if 'return' in msg:
# return msg

_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,

@ -122,16 +127,26 @@ class MsgStream(trio.abc.Channel):
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise trio.EndOfChannel
raise self._eoc
# raise trio.EndOfChannel

if self._closed:
raise trio.ClosedResourceError('This stream was closed')
raise self._closed
# raise trio.ClosedResourceError(
# 'This stream was already closed'
# )

src_err: Exception|None = None
try:
try:
msg = await self._rx_chan.receive()
return msg['yield']

except KeyError as kerr:
src_err = kerr

# NOTE: may raise any of the below error types
# includg EoC when a 'stop' msg is found.
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
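
The `MsgStream` hunks switch `._eoc` and `._closed` from plain flags to caches of the terminating exception instance, so later `receive()` calls re-raise the very same error object. A stand-in class showing just that caching pattern (not tractor's `MsgStream`; the class and attribute names are illustrative):

    import trio


    class TerminalStateChannel:
        # remember *why* the stream ended and re-raise that same exception
        # on every later receive attempt

        def __init__(self, rx: trio.MemoryReceiveChannel):
            self._rx = rx
            self._eoc: trio.EndOfChannel | bool = False
            self._closed: trio.ClosedResourceError | bool = False

        async def receive(self) -> dict:
            # re-raise the originally captured terminal condition, if any
            if self._eoc:
                raise self._eoc
            if self._closed:
                raise self._closed

            try:
                return await self._rx.receive()
            except trio.EndOfChannel as eoc:
                self._eoc = eoc  # cache end-of-stream
                raise
            except trio.ClosedResourceError as cre:
                self._closed = cre  # cache closure
                raise
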
@ -141,11 +156,14 @@ class MsgStream(trio.abc.Channel):
stream=self,
)

except (
trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
):
# XXX: we close the stream on any of these error conditions:
except (
# trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
) as eoc:
src_err = eoc
self._eoc = eoc
# await trio.sleep(1)

# a ``ClosedResourceError`` indicates that the internal
# feeder memory receive channel was closed likely by the

@ -168,14 +186,53 @@ class MsgStream(trio.abc.Channel):
# closing this stream and not flushing a final value to
# remaining (clone) consumers who may not have been
# scheduled to receive it yet.
# try:
# maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait()
# if maybe_err_msg_or_res:
# log.warning(
# 'Discarding un-processed msg:\n'
# f'{maybe_err_msg_or_res}'
# )
# except trio.WouldBlock:
# # no queued msgs that might be another remote
# # error, so just raise the original EoC
# pass

# raise eoc

except trio.ClosedResourceError as cre: # by self._rx_chan
src_err = cre
log.warning(
'`Context._rx_chan` was already closed?'
)
self._closed = cre

# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
drained: list[Exception|dict] = await self.aclose()
if drained:
log.warning(
'Drained context msgs during closure:\n'
f'{drained}'
)
# TODO: pass these to the `._ctx._drained_msgs: deque`
# and then iterate them as part of any `.result()` call?

raise # propagate
# NOTE XXX: if the context was cancelled or remote-errored
# but we received the stream close msg first, we
# probably want to instead raise the remote error
# over the end-of-stream connection error since likely
# the remote error was the source cause?
ctx: Context = self._ctx
if re := ctx._remote_error:
ctx._maybe_raise_remote_err(
re,
raise_ctxc_from_self_call=True,
)

async def aclose(self):
raise src_err # propagate

async def aclose(self) -> list[Exception|dict]:
'''
Cancel associated remote actor task and local memory channel on
close.
@ -185,15 +242,55 @@ class MsgStream(trio.abc.Channel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan

if rx_chan._closed:
log.cancel(f"{self} is already closed")
if (
rx_chan._closed
or
self._closed
):
log.cancel(
f'`MsgStream` is already closed\n'
f'.cid: {self._ctx.cid}\n'
f'._rx_chan`: {rx_chan}\n'
f'._eoc: {self._eoc}\n'
f'._closed: {self._eoc}\n'
)

# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return
return []

self._eoc = True
ctx: Context = self._ctx
# caught_eoc: bool = False
drained: list[Exception|dict] = []
while not drained:
try:
maybe_final_msg = self.receive_nowait()
if maybe_final_msg:
log.cancel(
'Drained un-processed stream msg:\n'
f'{pformat(maybe_final_msg)}'
)
# TODO: inject into parent `Context` buf?
drained.append(maybe_final_msg)

except trio.WouldBlock as be:
drained.append(be)
break

except trio.EndOfChannel as eoc:
drained.append(eoc)
# caught_eoc = True
self._eoc: bool = eoc
break

except ContextCancelled as ctxc:
log.cancel(
'Context was cancelled during stream closure:\n'
f'canceller: {ctxc.canceller}\n'
f'{pformat(ctxc.msgdata)}'
)
break

# NOTE: this is super subtle IPC messaging stuff:
# Relay stop iteration to far end **iff** we're

@ -224,26 +321,33 @@ class MsgStream(trio.abc.Channel):
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
) as re:
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
# it can't traverse the transport.
ctx = self._ctx
log.warning(
f'Stream was already destroyed?\n'
f'actor: {ctx.chan.uid}\n'
f'ctx id: {ctx.cid}'
)
drained.append(re)
self._closed = re

self._closed = True
# if caught_eoc:
# # from .devx import _debug
# # await _debug.pause()
# with trio.CancelScope(shield=True):
# await rx_chan.aclose()

# Do we close the local mem chan ``self._rx_chan`` ??!?
# self._eoc: bool = caught_eoc

# NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
# the potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that context has
# run to completion.
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir ``MsgStream`` BECAUSE this same
# core-msg-loop mem recv-chan is used to deliver the
# potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that
# context has run to completion.

# XXX: Notes on old behaviour:
# await rx_chan.aclose()

@ -272,6 +376,8 @@ class MsgStream(trio.abc.Channel):
# runtime's closure of ``rx_chan`` in the case where we may
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).
# self._closed = True
return drained

@acm
async def subscribe(
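
`.aclose()` now returns whatever could still be drained from the feeder channel instead of silently dropping it, with the drain done through non-blocking receives. A simplified version of that drain loop is below; note that the hunk's own loop exits after the first drained item because of its `while not drained:` condition, whereas this sketch (with an invented `drain_pending` name and a generic callable standing in for `receive_nowait()`) keeps pulling until the channel would block or ends:

    from typing import Callable

    import trio


    def drain_pending(
        receive_nowait: Callable[[], dict],
    ) -> list[Exception | dict]:
        # pull whatever is still queued without blocking; stop on the first
        # would-block or end-of-channel and keep that terminal exception too
        drained: list[Exception | dict] = []
        while True:
            try:
                drained.append(receive_nowait())
            except trio.WouldBlock as wb:
                drained.append(wb)
                break
            except trio.EndOfChannel as eoc:
                drained.append(eoc)
                break
        return drained
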
@ -337,9 +443,13 @@ class MsgStream(trio.abc.Channel):
raise self._ctx._remote_error # from None

if self._closed:
raise trio.ClosedResourceError('This stream was already closed')
raise self._closed
# raise trio.ClosedResourceError('This stream was already closed')

await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
await self._ctx.chan.send({
'yield': data,
'cid': self._ctx.cid,
})


def stream(func: Callable) -> Callable:

@ -157,7 +157,7 @@ class ActorNursery:

# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery
nursery: trio.Nursery = nursery or self._da_nursery

# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore

@ -233,12 +233,14 @@ class ActorNursery:
return portal

async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel
'''
Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.

If ``hard_killl`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
"""

'''
self.cancelled = True

log.cancel(f"Cancelling nursery in {self._actor.uid}")
@ -246,7 +248,14 @@ class ActorNursery:

async with trio.open_nursery() as nursery:

for subactor, proc, portal in self._children.values():
subactor: Actor
proc: trio.Process
portal: Portal
for (
subactor,
proc,
portal,
) in self._children.values():

# TODO: are we ever even going to use this or
# is the spawning backend responsible for such

@ -286,8 +295,16 @@ class ActorNursery:
# then hard kill all sub-processes
if cs.cancelled_caught:
log.error(
f"Failed to cancel {self}\nHard killing process tree!")
for subactor, proc, portal in self._children.values():
f'Failed to cancel {self}\nHard killing process tree!'
)
subactor: Actor
proc: trio.Process
portal: Portal
for (
subactor,
proc,
portal,
) in self._children.values():
log.warning(f"Hard killing process {proc}")
proc.terminate()
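
`ActorNursery.cancel()` keeps its two-phase shape: request graceful cancellation of every child under one shared timeout, then hard-kill whatever survives. A minimal restatement of that escalation with invented parameter names (each child is reduced here to a `trio.Process` plus an async cancel-request callable):

    from typing import Awaitable, Callable

    import trio


    async def cancel_children(
        children: list[tuple[trio.Process, Callable[[], Awaitable[None]]]],
        graceful_timeout: float = 3.0,
    ) -> None:
        # ask every child to cancel itself, bounded by one shared timeout
        with trio.move_on_after(graceful_timeout) as cs:
            async with trio.open_nursery() as n:
                for _proc, request_cancel in children:
                    n.start_soon(request_cancel)

        # only if the graceful round timed out, hard kill the processes
        if cs.cancelled_caught:
            for proc, _request_cancel in children:
                proc.terminate()
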
@ -384,7 +401,17 @@ async def _open_and_supervise_one_cancels_all_nursery(
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with")
"errored with\n"

# TODO: same thing as in
# `._invoke()` to compute how to
# place this div-line in the
# middle of the above msg
# content..
# -[ ] prolly helper-func it too
# in our `.log` module..
# '------ - ------'
)

# cancel all subactors
await anursery.cancel()

@ -289,11 +289,19 @@ def get_console_log(
if not level:
return log

log.setLevel(level.upper() if not isinstance(level, int) else level)
log.setLevel(
level.upper()
if not isinstance(level, int)
else level
)

if not any(
handler.stream == sys.stderr # type: ignore
for handler in logger.handlers if getattr(handler, 'stream', None)
for handler in logger.handlers if getattr(
handler,
'stream',
None,
)
):
handler = logging.StreamHandler()
formatter = colorlog.ColoredFormatter(
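
The final hunk only reflows `get_console_log()`'s guard that avoids stacking duplicate stderr handlers on the shared logger. The same guard written against the stdlib `logging` API, with an invented helper name for clarity:

    import logging
    import sys


    def ensure_stderr_handler(logger: logging.Logger) -> logging.Handler:
        # only add a stream handler if none of the existing ones already
        # write to stderr (mirrors the `any(...)` check in the hunk above)
        for handler in logger.handlers:
            if getattr(handler, 'stream', None) is sys.stderr:
                return handler

        handler = logging.StreamHandler()  # defaults to stderr
        logger.addHandler(handler)
        return handler
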