forked from goodboy/tractor
Merge pull request #230 from goodboy/drop_stream_shielding
Drop stream shielding; it was from a legacy api designalpha2
commit
e5845b5d36
|
@ -0,0 +1,9 @@
|
||||||
|
Drop stream "shielding" support which was originally added to sidestep
|
||||||
|
a cancelled call to ``.receive()``
|
||||||
|
|
||||||
|
In the original api design a stream instance was returned directly from
|
||||||
|
a call to ``Portal.run()`` and thus there was no "exit phase" to handle
|
||||||
|
cancellations and errors which would trigger implicit closure. Now that
|
||||||
|
we have said enter/exit semantics with ``Portal.open_stream_from()`` and
|
||||||
|
``Context.open_stream()`` we can drop this implicit (and arguably
|
||||||
|
confusing) behavior.
|
|
@ -313,12 +313,12 @@ async def test_respawn_consumer_task(
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
|
|
||||||
# shield stream's underlying channel from cancellation
|
# shield stream's underlying channel from cancellation
|
||||||
with stream.shield():
|
# with stream.shield():
|
||||||
|
|
||||||
async for v in stream:
|
async for v in stream:
|
||||||
print(f'from stream: {v}')
|
print(f'from stream: {v}')
|
||||||
expect.remove(v)
|
expect.remove(v)
|
||||||
received.append(v)
|
received.append(v)
|
||||||
|
|
||||||
print('exited consume')
|
print('exited consume')
|
||||||
|
|
||||||
|
|
|
@ -29,28 +29,29 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
|
'''A IPC message stream for receiving logically sequenced values
|
||||||
special behaviour for signalling stream termination across an
|
over an inter-actor ``Channel``. This is the type returned to
|
||||||
inter-actor ``Channel``. This is the type returned to a local task
|
a local task which entered either ``Portal.open_stream_from()`` or
|
||||||
which invoked a remote streaming function using `Portal.run()`.
|
``Context.open_stream()``.
|
||||||
|
|
||||||
Termination rules:
|
Termination rules:
|
||||||
|
|
||||||
- if the local task signals stop iteration a cancel signal is
|
- on cancellation the stream is **not** implicitly closed and the
|
||||||
relayed to the remote task indicating to stop streaming
|
surrounding ``Context`` is expected to handle how that cancel
|
||||||
- if the remote task signals the end of a stream, raise
|
is relayed to any task on the remote side.
|
||||||
a ``StopAsyncIteration`` to terminate the local ``async for``
|
- if the remote task signals the end of a stream the
|
||||||
|
``ReceiveChannel`` semantics dictate that a ``StopAsyncIteration``
|
||||||
|
to terminate the local ``async for``.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
ctx: 'Context', # typing: ignore # noqa
|
ctx: 'Context', # typing: ignore # noqa
|
||||||
rx_chan: trio.abc.ReceiveChannel,
|
rx_chan: trio.abc.ReceiveChannel,
|
||||||
shield: bool = False,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
self._ctx = ctx
|
self._ctx = ctx
|
||||||
self._rx_chan = rx_chan
|
self._rx_chan = rx_chan
|
||||||
self._shielded = shield
|
|
||||||
|
|
||||||
# flag to denote end of stream
|
# flag to denote end of stream
|
||||||
self._eoc: bool = False
|
self._eoc: bool = False
|
||||||
|
@ -61,13 +62,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
async def receive(self):
|
async def receive(self):
|
||||||
|
'''Async receive a single msg from the IPC transport, the next
|
||||||
|
in sequence for this stream.
|
||||||
|
|
||||||
|
'''
|
||||||
# see ``.aclose()`` for notes on the old behaviour prior to
|
# see ``.aclose()`` for notes on the old behaviour prior to
|
||||||
# introducing this
|
# introducing this
|
||||||
if self._eoc:
|
if self._eoc:
|
||||||
raise trio.EndOfChannel
|
raise trio.EndOfChannel
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
||||||
msg = await self._rx_chan.receive()
|
msg = await self._rx_chan.receive()
|
||||||
return msg['yield']
|
return msg['yield']
|
||||||
|
|
||||||
|
@ -103,7 +107,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError, # by self._rx_chan
|
trio.ClosedResourceError, # by self._rx_chan
|
||||||
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
|
||||||
trio.Cancelled, # by local cancellation
|
|
||||||
):
|
):
|
||||||
# XXX: we close the stream on any of these error conditions:
|
# XXX: we close the stream on any of these error conditions:
|
||||||
|
|
||||||
|
@ -135,23 +138,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
|
|
||||||
raise # propagate
|
raise # propagate
|
||||||
|
|
||||||
@contextmanager
|
|
||||||
def shield(
|
|
||||||
self
|
|
||||||
) -> Iterator['ReceiveMsgStream']: # noqa
|
|
||||||
"""Shield this stream's underlying channel such that a local consumer task
|
|
||||||
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
|
|
||||||
|
|
||||||
Note that here, "shielding" here guards against relaying
|
|
||||||
a ``'stop'`` message to the far end of the stream thus keeping
|
|
||||||
the stream machinery active and ready for further use, it does
|
|
||||||
not have anything to do with an internal ``trio.CancelScope``.
|
|
||||||
|
|
||||||
"""
|
|
||||||
self._shielded = True
|
|
||||||
yield self
|
|
||||||
self._shielded = False
|
|
||||||
|
|
||||||
async def aclose(self):
|
async def aclose(self):
|
||||||
"""Cancel associated remote actor task and local memory channel
|
"""Cancel associated remote actor task and local memory channel
|
||||||
on close.
|
on close.
|
||||||
|
@ -169,19 +155,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||||
return
|
return
|
||||||
|
|
||||||
# TODO: broadcasting to multiple consumers
|
|
||||||
# stats = rx_chan.statistics()
|
|
||||||
# if stats.open_receive_channels > 1:
|
|
||||||
# # if we've been cloned don't kill the stream
|
|
||||||
# log.debug(
|
|
||||||
# "there are still consumers running keeping stream alive")
|
|
||||||
# return
|
|
||||||
|
|
||||||
if self._shielded:
|
|
||||||
log.warning(f"{self} is shielded, portal channel being kept alive")
|
|
||||||
return
|
|
||||||
|
|
||||||
# XXX: This must be set **AFTER** the shielded test above!
|
|
||||||
self._eoc = True
|
self._eoc = True
|
||||||
|
|
||||||
# NOTE: this is super subtle IPC messaging stuff:
|
# NOTE: this is super subtle IPC messaging stuff:
|
||||||
|
@ -203,9 +176,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# ``__aexit__()`` on teardown so it **does not** need to be
|
# ``__aexit__()`` on teardown so it **does not** need to be
|
||||||
# called here.
|
# called here.
|
||||||
if not self._ctx._portal:
|
if not self._ctx._portal:
|
||||||
|
# Only for 2 way streams can we can send stop from the
|
||||||
|
# caller side.
|
||||||
try:
|
try:
|
||||||
# only for 2 way streams can we can send
|
# NOTE: if this call is cancelled we expect this end to
|
||||||
# stop from the caller side
|
# handle as though the stop was never sent (though if it
|
||||||
|
# was it shouldn't matter since it's unlikely a user
|
||||||
|
# will try to re-use a stream after attemping to close
|
||||||
|
# it).
|
||||||
await self._ctx.send_stop()
|
await self._ctx.send_stop()
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
@ -217,9 +195,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
||||||
# it can't traverse the transport.
|
# it can't traverse the transport.
|
||||||
log.debug(f'Channel for {self} was already closed')
|
log.debug(f'Channel for {self} was already closed')
|
||||||
|
|
||||||
# close the local mem chan ``self._rx_chan`` ??!?
|
# Do we close the local mem chan ``self._rx_chan`` ??!?
|
||||||
|
|
||||||
# DEFINITELY NOT if we're a bi-dir ``MsgStream``!
|
# NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``!
|
||||||
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
|
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
|
||||||
# the potential final result from the surrounding inter-actor
|
# the potential final result from the surrounding inter-actor
|
||||||
# `Context` so we don't want to close it until that context has
|
# `Context` so we don't want to close it until that context has
|
||||||
|
@ -397,7 +375,6 @@ class Context:
|
||||||
async def open_stream(
|
async def open_stream(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
shield: bool = False,
|
|
||||||
|
|
||||||
) -> AsyncGenerator[MsgStream, None]:
|
) -> AsyncGenerator[MsgStream, None]:
|
||||||
'''Open a ``MsgStream``, a bi-directional stream connected to the
|
'''Open a ``MsgStream``, a bi-directional stream connected to the
|
||||||
|
@ -455,7 +432,6 @@ class Context:
|
||||||
async with MsgStream(
|
async with MsgStream(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
rx_chan=recv_chan,
|
rx_chan=recv_chan,
|
||||||
shield=shield,
|
|
||||||
) as rchan:
|
) as rchan:
|
||||||
|
|
||||||
if self._portal:
|
if self._portal:
|
||||||
|
|
Loading…
Reference in New Issue