diff --git a/tractor/_actor.py b/tractor/_actor.py index ceee16b..2a8ec66 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -199,7 +199,9 @@ class Actor: Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} - self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} + self._cids2qs: Dict[ + Tuple[Tuple[str, str], str], + trio.abc.SendChannel[Any]] = {} self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None @@ -299,37 +301,58 @@ class Actor: log.exception( f"Channel for {chan.uid} was already zonked..") - async def _push_result(self, actorid, cid: str, msg: dict) -> None: + async def _push_result( + self, + actorid: Tuple[str, str], + msg: Dict[str, Any], + ) -> None: """Push an RPC result to the local consumer's queue. """ assert actorid, f"`actorid` can't be {actorid}" - q = self.get_waitq(actorid, cid) + cid = msg['cid'] + send_chan = self._cids2qs[(actorid, cid)] + assert send_chan.cid == cid log.debug(f"Delivering {msg} from {actorid} to caller {cid}") - # maintain backpressure - await q.put(msg) + try: + # maintain backpressure + await send_chan.send(msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") - def get_waitq( + def get_memchans( self, actorid: Tuple[str, str], cid: str - ) -> trio.Queue: + ) -> trio.abc.ReceiveChannel: log.debug(f"Getting result queue for {actorid} cid {cid}") - cids2qs = self._actors2calls.setdefault(actorid, {}) - return cids2qs.setdefault(cid, trio.Queue(1000)) + try: + recv_chan = self._cids2qs[(actorid, cid)] + except KeyError: + send_chan, recv_chan = trio.open_memory_channel(1000) + send_chan.cid = cid + self._cids2qs[(actorid, cid)] = send_chan + + return recv_chan async def send_cmd( - self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> Tuple[str, trio.Queue]: + self, + chan: Channel, + ns: str, + func: str, + kwargs: dict + ) -> Tuple[str, trio.abc.ReceiveChannel]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ cid = str(uuid.uuid1()) assert chan.uid - q = self.get_waitq(chan.uid, cid) + recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return cid, q + return cid, recv_chan async def _process_messages( self, chan: Channel, @@ -364,11 +387,10 @@ class Actor: f" {chan} from {chan.uid}") break - log.debug(f"Received msg {msg} from {chan.uid}") - cid = msg.get('cid') - if cid: + log.trace(f"Received msg {msg} from {chan.uid}") + if msg.get('cid'): # deliver response to local caller/waiter - await self._push_result(chan.uid, cid, msg) + await self._push_result(chan.uid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue