Non TCP specific addressing everywhere #17
|
@ -10,6 +10,7 @@ pkgs.mkShell {
|
|||
inherit nativeBuildInputs;
|
||||
|
||||
LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath nativeBuildInputs;
|
||||
TMPDIR = "/tmp";
|
||||
|
||||
shellHook = ''
|
||||
set -e
|
||||
|
|
|
@ -9,7 +9,7 @@ async def main(service_name):
|
|||
async with tractor.open_nursery() as an:
|
||||
await an.start_actor(service_name)
|
||||
|
||||
async with tractor.get_registry('127.0.0.1', 1616) as portal:
|
||||
async with tractor.get_registry() as portal:
|
||||
print(f"Arbiter is listening on {portal.channel}")
|
||||
|
||||
async with tractor.wait_for_actor(service_name) as sockaddr:
|
||||
|
|
|
@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
|
|||
portal = await n.start_actor('actor', enable_modules=[__name__])
|
||||
uid = portal.channel.uid
|
||||
|
||||
async with tractor.get_registry(*reg_addr) as aportal:
|
||||
async with tractor.get_registry(reg_addr) as aportal:
|
||||
# this local actor should be the arbiter
|
||||
assert actor is aportal.actor
|
||||
|
||||
|
@ -160,7 +160,7 @@ async def spawn_and_check_registry(
|
|||
async with tractor.open_root_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
):
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
# runtime needs to be up to call this
|
||||
actor = tractor.current_actor()
|
||||
|
||||
|
@ -300,7 +300,7 @@ async def close_chans_before_nursery(
|
|||
async with tractor.open_root_actor(
|
||||
registry_addrs=[reg_addr],
|
||||
):
|
||||
async with tractor.get_registry(*reg_addr) as aportal:
|
||||
async with tractor.get_registry(reg_addr) as aportal:
|
||||
try:
|
||||
get_reg = partial(unpack_reg, aportal)
|
||||
|
||||
|
|
|
@ -66,6 +66,9 @@ def run_example_in_subproc(
|
|||
# due to backpressure!!!
|
||||
proc = testdir.popen(
|
||||
cmdargs,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
**kwargs,
|
||||
)
|
||||
assert not proc.returncode
|
||||
|
@ -119,10 +122,14 @@ def test_example(
|
|||
code = ex.read()
|
||||
|
||||
with run_example_in_subproc(code) as proc:
|
||||
proc.wait()
|
||||
err, _ = proc.stderr.read(), proc.stdout.read()
|
||||
# print(f'STDERR: {err}')
|
||||
# print(f'STDOUT: {out}')
|
||||
err = None
|
||||
try:
|
||||
if not proc.poll():
|
||||
_, err = proc.communicate(timeout=15)
|
||||
|
||||
except subprocess.TimeoutExpired as e:
|
||||
proc.kill()
|
||||
err = e.stderr
|
||||
|
||||
# if we get some gnarly output let's aggregate and raise
|
||||
if err:
|
||||
|
|
|
@ -871,7 +871,7 @@ async def serve_subactors(
|
|||
)
|
||||
await ipc.send((
|
||||
peer.chan.uid,
|
||||
peer.chan.raddr,
|
||||
peer.chan.raddr.unwrap(),
|
||||
))
|
||||
|
||||
print('Spawner exiting spawn serve loop!')
|
||||
|
|
|
@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
|
|||
"Verify waiting on the arbiter to register itself using a local portal."
|
||||
actor = tractor.current_actor()
|
||||
assert actor.is_arbiter
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
assert isinstance(portal, tractor._portal.LocalPortal)
|
||||
|
||||
with trio.fail_after(0.2):
|
||||
|
|
|
@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
|
|||
@tractor_test
|
||||
async def test_cancel_remote_arbiter(daemon, reg_addr):
|
||||
assert not tractor.current_actor().is_arbiter
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
await portal.cancel_actor()
|
||||
|
||||
time.sleep(0.1)
|
||||
|
@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
|
|||
|
||||
# no arbiter socket should exist
|
||||
with pytest.raises(OSError):
|
||||
async with tractor.get_registry(*reg_addr) as portal:
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
pass
|
||||
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ async def movie_theatre_question():
|
|||
async def test_movie_theatre_convo(start_method):
|
||||
"""The main ``tractor`` routine.
|
||||
"""
|
||||
async with tractor.open_nursery() as n:
|
||||
async with tractor.open_nursery(debug_mode=True) as n:
|
||||
|
||||
portal = await n.start_actor(
|
||||
'frank',
|
||||
|
|
|
@ -0,0 +1,310 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import tempfile
|
||||
from uuid import uuid4
|
||||
from typing import (
|
||||
Protocol,
|
||||
ClassVar,
|
||||
TypeVar,
|
||||
Union,
|
||||
Type
|
||||
)
|
||||
|
||||
import trio
|
||||
from trio import socket
|
||||
|
||||
|
||||
NamespaceType = TypeVar('NamespaceType')
|
||||
AddressType = TypeVar('AddressType')
|
||||
StreamType = TypeVar('StreamType')
|
||||
ListenerType = TypeVar('ListenerType')
|
||||
|
||||
|
||||
class Address(Protocol[
|
||||
NamespaceType,
|
||||
AddressType,
|
||||
StreamType,
|
||||
ListenerType
|
||||
]):
|
||||
|
||||
name_key: ClassVar[str]
|
||||
address_type: ClassVar[Type[AddressType]]
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
...
|
||||
|
||||
@property
|
||||
def namespace(self) -> NamespaceType|None:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def from_addr(cls, addr: AddressType) -> Address:
|
||||
...
|
||||
|
||||
def unwrap(self) -> AddressType:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def get_random(cls, namespace: NamespaceType | None = None) -> Address:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> Address:
|
||||
...
|
||||
|
||||
def __repr__(self) -> str:
|
||||
...
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
...
|
||||
|
||||
async def open_stream(self, **kwargs) -> StreamType:
|
||||
...
|
||||
|
||||
async def open_listener(self, **kwargs) -> ListenerType:
|
||||
...
|
||||
|
||||
async def close_listener(self):
|
||||
...
|
||||
|
||||
|
||||
class TCPAddress(Address[
|
||||
str,
|
||||
tuple[str, int],
|
||||
trio.SocketStream,
|
||||
trio.SocketListener
|
||||
]):
|
||||
|
||||
name_key: str = 'tcp'
|
||||
address_type: type = tuple[str, int]
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
port: int
|
||||
):
|
||||
if (
|
||||
not isinstance(host, str)
|
||||
or
|
||||
not isinstance(port, int)
|
||||
):
|
||||
raise TypeError(f'Expected host {host} to be str and port {port} to be int')
|
||||
self._host = host
|
||||
self._port = port
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
return self._port != 0
|
||||
|
||||
@property
|
||||
def namespace(self) -> str:
|
||||
return self._host
|
||||
|
||||
@classmethod
|
||||
def from_addr(cls, addr: tuple[str, int]) -> TCPAddress:
|
||||
return TCPAddress(addr[0], addr[1])
|
||||
|
||||
def unwrap(self) -> tuple[str, int]:
|
||||
return self._host, self._port
|
||||
|
||||
@classmethod
|
||||
def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress:
|
||||
return TCPAddress(namespace, 0)
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> Address:
|
||||
return TCPAddress('127.0.0.1', 1616)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{type(self)} @ {self.unwrap()}'
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, TCPAddress):
|
||||
raise TypeError(
|
||||
f'Can not compare {type(other)} with {type(self)}'
|
||||
)
|
||||
|
||||
return (
|
||||
self._host == other._host
|
||||
and
|
||||
self._port == other._port
|
||||
)
|
||||
|
||||
async def open_stream(self, **kwargs) -> trio.SocketStream:
|
||||
stream = await trio.open_tcp_stream(
|
||||
self._host,
|
||||
self._port,
|
||||
**kwargs
|
||||
)
|
||||
self._host, self._port = stream.socket.getsockname()[:2]
|
||||
return stream
|
||||
|
||||
async def open_listener(self, **kwargs) -> trio.SocketListener:
|
||||
listeners = await trio.open_tcp_listeners(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
**kwargs
|
||||
)
|
||||
assert len(listeners) == 1
|
||||
listener = listeners[0]
|
||||
self._host, self._port = listener.socket.getsockname()[:2]
|
||||
return listener
|
||||
|
||||
async def close_listener(self):
|
||||
...
|
||||
|
||||
|
||||
class UDSAddress(Address[
|
||||
None,
|
||||
str,
|
||||
trio.SocketStream,
|
||||
trio.SocketListener
|
||||
]):
|
||||
|
||||
name_key: str = 'uds'
|
||||
address_type: type = str
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
filepath: str
|
||||
):
|
||||
self._filepath = filepath
|
||||
|
||||
@property
|
||||
def is_valid(self) -> bool:
|
||||
return True
|
||||
|
||||
@property
|
||||
def namespace(self) -> None:
|
||||
return
|
||||
|
||||
@classmethod
|
||||
def from_addr(cls, filepath: str) -> UDSAddress:
|
||||
return UDSAddress(filepath)
|
||||
|
||||
def unwrap(self) -> str:
|
||||
return self._filepath
|
||||
|
||||
@classmethod
|
||||
def get_random(cls, namespace: None = None) -> UDSAddress:
|
||||
return UDSAddress(f'{tempfile.gettempdir()}/{uuid4()}.sock')
|
||||
|
||||
@classmethod
|
||||
def get_root(cls) -> Address:
|
||||
return UDSAddress('tractor.sock')
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{type(self)} @ {self._filepath}'
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, UDSAddress):
|
||||
raise TypeError(
|
||||
f'Can not compare {type(other)} with {type(self)}'
|
||||
)
|
||||
|
||||
return self._filepath == other._filepath
|
||||
|
||||
async def open_stream(self, **kwargs) -> trio.SocketStream:
|
||||
stream = await trio.open_unix_socket(
|
||||
self._filepath,
|
||||
**kwargs
|
||||
)
|
||||
return stream
|
||||
|
||||
async def open_listener(self, **kwargs) -> trio.SocketListener:
|
||||
self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
await self._sock.bind(self._filepath)
|
||||
self._sock.listen(1)
|
||||
return trio.SocketListener(self._sock)
|
||||
|
||||
async def close_listener(self):
|
||||
self._sock.close()
|
||||
os.unlink(self._filepath)
|
||||
|
||||
|
||||
preferred_transport = 'uds'
|
||||
|
||||
|
||||
_address_types = (
|
||||
TCPAddress,
|
||||
UDSAddress
|
||||
)
|
||||
|
||||
|
||||
_default_addrs: dict[str, Type[Address]] = {
|
||||
cls.name_key: cls
|
||||
for cls in _address_types
|
||||
}
|
||||
|
||||
|
||||
AddressTypes = Union[
|
||||
tuple([
|
||||
cls.address_type
|
||||
for cls in _address_types
|
||||
])
|
||||
]
|
||||
|
||||
|
||||
_default_lo_addrs: dict[
|
||||
str,
|
||||
AddressTypes
|
||||
] = {
|
||||
cls.name_key: cls.get_root().unwrap()
|
||||
for cls in _address_types
|
||||
}
|
||||
|
||||
|
||||
def get_address_cls(name: str) -> Type[Address]:
|
||||
return _default_addrs[name]
|
||||
|
||||
|
||||
def is_wrapped_addr(addr: any) -> bool:
|
||||
return type(addr) in _address_types
|
||||
|
||||
|
||||
def wrap_address(addr: AddressTypes) -> Address:
|
||||
|
||||
if is_wrapped_addr(addr):
|
||||
return addr
|
||||
|
||||
cls = None
|
||||
match addr:
|
||||
case str():
|
||||
cls = UDSAddress
|
||||
|
||||
case tuple() | list():
|
||||
cls = TCPAddress
|
||||
|
||||
case None:
|
||||
cls = get_address_cls(preferred_transport)
|
||||
addr = cls.get_root().unwrap()
|
||||
|
||||
case _:
|
||||
raise TypeError(
|
||||
f'Can not wrap addr {addr} of type {type(addr)}'
|
||||
)
|
||||
|
||||
return cls.from_addr(addr)
|
||||
|
||||
|
||||
def default_lo_addrs(transports: list[str]) -> list[AddressTypes]:
|
||||
return [
|
||||
_default_lo_addrs[transport]
|
||||
for transport in transports
|
||||
]
|
|
@ -31,8 +31,12 @@ def parse_uid(arg):
|
|||
return str(name), str(uuid) # ensures str encoding
|
||||
|
||||
def parse_ipaddr(arg):
|
||||
host, port = literal_eval(arg)
|
||||
return (str(host), int(port))
|
||||
try:
|
||||
return literal_eval(arg)
|
||||
|
||||
except (ValueError, SyntaxError):
|
||||
# UDS: try to interpret as a straight up str
|
||||
return arg
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
@ -859,19 +859,10 @@ class Context:
|
|||
@property
|
||||
def dst_maddr(self) -> str:
|
||||
chan: Channel = self.chan
|
||||
dst_addr, dst_port = chan.raddr
|
||||
trans: MsgTransport = chan.transport
|
||||
# cid: str = self.cid
|
||||
# cid_head, cid_tail = cid[:6], cid[-6:]
|
||||
return (
|
||||
f'/ipv4/{dst_addr}'
|
||||
f'/{trans.name_key}/{dst_port}'
|
||||
# f'/{self.chan.uid[0]}'
|
||||
# f'/{self.cid}'
|
||||
|
||||
# f'/cid={cid_head}..{cid_tail}'
|
||||
# TODO: ? not use this ^ right ?
|
||||
)
|
||||
return trans.maddr
|
||||
|
||||
dmaddr = dst_maddr
|
||||
|
||||
|
|
|
@ -30,6 +30,12 @@ from contextlib import asynccontextmanager as acm
|
|||
from tractor.log import get_logger
|
||||
from .trionics import gather_contexts
|
||||
from .ipc import _connect_chan, Channel
|
||||
from ._addr import (
|
||||
AddressTypes,
|
||||
Address,
|
||||
preferred_transport,
|
||||
wrap_address
|
||||
)
|
||||
from ._portal import (
|
||||
Portal,
|
||||
open_portal,
|
||||
|
@ -48,11 +54,7 @@ log = get_logger(__name__)
|
|||
|
||||
|
||||
@acm
|
||||
async def get_registry(
|
||||
host: str,
|
||||
port: int,
|
||||
|
||||
) -> AsyncGenerator[
|
||||
async def get_registry(addr: AddressTypes | None = None) -> AsyncGenerator[
|
||||
Portal | LocalPortal | None,
|
||||
None,
|
||||
]:
|
||||
|
@ -69,13 +71,13 @@ async def get_registry(
|
|||
# (likely a re-entrant call from the arbiter actor)
|
||||
yield LocalPortal(
|
||||
actor,
|
||||
Channel((host, port))
|
||||
await Channel.from_addr(addr)
|
||||
)
|
||||
else:
|
||||
# TODO: try to look pre-existing connection from
|
||||
# `Actor._peers` and use it instead?
|
||||
async with (
|
||||
_connect_chan(host, port) as chan,
|
||||
_connect_chan(addr) as chan,
|
||||
open_portal(chan) as regstr_ptl,
|
||||
):
|
||||
yield regstr_ptl
|
||||
|
@ -89,11 +91,10 @@ async def get_root(
|
|||
|
||||
# TODO: rename mailbox to `_root_maddr` when we finally
|
||||
# add and impl libp2p multi-addrs?
|
||||
host, port = _runtime_vars['_root_mailbox']
|
||||
assert host is not None
|
||||
addr = _runtime_vars['_root_mailbox']
|
||||
|
||||
async with (
|
||||
_connect_chan(host, port) as chan,
|
||||
_connect_chan(addr) as chan,
|
||||
open_portal(chan, **kwargs) as portal,
|
||||
):
|
||||
yield portal
|
||||
|
@ -134,10 +135,10 @@ def get_peer_by_name(
|
|||
@acm
|
||||
async def query_actor(
|
||||
name: str,
|
||||
regaddr: tuple[str, int]|None = None,
|
||||
regaddr: AddressTypes|None = None,
|
||||
|
||||
) -> AsyncGenerator[
|
||||
tuple[str, int]|None,
|
||||
AddressTypes|None,
|
||||
None,
|
||||
]:
|
||||
'''
|
||||
|
@ -163,31 +164,31 @@ async def query_actor(
|
|||
return
|
||||
|
||||
reg_portal: Portal
|
||||
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
|
||||
async with get_registry(*regaddr) as reg_portal:
|
||||
regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0]
|
||||
async with get_registry(regaddr) as reg_portal:
|
||||
# TODO: return portals to all available actors - for now
|
||||
# just the last one that registered
|
||||
sockaddr: tuple[str, int] = await reg_portal.run_from_ns(
|
||||
addr: AddressTypes = await reg_portal.run_from_ns(
|
||||
'self',
|
||||
'find_actor',
|
||||
name=name,
|
||||
)
|
||||
yield sockaddr
|
||||
yield addr
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_portal(
|
||||
addr: tuple[str, int],
|
||||
addr: AddressTypes,
|
||||
name: str,
|
||||
):
|
||||
async with query_actor(
|
||||
name=name,
|
||||
regaddr=addr,
|
||||
) as sockaddr:
|
||||
) as addr:
|
||||
pass
|
||||
|
||||
if sockaddr:
|
||||
async with _connect_chan(*sockaddr) as chan:
|
||||
if addr:
|
||||
async with _connect_chan(addr) as chan:
|
||||
async with open_portal(chan) as portal:
|
||||
yield portal
|
||||
else:
|
||||
|
@ -197,7 +198,8 @@ async def maybe_open_portal(
|
|||
@acm
|
||||
async def find_actor(
|
||||
name: str,
|
||||
registry_addrs: list[tuple[str, int]]|None = None,
|
||||
registry_addrs: list[AddressTypes]|None = None,
|
||||
enable_transports: list[str] = [preferred_transport],
|
||||
|
||||
only_first: bool = True,
|
||||
raise_on_none: bool = False,
|
||||
|
@ -224,15 +226,15 @@ async def find_actor(
|
|||
# XXX NOTE: make sure to dynamically read the value on
|
||||
# every call since something may change it globally (eg.
|
||||
# like in our discovery test suite)!
|
||||
from . import _root
|
||||
from ._addr import default_lo_addrs
|
||||
registry_addrs = (
|
||||
_runtime_vars['_registry_addrs']
|
||||
or
|
||||
_root._default_lo_addrs
|
||||
default_lo_addrs(enable_transports)
|
||||
)
|
||||
|
||||
maybe_portals: list[
|
||||
AsyncContextManager[tuple[str, int]]
|
||||
AsyncContextManager[AddressTypes]
|
||||
] = list(
|
||||
maybe_open_portal(
|
||||
addr=addr,
|
||||
|
@ -274,7 +276,7 @@ async def find_actor(
|
|||
@acm
|
||||
async def wait_for_actor(
|
||||
name: str,
|
||||
registry_addr: tuple[str, int] | None = None,
|
||||
registry_addr: AddressTypes | None = None,
|
||||
|
||||
) -> AsyncGenerator[Portal, None]:
|
||||
'''
|
||||
|
@ -291,7 +293,7 @@ async def wait_for_actor(
|
|||
yield peer_portal
|
||||
return
|
||||
|
||||
regaddr: tuple[str, int] = (
|
||||
regaddr: AddressTypes = (
|
||||
registry_addr
|
||||
or
|
||||
actor.reg_addrs[0]
|
||||
|
@ -299,8 +301,8 @@ async def wait_for_actor(
|
|||
# TODO: use `.trionics.gather_contexts()` like
|
||||
# above in `find_actor()` as well?
|
||||
reg_portal: Portal
|
||||
async with get_registry(*regaddr) as reg_portal:
|
||||
sockaddrs = await reg_portal.run_from_ns(
|
||||
async with get_registry(regaddr) as reg_portal:
|
||||
addrs = await reg_portal.run_from_ns(
|
||||
'self',
|
||||
'wait_for_actor',
|
||||
name=name,
|
||||
|
@ -308,8 +310,8 @@ async def wait_for_actor(
|
|||
|
||||
# get latest registered addr by default?
|
||||
# TODO: offer multi-portal yields in multi-homed case?
|
||||
sockaddr: tuple[str, int] = sockaddrs[-1]
|
||||
addr: AddressTypes = addrs[-1]
|
||||
|
||||
async with _connect_chan(*sockaddr) as chan:
|
||||
async with _connect_chan(addr) as chan:
|
||||
async with open_portal(chan) as portal:
|
||||
yield portal
|
||||
|
|
|
@ -37,6 +37,7 @@ from .log import (
|
|||
from . import _state
|
||||
from .devx import _debug
|
||||
from .to_asyncio import run_as_asyncio_guest
|
||||
from ._addr import AddressTypes
|
||||
from ._runtime import (
|
||||
async_main,
|
||||
Actor,
|
||||
|
@ -52,10 +53,10 @@ log = get_logger(__name__)
|
|||
def _mp_main(
|
||||
|
||||
actor: Actor,
|
||||
accept_addrs: list[tuple[str, int]],
|
||||
accept_addrs: list[AddressTypes],
|
||||
forkserver_info: tuple[Any, Any, Any, Any, Any],
|
||||
start_method: SpawnMethodKey,
|
||||
parent_addr: tuple[str, int] | None = None,
|
||||
parent_addr: AddressTypes | None = None,
|
||||
infect_asyncio: bool = False,
|
||||
|
||||
) -> None:
|
||||
|
@ -206,7 +207,7 @@ def nest_from_op(
|
|||
def _trio_main(
|
||||
actor: Actor,
|
||||
*,
|
||||
parent_addr: tuple[str, int] | None = None,
|
||||
parent_addr: AddressTypes | None = None,
|
||||
infect_asyncio: bool = False,
|
||||
|
||||
) -> None:
|
||||
|
|
|
@ -43,21 +43,18 @@ from .devx import _debug
|
|||
from . import _spawn
|
||||
from . import _state
|
||||
from . import log
|
||||
from .ipc import _connect_chan
|
||||
from .ipc import (
|
||||
_connect_chan,
|
||||
)
|
||||
from ._addr import (
|
||||
AddressTypes,
|
||||
wrap_address,
|
||||
preferred_transport,
|
||||
default_lo_addrs
|
||||
)
|
||||
from ._exceptions import is_multi_cancelled
|
||||
|
||||
|
||||
# set at startup and after forks
|
||||
_default_host: str = '127.0.0.1'
|
||||
_default_port: int = 1616
|
||||
|
||||
# default registry always on localhost
|
||||
_default_lo_addrs: list[tuple[str, int]] = [(
|
||||
_default_host,
|
||||
_default_port,
|
||||
)]
|
||||
|
||||
|
||||
logger = log.get_logger('tractor')
|
||||
|
||||
|
||||
|
@ -66,10 +63,12 @@ async def open_root_actor(
|
|||
|
||||
*,
|
||||
# defaults are above
|
||||
registry_addrs: list[tuple[str, int]]|None = None,
|
||||
registry_addrs: list[AddressTypes]|None = None,
|
||||
|
||||
# defaults are above
|
||||
arbiter_addr: tuple[str, int]|None = None,
|
||||
arbiter_addr: tuple[AddressTypes]|None = None,
|
||||
|
||||
enable_transports: list[str] = [preferred_transport],
|
||||
|
||||
name: str|None = 'root',
|
||||
|
||||
|
@ -195,11 +194,9 @@ async def open_root_actor(
|
|||
)
|
||||
registry_addrs = [arbiter_addr]
|
||||
|
||||
registry_addrs: list[tuple[str, int]] = (
|
||||
registry_addrs
|
||||
or
|
||||
_default_lo_addrs
|
||||
)
|
||||
if not registry_addrs:
|
||||
registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
|
||||
|
||||
assert registry_addrs
|
||||
|
||||
loglevel = (
|
||||
|
@ -248,10 +245,10 @@ async def open_root_actor(
|
|||
enable_stack_on_sig()
|
||||
|
||||
# closed into below ping task-func
|
||||
ponged_addrs: list[tuple[str, int]] = []
|
||||
ponged_addrs: list[AddressTypes] = []
|
||||
|
||||
async def ping_tpt_socket(
|
||||
addr: tuple[str, int],
|
||||
addr: AddressTypes,
|
||||
timeout: float = 1,
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -271,7 +268,7 @@ async def open_root_actor(
|
|||
# be better to eventually have a "discovery" protocol
|
||||
# with basic handshake instead?
|
||||
with trio.move_on_after(timeout):
|
||||
async with _connect_chan(*addr):
|
||||
async with _connect_chan(addr):
|
||||
ponged_addrs.append(addr)
|
||||
|
||||
except OSError:
|
||||
|
@ -284,10 +281,10 @@ async def open_root_actor(
|
|||
for addr in registry_addrs:
|
||||
tn.start_soon(
|
||||
ping_tpt_socket,
|
||||
tuple(addr), # TODO: just drop this requirement?
|
||||
addr,
|
||||
)
|
||||
|
||||
trans_bind_addrs: list[tuple[str, int]] = []
|
||||
trans_bind_addrs: list[AddressTypes] = []
|
||||
|
||||
# Create a new local root-actor instance which IS NOT THE
|
||||
# REGISTRAR
|
||||
|
@ -311,9 +308,12 @@ async def open_root_actor(
|
|||
)
|
||||
# DO NOT use the registry_addrs as the transport server
|
||||
# addrs for this new non-registar, root-actor.
|
||||
for host, port in ponged_addrs:
|
||||
# NOTE: zero triggers dynamic OS port allocation
|
||||
trans_bind_addrs.append((host, 0))
|
||||
for addr in ponged_addrs:
|
||||
waddr = wrap_address(addr)
|
||||
print(waddr)
|
||||
trans_bind_addrs.append(
|
||||
waddr.get_random(namespace=waddr.namespace)
|
||||
)
|
||||
|
||||
# Start this local actor as the "registrar", aka a regular
|
||||
# actor who manages the local registry of "mailboxes" of
|
||||
|
@ -322,7 +322,7 @@ async def open_root_actor(
|
|||
|
||||
# NOTE that if the current actor IS THE REGISTAR, the
|
||||
# following init steps are taken:
|
||||
# - the tranport layer server is bound to each (host, port)
|
||||
# - the tranport layer server is bound to each addr
|
||||
# pair defined in provided registry_addrs, or the default.
|
||||
trans_bind_addrs = registry_addrs
|
||||
|
||||
|
@ -462,7 +462,7 @@ def run_daemon(
|
|||
|
||||
# runtime kwargs
|
||||
name: str | None = 'root',
|
||||
registry_addrs: list[tuple[str, int]] = _default_lo_addrs,
|
||||
registry_addrs: list[AddressTypes]|None = None,
|
||||
|
||||
start_method: str | None = None,
|
||||
debug_mode: bool = False,
|
||||
|
|
|
@ -74,6 +74,13 @@ from tractor.msg import (
|
|||
types as msgtypes,
|
||||
)
|
||||
from .ipc import Channel
|
||||
from ._addr import (
|
||||
AddressTypes,
|
||||
Address,
|
||||
wrap_address,
|
||||
preferred_transport,
|
||||
default_lo_addrs
|
||||
)
|
||||
from ._context import (
|
||||
mk_context,
|
||||
Context,
|
||||
|
@ -179,11 +186,11 @@ class Actor:
|
|||
enable_modules: list[str] = [],
|
||||
uid: str|None = None,
|
||||
loglevel: str|None = None,
|
||||
registry_addrs: list[tuple[str, int]]|None = None,
|
||||
registry_addrs: list[AddressTypes]|None = None,
|
||||
spawn_method: str|None = None,
|
||||
|
||||
# TODO: remove!
|
||||
arbiter_addr: tuple[str, int]|None = None,
|
||||
arbiter_addr: AddressTypes|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -223,7 +230,7 @@ class Actor:
|
|||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
registry_addrs: list[tuple[str, int]] = [arbiter_addr]
|
||||
registry_addrs: list[AddressTypes] = [arbiter_addr]
|
||||
|
||||
# marked by the process spawning backend at startup
|
||||
# will be None for the parent most process started manually
|
||||
|
@ -257,6 +264,7 @@ class Actor:
|
|||
] = {}
|
||||
|
||||
self._listeners: list[trio.abc.Listener] = []
|
||||
self._listen_addrs: list[Address] = []
|
||||
self._parent_chan: Channel|None = None
|
||||
self._forkserver_info: tuple|None = None
|
||||
|
||||
|
@ -269,13 +277,13 @@ class Actor:
|
|||
|
||||
# when provided, init the registry addresses property from
|
||||
# input via the validator.
|
||||
self._reg_addrs: list[tuple[str, int]] = []
|
||||
self._reg_addrs: list[AddressTypes] = []
|
||||
if registry_addrs:
|
||||
self.reg_addrs: list[tuple[str, int]] = registry_addrs
|
||||
self.reg_addrs: list[AddressTypes] = registry_addrs
|
||||
_state._runtime_vars['_registry_addrs'] = registry_addrs
|
||||
|
||||
@property
|
||||
def reg_addrs(self) -> list[tuple[str, int]]:
|
||||
def reg_addrs(self) -> list[AddressTypes]:
|
||||
'''
|
||||
List of (socket) addresses for all known (and contactable)
|
||||
registry actors.
|
||||
|
@ -286,7 +294,7 @@ class Actor:
|
|||
@reg_addrs.setter
|
||||
def reg_addrs(
|
||||
self,
|
||||
addrs: list[tuple[str, int]],
|
||||
addrs: list[AddressTypes],
|
||||
) -> None:
|
||||
if not addrs:
|
||||
log.warning(
|
||||
|
@ -295,16 +303,7 @@ class Actor:
|
|||
)
|
||||
return
|
||||
|
||||
# always sanity check the input list since it's critical
|
||||
# that addrs are correct for discovery sys operation.
|
||||
for addr in addrs:
|
||||
if not isinstance(addr, tuple):
|
||||
raise ValueError(
|
||||
'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n'
|
||||
f'Got {addrs}'
|
||||
)
|
||||
|
||||
self._reg_addrs = addrs
|
||||
self._reg_addrs = addrs
|
||||
|
||||
async def wait_for_peer(
|
||||
self,
|
||||
|
@ -1024,11 +1023,11 @@ class Actor:
|
|||
|
||||
async def _from_parent(
|
||||
self,
|
||||
parent_addr: tuple[str, int]|None,
|
||||
parent_addr: AddressTypes|None,
|
||||
|
||||
) -> tuple[
|
||||
Channel,
|
||||
list[tuple[str, int]]|None,
|
||||
list[AddressTypes]|None,
|
||||
]:
|
||||
'''
|
||||
Bootstrap this local actor's runtime config from its parent by
|
||||
|
@ -1040,16 +1039,13 @@ class Actor:
|
|||
# Connect back to the parent actor and conduct initial
|
||||
# handshake. From this point on if we error, we
|
||||
# attempt to ship the exception back to the parent.
|
||||
chan = Channel(
|
||||
destaddr=parent_addr,
|
||||
)
|
||||
await chan.connect()
|
||||
chan = await Channel.from_addr(wrap_address(parent_addr))
|
||||
|
||||
# TODO: move this into a `Channel.handshake()`?
|
||||
# Initial handshake: swap names.
|
||||
await self._do_handshake(chan)
|
||||
|
||||
accept_addrs: list[tuple[str, int]]|None = None
|
||||
accept_addrs: list[AddressTypes]|None = None
|
||||
|
||||
if self._spawn_method == "trio":
|
||||
|
||||
|
@ -1066,7 +1062,7 @@ class Actor:
|
|||
# if "trace"/"util" mode is enabled?
|
||||
f'{pretty_struct.pformat(spawnspec)}\n'
|
||||
)
|
||||
accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
|
||||
accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
|
||||
|
||||
# TODO: another `Struct` for rtvs..
|
||||
rvs: dict[str, Any] = spawnspec._runtime_vars
|
||||
|
@ -1173,8 +1169,7 @@ class Actor:
|
|||
self,
|
||||
handler_nursery: Nursery,
|
||||
*,
|
||||
# (host, port) to bind for channel server
|
||||
listen_sockaddrs: list[tuple[str, int]]|None = None,
|
||||
listen_addrs: list[AddressTypes]|None = None,
|
||||
|
||||
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
|
@ -1186,41 +1181,45 @@ class Actor:
|
|||
`.cancel_server()` is called.
|
||||
|
||||
'''
|
||||
if listen_sockaddrs is None:
|
||||
listen_sockaddrs = [(None, 0)]
|
||||
if listen_addrs is None:
|
||||
listen_addrs = default_lo_addrs([preferred_transport])
|
||||
|
||||
else:
|
||||
listen_addrs: list[Address] = [
|
||||
wrap_address(a) for a in listen_addrs
|
||||
]
|
||||
|
||||
self._server_down = trio.Event()
|
||||
try:
|
||||
async with trio.open_nursery() as server_n:
|
||||
listeners: list[trio.abc.Listener] = [
|
||||
await addr.open_listener()
|
||||
for addr in listen_addrs
|
||||
]
|
||||
await server_n.start(
|
||||
partial(
|
||||
trio.serve_listeners,
|
||||
handler=self._stream_handler,
|
||||
listeners=listeners,
|
||||
|
||||
for host, port in listen_sockaddrs:
|
||||
listeners: list[trio.abc.Listener] = await server_n.start(
|
||||
partial(
|
||||
trio.serve_tcp,
|
||||
|
||||
handler=self._stream_handler,
|
||||
port=port,
|
||||
host=host,
|
||||
|
||||
# NOTE: configured such that new
|
||||
# connections will stay alive even if
|
||||
# this server is cancelled!
|
||||
handler_nursery=handler_nursery,
|
||||
)
|
||||
# NOTE: configured such that new
|
||||
# connections will stay alive even if
|
||||
# this server is cancelled!
|
||||
handler_nursery=handler_nursery
|
||||
)
|
||||
sockets: list[trio.socket] = [
|
||||
getattr(listener, 'socket', 'unknown socket')
|
||||
for listener in listeners
|
||||
]
|
||||
log.runtime(
|
||||
'Started TCP server(s)\n'
|
||||
f'|_{sockets}\n'
|
||||
)
|
||||
self._listeners.extend(listeners)
|
||||
)
|
||||
log.runtime(
|
||||
'Started server(s)\n'
|
||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||
)
|
||||
self._listen_addrs.extend(listen_addrs)
|
||||
self._listeners.extend(listeners)
|
||||
|
||||
task_status.started(server_n)
|
||||
|
||||
finally:
|
||||
for addr in listen_addrs:
|
||||
await addr.close_listener()
|
||||
# signal the server is down since nursery above terminated
|
||||
self._server_down.set()
|
||||
|
||||
|
@ -1579,26 +1578,21 @@ class Actor:
|
|||
return False
|
||||
|
||||
@property
|
||||
def accept_addrs(self) -> list[tuple[str, int]]:
|
||||
def accept_addrs(self) -> list[AddressTypes]:
|
||||
'''
|
||||
All addresses to which the transport-channel server binds
|
||||
and listens for new connections.
|
||||
|
||||
'''
|
||||
# throws OSError on failure
|
||||
return [
|
||||
listener.socket.getsockname()
|
||||
for listener in self._listeners
|
||||
] # type: ignore
|
||||
return [a.unwrap() for a in self._listen_addrs]
|
||||
|
||||
@property
|
||||
def accept_addr(self) -> tuple[str, int]:
|
||||
def accept_addr(self) -> AddressTypes:
|
||||
'''
|
||||
Primary address to which the IPC transport server is
|
||||
bound and listening for new connections.
|
||||
|
||||
'''
|
||||
# throws OSError on failure
|
||||
return self.accept_addrs[0]
|
||||
|
||||
def get_parent(self) -> Portal:
|
||||
|
@ -1670,7 +1664,7 @@ class Actor:
|
|||
|
||||
async def async_main(
|
||||
actor: Actor,
|
||||
accept_addrs: tuple[str, int]|None = None,
|
||||
accept_addrs: AddressTypes|None = None,
|
||||
|
||||
# XXX: currently ``parent_addr`` is only needed for the
|
||||
# ``multiprocessing`` backend (which pickles state sent to
|
||||
|
@ -1679,7 +1673,7 @@ async def async_main(
|
|||
# change this to a simple ``is_subactor: bool`` which will
|
||||
# be False when running as root actor and True when as
|
||||
# a subactor.
|
||||
parent_addr: tuple[str, int]|None = None,
|
||||
parent_addr: AddressTypes|None = None,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
|
@ -1769,7 +1763,7 @@ async def async_main(
|
|||
partial(
|
||||
actor._serve_forever,
|
||||
service_nursery,
|
||||
listen_sockaddrs=accept_addrs,
|
||||
listen_addrs=accept_addrs,
|
||||
)
|
||||
)
|
||||
except OSError as oserr:
|
||||
|
@ -1785,7 +1779,7 @@ async def async_main(
|
|||
|
||||
raise
|
||||
|
||||
accept_addrs: list[tuple[str, int]] = actor.accept_addrs
|
||||
accept_addrs: list[AddressTypes] = actor.accept_addrs
|
||||
|
||||
# NOTE: only set the loopback addr for the
|
||||
# process-tree-global "root" mailbox since
|
||||
|
@ -1793,9 +1787,8 @@ async def async_main(
|
|||
# their root actor over that channel.
|
||||
if _state._runtime_vars['_is_root']:
|
||||
for addr in accept_addrs:
|
||||
host, _ = addr
|
||||
# TODO: generic 'lo' detector predicate
|
||||
if '127.0.0.1' in host:
|
||||
waddr = wrap_address(addr)
|
||||
if waddr == waddr.get_root():
|
||||
_state._runtime_vars['_root_mailbox'] = addr
|
||||
|
||||
# Register with the arbiter if we're told its addr
|
||||
|
@ -1810,24 +1803,21 @@ async def async_main(
|
|||
# only on unique actor uids?
|
||||
for addr in actor.reg_addrs:
|
||||
try:
|
||||
assert isinstance(addr, tuple)
|
||||
assert addr[1] # non-zero after bind
|
||||
waddr = wrap_address(addr)
|
||||
assert waddr.is_valid
|
||||
except AssertionError:
|
||||
await _debug.pause()
|
||||
|
||||
async with get_registry(*addr) as reg_portal:
|
||||
async with get_registry(addr) as reg_portal:
|
||||
for accept_addr in accept_addrs:
|
||||
|
||||
if not accept_addr[1]:
|
||||
await _debug.pause()
|
||||
|
||||
assert accept_addr[1]
|
||||
accept_addr = wrap_address(accept_addr)
|
||||
assert accept_addr.is_valid
|
||||
|
||||
await reg_portal.run_from_ns(
|
||||
'self',
|
||||
'register_actor',
|
||||
uid=actor.uid,
|
||||
sockaddr=accept_addr,
|
||||
addr=accept_addr.unwrap(),
|
||||
)
|
||||
|
||||
is_registered: bool = True
|
||||
|
@ -1954,12 +1944,13 @@ async def async_main(
|
|||
):
|
||||
failed: bool = False
|
||||
for addr in actor.reg_addrs:
|
||||
assert isinstance(addr, tuple)
|
||||
waddr = wrap_address(addr)
|
||||
assert waddr.is_valid
|
||||
with trio.move_on_after(0.5) as cs:
|
||||
cs.shield = True
|
||||
try:
|
||||
async with get_registry(
|
||||
*addr,
|
||||
addr,
|
||||
) as reg_portal:
|
||||
await reg_portal.run_from_ns(
|
||||
'self',
|
||||
|
@ -2037,7 +2028,7 @@ class Arbiter(Actor):
|
|||
|
||||
self._registry: dict[
|
||||
tuple[str, str],
|
||||
tuple[str, int],
|
||||
AddressTypes,
|
||||
] = {}
|
||||
self._waiters: dict[
|
||||
str,
|
||||
|
@ -2053,18 +2044,18 @@ class Arbiter(Actor):
|
|||
self,
|
||||
name: str,
|
||||
|
||||
) -> tuple[str, int]|None:
|
||||
) -> AddressTypes|None:
|
||||
|
||||
for uid, sockaddr in self._registry.items():
|
||||
for uid, addr in self._registry.items():
|
||||
if name in uid:
|
||||
return sockaddr
|
||||
return addr
|
||||
|
||||
return None
|
||||
|
||||
async def get_registry(
|
||||
self
|
||||
|
||||
) -> dict[str, tuple[str, int]]:
|
||||
) -> dict[str, AddressTypes]:
|
||||
'''
|
||||
Return current name registry.
|
||||
|
||||
|
@ -2084,7 +2075,7 @@ class Arbiter(Actor):
|
|||
self,
|
||||
name: str,
|
||||
|
||||
) -> list[tuple[str, int]]:
|
||||
) -> list[AddressTypes]:
|
||||
'''
|
||||
Wait for a particular actor to register.
|
||||
|
||||
|
@ -2092,44 +2083,41 @@ class Arbiter(Actor):
|
|||
registered.
|
||||
|
||||
'''
|
||||
sockaddrs: list[tuple[str, int]] = []
|
||||
sockaddr: tuple[str, int]
|
||||
addrs: list[AddressTypes] = []
|
||||
addr: AddressTypes
|
||||
|
||||
mailbox_info: str = 'Actor registry contact infos:\n'
|
||||
for uid, sockaddr in self._registry.items():
|
||||
for uid, addr in self._registry.items():
|
||||
mailbox_info += (
|
||||
f'|_uid: {uid}\n'
|
||||
f'|_sockaddr: {sockaddr}\n\n'
|
||||
f'|_addr: {addr}\n\n'
|
||||
)
|
||||
if name == uid[0]:
|
||||
sockaddrs.append(sockaddr)
|
||||
addrs.append(addr)
|
||||
|
||||
if not sockaddrs:
|
||||
if not addrs:
|
||||
waiter = trio.Event()
|
||||
self._waiters.setdefault(name, []).append(waiter)
|
||||
await waiter.wait()
|
||||
|
||||
for uid in self._waiters[name]:
|
||||
if not isinstance(uid, trio.Event):
|
||||
sockaddrs.append(self._registry[uid])
|
||||
addrs.append(self._registry[uid])
|
||||
|
||||
log.runtime(mailbox_info)
|
||||
return sockaddrs
|
||||
return addrs
|
||||
|
||||
async def register_actor(
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
sockaddr: tuple[str, int]
|
||||
|
||||
addr: AddressTypes
|
||||
) -> None:
|
||||
uid = name, hash = (str(uid[0]), str(uid[1]))
|
||||
addr = (host, port) = (
|
||||
str(sockaddr[0]),
|
||||
int(sockaddr[1]),
|
||||
)
|
||||
if port == 0:
|
||||
waddr: Address = wrap_address(addr)
|
||||
if not waddr.is_valid:
|
||||
# should never be 0-dynamic-os-alloc
|
||||
await _debug.pause()
|
||||
assert port # should never be 0-dynamic-os-alloc
|
||||
|
||||
self._registry[uid] = addr
|
||||
|
||||
# pop and signal all waiter events
|
||||
|
|
|
@ -46,6 +46,7 @@ from tractor._state import (
|
|||
_runtime_vars,
|
||||
)
|
||||
from tractor.log import get_logger
|
||||
from tractor._addr import AddressTypes
|
||||
from tractor._portal import Portal
|
||||
from tractor._runtime import Actor
|
||||
from tractor._entry import _mp_main
|
||||
|
@ -392,8 +393,8 @@ async def new_proc(
|
|||
errors: dict[tuple[str, str], Exception],
|
||||
|
||||
# passed through to actor main
|
||||
bind_addrs: list[tuple[str, int]],
|
||||
parent_addr: tuple[str, int],
|
||||
bind_addrs: list[AddressTypes],
|
||||
parent_addr: AddressTypes,
|
||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||
|
||||
*,
|
||||
|
@ -431,8 +432,8 @@ async def trio_proc(
|
|||
errors: dict[tuple[str, str], Exception],
|
||||
|
||||
# passed through to actor main
|
||||
bind_addrs: list[tuple[str, int]],
|
||||
parent_addr: tuple[str, int],
|
||||
bind_addrs: list[AddressTypes],
|
||||
parent_addr: AddressTypes,
|
||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||
*,
|
||||
infect_asyncio: bool = False,
|
||||
|
@ -520,15 +521,15 @@ async def trio_proc(
|
|||
|
||||
# send a "spawning specification" which configures the
|
||||
# initial runtime state of the child.
|
||||
await chan.send(
|
||||
SpawnSpec(
|
||||
_parent_main_data=subactor._parent_main_data,
|
||||
enable_modules=subactor.enable_modules,
|
||||
reg_addrs=subactor.reg_addrs,
|
||||
bind_addrs=bind_addrs,
|
||||
_runtime_vars=_runtime_vars,
|
||||
)
|
||||
sspec = SpawnSpec(
|
||||
_parent_main_data=subactor._parent_main_data,
|
||||
enable_modules=subactor.enable_modules,
|
||||
reg_addrs=subactor.reg_addrs,
|
||||
bind_addrs=bind_addrs,
|
||||
_runtime_vars=_runtime_vars,
|
||||
)
|
||||
log.runtime(f'Sending spawn spec: {str(sspec)}')
|
||||
await chan.send(sspec)
|
||||
|
||||
# track subactor in current nursery
|
||||
curr_actor: Actor = current_actor()
|
||||
|
@ -638,8 +639,8 @@ async def mp_proc(
|
|||
subactor: Actor,
|
||||
errors: dict[tuple[str, str], Exception],
|
||||
# passed through to actor main
|
||||
bind_addrs: list[tuple[str, int]],
|
||||
parent_addr: tuple[str, int],
|
||||
bind_addrs: list[AddressTypes],
|
||||
parent_addr: AddressTypes,
|
||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||
*,
|
||||
infect_asyncio: bool = False,
|
||||
|
|
|
@ -28,7 +28,13 @@ import warnings
|
|||
|
||||
import trio
|
||||
|
||||
|
||||
from .devx._debug import maybe_wait_for_debugger
|
||||
from ._addr import (
|
||||
AddressTypes,
|
||||
preferred_transport,
|
||||
get_address_cls
|
||||
)
|
||||
from ._state import current_actor, is_main_process
|
||||
from .log import get_logger, get_loglevel
|
||||
from ._runtime import Actor
|
||||
|
@ -47,8 +53,6 @@ if TYPE_CHECKING:
|
|||
|
||||
log = get_logger(__name__)
|
||||
|
||||
_default_bind_addr: tuple[str, int] = ('127.0.0.1', 0)
|
||||
|
||||
|
||||
class ActorNursery:
|
||||
'''
|
||||
|
@ -130,8 +134,9 @@ class ActorNursery:
|
|||
|
||||
*,
|
||||
|
||||
bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
|
||||
bind_addrs: list[AddressTypes]|None = None,
|
||||
rpc_module_paths: list[str]|None = None,
|
||||
enable_transports: list[str] = [preferred_transport],
|
||||
enable_modules: list[str]|None = None,
|
||||
loglevel: str|None = None, # set log level per subactor
|
||||
debug_mode: bool|None = None,
|
||||
|
@ -156,6 +161,12 @@ class ActorNursery:
|
|||
or get_loglevel()
|
||||
)
|
||||
|
||||
if not bind_addrs:
|
||||
bind_addrs: list[AddressTypes] = [
|
||||
get_address_cls(transport).get_random().unwrap()
|
||||
for transport in enable_transports
|
||||
]
|
||||
|
||||
# configure and pass runtime state
|
||||
_rtv = _state._runtime_vars.copy()
|
||||
_rtv['_is_root'] = False
|
||||
|
@ -224,7 +235,7 @@ class ActorNursery:
|
|||
*,
|
||||
|
||||
name: str | None = None,
|
||||
bind_addrs: tuple[str, int] = [_default_bind_addr],
|
||||
bind_addrs: AddressTypes|None = None,
|
||||
rpc_module_paths: list[str] | None = None,
|
||||
enable_modules: list[str] | None = None,
|
||||
loglevel: str | None = None, # set log level per subactor
|
||||
|
|
|
@ -13,20 +13,25 @@
|
|||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
|
||||
import platform
|
||||
|
||||
from ._transport import MsgTransport as MsgTransport
|
||||
from ._transport import (
|
||||
MsgTransportKey as MsgTransportKey,
|
||||
MsgType as MsgType,
|
||||
MsgTransport as MsgTransport,
|
||||
MsgpackTransport as MsgpackTransport
|
||||
)
|
||||
|
||||
from ._tcp import (
|
||||
get_stream_addrs as get_stream_addrs,
|
||||
MsgpackTCPStream as MsgpackTCPStream
|
||||
from ._tcp import MsgpackTCPStream as MsgpackTCPStream
|
||||
from ._uds import MsgpackUDSStream as MsgpackUDSStream
|
||||
|
||||
from ._types import (
|
||||
transport_from_addr as transport_from_addr,
|
||||
transport_from_stream as transport_from_stream,
|
||||
)
|
||||
|
||||
from ._chan import (
|
||||
_connect_chan as _connect_chan,
|
||||
get_msg_transport as get_msg_transport,
|
||||
Channel as Channel
|
||||
)
|
||||
|
||||
|
|
|
@ -29,15 +29,19 @@ from pprint import pformat
|
|||
import typing
|
||||
from typing import (
|
||||
Any,
|
||||
Type
|
||||
)
|
||||
|
||||
import trio
|
||||
|
||||
from tractor.ipc._transport import MsgTransport
|
||||
from tractor.ipc._tcp import (
|
||||
MsgpackTCPStream,
|
||||
get_stream_addrs
|
||||
from tractor.ipc._types import (
|
||||
transport_from_addr,
|
||||
transport_from_stream,
|
||||
)
|
||||
from tractor._addr import (
|
||||
wrap_address,
|
||||
Address,
|
||||
AddressTypes
|
||||
)
|
||||
from tractor.log import get_logger
|
||||
from tractor._exceptions import (
|
||||
|
@ -52,17 +56,6 @@ log = get_logger(__name__)
|
|||
_is_windows = platform.system() == 'Windows'
|
||||
|
||||
|
||||
def get_msg_transport(
|
||||
|
||||
key: tuple[str, str],
|
||||
|
||||
) -> Type[MsgTransport]:
|
||||
|
||||
return {
|
||||
('msgpack', 'tcp'): MsgpackTCPStream,
|
||||
}[key]
|
||||
|
||||
|
||||
class Channel:
|
||||
'''
|
||||
An inter-process channel for communication between (remote) actors.
|
||||
|
@ -77,10 +70,7 @@ class Channel:
|
|||
def __init__(
|
||||
|
||||
self,
|
||||
destaddr: tuple[str, int]|None,
|
||||
|
||||
msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'),
|
||||
|
||||
transport: MsgTransport|None = None,
|
||||
# TODO: optional reconnection support?
|
||||
# auto_reconnect: bool = False,
|
||||
# on_reconnect: typing.Callable[..., typing.Awaitable] = None,
|
||||
|
@ -90,13 +80,9 @@ class Channel:
|
|||
# self._recon_seq = on_reconnect
|
||||
# self._autorecon = auto_reconnect
|
||||
|
||||
self._destaddr = destaddr
|
||||
self._transport_key = msg_transport_type_key
|
||||
|
||||
# Either created in ``.connect()`` or passed in by
|
||||
# user in ``.from_stream()``.
|
||||
self._stream: trio.SocketStream|None = None
|
||||
self._transport: MsgTransport|None = None
|
||||
self._transport: MsgTransport|None = transport
|
||||
|
||||
# set after handshake - always uid of far end
|
||||
self.uid: tuple[str, str]|None = None
|
||||
|
@ -110,6 +96,10 @@ class Channel:
|
|||
# runtime.
|
||||
self._cancel_called: bool = False
|
||||
|
||||
@property
|
||||
def stream(self) -> trio.abc.Stream | None:
|
||||
return self._transport.stream if self._transport else None
|
||||
|
||||
@property
|
||||
def msgstream(self) -> MsgTransport:
|
||||
log.info(
|
||||
|
@ -124,52 +114,32 @@ class Channel:
|
|||
@classmethod
|
||||
def from_stream(
|
||||
cls,
|
||||
stream: trio.SocketStream,
|
||||
**kwargs,
|
||||
|
||||
stream: trio.abc.Stream,
|
||||
) -> Channel:
|
||||
|
||||
src, dst = get_stream_addrs(stream)
|
||||
chan = Channel(
|
||||
destaddr=dst,
|
||||
**kwargs,
|
||||
transport_cls = transport_from_stream(stream)
|
||||
return Channel(
|
||||
transport=transport_cls(stream)
|
||||
)
|
||||
|
||||
# set immediately here from provided instance
|
||||
chan._stream: trio.SocketStream = stream
|
||||
chan.set_msg_transport(stream)
|
||||
return chan
|
||||
@classmethod
|
||||
async def from_addr(
|
||||
cls,
|
||||
addr: AddressTypes,
|
||||
**kwargs
|
||||
) -> Channel:
|
||||
addr: Address = wrap_address(addr)
|
||||
transport_cls = transport_from_addr(addr)
|
||||
transport = await transport_cls.connect_to(addr, **kwargs)
|
||||
|
||||
def set_msg_transport(
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
type_key: tuple[str, str]|None = None,
|
||||
|
||||
# XXX optionally provided codec pair for `msgspec`:
|
||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||
codec: MsgCodec|None = None,
|
||||
|
||||
) -> MsgTransport:
|
||||
type_key = (
|
||||
type_key
|
||||
or
|
||||
self._transport_key
|
||||
log.transport(
|
||||
f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}'
|
||||
)
|
||||
# get transport type, then
|
||||
self._transport = get_msg_transport(
|
||||
type_key
|
||||
# instantiate an instance of the msg-transport
|
||||
)(
|
||||
stream,
|
||||
codec=codec,
|
||||
)
|
||||
return self._transport
|
||||
return Channel(transport=transport)
|
||||
|
||||
@cm
|
||||
def apply_codec(
|
||||
self,
|
||||
codec: MsgCodec,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Temporarily override the underlying IPC msg codec for
|
||||
|
@ -189,44 +159,20 @@ class Channel:
|
|||
return '<Channel with inactive transport?>'
|
||||
|
||||
return repr(
|
||||
self._transport.stream.socket._sock
|
||||
self._transport
|
||||
).replace( # type: ignore
|
||||
"socket.socket",
|
||||
"Channel",
|
||||
)
|
||||
|
||||
@property
|
||||
def laddr(self) -> tuple[str, int]|None:
|
||||
def laddr(self) -> Address|None:
|
||||
return self._transport.laddr if self._transport else None
|
||||
|
||||
@property
|
||||
def raddr(self) -> tuple[str, int]|None:
|
||||
def raddr(self) -> Address|None:
|
||||
return self._transport.raddr if self._transport else None
|
||||
|
||||
async def connect(
|
||||
self,
|
||||
destaddr: tuple[Any, ...] | None = None,
|
||||
**kwargs
|
||||
|
||||
) -> MsgTransport:
|
||||
|
||||
if self.connected():
|
||||
raise RuntimeError("channel is already connected?")
|
||||
|
||||
destaddr = destaddr or self._destaddr
|
||||
assert isinstance(destaddr, tuple)
|
||||
|
||||
stream = await trio.open_tcp_stream(
|
||||
*destaddr,
|
||||
**kwargs
|
||||
)
|
||||
transport = self.set_msg_transport(stream)
|
||||
|
||||
log.transport(
|
||||
f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}'
|
||||
)
|
||||
return transport
|
||||
|
||||
# TODO: something like,
|
||||
# `pdbp.hideframe_on(errors=[MsgTypeError])`
|
||||
# instead of the `try/except` hack we have rn..
|
||||
|
@ -261,7 +207,11 @@ class Channel:
|
|||
# assert err
|
||||
__tracebackhide__: bool = False
|
||||
else:
|
||||
assert err.cid
|
||||
try:
|
||||
assert err.cid
|
||||
|
||||
except KeyError:
|
||||
raise err
|
||||
|
||||
raise
|
||||
|
||||
|
@ -388,17 +338,14 @@ class Channel:
|
|||
|
||||
@acm
|
||||
async def _connect_chan(
|
||||
host: str,
|
||||
port: int
|
||||
|
||||
addr: AddressTypes
|
||||
) -> typing.AsyncGenerator[Channel, None]:
|
||||
'''
|
||||
Create and connect a channel with disconnect on context manager
|
||||
teardown.
|
||||
|
||||
'''
|
||||
chan = Channel((host, port))
|
||||
await chan.connect()
|
||||
chan = await Channel.from_addr(addr)
|
||||
yield chan
|
||||
with trio.CancelScope(shield=True):
|
||||
await chan.aclose()
|
||||
|
|
|
@ -183,6 +183,9 @@ class RingBuffSender(trio.abc.SendStream):
|
|||
def wrap_fd(self) -> int:
|
||||
return self._wrap_event.fd
|
||||
|
||||
async def _wait_wrap(self):
|
||||
await self._wrap_event.read()
|
||||
|
||||
async def send_all(self, data: Buffer):
|
||||
async with self._send_lock:
|
||||
# while data is larger than the remaining buf
|
||||
|
@ -193,7 +196,7 @@ class RingBuffSender(trio.abc.SendStream):
|
|||
self._shm.buf[self.ptr:] = data[:remaining]
|
||||
# signal write and wait for reader wrap around
|
||||
self._write_event.write(remaining)
|
||||
await self._wrap_event.read()
|
||||
await self._wait_wrap()
|
||||
|
||||
# wrap around and trim already written bytes
|
||||
self._ptr = 0
|
||||
|
|
|
@ -18,388 +18,88 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
|
|||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from collections.abc import (
|
||||
AsyncGenerator,
|
||||
AsyncIterator,
|
||||
)
|
||||
import struct
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
)
|
||||
|
||||
import msgspec
|
||||
from tricycle import BufferedReceiveStream
|
||||
import trio
|
||||
|
||||
from tractor.msg import MsgCodec
|
||||
from tractor.log import get_logger
|
||||
from tractor._exceptions import (
|
||||
MsgTypeError,
|
||||
TransportClosed,
|
||||
_mk_send_mte,
|
||||
_mk_recv_mte,
|
||||
)
|
||||
from tractor.msg import (
|
||||
_ctxvar_MsgCodec,
|
||||
# _codec, XXX see `self._codec` sanity/debug checks
|
||||
MsgCodec,
|
||||
types as msgtypes,
|
||||
pretty_struct,
|
||||
)
|
||||
from tractor.ipc import MsgTransport
|
||||
from tractor._addr import TCPAddress
|
||||
from tractor.ipc._transport import MsgpackTransport
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def get_stream_addrs(
|
||||
stream: trio.SocketStream
|
||||
) -> tuple[
|
||||
tuple[str, int], # local
|
||||
tuple[str, int], # remote
|
||||
]:
|
||||
'''
|
||||
Return the `trio` streaming transport prot's socket-addrs for
|
||||
both the local and remote sides as a pair.
|
||||
|
||||
'''
|
||||
# rn, should both be IP sockets
|
||||
lsockname = stream.socket.getsockname()
|
||||
rsockname = stream.socket.getpeername()
|
||||
return (
|
||||
tuple(lsockname[:2]),
|
||||
tuple(rsockname[:2]),
|
||||
)
|
||||
|
||||
|
||||
# TODO: typing oddity.. not sure why we have to inherit here, but it
|
||||
# seems to be an issue with `get_msg_transport()` returning
|
||||
# a `Type[Protocol]`; probably should make a `mypy` issue?
|
||||
class MsgpackTCPStream(MsgTransport):
|
||||
class MsgpackTCPStream(MsgpackTransport):
|
||||
'''
|
||||
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||
using the ``msgspec`` codec lib.
|
||||
|
||||
'''
|
||||
address_type = TCPAddress
|
||||
layer_key: int = 4
|
||||
name_key: str = 'tcp'
|
||||
|
||||
# TODO: better naming for this?
|
||||
# -[ ] check how libp2p does naming for such things?
|
||||
codec_key: str = 'msgpack'
|
||||
# def __init__(
|
||||
# self,
|
||||
# stream: trio.SocketStream,
|
||||
# prefix_size: int = 4,
|
||||
# codec: CodecType = None,
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
prefix_size: int = 4,
|
||||
# ) -> None:
|
||||
# super().__init__(
|
||||
# stream,
|
||||
# prefix_size=prefix_size,
|
||||
# codec=codec
|
||||
# )
|
||||
|
||||
# XXX optionally provided codec pair for `msgspec`:
|
||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||
#
|
||||
# TODO: define this as a `Codec` struct which can be
|
||||
# overriden dynamically by the application/runtime?
|
||||
codec: tuple[
|
||||
Callable[[Any], Any]|None, # coder
|
||||
Callable[[type, Any], Any]|None, # decoder
|
||||
]|None = None,
|
||||
@property
|
||||
def maddr(self) -> str:
|
||||
host, port = self.raddr.unwrap()
|
||||
return (
|
||||
f'/ipv4/{host}'
|
||||
f'/{self.address_type.name_key}/{port}'
|
||||
# f'/{self.chan.uid[0]}'
|
||||
# f'/{self.cid}'
|
||||
|
||||
) -> None:
|
||||
|
||||
self.stream = stream
|
||||
assert self.stream.socket
|
||||
|
||||
# should both be IP sockets
|
||||
self._laddr, self._raddr = get_stream_addrs(stream)
|
||||
|
||||
# create read loop instance
|
||||
self._aiter_pkts = self._iter_packets()
|
||||
self._send_lock = trio.StrictFIFOLock()
|
||||
|
||||
# public i guess?
|
||||
self.drained: list[dict] = []
|
||||
|
||||
self.recv_stream = BufferedReceiveStream(
|
||||
transport_stream=stream
|
||||
# f'/cid={cid_head}..{cid_tail}'
|
||||
# TODO: ? not use this ^ right ?
|
||||
)
|
||||
self.prefix_size = prefix_size
|
||||
|
||||
# allow for custom IPC msg interchange format
|
||||
# dynamic override Bo
|
||||
self._task = trio.lowlevel.current_task()
|
||||
|
||||
# XXX for ctxvar debug only!
|
||||
# self._codec: MsgCodec = (
|
||||
# codec
|
||||
# or
|
||||
# _codec._ctxvar_MsgCodec.get()
|
||||
# )
|
||||
|
||||
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
||||
'''
|
||||
Yield `bytes`-blob decoded packets from the underlying TCP
|
||||
stream using the current task's `MsgCodec`.
|
||||
|
||||
This is a streaming routine implemented as an async generator
|
||||
func (which was the original design, but could be changed?)
|
||||
and is allocated by a `.__call__()` inside `.__init__()` where
|
||||
it is assigned to the `._aiter_pkts` attr.
|
||||
|
||||
'''
|
||||
decodes_failed: int = 0
|
||||
|
||||
while True:
|
||||
try:
|
||||
header: bytes = await self.recv_stream.receive_exactly(4)
|
||||
except (
|
||||
ValueError,
|
||||
ConnectionResetError,
|
||||
|
||||
# not sure entirely why we need this but without it we
|
||||
# seem to be getting racy failures here on
|
||||
# arbiter/registry name subs..
|
||||
trio.BrokenResourceError,
|
||||
|
||||
) as trans_err:
|
||||
|
||||
loglevel = 'transport'
|
||||
match trans_err:
|
||||
# case (
|
||||
# ConnectionResetError()
|
||||
# ):
|
||||
# loglevel = 'transport'
|
||||
|
||||
# peer actor (graceful??) TCP EOF but `tricycle`
|
||||
# seems to raise a 0-bytes-read?
|
||||
case ValueError() if (
|
||||
'unclean EOF' in trans_err.args[0]
|
||||
):
|
||||
pass
|
||||
|
||||
# peer actor (task) prolly shutdown quickly due
|
||||
# to cancellation
|
||||
case trio.BrokenResourceError() if (
|
||||
'Connection reset by peer' in trans_err.args[0]
|
||||
):
|
||||
pass
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usualy console warn
|
||||
# about it.
|
||||
case _:
|
||||
loglevel: str = 'warning'
|
||||
|
||||
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already closed by peer\n'
|
||||
f'x)> {type(trans_err)}\n'
|
||||
f' |_{self}\n'
|
||||
),
|
||||
loglevel=loglevel,
|
||||
) from trans_err
|
||||
|
||||
# XXX definitely can happen if transport is closed
|
||||
# manually by another `trio.lowlevel.Task` in the
|
||||
# same actor; we use this in some simulated fault
|
||||
# testing for ex, but generally should never happen
|
||||
# under normal operation!
|
||||
#
|
||||
# NOTE: as such we always re-raise this error from the
|
||||
# RPC msg loop!
|
||||
except trio.ClosedResourceError as closure_err:
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already manually closed locally?\n'
|
||||
f'x)> {type(closure_err)} \n'
|
||||
f' |_{self}\n'
|
||||
),
|
||||
loglevel='error',
|
||||
raise_on_report=(
|
||||
closure_err.args[0] == 'another task closed this fd'
|
||||
or
|
||||
closure_err.args[0] in ['another task closed this fd']
|
||||
),
|
||||
) from closure_err
|
||||
|
||||
# graceful TCP EOF disconnect
|
||||
if header == b'':
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already gracefully closed\n'
|
||||
f')>\n'
|
||||
f'|_{self}\n'
|
||||
),
|
||||
loglevel='transport',
|
||||
# cause=??? # handy or no?
|
||||
)
|
||||
|
||||
size: int
|
||||
size, = struct.unpack("<I", header)
|
||||
|
||||
log.transport(f'received header {size}') # type: ignore
|
||||
msg_bytes: bytes = await self.recv_stream.receive_exactly(size)
|
||||
|
||||
log.transport(f"received {msg_bytes}") # type: ignore
|
||||
try:
|
||||
# NOTE: lookup the `trio.Task.context`'s var for
|
||||
# the current `MsgCodec`.
|
||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
|
||||
# XXX for ctxvar debug only!
|
||||
# if self._codec.pld_spec != codec.pld_spec:
|
||||
# assert (
|
||||
# task := trio.lowlevel.current_task()
|
||||
# ) is not self._task
|
||||
# self._task = task
|
||||
# self._codec = codec
|
||||
# log.runtime(
|
||||
# f'Using new codec in {self}.recv()\n'
|
||||
# f'codec: {self._codec}\n\n'
|
||||
# f'msg_bytes: {msg_bytes}\n'
|
||||
# )
|
||||
yield codec.decode(msg_bytes)
|
||||
|
||||
# XXX NOTE: since the below error derives from
|
||||
# `DecodeError` we need to catch is specially
|
||||
# and always raise such that spec violations
|
||||
# are never allowed to be caught silently!
|
||||
except msgspec.ValidationError as verr:
|
||||
msgtyperr: MsgTypeError = _mk_recv_mte(
|
||||
msg=msg_bytes,
|
||||
codec=codec,
|
||||
src_validation_error=verr,
|
||||
)
|
||||
# XXX deliver up to `Channel.recv()` where
|
||||
# a re-raise and `Error`-pack can inject the far
|
||||
# end actor `.uid`.
|
||||
yield msgtyperr
|
||||
|
||||
except (
|
||||
msgspec.DecodeError,
|
||||
UnicodeDecodeError,
|
||||
):
|
||||
if decodes_failed < 4:
|
||||
# ignore decoding errors for now and assume they have to
|
||||
# do with a channel drop - hope that receiving from the
|
||||
# channel will raise an expected error and bubble up.
|
||||
try:
|
||||
msg_str: str|bytes = msg_bytes.decode()
|
||||
except UnicodeDecodeError:
|
||||
msg_str = msg_bytes
|
||||
|
||||
log.exception(
|
||||
'Failed to decode msg?\n'
|
||||
f'{codec}\n\n'
|
||||
'Rxed bytes from wire:\n\n'
|
||||
f'{msg_str!r}\n'
|
||||
)
|
||||
decodes_failed += 1
|
||||
else:
|
||||
raise
|
||||
|
||||
async def send(
|
||||
self,
|
||||
msg: msgtypes.MsgType,
|
||||
|
||||
strict_types: bool = True,
|
||||
hide_tb: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
||||
|
||||
If `strict_types == True` then a `MsgTypeError` will be raised on any
|
||||
invalid msg type
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# XXX see `trio._sync.AsyncContextManagerMixin` for details
|
||||
# on the `.acquire()`/`.release()` sequencing..
|
||||
async with self._send_lock:
|
||||
|
||||
# NOTE: lookup the `trio.Task.context`'s var for
|
||||
# the current `MsgCodec`.
|
||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
|
||||
# XXX for ctxvar debug only!
|
||||
# if self._codec.pld_spec != codec.pld_spec:
|
||||
# self._codec = codec
|
||||
# log.runtime(
|
||||
# f'Using new codec in {self}.send()\n'
|
||||
# f'codec: {self._codec}\n\n'
|
||||
# f'msg: {msg}\n'
|
||||
# )
|
||||
|
||||
if type(msg) not in msgtypes.__msg_types__:
|
||||
if strict_types:
|
||||
raise _mk_send_mte(
|
||||
msg,
|
||||
codec=codec,
|
||||
)
|
||||
else:
|
||||
log.warning(
|
||||
'Sending non-`Msg`-spec msg?\n\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
try:
|
||||
bytes_data: bytes = codec.encode(msg)
|
||||
except TypeError as _err:
|
||||
typerr = _err
|
||||
msgtyperr: MsgTypeError = _mk_send_mte(
|
||||
msg,
|
||||
codec=codec,
|
||||
message=(
|
||||
f'IPC-msg-spec violation in\n\n'
|
||||
f'{pretty_struct.Struct.pformat(msg)}'
|
||||
),
|
||||
src_type_error=typerr,
|
||||
)
|
||||
raise msgtyperr from typerr
|
||||
|
||||
# supposedly the fastest says,
|
||||
# https://stackoverflow.com/a/54027962
|
||||
size: bytes = struct.pack("<I", len(bytes_data))
|
||||
return await self.stream.send_all(size + bytes_data)
|
||||
|
||||
# ?TODO? does it help ever to dynamically show this
|
||||
# frame?
|
||||
# try:
|
||||
# <the-above_code>
|
||||
# except BaseException as _err:
|
||||
# err = _err
|
||||
# if not isinstance(err, MsgTypeError):
|
||||
# __tracebackhide__: bool = False
|
||||
# raise
|
||||
|
||||
@property
|
||||
def laddr(self) -> tuple[str, int]:
|
||||
return self._laddr
|
||||
|
||||
@property
|
||||
def raddr(self) -> tuple[str, int]:
|
||||
return self._raddr
|
||||
|
||||
async def recv(self) -> Any:
|
||||
return await self._aiter_pkts.asend(None)
|
||||
|
||||
async def drain(self) -> AsyncIterator[dict]:
|
||||
'''
|
||||
Drain the stream's remaining messages sent from
|
||||
the far end until the connection is closed by
|
||||
the peer.
|
||||
|
||||
'''
|
||||
try:
|
||||
async for msg in self._iter_packets():
|
||||
self.drained.append(msg)
|
||||
except TransportClosed:
|
||||
for msg in self.drained:
|
||||
yield msg
|
||||
|
||||
def __aiter__(self):
|
||||
return self._aiter_pkts
|
||||
|
||||
def connected(self) -> bool:
|
||||
return self.stream.socket.fileno() != -1
|
||||
|
||||
@classmethod
|
||||
async def connect_to(
|
||||
cls,
|
||||
destaddr: TCPAddress,
|
||||
prefix_size: int = 4,
|
||||
codec: MsgCodec|None = None,
|
||||
**kwargs
|
||||
) -> MsgpackTCPStream:
|
||||
stream = await trio.open_tcp_stream(
|
||||
*destaddr.unwrap(),
|
||||
**kwargs
|
||||
)
|
||||
return MsgpackTCPStream(
|
||||
stream,
|
||||
prefix_size=prefix_size,
|
||||
codec=codec
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_stream_addrs(
|
||||
cls,
|
||||
stream: trio.SocketStream
|
||||
) -> tuple[
|
||||
tuple[str, int],
|
||||
tuple[str, int]
|
||||
]:
|
||||
lsockname = stream.socket.getsockname()
|
||||
rsockname = stream.socket.getpeername()
|
||||
return (
|
||||
TCPAddress.from_addr(tuple(lsockname[:2])),
|
||||
TCPAddress.from_addr(tuple(rsockname[:2])),
|
||||
)
|
||||
|
|
|
@ -18,13 +18,45 @@ typing.Protocol based generic msg API, implement this class to add backends for
|
|||
tractor.ipc.Channel
|
||||
|
||||
'''
|
||||
import trio
|
||||
from __future__ import annotations
|
||||
from typing import (
|
||||
runtime_checkable,
|
||||
Type,
|
||||
Protocol,
|
||||
TypeVar,
|
||||
ClassVar
|
||||
)
|
||||
from collections.abc import AsyncIterator
|
||||
from collections.abc import (
|
||||
AsyncGenerator,
|
||||
AsyncIterator,
|
||||
)
|
||||
import struct
|
||||
|
||||
import trio
|
||||
import msgspec
|
||||
from tricycle import BufferedReceiveStream
|
||||
|
||||
from tractor.log import get_logger
|
||||
from tractor._exceptions import (
|
||||
MsgTypeError,
|
||||
TransportClosed,
|
||||
_mk_send_mte,
|
||||
_mk_recv_mte,
|
||||
)
|
||||
from tractor.msg import (
|
||||
_ctxvar_MsgCodec,
|
||||
# _codec, XXX see `self._codec` sanity/debug checks
|
||||
MsgCodec,
|
||||
types as msgtypes,
|
||||
pretty_struct,
|
||||
)
|
||||
from tractor._addr import Address
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
# (codec, transport)
|
||||
MsgTransportKey = tuple[str, str]
|
||||
|
||||
|
||||
# from tractor.msg.types import MsgType
|
||||
|
@ -44,8 +76,8 @@ class MsgTransport(Protocol[MsgType]):
|
|||
stream: trio.abc.Stream
|
||||
drained: list[MsgType]
|
||||
|
||||
def __init__(self, stream: trio.abc.Stream) -> None:
|
||||
...
|
||||
address_type: ClassVar[Type[Address]]
|
||||
codec_key: ClassVar[str]
|
||||
|
||||
# XXX: should this instead be called `.sendall()`?
|
||||
async def send(self, msg: MsgType) -> None:
|
||||
|
@ -65,10 +97,354 @@ class MsgTransport(Protocol[MsgType]):
|
|||
def drain(self) -> AsyncIterator[dict]:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def key(cls) -> MsgTransportKey:
|
||||
return cls.codec_key, cls.address_type.name_key
|
||||
|
||||
@property
|
||||
def laddr(self) -> tuple[str, int]:
|
||||
def laddr(self) -> Address:
|
||||
...
|
||||
|
||||
@property
|
||||
def raddr(self) -> tuple[str, int]:
|
||||
def raddr(self) -> Address:
|
||||
...
|
||||
|
||||
@property
|
||||
def maddr(self) -> str:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
async def connect_to(
|
||||
cls,
|
||||
addr: Address,
|
||||
**kwargs
|
||||
) -> MsgTransport:
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def get_stream_addrs(
|
||||
cls,
|
||||
stream: trio.abc.Stream
|
||||
) -> tuple[
|
||||
Address, # local
|
||||
Address # remote
|
||||
]:
|
||||
'''
|
||||
Return the `trio` streaming transport prot's addrs for both
|
||||
the local and remote sides as a pair.
|
||||
|
||||
'''
|
||||
...
|
||||
|
||||
|
||||
class MsgpackTransport(MsgTransport):
|
||||
|
||||
# TODO: better naming for this?
|
||||
# -[ ] check how libp2p does naming for such things?
|
||||
codec_key: str = 'msgpack'
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
stream: trio.abc.Stream,
|
||||
prefix_size: int = 4,
|
||||
|
||||
# XXX optionally provided codec pair for `msgspec`:
|
||||
# https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
|
||||
#
|
||||
# TODO: define this as a `Codec` struct which can be
|
||||
# overriden dynamically by the application/runtime?
|
||||
codec: MsgCodec = None,
|
||||
|
||||
) -> None:
|
||||
self.stream = stream
|
||||
self._laddr, self._raddr = self.get_stream_addrs(stream)
|
||||
|
||||
# create read loop instance
|
||||
self._aiter_pkts = self._iter_packets()
|
||||
self._send_lock = trio.StrictFIFOLock()
|
||||
|
||||
# public i guess?
|
||||
self.drained: list[dict] = []
|
||||
|
||||
self.recv_stream = BufferedReceiveStream(
|
||||
transport_stream=stream
|
||||
)
|
||||
self.prefix_size = prefix_size
|
||||
|
||||
# allow for custom IPC msg interchange format
|
||||
# dynamic override Bo
|
||||
self._task = trio.lowlevel.current_task()
|
||||
|
||||
# XXX for ctxvar debug only!
|
||||
# self._codec: MsgCodec = (
|
||||
# codec
|
||||
# or
|
||||
# _codec._ctxvar_MsgCodec.get()
|
||||
# )
|
||||
|
||||
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
|
||||
'''
|
||||
Yield `bytes`-blob decoded packets from the underlying TCP
|
||||
stream using the current task's `MsgCodec`.
|
||||
|
||||
This is a streaming routine implemented as an async generator
|
||||
func (which was the original design, but could be changed?)
|
||||
and is allocated by a `.__call__()` inside `.__init__()` where
|
||||
it is assigned to the `._aiter_pkts` attr.
|
||||
|
||||
'''
|
||||
decodes_failed: int = 0
|
||||
|
||||
while True:
|
||||
try:
|
||||
header: bytes = await self.recv_stream.receive_exactly(4)
|
||||
except (
|
||||
ValueError,
|
||||
ConnectionResetError,
|
||||
|
||||
# not sure entirely why we need this but without it we
|
||||
# seem to be getting racy failures here on
|
||||
# arbiter/registry name subs..
|
||||
trio.BrokenResourceError,
|
||||
|
||||
) as trans_err:
|
||||
|
||||
loglevel = 'transport'
|
||||
match trans_err:
|
||||
# case (
|
||||
# ConnectionResetError()
|
||||
# ):
|
||||
# loglevel = 'transport'
|
||||
|
||||
# peer actor (graceful??) TCP EOF but `tricycle`
|
||||
# seems to raise a 0-bytes-read?
|
||||
case ValueError() if (
|
||||
'unclean EOF' in trans_err.args[0]
|
||||
):
|
||||
pass
|
||||
|
||||
# peer actor (task) prolly shutdown quickly due
|
||||
# to cancellation
|
||||
case trio.BrokenResourceError() if (
|
||||
'Connection reset by peer' in trans_err.args[0]
|
||||
):
|
||||
pass
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usualy console warn
|
||||
# about it.
|
||||
case _:
|
||||
loglevel: str = 'warning'
|
||||
|
||||
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already closed by peer\n'
|
||||
f'x)> {type(trans_err)}\n'
|
||||
f' |_{self}\n'
|
||||
),
|
||||
loglevel=loglevel,
|
||||
) from trans_err
|
||||
|
||||
# XXX definitely can happen if transport is closed
|
||||
# manually by another `trio.lowlevel.Task` in the
|
||||
# same actor; we use this in some simulated fault
|
||||
# testing for ex, but generally should never happen
|
||||
# under normal operation!
|
||||
#
|
||||
# NOTE: as such we always re-raise this error from the
|
||||
# RPC msg loop!
|
||||
except trio.ClosedResourceError as closure_err:
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already manually closed locally?\n'
|
||||
f'x)> {type(closure_err)} \n'
|
||||
f' |_{self}\n'
|
||||
),
|
||||
loglevel='error',
|
||||
raise_on_report=(
|
||||
closure_err.args[0] == 'another task closed this fd'
|
||||
or
|
||||
closure_err.args[0] in ['another task closed this fd']
|
||||
),
|
||||
) from closure_err
|
||||
|
||||
# graceful TCP EOF disconnect
|
||||
if header == b'':
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'IPC transport already gracefully closed\n'
|
||||
f')>\n'
|
||||
f'|_{self}\n'
|
||||
),
|
||||
loglevel='transport',
|
||||
# cause=??? # handy or no?
|
||||
)
|
||||
|
||||
size: int
|
||||
size, = struct.unpack("<I", header)
|
||||
|
||||
log.transport(f'received header {size}') # type: ignore
|
||||
msg_bytes: bytes = await self.recv_stream.receive_exactly(size)
|
||||
|
||||
log.transport(f"received {msg_bytes}") # type: ignore
|
||||
try:
|
||||
# NOTE: lookup the `trio.Task.context`'s var for
|
||||
# the current `MsgCodec`.
|
||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
|
||||
# XXX for ctxvar debug only!
|
||||
# if self._codec.pld_spec != codec.pld_spec:
|
||||
# assert (
|
||||
# task := trio.lowlevel.current_task()
|
||||
# ) is not self._task
|
||||
# self._task = task
|
||||
# self._codec = codec
|
||||
# log.runtime(
|
||||
# f'Using new codec in {self}.recv()\n'
|
||||
# f'codec: {self._codec}\n\n'
|
||||
# f'msg_bytes: {msg_bytes}\n'
|
||||
# )
|
||||
yield codec.decode(msg_bytes)
|
||||
|
||||
# XXX NOTE: since the below error derives from
|
||||
# `DecodeError` we need to catch is specially
|
||||
# and always raise such that spec violations
|
||||
# are never allowed to be caught silently!
|
||||
except msgspec.ValidationError as verr:
|
||||
msgtyperr: MsgTypeError = _mk_recv_mte(
|
||||
msg=msg_bytes,
|
||||
codec=codec,
|
||||
src_validation_error=verr,
|
||||
)
|
||||
# XXX deliver up to `Channel.recv()` where
|
||||
# a re-raise and `Error`-pack can inject the far
|
||||
# end actor `.uid`.
|
||||
yield msgtyperr
|
||||
|
||||
except (
|
||||
msgspec.DecodeError,
|
||||
UnicodeDecodeError,
|
||||
):
|
||||
if decodes_failed < 4:
|
||||
# ignore decoding errors for now and assume they have to
|
||||
# do with a channel drop - hope that receiving from the
|
||||
# channel will raise an expected error and bubble up.
|
||||
try:
|
||||
msg_str: str|bytes = msg_bytes.decode()
|
||||
except UnicodeDecodeError:
|
||||
msg_str = msg_bytes
|
||||
|
||||
log.exception(
|
||||
'Failed to decode msg?\n'
|
||||
f'{codec}\n\n'
|
||||
'Rxed bytes from wire:\n\n'
|
||||
f'{msg_str!r}\n'
|
||||
)
|
||||
decodes_failed += 1
|
||||
else:
|
||||
raise
|
||||
|
||||
async def send(
|
||||
self,
|
||||
msg: msgtypes.MsgType,
|
||||
|
||||
strict_types: bool = True,
|
||||
hide_tb: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Send a msgpack encoded py-object-blob-as-msg over TCP.
|
||||
|
||||
If `strict_types == True` then a `MsgTypeError` will be raised on any
|
||||
invalid msg type
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# XXX see `trio._sync.AsyncContextManagerMixin` for details
|
||||
# on the `.acquire()`/`.release()` sequencing..
|
||||
async with self._send_lock:
|
||||
|
||||
# NOTE: lookup the `trio.Task.context`'s var for
|
||||
# the current `MsgCodec`.
|
||||
codec: MsgCodec = _ctxvar_MsgCodec.get()
|
||||
|
||||
# XXX for ctxvar debug only!
|
||||
# if self._codec.pld_spec != codec.pld_spec:
|
||||
# self._codec = codec
|
||||
# log.runtime(
|
||||
# f'Using new codec in {self}.send()\n'
|
||||
# f'codec: {self._codec}\n\n'
|
||||
# f'msg: {msg}\n'
|
||||
# )
|
||||
|
||||
if type(msg) not in msgtypes.__msg_types__:
|
||||
if strict_types:
|
||||
raise _mk_send_mte(
|
||||
msg,
|
||||
codec=codec,
|
||||
)
|
||||
else:
|
||||
log.warning(
|
||||
'Sending non-`Msg`-spec msg?\n\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
try:
|
||||
bytes_data: bytes = codec.encode(msg)
|
||||
except TypeError as _err:
|
||||
typerr = _err
|
||||
msgtyperr: MsgTypeError = _mk_send_mte(
|
||||
msg,
|
||||
codec=codec,
|
||||
message=(
|
||||
f'IPC-msg-spec violation in\n\n'
|
||||
f'{pretty_struct.Struct.pformat(msg)}'
|
||||
),
|
||||
src_type_error=typerr,
|
||||
)
|
||||
raise msgtyperr from typerr
|
||||
|
||||
# supposedly the fastest says,
|
||||
# https://stackoverflow.com/a/54027962
|
||||
size: bytes = struct.pack("<I", len(bytes_data))
|
||||
return await self.stream.send_all(size + bytes_data)
|
||||
|
||||
# ?TODO? does it help ever to dynamically show this
|
||||
# frame?
|
||||
# try:
|
||||
# <the-above_code>
|
||||
# except BaseException as _err:
|
||||
# err = _err
|
||||
# if not isinstance(err, MsgTypeError):
|
||||
# __tracebackhide__: bool = False
|
||||
# raise
|
||||
|
||||
async def recv(self) -> msgtypes.MsgType:
|
||||
return await self._aiter_pkts.asend(None)
|
||||
|
||||
async def drain(self) -> AsyncIterator[dict]:
|
||||
'''
|
||||
Drain the stream's remaining messages sent from
|
||||
the far end until the connection is closed by
|
||||
the peer.
|
||||
|
||||
'''
|
||||
try:
|
||||
async for msg in self._iter_packets():
|
||||
self.drained.append(msg)
|
||||
except TransportClosed:
|
||||
for msg in self.drained:
|
||||
yield msg
|
||||
|
||||
def __aiter__(self):
|
||||
return self._aiter_pkts
|
||||
|
||||
@property
|
||||
def laddr(self) -> Address:
|
||||
return self._laddr
|
||||
|
||||
@property
|
||||
def raddr(self) -> Address:
|
||||
return self._raddr
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
from typing import Type
|
||||
|
||||
import trio
|
||||
import socket
|
||||
|
||||
from tractor._addr import Address
|
||||
from tractor.ipc._transport import (
|
||||
MsgTransportKey,
|
||||
MsgTransport
|
||||
)
|
||||
from tractor.ipc._tcp import MsgpackTCPStream
|
||||
from tractor.ipc._uds import MsgpackUDSStream
|
||||
|
||||
|
||||
# manually updated list of all supported msg transport types
|
||||
_msg_transports = [
|
||||
MsgpackTCPStream,
|
||||
MsgpackUDSStream
|
||||
]
|
||||
|
||||
|
||||
# convert a MsgTransportKey to the corresponding transport type
|
||||
_key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = {
|
||||
cls.key(): cls
|
||||
for cls in _msg_transports
|
||||
}
|
||||
|
||||
# convert an Address wrapper to its corresponding transport type
|
||||
_addr_to_transport: dict[Type[Address], Type[MsgTransport]] = {
|
||||
cls.address_type: cls
|
||||
for cls in _msg_transports
|
||||
}
|
||||
|
||||
|
||||
def transport_from_addr(
|
||||
addr: Address,
|
||||
codec_key: str = 'msgpack',
|
||||
) -> Type[MsgTransport]:
|
||||
'''
|
||||
Given a destination address and a desired codec, find the
|
||||
corresponding `MsgTransport` type.
|
||||
|
||||
'''
|
||||
try:
|
||||
return _addr_to_transport[type(addr)]
|
||||
|
||||
except KeyError:
|
||||
raise NotImplementedError(
|
||||
f'No known transport for address {repr(addr)}'
|
||||
)
|
||||
|
||||
|
||||
def transport_from_stream(
|
||||
stream: trio.abc.Stream,
|
||||
codec_key: str = 'msgpack'
|
||||
) -> Type[MsgTransport]:
|
||||
'''
|
||||
Given an arbitrary `trio.abc.Stream` and a desired codec,
|
||||
find the corresponding `MsgTransport` type.
|
||||
|
||||
'''
|
||||
transport = None
|
||||
if isinstance(stream, trio.SocketStream):
|
||||
sock = stream.socket
|
||||
match sock.family:
|
||||
case socket.AF_INET | socket.AF_INET6:
|
||||
transport = 'tcp'
|
||||
|
||||
case socket.AF_UNIX:
|
||||
transport = 'uds'
|
||||
|
||||
case _:
|
||||
raise NotImplementedError(
|
||||
f'Unsupported socket family: {sock.family}'
|
||||
)
|
||||
|
||||
if not transport:
|
||||
raise NotImplementedError(
|
||||
f'Could not figure out transport type for stream type {type(stream)}'
|
||||
)
|
||||
|
||||
key = (codec_key, transport)
|
||||
|
||||
return _key_to_transport[key]
|
|
@ -0,0 +1,97 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protocol
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
||||
import trio
|
||||
|
||||
from tractor.msg import MsgCodec
|
||||
from tractor.log import get_logger
|
||||
from tractor._addr import UDSAddress
|
||||
from tractor.ipc._transport import MsgpackTransport
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
class MsgpackUDSStream(MsgpackTransport):
|
||||
'''
|
||||
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||
using the ``msgspec`` codec lib.
|
||||
|
||||
'''
|
||||
address_type = UDSAddress
|
||||
layer_key: int = 7
|
||||
|
||||
# def __init__(
|
||||
# self,
|
||||
# stream: trio.SocketStream,
|
||||
# prefix_size: int = 4,
|
||||
# codec: CodecType = None,
|
||||
|
||||
# ) -> None:
|
||||
# super().__init__(
|
||||
# stream,
|
||||
# prefix_size=prefix_size,
|
||||
# codec=codec
|
||||
# )
|
||||
|
||||
@property
|
||||
def maddr(self) -> str:
|
||||
filepath = self.raddr.unwrap()
|
||||
return (
|
||||
f'/ipv4/localhost'
|
||||
f'/{self.address_type.name_key}/{filepath}'
|
||||
# f'/{self.chan.uid[0]}'
|
||||
# f'/{self.cid}'
|
||||
|
||||
# f'/cid={cid_head}..{cid_tail}'
|
||||
# TODO: ? not use this ^ right ?
|
||||
)
|
||||
|
||||
def connected(self) -> bool:
|
||||
return self.stream.socket.fileno() != -1
|
||||
|
||||
@classmethod
|
||||
async def connect_to(
|
||||
cls,
|
||||
addr: UDSAddress,
|
||||
prefix_size: int = 4,
|
||||
codec: MsgCodec|None = None,
|
||||
**kwargs
|
||||
) -> MsgpackUDSStream:
|
||||
stream = await trio.open_unix_socket(
|
||||
addr.unwrap(),
|
||||
**kwargs
|
||||
)
|
||||
return MsgpackUDSStream(
|
||||
stream,
|
||||
prefix_size=prefix_size,
|
||||
codec=codec
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def get_stream_addrs(
|
||||
cls,
|
||||
stream: trio.SocketStream
|
||||
) -> tuple[UDSAddress, UDSAddress]:
|
||||
return (
|
||||
UDSAddress.from_addr(stream.socket.getsockname()),
|
||||
UDSAddress.from_addr(stream.socket.getsockname()),
|
||||
)
|
|
@ -47,6 +47,7 @@ from tractor.msg import (
|
|||
pretty_struct,
|
||||
)
|
||||
from tractor.log import get_logger
|
||||
from tractor._addr import AddressTypes
|
||||
|
||||
|
||||
log = get_logger('tractor.msgspec')
|
||||
|
@ -167,8 +168,8 @@ class SpawnSpec(
|
|||
|
||||
# TODO: not just sockaddr pairs?
|
||||
# -[ ] abstract into a `TransportAddr` type?
|
||||
reg_addrs: list[tuple[str, int]]
|
||||
bind_addrs: list[tuple[str, int]]
|
||||
reg_addrs: list[AddressTypes]
|
||||
bind_addrs: list[AddressTypes]
|
||||
|
||||
|
||||
# TODO: caps based RPC support in the payload?
|
||||
|
|
Loading…
Reference in New Issue