diff --git a/tractor/_portal.py b/tractor/_portal.py index 136683f..eb89252 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -65,6 +65,7 @@ class Portal: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None + self._agens: Set(AsyncGenerator) = set() async def aclose(self) -> None: log.debug(f"Closing {self}") @@ -142,14 +143,16 @@ class Portal: except GeneratorExit: # for now this msg cancels an ongoing remote task await self.channel.send({'cancel': True, 'cid': cid}) - log.debug( + log.warn( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") raise # TODO: use AsyncExitStack to aclose() all agens # on teardown - return yield_from_q() + agen = yield_from_q() + self._agens.add(agen) + return agen elif resptype == 'return': msg = await q.get() @@ -269,13 +272,18 @@ async def open_portal( nursery.start_soon(actor._process_messages, channel) portal = Portal(channel) - yield portal + try: + yield portal + finally: + # tear down all async generators + for agen in portal._agens: + await agen.aclose() - # cancel remote channel-msg loop - if channel.connected(): - await portal.close() + # cancel remote channel-msg loop + if channel.connected(): + await portal.close() - # cancel background msg loop task - nursery.cancel_scope.cancel() - if was_connected: - await channel.aclose() + # cancel background msg loop task + nursery.cancel_scope.cancel() + if was_connected: + await channel.aclose()