diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 9316837..08ddabc 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -30,7 +30,6 @@ import sys from typing import ( Any, Callable, - Optional, Union, TYPE_CHECKING, ) @@ -101,7 +100,7 @@ async def _invoke( cancel_scope = trio.CancelScope() # activated cancel scope ref - cs: Optional[trio.CancelScope] = None + cs: trio.CancelScope | None = None ctx = actor.get_context( chan, @@ -468,16 +467,20 @@ class Actor: msg_buffer_size: int = 2**6 # nursery placeholders filled in by `async_main()` after fork - _root_n: Optional[trio.Nursery] = None - _service_n: Optional[trio.Nursery] = None - _server_n: Optional[trio.Nursery] = None + _root_n: trio.Nursery | None = None + _service_n: trio.Nursery | None = None + _server_n: trio.Nursery | None = None # Information about `__main__` from parent _parent_main_data: dict[str, str] - _parent_chan_cs: Optional[trio.CancelScope] = None + _parent_chan_cs: trio.CancelScope | None = None # syncs for setup/teardown sequences - _server_down: Optional[trio.Event] = None + _server_down: trio.Event | None = None + + # user toggled crash handling (including monkey-patched in + # `trio.open_nursery()` via `.trionics._supervisor` B) + _debug_mode: bool = False # if started on ``asycio`` running ``trio`` in guest mode _infected_aio: bool = False @@ -493,8 +496,8 @@ class Actor: enable_modules: list[str] = [], uid: str | None = None, loglevel: str | None = None, - arbiter_addr: Optional[tuple[str, int]] = None, - spawn_method: Optional[str] = None + arbiter_addr: tuple[str, int] | None = None, + spawn_method: str | None = None ) -> None: ''' This constructor is called in the parent actor **before** the spawning @@ -554,9 +557,8 @@ class Actor: ] = {} self._listeners: list[trio.abc.Listener] = [] - self._parent_chan: Optional[Channel] = None - self._forkserver_info: Optional[ - tuple[Any, Any, Any, Any, Any]] = None + self._parent_chan: Channel | None = None + self._forkserver_info: tuple | None = None self._actoruid2nursery: dict[ tuple[str, str], ActorNursery | None, @@ -647,7 +649,7 @@ class Actor: self._no_more_peers = trio.Event() # unset chan = Channel.from_stream(stream) - uid: Optional[tuple[str, str]] = chan.uid + uid: tuple[str, str] | None = chan.uid log.runtime(f"New connection to us {chan}") # send/receive initial handshake response @@ -695,7 +697,7 @@ class Actor: # append new channel self._peers[uid].append(chan) - local_nursery: Optional[ActorNursery] = None # noqa + local_nursery: ActorNursery | None = None # noqa disconnected: bool = False # Begin channel management - respond to remote requests and @@ -947,8 +949,8 @@ class Actor: async def _from_parent( self, - parent_addr: Optional[tuple[str, int]], - ) -> tuple[Channel, Optional[tuple[str, int]]]: + parent_addr: tuple[str, int] | None, + ) -> tuple[Channel, tuple[str, int] | None]: try: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we @@ -961,7 +963,7 @@ class Actor: # Initial handshake: swap names. await self._do_handshake(chan) - accept_addr: Optional[tuple[str, int]] = None + accept_addr: tuple[str, int] | None = None if self._spawn_method == "trio": # Receive runtime state from our parent @@ -1020,7 +1022,7 @@ class Actor: self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: - l: list[trio.abc.Listener] = await server_n.start( + listeners: list[trio.abc.Listener] = await server_n.start( partial( trio.serve_tcp, self._stream_handler, @@ -1031,10 +1033,13 @@ class Actor: host=accept_host, ) ) + sockets: list[trio.socket] = [ + getattr(listener, 'socket', 'unknown socket') + for listener in listeners + ] log.runtime( - "Started tcp server(s) on" - f" {[getattr(l, 'socket', 'unknown socket') for l in l]}") - self._listeners.extend(l) + f'Started tcp server(s) on {sockets}') + self._listeners.extend(listeners) task_status.started(server_n) finally: # signal the server is down since nursery above terminated @@ -1215,7 +1220,7 @@ class Actor: self._server_n.cancel_scope.cancel() @property - def accept_addr(self) -> Optional[tuple[str, int]]: + def accept_addr(self) -> tuple[str, int] | None: ''' Primary address to which the channel server is bound. @@ -1267,7 +1272,7 @@ class Actor: async def async_main( actor: Actor, - accept_addr: Optional[tuple[str, int]] = None, + accept_addr: tuple[str, int] | None = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1276,7 +1281,7 @@ async def async_main( # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. - parent_addr: Optional[tuple[str, int]] = None, + parent_addr: tuple[str, int] | None = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: