Compare commits
7 Commits
dc68ea4118
...
a528d45a30
Author | SHA1 | Date |
---|---|---|
|
a528d45a30 | |
|
c2705cce68 | |
|
eeed5fd7f1 | |
|
dd3e918cfe | |
|
35acc5a3d5 | |
|
9f837161ea | |
|
3091316b0a |
150
tractor/_addr.py
150
tractor/_addr.py
|
@ -217,7 +217,14 @@ class TCPAddress(Address):
|
||||||
cls,
|
cls,
|
||||||
addr: tuple[str, int]
|
addr: tuple[str, int]
|
||||||
) -> TCPAddress:
|
) -> TCPAddress:
|
||||||
return TCPAddress(addr[0], addr[1])
|
match addr:
|
||||||
|
case (str(), int()):
|
||||||
|
return TCPAddress(addr[0], addr[1])
|
||||||
|
case _:
|
||||||
|
raise ValueError(
|
||||||
|
f'Invalid unwrapped address for {cls}\n'
|
||||||
|
f'{addr}\n'
|
||||||
|
)
|
||||||
|
|
||||||
def unwrap(self) -> tuple[str, int]:
|
def unwrap(self) -> tuple[str, int]:
|
||||||
return (
|
return (
|
||||||
|
@ -228,7 +235,6 @@ class TCPAddress(Address):
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_random(
|
def get_random(
|
||||||
cls,
|
cls,
|
||||||
current_actor: Actor,
|
|
||||||
bindspace: str = def_bindspace,
|
bindspace: str = def_bindspace,
|
||||||
) -> TCPAddress:
|
) -> TCPAddress:
|
||||||
return TCPAddress(bindspace, 0)
|
return TCPAddress(bindspace, 0)
|
||||||
|
@ -275,6 +281,15 @@ class TCPAddress(Address):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
|
def unwrap_sockpath(
|
||||||
|
sockpath: Path,
|
||||||
|
) -> tuple[Path, Path]:
|
||||||
|
return (
|
||||||
|
sockpath.parent,
|
||||||
|
sockpath.name,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class UDSAddress(Address):
|
class UDSAddress(Address):
|
||||||
# TODO, maybe we should use better field and value
|
# TODO, maybe we should use better field and value
|
||||||
# -[x] really this is a `.protocol_key` not a "name" of anything.
|
# -[x] really this is a `.protocol_key` not a "name" of anything.
|
||||||
|
@ -287,23 +302,36 @@ class UDSAddress(Address):
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
filepath: str|Path,
|
filedir: Path|str|None,
|
||||||
maybe_pid: int,
|
# TODO, i think i want `.filename` here?
|
||||||
# ^XXX, in the sense you can also pass
|
filename: str|Path,
|
||||||
|
|
||||||
|
# XXX, in the sense you can also pass
|
||||||
# a "non-real-world-process-id" such as is handy to represent
|
# a "non-real-world-process-id" such as is handy to represent
|
||||||
# our host-local default "port-like" key for the very first
|
# our host-local default "port-like" key for the very first
|
||||||
# root actor to create a registry address.
|
# root actor to create a registry address.
|
||||||
|
maybe_pid: int|None = None,
|
||||||
):
|
):
|
||||||
self._filepath: Path = Path(filepath).absolute()
|
fdir = self._filedir = Path(filedir or self.def_bindspace).absolute()
|
||||||
|
fpath = self._filepath = Path(filename)
|
||||||
|
fp: Path = fdir / fpath
|
||||||
|
assert fp.is_absolute()
|
||||||
|
|
||||||
|
# to track which "side" is the peer process by reading socket
|
||||||
|
# credentials-info.
|
||||||
self._pid: int = maybe_pid
|
self._pid: int = maybe_pid
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sockpath(self) -> Path:
|
||||||
|
return self._filedir / self._filepath
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_valid(self) -> bool:
|
def is_valid(self) -> bool:
|
||||||
'''
|
'''
|
||||||
We block socket files not allocated under the runtime subdir.
|
We block socket files not allocated under the runtime subdir.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.bindspace in self._filepath.parents
|
return self.bindspace in self.sockpath.parents
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def bindspace(self) -> Path:
|
def bindspace(self) -> Path:
|
||||||
|
@ -312,23 +340,43 @@ class UDSAddress(Address):
|
||||||
just the sub-directory in which we allocate socket files.
|
just the sub-directory in which we allocate socket files.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self.def_bindspace
|
return self._filedir or self.def_bindspace
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_addr(
|
def from_addr(
|
||||||
cls,
|
cls,
|
||||||
addr: tuple[Path, int]
|
addr: (
|
||||||
|
tuple[Path|str|None, int]
|
||||||
|
|Path|str
|
||||||
|
),
|
||||||
) -> UDSAddress:
|
) -> UDSAddress:
|
||||||
return UDSAddress(
|
match addr:
|
||||||
filepath=addr[0],
|
case tuple()|list():
|
||||||
maybe_pid=addr[1],
|
sockpath: Path = Path(addr[0])
|
||||||
)
|
filedir, filename = unwrap_sockpath(sockpath)
|
||||||
|
pid: int = addr[1]
|
||||||
|
return UDSAddress(
|
||||||
|
filedir=filedir,
|
||||||
|
filename=filename,
|
||||||
|
maybe_pid=pid,
|
||||||
|
)
|
||||||
|
# NOTE, in case we ever decide to just `.unwrap()`
|
||||||
|
# to a `Path|str`?
|
||||||
|
case str()|Path():
|
||||||
|
sockpath: Path = Path(addr)
|
||||||
|
return UDSAddress(*unwrap_sockpath(sockpath))
|
||||||
|
case _:
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
|
raise TypeError(
|
||||||
|
f'Bad unwrapped-address for {cls} !\n'
|
||||||
|
f'{addr!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
def unwrap(self) -> tuple[Path, int]:
|
def unwrap(self) -> tuple[str, int]:
|
||||||
|
# XXX NOTE, since this gets passed DIRECTLY to
|
||||||
|
# `.ipc._uds.open_unix_socket_w_passcred()`
|
||||||
return (
|
return (
|
||||||
str(self._filepath),
|
str(self.sockpath),
|
||||||
# XXX NOTE, since this gets passed DIRECTLY to
|
|
||||||
# `open_unix_socket_w_passcred()` above!
|
|
||||||
self._pid,
|
self._pid,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -338,7 +386,7 @@ class UDSAddress(Address):
|
||||||
bindspace: Path|None = None, # default netns
|
bindspace: Path|None = None, # default netns
|
||||||
) -> UDSAddress:
|
) -> UDSAddress:
|
||||||
|
|
||||||
bs: Path = bindspace or get_rt_dir()
|
filedir: Path = bindspace or cls.def_bindspace
|
||||||
pid: int = os.getpid()
|
pid: int = os.getpid()
|
||||||
actor: Actor|None = current_actor(
|
actor: Actor|None = current_actor(
|
||||||
err_on_no_runtime=False,
|
err_on_no_runtime=False,
|
||||||
|
@ -351,30 +399,27 @@ class UDSAddress(Address):
|
||||||
prefix: str = 'root'
|
prefix: str = 'root'
|
||||||
sockname: str = f'{prefix}@{pid}'
|
sockname: str = f'{prefix}@{pid}'
|
||||||
|
|
||||||
sockpath: Path = Path(f'{bs}/{sockname}.sock')
|
sockpath: Path = Path(f'{sockname}.sock')
|
||||||
return UDSAddress(
|
return UDSAddress(
|
||||||
# filename=f'{tempfile.gettempdir()}/{uuid4()}.sock'
|
filedir=filedir,
|
||||||
filepath=sockpath,
|
filename=sockpath,
|
||||||
maybe_pid=pid,
|
maybe_pid=pid,
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_root(cls) -> Address:
|
def get_root(cls) -> Address:
|
||||||
def_uds_filepath: Path = (
|
def_uds_filepath: Path = 'registry@1616.sock'
|
||||||
get_rt_dir()
|
|
||||||
/
|
|
||||||
'registry@1616.sock'
|
|
||||||
)
|
|
||||||
return UDSAddress(
|
return UDSAddress(
|
||||||
filepath=def_uds_filepath,
|
filedir=None,
|
||||||
maybe_pid=1616
|
filename=def_uds_filepath,
|
||||||
|
maybe_pid=1616,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return (
|
return (
|
||||||
f'{type(self).__name__}'
|
f'{type(self).__name__}'
|
||||||
f'['
|
f'['
|
||||||
f'({self._filepath}, {self._pid})'
|
f'({self.sockpath}, {self._pid})'
|
||||||
f']'
|
f']'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -391,7 +436,7 @@ class UDSAddress(Address):
|
||||||
self,
|
self,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> SocketListener:
|
) -> SocketListener:
|
||||||
self._sock = socket.socket(
|
sock = self._sock = socket.socket(
|
||||||
socket.AF_UNIX,
|
socket.AF_UNIX,
|
||||||
socket.SOCK_STREAM
|
socket.SOCK_STREAM
|
||||||
)
|
)
|
||||||
|
@ -400,8 +445,10 @@ class UDSAddress(Address):
|
||||||
f'>[\n'
|
f'>[\n'
|
||||||
f'|_{self}\n'
|
f'|_{self}\n'
|
||||||
)
|
)
|
||||||
await self._sock.bind(self._filepath)
|
|
||||||
self._sock.listen(1)
|
bindpath: Path = self.sockpath
|
||||||
|
await sock.bind(str(bindpath))
|
||||||
|
sock.listen(1)
|
||||||
log.info(
|
log.info(
|
||||||
f'Listening on UDS socket\n'
|
f'Listening on UDS socket\n'
|
||||||
f'[>\n'
|
f'[>\n'
|
||||||
|
@ -411,7 +458,7 @@ class UDSAddress(Address):
|
||||||
|
|
||||||
def close_listener(self):
|
def close_listener(self):
|
||||||
self._sock.close()
|
self._sock.close()
|
||||||
os.unlink(self._filepath)
|
os.unlink(self.sockpath)
|
||||||
|
|
||||||
|
|
||||||
preferred_transport: str = 'uds'
|
preferred_transport: str = 'uds'
|
||||||
|
@ -455,26 +502,55 @@ def mk_uuid() -> str:
|
||||||
def wrap_address(
|
def wrap_address(
|
||||||
addr: UnwrappedAddress
|
addr: UnwrappedAddress
|
||||||
) -> Address:
|
) -> Address:
|
||||||
|
'''
|
||||||
|
Wrap an `UnwrappedAddress` as an `Address`-type based
|
||||||
|
on matching builtin python data-structures which we adhoc
|
||||||
|
use for each.
|
||||||
|
|
||||||
|
XXX NOTE, careful care must be placed to ensure
|
||||||
|
`UnwrappedAddress` cases are **definitely unique** otherwise the
|
||||||
|
wrong transport backend may be loaded and will break many
|
||||||
|
low-level things in our runtime in a not-fun-to-debug way!
|
||||||
|
|
||||||
|
XD
|
||||||
|
|
||||||
|
'''
|
||||||
if is_wrapped_addr(addr):
|
if is_wrapped_addr(addr):
|
||||||
return addr
|
return addr
|
||||||
|
|
||||||
cls: Type|None = None
|
cls: Type|None = None
|
||||||
|
# if 'sock' in addr[0]:
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
match addr:
|
match addr:
|
||||||
case (
|
# TODO! BUT THIS WILL MATCH FOR TCP !...
|
||||||
|
# -[ ] so prolly go back to what guille had orig XD
|
||||||
|
# a plain ol' `str`?
|
||||||
|
case ((
|
||||||
str()|Path(),
|
str()|Path(),
|
||||||
int(),
|
int(),
|
||||||
):
|
)):
|
||||||
cls = UDSAddress
|
cls = UDSAddress
|
||||||
|
|
||||||
case tuple() | list():
|
# classic network socket-address as tuple/list
|
||||||
|
case (
|
||||||
|
(str(), int())
|
||||||
|
|
|
||||||
|
[str(), int()]
|
||||||
|
):
|
||||||
cls = TCPAddress
|
cls = TCPAddress
|
||||||
|
|
||||||
case None:
|
# likely an unset UDS or TCP reg address as defaulted in
|
||||||
|
# `_state._runtime_vars['_root_mailbox']`
|
||||||
|
case (
|
||||||
|
None
|
||||||
|
|
|
||||||
|
[None, None]
|
||||||
|
):
|
||||||
cls: Type[Address] = get_address_cls(preferred_transport)
|
cls: Type[Address] = get_address_cls(preferred_transport)
|
||||||
addr: UnwrappedAddress = cls.get_root().unwrap()
|
addr: UnwrappedAddress = cls.get_root().unwrap()
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
|
# import pdbp; pdbp.set_trace()
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
f'Can not wrap address {type(addr)}\n'
|
f'Can not wrap address {type(addr)}\n'
|
||||||
f'{addr!r}\n'
|
f'{addr!r}\n'
|
||||||
|
|
|
@ -366,7 +366,7 @@ class Context:
|
||||||
# f' ---\n'
|
# f' ---\n'
|
||||||
f' |_ipc: {self.dst_maddr}\n'
|
f' |_ipc: {self.dst_maddr}\n'
|
||||||
# f' dst_maddr{ds}{self.dst_maddr}\n'
|
# f' dst_maddr{ds}{self.dst_maddr}\n'
|
||||||
f" uid{ds}'{self.chan.uid}'\n"
|
f" uid{ds}'{self.chan.aid}'\n"
|
||||||
f" cid{ds}'{self.cid}'\n"
|
f" cid{ds}'{self.cid}'\n"
|
||||||
# f' ---\n'
|
# f' ---\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
|
@ -945,10 +945,10 @@ class Context:
|
||||||
reminfo: str = (
|
reminfo: str = (
|
||||||
# ' =>\n'
|
# ' =>\n'
|
||||||
# f'Context.cancel() => {self.chan.uid}\n'
|
# f'Context.cancel() => {self.chan.uid}\n'
|
||||||
|
f'\n'
|
||||||
f'c)=> {self.chan.uid}\n'
|
f'c)=> {self.chan.uid}\n'
|
||||||
# f'{self.chan.uid}\n'
|
f' |_[{self.dst_maddr}\n'
|
||||||
f' |_ @{self.dst_maddr}\n'
|
f' >>{self.repr_rpc}\n'
|
||||||
f' >> {self.repr_rpc}\n'
|
|
||||||
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
|
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
|
||||||
# TODO: pull msg-type from spec re #320
|
# TODO: pull msg-type from spec re #320
|
||||||
)
|
)
|
||||||
|
|
|
@ -126,6 +126,12 @@ class TrioTaskExited(Exception):
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class DebugRequestError(RuntimeError):
|
||||||
|
'''
|
||||||
|
Failed to request stdio lock from root actor!
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
# NOTE: more or less should be close to these:
|
# NOTE: more or less should be close to these:
|
||||||
# 'boxed_type',
|
# 'boxed_type',
|
||||||
# 'src_type',
|
# 'src_type',
|
||||||
|
@ -191,6 +197,8 @@ def get_err_type(type_name: str) -> BaseException|None:
|
||||||
):
|
):
|
||||||
return type_ref
|
return type_ref
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def pack_from_raise(
|
def pack_from_raise(
|
||||||
local_err: (
|
local_err: (
|
||||||
|
@ -1009,7 +1017,10 @@ class TransportClosed(trio.BrokenResourceError):
|
||||||
f' {cause}\n' # exc repr
|
f' {cause}\n' # exc repr
|
||||||
)
|
)
|
||||||
|
|
||||||
getattr(log, self._loglevel)(message)
|
getattr(
|
||||||
|
log,
|
||||||
|
self._loglevel
|
||||||
|
)(message)
|
||||||
|
|
||||||
# some errors we want to blow up from
|
# some errors we want to blow up from
|
||||||
# inside the RPC msg loop
|
# inside the RPC msg loop
|
||||||
|
|
|
@ -175,7 +175,7 @@ class Portal:
|
||||||
# not expecting a "main" result
|
# not expecting a "main" result
|
||||||
if self._expect_result_ctx is None:
|
if self._expect_result_ctx is None:
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Portal for {self.channel.uid} not expecting a final"
|
f"Portal for {self.channel.aid} not expecting a final"
|
||||||
" result?\nresult() should only be called if subactor"
|
" result?\nresult() should only be called if subactor"
|
||||||
" was spawned with `ActorNursery.run_in_actor()`")
|
" was spawned with `ActorNursery.run_in_actor()`")
|
||||||
return NoResult
|
return NoResult
|
||||||
|
@ -222,7 +222,7 @@ class Portal:
|
||||||
# IPC calls
|
# IPC calls
|
||||||
if self._streams:
|
if self._streams:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Cancelling all streams with {self.channel.uid}")
|
f"Cancelling all streams with {self.channel.aid}")
|
||||||
for stream in self._streams.copy():
|
for stream in self._streams.copy():
|
||||||
try:
|
try:
|
||||||
await stream.aclose()
|
await stream.aclose()
|
||||||
|
@ -267,7 +267,7 @@ class Portal:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
reminfo: str = (
|
reminfo: str = (
|
||||||
f'c)=> {self.channel.uid}\n'
|
f'c)=> {self.channel.aid}\n'
|
||||||
f' |_{chan}\n'
|
f' |_{chan}\n'
|
||||||
)
|
)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
@ -310,7 +310,7 @@ class Portal:
|
||||||
):
|
):
|
||||||
log.debug(
|
log.debug(
|
||||||
'IPC chan for actor already closed or broken?\n\n'
|
'IPC chan for actor already closed or broken?\n\n'
|
||||||
f'{self.channel.uid}\n'
|
f'{self.channel.aid}\n'
|
||||||
f' |_{self.channel}\n'
|
f' |_{self.channel}\n'
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
|
@ -551,8 +551,10 @@ async def open_portal(
|
||||||
await channel.connect()
|
await channel.connect()
|
||||||
was_connected = True
|
was_connected = True
|
||||||
|
|
||||||
if channel.uid is None:
|
if channel.aid is None:
|
||||||
await actor._do_handshake(channel)
|
await channel._do_handshake(
|
||||||
|
aid=actor.aid,
|
||||||
|
)
|
||||||
|
|
||||||
msg_loop_cs: trio.CancelScope|None = None
|
msg_loop_cs: trio.CancelScope|None = None
|
||||||
if start_msg_loop:
|
if start_msg_loop:
|
||||||
|
|
|
@ -1219,8 +1219,10 @@ async def process_messages(
|
||||||
# -[ ] figure out how this will break with other transports?
|
# -[ ] figure out how this will break with other transports?
|
||||||
tc.report_n_maybe_raise(
|
tc.report_n_maybe_raise(
|
||||||
message=(
|
message=(
|
||||||
f'peer IPC channel closed abruptly?\n\n'
|
f'peer IPC channel closed abruptly?\n'
|
||||||
f'<=x {chan}\n'
|
f'\n'
|
||||||
|
f'<=x[\n'
|
||||||
|
f' {chan}\n'
|
||||||
f' |_{chan.raddr}\n\n'
|
f' |_{chan.raddr}\n\n'
|
||||||
)
|
)
|
||||||
+
|
+
|
||||||
|
|
|
@ -289,7 +289,9 @@ class Actor:
|
||||||
@property
|
@property
|
||||||
def aid(self) -> msgtypes.Aid:
|
def aid(self) -> msgtypes.Aid:
|
||||||
'''
|
'''
|
||||||
This process-singleton-actor's "unique ID" in struct form.
|
This process-singleton-actor's "unique actor ID" in struct form.
|
||||||
|
|
||||||
|
See the `tractor.msg.Aid` struct for details.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._aid
|
return self._aid
|
||||||
|
@ -308,6 +310,17 @@ class Actor:
|
||||||
process plane.
|
process plane.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
msg: str = (
|
||||||
|
f'`{type(self).__name__}.uid` is now deprecated.\n'
|
||||||
|
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
|
||||||
|
'which also provides additional named (optional) fields '
|
||||||
|
'beyond just the `.name` and `.uuid`.'
|
||||||
|
)
|
||||||
|
warnings.warn(
|
||||||
|
msg,
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
|
)
|
||||||
return (
|
return (
|
||||||
self._aid.name,
|
self._aid.name,
|
||||||
self._aid.uuid,
|
self._aid.uuid,
|
||||||
|
@ -495,7 +508,9 @@ class Actor:
|
||||||
|
|
||||||
# send/receive initial handshake response
|
# send/receive initial handshake response
|
||||||
try:
|
try:
|
||||||
uid: tuple|None = await self._do_handshake(chan)
|
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||||
|
aid=self.aid,
|
||||||
|
)
|
||||||
except (
|
except (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||||
|
@ -524,6 +539,12 @@ class Actor:
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
uid: tuple[str, str] = (
|
||||||
|
peer_aid.name,
|
||||||
|
peer_aid.uuid,
|
||||||
|
)
|
||||||
|
# TODO, can we make this downstream peer tracking use the
|
||||||
|
# `peer_aid` instead?
|
||||||
familiar: str = 'new-peer'
|
familiar: str = 'new-peer'
|
||||||
if _pre_chan := self._peers.get(uid):
|
if _pre_chan := self._peers.get(uid):
|
||||||
familiar: str = 'pre-existing-peer'
|
familiar: str = 'pre-existing-peer'
|
||||||
|
@ -1127,9 +1148,8 @@ class Actor:
|
||||||
)
|
)
|
||||||
assert isinstance(chan, Channel)
|
assert isinstance(chan, Channel)
|
||||||
|
|
||||||
# TODO: move this into a `Channel.handshake()`?
|
|
||||||
# Initial handshake: swap names.
|
# Initial handshake: swap names.
|
||||||
await self._do_handshake(chan)
|
await chan._do_handshake(aid=self.aid)
|
||||||
|
|
||||||
accept_addrs: list[UnwrappedAddress]|None = None
|
accept_addrs: list[UnwrappedAddress]|None = None
|
||||||
|
|
||||||
|
@ -1270,11 +1290,16 @@ class Actor:
|
||||||
# -[ ] need to extend the `SpawnSpec` tho!
|
# -[ ] need to extend the `SpawnSpec` tho!
|
||||||
)
|
)
|
||||||
|
|
||||||
except OSError: # failed to connect
|
# failed to connect back?
|
||||||
|
except (
|
||||||
|
OSError,
|
||||||
|
ConnectionError,
|
||||||
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Failed to connect to spawning parent actor!?\n'
|
f'Failed to connect to spawning parent actor!?\n'
|
||||||
|
f'\n'
|
||||||
f'x=> {parent_addr}\n'
|
f'x=> {parent_addr}\n'
|
||||||
f'|_{self}\n\n'
|
f' |_{self}\n\n'
|
||||||
)
|
)
|
||||||
await self.cancel(req_chan=None) # self cancel
|
await self.cancel(req_chan=None) # self cancel
|
||||||
raise
|
raise
|
||||||
|
@ -1316,13 +1341,13 @@ class Actor:
|
||||||
if (
|
if (
|
||||||
'[Errno 98] Address already in use'
|
'[Errno 98] Address already in use'
|
||||||
in
|
in
|
||||||
oserr.args[0]
|
oserr.args#[0]
|
||||||
):
|
):
|
||||||
log.exception(
|
log.exception(
|
||||||
f'Address already in use?\n'
|
f'Address already in use?\n'
|
||||||
f'{addr}\n'
|
f'{addr}\n'
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
listeners.append(listener)
|
listeners.append(listener)
|
||||||
|
|
||||||
await server_n.start(
|
await server_n.start(
|
||||||
|
@ -1337,8 +1362,10 @@ class Actor:
|
||||||
handler_nursery=handler_nursery
|
handler_nursery=handler_nursery
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.runtime(
|
# TODO, wow make this message better! XD
|
||||||
|
log.info(
|
||||||
'Started server(s)\n'
|
'Started server(s)\n'
|
||||||
|
+
|
||||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||||
)
|
)
|
||||||
self._listen_addrs.extend(listen_addrs)
|
self._listen_addrs.extend(listen_addrs)
|
||||||
|
@ -1457,8 +1484,13 @@ class Actor:
|
||||||
if self._server_down is not None:
|
if self._server_down is not None:
|
||||||
await self._server_down.wait()
|
await self._server_down.wait()
|
||||||
else:
|
else:
|
||||||
|
tpt_protos: list[str] = []
|
||||||
|
addr: Address
|
||||||
|
for addr in self._listen_addrs:
|
||||||
|
tpt_protos.append(addr.proto_key)
|
||||||
log.warning(
|
log.warning(
|
||||||
'Transport[TCP] server was cancelled start?'
|
'Transport server(s) may have been cancelled before started?\n'
|
||||||
|
f'protos: {tpt_protos!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel all rpc tasks permanently
|
# cancel all rpc tasks permanently
|
||||||
|
@ -1745,41 +1777,6 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
return self._peers[uid]
|
return self._peers[uid]
|
||||||
|
|
||||||
# TODO: move to `Channel.handshake(uid)`
|
|
||||||
async def _do_handshake(
|
|
||||||
self,
|
|
||||||
chan: Channel
|
|
||||||
|
|
||||||
) -> msgtypes.Aid:
|
|
||||||
'''
|
|
||||||
Exchange `(name, UUIDs)` identifiers as the first
|
|
||||||
communication step with any (peer) remote `Actor`.
|
|
||||||
|
|
||||||
These are essentially the "mailbox addresses" found in
|
|
||||||
"actor model" parlance.
|
|
||||||
|
|
||||||
'''
|
|
||||||
name, uuid = self.uid
|
|
||||||
await chan.send(
|
|
||||||
msgtypes.Aid(
|
|
||||||
name=name,
|
|
||||||
uuid=uuid,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
aid: msgtypes.Aid = await chan.recv()
|
|
||||||
chan.aid = aid
|
|
||||||
|
|
||||||
uid: tuple[str, str] = (
|
|
||||||
aid.name,
|
|
||||||
aid.uuid,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not isinstance(uid, tuple):
|
|
||||||
raise ValueError(f"{uid} is not a valid uid?!")
|
|
||||||
|
|
||||||
chan.uid = uid
|
|
||||||
return uid
|
|
||||||
|
|
||||||
def is_infected_aio(self) -> bool:
|
def is_infected_aio(self) -> bool:
|
||||||
'''
|
'''
|
||||||
If `True`, this actor is running `trio` in guest mode on
|
If `True`, this actor is running `trio` in guest mode on
|
||||||
|
|
|
@ -52,6 +52,7 @@ from tractor._runtime import Actor
|
||||||
from tractor._entry import _mp_main
|
from tractor._entry import _mp_main
|
||||||
from tractor._exceptions import ActorFailure
|
from tractor._exceptions import ActorFailure
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
|
Aid,
|
||||||
SpawnSpec,
|
SpawnSpec,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -164,7 +165,7 @@ async def exhaust_portal(
|
||||||
# TODO: merge with above?
|
# TODO: merge with above?
|
||||||
log.warning(
|
log.warning(
|
||||||
'Cancelled portal result waiter task:\n'
|
'Cancelled portal result waiter task:\n'
|
||||||
f'uid: {portal.channel.uid}\n'
|
f'uid: {portal.channel.aid}\n'
|
||||||
f'error: {err}\n'
|
f'error: {err}\n'
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
|
@ -172,7 +173,7 @@ async def exhaust_portal(
|
||||||
else:
|
else:
|
||||||
log.debug(
|
log.debug(
|
||||||
f'Returning final result from portal:\n'
|
f'Returning final result from portal:\n'
|
||||||
f'uid: {portal.channel.uid}\n'
|
f'uid: {portal.channel.aid}\n'
|
||||||
f'result: {final}\n'
|
f'result: {final}\n'
|
||||||
)
|
)
|
||||||
return final
|
return final
|
||||||
|
@ -325,12 +326,12 @@ async def soft_kill(
|
||||||
see `.hard_kill()`).
|
see `.hard_kill()`).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
uid: tuple[str, str] = portal.channel.uid
|
peer_aid: Aid = portal.channel.aid
|
||||||
try:
|
try:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Soft killing sub-actor via portal request\n'
|
f'Soft killing sub-actor via portal request\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'(c=> {portal.chan.uid}\n'
|
f'(c=> {peer_aid}\n'
|
||||||
f' |_{proc}\n'
|
f' |_{proc}\n'
|
||||||
)
|
)
|
||||||
# wait on sub-proc to signal termination
|
# wait on sub-proc to signal termination
|
||||||
|
@ -379,7 +380,7 @@ async def soft_kill(
|
||||||
if proc.poll() is None: # type: ignore
|
if proc.poll() is None: # type: ignore
|
||||||
log.warning(
|
log.warning(
|
||||||
'Subactor still alive after cancel request?\n\n'
|
'Subactor still alive after cancel request?\n\n'
|
||||||
f'uid: {uid}\n'
|
f'uid: {peer_aid}\n'
|
||||||
f'|_{proc}\n'
|
f'|_{proc}\n'
|
||||||
)
|
)
|
||||||
n.cancel_scope.cancel()
|
n.cancel_scope.cancel()
|
||||||
|
@ -460,6 +461,9 @@ async def trio_proc(
|
||||||
# the OS; it otherwise can be passed via the parent channel if
|
# the OS; it otherwise can be passed via the parent channel if
|
||||||
# we prefer in the future (for privacy).
|
# we prefer in the future (for privacy).
|
||||||
"--uid",
|
"--uid",
|
||||||
|
# TODO, how to pass this over "wire" encodings like
|
||||||
|
# cmdline args?
|
||||||
|
# -[ ] maybe we can add an `Aid.min_tuple()` ?
|
||||||
str(subactor.uid),
|
str(subactor.uid),
|
||||||
# Address the child must connect to on startup
|
# Address the child must connect to on startup
|
||||||
"--parent_addr",
|
"--parent_addr",
|
||||||
|
@ -725,7 +729,8 @@ async def mp_proc(
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
# local actor by the time we get a ref to it
|
# local actor by the time we get a ref to it
|
||||||
event, chan = await actor_nursery._actor.wait_for_peer(
|
event, chan = await actor_nursery._actor.wait_for_peer(
|
||||||
subactor.uid)
|
subactor.uid,
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: monkey patch poll API to match the ``subprocess`` API..
|
# XXX: monkey patch poll API to match the ``subprocess`` API..
|
||||||
# not sure why they don't expose this but kk.
|
# not sure why they don't expose this but kk.
|
||||||
|
|
|
@ -437,22 +437,23 @@ class MsgStream(trio.abc.Channel):
|
||||||
message: str = (
|
message: str = (
|
||||||
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
|
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
|
||||||
# } bc a stream is a "scope"/msging-phase inside an IPC
|
# } bc a stream is a "scope"/msging-phase inside an IPC
|
||||||
f'x}}>\n'
|
f'c}}>\n'
|
||||||
f' |_{self}\n'
|
f' |_{self}\n'
|
||||||
)
|
)
|
||||||
log.cancel(message)
|
|
||||||
self._eoc = trio.EndOfChannel(message)
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
(rx_chan := self._rx_chan)
|
(rx_chan := self._rx_chan)
|
||||||
and
|
and
|
||||||
(stats := rx_chan.statistics()).tasks_waiting_receive
|
(stats := rx_chan.statistics()).tasks_waiting_receive
|
||||||
):
|
):
|
||||||
log.cancel(
|
message += (
|
||||||
f'Msg-stream is closing but there is still reader tasks,\n'
|
f'AND there is still reader tasks,\n'
|
||||||
|
f'\n'
|
||||||
f'{stats}\n'
|
f'{stats}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log.cancel(message)
|
||||||
|
self._eoc = trio.EndOfChannel(message)
|
||||||
|
|
||||||
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
|
||||||
# => NO, DEFINITELY NOT! <=
|
# => NO, DEFINITELY NOT! <=
|
||||||
# if we're a bi-dir `MsgStream` BECAUSE this same
|
# if we're a bi-dir `MsgStream` BECAUSE this same
|
||||||
|
@ -811,13 +812,12 @@ async def open_stream_from_ctx(
|
||||||
# sanity, can remove?
|
# sanity, can remove?
|
||||||
assert eoc is stream._eoc
|
assert eoc is stream._eoc
|
||||||
|
|
||||||
log.warning(
|
log.runtime(
|
||||||
'Stream was terminated by EoC\n\n'
|
'Stream was terminated by EoC\n\n'
|
||||||
# NOTE: won't show the error <Type> but
|
# NOTE: won't show the error <Type> but
|
||||||
# does show txt followed by IPC msg.
|
# does show txt followed by IPC msg.
|
||||||
f'{str(eoc)}\n'
|
f'{str(eoc)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if ctx._portal:
|
if ctx._portal:
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -73,6 +73,7 @@ from tractor.log import get_logger
|
||||||
from tractor._context import Context
|
from tractor._context import Context
|
||||||
from tractor import _state
|
from tractor import _state
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
|
DebugRequestError,
|
||||||
InternalError,
|
InternalError,
|
||||||
NoRuntime,
|
NoRuntime,
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
|
@ -1740,13 +1741,6 @@ def sigint_shield(
|
||||||
_pause_msg: str = 'Opening a pdb REPL in paused actor'
|
_pause_msg: str = 'Opening a pdb REPL in paused actor'
|
||||||
|
|
||||||
|
|
||||||
class DebugRequestError(RuntimeError):
|
|
||||||
'''
|
|
||||||
Failed to request stdio lock from root actor!
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
|
|
||||||
_repl_fail_msg: str|None = (
|
_repl_fail_msg: str|None = (
|
||||||
'Failed to REPl via `_pause()` '
|
'Failed to REPl via `_pause()` '
|
||||||
)
|
)
|
||||||
|
|
|
@ -24,13 +24,13 @@ from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
import os
|
|
||||||
import platform
|
import platform
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import typing
|
import typing
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
)
|
)
|
||||||
|
import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -50,7 +50,10 @@ from tractor._exceptions import (
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
)
|
)
|
||||||
from tractor.msg import MsgCodec
|
from tractor.msg import (
|
||||||
|
Aid,
|
||||||
|
MsgCodec,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -86,8 +89,8 @@ class Channel:
|
||||||
# user in ``.from_stream()``.
|
# user in ``.from_stream()``.
|
||||||
self._transport: MsgTransport|None = transport
|
self._transport: MsgTransport|None = transport
|
||||||
|
|
||||||
# set after handshake - always uid of far end
|
# set after handshake - always info from peer end
|
||||||
self.uid: tuple[str, str]|None = None
|
self.aid: Aid|None = None
|
||||||
|
|
||||||
self._aiter_msgs = self._iter_msgs()
|
self._aiter_msgs = self._iter_msgs()
|
||||||
self._exc: Exception|None = None
|
self._exc: Exception|None = None
|
||||||
|
@ -99,6 +102,29 @@ class Channel:
|
||||||
# runtime.
|
# runtime.
|
||||||
self._cancel_called: bool = False
|
self._cancel_called: bool = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def uid(self) -> tuple[str, str]:
|
||||||
|
'''
|
||||||
|
Peer actor's unique id.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msg: str = (
|
||||||
|
f'`{type(self).__name__}.uid` is now deprecated.\n'
|
||||||
|
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
|
||||||
|
'which also provides additional named (optional) fields '
|
||||||
|
'beyond just the `.name` and `.uuid`.'
|
||||||
|
)
|
||||||
|
warnings.warn(
|
||||||
|
msg,
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
|
)
|
||||||
|
peer_aid: Aid = self.aid
|
||||||
|
return (
|
||||||
|
peer_aid.name,
|
||||||
|
peer_aid.uuid,
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def stream(self) -> trio.abc.Stream | None:
|
def stream(self) -> trio.abc.Stream | None:
|
||||||
return self._transport.stream if self._transport else None
|
return self._transport.stream if self._transport else None
|
||||||
|
@ -182,9 +208,7 @@ class Channel:
|
||||||
f' _closed={self._closed}\n'
|
f' _closed={self._closed}\n'
|
||||||
f' _cancel_called={self._cancel_called}\n'
|
f' _cancel_called={self._cancel_called}\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f' |_runtime: Actor\n'
|
f' |_peer: {self.aid}\n'
|
||||||
f' pid={os.getpid()}\n'
|
|
||||||
f' uid={self.uid}\n'
|
|
||||||
f'\n'
|
f'\n'
|
||||||
f' |_msgstream: {tpt_name}\n'
|
f' |_msgstream: {tpt_name}\n'
|
||||||
f' proto={tpt.laddr.proto_key!r}\n'
|
f' proto={tpt.laddr.proto_key!r}\n'
|
||||||
|
@ -281,7 +305,7 @@ class Channel:
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
|
|
||||||
log.transport(
|
log.transport(
|
||||||
f'Closing channel to {self.uid} '
|
f'Closing channel to {self.aid} '
|
||||||
f'{self.laddr} -> {self.raddr}'
|
f'{self.laddr} -> {self.raddr}'
|
||||||
)
|
)
|
||||||
assert self._transport
|
assert self._transport
|
||||||
|
@ -381,6 +405,29 @@ class Channel:
|
||||||
def connected(self) -> bool:
|
def connected(self) -> bool:
|
||||||
return self._transport.connected() if self._transport else False
|
return self._transport.connected() if self._transport else False
|
||||||
|
|
||||||
|
async def _do_handshake(
|
||||||
|
self,
|
||||||
|
aid: Aid,
|
||||||
|
|
||||||
|
) -> Aid:
|
||||||
|
'''
|
||||||
|
Exchange `(name, UUIDs)` identifiers as the first
|
||||||
|
communication step with any (peer) remote `Actor`.
|
||||||
|
|
||||||
|
These are essentially the "mailbox addresses" found in
|
||||||
|
"actor model" parlance.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await self.send(aid)
|
||||||
|
peer_aid: Aid = await self.recv()
|
||||||
|
log.runtime(
|
||||||
|
f'Received hanshake with peer actor,\n'
|
||||||
|
f'{peer_aid}\n'
|
||||||
|
)
|
||||||
|
# NOTE, we always are referencing the remote peer!
|
||||||
|
self.aid = peer_aid
|
||||||
|
return peer_aid
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def _connect_chan(
|
async def _connect_chan(
|
||||||
|
|
|
@ -139,20 +139,28 @@ class MsgpackUDSStream(MsgpackTransport):
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> MsgpackUDSStream:
|
) -> MsgpackUDSStream:
|
||||||
|
|
||||||
filepath: Path
|
|
||||||
pid: int
|
|
||||||
(
|
|
||||||
filepath,
|
|
||||||
pid,
|
|
||||||
) = addr.unwrap()
|
|
||||||
|
|
||||||
# XXX NOTE, we don't need to provide the `.pid` part from
|
sockpath: Path = addr.sockpath
|
||||||
# the addr since the OS does this implicitly! .. lel
|
#
|
||||||
# stream = await trio.open_unix_socket(
|
# ^XXX NOTE, we don't provide any out-of-band `.pid` info
|
||||||
stream = await open_unix_socket_w_passcred(
|
# (like, over the socket as extra msgs) since the (augmented)
|
||||||
str(filepath),
|
# `.setsockopt()` call tells the OS provide it; the client
|
||||||
**kwargs
|
# pid can then be read on server/listen() side via
|
||||||
)
|
# `get_peer_info()` above.
|
||||||
|
try:
|
||||||
|
stream = await open_unix_socket_w_passcred(
|
||||||
|
str(sockpath),
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
except (
|
||||||
|
FileNotFoundError,
|
||||||
|
) as fdne:
|
||||||
|
raise ConnectionError(
|
||||||
|
f'Bad UDS socket-filepath-as-address ??\n'
|
||||||
|
f'{addr}\n'
|
||||||
|
f' |_sockpath: {sockpath}\n'
|
||||||
|
) from fdne
|
||||||
|
|
||||||
stream = MsgpackUDSStream(
|
stream = MsgpackUDSStream(
|
||||||
stream,
|
stream,
|
||||||
prefix_size=prefix_size,
|
prefix_size=prefix_size,
|
||||||
|
|
Loading…
Reference in New Issue