forked from goodboy/tractor
1
0
Fork 0

Update docs to new close semantics

alpha2
Tyler Goodlet 2021-09-02 08:24:13 -04:00
parent af85d35685
commit b4d95e9543
1 changed files with 25 additions and 19 deletions

View File

@ -29,23 +29,26 @@ log = get_logger(__name__)
class ReceiveMsgStream(trio.abc.ReceiveChannel): class ReceiveMsgStream(trio.abc.ReceiveChannel):
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with '''A IPC message stream for receiving logically sequenced values
special behaviour for signalling stream termination across an over an inter-actor ``Channel``. This is the type returned to
inter-actor ``Channel``. This is the type returned to a local task a local task which entered either ``Portal.open_stream_from()`` or
which invoked a remote streaming function using `Portal.run()`. ``Context.open_stream()``.
Termination rules: Termination rules:
- if the local task signals stop iteration a cancel signal is - on cancellation the stream is **not** implicitly closed and the
relayed to the remote task indicating to stop streaming surrounding ``Context`` is expected to handle how that cancel
- if the remote task signals the end of a stream, raise is relayed to any task on the remote side.
a ``StopAsyncIteration`` to terminate the local ``async for`` - if the remote task signals the end of a stream the
``ReceiveChannel`` semantics dictate that a ``StopAsyncIteration``
to terminate the local ``async for``.
""" '''
def __init__( def __init__(
self, self,
ctx: 'Context', # typing: ignore # noqa ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.abc.ReceiveChannel, rx_chan: trio.abc.ReceiveChannel,
) -> None: ) -> None:
self._ctx = ctx self._ctx = ctx
self._rx_chan = rx_chan self._rx_chan = rx_chan
@ -59,13 +62,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
return msg['yield'] return msg['yield']
async def receive(self): async def receive(self):
'''Async receive a single msg from the IPC transport, the next
in sequence for this stream.
'''
# see ``.aclose()`` for notes on the old behaviour prior to # see ``.aclose()`` for notes on the old behaviour prior to
# introducing this # introducing this
if self._eoc: if self._eoc:
raise trio.EndOfChannel raise trio.EndOfChannel
try: try:
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
@ -101,10 +107,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
except ( except (
trio.ClosedResourceError, # by self._rx_chan trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
# Wait why would we do an implicit close on cancel? THAT'S
# NOT HOW MEM CHANS WORK!!?!?!?!?
# trio.Cancelled, # by local cancellation
): ):
# XXX: we close the stream on any of these error conditions: # XXX: we close the stream on any of these error conditions:
@ -153,7 +155,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return return
# XXX: This must be set **AFTER** the shielded test above!
self._eoc = True self._eoc = True
# NOTE: this is super subtle IPC messaging stuff: # NOTE: this is super subtle IPC messaging stuff:
@ -175,9 +176,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# ``__aexit__()`` on teardown so it **does not** need to be # ``__aexit__()`` on teardown so it **does not** need to be
# called here. # called here.
if not self._ctx._portal: if not self._ctx._portal:
# Only for 2 way streams can we can send stop from the
# caller side.
try: try:
# only for 2 way streams can we can send # NOTE: if this call is cancelled we expect this end to
# stop from the caller side # handle as though the stop was never sent (though if it
# was it shouldn't matter since it's unlikely a user
# will try to re-use a stream after attemping to close
# it).
await self._ctx.send_stop() await self._ctx.send_stop()
except ( except (
@ -189,9 +195,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# it can't traverse the transport. # it can't traverse the transport.
log.debug(f'Channel for {self} was already closed') log.debug(f'Channel for {self} was already closed')
# close the local mem chan ``self._rx_chan`` ??!? # Do we close the local mem chan ``self._rx_chan`` ??!?
# DEFINITELY NOT if we're a bi-dir ``MsgStream``! # NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver # BECAUSE this same core-msg-loop mem recv-chan is used to deliver
# the potential final result from the surrounding inter-actor # the potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that context has # `Context` so we don't want to close it until that context has