forked from goodboy/tractor
Emit warning on any `ContextCancelled.canceller == None`
parent
6c9bc627d8
commit
f568fca98f
|
@ -33,12 +33,15 @@ import exceptiongroup as eg
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
|
from .log import get_logger
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._context import Context
|
from ._context import Context
|
||||||
from ._stream import MsgStream
|
from ._stream import MsgStream
|
||||||
from .log import StackLevelAdapter
|
from .log import StackLevelAdapter
|
||||||
|
|
||||||
|
log = get_logger('tractor')
|
||||||
|
|
||||||
_this_mod = importlib.import_module(__name__)
|
_this_mod = importlib.import_module(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -113,10 +116,35 @@ class ContextCancelled(RemoteActorError):
|
||||||
'''
|
'''
|
||||||
@property
|
@property
|
||||||
def canceller(self) -> tuple[str, str]|None:
|
def canceller(self) -> tuple[str, str]|None:
|
||||||
|
'''
|
||||||
|
Return the (maybe) `Actor.uid` for the requesting-author
|
||||||
|
of this ctxc.
|
||||||
|
|
||||||
|
Emit a warning msg when `.canceller` has not been set,
|
||||||
|
which usually idicates that a `None` msg-loop setinel was
|
||||||
|
sent before expected in the runtime. This can happen in
|
||||||
|
a few situations:
|
||||||
|
|
||||||
|
- (simulating) an IPC transport network outage
|
||||||
|
- a (malicious) pkt sent specifically to cancel an actor's
|
||||||
|
runtime non-gracefully without ensuring ongoing RPC tasks are
|
||||||
|
incrementally cancelled as is done with:
|
||||||
|
`Actor`
|
||||||
|
|_`.cancel()`
|
||||||
|
|_`.cancel_soon()`
|
||||||
|
|_`._cancel_task()`
|
||||||
|
|
||||||
|
'''
|
||||||
value = self.msgdata.get('canceller')
|
value = self.msgdata.get('canceller')
|
||||||
if value:
|
if value:
|
||||||
return tuple(value)
|
return tuple(value)
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
'IPC Context cancelled without a requesting actor?\n'
|
||||||
|
'Maybe the IPC transport ended abruptly?\n\n'
|
||||||
|
f'{self}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class TransportClosed(trio.ClosedResourceError):
|
class TransportClosed(trio.ClosedResourceError):
|
||||||
"Underlying channel transport was closed prior to use"
|
"Underlying channel transport was closed prior to use"
|
||||||
|
@ -199,7 +227,6 @@ def pack_error(
|
||||||
):
|
):
|
||||||
error_msg.update(exc.msgdata)
|
error_msg.update(exc.msgdata)
|
||||||
|
|
||||||
|
|
||||||
pkt: dict = {'error': error_msg}
|
pkt: dict = {'error': error_msg}
|
||||||
if cid:
|
if cid:
|
||||||
pkt['cid'] = cid
|
pkt['cid'] = cid
|
||||||
|
@ -349,8 +376,8 @@ def _raise_from_no_key_in_msg(
|
||||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||||
# block below it will trigger ``.aclose()``.
|
# block below it will trigger ``.aclose()``.
|
||||||
raise trio.EndOfChannel(
|
raise trio.EndOfChannel(
|
||||||
f'Context stream ended due to msg:\n'
|
f'Context stream ended due to msg:\n\n'
|
||||||
f'{pformat(msg)}'
|
f'{pformat(msg)}\n'
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue