Compare commits

..

No commits in common. "a528d45a30ce823d9d131aa6c66703987366030b" and "dc68ea41182104e039bc8269dd042e98594807e8" have entirely different histories.

11 changed files with 137 additions and 279 deletions

View File

@ -217,14 +217,7 @@ class TCPAddress(Address):
cls,
addr: tuple[str, int]
) -> TCPAddress:
match addr:
case (str(), int()):
return TCPAddress(addr[0], addr[1])
case _:
raise ValueError(
f'Invalid unwrapped address for {cls}\n'
f'{addr}\n'
)
return TCPAddress(addr[0], addr[1])
def unwrap(self) -> tuple[str, int]:
return (
@ -235,6 +228,7 @@ class TCPAddress(Address):
@classmethod
def get_random(
cls,
current_actor: Actor,
bindspace: str = def_bindspace,
) -> TCPAddress:
return TCPAddress(bindspace, 0)
@ -281,15 +275,6 @@ class TCPAddress(Address):
...
def unwrap_sockpath(
sockpath: Path,
) -> tuple[Path, Path]:
return (
sockpath.parent,
sockpath.name,
)
class UDSAddress(Address):
# TODO, maybe we should use better field and value
# -[x] really this is a `.protocol_key` not a "name" of anything.
@ -302,36 +287,23 @@ class UDSAddress(Address):
def __init__(
self,
filedir: Path|str|None,
# TODO, i think i want `.filename` here?
filename: str|Path,
# XXX, in the sense you can also pass
filepath: str|Path,
maybe_pid: int,
# ^XXX, in the sense you can also pass
# a "non-real-world-process-id" such as is handy to represent
# our host-local default "port-like" key for the very first
# root actor to create a registry address.
maybe_pid: int|None = None,
):
fdir = self._filedir = Path(filedir or self.def_bindspace).absolute()
fpath = self._filepath = Path(filename)
fp: Path = fdir / fpath
assert fp.is_absolute()
# to track which "side" is the peer process by reading socket
# credentials-info.
self._filepath: Path = Path(filepath).absolute()
self._pid: int = maybe_pid
@property
def sockpath(self) -> Path:
return self._filedir / self._filepath
@property
def is_valid(self) -> bool:
'''
We block socket files not allocated under the runtime subdir.
'''
return self.bindspace in self.sockpath.parents
return self.bindspace in self._filepath.parents
@property
def bindspace(self) -> Path:
@ -340,43 +312,23 @@ class UDSAddress(Address):
just the sub-directory in which we allocate socket files.
'''
return self._filedir or self.def_bindspace
return self.def_bindspace
@classmethod
def from_addr(
cls,
addr: (
tuple[Path|str|None, int]
|Path|str
),
addr: tuple[Path, int]
) -> UDSAddress:
match addr:
case tuple()|list():
sockpath: Path = Path(addr[0])
filedir, filename = unwrap_sockpath(sockpath)
pid: int = addr[1]
return UDSAddress(
filedir=filedir,
filename=filename,
maybe_pid=pid,
)
# NOTE, in case we ever decide to just `.unwrap()`
# to a `Path|str`?
case str()|Path():
sockpath: Path = Path(addr)
return UDSAddress(*unwrap_sockpath(sockpath))
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Bad unwrapped-address for {cls} !\n'
f'{addr!r}\n'
)
return UDSAddress(
filepath=addr[0],
maybe_pid=addr[1],
)
def unwrap(self) -> tuple[str, int]:
# XXX NOTE, since this gets passed DIRECTLY to
# `.ipc._uds.open_unix_socket_w_passcred()`
def unwrap(self) -> tuple[Path, int]:
return (
str(self.sockpath),
str(self._filepath),
# XXX NOTE, since this gets passed DIRECTLY to
# `open_unix_socket_w_passcred()` above!
self._pid,
)
@ -386,7 +338,7 @@ class UDSAddress(Address):
bindspace: Path|None = None, # default netns
) -> UDSAddress:
filedir: Path = bindspace or cls.def_bindspace
bs: Path = bindspace or get_rt_dir()
pid: int = os.getpid()
actor: Actor|None = current_actor(
err_on_no_runtime=False,
@ -399,27 +351,30 @@ class UDSAddress(Address):
prefix: str = 'root'
sockname: str = f'{prefix}@{pid}'
sockpath: Path = Path(f'{sockname}.sock')
sockpath: Path = Path(f'{bs}/{sockname}.sock')
return UDSAddress(
filedir=filedir,
filename=sockpath,
# filename=f'{tempfile.gettempdir()}/{uuid4()}.sock'
filepath=sockpath,
maybe_pid=pid,
)
@classmethod
def get_root(cls) -> Address:
def_uds_filepath: Path = 'registry@1616.sock'
def_uds_filepath: Path = (
get_rt_dir()
/
'registry@1616.sock'
)
return UDSAddress(
filedir=None,
filename=def_uds_filepath,
maybe_pid=1616,
filepath=def_uds_filepath,
maybe_pid=1616
)
def __repr__(self) -> str:
return (
f'{type(self).__name__}'
f'['
f'({self.sockpath}, {self._pid})'
f'({self._filepath}, {self._pid})'
f']'
)
@ -436,7 +391,7 @@ class UDSAddress(Address):
self,
**kwargs,
) -> SocketListener:
sock = self._sock = socket.socket(
self._sock = socket.socket(
socket.AF_UNIX,
socket.SOCK_STREAM
)
@ -445,10 +400,8 @@ class UDSAddress(Address):
f'>[\n'
f'|_{self}\n'
)
bindpath: Path = self.sockpath
await sock.bind(str(bindpath))
sock.listen(1)
await self._sock.bind(self._filepath)
self._sock.listen(1)
log.info(
f'Listening on UDS socket\n'
f'[>\n'
@ -458,7 +411,7 @@ class UDSAddress(Address):
def close_listener(self):
self._sock.close()
os.unlink(self.sockpath)
os.unlink(self._filepath)
preferred_transport: str = 'uds'
@ -502,55 +455,26 @@ def mk_uuid() -> str:
def wrap_address(
addr: UnwrappedAddress
) -> Address:
'''
Wrap an `UnwrappedAddress` as an `Address`-type based
on matching builtin python data-structures which we adhoc
use for each.
XXX NOTE, careful care must be placed to ensure
`UnwrappedAddress` cases are **definitely unique** otherwise the
wrong transport backend may be loaded and will break many
low-level things in our runtime in a not-fun-to-debug way!
XD
'''
if is_wrapped_addr(addr):
return addr
cls: Type|None = None
# if 'sock' in addr[0]:
# import pdbp; pdbp.set_trace()
match addr:
# TODO! BUT THIS WILL MATCH FOR TCP !...
# -[ ] so prolly go back to what guille had orig XD
# a plain ol' `str`?
case ((
case (
str()|Path(),
int(),
)):
):
cls = UDSAddress
# classic network socket-address as tuple/list
case (
(str(), int())
|
[str(), int()]
):
case tuple() | list():
cls = TCPAddress
# likely an unset UDS or TCP reg address as defaulted in
# `_state._runtime_vars['_root_mailbox']`
case (
None
|
[None, None]
):
case None:
cls: Type[Address] = get_address_cls(preferred_transport)
addr: UnwrappedAddress = cls.get_root().unwrap()
case _:
# import pdbp; pdbp.set_trace()
raise TypeError(
f'Can not wrap address {type(addr)}\n'
f'{addr!r}\n'

View File

@ -366,7 +366,7 @@ class Context:
# f' ---\n'
f' |_ipc: {self.dst_maddr}\n'
# f' dst_maddr{ds}{self.dst_maddr}\n'
f" uid{ds}'{self.chan.aid}'\n"
f" uid{ds}'{self.chan.uid}'\n"
f" cid{ds}'{self.cid}'\n"
# f' ---\n'
f'\n'
@ -945,10 +945,10 @@ class Context:
reminfo: str = (
# ' =>\n'
# f'Context.cancel() => {self.chan.uid}\n'
f'\n'
f'c)=> {self.chan.uid}\n'
f' |_[{self.dst_maddr}\n'
f' >>{self.repr_rpc}\n'
# f'{self.chan.uid}\n'
f' |_ @{self.dst_maddr}\n'
f' >> {self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
# TODO: pull msg-type from spec re #320
)

View File

@ -126,12 +126,6 @@ class TrioTaskExited(Exception):
'''
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
# NOTE: more or less should be close to these:
# 'boxed_type',
# 'src_type',
@ -197,8 +191,6 @@ def get_err_type(type_name: str) -> BaseException|None:
):
return type_ref
return None
def pack_from_raise(
local_err: (
@ -1017,10 +1009,7 @@ class TransportClosed(trio.BrokenResourceError):
f' {cause}\n' # exc repr
)
getattr(
log,
self._loglevel
)(message)
getattr(log, self._loglevel)(message)
# some errors we want to blow up from
# inside the RPC msg loop

View File

@ -175,7 +175,7 @@ class Portal:
# not expecting a "main" result
if self._expect_result_ctx is None:
log.warning(
f"Portal for {self.channel.aid} not expecting a final"
f"Portal for {self.channel.uid} not expecting a final"
" result?\nresult() should only be called if subactor"
" was spawned with `ActorNursery.run_in_actor()`")
return NoResult
@ -222,7 +222,7 @@ class Portal:
# IPC calls
if self._streams:
log.cancel(
f"Cancelling all streams with {self.channel.aid}")
f"Cancelling all streams with {self.channel.uid}")
for stream in self._streams.copy():
try:
await stream.aclose()
@ -267,7 +267,7 @@ class Portal:
return False
reminfo: str = (
f'c)=> {self.channel.aid}\n'
f'c)=> {self.channel.uid}\n'
f' |_{chan}\n'
)
log.cancel(
@ -310,7 +310,7 @@ class Portal:
):
log.debug(
'IPC chan for actor already closed or broken?\n\n'
f'{self.channel.aid}\n'
f'{self.channel.uid}\n'
f' |_{self.channel}\n'
)
return False
@ -551,10 +551,8 @@ async def open_portal(
await channel.connect()
was_connected = True
if channel.aid is None:
await channel._do_handshake(
aid=actor.aid,
)
if channel.uid is None:
await actor._do_handshake(channel)
msg_loop_cs: trio.CancelScope|None = None
if start_msg_loop:

View File

@ -1219,10 +1219,8 @@ async def process_messages(
# -[ ] figure out how this will break with other transports?
tc.report_n_maybe_raise(
message=(
f'peer IPC channel closed abruptly?\n'
f'\n'
f'<=x[\n'
f' {chan}\n'
f'peer IPC channel closed abruptly?\n\n'
f'<=x {chan}\n'
f' |_{chan.raddr}\n\n'
)
+

View File

@ -289,9 +289,7 @@ class Actor:
@property
def aid(self) -> msgtypes.Aid:
'''
This process-singleton-actor's "unique actor ID" in struct form.
See the `tractor.msg.Aid` struct for details.
This process-singleton-actor's "unique ID" in struct form.
'''
return self._aid
@ -310,17 +308,6 @@ class Actor:
process plane.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
return (
self._aid.name,
self._aid.uuid,
@ -508,9 +495,7 @@ class Actor:
# send/receive initial handshake response
try:
peer_aid: msgtypes.Aid = await chan._do_handshake(
aid=self.aid,
)
uid: tuple|None = await self._do_handshake(chan)
except (
TransportClosed,
# ^XXX NOTE, the above wraps `trio` exc types raised
@ -539,12 +524,6 @@ class Actor:
)
return
uid: tuple[str, str] = (
peer_aid.name,
peer_aid.uuid,
)
# TODO, can we make this downstream peer tracking use the
# `peer_aid` instead?
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
@ -1148,8 +1127,9 @@ class Actor:
)
assert isinstance(chan, Channel)
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await chan._do_handshake(aid=self.aid)
await self._do_handshake(chan)
accept_addrs: list[UnwrappedAddress]|None = None
@ -1290,16 +1270,11 @@ class Actor:
# -[ ] need to extend the `SpawnSpec` tho!
)
# failed to connect back?
except (
OSError,
ConnectionError,
):
except OSError: # failed to connect
log.warning(
f'Failed to connect to spawning parent actor!?\n'
f'\n'
f'x=> {parent_addr}\n'
f' |_{self}\n\n'
f'|_{self}\n\n'
)
await self.cancel(req_chan=None) # self cancel
raise
@ -1341,13 +1316,13 @@ class Actor:
if (
'[Errno 98] Address already in use'
in
oserr.args#[0]
oserr.args[0]
):
log.exception(
f'Address already in use?\n'
f'{addr}\n'
)
raise
raise
listeners.append(listener)
await server_n.start(
@ -1362,10 +1337,8 @@ class Actor:
handler_nursery=handler_nursery
)
)
# TODO, wow make this message better! XD
log.info(
log.runtime(
'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
self._listen_addrs.extend(listen_addrs)
@ -1484,13 +1457,8 @@ class Actor:
if self._server_down is not None:
await self._server_down.wait()
else:
tpt_protos: list[str] = []
addr: Address
for addr in self._listen_addrs:
tpt_protos.append(addr.proto_key)
log.warning(
'Transport server(s) may have been cancelled before started?\n'
f'protos: {tpt_protos!r}\n'
'Transport[TCP] server was cancelled start?'
)
# cancel all rpc tasks permanently
@ -1777,6 +1745,41 @@ class Actor:
'''
return self._peers[uid]
# TODO: move to `Channel.handshake(uid)`
async def _do_handshake(
self,
chan: Channel
) -> msgtypes.Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
name, uuid = self.uid
await chan.send(
msgtypes.Aid(
name=name,
uuid=uuid,
)
)
aid: msgtypes.Aid = await chan.recv()
chan.aid = aid
uid: tuple[str, str] = (
aid.name,
aid.uuid,
)
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
return uid
def is_infected_aio(self) -> bool:
'''
If `True`, this actor is running `trio` in guest mode on

View File

@ -52,7 +52,6 @@ from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor.msg.types import (
Aid,
SpawnSpec,
)
@ -165,7 +164,7 @@ async def exhaust_portal(
# TODO: merge with above?
log.warning(
'Cancelled portal result waiter task:\n'
f'uid: {portal.channel.aid}\n'
f'uid: {portal.channel.uid}\n'
f'error: {err}\n'
)
return err
@ -173,7 +172,7 @@ async def exhaust_portal(
else:
log.debug(
f'Returning final result from portal:\n'
f'uid: {portal.channel.aid}\n'
f'uid: {portal.channel.uid}\n'
f'result: {final}\n'
)
return final
@ -326,12 +325,12 @@ async def soft_kill(
see `.hard_kill()`).
'''
peer_aid: Aid = portal.channel.aid
uid: tuple[str, str] = portal.channel.uid
try:
log.cancel(
f'Soft killing sub-actor via portal request\n'
f'\n'
f'(c=> {peer_aid}\n'
f'(c=> {portal.chan.uid}\n'
f' |_{proc}\n'
)
# wait on sub-proc to signal termination
@ -380,7 +379,7 @@ async def soft_kill(
if proc.poll() is None: # type: ignore
log.warning(
'Subactor still alive after cancel request?\n\n'
f'uid: {peer_aid}\n'
f'uid: {uid}\n'
f'|_{proc}\n'
)
n.cancel_scope.cancel()
@ -461,9 +460,6 @@ async def trio_proc(
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
# TODO, how to pass this over "wire" encodings like
# cmdline args?
# -[ ] maybe we can add an `Aid.min_tuple()` ?
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
@ -729,8 +725,7 @@ async def mp_proc(
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid,
)
subactor.uid)
# XXX: monkey patch poll API to match the ``subprocess`` API..
# not sure why they don't expose this but kk.

View File

@ -437,23 +437,22 @@ class MsgStream(trio.abc.Channel):
message: str = (
f'Stream self-closed by {this_side!r}-side before EoC from {peer_side!r}\n'
# } bc a stream is a "scope"/msging-phase inside an IPC
f'c}}>\n'
f'x}}>\n'
f' |_{self}\n'
)
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
if (
(rx_chan := self._rx_chan)
and
(stats := rx_chan.statistics()).tasks_waiting_receive
):
message += (
f'AND there is still reader tasks,\n'
f'\n'
log.cancel(
f'Msg-stream is closing but there is still reader tasks,\n'
f'{stats}\n'
)
log.cancel(message)
self._eoc = trio.EndOfChannel(message)
# ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
# => NO, DEFINITELY NOT! <=
# if we're a bi-dir `MsgStream` BECAUSE this same
@ -812,12 +811,13 @@ async def open_stream_from_ctx(
# sanity, can remove?
assert eoc is stream._eoc
log.runtime(
log.warning(
'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but
# does show txt followed by IPC msg.
f'{str(eoc)}\n'
)
finally:
if ctx._portal:
try:

View File

@ -73,7 +73,6 @@ from tractor.log import get_logger
from tractor._context import Context
from tractor import _state
from tractor._exceptions import (
DebugRequestError,
InternalError,
NoRuntime,
is_multi_cancelled,
@ -1741,6 +1740,13 @@ def sigint_shield(
_pause_msg: str = 'Opening a pdb REPL in paused actor'
class DebugRequestError(RuntimeError):
'''
Failed to request stdio lock from root actor!
'''
_repl_fail_msg: str|None = (
'Failed to REPl via `_pause()` '
)

View File

@ -24,13 +24,13 @@ from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
import os
import platform
from pprint import pformat
import typing
from typing import (
Any,
)
import warnings
import trio
@ -50,10 +50,7 @@ from tractor._exceptions import (
MsgTypeError,
pack_from_raise,
)
from tractor.msg import (
Aid,
MsgCodec,
)
from tractor.msg import MsgCodec
log = get_logger(__name__)
@ -89,8 +86,8 @@ class Channel:
# user in ``.from_stream()``.
self._transport: MsgTransport|None = transport
# set after handshake - always info from peer end
self.aid: Aid|None = None
# set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None
self._aiter_msgs = self._iter_msgs()
self._exc: Exception|None = None
@ -102,29 +99,6 @@ class Channel:
# runtime.
self._cancel_called: bool = False
@property
def uid(self) -> tuple[str, str]:
'''
Peer actor's unique id.
'''
msg: str = (
f'`{type(self).__name__}.uid` is now deprecated.\n'
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
'which also provides additional named (optional) fields '
'beyond just the `.name` and `.uuid`.'
)
warnings.warn(
msg,
DeprecationWarning,
stacklevel=2,
)
peer_aid: Aid = self.aid
return (
peer_aid.name,
peer_aid.uuid,
)
@property
def stream(self) -> trio.abc.Stream | None:
return self._transport.stream if self._transport else None
@ -208,7 +182,9 @@ class Channel:
f' _closed={self._closed}\n'
f' _cancel_called={self._cancel_called}\n'
f'\n'
f' |_peer: {self.aid}\n'
f' |_runtime: Actor\n'
f' pid={os.getpid()}\n'
f' uid={self.uid}\n'
f'\n'
f' |_msgstream: {tpt_name}\n'
f' proto={tpt.laddr.proto_key!r}\n'
@ -305,7 +281,7 @@ class Channel:
async def aclose(self) -> None:
log.transport(
f'Closing channel to {self.aid} '
f'Closing channel to {self.uid} '
f'{self.laddr} -> {self.raddr}'
)
assert self._transport
@ -405,29 +381,6 @@ class Channel:
def connected(self) -> bool:
return self._transport.connected() if self._transport else False
async def _do_handshake(
self,
aid: Aid,
) -> Aid:
'''
Exchange `(name, UUIDs)` identifiers as the first
communication step with any (peer) remote `Actor`.
These are essentially the "mailbox addresses" found in
"actor model" parlance.
'''
await self.send(aid)
peer_aid: Aid = await self.recv()
log.runtime(
f'Received hanshake with peer actor,\n'
f'{peer_aid}\n'
)
# NOTE, we always are referencing the remote peer!
self.aid = peer_aid
return peer_aid
@acm
async def _connect_chan(

View File

@ -139,28 +139,20 @@ class MsgpackUDSStream(MsgpackTransport):
**kwargs
) -> MsgpackUDSStream:
filepath: Path
pid: int
(
filepath,
pid,
) = addr.unwrap()
sockpath: Path = addr.sockpath
#
# ^XXX NOTE, we don't provide any out-of-band `.pid` info
# (like, over the socket as extra msgs) since the (augmented)
# `.setsockopt()` call tells the OS provide it; the client
# pid can then be read on server/listen() side via
# `get_peer_info()` above.
try:
stream = await open_unix_socket_w_passcred(
str(sockpath),
**kwargs
)
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
# XXX NOTE, we don't need to provide the `.pid` part from
# the addr since the OS does this implicitly! .. lel
# stream = await trio.open_unix_socket(
stream = await open_unix_socket_w_passcred(
str(filepath),
**kwargs
)
stream = MsgpackUDSStream(
stream,
prefix_size=prefix_size,