diff --git a/tractor/_actor.py b/tractor/_actor.py index 1b38d88..4fa3808 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -23,7 +23,6 @@ from ._exceptions import ( from ._portal import ( Portal, open_portal, - _do_handshake, LocalPortal, ) from . import _state @@ -50,7 +49,8 @@ async def _invoke( sig = inspect.signature(func) treat_as_gen = False cs = None - ctx = Context(chan, cid) + cancel_scope = trio.CancelScope() + ctx = Context(chan, cid, cancel_scope) if 'ctx' in sig.parameters: kwargs['ctx'] = ctx # TODO: eventually we want to be more stringent @@ -73,7 +73,7 @@ async def _invoke( not is_async_gen_partial ): await chan.send({'functype': 'function', 'cid': cid}) - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await chan.send({'return': func(**kwargs), 'cid': cid}) else: @@ -88,7 +88,7 @@ async def _invoke( # have to properly handle the closing (aclosing) # of the async gen in order to be sure the cancel # is propagated! - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) async with aclosing(coro) as agen: async for item in agen: @@ -113,7 +113,7 @@ async def _invoke( # back values like an async-generator would but must # manualy construct the response dict-packet-responses as # above - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await coro if not cs.cancelled_caught: @@ -122,7 +122,7 @@ async def _invoke( await chan.send({'stop': True, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with trio.CancelScope() as cs: + with cancel_scope as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except Exception as err: @@ -174,7 +174,7 @@ class Actor: arbiter_addr: Optional[Tuple[str, int]] = None, ) -> None: self.name = name - self.uid = (name, uid or str(uuid.uuid1())) + self.uid = (name, uid or str(uuid.uuid4())) self.rpc_module_paths = rpc_module_paths self._mods: dict = {} # TODO: consider making this a dynamically defined @@ -247,7 +247,7 @@ class Actor: # send/receive initial handshake response try: - uid = await _do_handshake(self, chan) + uid = await self._do_handshake(chan) except StopAsyncIteration: log.warning(f"Channel {chan} failed to handshake") return @@ -351,7 +351,7 @@ class Actor: caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ - cid = str(uuid.uuid1()) + cid = str(uuid.uuid4()) assert chan.uid recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") @@ -373,11 +373,12 @@ class Actor: msg = None log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - # internal scope allows for keeping this message - # loop running despite the current task having been - # cancelled (eg. `open_portal()` may call this method from - # a locally spawned task) with trio.CancelScope(shield=shield) as cs: + # this internal scope allows for keeping this message + # loop running despite the current task having been + # cancelled (eg. `open_portal()` may call this method from + # a locally spawned task) and recieve this scope using + # ``scope = Nursery.start()`` task_status.started(cs) async for msg in chan: if msg is None: # loop terminate sentinel @@ -385,7 +386,7 @@ class Actor: f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks: if channel is chan: - self.cancel_task(cid, Context(channel, cid)) + self._cancel_task(cid, channel) log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -419,6 +420,16 @@ class Actor: f"{ns}.{funcname}({kwargs})") if ns == 'self': func = getattr(self, funcname) + if funcname == '_cancel_task': + # XXX: a special case is made here for + # remote calls since we don't want the + # remote actor have to know which channel + # the task is associated with and we can't + # pass non-primitive types between actors. + # This means you can use: + # Portal.run('self', '_cancel_task, cid=did) + # without passing the `chan` arg. + kwargs['chan'] = chan else: # complain to client about restricted modules try: @@ -537,7 +548,7 @@ class Actor: ) await chan.connect() # initial handshake, report who we are, who they are - await _do_handshake(self, chan) + await self._do_handshake(chan) except OSError: # failed to connect log.warning( f"Failed to connect to parent @ {parent_addr}," @@ -661,21 +672,20 @@ class Actor: self.cancel_server() self._root_nursery.cancel_scope.cancel() - async def cancel_task(self, cid, ctx): - """Cancel a local task. + async def _cancel_task(self, cid, chan): + """Cancel a local task by call-id / channel. - Note this method will be treated as a streaming funciton + Note this method will be treated as a streaming function by remote actor-callers due to the declaration of ``ctx`` in the signature (for now). """ # right now this is only implicitly called by # streaming IPC but it should be called # to cancel any remotely spawned task - chan = ctx.chan try: # this ctx based lookup ensures the requested task to # be cancelled was indeed spawned by a request from this channel - scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + scope, func, is_complete = self._rpc_tasks[(chan, cid)] except KeyError: log.warning(f"{cid} has already completed/terminated?") return @@ -686,7 +696,7 @@ class Actor: # don't allow cancelling this function mid-execution # (is this necessary?) - if func is self.cancel_task: + if func is self._cancel_task: return scope.cancel() @@ -704,7 +714,7 @@ class Actor: log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") for (chan, cid) in tasks.copy(): # TODO: this should really done in a nursery batch - await self.cancel_task(cid, Context(chan, cid)) + await self._cancel_task(cid, chan) # if tasks: log.info( f"Waiting for remaining rpc tasks to complete {tasks}") @@ -735,6 +745,25 @@ class Actor: """Return all channels to the actor with provided uid.""" return self._peers[uid] + async def _do_handshake( + self, + chan: Channel + ) -> Tuple[str, str]: + """Exchange (name, UUIDs) identifiers as the first communication step. + + These are essentially the "mailbox addresses" found in actor model + parlance. + """ + await chan.send(self.uid) + uid: Tuple[str, str] = await chan.recv() + + if not isinstance(uid, tuple): + raise ValueError(f"{uid} is not a valid uid?!") + + chan.uid = uid + log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + return uid + class Arbiter(Actor): """A special actor who knows all the other actors and always has diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 1cddeca..5acb79a 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -215,10 +215,7 @@ class Context: """ chan: Channel cid: str - - # TODO: we should probably attach the actor-task - # cancel scope here now that trio is exposing it - # as a public object + cancel_scope: trio.CancelScope async def send_yield(self, data: Any) -> None: await self.chan.send({'yield': data, 'cid': self.cid}) diff --git a/tractor/_portal.py b/tractor/_portal.py index 160db19..925e3ad 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -33,21 +33,6 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): yield nursery -async def _do_handshake( - actor: 'Actor', # type: ignore - chan: Channel -) -> Any: - await chan.send(actor.uid) - uid: Tuple[str, str] = await chan.recv() - - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") - - chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") - return uid - - class StreamReceiveChannel(trio.abc.ReceiveChannel): """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with special behaviour for signalling stream termination across an @@ -95,8 +80,8 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): raise unpack_error(msg, self._portal.channel) async def aclose(self): - """Cancel associate remote actor task on close - as well as the local memory channel. + """Cancel associated remote actor task and local memory channel + on close. """ if self._rx_chan._closed: log.warning(f"{self} is already closed") @@ -107,15 +92,10 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): log.warning( f"Cancelling stream {cid} to " f"{self._portal.channel.uid}") - # TODO: yeah.. it'd be nice if this was just an - # async func on the far end. Gotta figure out a - # better way then implicitly feeding the ctx - # to declaring functions; likely a decorator - # system. - rchan = await self._portal.run( - 'self', 'cancel_task', cid=cid) - async for _ in rchan: - pass + # NOTE: we're telling the far end actor to cancel a task + # corresponding to *this actor*. The far end local channel + # instance is passed to `Actor._cancel_task()` implicitly. + await self._portal.run('self', '_cancel_task', cid=cid) if cs.cancelled_caught: # XXX: there's no way to know if the remote task was indeed @@ -153,6 +133,7 @@ class Portal: Tuple[str, Any, str, Dict[str, Any]] ] = None self._streams: Set[StreamReceiveChannel] = set() + self.actor = current_actor() async def _submit( self, @@ -167,7 +148,7 @@ class Portal: This is an async call. """ # ship a function call request to the remote actor - cid, recv_chan = await current_actor().send_cmd( + cid, recv_chan = await self.actor.send_cmd( self.channel, ns, func, kwargs) # wait on first response msg and handle (this should be @@ -345,7 +326,7 @@ async def open_portal( was_connected = True if channel.uid is None: - await _do_handshake(actor, channel) + await actor._do_handshake(channel) msg_loop_cs = await nursery.start( partial(