diff --git a/tractor/__init__.py b/tractor/__init__.py index a691df6..731f3e9 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -24,7 +24,6 @@ from ._clustering import open_actor_cluster from ._ipc import Channel from ._streaming import ( Context, - ReceiveMsgStream, MsgStream, stream, context, @@ -64,7 +63,6 @@ __all__ = [ 'MsgStream', 'BaseExceptionGroup', 'Portal', - 'ReceiveMsgStream', 'RemoteActorError', 'breakpoint', 'context', diff --git a/tractor/_portal.py b/tractor/_portal.py index 05504bd..17871aa 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -45,7 +45,10 @@ from ._exceptions import ( NoResult, ContextCancelled, ) -from ._streaming import Context, ReceiveMsgStream +from ._streaming import ( + Context, + MsgStream, +) log = get_logger(__name__) @@ -101,7 +104,7 @@ class Portal: # it is expected that ``result()`` will be awaited at some # point. self._expect_result: Optional[Context] = None - self._streams: set[ReceiveMsgStream] = set() + self._streams: set[MsgStream] = set() self.actor = current_actor() async def _submit_for_result( @@ -316,7 +319,7 @@ class Portal: async_gen_func: Callable, # typing: ignore **kwargs, - ) -> AsyncGenerator[ReceiveMsgStream, None]: + ) -> AsyncGenerator[MsgStream, None]: if not inspect.isasyncgenfunction(async_gen_func): if not ( @@ -341,7 +344,7 @@ class Portal: try: # deliver receive only stream - async with ReceiveMsgStream( + async with MsgStream( ctx, ctx._recv_chan, ) as rchan: self._streams.add(rchan) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 699a906..f24856c 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -50,12 +50,13 @@ log = get_logger(__name__) # - use __slots__ on ``Context``? -class ReceiveMsgStream(trio.abc.ReceiveChannel): +class MsgStream(trio.abc.Channel): ''' - A IPC message stream for receiving logically sequenced values over - an inter-actor ``Channel``. This is the type returned to a local - task which entered either ``Portal.open_stream_from()`` or - ``Context.open_stream()``. + A bidirectional message stream for receiving logically sequenced + values over an inter-actor IPC ``Channel``. + + This is the type returned to a local task which entered either + ``Portal.open_stream_from()`` or ``Context.open_stream()``. Termination rules: @@ -317,15 +318,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): async with self._broadcaster.subscribe() as bstream: assert bstream.key != self._broadcaster.key assert bstream._recv == self._broadcaster._recv + + # NOTE: we patch on a `.send()` to the bcaster so that the + # caller can still conduct 2-way streaming using this + # ``bstream`` handle transparently as though it was the msg + # stream instance. + bstream.send = self.send + yield bstream - -class MsgStream(ReceiveMsgStream, trio.abc.Channel): - ''' - Bidirectional message stream for use within an inter-actor actor - ``Context```. - - ''' async def send( self, data: Any