Merge `ReceiveMsgStream` and `MsgStream`

Since one-way streaming can be accomplished by just *not* sending on one
side (and/or thus wrapping such usage in a more restrictive API), we
just drop the recv-only parent type. The only method different was
`MsgStream.send()`, now merged in. Further in usage of `.subscribe()`
we monkey patch the underlying stream's `.send()` onto the delivered
broadcast receiver so that subscriber tasks can two-way stream as though
using the stream directly.

This allows us to more definitively drop `tractor.open_stream_from()` in
the longer run if we so choose as well; note currently this will
potentially create an issue if a caller tries to `.send()` on such a one
way stream.
breceiver_internals
Tyler Goodlet 2023-01-08 13:00:36 -05:00
parent c2367c1c5e
commit 9f9907271b
3 changed files with 20 additions and 18 deletions

View File

@ -24,7 +24,6 @@ from ._clustering import open_actor_cluster
from ._ipc import Channel
from ._streaming import (
Context,
ReceiveMsgStream,
MsgStream,
stream,
context,
@ -64,7 +63,6 @@ __all__ = [
'MsgStream',
'BaseExceptionGroup',
'Portal',
'ReceiveMsgStream',
'RemoteActorError',
'breakpoint',
'context',

View File

@ -45,7 +45,10 @@ from ._exceptions import (
NoResult,
ContextCancelled,
)
from ._streaming import Context, ReceiveMsgStream
from ._streaming import (
Context,
MsgStream,
)
log = get_logger(__name__)
@ -101,7 +104,7 @@ class Portal:
# it is expected that ``result()`` will be awaited at some
# point.
self._expect_result: Optional[Context] = None
self._streams: set[ReceiveMsgStream] = set()
self._streams: set[MsgStream] = set()
self.actor = current_actor()
async def _submit_for_result(
@ -316,7 +319,7 @@ class Portal:
async_gen_func: Callable, # typing: ignore
**kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]:
) -> AsyncGenerator[MsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func):
if not (
@ -341,7 +344,7 @@ class Portal:
try:
# deliver receive only stream
async with ReceiveMsgStream(
async with MsgStream(
ctx, ctx._recv_chan,
) as rchan:
self._streams.add(rchan)

View File

@ -50,12 +50,13 @@ log = get_logger(__name__)
# - use __slots__ on ``Context``?
class ReceiveMsgStream(trio.abc.ReceiveChannel):
class MsgStream(trio.abc.Channel):
'''
A IPC message stream for receiving logically sequenced values over
an inter-actor ``Channel``. This is the type returned to a local
task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``.
A bidirectional message stream for receiving logically sequenced
values over an inter-actor IPC ``Channel``.
This is the type returned to a local task which entered either
``Portal.open_stream_from()`` or ``Context.open_stream()``.
Termination rules:
@ -317,15 +318,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
# NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send
yield bstream
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''
Bidirectional message stream for use within an inter-actor actor
``Context```.
'''
async def send(
self,
data: Any