From fe1c4dbc4ccc07fb99b03f659ec4963cd5a910ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 16 Feb 2019 14:05:03 -0500 Subject: [PATCH] mpypy and docs fixups --- tractor/_actor.py | 7 ++++--- tractor/_portal.py | 26 +++++++++++++++----------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 9842f12..02cdbc0 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -303,11 +303,12 @@ class Actor: async def _push_result( self, - actorid: Tuple[str, str], + chan: Channel, msg: Dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. """ + actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" cid = msg['cid'] send_chan = self._cids2qs[(actorid, cid)] @@ -390,10 +391,10 @@ class Actor: f" {chan} from {chan.uid}") break - log.trace(f"Received msg {msg} from {chan.uid}") + log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore if msg.get('cid'): # deliver response to local caller/waiter - await self._push_result(chan.uid, msg) + await self._push_result(chan, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue diff --git a/tractor/_portal.py b/tractor/_portal.py index 9b2265e..11ccdbf 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -49,9 +49,10 @@ async def _do_handshake( class StreamReceiveChannel(trio.abc.ReceiveChannel): - """A wrapper around a ``trio.abc.ReceiveChannel`` with - special behaviour for stream termination on both ends of - an inter-actor channel. + """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with + special behaviour for signalling stream termination on across an + inter-actor ``Channel``. This is the type returned to a local task + which invoked a remote streaming function using `Portal.run()`. Termination rules: - if the local task signals stop iteration a cancel signal is @@ -65,7 +66,7 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): cid: str, rx_chan: trio.abc.ReceiveChannel, portal: 'Portal', - ): + ) -> None: self._cid = cid self._rx_chan = rx_chan self._portal = portal @@ -84,28 +85,28 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): await self.aclose() raise StopAsyncIteration except trio.Cancelled: + # relay cancels to the remote task await self.aclose() raise except KeyError: - # if 'stop' in msg: - # break # far end async gen terminated - # else: # internal error should never get here assert msg.get('cid'), ( "Received internal error at portal?") raise unpack_error(msg, self._portal.channel) async def aclose(self): + """Cancel associate remote actor task on close + as well as the local memory channel. + """ if self._rx_chan._closed: log.warning(f"{self} is already closed") return cid = self._cid - # XXX: cancel remote task on close - log.warning( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") with trio.move_on_after(0.5) as cs: cs.shield = True + log.warning( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") # TODO: yeah.. it'd be nice if this was just an # async func on the far end. Gotta figure out a # better way then implicitly feeding the ctx @@ -117,6 +118,9 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): pass if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. if not self._portal.channel.connected(): log.warning( "May have failed to cancel remote task "