Add an `Actor.pformat()`
And map `.__repr__/__str__` to it and add various new fields to fill it out, - drop `self.uid` as var and instead add `Actor._aid: Aid` and proxy to it for the various `.name/.uid/.pid` properties as well as a new `.aid` field. |_ the `Aid.pid` addition is also included. Other improvements, - flip to a sync call to `Address.close_listener()`. - track the `async_main()` parent task as `Actor._task`. - add exception logging around failure to bind due to already-in-use when calling `add.open_listener()` in `._stream_forever()`; sometimes the error might be overridden by something else during the runtime-failure unwind..leslies_extra_appendix
parent
46e775ce6d
commit
1cb2337c7c
|
@ -200,9 +200,14 @@ class Actor:
|
|||
phase (aka before a new process is executed).
|
||||
|
||||
'''
|
||||
self.name = name
|
||||
self.uid = (name, uuid)
|
||||
self._aid = msgtypes.Aid(
|
||||
name=name,
|
||||
uuid=uuid,
|
||||
pid=os.getpid(),
|
||||
)
|
||||
self._task: trio.Task|None = None
|
||||
|
||||
# state
|
||||
self._cancel_complete = trio.Event()
|
||||
self._cancel_called_by_remote: tuple[str, tuple]|None = None
|
||||
self._cancel_called: bool = False
|
||||
|
@ -281,6 +286,77 @@ class Actor:
|
|||
self.reg_addrs: list[UnwrappedAddress] = registry_addrs
|
||||
_state._runtime_vars['_registry_addrs'] = registry_addrs
|
||||
|
||||
@property
|
||||
def aid(self) -> msgtypes.Aid:
|
||||
'''
|
||||
This process-singleton-actor's "unique ID" in struct form.
|
||||
|
||||
'''
|
||||
return self._aid
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._aid.name
|
||||
|
||||
@property
|
||||
def uid(self) -> tuple[str, str]:
|
||||
'''
|
||||
This process-singleton's "unique (cross-host) ID".
|
||||
|
||||
Delivered from the `.Aid.name/.uuid` fields as a `tuple` pair
|
||||
and should be multi-host unique despite a large distributed
|
||||
process plane.
|
||||
|
||||
'''
|
||||
return (
|
||||
self._aid.name,
|
||||
self._aid.uuid,
|
||||
)
|
||||
|
||||
@property
|
||||
def pid(self) -> int:
|
||||
return self._aid.pid
|
||||
|
||||
def pformat(self) -> str:
|
||||
ds: str = '='
|
||||
parent_uid: tuple|None = None
|
||||
if rent_chan := self._parent_chan:
|
||||
parent_uid = rent_chan.uid
|
||||
peers: list[tuple] = list(self._peer_connected)
|
||||
listen_addrs: str = pformat(self._listen_addrs)
|
||||
fmtstr: str = (
|
||||
f' |_id: {self.aid!r}\n'
|
||||
# f" aid{ds}{self.aid!r}\n"
|
||||
f" parent{ds}{parent_uid}\n"
|
||||
f'\n'
|
||||
f' |_ipc: {len(peers)!r} connected peers\n'
|
||||
f" peers{ds}{peers!r}\n"
|
||||
f" _listen_addrs{ds}'{listen_addrs}'\n"
|
||||
f" _listeners{ds}'{self._listeners}'\n"
|
||||
f'\n'
|
||||
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
|
||||
f" ctxs{ds}{len(self._contexts)}\n"
|
||||
f'\n'
|
||||
f' |_runtime: ._task{ds}{self._task!r}\n'
|
||||
f' _spawn_method{ds}{self._spawn_method}\n'
|
||||
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
|
||||
f' _forkserver_info{ds}{self._forkserver_info}\n'
|
||||
f'\n'
|
||||
f' |_state: "TODO: .repr_state()"\n'
|
||||
f' _cancel_complete{ds}{self._cancel_complete}\n'
|
||||
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
|
||||
f' _cancel_called{ds}{self._cancel_called}\n'
|
||||
)
|
||||
return (
|
||||
'<Actor(\n'
|
||||
+
|
||||
fmtstr
|
||||
+
|
||||
')>\n'
|
||||
)
|
||||
|
||||
__repr__ = pformat
|
||||
|
||||
@property
|
||||
def reg_addrs(self) -> list[UnwrappedAddress]:
|
||||
'''
|
||||
|
@ -421,12 +497,19 @@ class Actor:
|
|||
try:
|
||||
uid: tuple|None = await self._do_handshake(chan)
|
||||
except (
|
||||
# we need this for ``msgspec`` for some reason?
|
||||
# for now, it's been put in the stream backend.
|
||||
TransportClosed,
|
||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||
# during various `SocketStream.send/receive_xx()` calls
|
||||
# under different fault conditions such as,
|
||||
#
|
||||
# trio.BrokenResourceError,
|
||||
# trio.ClosedResourceError,
|
||||
|
||||
TransportClosed,
|
||||
#
|
||||
# Inside our `.ipc._transport` layer we absorb and
|
||||
# re-raise our own `TransportClosed` exc such that this
|
||||
# higher level runtime code can only worry one
|
||||
# "kinda-error" that we expect to tolerate during
|
||||
# discovery-sys related pings, queires, DoS etc.
|
||||
):
|
||||
# XXX: This may propagate up from `Channel._aiter_recv()`
|
||||
# and `MsgpackStream._inter_packets()` on a read from the
|
||||
|
@ -1205,7 +1288,8 @@ class Actor:
|
|||
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
'''
|
||||
Start the IPC transport server, begin listening for new connections.
|
||||
Start the IPC transport server, begin listening/accepting new
|
||||
`trio.SocketStream` connections.
|
||||
|
||||
This will cause an actor to continue living (and thus
|
||||
blocking at the process/OS-thread level) until
|
||||
|
@ -1223,10 +1307,24 @@ class Actor:
|
|||
self._server_down = trio.Event()
|
||||
try:
|
||||
async with trio.open_nursery() as server_n:
|
||||
listeners: list[trio.abc.Listener] = [
|
||||
await addr.open_listener()
|
||||
for addr in listen_addrs
|
||||
]
|
||||
|
||||
listeners: list[trio.abc.Listener] = []
|
||||
for addr in listen_addrs:
|
||||
try:
|
||||
listener: trio.abc.Listener = await addr.open_listener()
|
||||
except OSError as oserr:
|
||||
if (
|
||||
'[Errno 98] Address already in use'
|
||||
in
|
||||
oserr.args[0]
|
||||
):
|
||||
log.exception(
|
||||
f'Address already in use?\n'
|
||||
f'{addr}\n'
|
||||
)
|
||||
raise
|
||||
listeners.append(listener)
|
||||
|
||||
await server_n.start(
|
||||
partial(
|
||||
trio.serve_listeners,
|
||||
|
@ -1249,8 +1347,10 @@ class Actor:
|
|||
task_status.started(server_n)
|
||||
|
||||
finally:
|
||||
addr: Address
|
||||
for addr in listen_addrs:
|
||||
await addr.close_listener()
|
||||
addr.close_listener()
|
||||
|
||||
# signal the server is down since nursery above terminated
|
||||
self._server_down.set()
|
||||
|
||||
|
@ -1717,6 +1817,8 @@ async def async_main(
|
|||
the actor's "runtime" and all thus all ongoing RPC tasks.
|
||||
|
||||
'''
|
||||
actor._task: trio.Task = trio.lowlevel.current_task()
|
||||
|
||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||
# on our debugger state.
|
||||
_debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)
|
||||
|
@ -1726,18 +1828,17 @@ async def async_main(
|
|||
|
||||
# establish primary connection with immediate parent
|
||||
actor._parent_chan: Channel|None = None
|
||||
if parent_addr is not None:
|
||||
|
||||
if parent_addr is not None:
|
||||
(
|
||||
actor._parent_chan,
|
||||
set_accept_addr_says_rent,
|
||||
maybe_preferred_transports_says_rent,
|
||||
) = await actor._from_parent(parent_addr)
|
||||
|
||||
|
||||
accept_addrs: list[UnwrappedAddress] = []
|
||||
# either it's passed in because we're not a child or
|
||||
# because we're running in mp mode
|
||||
accept_addrs: list[UnwrappedAddress] = []
|
||||
if (
|
||||
set_accept_addr_says_rent
|
||||
and
|
||||
|
|
|
@ -143,6 +143,7 @@ class Aid(
|
|||
'''
|
||||
name: str
|
||||
uuid: str
|
||||
pid: int|None = None
|
||||
|
||||
# TODO? can/should we extend this field set?
|
||||
# -[ ] use built-in support for UUIDs? `uuid.UUID` which has
|
||||
|
|
Loading…
Reference in New Issue