From 9f9907271bf9c2ef1192ee7b723e8dd70db5b844 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 8 Jan 2023 13:00:36 -0500 Subject: [PATCH] Merge `ReceiveMsgStream` and `MsgStream` Since one-way streaming can be accomplished by just *not* sending on one side (and/or thus wrapping such usage in a more restrictive API), we just drop the recv-only parent type. The only method different was `MsgStream.send()`, now merged in. Further in usage of `.subscribe()` we monkey patch the underlying stream's `.send()` onto the delivered broadcast receiver so that subscriber tasks can two-way stream as though using the stream directly. This allows us to more definitively drop `tractor.open_stream_from()` in the longer run if we so choose as well; note currently this will potentially create an issue if a caller tries to `.send()` on such a one way stream. --- tractor/__init__.py | 2 -- tractor/_portal.py | 11 +++++++---- tractor/_streaming.py | 25 +++++++++++++------------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index a691df6..731f3e9 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -24,7 +24,6 @@ from ._clustering import open_actor_cluster from ._ipc import Channel from ._streaming import ( Context, - ReceiveMsgStream, MsgStream, stream, context, @@ -64,7 +63,6 @@ __all__ = [ 'MsgStream', 'BaseExceptionGroup', 'Portal', - 'ReceiveMsgStream', 'RemoteActorError', 'breakpoint', 'context', diff --git a/tractor/_portal.py b/tractor/_portal.py index 05504bd..17871aa 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -45,7 +45,10 @@ from ._exceptions import ( NoResult, ContextCancelled, ) -from ._streaming import Context, ReceiveMsgStream +from ._streaming import ( + Context, + MsgStream, +) log = get_logger(__name__) @@ -101,7 +104,7 @@ class Portal: # it is expected that ``result()`` will be awaited at some # point. self._expect_result: Optional[Context] = None - self._streams: set[ReceiveMsgStream] = set() + self._streams: set[MsgStream] = set() self.actor = current_actor() async def _submit_for_result( @@ -316,7 +319,7 @@ class Portal: async_gen_func: Callable, # typing: ignore **kwargs, - ) -> AsyncGenerator[ReceiveMsgStream, None]: + ) -> AsyncGenerator[MsgStream, None]: if not inspect.isasyncgenfunction(async_gen_func): if not ( @@ -341,7 +344,7 @@ class Portal: try: # deliver receive only stream - async with ReceiveMsgStream( + async with MsgStream( ctx, ctx._recv_chan, ) as rchan: self._streams.add(rchan) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 699a906..f24856c 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -50,12 +50,13 @@ log = get_logger(__name__) # - use __slots__ on ``Context``? -class ReceiveMsgStream(trio.abc.ReceiveChannel): +class MsgStream(trio.abc.Channel): ''' - A IPC message stream for receiving logically sequenced values over - an inter-actor ``Channel``. This is the type returned to a local - task which entered either ``Portal.open_stream_from()`` or - ``Context.open_stream()``. + A bidirectional message stream for receiving logically sequenced + values over an inter-actor IPC ``Channel``. + + This is the type returned to a local task which entered either + ``Portal.open_stream_from()`` or ``Context.open_stream()``. Termination rules: @@ -317,15 +318,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): async with self._broadcaster.subscribe() as bstream: assert bstream.key != self._broadcaster.key assert bstream._recv == self._broadcaster._recv + + # NOTE: we patch on a `.send()` to the bcaster so that the + # caller can still conduct 2-way streaming using this + # ``bstream`` handle transparently as though it was the msg + # stream instance. + bstream.send = self.send + yield bstream - -class MsgStream(ReceiveMsgStream, trio.abc.Channel): - ''' - Bidirectional message stream for use within an inter-actor actor - ``Context```. - - ''' async def send( self, data: Any