`ipc._uds`: assign `.l/raddr` in `.connect_to()`

Using `.get_stream_addrs()` such that we always (*can*) assign the peer
end's PID in the `._raddr`.

Also factor common `ConnectionError` re-raising into
a `_reraise_as_connerr()`-@cm.
to_asyncio_eoc_signal
Tyler Goodlet 2025-07-24 23:16:30 -04:00
parent 11c4e65757
commit ccc3b1fce1
1 changed files with 42 additions and 21 deletions

View File

@ -18,6 +18,9 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
'''
from __future__ import annotations
from contextlib import (
contextmanager as cm,
)
from pathlib import Path
import os
from socket import (
@ -29,6 +32,7 @@ from socket import (
)
import struct
from typing import (
Type,
TYPE_CHECKING,
ClassVar,
)
@ -205,6 +209,22 @@ class UDSAddress(
f']'
)
@cm
def _reraise_as_connerr(
src_excs: tuple[Type[Exception]],
addr: UDSAddress,
):
try:
yield
except src_excs as src_exc:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
f'\n'
f'from src: {src_exc!r}\n'
) from src_exc
async def start_listener(
addr: UDSAddress,
@ -222,16 +242,14 @@ async def start_listener(
)
bindpath: Path = addr.sockpath
try:
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
OSError,
),
addr=addr
):
await sock.bind(str(bindpath))
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {addr.sockpath}\n'
) from fdne
sock.listen(1)
log.info(
@ -356,27 +374,30 @@ class MsgpackUDSStream(MsgpackTransport):
# `.setsockopt()` call tells the OS provide it; the client
# pid can then be read on server/listen() side via
# `get_peer_info()` above.
try:
with _reraise_as_connerr(
src_excs=(
FileNotFoundError,
),
addr=addr
):
stream = await open_unix_socket_w_passcred(
str(sockpath),
**kwargs
)
except (
FileNotFoundError,
) as fdne:
raise ConnectionError(
f'Bad UDS socket-filepath-as-address ??\n'
f'{addr}\n'
f' |_sockpath: {sockpath}\n'
) from fdne
stream = MsgpackUDSStream(
tpt_stream = MsgpackUDSStream(
stream,
prefix_size=prefix_size,
codec=codec
)
stream._raddr = addr
return stream
# XXX assign from new addrs after peer-PID extract!
(
tpt_stream._laddr,
tpt_stream._raddr,
) = cls.get_stream_addrs(stream)
return tpt_stream
@classmethod
def get_stream_addrs(