Drop 'IPC' prefix from `._server` types
We already have the `.ipc` sub-pkg name so it seems a bit redundant/noisy for a namespace path Bp Leave an alias for the `Server` rn since it's already used in a few other internal mods.. will likely rename later if everyone is cool with it..cluster_api_egs_conflict
							parent
							
								
									b71afdc615
								
							
						
					
					
						commit
						bff32b0ad7
					
				| 
						 | 
				
			
			@ -49,7 +49,7 @@ def test_basic_ipc_server(
 | 
			
		|||
            )
 | 
			
		||||
            assert server._no_more_peers.is_set()
 | 
			
		||||
 | 
			
		||||
            eps: list[ipc.IPCEndpoint] = await server.listen_on(
 | 
			
		||||
            eps: list[ipc._server.Endpoint] = await server.listen_on(
 | 
			
		||||
                accept_addrs=[rando_addr],
 | 
			
		||||
                stream_handler_nursery=None,
 | 
			
		||||
            )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
#
 | 
			
		||||
# -[x] maybe change to mod-func and rename for implied
 | 
			
		||||
#    multi-transport semantics?
 | 
			
		||||
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
 | 
			
		||||
# -[ ] register each stream/tpt/chan with the owning `Endpoint`
 | 
			
		||||
#     so that we can query per tpt all peer contact infos?
 | 
			
		||||
#  |_[ ] possibly provide a global viewing via a
 | 
			
		||||
#        `collections.ChainMap`?
 | 
			
		||||
| 
						 | 
				
			
			@ -309,7 +309,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
    any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
 | 
			
		||||
    such that it is invoked as,
 | 
			
		||||
 | 
			
		||||
      IPCEndpoint.stream_handler_tn.start_soon(
 | 
			
		||||
      Endpoint.stream_handler_tn.start_soon(
 | 
			
		||||
          handle_stream,
 | 
			
		||||
          stream,
 | 
			
		||||
      )
 | 
			
		||||
| 
						 | 
				
			
			@ -577,7 +577,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
    # finally block closure
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IPCEndpoint(Struct):
 | 
			
		||||
class Endpoint(Struct):
 | 
			
		||||
    '''
 | 
			
		||||
    An instance of an IPC "bound" address where the lifetime of the
 | 
			
		||||
    "ability to accept connections" (from clients) and then handle
 | 
			
		||||
| 
						 | 
				
			
			@ -636,7 +636,7 @@ class IPCEndpoint(Struct):
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IPCServer(Struct):
 | 
			
		||||
class Server(Struct):
 | 
			
		||||
    _parent_tn: Nursery
 | 
			
		||||
    _stream_handler_tn: Nursery
 | 
			
		||||
    # level-triggered sig for whether "no peers are currently
 | 
			
		||||
| 
						 | 
				
			
			@ -644,7 +644,7 @@ class IPCServer(Struct):
 | 
			
		|||
    # initialized with `.is_set() == True`.
 | 
			
		||||
    _no_more_peers: trio.Event
 | 
			
		||||
 | 
			
		||||
    _endpoints: list[IPCEndpoint] = []
 | 
			
		||||
    _endpoints: list[Endpoint] = []
 | 
			
		||||
 | 
			
		||||
    # connection tracking & mgmt
 | 
			
		||||
    _peers: defaultdict[
 | 
			
		||||
| 
						 | 
				
			
			@ -659,10 +659,10 @@ class IPCServer(Struct):
 | 
			
		|||
    # syncs for setup/teardown sequences
 | 
			
		||||
    _shutdown: trio.Event|None = None
 | 
			
		||||
 | 
			
		||||
    # TODO, maybe just make `._endpoints: list[IPCEndpoint]` and
 | 
			
		||||
    # TODO, maybe just make `._endpoints: list[Endpoint]` and
 | 
			
		||||
    # provide dict-views onto it?
 | 
			
		||||
    # @property
 | 
			
		||||
    # def addrs2eps(self) -> dict[Address, IPCEndpoint]:
 | 
			
		||||
    # def addrs2eps(self) -> dict[Address, Endpoint]:
 | 
			
		||||
    #     ...
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
| 
						 | 
				
			
			@ -708,7 +708,7 @@ class IPCServer(Struct):
 | 
			
		|||
            await self._shutdown.wait()
 | 
			
		||||
        else:
 | 
			
		||||
            tpt_protos: list[str] = []
 | 
			
		||||
            ep: IPCEndpoint
 | 
			
		||||
            ep: Endpoint
 | 
			
		||||
            for ep in self._endpoints:
 | 
			
		||||
                tpt_protos.append(ep.addr.proto_key)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -790,7 +790,7 @@ class IPCServer(Struct):
 | 
			
		|||
 | 
			
		||||
    def epsdict(self) -> dict[
 | 
			
		||||
        Address,
 | 
			
		||||
        IPCEndpoint,
 | 
			
		||||
        Endpoint,
 | 
			
		||||
    ]:
 | 
			
		||||
        return {
 | 
			
		||||
            ep.addr: ep
 | 
			
		||||
| 
						 | 
				
			
			@ -804,7 +804,7 @@ class IPCServer(Struct):
 | 
			
		|||
        return ev.is_set()
 | 
			
		||||
 | 
			
		||||
    def pformat(self) -> str:
 | 
			
		||||
        eps: list[IPCEndpoint] = self._endpoints
 | 
			
		||||
        eps: list[Endpoint] = self._endpoints
 | 
			
		||||
 | 
			
		||||
        state_repr: str = (
 | 
			
		||||
            f'{len(eps)!r} IPC-endpoints active'
 | 
			
		||||
| 
						 | 
				
			
			@ -835,13 +835,13 @@ class IPCServer(Struct):
 | 
			
		|||
 | 
			
		||||
    # TODO? maybe allow shutting down a `.listen_on()`s worth of
 | 
			
		||||
    # listeners by cancelling the corresponding
 | 
			
		||||
    # `IPCEndpoint._listen_tn` only ?
 | 
			
		||||
    # `Endpoint._listen_tn` only ?
 | 
			
		||||
    # -[ ] in theory you could use this to
 | 
			
		||||
    #     "boot-and-wait-for-reconnect" of all current and connecting
 | 
			
		||||
    #     peers?
 | 
			
		||||
    #  |_ would require that the stream-handler is intercepted so we
 | 
			
		||||
    #     can intercept every `MsgTransport` (stream) and track per
 | 
			
		||||
    #     `IPCEndpoint` likely?
 | 
			
		||||
    #     `Endpoint` likely?
 | 
			
		||||
    #
 | 
			
		||||
    # async def unlisten(
 | 
			
		||||
    #     self,
 | 
			
		||||
| 
						 | 
				
			
			@ -854,7 +854,7 @@ class IPCServer(Struct):
 | 
			
		|||
        *,
 | 
			
		||||
        accept_addrs: list[tuple[str, int|str]]|None = None,
 | 
			
		||||
        stream_handler_nursery: Nursery|None = None,
 | 
			
		||||
    ) -> list[IPCEndpoint]:
 | 
			
		||||
    ) -> list[Endpoint]:
 | 
			
		||||
        '''
 | 
			
		||||
        Start `SocketListeners` (i.e. bind and call `socket.listen()`)
 | 
			
		||||
        for all IPC-transport-protocol specific `Address`-types
 | 
			
		||||
| 
						 | 
				
			
			@ -888,7 +888,7 @@ class IPCServer(Struct):
 | 
			
		|||
            f'Binding to endpoints for,\n'
 | 
			
		||||
            f'{accept_addrs}\n'
 | 
			
		||||
        )
 | 
			
		||||
        eps: list[IPCEndpoint] = await self._parent_tn.start(
 | 
			
		||||
        eps: list[Endpoint] = await self._parent_tn.start(
 | 
			
		||||
            partial(
 | 
			
		||||
                _serve_ipc_eps,
 | 
			
		||||
                server=self,
 | 
			
		||||
| 
						 | 
				
			
			@ -904,7 +904,7 @@ class IPCServer(Struct):
 | 
			
		|||
        self._endpoints.extend(eps)
 | 
			
		||||
        # XXX, just a little bit of sanity
 | 
			
		||||
        group_tn: Nursery|None = None
 | 
			
		||||
        ep: IPCEndpoint
 | 
			
		||||
        ep: Endpoint
 | 
			
		||||
        for ep in eps:
 | 
			
		||||
            if ep.addr not in self.addrs:
 | 
			
		||||
                breakpoint()
 | 
			
		||||
| 
						 | 
				
			
			@ -917,6 +917,10 @@ class IPCServer(Struct):
 | 
			
		|||
        return eps
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# alias until we decide on final naming
 | 
			
		||||
IPCServer = Server
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _serve_ipc_eps(
 | 
			
		||||
    *,
 | 
			
		||||
    server: IPCServer,
 | 
			
		||||
| 
						 | 
				
			
			@ -941,12 +945,12 @@ async def _serve_ipc_eps(
 | 
			
		|||
        listen_tn: Nursery
 | 
			
		||||
        async with trio.open_nursery() as listen_tn:
 | 
			
		||||
 | 
			
		||||
            eps: list[IPCEndpoint] = []
 | 
			
		||||
            eps: list[Endpoint] = []
 | 
			
		||||
            # XXX NOTE, required to call `serve_listeners()` below.
 | 
			
		||||
            # ?TODO, maybe just pass `list(eps.values()` tho?
 | 
			
		||||
            listeners: list[trio.abc.Listener] = []
 | 
			
		||||
            for addr in listen_addrs:
 | 
			
		||||
                ep = IPCEndpoint(
 | 
			
		||||
                ep = Endpoint(
 | 
			
		||||
                    addr=addr,
 | 
			
		||||
                    listen_tn=listen_tn,
 | 
			
		||||
                    stream_handler_tn=stream_handler_tn,
 | 
			
		||||
| 
						 | 
				
			
			@ -1010,7 +1014,7 @@ async def _serve_ipc_eps(
 | 
			
		|||
    finally:
 | 
			
		||||
        if eps:
 | 
			
		||||
            addr: Address
 | 
			
		||||
            ep: IPCEndpoint
 | 
			
		||||
            ep: Endpoint
 | 
			
		||||
            for addr, ep in server.epsdict().items():
 | 
			
		||||
                ep.close_listener()
 | 
			
		||||
                server._endpoints.remove(ep)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue