diff --git a/tests/ipc/__init__.py b/tests/ipc/__init__.py new file mode 100644 index 00000000..b8b1f156 --- /dev/null +++ b/tests/ipc/__init__.py @@ -0,0 +1,4 @@ +''' +`tractor.ipc` subsystem(s)/unit testing suites. + +''' diff --git a/tests/ipc/test_server.py b/tests/ipc/test_server.py new file mode 100644 index 00000000..9045f5df --- /dev/null +++ b/tests/ipc/test_server.py @@ -0,0 +1,72 @@ +''' +High-level `.ipc._server` unit tests. + +''' +from __future__ import annotations + +import pytest +import trio +from tractor import ( + devx, + ipc, + log, +) +from tractor._testing.addr import ( + get_rando_addr, +) +# TODO, use/check-roundtripping with some of these wrapper types? +# +# from .._addr import Address +# from ._chan import Channel +# from ._transport import MsgTransport +# from ._uds import UDSAddress +# from ._tcp import TCPAddress + + +@pytest.mark.parametrize( + '_tpt_proto', + ['uds', 'tcp'] +) +def test_basic_ipc_server( + _tpt_proto: str, + debug_mode: bool, + loglevel: str, +): + + # so we see the socket-listener reporting on console + log.get_console_log("INFO") + + rando_addr: tuple = get_rando_addr( + tpt_proto=_tpt_proto, + ) + async def main(): + async with ipc._server.open_ipc_server() as server: + + assert ( + server._parent_tn + and + server._parent_tn is server._stream_handler_tn + ) + assert server._no_more_peers.is_set() + + eps: list[ipc.IPCEndpoint] = await server.listen_on( + accept_addrs=[rando_addr], + stream_handler_nursery=None, + ) + assert ( + len(eps) == 1 + and + (ep := eps[0])._listener + and + not ep.peer_tpts + ) + + server._parent_tn.cancel_scope.cancel() + + # !TODO! actually make a bg-task connection from a client + # using `ipc._chan._connect_chan()` + + with devx.maybe_open_crash_handler( + pdb=debug_mode, + ): + trio.run(main)