forked from goodboy/tractor
1
0
Fork 0

Mk `process_messages()` return last msg; summary logging

Not sure it's **that** useful (yet) but in theory would allow avoiding
certain log level usage around transient RPC requests for discovery methods
(like `.register_actor()` and friends); can't hurt to be able to
introspect that last message for other future cases I'd imagine as well.
Adjust the calling code in `._runtime` to match; other spots are using
the `trio.Nursery.start()` schedule style and are fine as is.

Improve a bunch more log messages throughout a few mods mostly by going
to a "summary" single-emission style where possible/appropriate:
- in `._runtime` more "single summary" status style log emissions:
 |_mk `Actor.load_modules()` render a single mod loaded summary.
 |_use a summary `con_status: str` for `Actor._stream_handler()` conn
   setup and an equiv (`con_teardown_status`) for connection teardowns.
 |_similar thing in `Actor.wait_for_actor()`.
- generally more usage of `.msg.pretty_struct` apis throughout `._runtime`.
runtime_to_msgspec
Tyler Goodlet 2024-04-30 12:15:46 -04:00
parent f139adddca
commit 40c972f0ec
4 changed files with 147 additions and 128 deletions

View File

@ -146,7 +146,7 @@ def _trio_main(
finally:
log.info(
'Actor terminated\n'
'Subactor terminated\n'
+
actor_info
)

View File

@ -435,7 +435,6 @@ class Portal:
yield stream
finally:
# cancel the far end task on consumer close
# NOTE: this is a special case since we assume that if using
# this ``.open_fream_from()`` api, the stream is one a one
@ -496,7 +495,7 @@ class LocalPortal:
async def open_portal(
channel: Channel,
nursery: trio.Nursery|None = None,
tn: trio.Nursery|None = None,
start_msg_loop: bool = True,
shield: bool = False,
@ -504,15 +503,19 @@ async def open_portal(
'''
Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing (normally
done by the actor-runtime implicitly).
Spawns a background task to handle RPC processing, normally
done by the actor-runtime implicitly via a call to
`._rpc.process_messages()`. just after connection establishment.
'''
actor = current_actor()
assert actor
was_connected: bool = False
async with maybe_open_nursery(nursery, shield=shield) as nursery:
async with maybe_open_nursery(
tn,
shield=shield,
) as tn:
if not channel.connected():
await channel.connect()
@ -524,7 +527,7 @@ async def open_portal(
msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop:
from ._runtime import process_messages
msg_loop_cs = await nursery.start(
msg_loop_cs = await tn.start(
partial(
process_messages,
actor,
@ -544,7 +547,7 @@ async def open_portal(
await channel.aclose()
# cancel background msg loop task
if msg_loop_cs:
if msg_loop_cs is not None:
msg_loop_cs.cancel()
nursery.cancel_scope.cancel()
tn.cancel_scope.cancel()

View File

@ -64,11 +64,13 @@ from .msg import (
current_codec,
MsgCodec,
NamespacePath,
pretty_struct,
)
from tractor.msg.types import (
CancelAck,
Error,
Msg,
MsgType,
Return,
Start,
StartAck,
@ -774,7 +776,10 @@ async def process_messages(
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
) -> bool:
) -> (
bool, # chan diconnected
MsgType, # last msg
):
'''
This is the low-level, per-IPC-channel, RPC task scheduler loop.
@ -816,11 +821,6 @@ async def process_messages(
# |_ for ex, from `aioquic` which exposed "stream ids":
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
log.runtime(
'Entering RPC msg loop:\n'
f'peer: {chan.uid}\n'
f'|_{chan}\n'
)
nursery_cancelled_before_task: bool = False
msg: Msg|None = None
try:
@ -834,12 +834,15 @@ async def process_messages(
async for msg in chan:
log.transport( # type: ignore
f'<= IPC msg from peer: {chan.uid}\n\n'
f'IPC msg from peer\n'
f'<= {chan.uid}\n\n'
# TODO: avoid fmting depending on loglevel for perf?
# -[ ] specifically `pformat()` sub-call..?
# -[ ] specifically `pretty_struct.pformat()` sub-call..?
# - how to only log-level-aware actually call this?
# -[ ] use `.msg.pretty_struct` here now instead!
f'{pformat(msg)}\n'
# f'{pretty_struct.pformat(msg)}\n'
f'{msg}\n'
)
match msg:
@ -953,10 +956,11 @@ async def process_messages(
uid=actorid,
):
log.runtime(
'Handling RPC `Start` request from\n'
f'peer: {actorid}\n'
'\n'
f'=> {ns}.{funcname}({kwargs})\n'
'Handling RPC `Start` request\n'
f'<= peer: {actorid}\n'
f' |_{ns}.{funcname}({kwargs})\n\n'
f'{pretty_struct.pformat(msg)}\n'
)
# runtime-internal endpoint: `Actor.<funcname>`
@ -1097,25 +1101,24 @@ async def process_messages(
parent_chan=chan,
)
except (
TransportClosed,
):
except TransportClosed:
# channels "breaking" (for TCP streams by EOF or 104
# connection-reset) is ok since we don't have a teardown
# handshake for them (yet) and instead we simply bail out of
# the message loop and expect the teardown sequence to clean
# up..
# TODO: add a teardown handshake? and,
#
# TODO: maybe add a teardown handshake? and,
# -[ ] don't show this msg if it's an ephemeral discovery ep call?
# -[ ] figure out how this will break with other transports?
log.runtime(
f'channel closed abruptly with\n'
f'peer: {chan.uid}\n'
f'IPC channel closed abruptly\n'
f'<=x peer: {chan.uid}\n'
f' |_{chan.raddr}\n'
)
# transport **WAS** disconnected
return True
return (True, msg)
except (
Exception,
@ -1156,8 +1159,8 @@ async def process_messages(
'Exiting IPC msg loop with final msg\n\n'
f'<= peer: {chan.uid}\n'
f' |_{chan}\n\n'
f'{pformat(msg)}\n\n'
f'{pretty_struct.pformat(msg)}'
)
# transport **WAS NOT** disconnected
return False
return (False, msg)

View File

@ -49,6 +49,7 @@ from pprint import pformat
import signal
import sys
from typing import (
Any,
Callable,
TYPE_CHECKING,
)
@ -68,7 +69,7 @@ from tractor.msg import (
pretty_struct,
NamespacePath,
types as msgtypes,
Msg,
MsgType,
)
from ._ipc import Channel
from ._context import (
@ -96,19 +97,6 @@ from ._rpc import (
process_messages,
try_ship_error_to_remote,
)
# from tractor.msg.types import (
# Aid,
# SpawnSpec,
# Start,
# StartAck,
# Started,
# Yield,
# Stop,
# Return,
# Error,
# )
if TYPE_CHECKING:
@ -315,29 +303,32 @@ class Actor:
self._reg_addrs = addrs
async def wait_for_peer(
self, uid: tuple[str, str]
self,
uid: tuple[str, str],
) -> tuple[trio.Event, Channel]:
'''
Wait for a connection back from a spawned actor with a `uid`
using a `trio.Event` for sync.
Wait for a connection back from a (spawned sub-)actor with
a `uid` using a `trio.Event` for sync.
'''
log.runtime(f"Waiting for peer {uid} to connect")
log.debug(f'Waiting for peer {uid!r} to connect')
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.runtime(f"{uid} successfully connected back to us")
log.debug(f'{uid!r} successfully connected back to us')
return event, self._peers[uid][-1]
def load_modules(
self,
debug_mode: bool = False,
# debug_mode: bool = False,
) -> None:
'''
Load enabled RPC py-modules locally (after process fork/spawn).
Load explicitly enabled python modules from local fs after
process spawn.
Since this actor may be spawned on a different machine from
the original nursery we need to try and load the local module
code (presuming it exists).
code manually (presuming it exists).
'''
try:
@ -350,16 +341,21 @@ class Actor:
_mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path'])
status: str = 'Attempting to import enabled modules:\n'
for modpath, filepath in self.enable_modules.items():
# XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
log.runtime(f"Attempting to import {modpath}@{filepath}")
mod = importlib.import_module(modpath)
status += (
f'|_{modpath!r} -> {filepath!r}\n'
)
mod: ModuleType = importlib.import_module(modpath)
self._mods[modpath] = mod
if modpath == '__main__':
self._mods['__mp_main__'] = mod
log.runtime(status)
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
@ -413,21 +409,23 @@ class Actor:
chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid
con_msg: str = ''
if their_uid:
con_status: str = ''
# TODO: remove this branch since can never happen?
# NOTE: `.uid` is only set after first contact
con_msg = (
'IPC Re-connection from already known peer? '
if their_uid:
con_status = (
'IPC Re-connection from already known peer?\n'
)
else:
con_msg = (
'New IPC connection to us '
con_status = (
'New inbound IPC connection <=\n'
)
con_msg += (
f'<= @{chan.raddr}\n'
con_status += (
f'|_{chan}\n'
# f' |_@{chan.raddr}\n\n'
# ^-TODO-^ remove since alfready in chan.__repr__()?
)
# send/receive initial handshake response
try:
@ -447,13 +445,13 @@ class Actor:
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.warning(
con_msg
con_status
+
' -> But failed to handshake? Ignoring..\n'
)
return
con_msg += (
con_status += (
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
)
# IPC connection tracking for both peers and new children:
@ -466,7 +464,7 @@ class Actor:
None,
)
if event:
con_msg += (
con_status += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
@ -477,7 +475,7 @@ class Actor:
event.set()
else:
con_msg += (
con_status += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
@ -491,13 +489,18 @@ class Actor:
# TODO: can we just use list-ref directly?
chans.append(chan)
log.runtime(con_msg)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
# Begin channel management - respond to remote requests and
# process received reponses.
disconnected: bool = False
last_msg: MsgType
try:
disconnected: bool = await process_messages(
(
disconnected,
last_msg,
) = await process_messages(
self,
chan,
)
@ -598,16 +601,24 @@ class Actor:
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry = local_nursery._children.get(uid)
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
and
poll() is None # proc still alive
):
log.cancel(
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
@ -616,17 +627,17 @@ class Actor:
# ``Channel`` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
log.runtime(
f'Disconnected IPC channel:\n'
f'uid: {chan.uid}\n'
f'|_{pformat(chan)}\n'
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n'
f' |_{pformat(chan)}\n\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
log.runtime(
f'No more channels with {chan.uid}'
con_teardown_status += (
f'-> No more channels with {chan.uid}'
)
self._peers.pop(uid, None)
@ -640,15 +651,16 @@ class Actor:
f' |_[{i}] {pformat(chan)}\n'
)
log.runtime(
f'Remaining IPC {len(self._peers)} peers:\n'
+ peers_str
con_teardown_status += (
f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
)
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channel connections")
con_teardown_status += (
'Signalling no more peer channel connections'
)
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
@ -723,13 +735,16 @@ class Actor:
# TODO: figure out why this breaks tests..
db_cs.cancel()
log.runtime(con_teardown_status)
# finally block closure
# TODO: rename to `._deliver_payload()` since this handles
# more then just `result` msgs now obvi XD
async def _deliver_ctx_payload(
self,
chan: Channel,
cid: str,
msg: Msg|MsgTypeError,
msg: MsgType|MsgTypeError,
) -> None|bool:
'''
@ -754,7 +769,7 @@ class Actor:
# XXX don't need right since it's always in msg?
# f'=> cid: {cid}\n\n'
f'{pretty_struct.Struct.pformat(msg)}\n'
f'{pretty_struct.pformat(msg)}\n'
)
return
@ -896,9 +911,11 @@ class Actor:
cid=cid,
)
log.runtime(
'Sending RPC start msg\n\n'
'Sending RPC `Start`\n\n'
f'=> peer: {chan.uid}\n'
f' |_ {ns}.{func}({kwargs})\n'
f' |_ {ns}.{func}({kwargs})\n\n'
f'{pretty_struct.pformat(msg)}'
)
await chan.send(msg)
@ -955,31 +972,29 @@ class Actor:
if self._spawn_method == "trio":
# Receive runtime state from our parent
# parent_data: dict[str, Any]
# parent_data = await chan.recv()
# TODO: maybe we should just wrap this directly
# in a `Actor.spawn_info: SpawnInfo` struct?
# Receive post-spawn runtime state from our parent.
spawnspec: msgtypes.SpawnSpec = await chan.recv()
self._spawn_spec = spawnspec
log.runtime(
'Received runtime spec from parent:\n\n'
# TODO: eventually all these msgs as
# `msgspec.Struct` with a special mode that
# pformats them in multi-line mode, BUT only
# if "trace"/"util" mode is enabled?
log.runtime(
'Received runtime spec from parent:\n\n'
f'{pformat(spawnspec)}\n'
f'{pretty_struct.pformat(spawnspec)}\n'
)
# accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
# rvs = parent_data.pop('_runtime_vars')
rvs = spawnspec._runtime_vars
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']:
try:
log.info(
# TODO: maybe return some status msgs upward
# to that we can emit them in `con_status`
# instead?
log.devx(
'Enabling `stackscope` traces on SIGUSR1'
)
from .devx import enable_stack_on_sig
@ -989,7 +1004,6 @@ class Actor:
'`stackscope` not installed for use in debug mode!'
)
log.runtime(f'Runtime vars are: {rvs}')
rvs['_is_root'] = False
_state._runtime_vars.update(rvs)
@ -1006,18 +1020,12 @@ class Actor:
for val in spawnspec.reg_addrs
]
# for attr, value in parent_data.items():
# TODO: better then monkey patching..
# -[ ] maybe read the actual f#$-in `._spawn_spec` XD
for _, attr, value in pretty_struct.iter_fields(
spawnspec,
):
setattr(self, attr, value)
# if (
# attr == 'reg_addrs'
# and value
# ):
# self.reg_addrs = [tuple(val) for val in value]
# else:
# setattr(self, attr, value)
return (
chan,
@ -1026,12 +1034,11 @@ class Actor:
except OSError: # failed to connect
log.warning(
f'Failed to connect to parent!?\n\n'
'Closing IPC [TCP] transport server to\n'
f'{parent_addr}\n'
f'Failed to connect to spawning parent actor!?\n'
f'x=> {parent_addr}\n'
f'|_{self}\n\n'
)
await self.cancel(chan=None) # self cancel
await self.cancel(req_chan=None) # self cancel
raise
async def _serve_forever(
@ -1109,8 +1116,7 @@ class Actor:
# chan whose lifetime limits the lifetime of its remotely
# requested and locally spawned RPC tasks - similar to the
# supervision semantics of a nursery wherein the actual
# implementation does start all such tasks in
# a sub-nursery.
# implementation does start all such tasks in a sub-nursery.
req_chan: Channel|None,
) -> bool:
@ -1151,7 +1157,7 @@ class Actor:
# other) repr fields instead of doing this all manual..
msg: str = (
f'Runtime cancel request from {requester_type}:\n\n'
f'<= .cancel(): {requesting_uid}\n'
f'<= .cancel(): {requesting_uid}\n\n'
)
# TODO: what happens here when we self-cancel tho?
@ -1166,8 +1172,8 @@ class Actor:
dbcs = _debug.DebugStatus.req_cs
if dbcs is not None:
msg += (
'>> Cancelling active debugger request..\n'
f'|_{_debug.Lock}\n'
'-> Cancelling active debugger request..\n'
f'|_{_debug.Lock.pformat()}'
)
dbcs.cancel()
@ -1418,7 +1424,12 @@ class Actor:
'''
if self._server_n:
log.runtime("Shutting down channel server")
# TODO: obvi a different server type when we eventually
# support some others XD
server_prot: str = 'TCP'
log.runtime(
f'Cancelling {server_prot} server'
)
self._server_n.cancel_scope.cancel()
return True
@ -1602,6 +1613,7 @@ async def async_main(
assert accept_addrs
try:
# TODO: why is this not with the root nursery?
actor._server_n = await service_nursery.start(
partial(
actor._serve_forever,
@ -1886,13 +1898,13 @@ class Arbiter(Actor):
sockaddrs: list[tuple[str, int]] = []
sockaddr: tuple[str, int]
for (aname, _), sockaddr in self._registry.items():
log.runtime(
f'Actor mailbox info:\n'
f'aname: {aname}\n'
f'sockaddr: {sockaddr}\n'
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, sockaddr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_sockaddr: {sockaddr}\n\n'
)
if name == aname:
if name == uid[0]:
sockaddrs.append(sockaddr)
if not sockaddrs:
@ -1904,6 +1916,7 @@ class Arbiter(Actor):
if not isinstance(uid, trio.Event):
sockaddrs.append(self._registry[uid])
log.runtime(mailbox_info)
return sockaddrs
async def register_actor(