Use `.aid.uid` to avoid deprecation warns
I started getting annoyed by all the warnings from `pytest` during work on macos support in CI, so this replaces all `Actor.uid`/`Channel.uid` accesses with `.aid.uid` (or `.aid.reprol()` for log msgs) across the core runtime and IPC subsystems to avoid the noise. This also provides incentive to start the adjustment to all `.uid`-holding/tracking internal `dict`-tables/data-structures to instead use `.msg.types.Aid`. Hopefully that will come as a (vibed?) follow up shortly B) Deats, - `._context`: swap all `self._actor.uid`, `self.chan.uid`, and `portal.actor.uid` refs to `.aid.uid`; use `.aid.reprol()` for log/error formatting. - `._rpc`: same treatment for `actor.uid`, `chan.uid` in log msgs and cancel-scope handling; fix `str(err)` typo in `ContextCancelled` log. - `._runtime`: update `chan.uid` -> `chan.aid.uid` in ctx cache lookups, RPC `Start` msg, registration and cancel-request handling; improve ctxc log formatting. - `._spawn`: replace all `subactor.uid` with `.aid.uid` for child-proc tracking, IPC peer waiting, debug-lock acquisition, and nursery child dict ops. - `._supervise`: same for `subactor.uid` in cancel and portal-wait paths; use `actor.aid.uid` for error dict. - `._state`: fix `last.uid` -> `last.aid.uid` in `current_actor()` error msg. Also, - `._chan`: make `Channel.aid` a proper `@property` backed by `._aid` so we can add validation/typing later. - `.log`: use `current_actor().aid.uuid` instead of `.uid[1]` for actor-uid log field. - `.msg.types`: add TODO comment for `Start.aid` field conversion. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
parent
f84ef44992
commit
5ea721683b
|
|
@ -66,7 +66,7 @@ async def open_actor_cluster(
|
|||
trio.open_nursery() as tn,
|
||||
tractor.trionics.maybe_raise_from_masking_exc()
|
||||
):
|
||||
uid = tractor.current_actor().uid
|
||||
uid = tractor.current_actor().aid.uid
|
||||
|
||||
async def _start(name: str) -> None:
|
||||
name = f'{uid[0]}.{name}'
|
||||
|
|
|
|||
|
|
@ -463,10 +463,11 @@ class Context:
|
|||
|
||||
# self._cancel_called = val
|
||||
|
||||
# TODO, use the `Actor.aid: Aid` instead!
|
||||
@property
|
||||
def canceller(self) -> tuple[str, str]|None:
|
||||
'''
|
||||
`Actor.uid: tuple[str, str]` of the (remote)
|
||||
`Actor.aid.uid: tuple[str, str]` of the (remote)
|
||||
actor-process who's task was cancelled thus causing this
|
||||
(side of the) context to also be cancelled.
|
||||
|
||||
|
|
@ -499,12 +500,12 @@ class Context:
|
|||
if from_uid := re.src_uid:
|
||||
from_uid: tuple = tuple(from_uid)
|
||||
|
||||
our_uid: tuple = self._actor.uid
|
||||
our_uid: tuple = self._actor.aid.uid
|
||||
our_canceller = self.canceller
|
||||
|
||||
return bool(
|
||||
isinstance((ctxc := re), ContextCancelled)
|
||||
and from_uid == self.chan.uid
|
||||
and from_uid == self.chan.aid.uid
|
||||
and ctxc.canceller == our_uid
|
||||
and our_canceller == our_uid
|
||||
)
|
||||
|
|
@ -515,7 +516,7 @@ class Context:
|
|||
Records whether the task on the remote side of this IPC
|
||||
context acknowledged a cancel request via a relayed
|
||||
`ContextCancelled` with the `.canceller` attr set to the
|
||||
`Actor.uid` of the local actor who's task entered
|
||||
`Actor.aid.uid` of the local actor who's task entered
|
||||
`Portal.open_context()`.
|
||||
|
||||
This will only be `True` when `.cancel()` is called and
|
||||
|
|
@ -789,8 +790,8 @@ class Context:
|
|||
# appropriately.
|
||||
log.runtime(
|
||||
'Setting remote error for ctx\n\n'
|
||||
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||
f'=> {self.side!r}: {self._actor.uid}\n\n'
|
||||
f'<= {self.peer_side!r}: {self.chan.aid.reprol()}\n'
|
||||
f'=> {self.side!r}: {self._actor.aid.reprol()}\n\n'
|
||||
f'{error!r}'
|
||||
)
|
||||
self._remote_error: BaseException = error
|
||||
|
|
@ -811,7 +812,7 @@ class Context:
|
|||
# cancelled.
|
||||
#
|
||||
# !TODO, switching to `Actor.aid` here!
|
||||
if (canc := error.canceller) == self._actor.uid:
|
||||
if (canc := error.canceller) == self._actor.aid.uid:
|
||||
whom: str = 'us'
|
||||
self._canceller = canc
|
||||
else:
|
||||
|
|
@ -1036,7 +1037,7 @@ class Context:
|
|||
---------
|
||||
- after the far end cancels, the `.cancel()` calling side
|
||||
should receive a `ContextCancelled` with the
|
||||
`.canceller: tuple` uid set to the current `Actor.uid`.
|
||||
`.canceller: tuple` uid set to the current `Actor.aid.uid`.
|
||||
|
||||
- timeout (quickly) on failure to rx this ACK error-msg in
|
||||
an attempt to sidestep 2-generals when the transport
|
||||
|
|
@ -1065,9 +1066,9 @@ class Context:
|
|||
)
|
||||
reminfo: str = (
|
||||
# ' =>\n'
|
||||
# f'Context.cancel() => {self.chan.uid}\n'
|
||||
# f'Context.cancel() => {self.chan.aid.uid}\n'
|
||||
f'\n'
|
||||
f'c)=> {self.chan.uid}\n'
|
||||
f'c)=> {self.chan.aid.reprol()}\n'
|
||||
f' |_[{self.dst_maddr}\n'
|
||||
f' >> {self.repr_rpc}\n'
|
||||
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
|
||||
|
|
@ -1211,7 +1212,7 @@ class Context:
|
|||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
peer_uid: tuple = self.chan.uid
|
||||
peer_uid: tuple = self.chan.aid.uid
|
||||
|
||||
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
|
||||
# for "graceful cancellation" case(s):
|
||||
|
|
@ -1228,7 +1229,7 @@ class Context:
|
|||
# (`ContextCancelled`) as an expected
|
||||
# error-msg-is-cancellation-ack IFF said
|
||||
# `remote_error: ContextCancelled` has `.canceller`
|
||||
# set to the `Actor.uid` of THIS task (i.e. the
|
||||
# set to the `Actor.aid.uid` of THIS task (i.e. the
|
||||
# cancellation requesting task's actor is the actor
|
||||
# checking whether it should absorb the ctxc).
|
||||
self_ctxc: bool = self._is_self_cancelled(remote_error)
|
||||
|
|
@ -1679,7 +1680,7 @@ class Context:
|
|||
|
||||
elif self._started_called:
|
||||
raise RuntimeError(
|
||||
f'called `.started()` twice on context with {self.chan.uid}'
|
||||
f'called `.started()` twice on context with {self.chan.aid.uid}'
|
||||
)
|
||||
|
||||
started_msg = Started(
|
||||
|
|
@ -1812,7 +1813,7 @@ class Context:
|
|||
'''
|
||||
cid: str = self.cid
|
||||
chan: Channel = self.chan
|
||||
from_uid: tuple[str, str] = chan.uid
|
||||
from_uid: tuple[str, str] = chan.aid.uid
|
||||
send_chan: trio.MemorySendChannel = self._send_chan
|
||||
nsf: NamespacePath = self._nsf
|
||||
|
||||
|
|
@ -1953,20 +1954,22 @@ class Context:
|
|||
# overrun state and that msg isn't stuck in an
|
||||
# overflow queue what happens?!?
|
||||
|
||||
local_uid = self._actor.uid
|
||||
local_aid = self._actor.aid
|
||||
txt: str = (
|
||||
'on IPC context:\n'
|
||||
|
||||
f'<= sender: {from_uid}\n'
|
||||
f' |_ {self._nsf}()\n\n'
|
||||
|
||||
f'=> overrun: {local_uid}\n'
|
||||
f'=> overrun: {local_aid.reprol()!r}\n'
|
||||
f' |_cid: {cid}\n'
|
||||
f' |_task: {self._task}\n'
|
||||
)
|
||||
if not self._stream_opened:
|
||||
txt += (
|
||||
f'\n*** No stream open on `{local_uid[0]}` side! ***\n\n'
|
||||
f'\n'
|
||||
f'*** No stream open on `{local_aid.name}` side! ***\n'
|
||||
f'\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
|
|
@ -2115,7 +2118,11 @@ async def open_context_from_portal(
|
|||
# XXX NOTE XXX: currenly we do NOT allow opening a contex
|
||||
# with "self" since the local feeder mem-chan processing
|
||||
# is not built for it.
|
||||
if (uid := portal.channel.uid) == portal.actor.uid:
|
||||
if (
|
||||
(uid := portal.channel.aid.uid)
|
||||
==
|
||||
portal.actor.aid.uid
|
||||
):
|
||||
raise RuntimeError(
|
||||
'** !! Invalid Operation !! **\n'
|
||||
'Can not open an IPC ctx with the local actor!\n'
|
||||
|
|
@ -2329,7 +2336,7 @@ async def open_context_from_portal(
|
|||
and
|
||||
ctxc is ctx._remote_error
|
||||
and
|
||||
ctxc.canceller == portal.actor.uid
|
||||
ctxc.canceller == portal.actor.aid.uid
|
||||
):
|
||||
log.cancel(
|
||||
f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n'
|
||||
|
|
@ -2406,7 +2413,7 @@ async def open_context_from_portal(
|
|||
logmeth(msg)
|
||||
|
||||
if debug_mode():
|
||||
# async with debug.acquire_debug_lock(portal.actor.uid):
|
||||
# async with debug.acquire_debug_lock(portal.actor.aid.uid):
|
||||
# pass
|
||||
# TODO: factor ^ into below for non-root cases?
|
||||
#
|
||||
|
|
|
|||
|
|
@ -252,8 +252,8 @@ async def _invoke_non_context(
|
|||
):
|
||||
log.warning(
|
||||
'Failed to send RPC result?\n'
|
||||
f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
|
||||
f'x=> peer: {chan.uid}\n'
|
||||
f'|_{func}@{actor.aid.reprol()}() -> {ret_msg}\n\n'
|
||||
f'x=> peer: {chan.aid.reprol()}\n'
|
||||
)
|
||||
|
||||
@acm
|
||||
|
|
@ -698,7 +698,7 @@ async def _invoke(
|
|||
# which cancels the scope presuming the input error
|
||||
# is not a `.cancel_acked` pleaser.
|
||||
if rpc_ctx_cs.cancelled_caught:
|
||||
our_uid: tuple = actor.uid
|
||||
our_uid: tuple = actor.aid.uid
|
||||
|
||||
# first check for and raise any remote error
|
||||
# before raising any context cancelled case
|
||||
|
|
@ -730,7 +730,7 @@ async def _invoke(
|
|||
# TODO: determine if the ctx peer task was the
|
||||
# exact task which cancelled, vs. some other
|
||||
# task in the same actor.
|
||||
elif canceller == ctx.chan.uid:
|
||||
elif canceller == ctx.chan.aid.uid:
|
||||
explain += f'its {ctx.peer_side!r}-side peer'
|
||||
|
||||
elif canceller == our_uid:
|
||||
|
|
@ -825,7 +825,7 @@ async def _invoke(
|
|||
# associated child isn't in debug any more
|
||||
await debug.maybe_wait_for_debugger()
|
||||
ctx: Context = actor._contexts.pop((
|
||||
chan.uid,
|
||||
chan.aid.uid,
|
||||
cid,
|
||||
))
|
||||
|
||||
|
|
@ -927,7 +927,7 @@ async def try_ship_error_to_remote(
|
|||
log.critical(
|
||||
'IPC transport failure -> '
|
||||
f'failed to ship error to {remote_descr}!\n\n'
|
||||
f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.uid}\n'
|
||||
f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.aid.uid}\n'
|
||||
f'\n'
|
||||
# TODO: use `.msg.preetty_struct` for this!
|
||||
f'{msg}\n'
|
||||
|
|
@ -1005,7 +1005,7 @@ async def process_messages(
|
|||
async for msg in chan:
|
||||
log.transport( # type: ignore
|
||||
f'IPC msg from peer\n'
|
||||
f'<= {chan.uid}\n\n'
|
||||
f'<= {chan.aid.reprol()}\n\n'
|
||||
|
||||
# TODO: use of the pprinting of structs is
|
||||
# FRAGILE and should prolly not be
|
||||
|
|
@ -1109,7 +1109,7 @@ async def process_messages(
|
|||
except BaseException:
|
||||
log.exception(
|
||||
'Failed to cancel task?\n'
|
||||
f'<= canceller: {chan.uid}\n'
|
||||
f'<= canceller: {chan.aid.reprol()}\n'
|
||||
f' |_{chan}\n\n'
|
||||
f'=> {actor}\n'
|
||||
f' |_cid: {target_cid}\n'
|
||||
|
|
@ -1264,7 +1264,7 @@ async def process_messages(
|
|||
|
||||
log.transport(
|
||||
'Waiting on next IPC msg from\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'peer: {chan.aid.reprol()}\n'
|
||||
f'|_{chan}\n'
|
||||
)
|
||||
|
||||
|
|
@ -1341,8 +1341,8 @@ async def process_messages(
|
|||
match err:
|
||||
case ContextCancelled():
|
||||
log.cancel(
|
||||
f'Actor: {actor.uid} was context-cancelled with,\n'
|
||||
f'str(err)'
|
||||
f'Actor: {actor.aid.reprol()!r} is ctxc with,\n'
|
||||
f'{str(err)}'
|
||||
)
|
||||
case _:
|
||||
log.exception("Actor errored:")
|
||||
|
|
|
|||
|
|
@ -691,7 +691,7 @@ class Actor:
|
|||
|
||||
'''
|
||||
# ?TODO, use Aid here as well?
|
||||
actor_uid = chan.uid
|
||||
actor_uid = chan.aid.uid
|
||||
assert actor_uid
|
||||
try:
|
||||
ctx = self._contexts[(
|
||||
|
|
@ -701,7 +701,7 @@ class Actor:
|
|||
)]
|
||||
log.debug(
|
||||
f'Retreived cached IPC ctx for\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'peer: {chan.aid.uid}\n'
|
||||
f'cid:{cid}\n'
|
||||
)
|
||||
ctx._allow_overruns: bool = allow_overruns
|
||||
|
|
@ -718,7 +718,7 @@ class Actor:
|
|||
except KeyError:
|
||||
log.debug(
|
||||
f'Allocate new IPC ctx for\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'peer: {chan.aid.uid}\n'
|
||||
f'cid: {cid}\n'
|
||||
)
|
||||
ctx = mk_context(
|
||||
|
|
@ -764,7 +764,7 @@ class Actor:
|
|||
|
||||
'''
|
||||
cid: str = str(uuid.uuid4())
|
||||
assert chan.uid
|
||||
assert chan.aid.uid
|
||||
ctx = self.get_context(
|
||||
chan=chan,
|
||||
cid=cid,
|
||||
|
|
@ -791,12 +791,12 @@ class Actor:
|
|||
ns=ns,
|
||||
func=func,
|
||||
kwargs=kwargs,
|
||||
uid=self.uid,
|
||||
uid=self.aid.uid, # <- !TODO use .aid!
|
||||
cid=cid,
|
||||
)
|
||||
log.runtime(
|
||||
'Sending RPC `Start`\n\n'
|
||||
f'=> peer: {chan.uid}\n'
|
||||
f'=> peer: {chan.aid.uid}\n'
|
||||
f' |_ {ns}.{func}({kwargs})\n\n'
|
||||
|
||||
f'{pretty_struct.pformat(msg)}'
|
||||
|
|
@ -1244,7 +1244,7 @@ class Actor:
|
|||
'Cancel request for invalid RPC task.\n'
|
||||
'The task likely already completed or was never started!\n\n'
|
||||
f'<= canceller: {requesting_aid}\n'
|
||||
f'=> {cid}@{parent_chan.uid}\n'
|
||||
f'=> {cid}@{parent_chan.aid.uid}\n'
|
||||
f' |_{parent_chan}\n'
|
||||
)
|
||||
return True
|
||||
|
|
@ -1381,7 +1381,7 @@ class Actor:
|
|||
f'Cancelling {descr} RPC tasks\n\n'
|
||||
f'<=c) {req_aid} [canceller]\n'
|
||||
f'{rent_chan_repr}'
|
||||
f'c)=> {self.uid} [cancellee]\n'
|
||||
f'c)=> {self.aid.uid} [cancellee]\n'
|
||||
f' |_{self} [with {len(tasks)} tasks]\n'
|
||||
# f' |_tasks: {len(tasks)}\n'
|
||||
# f'{tasks_str}'
|
||||
|
|
@ -1687,7 +1687,7 @@ async def async_main(
|
|||
await reg_portal.run_from_ns(
|
||||
'self',
|
||||
'register_actor',
|
||||
uid=actor.uid,
|
||||
uid=actor.aid.uid,
|
||||
addr=accept_addr.unwrap(),
|
||||
)
|
||||
|
||||
|
|
@ -1758,9 +1758,11 @@ async def async_main(
|
|||
# always!
|
||||
match internal_err:
|
||||
case ContextCancelled():
|
||||
reprol: str = actor.aid.reprol()
|
||||
log.cancel(
|
||||
f'Actor: {actor.uid} was task-context-cancelled with,\n'
|
||||
f'str(internal_err)'
|
||||
f'Actor {reprol!r} was task-ctx-cancelled with,\n'
|
||||
f'\n'
|
||||
f'{internal_err!r}'
|
||||
)
|
||||
case _:
|
||||
log.exception(
|
||||
|
|
@ -1832,7 +1834,7 @@ async def async_main(
|
|||
await reg_portal.run_from_ns(
|
||||
'self',
|
||||
'unregister_actor',
|
||||
uid=actor.uid
|
||||
uid=actor.aid.uid,
|
||||
)
|
||||
except OSError:
|
||||
failed = True
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ async def exhaust_portal(
|
|||
__tracebackhide__ = True
|
||||
try:
|
||||
log.debug(
|
||||
f'Waiting on final result from {actor.uid}'
|
||||
f'Waiting on final result from {actor.aid.uid}'
|
||||
)
|
||||
|
||||
# XXX: streams should never be reaped here since they should
|
||||
|
|
@ -210,17 +210,17 @@ async def cancel_on_completion(
|
|||
actor,
|
||||
)
|
||||
if isinstance(result, Exception):
|
||||
errors[actor.uid]: Exception = result
|
||||
errors[actor.aid.uid]: Exception = result
|
||||
log.cancel(
|
||||
'Cancelling subactor runtime due to error:\n\n'
|
||||
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
||||
f'Portal.cancel_actor() => {portal.channel.aid}\n\n'
|
||||
f'error: {result}\n'
|
||||
)
|
||||
|
||||
else:
|
||||
log.runtime(
|
||||
'Cancelling subactor gracefully:\n\n'
|
||||
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
||||
f'Portal.cancel_actor() => {portal.channel.aid}\n\n'
|
||||
f'result: {result}\n'
|
||||
)
|
||||
|
||||
|
|
@ -308,7 +308,7 @@ async def hard_kill(
|
|||
# )
|
||||
# with trio.CancelScope(shield=True):
|
||||
# async with debug.acquire_debug_lock(
|
||||
# subactor_uid=current_actor().uid,
|
||||
# subactor_uid=current_actor().aid.uid,
|
||||
# ) as _ctx:
|
||||
# log.warning(
|
||||
# 'Acquired debug lock, child ready to be killed ??\n'
|
||||
|
|
@ -483,7 +483,7 @@ async def trio_proc(
|
|||
# TODO, how to pass this over "wire" encodings like
|
||||
# cmdline args?
|
||||
# -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
|
||||
str(subactor.uid),
|
||||
str(subactor.aid.uid),
|
||||
# Address the child must connect to on startup
|
||||
"--parent_addr",
|
||||
str(parent_addr)
|
||||
|
|
@ -514,7 +514,7 @@ async def trio_proc(
|
|||
# channel should have handshake completed by the
|
||||
# local actor by the time we get a ref to it
|
||||
event, chan = await ipc_server.wait_for_peer(
|
||||
subactor.uid
|
||||
subactor.aid.uid
|
||||
)
|
||||
|
||||
except trio.Cancelled:
|
||||
|
|
@ -528,7 +528,9 @@ async def trio_proc(
|
|||
await debug.maybe_wait_for_debugger()
|
||||
|
||||
elif proc is not None:
|
||||
async with debug.acquire_debug_lock(subactor.uid):
|
||||
async with debug.acquire_debug_lock(
|
||||
subactor_uid=subactor.aid.uid
|
||||
):
|
||||
# soft wait on the proc to terminate
|
||||
with trio.move_on_after(0.5):
|
||||
await proc.wait()
|
||||
|
|
@ -538,7 +540,7 @@ async def trio_proc(
|
|||
assert proc
|
||||
|
||||
portal = Portal(chan)
|
||||
actor_nursery._children[subactor.uid] = (
|
||||
actor_nursery._children[subactor.aid.uid] = (
|
||||
subactor,
|
||||
proc,
|
||||
portal,
|
||||
|
|
@ -563,7 +565,7 @@ async def trio_proc(
|
|||
|
||||
# track subactor in current nursery
|
||||
curr_actor: Actor = current_actor()
|
||||
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||
curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery
|
||||
|
||||
# resume caller at next checkpoint now that child is up
|
||||
task_status.started(portal)
|
||||
|
|
@ -616,7 +618,9 @@ async def trio_proc(
|
|||
# don't clobber an ongoing pdb
|
||||
if cancelled_during_spawn:
|
||||
# Try again to avoid TTY clobbering.
|
||||
async with debug.acquire_debug_lock(subactor.uid):
|
||||
async with debug.acquire_debug_lock(
|
||||
subactor_uid=subactor.aid.uid
|
||||
):
|
||||
with trio.move_on_after(0.5):
|
||||
await proc.wait()
|
||||
|
||||
|
|
@ -662,7 +666,7 @@ async def trio_proc(
|
|||
if not cancelled_during_spawn:
|
||||
# pop child entry to indicate we no longer managing this
|
||||
# subactor
|
||||
actor_nursery._children.pop(subactor.uid)
|
||||
actor_nursery._children.pop(subactor.aid.uid)
|
||||
|
||||
|
||||
async def mp_proc(
|
||||
|
|
@ -744,7 +748,7 @@ async def mp_proc(
|
|||
# register the process before start in case we get a cancel
|
||||
# request before the actor has fully spawned - then we can wait
|
||||
# for it to fully come up before sending a cancel request
|
||||
actor_nursery._children[subactor.uid] = (subactor, proc, None)
|
||||
actor_nursery._children[subactor.aid.uid] = (subactor, proc, None)
|
||||
|
||||
proc.start()
|
||||
if not proc.is_alive():
|
||||
|
|
@ -758,7 +762,7 @@ async def mp_proc(
|
|||
# channel should have handshake completed by the
|
||||
# local actor by the time we get a ref to it
|
||||
event, chan = await ipc_server.wait_for_peer(
|
||||
subactor.uid,
|
||||
subactor.aid.uid,
|
||||
)
|
||||
|
||||
# XXX: monkey patch poll API to match the ``subprocess`` API..
|
||||
|
|
@ -771,7 +775,7 @@ async def mp_proc(
|
|||
# any process we may have started.
|
||||
|
||||
portal = Portal(chan)
|
||||
actor_nursery._children[subactor.uid] = (subactor, proc, portal)
|
||||
actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal)
|
||||
|
||||
# unblock parent task
|
||||
task_status.started(portal)
|
||||
|
|
@ -810,7 +814,7 @@ async def mp_proc(
|
|||
# tandem if not done already
|
||||
log.warning(
|
||||
"Cancelling existing result waiter task for "
|
||||
f"{subactor.uid}")
|
||||
f"{subactor.aid.uid}")
|
||||
nursery.cancel_scope.cancel()
|
||||
|
||||
finally:
|
||||
|
|
@ -828,7 +832,7 @@ async def mp_proc(
|
|||
log.debug(f"Joined {proc}")
|
||||
|
||||
# pop child entry to indicate we are no longer managing subactor
|
||||
actor_nursery._children.pop(subactor.uid)
|
||||
actor_nursery._children.pop(subactor.aid.uid)
|
||||
|
||||
# TODO: prolly report to ``mypy`` how this causes all sorts of
|
||||
# false errors..
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ def current_actor(
|
|||
msg += (
|
||||
f'Apparently the lact active actor was\n'
|
||||
f'|_{last}\n'
|
||||
f'|_{last.uid}\n'
|
||||
f'|_{last.aid.uid}\n'
|
||||
)
|
||||
# no actor runtime has (as of yet) ever been started for
|
||||
# this process.
|
||||
|
|
|
|||
|
|
@ -391,15 +391,17 @@ class ActorNursery:
|
|||
|
||||
else:
|
||||
if portal is None: # actor hasn't fully spawned yet
|
||||
event: trio.Event = server._peer_connected[subactor.uid]
|
||||
event: trio.Event = server._peer_connected[
|
||||
subactor.aid.uid
|
||||
]
|
||||
log.warning(
|
||||
f"{subactor.uid} never 't finished spawning?"
|
||||
f"{subactor.aid.uid} never 't finished spawning?"
|
||||
)
|
||||
|
||||
await event.wait()
|
||||
|
||||
# channel/portal should now be up
|
||||
_, _, portal = children[subactor.uid]
|
||||
_, _, portal = children[subactor.aid.uid]
|
||||
|
||||
# XXX should be impossible to get here
|
||||
# unless method was called from within
|
||||
|
|
@ -407,7 +409,7 @@ class ActorNursery:
|
|||
if portal is None:
|
||||
# cancelled while waiting on the event
|
||||
# to arrive
|
||||
chan = server._peers[subactor.uid][-1]
|
||||
chan = server._peers[subactor.aid.uid][-1]
|
||||
if chan:
|
||||
portal = Portal(chan)
|
||||
else: # there's no other choice left
|
||||
|
|
@ -506,7 +508,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
|
||||
except BaseException as _inner_err:
|
||||
inner_err = _inner_err
|
||||
errors[actor.uid] = inner_err
|
||||
errors[actor.aid.uid] = inner_err
|
||||
|
||||
# If we error in the root but the debugger is
|
||||
# engaged we don't want to prematurely kill (and
|
||||
|
|
@ -539,7 +541,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
log.cancel(
|
||||
f'Actor-nursery cancelled by {etype}\n\n'
|
||||
|
||||
f'{current_actor().uid}\n'
|
||||
f'{current_actor().aid.uid}\n'
|
||||
f' |_{an}\n\n'
|
||||
|
||||
# TODO: show tb str?
|
||||
|
|
|
|||
|
|
@ -94,7 +94,7 @@ class Channel:
|
|||
self._transport: MsgTransport|None = transport
|
||||
|
||||
# set after handshake - always info from peer end
|
||||
self.aid: Aid|None = None
|
||||
self._aid: Aid|None = None
|
||||
|
||||
self._aiter_msgs = self._iter_msgs()
|
||||
self._exc: Exception|None = None
|
||||
|
|
@ -122,6 +122,14 @@ class Channel:
|
|||
'''
|
||||
return self._cancel_called
|
||||
|
||||
@property
|
||||
def aid(self) -> Aid:
|
||||
'''
|
||||
Peer actor's ID.
|
||||
|
||||
'''
|
||||
return self._aid
|
||||
|
||||
@property
|
||||
def uid(self) -> tuple[str, str]:
|
||||
'''
|
||||
|
|
@ -505,7 +513,7 @@ class Channel:
|
|||
f'<= {peer_aid.reprol(sin_uuid=False)}\n'
|
||||
)
|
||||
# NOTE, we always are referencing the remote peer!
|
||||
self.aid = peer_aid
|
||||
self._aid = peer_aid
|
||||
return peer_aid
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -293,7 +293,7 @@ _conc_name_getters = {
|
|||
'task': pformat_task_uid,
|
||||
'actor': lambda: _curr_actor_no_exc(),
|
||||
'actor_name': lambda: current_actor().name,
|
||||
'actor_uid': lambda: current_actor().uid[1][:6],
|
||||
'actor_uid': lambda: current_actor().aid.uuid[:6],
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -324,6 +324,8 @@ class Start(
|
|||
# => SEE ABOVE <=
|
||||
kwargs: dict[str, Any]
|
||||
uid: tuple[str, str] # (calling) actor-id
|
||||
# aid: Aid
|
||||
# ^TODO, convert stat!
|
||||
|
||||
# TODO: enforcing a msg-spec in terms `Msg.pld`
|
||||
# parameterizable msgs to be used in the appls IPC dialog.
|
||||
|
|
|
|||
Loading…
Reference in New Issue