diff --git a/tractor/_portal.py b/tractor/_portal.py index 2683d88..8a1b562 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -36,7 +36,7 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): async def _do_handshake( actor: 'Actor', # type: ignore chan: Channel -)-> Any: +) -> Any: await chan.send(actor.uid) uid: Tuple[str, str] = await chan.recv() @@ -76,8 +76,11 @@ class Portal: await self.channel.aclose() async def _submit( - self, ns: str, func: str, **kwargs - ) -> Tuple[str, trio.Queue, str, Dict[str, Any]]: + self, + ns: str, + func: str, + kwargs, + ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -85,11 +88,13 @@ class Portal: This is an async call. """ # ship a function call request to the remote actor - cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs) + cid, recv_chan = await current_actor().send_cmd( + self.channel, ns, func, kwargs) # wait on first response msg and handle (this should be # in an immediate response) - first_msg = await q.get() + + first_msg = await recv_chan.receive() functype = first_msg.get('functype') if functype == 'function' or functype == 'asyncfunction': @@ -101,12 +106,12 @@ class Portal: else: raise ValueError(f"{first_msg} is an invalid response packet?") - return cid, q, resp_type, first_msg + return cid, recv_chan, resp_type, first_msg async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" - self._expect_result = await self._submit(ns, func, **kwargs) + self._expect_result = await self._submit(ns, func, kwargs) async def run(self, ns: str, func: str, **kwargs) -> Any: """Submit a remote function to be scheduled and run by actor, @@ -116,62 +121,67 @@ class Portal: remote rpc task or a local async generator instance. """ return await self._return_from_resptype( - *(await self._submit(ns, func, **kwargs)) + *(await self._submit(ns, func, kwargs)) ) async def _return_from_resptype( - self, cid: str, q: trio.Queue, resptype: str, first_msg: dict + self, + cid: str, + recv_chan: trio.abc.ReceiveChannel, + resptype: str, + first_msg: dict ) -> Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': # stream response - async def yield_from_q(): - try: - async for msg in q: - try: - yield msg['yield'] - except KeyError: - if 'stop' in msg: - break # far end async gen terminated - else: - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - raise unpack_error(msg, self.channel) + async def yield_from_recvchan(): + async with recv_chan: + try: + async for msg in recv_chan: + try: + yield msg['yield'] + except KeyError: + if 'stop' in msg: + break # far end async gen terminated + else: + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self.channel) - except (GeneratorExit, trio.Cancelled): - log.warning( - f"Cancelling async gen call {cid} to " - f"{self.channel.uid}") - with trio.move_on_after(0.5) as cs: - cs.shield = True - # TODO: yeah.. it'd be nice if this was just an - # async func on the far end. Gotta figure out a - # better way then implicitly feeding the ctx - # to declaring functions; likely a decorator - # sytsem. - agen = await self.run('self', 'cancel_task', cid=cid) - async with aclosing(agen) as agen: - async for _ in agen: - pass - if cs.cancelled_caught: - if not self.channel.connected(): - log.warning( - "May have failed to cancel remote task " - f"{cid} for {self.channel.uid}") - raise + except (GeneratorExit, trio.Cancelled) as err: + log.warning( + f"Cancelling async gen call {cid} to " + f"{self.channel.uid}") + with trio.move_on_after(0.5) as cs: + cs.shield = True + # TODO: yeah.. it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # system. + agen = await self.run( + 'self', 'cancel_task', cid=cid) + async with aclosing(agen) as agen: + async for _ in agen: + pass + + if cs.cancelled_caught: + if not self.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self.channel.uid}") # TODO: use AsyncExitStack to aclose() all agens # on teardown - agen = yield_from_q() + agen = yield_from_recvchan() self._agens.add(agen) return agen elif resptype == 'return': # single response - msg = await q.get() + msg = await recv_chan.receive() try: return msg['return'] except KeyError: @@ -214,17 +224,20 @@ class Portal: return self._result - async def _cancel_streams(self): - # terminate all locally running async generator - # IPC calls - if self._agens: - log.warning( - f"Cancelling all streams with {self.channel.uid}") - for agen in self._agens: - await agen.aclose() + # async def _cancel_streams(self): + # # terminate all locally running async generator + # # IPC calls + # if self._agens: + # log.warning( + # f"Cancelling all streams with {self.channel.uid}") + # for agen in self._agens: + # await agen.aclose() - async def close(self) -> None: - await self._cancel_streams() + async def aclose(self) -> None: + # TODO: once we move to implementing our own `ReceiveChannel` + # (including remote task cancellation inside its `.aclose()`) + # we'll need to .aclose all those channels here + pass async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. @@ -233,7 +246,7 @@ class Portal: log.warning("This portal is already closed can't cancel") return False - await self._cancel_streams() + # await self._cancel_streams() log.warning( f"Sending actor cancel request to {self.channel.uid} on " @@ -308,7 +321,7 @@ async def open_portal( try: yield portal finally: - await portal.close() + await portal.aclose() if was_connected: # cancel remote channel-msg loop