diff --git a/tractor/_entry.py b/tractor/_entry.py index bf719ab..78f8328 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -146,7 +146,7 @@ def _trio_main( finally: log.info( - 'Actor terminated\n' + 'Subactor terminated\n' + actor_info ) diff --git a/tractor/_portal.py b/tractor/_portal.py index e25a6c7..806dcc7 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -435,7 +435,6 @@ class Portal: yield stream finally: - # cancel the far end task on consumer close # NOTE: this is a special case since we assume that if using # this ``.open_fream_from()`` api, the stream is one a one @@ -496,7 +495,7 @@ class LocalPortal: async def open_portal( channel: Channel, - nursery: trio.Nursery|None = None, + tn: trio.Nursery|None = None, start_msg_loop: bool = True, shield: bool = False, @@ -504,15 +503,19 @@ async def open_portal( ''' Open a ``Portal`` through the provided ``channel``. - Spawns a background task to handle message processing (normally - done by the actor-runtime implicitly). + Spawns a background task to handle RPC processing, normally + done by the actor-runtime implicitly via a call to + `._rpc.process_messages()`. just after connection establishment. ''' actor = current_actor() assert actor was_connected: bool = False - async with maybe_open_nursery(nursery, shield=shield) as nursery: + async with maybe_open_nursery( + tn, + shield=shield, + ) as tn: if not channel.connected(): await channel.connect() @@ -524,7 +527,7 @@ async def open_portal( msg_loop_cs: trio.CancelScope|None = None if start_msg_loop: from ._runtime import process_messages - msg_loop_cs = await nursery.start( + msg_loop_cs = await tn.start( partial( process_messages, actor, @@ -544,7 +547,7 @@ async def open_portal( await channel.aclose() # cancel background msg loop task - if msg_loop_cs: + if msg_loop_cs is not None: msg_loop_cs.cancel() - nursery.cancel_scope.cancel() + tn.cancel_scope.cancel() diff --git a/tractor/_rpc.py b/tractor/_rpc.py index d5899d4..ee3151d 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -64,11 +64,13 @@ from .msg import ( current_codec, MsgCodec, NamespacePath, + pretty_struct, ) from tractor.msg.types import ( CancelAck, Error, Msg, + MsgType, Return, Start, StartAck, @@ -774,7 +776,10 @@ async def process_messages( shield: bool = False, task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED, -) -> bool: +) -> ( + bool, # chan diconnected + MsgType, # last msg +): ''' This is the low-level, per-IPC-channel, RPC task scheduler loop. @@ -816,11 +821,6 @@ async def process_messages( # |_ for ex, from `aioquic` which exposed "stream ids": # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175 # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659 - log.runtime( - 'Entering RPC msg loop:\n' - f'peer: {chan.uid}\n' - f'|_{chan}\n' - ) nursery_cancelled_before_task: bool = False msg: Msg|None = None try: @@ -834,12 +834,15 @@ async def process_messages( async for msg in chan: log.transport( # type: ignore - f'<= IPC msg from peer: {chan.uid}\n\n' + f'IPC msg from peer\n' + f'<= {chan.uid}\n\n' # TODO: avoid fmting depending on loglevel for perf? - # -[ ] specifically `pformat()` sub-call..? + # -[ ] specifically `pretty_struct.pformat()` sub-call..? + # - how to only log-level-aware actually call this? # -[ ] use `.msg.pretty_struct` here now instead! - f'{pformat(msg)}\n' + # f'{pretty_struct.pformat(msg)}\n' + f'{msg}\n' ) match msg: @@ -953,10 +956,11 @@ async def process_messages( uid=actorid, ): log.runtime( - 'Handling RPC `Start` request from\n' - f'peer: {actorid}\n' - '\n' - f'=> {ns}.{funcname}({kwargs})\n' + 'Handling RPC `Start` request\n' + f'<= peer: {actorid}\n' + f' |_{ns}.{funcname}({kwargs})\n\n' + + f'{pretty_struct.pformat(msg)}\n' ) # runtime-internal endpoint: `Actor.` @@ -1097,25 +1101,24 @@ async def process_messages( parent_chan=chan, ) - except ( - TransportClosed, - ): + except TransportClosed: # channels "breaking" (for TCP streams by EOF or 104 # connection-reset) is ok since we don't have a teardown # handshake for them (yet) and instead we simply bail out of # the message loop and expect the teardown sequence to clean # up.. - # TODO: add a teardown handshake? and, + # + # TODO: maybe add a teardown handshake? and, # -[ ] don't show this msg if it's an ephemeral discovery ep call? # -[ ] figure out how this will break with other transports? log.runtime( - f'channel closed abruptly with\n' - f'peer: {chan.uid}\n' - f'|_{chan.raddr}\n' + f'IPC channel closed abruptly\n' + f'<=x peer: {chan.uid}\n' + f' |_{chan.raddr}\n' ) # transport **WAS** disconnected - return True + return (True, msg) except ( Exception, @@ -1155,9 +1158,9 @@ async def process_messages( log.runtime( 'Exiting IPC msg loop with final msg\n\n' f'<= peer: {chan.uid}\n' - f'|_{chan}\n\n' - f'{pformat(msg)}\n\n' + f' |_{chan}\n\n' + f'{pretty_struct.pformat(msg)}' ) # transport **WAS NOT** disconnected - return False + return (False, msg) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index d7cc548..d28f490 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -49,6 +49,7 @@ from pprint import pformat import signal import sys from typing import ( + Any, Callable, TYPE_CHECKING, ) @@ -68,7 +69,7 @@ from tractor.msg import ( pretty_struct, NamespacePath, types as msgtypes, - Msg, + MsgType, ) from ._ipc import Channel from ._context import ( @@ -96,19 +97,6 @@ from ._rpc import ( process_messages, try_ship_error_to_remote, ) -# from tractor.msg.types import ( -# Aid, -# SpawnSpec, -# Start, -# StartAck, -# Started, -# Yield, -# Stop, -# Return, -# Error, -# ) - - if TYPE_CHECKING: @@ -315,29 +303,32 @@ class Actor: self._reg_addrs = addrs async def wait_for_peer( - self, uid: tuple[str, str] + self, + uid: tuple[str, str], + ) -> tuple[trio.Event, Channel]: ''' - Wait for a connection back from a spawned actor with a `uid` - using a `trio.Event` for sync. + Wait for a connection back from a (spawned sub-)actor with + a `uid` using a `trio.Event` for sync. ''' - log.runtime(f"Waiting for peer {uid} to connect") + log.debug(f'Waiting for peer {uid!r} to connect') event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() - log.runtime(f"{uid} successfully connected back to us") + log.debug(f'{uid!r} successfully connected back to us') return event, self._peers[uid][-1] def load_modules( self, - debug_mode: bool = False, + # debug_mode: bool = False, ) -> None: ''' - Load enabled RPC py-modules locally (after process fork/spawn). + Load explicitly enabled python modules from local fs after + process spawn. Since this actor may be spawned on a different machine from the original nursery we need to try and load the local module - code (presuming it exists). + code manually (presuming it exists). ''' try: @@ -350,16 +341,21 @@ class Actor: _mp_fixup_main._fixup_main_from_path( parent_data['init_main_from_path']) + status: str = 'Attempting to import enabled modules:\n' for modpath, filepath in self.enable_modules.items(): # XXX append the allowed module to the python path which # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) - log.runtime(f"Attempting to import {modpath}@{filepath}") - mod = importlib.import_module(modpath) + status += ( + f'|_{modpath!r} -> {filepath!r}\n' + ) + mod: ModuleType = importlib.import_module(modpath) self._mods[modpath] = mod if modpath == '__main__': self._mods['__mp_main__'] = mod + log.runtime(status) + except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later @@ -413,21 +409,23 @@ class Actor: chan = Channel.from_stream(stream) their_uid: tuple[str, str]|None = chan.uid - con_msg: str = '' + con_status: str = '' + + # TODO: remove this branch since can never happen? + # NOTE: `.uid` is only set after first contact if their_uid: - # NOTE: `.uid` is only set after first contact - con_msg = ( - 'IPC Re-connection from already known peer? ' + con_status = ( + 'IPC Re-connection from already known peer?\n' ) else: - con_msg = ( - 'New IPC connection to us ' + con_status = ( + 'New inbound IPC connection <=\n' ) - con_msg += ( - f'<= @{chan.raddr}\n' + con_status += ( f'|_{chan}\n' # f' |_@{chan.raddr}\n\n' + # ^-TODO-^ remove since alfready in chan.__repr__()? ) # send/receive initial handshake response try: @@ -447,13 +445,13 @@ class Actor: # a bound listener on the "arbiter" addr. the reset will be # because the handshake was never meant took place. log.warning( - con_msg + con_status + ' -> But failed to handshake? Ignoring..\n' ) return - con_msg += ( + con_status += ( f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n' ) # IPC connection tracking for both peers and new children: @@ -466,7 +464,7 @@ class Actor: None, ) if event: - con_msg += ( + con_status += ( ' -> Waking subactor spawn waiters: ' f'{event.statistics().tasks_waiting}\n' f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n' @@ -477,7 +475,7 @@ class Actor: event.set() else: - con_msg += ( + con_status += ( f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n' ) # type: ignore @@ -491,13 +489,18 @@ class Actor: # TODO: can we just use list-ref directly? chans.append(chan) - log.runtime(con_msg) + con_status += ' -> Entering RPC msg loop..\n' + log.runtime(con_status) # Begin channel management - respond to remote requests and # process received reponses. disconnected: bool = False + last_msg: MsgType try: - disconnected: bool = await process_messages( + ( + disconnected, + last_msg, + ) = await process_messages( self, chan, ) @@ -598,16 +601,24 @@ class Actor: # that the IPC layer may have failed # unexpectedly since it may be the cause of # other downstream errors. - entry = local_nursery._children.get(uid) + entry: tuple|None = local_nursery._children.get(uid) if entry: proc: trio.Process _, proc, _ = entry if ( (poll := getattr(proc, 'poll', None)) - and poll() is None + and + poll() is None # proc still alive ): - log.cancel( + # TODO: change log level based on + # detecting whether chan was created for + # ephemeral `.register_actor()` request! + # -[ ] also, that should be avoidable by + # re-using any existing chan from the + # `._discovery.get_registry()` call as + # well.. + log.runtime( f'Peer IPC broke but subproc is alive?\n\n' f'<=x {chan.uid}@{chan.raddr}\n' @@ -616,17 +627,17 @@ class Actor: # ``Channel`` teardown and closure sequence # drop ref to channel so it can be gc-ed and disconnected - log.runtime( - f'Disconnected IPC channel:\n' - f'uid: {chan.uid}\n' - f'|_{pformat(chan)}\n' + con_teardown_status: str = ( + f'IPC channel disconnected:\n' + f'<=x uid: {chan.uid}\n' + f' |_{pformat(chan)}\n\n' ) chans.remove(chan) # TODO: do we need to be this pedantic? if not chans: - log.runtime( - f'No more channels with {chan.uid}' + con_teardown_status += ( + f'-> No more channels with {chan.uid}' ) self._peers.pop(uid, None) @@ -640,15 +651,16 @@ class Actor: f' |_[{i}] {pformat(chan)}\n' ) - log.runtime( - f'Remaining IPC {len(self._peers)} peers:\n' - + peers_str + con_teardown_status += ( + f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n' ) # No more channels to other actors (at all) registered # as connected. if not self._peers: - log.runtime("Signalling no more peer channel connections") + con_teardown_status += ( + 'Signalling no more peer channel connections' + ) self._no_more_peers.set() # NOTE: block this actor from acquiring the @@ -723,13 +735,16 @@ class Actor: # TODO: figure out why this breaks tests.. db_cs.cancel() + log.runtime(con_teardown_status) + # finally block closure + # TODO: rename to `._deliver_payload()` since this handles # more then just `result` msgs now obvi XD async def _deliver_ctx_payload( self, chan: Channel, cid: str, - msg: Msg|MsgTypeError, + msg: MsgType|MsgTypeError, ) -> None|bool: ''' @@ -754,7 +769,7 @@ class Actor: # XXX don't need right since it's always in msg? # f'=> cid: {cid}\n\n' - f'{pretty_struct.Struct.pformat(msg)}\n' + f'{pretty_struct.pformat(msg)}\n' ) return @@ -896,9 +911,11 @@ class Actor: cid=cid, ) log.runtime( - 'Sending RPC start msg\n\n' + 'Sending RPC `Start`\n\n' f'=> peer: {chan.uid}\n' - f' |_ {ns}.{func}({kwargs})\n' + f' |_ {ns}.{func}({kwargs})\n\n' + + f'{pretty_struct.pformat(msg)}' ) await chan.send(msg) @@ -955,31 +972,29 @@ class Actor: if self._spawn_method == "trio": - # Receive runtime state from our parent - # parent_data: dict[str, Any] - # parent_data = await chan.recv() - - # TODO: maybe we should just wrap this directly - # in a `Actor.spawn_info: SpawnInfo` struct? + # Receive post-spawn runtime state from our parent. spawnspec: msgtypes.SpawnSpec = await chan.recv() self._spawn_spec = spawnspec - # TODO: eventually all these msgs as - # `msgspec.Struct` with a special mode that - # pformats them in multi-line mode, BUT only - # if "trace"/"util" mode is enabled? log.runtime( 'Received runtime spec from parent:\n\n' - f'{pformat(spawnspec)}\n' + + # TODO: eventually all these msgs as + # `msgspec.Struct` with a special mode that + # pformats them in multi-line mode, BUT only + # if "trace"/"util" mode is enabled? + f'{pretty_struct.pformat(spawnspec)}\n' ) - # accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs - # rvs = parent_data.pop('_runtime_vars') - rvs = spawnspec._runtime_vars + # TODO: another `Struct` for rtvs.. + rvs: dict[str, Any] = spawnspec._runtime_vars if rvs['_debug_mode']: try: - log.info( + # TODO: maybe return some status msgs upward + # to that we can emit them in `con_status` + # instead? + log.devx( 'Enabling `stackscope` traces on SIGUSR1' ) from .devx import enable_stack_on_sig @@ -989,7 +1004,6 @@ class Actor: '`stackscope` not installed for use in debug mode!' ) - log.runtime(f'Runtime vars are: {rvs}') rvs['_is_root'] = False _state._runtime_vars.update(rvs) @@ -1006,18 +1020,12 @@ class Actor: for val in spawnspec.reg_addrs ] - # for attr, value in parent_data.items(): + # TODO: better then monkey patching.. + # -[ ] maybe read the actual f#$-in `._spawn_spec` XD for _, attr, value in pretty_struct.iter_fields( spawnspec, ): setattr(self, attr, value) - # if ( - # attr == 'reg_addrs' - # and value - # ): - # self.reg_addrs = [tuple(val) for val in value] - # else: - # setattr(self, attr, value) return ( chan, @@ -1026,12 +1034,11 @@ class Actor: except OSError: # failed to connect log.warning( - f'Failed to connect to parent!?\n\n' - 'Closing IPC [TCP] transport server to\n' - f'{parent_addr}\n' + f'Failed to connect to spawning parent actor!?\n' + f'x=> {parent_addr}\n' f'|_{self}\n\n' ) - await self.cancel(chan=None) # self cancel + await self.cancel(req_chan=None) # self cancel raise async def _serve_forever( @@ -1109,8 +1116,7 @@ class Actor: # chan whose lifetime limits the lifetime of its remotely # requested and locally spawned RPC tasks - similar to the # supervision semantics of a nursery wherein the actual - # implementation does start all such tasks in - # a sub-nursery. + # implementation does start all such tasks in a sub-nursery. req_chan: Channel|None, ) -> bool: @@ -1151,7 +1157,7 @@ class Actor: # other) repr fields instead of doing this all manual.. msg: str = ( f'Runtime cancel request from {requester_type}:\n\n' - f'<= .cancel(): {requesting_uid}\n' + f'<= .cancel(): {requesting_uid}\n\n' ) # TODO: what happens here when we self-cancel tho? @@ -1166,8 +1172,8 @@ class Actor: dbcs = _debug.DebugStatus.req_cs if dbcs is not None: msg += ( - '>> Cancelling active debugger request..\n' - f'|_{_debug.Lock}\n' + '-> Cancelling active debugger request..\n' + f'|_{_debug.Lock.pformat()}' ) dbcs.cancel() @@ -1418,7 +1424,12 @@ class Actor: ''' if self._server_n: - log.runtime("Shutting down channel server") + # TODO: obvi a different server type when we eventually + # support some others XD + server_prot: str = 'TCP' + log.runtime( + f'Cancelling {server_prot} server' + ) self._server_n.cancel_scope.cancel() return True @@ -1602,6 +1613,7 @@ async def async_main( assert accept_addrs try: + # TODO: why is this not with the root nursery? actor._server_n = await service_nursery.start( partial( actor._serve_forever, @@ -1886,13 +1898,13 @@ class Arbiter(Actor): sockaddrs: list[tuple[str, int]] = [] sockaddr: tuple[str, int] - for (aname, _), sockaddr in self._registry.items(): - log.runtime( - f'Actor mailbox info:\n' - f'aname: {aname}\n' - f'sockaddr: {sockaddr}\n' + mailbox_info: str = 'Actor registry contact infos:\n' + for uid, sockaddr in self._registry.items(): + mailbox_info += ( + f'|_uid: {uid}\n' + f'|_sockaddr: {sockaddr}\n\n' ) - if name == aname: + if name == uid[0]: sockaddrs.append(sockaddr) if not sockaddrs: @@ -1904,6 +1916,7 @@ class Arbiter(Actor): if not isinstance(uid, trio.Event): sockaddrs.append(self._registry[uid]) + log.runtime(mailbox_info) return sockaddrs async def register_actor(