Mv `Actor._do_handshake()` to `Channel`, add `.aid`
Finally.. i've been meaning todo this for ages since the actor-id-swap-as-handshake is better layered as part of the IPC msg-ing machinery and then let's us encapsulate the connection-time-assignment of a remote peer's `Aid` as a new `Channel.aid: Aid`. For now we continue to offer the `.uid: tuple[str, str]` attr (by delegating to the `.uid` field) since there's still a few things relying on it in the runtime and ctx layers Nice bonuses from this, - it's very easy to get the peer's `Aid.pid: int` from anywhere in an IPC ctx by just reading it from the chan. - we aren't saving more then the wire struct-msg received. Also add deprecation warnings around usage to get us moving on porting the rest of consuming runtime code to the new attr!leslies_extra_appendix
parent
35acc5a3d5
commit
dd3e918cfe
|
@ -289,7 +289,9 @@ class Actor:
|
||||||
@property
|
@property
|
||||||
def aid(self) -> msgtypes.Aid:
|
def aid(self) -> msgtypes.Aid:
|
||||||
'''
|
'''
|
||||||
This process-singleton-actor's "unique ID" in struct form.
|
This process-singleton-actor's "unique actor ID" in struct form.
|
||||||
|
|
||||||
|
See the `tractor.msg.Aid` struct for details.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._aid
|
return self._aid
|
||||||
|
@ -308,6 +310,17 @@ class Actor:
|
||||||
process plane.
|
process plane.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
msg: str = (
|
||||||
|
f'`{type(self).__name__}.uid` is now deprecated.\n'
|
||||||
|
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
|
||||||
|
'which also provides additional named (optional) fields '
|
||||||
|
'beyond just the `.name` and `.uuid`.'
|
||||||
|
)
|
||||||
|
warnings.warn(
|
||||||
|
msg,
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
|
)
|
||||||
return (
|
return (
|
||||||
self._aid.name,
|
self._aid.name,
|
||||||
self._aid.uuid,
|
self._aid.uuid,
|
||||||
|
@ -495,7 +508,9 @@ class Actor:
|
||||||
|
|
||||||
# send/receive initial handshake response
|
# send/receive initial handshake response
|
||||||
try:
|
try:
|
||||||
uid: tuple|None = await self._do_handshake(chan)
|
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||||
|
aid=self.aid,
|
||||||
|
)
|
||||||
except (
|
except (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||||
|
@ -524,6 +539,12 @@ class Actor:
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
uid: tuple[str, str] = (
|
||||||
|
peer_aid.name,
|
||||||
|
peer_aid.uuid,
|
||||||
|
)
|
||||||
|
# TODO, can we make this downstream peer tracking use the
|
||||||
|
# `peer_aid` instead?
|
||||||
familiar: str = 'new-peer'
|
familiar: str = 'new-peer'
|
||||||
if _pre_chan := self._peers.get(uid):
|
if _pre_chan := self._peers.get(uid):
|
||||||
familiar: str = 'pre-existing-peer'
|
familiar: str = 'pre-existing-peer'
|
||||||
|
@ -1127,9 +1148,8 @@ class Actor:
|
||||||
)
|
)
|
||||||
assert isinstance(chan, Channel)
|
assert isinstance(chan, Channel)
|
||||||
|
|
||||||
# TODO: move this into a `Channel.handshake()`?
|
|
||||||
# Initial handshake: swap names.
|
# Initial handshake: swap names.
|
||||||
await self._do_handshake(chan)
|
await chan._do_handshake(aid=self.aid)
|
||||||
|
|
||||||
accept_addrs: list[UnwrappedAddress]|None = None
|
accept_addrs: list[UnwrappedAddress]|None = None
|
||||||
|
|
||||||
|
@ -1270,11 +1290,16 @@ class Actor:
|
||||||
# -[ ] need to extend the `SpawnSpec` tho!
|
# -[ ] need to extend the `SpawnSpec` tho!
|
||||||
)
|
)
|
||||||
|
|
||||||
except OSError: # failed to connect
|
# failed to connect back?
|
||||||
|
except (
|
||||||
|
OSError,
|
||||||
|
ConnectionError,
|
||||||
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Failed to connect to spawning parent actor!?\n'
|
f'Failed to connect to spawning parent actor!?\n'
|
||||||
|
f'\n'
|
||||||
f'x=> {parent_addr}\n'
|
f'x=> {parent_addr}\n'
|
||||||
f'|_{self}\n\n'
|
f' |_{self}\n\n'
|
||||||
)
|
)
|
||||||
await self.cancel(req_chan=None) # self cancel
|
await self.cancel(req_chan=None) # self cancel
|
||||||
raise
|
raise
|
||||||
|
@ -1316,13 +1341,13 @@ class Actor:
|
||||||
if (
|
if (
|
||||||
'[Errno 98] Address already in use'
|
'[Errno 98] Address already in use'
|
||||||
in
|
in
|
||||||
oserr.args[0]
|
oserr.args#[0]
|
||||||
):
|
):
|
||||||
log.exception(
|
log.exception(
|
||||||
f'Address already in use?\n'
|
f'Address already in use?\n'
|
||||||
f'{addr}\n'
|
f'{addr}\n'
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
listeners.append(listener)
|
listeners.append(listener)
|
||||||
|
|
||||||
await server_n.start(
|
await server_n.start(
|
||||||
|
@ -1337,8 +1362,10 @@ class Actor:
|
||||||
handler_nursery=handler_nursery
|
handler_nursery=handler_nursery
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.runtime(
|
# TODO, wow make this message better! XD
|
||||||
|
log.info(
|
||||||
'Started server(s)\n'
|
'Started server(s)\n'
|
||||||
|
+
|
||||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||||
)
|
)
|
||||||
self._listen_addrs.extend(listen_addrs)
|
self._listen_addrs.extend(listen_addrs)
|
||||||
|
@ -1457,8 +1484,13 @@ class Actor:
|
||||||
if self._server_down is not None:
|
if self._server_down is not None:
|
||||||
await self._server_down.wait()
|
await self._server_down.wait()
|
||||||
else:
|
else:
|
||||||
|
tpt_protos: list[str] = []
|
||||||
|
addr: Address
|
||||||
|
for addr in self._listen_addrs:
|
||||||
|
tpt_protos.append(addr.proto_key)
|
||||||
log.warning(
|
log.warning(
|
||||||
'Transport[TCP] server was cancelled start?'
|
'Transport server(s) may have been cancelled before started?\n'
|
||||||
|
f'protos: {tpt_protos!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel all rpc tasks permanently
|
# cancel all rpc tasks permanently
|
||||||
|
@ -1745,41 +1777,6 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
return self._peers[uid]
|
return self._peers[uid]
|
||||||
|
|
||||||
# TODO: move to `Channel.handshake(uid)`
|
|
||||||
async def _do_handshake(
|
|
||||||
self,
|
|
||||||
chan: Channel
|
|
||||||
|
|
||||||
) -> msgtypes.Aid:
|
|
||||||
'''
|
|
||||||
Exchange `(name, UUIDs)` identifiers as the first
|
|
||||||
communication step with any (peer) remote `Actor`.
|
|
||||||
|
|
||||||
These are essentially the "mailbox addresses" found in
|
|
||||||
"actor model" parlance.
|
|
||||||
|
|
||||||
'''
|
|
||||||
name, uuid = self.uid
|
|
||||||
await chan.send(
|
|
||||||
msgtypes.Aid(
|
|
||||||
name=name,
|
|
||||||
uuid=uuid,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
aid: msgtypes.Aid = await chan.recv()
|
|
||||||
chan.aid = aid
|
|
||||||
|
|
||||||
uid: tuple[str, str] = (
|
|
||||||
aid.name,
|
|
||||||
aid.uuid,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not isinstance(uid, tuple):
|
|
||||||
raise ValueError(f"{uid} is not a valid uid?!")
|
|
||||||
|
|
||||||
chan.uid = uid
|
|
||||||
return uid
|
|
||||||
|
|
||||||
def is_infected_aio(self) -> bool:
|
def is_infected_aio(self) -> bool:
|
||||||
'''
|
'''
|
||||||
If `True`, this actor is running `trio` in guest mode on
|
If `True`, this actor is running `trio` in guest mode on
|
||||||
|
|
|
@ -24,13 +24,13 @@ from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
contextmanager as cm,
|
contextmanager as cm,
|
||||||
)
|
)
|
||||||
import os
|
|
||||||
import platform
|
import platform
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import typing
|
import typing
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
)
|
)
|
||||||
|
import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -50,7 +50,10 @@ from tractor._exceptions import (
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
)
|
)
|
||||||
from tractor.msg import MsgCodec
|
from tractor.msg import (
|
||||||
|
Aid,
|
||||||
|
MsgCodec,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -86,8 +89,8 @@ class Channel:
|
||||||
# user in ``.from_stream()``.
|
# user in ``.from_stream()``.
|
||||||
self._transport: MsgTransport|None = transport
|
self._transport: MsgTransport|None = transport
|
||||||
|
|
||||||
# set after handshake - always uid of far end
|
# set after handshake - always info from peer end
|
||||||
self.uid: tuple[str, str]|None = None
|
self.aid: Aid|None = None
|
||||||
|
|
||||||
self._aiter_msgs = self._iter_msgs()
|
self._aiter_msgs = self._iter_msgs()
|
||||||
self._exc: Exception|None = None
|
self._exc: Exception|None = None
|
||||||
|
@ -99,6 +102,29 @@ class Channel:
|
||||||
# runtime.
|
# runtime.
|
||||||
self._cancel_called: bool = False
|
self._cancel_called: bool = False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def uid(self) -> tuple[str, str]:
|
||||||
|
'''
|
||||||
|
Peer actor's unique id.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msg: str = (
|
||||||
|
f'`{type(self).__name__}.uid` is now deprecated.\n'
|
||||||
|
'Use the new `.aid: tractor.msg.Aid` (struct) instead '
|
||||||
|
'which also provides additional named (optional) fields '
|
||||||
|
'beyond just the `.name` and `.uuid`.'
|
||||||
|
)
|
||||||
|
warnings.warn(
|
||||||
|
msg,
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
|
)
|
||||||
|
peer_aid: Aid = self.aid
|
||||||
|
return (
|
||||||
|
peer_aid.name,
|
||||||
|
peer_aid.uuid,
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def stream(self) -> trio.abc.Stream | None:
|
def stream(self) -> trio.abc.Stream | None:
|
||||||
return self._transport.stream if self._transport else None
|
return self._transport.stream if self._transport else None
|
||||||
|
@ -182,9 +208,7 @@ class Channel:
|
||||||
f' _closed={self._closed}\n'
|
f' _closed={self._closed}\n'
|
||||||
f' _cancel_called={self._cancel_called}\n'
|
f' _cancel_called={self._cancel_called}\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f' |_runtime: Actor\n'
|
f' |_peer: {self.aid}\n'
|
||||||
f' pid={os.getpid()}\n'
|
|
||||||
f' uid={self.uid}\n'
|
|
||||||
f'\n'
|
f'\n'
|
||||||
f' |_msgstream: {tpt_name}\n'
|
f' |_msgstream: {tpt_name}\n'
|
||||||
f' proto={tpt.laddr.proto_key!r}\n'
|
f' proto={tpt.laddr.proto_key!r}\n'
|
||||||
|
@ -281,7 +305,7 @@ class Channel:
|
||||||
async def aclose(self) -> None:
|
async def aclose(self) -> None:
|
||||||
|
|
||||||
log.transport(
|
log.transport(
|
||||||
f'Closing channel to {self.uid} '
|
f'Closing channel to {self.aid} '
|
||||||
f'{self.laddr} -> {self.raddr}'
|
f'{self.laddr} -> {self.raddr}'
|
||||||
)
|
)
|
||||||
assert self._transport
|
assert self._transport
|
||||||
|
@ -381,6 +405,29 @@ class Channel:
|
||||||
def connected(self) -> bool:
|
def connected(self) -> bool:
|
||||||
return self._transport.connected() if self._transport else False
|
return self._transport.connected() if self._transport else False
|
||||||
|
|
||||||
|
async def _do_handshake(
|
||||||
|
self,
|
||||||
|
aid: Aid,
|
||||||
|
|
||||||
|
) -> Aid:
|
||||||
|
'''
|
||||||
|
Exchange `(name, UUIDs)` identifiers as the first
|
||||||
|
communication step with any (peer) remote `Actor`.
|
||||||
|
|
||||||
|
These are essentially the "mailbox addresses" found in
|
||||||
|
"actor model" parlance.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await self.send(aid)
|
||||||
|
peer_aid: Aid = await self.recv()
|
||||||
|
log.runtime(
|
||||||
|
f'Received hanshake with peer actor,\n'
|
||||||
|
f'{peer_aid}\n'
|
||||||
|
)
|
||||||
|
# NOTE, we always are referencing the remote peer!
|
||||||
|
self.aid = peer_aid
|
||||||
|
return peer_aid
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def _connect_chan(
|
async def _connect_chan(
|
||||||
|
|
Loading…
Reference in New Issue