From bff32b0ad75e5912521faab2ae2000707ac77b6d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 17 Jun 2025 23:33:58 -0400 Subject: [PATCH] Drop 'IPC' prefix from `._server` types We already have the `.ipc` sub-pkg name so it seems a bit redundant/noisy for a namespace path Bp Leave an alias for the `Server` rn since it's already used in a few other internal mods.. will likely rename later if everyone is cool with it.. --- tests/ipc/test_server.py | 2 +- tractor/ipc/_server.py | 40 ++++++++++++++++++++++------------------ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/tests/ipc/test_server.py b/tests/ipc/test_server.py index 9045f5df..1d63bd1b 100644 --- a/tests/ipc/test_server.py +++ b/tests/ipc/test_server.py @@ -49,7 +49,7 @@ def test_basic_ipc_server( ) assert server._no_more_peers.is_set() - eps: list[ipc.IPCEndpoint] = await server.listen_on( + eps: list[ipc._server.Endpoint] = await server.listen_on( accept_addrs=[rando_addr], stream_handler_nursery=None, ) diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py index ee1f4d68..a8732c10 100644 --- a/tractor/ipc/_server.py +++ b/tractor/ipc/_server.py @@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs( # # -[x] maybe change to mod-func and rename for implied # multi-transport semantics? -# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint` +# -[ ] register each stream/tpt/chan with the owning `Endpoint` # so that we can query per tpt all peer contact infos? # |_[ ] possibly provide a global viewing via a # `collections.ChainMap`? @@ -309,7 +309,7 @@ async def handle_stream_from_peer( any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery` such that it is invoked as, - IPCEndpoint.stream_handler_tn.start_soon( + Endpoint.stream_handler_tn.start_soon( handle_stream, stream, ) @@ -577,7 +577,7 @@ async def handle_stream_from_peer( # finally block closure -class IPCEndpoint(Struct): +class Endpoint(Struct): ''' An instance of an IPC "bound" address where the lifetime of the "ability to accept connections" (from clients) and then handle @@ -636,7 +636,7 @@ class IPCEndpoint(Struct): ) -class IPCServer(Struct): +class Server(Struct): _parent_tn: Nursery _stream_handler_tn: Nursery # level-triggered sig for whether "no peers are currently @@ -644,7 +644,7 @@ class IPCServer(Struct): # initialized with `.is_set() == True`. _no_more_peers: trio.Event - _endpoints: list[IPCEndpoint] = [] + _endpoints: list[Endpoint] = [] # connection tracking & mgmt _peers: defaultdict[ @@ -659,10 +659,10 @@ class IPCServer(Struct): # syncs for setup/teardown sequences _shutdown: trio.Event|None = None - # TODO, maybe just make `._endpoints: list[IPCEndpoint]` and + # TODO, maybe just make `._endpoints: list[Endpoint]` and # provide dict-views onto it? # @property - # def addrs2eps(self) -> dict[Address, IPCEndpoint]: + # def addrs2eps(self) -> dict[Address, Endpoint]: # ... @property @@ -708,7 +708,7 @@ class IPCServer(Struct): await self._shutdown.wait() else: tpt_protos: list[str] = [] - ep: IPCEndpoint + ep: Endpoint for ep in self._endpoints: tpt_protos.append(ep.addr.proto_key) @@ -790,7 +790,7 @@ class IPCServer(Struct): def epsdict(self) -> dict[ Address, - IPCEndpoint, + Endpoint, ]: return { ep.addr: ep @@ -804,7 +804,7 @@ class IPCServer(Struct): return ev.is_set() def pformat(self) -> str: - eps: list[IPCEndpoint] = self._endpoints + eps: list[Endpoint] = self._endpoints state_repr: str = ( f'{len(eps)!r} IPC-endpoints active' @@ -835,13 +835,13 @@ class IPCServer(Struct): # TODO? maybe allow shutting down a `.listen_on()`s worth of # listeners by cancelling the corresponding - # `IPCEndpoint._listen_tn` only ? + # `Endpoint._listen_tn` only ? # -[ ] in theory you could use this to # "boot-and-wait-for-reconnect" of all current and connecting # peers? # |_ would require that the stream-handler is intercepted so we # can intercept every `MsgTransport` (stream) and track per - # `IPCEndpoint` likely? + # `Endpoint` likely? # # async def unlisten( # self, @@ -854,7 +854,7 @@ class IPCServer(Struct): *, accept_addrs: list[tuple[str, int|str]]|None = None, stream_handler_nursery: Nursery|None = None, - ) -> list[IPCEndpoint]: + ) -> list[Endpoint]: ''' Start `SocketListeners` (i.e. bind and call `socket.listen()`) for all IPC-transport-protocol specific `Address`-types @@ -888,7 +888,7 @@ class IPCServer(Struct): f'Binding to endpoints for,\n' f'{accept_addrs}\n' ) - eps: list[IPCEndpoint] = await self._parent_tn.start( + eps: list[Endpoint] = await self._parent_tn.start( partial( _serve_ipc_eps, server=self, @@ -904,7 +904,7 @@ class IPCServer(Struct): self._endpoints.extend(eps) # XXX, just a little bit of sanity group_tn: Nursery|None = None - ep: IPCEndpoint + ep: Endpoint for ep in eps: if ep.addr not in self.addrs: breakpoint() @@ -917,6 +917,10 @@ class IPCServer(Struct): return eps +# alias until we decide on final naming +IPCServer = Server + + async def _serve_ipc_eps( *, server: IPCServer, @@ -941,12 +945,12 @@ async def _serve_ipc_eps( listen_tn: Nursery async with trio.open_nursery() as listen_tn: - eps: list[IPCEndpoint] = [] + eps: list[Endpoint] = [] # XXX NOTE, required to call `serve_listeners()` below. # ?TODO, maybe just pass `list(eps.values()` tho? listeners: list[trio.abc.Listener] = [] for addr in listen_addrs: - ep = IPCEndpoint( + ep = Endpoint( addr=addr, listen_tn=listen_tn, stream_handler_tn=stream_handler_tn, @@ -1010,7 +1014,7 @@ async def _serve_ipc_eps( finally: if eps: addr: Address - ep: IPCEndpoint + ep: Endpoint for addr, ep in server.epsdict().items(): ep.close_listener() server._endpoints.remove(ep)