1
0
Fork 0
tractor/tractor/ipc/_tcp.py

224 lines
5.6 KiB
Python
Raw Normal View History

# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
TCP implementation of tractor.ipc._transport.MsgTransport protocol
'''
from __future__ import annotations
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
from typing import (
ClassVar,
)
# from contextlib import (
# asynccontextmanager as acm,
# )
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
import msgspec
import trio
from trio import (
SocketListener,
open_tcp_listeners,
)
from tractor.msg import MsgCodec
from tractor.log import get_logger
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
from tractor.ipc._transport import (
MsgTransport,
MsgpackTransport,
)
log = get_logger(__name__)
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
class TCPAddress(
msgspec.Struct,
frozen=True,
):
_host: str
_port: int
proto_key: ClassVar[str] = 'tcp'
unwrapped_type: ClassVar[type] = tuple[str, int]
def_bindspace: ClassVar[str] = '127.0.0.1'
@property
def is_valid(self) -> bool:
return self._port != 0
@property
def bindspace(self) -> str:
return self._host
@property
def domain(self) -> str:
return self._host
@classmethod
def from_addr(
cls,
addr: tuple[str, int]
) -> TCPAddress:
match addr:
case (str(), int()):
return TCPAddress(addr[0], addr[1])
case _:
raise ValueError(
f'Invalid unwrapped address for {cls}\n'
f'{addr}\n'
)
def unwrap(self) -> tuple[str, int]:
return (
self._host,
self._port,
)
@classmethod
def get_random(
cls,
bindspace: str = def_bindspace,
) -> TCPAddress:
return TCPAddress(bindspace, 0)
@classmethod
def get_root(cls) -> TCPAddress:
return TCPAddress(
'127.0.0.1',
1616,
)
def __repr__(self) -> str:
return (
f'{type(self).__name__}[{self.unwrap()}]'
)
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
@classmethod
def get_transport(
cls,
codec: str = 'msgpack',
) -> MsgTransport:
match codec:
case 'msgspack':
return MsgpackTCPStream
case _:
raise ValueError(
f'No IPC transport with {codec!r} supported !'
)
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
async def start_listener(
addr: TCPAddress,
**kwargs,
) -> SocketListener:
'''
Start a TCP socket listener on the given `TCPAddress`.
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
'''
log.info(
f'Attempting to bind TCP socket\n'
f'>[\n'
f'|_{addr}\n'
)
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
# ?TODO, maybe we should just change the lower-level call this is
# using internall per-listener?
listeners: list[SocketListener] = await open_tcp_listeners(
host=addr._host,
port=addr._port,
**kwargs
)
# NOTE, for now we don't expect non-singleton-resolving
# domain-addresses/multi-homed-hosts.
# (though it is supported by `open_tcp_listeners()`)
assert len(listeners) == 1
listener = listeners[0]
host, port = listener.socket.getsockname()[:2]
log.info(
f'Listening on TCP socket\n'
f'[>\n'
f' |_{addr}\n'
)
Factor actor-embedded IPC-tpt-server to `ipc` subsys Primarily moving the `Actor._serve_forever()`-task-as-method and supporting actor-instance attributes to a new `.ipo._server` sub-mod which now encapsulates, - the coupling various `trio.Nursery`s (and their independent lifetime mgmt) to different `trio.serve_listener()`s tasks and `SocketStream` handler scopes. - `Address` and `SocketListener` mgmt and tracking through the idea of an "IPC endpoint": each "bound-and-active instance" of a served-listener for some (varied transport protocol's socket) address. - start and shutdown of the entire server's lifetime via an `@acm`. - delegation of starting/stopping tpt-protocol-specific `trio.abc.Listener`s to the corresponding `.ipc._<proto_key>` sub-module (newly defined mod-top-level instead of `Address` method) `start/close_listener()` funcs. Impl details of the `.ipc._server` sub-sys, - add new `IPCServer`, allocated with `open_ipc_server()`, and which encapsulates starting multiple-transport-proto-`trio.abc.Listener`s from an input set of `._addr.Address`s using, |_`IPCServer.listen_on()` which internally spawns tasks that delegate to a new `_serve_ipc_eps()`, a rework of what was (effectively) `Actor._serve_forever()` and which now, * allocates a new `IPCEndpoint`-struct (see below) for each address-listener pair alongside the specified listener-serving/stream-handling `trio.Nursery`s provided by the caller. * starts and stops each transport (socket's) listener by calling `IPCEndpoint.start/close_listener()` which in turn delegates to the underlying `inspect.getmodule(IPCEndpoint.addr)` backend tpt module's equivalent impl. * tracks all created endpoints in a `._endpoints: list[IPCEndpoint]` which is further exposed through public properties for introspection of served transport-protocols and their addresses. |_`IPCServer._[parent/stream_handler]_tn: Nursery`s which are either allocated (in which case, as the same instance) or provided by the caller of `open_ipc_server()` such that the same nursery-cancel-scope controls offered by `trio.serve_listeners(handler_nursery=)` are offered where the `._parent_tn` is used to spawn `_serve_ipc_eps()` tasks, and `._stream_handler_tn` is passed verbatim as `handler_nursery`. - a new `IPCEndpoint`-struct (as mentioned) which wraps each transport-proto's address + listener + allocated-supervising-nursery to encapsulate the "lifetime of a server IPC endpoint" such that eventually we can track and managed per-protocol/address/`.listen_on()`-call scoped starts/stops/restarts for the purposes of filtering/banning peer traffic. |_ also included is an unused `.peer_tpts` table which we can hopefully use to replace `Actor._peers` in a `Channel`-tracking transport-proto-aware way! Surrounding changes to `.ipc.*` primitives to match, - make `[TCP|UDS]Address` types `msgspec.Struct(frozen=True)` and thus drop any-and-all `addr._host =` style mutation throughout. |_ as such also drop their `.__init__()` and `.__eq__()` meths. |_ UDS tweaks to field names and thus `.__repr__()`. - move `[TCP|UDS]Address.[start/close]_listener()` meths to be mod-level equiv `start|close_listener()` funcs. - just hard code the `.ipc._types._key_to_transport/._addr_to_transport` table entries instead of all the prior fancy dynamic class property reading stuff (remember, "explicit is better then implicit"). Modified in `._runtime.Actor` internals, - drop the `._serve_forever()` and `.cancel_server()`, methods and `._server_down` waiting logic from `.cancel_soon()` - add `.[_]ipc_server` which is opened just after the `._service_n` and delegate to it for any equivalent publicly exposed instance attributes/properties.
2025-04-10 22:06:12 +00:00
return listener
# TODO: typing oddity.. not sure why we have to inherit here, but it
# seems to be an issue with `get_msg_transport()` returning
# a `Type[Protocol]`; probably should make a `mypy` issue?
class MsgpackTCPStream(MsgpackTransport):
'''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using the ``msgspec`` codec lib.
'''
address_type = TCPAddress
layer_key: int = 4
@property
def maddr(self) -> str:
host, port = self.raddr.unwrap()
return (
# TODO, use `ipaddress` from stdlib to handle
# first detecting which of `ipv4/6` before
# choosing the routing prefix part.
f'/ipv4/{host}'
f'/{self.address_type.proto_key}/{port}'
# f'/{self.chan.uid[0]}'
# f'/{self.cid}'
# f'/cid={cid_head}..{cid_tail}'
# TODO: ? not use this ^ right ?
)
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@classmethod
async def connect_to(
cls,
destaddr: TCPAddress,
prefix_size: int = 4,
codec: MsgCodec|None = None,
**kwargs
) -> MsgpackTCPStream:
stream = await trio.open_tcp_stream(
*destaddr.unwrap(),
**kwargs
)
return MsgpackTCPStream(
stream,
prefix_size=prefix_size,
codec=codec
)
@classmethod
def get_stream_addrs(
cls,
stream: trio.SocketStream
) -> tuple[
TCPAddress,
TCPAddress,
]:
# TODO, what types are these?
lsockname = stream.socket.getsockname()
l_sockaddr: tuple[str, int] = tuple(lsockname[:2])
rsockname = stream.socket.getpeername()
r_sockaddr: tuple[str, int] = tuple(rsockname[:2])
return (
TCPAddress.from_addr(l_sockaddr),
TCPAddress.from_addr(r_sockaddr),
)