diff --git a/tractor/_clustering.py b/tractor/_clustering.py index dbb50304..4f639239 100644 --- a/tractor/_clustering.py +++ b/tractor/_clustering.py @@ -66,7 +66,7 @@ async def open_actor_cluster( trio.open_nursery() as tn, tractor.trionics.maybe_raise_from_masking_exc() ): - uid = tractor.current_actor().uid + uid = tractor.current_actor().aid.uid async def _start(name: str) -> None: name = f'{uid[0]}.{name}' diff --git a/tractor/_context.py b/tractor/_context.py index 5a69077e..fa90759b 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -463,10 +463,11 @@ class Context: # self._cancel_called = val + # TODO, use the `Actor.aid: Aid` instead! @property def canceller(self) -> tuple[str, str]|None: ''' - `Actor.uid: tuple[str, str]` of the (remote) + `Actor.aid.uid: tuple[str, str]` of the (remote) actor-process who's task was cancelled thus causing this (side of the) context to also be cancelled. @@ -499,12 +500,12 @@ class Context: if from_uid := re.src_uid: from_uid: tuple = tuple(from_uid) - our_uid: tuple = self._actor.uid + our_uid: tuple = self._actor.aid.uid our_canceller = self.canceller return bool( isinstance((ctxc := re), ContextCancelled) - and from_uid == self.chan.uid + and from_uid == self.chan.aid.uid and ctxc.canceller == our_uid and our_canceller == our_uid ) @@ -515,7 +516,7 @@ class Context: Records whether the task on the remote side of this IPC context acknowledged a cancel request via a relayed `ContextCancelled` with the `.canceller` attr set to the - `Actor.uid` of the local actor who's task entered + `Actor.aid.uid` of the local actor who's task entered `Portal.open_context()`. This will only be `True` when `.cancel()` is called and @@ -789,8 +790,8 @@ class Context: # appropriately. log.runtime( 'Setting remote error for ctx\n\n' - f'<= {self.peer_side!r}: {self.chan.uid}\n' - f'=> {self.side!r}: {self._actor.uid}\n\n' + f'<= {self.peer_side!r}: {self.chan.aid.reprol()}\n' + f'=> {self.side!r}: {self._actor.aid.reprol()}\n\n' f'{error!r}' ) self._remote_error: BaseException = error @@ -811,7 +812,7 @@ class Context: # cancelled. # # !TODO, switching to `Actor.aid` here! - if (canc := error.canceller) == self._actor.uid: + if (canc := error.canceller) == self._actor.aid.uid: whom: str = 'us' self._canceller = canc else: @@ -1036,7 +1037,7 @@ class Context: --------- - after the far end cancels, the `.cancel()` calling side should receive a `ContextCancelled` with the - `.canceller: tuple` uid set to the current `Actor.uid`. + `.canceller: tuple` uid set to the current `Actor.aid.uid`. - timeout (quickly) on failure to rx this ACK error-msg in an attempt to sidestep 2-generals when the transport @@ -1065,9 +1066,9 @@ class Context: ) reminfo: str = ( # ' =>\n' - # f'Context.cancel() => {self.chan.uid}\n' + # f'Context.cancel() => {self.chan.aid.uid}\n' f'\n' - f'c)=> {self.chan.uid}\n' + f'c)=> {self.chan.aid.reprol()}\n' f' |_[{self.dst_maddr}\n' f' >> {self.repr_rpc}\n' # f' >> {self._nsf}() -> {codec}[dict]:\n\n' @@ -1211,7 +1212,7 @@ class Context: ''' __tracebackhide__: bool = hide_tb - peer_uid: tuple = self.chan.uid + peer_uid: tuple = self.chan.aid.uid # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # for "graceful cancellation" case(s): @@ -1228,7 +1229,7 @@ class Context: # (`ContextCancelled`) as an expected # error-msg-is-cancellation-ack IFF said # `remote_error: ContextCancelled` has `.canceller` - # set to the `Actor.uid` of THIS task (i.e. the + # set to the `Actor.aid.uid` of THIS task (i.e. the # cancellation requesting task's actor is the actor # checking whether it should absorb the ctxc). self_ctxc: bool = self._is_self_cancelled(remote_error) @@ -1679,7 +1680,7 @@ class Context: elif self._started_called: raise RuntimeError( - f'called `.started()` twice on context with {self.chan.uid}' + f'called `.started()` twice on context with {self.chan.aid.uid}' ) started_msg = Started( @@ -1812,7 +1813,7 @@ class Context: ''' cid: str = self.cid chan: Channel = self.chan - from_uid: tuple[str, str] = chan.uid + from_uid: tuple[str, str] = chan.aid.uid send_chan: trio.MemorySendChannel = self._send_chan nsf: NamespacePath = self._nsf @@ -1953,20 +1954,22 @@ class Context: # overrun state and that msg isn't stuck in an # overflow queue what happens?!? - local_uid = self._actor.uid + local_aid = self._actor.aid txt: str = ( 'on IPC context:\n' f'<= sender: {from_uid}\n' f' |_ {self._nsf}()\n\n' - f'=> overrun: {local_uid}\n' + f'=> overrun: {local_aid.reprol()!r}\n' f' |_cid: {cid}\n' f' |_task: {self._task}\n' ) if not self._stream_opened: txt += ( - f'\n*** No stream open on `{local_uid[0]}` side! ***\n\n' + f'\n' + f'*** No stream open on `{local_aid.name}` side! ***\n' + f'\n' f'{msg}\n' ) @@ -2115,7 +2118,11 @@ async def open_context_from_portal( # XXX NOTE XXX: currenly we do NOT allow opening a contex # with "self" since the local feeder mem-chan processing # is not built for it. - if (uid := portal.channel.uid) == portal.actor.uid: + if ( + (uid := portal.channel.aid.uid) + == + portal.actor.aid.uid + ): raise RuntimeError( '** !! Invalid Operation !! **\n' 'Can not open an IPC ctx with the local actor!\n' @@ -2329,7 +2336,7 @@ async def open_context_from_portal( and ctxc is ctx._remote_error and - ctxc.canceller == portal.actor.uid + ctxc.canceller == portal.actor.aid.uid ): log.cancel( f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n' @@ -2406,7 +2413,7 @@ async def open_context_from_portal( logmeth(msg) if debug_mode(): - # async with debug.acquire_debug_lock(portal.actor.uid): + # async with debug.acquire_debug_lock(portal.actor.aid.uid): # pass # TODO: factor ^ into below for non-root cases? # diff --git a/tractor/_rpc.py b/tractor/_rpc.py index ac658cb2..cae92bad 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -252,8 +252,8 @@ async def _invoke_non_context( ): log.warning( 'Failed to send RPC result?\n' - f'|_{func}@{actor.uid}() -> {ret_msg}\n\n' - f'x=> peer: {chan.uid}\n' + f'|_{func}@{actor.aid.reprol()}() -> {ret_msg}\n\n' + f'x=> peer: {chan.aid.reprol()}\n' ) @acm @@ -698,7 +698,7 @@ async def _invoke( # which cancels the scope presuming the input error # is not a `.cancel_acked` pleaser. if rpc_ctx_cs.cancelled_caught: - our_uid: tuple = actor.uid + our_uid: tuple = actor.aid.uid # first check for and raise any remote error # before raising any context cancelled case @@ -730,7 +730,7 @@ async def _invoke( # TODO: determine if the ctx peer task was the # exact task which cancelled, vs. some other # task in the same actor. - elif canceller == ctx.chan.uid: + elif canceller == ctx.chan.aid.uid: explain += f'its {ctx.peer_side!r}-side peer' elif canceller == our_uid: @@ -825,7 +825,7 @@ async def _invoke( # associated child isn't in debug any more await debug.maybe_wait_for_debugger() ctx: Context = actor._contexts.pop(( - chan.uid, + chan.aid.uid, cid, )) @@ -927,7 +927,7 @@ async def try_ship_error_to_remote( log.critical( 'IPC transport failure -> ' f'failed to ship error to {remote_descr}!\n\n' - f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.uid}\n' + f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.aid.uid}\n' f'\n' # TODO: use `.msg.preetty_struct` for this! f'{msg}\n' @@ -1005,7 +1005,7 @@ async def process_messages( async for msg in chan: log.transport( # type: ignore f'IPC msg from peer\n' - f'<= {chan.uid}\n\n' + f'<= {chan.aid.reprol()}\n\n' # TODO: use of the pprinting of structs is # FRAGILE and should prolly not be @@ -1109,7 +1109,7 @@ async def process_messages( except BaseException: log.exception( 'Failed to cancel task?\n' - f'<= canceller: {chan.uid}\n' + f'<= canceller: {chan.aid.reprol()}\n' f' |_{chan}\n\n' f'=> {actor}\n' f' |_cid: {target_cid}\n' @@ -1264,7 +1264,7 @@ async def process_messages( log.transport( 'Waiting on next IPC msg from\n' - f'peer: {chan.uid}\n' + f'peer: {chan.aid.reprol()}\n' f'|_{chan}\n' ) @@ -1341,8 +1341,8 @@ async def process_messages( match err: case ContextCancelled(): log.cancel( - f'Actor: {actor.uid} was context-cancelled with,\n' - f'str(err)' + f'Actor: {actor.aid.reprol()!r} is ctxc with,\n' + f'{str(err)}' ) case _: log.exception("Actor errored:") diff --git a/tractor/_runtime.py b/tractor/_runtime.py index f77c69c1..93fb68fd 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -691,7 +691,7 @@ class Actor: ''' # ?TODO, use Aid here as well? - actor_uid = chan.uid + actor_uid = chan.aid.uid assert actor_uid try: ctx = self._contexts[( @@ -701,7 +701,7 @@ class Actor: )] log.debug( f'Retreived cached IPC ctx for\n' - f'peer: {chan.uid}\n' + f'peer: {chan.aid.uid}\n' f'cid:{cid}\n' ) ctx._allow_overruns: bool = allow_overruns @@ -718,7 +718,7 @@ class Actor: except KeyError: log.debug( f'Allocate new IPC ctx for\n' - f'peer: {chan.uid}\n' + f'peer: {chan.aid.uid}\n' f'cid: {cid}\n' ) ctx = mk_context( @@ -764,7 +764,7 @@ class Actor: ''' cid: str = str(uuid.uuid4()) - assert chan.uid + assert chan.aid.uid ctx = self.get_context( chan=chan, cid=cid, @@ -791,12 +791,12 @@ class Actor: ns=ns, func=func, kwargs=kwargs, - uid=self.uid, + uid=self.aid.uid, # <- !TODO use .aid! cid=cid, ) log.runtime( 'Sending RPC `Start`\n\n' - f'=> peer: {chan.uid}\n' + f'=> peer: {chan.aid.uid}\n' f' |_ {ns}.{func}({kwargs})\n\n' f'{pretty_struct.pformat(msg)}' @@ -1244,7 +1244,7 @@ class Actor: 'Cancel request for invalid RPC task.\n' 'The task likely already completed or was never started!\n\n' f'<= canceller: {requesting_aid}\n' - f'=> {cid}@{parent_chan.uid}\n' + f'=> {cid}@{parent_chan.aid.uid}\n' f' |_{parent_chan}\n' ) return True @@ -1381,7 +1381,7 @@ class Actor: f'Cancelling {descr} RPC tasks\n\n' f'<=c) {req_aid} [canceller]\n' f'{rent_chan_repr}' - f'c)=> {self.uid} [cancellee]\n' + f'c)=> {self.aid.uid} [cancellee]\n' f' |_{self} [with {len(tasks)} tasks]\n' # f' |_tasks: {len(tasks)}\n' # f'{tasks_str}' @@ -1687,7 +1687,7 @@ async def async_main( await reg_portal.run_from_ns( 'self', 'register_actor', - uid=actor.uid, + uid=actor.aid.uid, addr=accept_addr.unwrap(), ) @@ -1758,9 +1758,11 @@ async def async_main( # always! match internal_err: case ContextCancelled(): + reprol: str = actor.aid.reprol() log.cancel( - f'Actor: {actor.uid} was task-context-cancelled with,\n' - f'str(internal_err)' + f'Actor {reprol!r} was task-ctx-cancelled with,\n' + f'\n' + f'{internal_err!r}' ) case _: log.exception( @@ -1832,7 +1834,7 @@ async def async_main( await reg_portal.run_from_ns( 'self', 'unregister_actor', - uid=actor.uid + uid=actor.aid.uid, ) except OSError: failed = True diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 8d3c2cf6..01026ad9 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -151,7 +151,7 @@ async def exhaust_portal( __tracebackhide__ = True try: log.debug( - f'Waiting on final result from {actor.uid}' + f'Waiting on final result from {actor.aid.uid}' ) # XXX: streams should never be reaped here since they should @@ -210,17 +210,17 @@ async def cancel_on_completion( actor, ) if isinstance(result, Exception): - errors[actor.uid]: Exception = result + errors[actor.aid.uid]: Exception = result log.cancel( 'Cancelling subactor runtime due to error:\n\n' - f'Portal.cancel_actor() => {portal.channel.uid}\n\n' + f'Portal.cancel_actor() => {portal.channel.aid}\n\n' f'error: {result}\n' ) else: log.runtime( 'Cancelling subactor gracefully:\n\n' - f'Portal.cancel_actor() => {portal.channel.uid}\n\n' + f'Portal.cancel_actor() => {portal.channel.aid}\n\n' f'result: {result}\n' ) @@ -308,7 +308,7 @@ async def hard_kill( # ) # with trio.CancelScope(shield=True): # async with debug.acquire_debug_lock( - # subactor_uid=current_actor().uid, + # subactor_uid=current_actor().aid.uid, # ) as _ctx: # log.warning( # 'Acquired debug lock, child ready to be killed ??\n' @@ -483,7 +483,7 @@ async def trio_proc( # TODO, how to pass this over "wire" encodings like # cmdline args? # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ? - str(subactor.uid), + str(subactor.aid.uid), # Address the child must connect to on startup "--parent_addr", str(parent_addr) @@ -514,7 +514,7 @@ async def trio_proc( # channel should have handshake completed by the # local actor by the time we get a ref to it event, chan = await ipc_server.wait_for_peer( - subactor.uid + subactor.aid.uid ) except trio.Cancelled: @@ -528,7 +528,9 @@ async def trio_proc( await debug.maybe_wait_for_debugger() elif proc is not None: - async with debug.acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock( + subactor_uid=subactor.aid.uid + ): # soft wait on the proc to terminate with trio.move_on_after(0.5): await proc.wait() @@ -538,7 +540,7 @@ async def trio_proc( assert proc portal = Portal(chan) - actor_nursery._children[subactor.uid] = ( + actor_nursery._children[subactor.aid.uid] = ( subactor, proc, portal, @@ -563,7 +565,7 @@ async def trio_proc( # track subactor in current nursery curr_actor: Actor = current_actor() - curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + curr_actor._actoruid2nursery[subactor.aid.uid] = actor_nursery # resume caller at next checkpoint now that child is up task_status.started(portal) @@ -616,7 +618,9 @@ async def trio_proc( # don't clobber an ongoing pdb if cancelled_during_spawn: # Try again to avoid TTY clobbering. - async with debug.acquire_debug_lock(subactor.uid): + async with debug.acquire_debug_lock( + subactor_uid=subactor.aid.uid + ): with trio.move_on_after(0.5): await proc.wait() @@ -662,7 +666,7 @@ async def trio_proc( if not cancelled_during_spawn: # pop child entry to indicate we no longer managing this # subactor - actor_nursery._children.pop(subactor.uid) + actor_nursery._children.pop(subactor.aid.uid) async def mp_proc( @@ -744,7 +748,7 @@ async def mp_proc( # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait # for it to fully come up before sending a cancel request - actor_nursery._children[subactor.uid] = (subactor, proc, None) + actor_nursery._children[subactor.aid.uid] = (subactor, proc, None) proc.start() if not proc.is_alive(): @@ -758,7 +762,7 @@ async def mp_proc( # channel should have handshake completed by the # local actor by the time we get a ref to it event, chan = await ipc_server.wait_for_peer( - subactor.uid, + subactor.aid.uid, ) # XXX: monkey patch poll API to match the ``subprocess`` API.. @@ -771,7 +775,7 @@ async def mp_proc( # any process we may have started. portal = Portal(chan) - actor_nursery._children[subactor.uid] = (subactor, proc, portal) + actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal) # unblock parent task task_status.started(portal) @@ -810,7 +814,7 @@ async def mp_proc( # tandem if not done already log.warning( "Cancelling existing result waiter task for " - f"{subactor.uid}") + f"{subactor.aid.uid}") nursery.cancel_scope.cancel() finally: @@ -828,7 +832,7 @@ async def mp_proc( log.debug(f"Joined {proc}") # pop child entry to indicate we are no longer managing subactor - actor_nursery._children.pop(subactor.uid) + actor_nursery._children.pop(subactor.aid.uid) # TODO: prolly report to ``mypy`` how this causes all sorts of # false errors.. diff --git a/tractor/_state.py b/tractor/_state.py index 86e3ea12..7a4e1242 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -104,7 +104,7 @@ def current_actor( msg += ( f'Apparently the lact active actor was\n' f'|_{last}\n' - f'|_{last.uid}\n' + f'|_{last.aid.uid}\n' ) # no actor runtime has (as of yet) ever been started for # this process. diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 301c44e8..5b0b60b7 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -391,15 +391,17 @@ class ActorNursery: else: if portal is None: # actor hasn't fully spawned yet - event: trio.Event = server._peer_connected[subactor.uid] + event: trio.Event = server._peer_connected[ + subactor.aid.uid + ] log.warning( - f"{subactor.uid} never 't finished spawning?" + f"{subactor.aid.uid} never 't finished spawning?" ) await event.wait() # channel/portal should now be up - _, _, portal = children[subactor.uid] + _, _, portal = children[subactor.aid.uid] # XXX should be impossible to get here # unless method was called from within @@ -407,7 +409,7 @@ class ActorNursery: if portal is None: # cancelled while waiting on the event # to arrive - chan = server._peers[subactor.uid][-1] + chan = server._peers[subactor.aid.uid][-1] if chan: portal = Portal(chan) else: # there's no other choice left @@ -506,7 +508,7 @@ async def _open_and_supervise_one_cancels_all_nursery( except BaseException as _inner_err: inner_err = _inner_err - errors[actor.uid] = inner_err + errors[actor.aid.uid] = inner_err # If we error in the root but the debugger is # engaged we don't want to prematurely kill (and @@ -539,7 +541,7 @@ async def _open_and_supervise_one_cancels_all_nursery( log.cancel( f'Actor-nursery cancelled by {etype}\n\n' - f'{current_actor().uid}\n' + f'{current_actor().aid.uid}\n' f' |_{an}\n\n' # TODO: show tb str? diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 9d109f3f..932fc075 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -94,7 +94,7 @@ class Channel: self._transport: MsgTransport|None = transport # set after handshake - always info from peer end - self.aid: Aid|None = None + self._aid: Aid|None = None self._aiter_msgs = self._iter_msgs() self._exc: Exception|None = None @@ -122,6 +122,14 @@ class Channel: ''' return self._cancel_called + @property + def aid(self) -> Aid: + ''' + Peer actor's ID. + + ''' + return self._aid + @property def uid(self) -> tuple[str, str]: ''' @@ -505,7 +513,7 @@ class Channel: f'<= {peer_aid.reprol(sin_uuid=False)}\n' ) # NOTE, we always are referencing the remote peer! - self.aid = peer_aid + self._aid = peer_aid return peer_aid diff --git a/tractor/log.py b/tractor/log.py index 3ff5384b..71708224 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -293,7 +293,7 @@ _conc_name_getters = { 'task': pformat_task_uid, 'actor': lambda: _curr_actor_no_exc(), 'actor_name': lambda: current_actor().name, - 'actor_uid': lambda: current_actor().uid[1][:6], + 'actor_uid': lambda: current_actor().aid.uuid[:6], } diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 6176ca90..ea314f00 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -324,6 +324,8 @@ class Start( # => SEE ABOVE <= kwargs: dict[str, Any] uid: tuple[str, str] # (calling) actor-id + # aid: Aid + # ^TODO, convert stat! # TODO: enforcing a msg-spec in terms `Msg.pld` # parameterizable msgs to be used in the appls IPC dialog.