Make `LinkedTaskChannel` trio-task-broadcastable with `.subscribe()`

aio_explicit_task_cancels
Tyler Goodlet 2022-04-11 17:07:06 -04:00
parent 46963c2e63
commit c322a193f2
1 changed files with 43 additions and 3 deletions

View File

@ -38,6 +38,10 @@ from outcome import Error
from .log import get_logger
from ._state import current_actor
from ._exceptions import AsyncioCancelled
from .trionics._broadcast import (
broadcast_receiver,
BroadcastReceiver,
)
log = get_logger(__name__)
@ -63,6 +67,7 @@ class LinkedTaskChannel(trio.abc.Channel):
# set after ``asyncio.create_task()``
_aio_task: Optional[asyncio.Task] = None
_aio_err: Optional[BaseException] = None
_broadcaster: Optional[BroadcastReceiver] = None
async def aclose(self) -> None:
await self._from_aio.aclose()
@ -79,7 +84,7 @@ class LinkedTaskChannel(trio.abc.Channel):
return await self._from_aio.receive()
async def wait_ayncio_complete(self) -> None:
async def wait_asyncio_complete(self) -> None:
await self._aio_task_complete.wait()
# def cancel_asyncio_task(self) -> None:
@ -94,6 +99,43 @@ class LinkedTaskChannel(trio.abc.Channel):
'''
self._to_aio.put_nowait(item)
def closed(self) -> bool:
return self._from_aio._closed
# TODO: shoud we consider some kind of "decorator" system
# that checks for structural-typing compatibliity and then
# automatically adds this ctx-mngr-as-method machinery?
@acm
async def subscribe(
self,
) -> AsyncIterator[BroadcastReceiver]:
'''
Allocate and return a ``BroadcastReceiver`` which delegates
to this inter-task channel.
This allows multiple local tasks to receive each their own copy
of this message stream.
See ``tractor._streaming.MsgStream.subscribe()`` for further
similar details.
'''
if self._broadcaster is None:
bcast = self._broadcaster = broadcast_receiver(
self,
# use memory channel size by default
self._from_aio._state.max_buffer_size, # type: ignore
receive_afunc=self.receive,
)
self.receive = bcast.receive # type: ignore
async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
yield bstream
def _run_asyncio_task(
@ -334,7 +376,6 @@ async def translate_aio_errors(
maybe_raise_aio_err()
async def run_task(
func: Callable,
*,
@ -425,7 +466,6 @@ def run_as_asyncio_guest(
trio_done_fut = asyncio.Future()
def trio_done_callback(main_outcome):
actor = current_actor()
if isinstance(main_outcome, Error):
error = main_outcome.error