forked from goodboy/tractor
Drop `None`-sentinel cancels RPC loop mechanism
Pretty sure we haven't *needed it* for a while, it was always generally hazardous in terms of IPC msg types, AND it's definitely incompatible with a dynamically applied typed msg spec: you can't just expect a `None` to be willy nilly handled all the time XD For now I'm masking out all the code and leaving very detailed surrounding notes but am not removing it quite yet in case for strange reason it is needed by some edge case (though I haven't found according to the test suite). Backstory: ------ - ------ Originally (i'm pretty sure anyway) it was added as a super naive "remote cancellation" mechanism (back before there were specific `Actor` methods for such things) that was mostly (only?) used before IPC `Channel` closures to "more gracefully cancel" the connection's parented RPC tasks. Since we now have explicit runtime-RPC endpoints for conducting remote cancellation of both tasks and full actors, it should really be removed anyway, because: - a `None`-msg setinel is inconsistent with other RPC endpoint handling input patterns which (even prior to typed msging) had specific msg-value triggers. - the IPC endpoint's (block) implementation should use `Actor.cancel_rpc_tasks(parent_chan=chan)` instead of a manual loop through a `Actor._rpc_tasks.copy()`.. Deats: - mask the `Channel.send(None)` calls from both the `Actor._stream_handler()` tail as well as from the `._portal.open_portal()` was connected block. - mask the msg loop endpoint block and toss in lotsa notes. Unrelated tweaks: - drop `Actor._debug_mode`; unused. - make `Actor.cancel_server()` return a `bool`. - use `.msg.pretty_struct.Struct.pformat()` to show any msg that is ignored (bc invalid) in `._push_result()`.runtime_to_msgspec
parent
4cfe4979ff
commit
b9a61ded0a
|
@ -502,7 +502,7 @@ async def open_portal(
|
||||||
'''
|
'''
|
||||||
actor = current_actor()
|
actor = current_actor()
|
||||||
assert actor
|
assert actor
|
||||||
was_connected = False
|
was_connected: bool = False
|
||||||
|
|
||||||
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
||||||
|
|
||||||
|
@ -533,9 +533,7 @@ async def open_portal(
|
||||||
await portal.aclose()
|
await portal.aclose()
|
||||||
|
|
||||||
if was_connected:
|
if was_connected:
|
||||||
# gracefully signal remote channel-msg loop
|
await channel.aclose()
|
||||||
await channel.send(None)
|
|
||||||
# await channel.aclose()
|
|
||||||
|
|
||||||
# cancel background msg loop task
|
# cancel background msg loop task
|
||||||
if msg_loop_cs:
|
if msg_loop_cs:
|
||||||
|
|
|
@ -55,7 +55,6 @@ from ._exceptions import (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
pause,
|
|
||||||
maybe_wait_for_debugger,
|
maybe_wait_for_debugger,
|
||||||
_debug,
|
_debug,
|
||||||
)
|
)
|
||||||
|
@ -429,8 +428,6 @@ async def _invoke(
|
||||||
# XXX for .pause_from_sync()` usage we need to make sure
|
# XXX for .pause_from_sync()` usage we need to make sure
|
||||||
# `greenback` is boostrapped in the subactor!
|
# `greenback` is boostrapped in the subactor!
|
||||||
await _debug.maybe_init_greenback()
|
await _debug.maybe_init_greenback()
|
||||||
# else:
|
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# TODO: possibly a specially formatted traceback
|
# TODO: possibly a specially formatted traceback
|
||||||
# (not sure what typing is for this..)?
|
# (not sure what typing is for this..)?
|
||||||
|
@ -855,30 +852,54 @@ async def process_messages(
|
||||||
|
|
||||||
match msg:
|
match msg:
|
||||||
|
|
||||||
|
# NOTE: this *was a dedicated
|
||||||
|
# "graceful-terminate-loop" mechanism using
|
||||||
|
# a `None`-msg-sentinel which would cancel all RPC
|
||||||
|
# tasks parented by this loop's IPC channel; that
|
||||||
|
# is all rpc-scheduled-tasks started over the
|
||||||
|
# connection were explicitly per-task cancelled
|
||||||
|
# normally prior to the `Channel`'s underlying
|
||||||
|
# transport being later closed.
|
||||||
|
#
|
||||||
|
# * all `.send(None)`s were # removed as part of
|
||||||
|
# typed-msging requirements
|
||||||
|
#
|
||||||
|
# TODO: if this mechanism is still desired going
|
||||||
|
# forward it should be implemented as part of the
|
||||||
|
# normal runtime-cancel-RPC endpoints with either,
|
||||||
|
# - a special `msg.types.Msg` to trigger the loop endpoint
|
||||||
|
# (like `None` was used prior) or,
|
||||||
|
# - it should just be accomplished using A
|
||||||
|
# `Start(ns='self', func='cancel_rpc_tasks())`
|
||||||
|
# request instead?
|
||||||
|
#
|
||||||
# if msg is None:
|
# if msg is None:
|
||||||
# dedicated loop terminate sentinel
|
# case None:
|
||||||
case None:
|
# tasks: dict[
|
||||||
|
# tuple[Channel, str],
|
||||||
|
# tuple[Context, Callable, trio.Event]
|
||||||
|
# ] = actor._rpc_tasks.copy()
|
||||||
|
# log.cancel(
|
||||||
|
# f'Peer IPC channel terminated via `None` setinel msg?\n'
|
||||||
|
# f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
|
||||||
|
# f'peer: {chan.uid}\n'
|
||||||
|
# f'|_{chan}\n'
|
||||||
|
# )
|
||||||
|
# # TODO: why aren't we just calling
|
||||||
|
# # `.cancel_rpc_tasks()` with the parent
|
||||||
|
# # chan as input instead?
|
||||||
|
# for (channel, cid) in tasks:
|
||||||
|
# if channel is chan:
|
||||||
|
# await actor._cancel_task(
|
||||||
|
# cid,
|
||||||
|
# channel,
|
||||||
|
# requesting_uid=channel.uid,
|
||||||
|
|
||||||
tasks: dict[
|
# ipc_msg=msg,
|
||||||
tuple[Channel, str],
|
# )
|
||||||
tuple[Context, Callable, trio.Event]
|
|
||||||
] = actor._rpc_tasks.copy()
|
|
||||||
log.cancel(
|
|
||||||
f'Peer IPC channel terminated via `None` setinel msg?\n'
|
|
||||||
f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
|
|
||||||
f'peer: {chan.uid}\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
)
|
|
||||||
for (channel, cid) in tasks:
|
|
||||||
if channel is chan:
|
|
||||||
await actor._cancel_task(
|
|
||||||
cid,
|
|
||||||
channel,
|
|
||||||
requesting_uid=channel.uid,
|
|
||||||
|
|
||||||
ipc_msg=msg,
|
# # immediately break out of this loop!
|
||||||
)
|
# break
|
||||||
break
|
|
||||||
|
|
||||||
# cid = msg.get('cid')
|
# cid = msg.get('cid')
|
||||||
# if cid:
|
# if cid:
|
||||||
|
@ -916,7 +937,7 @@ async def process_messages(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
ns=ns,
|
ns=ns,
|
||||||
func=funcname,
|
func=funcname,
|
||||||
kwargs=kwargs,
|
kwargs=kwargs, # type-spec this? see `msg.types`
|
||||||
uid=actorid,
|
uid=actorid,
|
||||||
):
|
):
|
||||||
# try:
|
# try:
|
||||||
|
|
|
@ -65,7 +65,11 @@ from trio import (
|
||||||
TaskStatus,
|
TaskStatus,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .msg import NamespacePath
|
from tractor.msg import (
|
||||||
|
pretty_struct,
|
||||||
|
NamespacePath,
|
||||||
|
types as msgtypes,
|
||||||
|
)
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._context import (
|
from ._context import (
|
||||||
mk_context,
|
mk_context,
|
||||||
|
@ -91,10 +95,6 @@ from ._rpc import (
|
||||||
process_messages,
|
process_messages,
|
||||||
try_ship_error_to_remote,
|
try_ship_error_to_remote,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
|
||||||
types as msgtypes,
|
|
||||||
pretty_struct,
|
|
||||||
)
|
|
||||||
# from tractor.msg.types import (
|
# from tractor.msg.types import (
|
||||||
# Aid,
|
# Aid,
|
||||||
# SpawnSpec,
|
# SpawnSpec,
|
||||||
|
@ -164,18 +164,15 @@ class Actor:
|
||||||
# Information about `__main__` from parent
|
# Information about `__main__` from parent
|
||||||
_parent_main_data: dict[str, str]
|
_parent_main_data: dict[str, str]
|
||||||
_parent_chan_cs: CancelScope|None = None
|
_parent_chan_cs: CancelScope|None = None
|
||||||
_spawn_spec: SpawnSpec|None = None
|
_spawn_spec: msgtypes.SpawnSpec|None = None
|
||||||
|
|
||||||
# syncs for setup/teardown sequences
|
# syncs for setup/teardown sequences
|
||||||
_server_down: trio.Event|None = None
|
_server_down: trio.Event|None = None
|
||||||
|
|
||||||
# user toggled crash handling (including monkey-patched in
|
|
||||||
# `trio.open_nursery()` via `.trionics._supervisor` B)
|
|
||||||
_debug_mode: bool = False
|
|
||||||
|
|
||||||
# if started on ``asycio`` running ``trio`` in guest mode
|
# if started on ``asycio`` running ``trio`` in guest mode
|
||||||
_infected_aio: bool = False
|
_infected_aio: bool = False
|
||||||
|
|
||||||
|
# TODO: nursery tracking like `trio` does?
|
||||||
# _ans: dict[
|
# _ans: dict[
|
||||||
# tuple[str, str],
|
# tuple[str, str],
|
||||||
# list[ActorNursery],
|
# list[ActorNursery],
|
||||||
|
@ -716,35 +713,50 @@ class Actor:
|
||||||
# TODO: figure out why this breaks tests..
|
# TODO: figure out why this breaks tests..
|
||||||
db_cs.cancel()
|
db_cs.cancel()
|
||||||
|
|
||||||
# XXX: is this necessary (GC should do it)?
|
# XXX TODO XXX: DO WE NEED THIS?
|
||||||
|
# -[ ] is it necessary any more (GC should do it) now
|
||||||
|
# that we have strict(er) graceful cancellation
|
||||||
|
# semantics?
|
||||||
# XXX WARNING XXX
|
# XXX WARNING XXX
|
||||||
# Be AWARE OF THE INDENT LEVEL HERE
|
# Be AWARE OF THE INDENT LEVEL HERE
|
||||||
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
|
# -> ONLY ENTER THIS BLOCK WHEN ._peers IS
|
||||||
# EMPTY!!!!
|
# EMPTY!!!!
|
||||||
if (
|
#
|
||||||
not self._peers
|
|
||||||
and chan.connected()
|
|
||||||
):
|
|
||||||
# if the channel is still connected it may mean the far
|
# if the channel is still connected it may mean the far
|
||||||
# end has not closed and we may have gotten here due to
|
# end has not closed and we may have gotten here due to
|
||||||
# an error and so we should at least try to terminate
|
# an error and so we should at least try to terminate
|
||||||
# the channel from this end gracefully.
|
# the channel from this end gracefully.
|
||||||
log.runtime(
|
#if (
|
||||||
'Terminating channel with `None` setinel msg\n'
|
# not self._peers
|
||||||
f'|_{chan}\n'
|
# and chan.connected()
|
||||||
)
|
#):
|
||||||
try:
|
# log.runtime(
|
||||||
# send msg loop terminate sentinel which
|
# 'Terminating channel with `None` setinel msg\n'
|
||||||
# triggers cancellation of all remotely
|
# f'|_{chan}\n'
|
||||||
# started tasks.
|
# )
|
||||||
await chan.send(None)
|
# try:
|
||||||
|
# # ORIGINALLY we sent a msg loop terminate
|
||||||
|
# # sentinel (`None`) which triggers
|
||||||
|
# # cancellation of all remotely started
|
||||||
|
# # tasks.
|
||||||
|
# #
|
||||||
|
# # HOWEVER, after we added typed msging,
|
||||||
|
# # you can't just willy nilly send `None`
|
||||||
|
# # wherever since it might be invalid given
|
||||||
|
# # the currently configured msg-spec.
|
||||||
|
# #
|
||||||
|
# # SO, this was all removed and I'm pretty
|
||||||
|
# # confident we don't need it replaced with
|
||||||
|
# # a manual RPC to
|
||||||
|
# # a `Actor.cancel_rpc_tasks()` right?
|
||||||
|
# await chan.send(None)
|
||||||
|
|
||||||
# XXX: do we want this? no right?
|
# # XXX: do we want this? NO RIGHT?
|
||||||
# causes "[104] connection reset by peer" on other end
|
# # causes "[104] connection reset by peer" on other end
|
||||||
# await chan.aclose()
|
# # await chan.aclose()
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
# except trio.BrokenResourceError:
|
||||||
log.runtime(f"Channel {chan.uid} was already closed")
|
# log.runtime(f"Channel {chan.uid} was already closed")
|
||||||
|
|
||||||
# TODO: rename to `._deliver_payload()` since this handles
|
# TODO: rename to `._deliver_payload()` since this handles
|
||||||
# more then just `result` msgs now obvi XD
|
# more then just `result` msgs now obvi XD
|
||||||
|
@ -774,9 +786,10 @@ class Actor:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Ignoring invalid IPC ctx msg!\n\n'
|
'Ignoring invalid IPC ctx msg!\n\n'
|
||||||
f'<= sender: {uid}\n'
|
f'<= sender: {uid}\n'
|
||||||
f'=> cid: {cid}\n\n'
|
# XXX don't need right since it's always in msg?
|
||||||
|
# f'=> cid: {cid}\n\n'
|
||||||
|
|
||||||
f'{msg}\n'
|
f'{pretty_struct.Struct.pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -1437,7 +1450,7 @@ class Actor:
|
||||||
)
|
)
|
||||||
await self._ongoing_rpc_tasks.wait()
|
await self._ongoing_rpc_tasks.wait()
|
||||||
|
|
||||||
def cancel_server(self) -> None:
|
def cancel_server(self) -> bool:
|
||||||
'''
|
'''
|
||||||
Cancel the internal IPC transport server nursery thereby
|
Cancel the internal IPC transport server nursery thereby
|
||||||
preventing any new inbound IPC connections establishing.
|
preventing any new inbound IPC connections establishing.
|
||||||
|
@ -1446,6 +1459,9 @@ class Actor:
|
||||||
if self._server_n:
|
if self._server_n:
|
||||||
log.runtime("Shutting down channel server")
|
log.runtime("Shutting down channel server")
|
||||||
self._server_n.cancel_scope.cancel()
|
self._server_n.cancel_scope.cancel()
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def accept_addrs(self) -> list[tuple[str, int]]:
|
def accept_addrs(self) -> list[tuple[str, int]]:
|
||||||
|
|
Loading…
Reference in New Issue