forked from goodboy/tractor
1
0
Fork 0

Explicitly formalize context/streaming teardown

Add clear teardown semantics for `Context` such that the remote side
cancellation propagation happens only on error or if client code
explicitly requests it (either by exit flag to `Portal.open_context()`
or by manually calling `Context.cancel()`).  Add `Context.result()`
to wait on and capture the final result from a remote context function;
any lingering msg sequence will be consumed/discarded.

Changes in order to make this possible:
- pass the runtime msg loop's feeder receive channel in to the context
  on the calling (portal opening) side such that a final 'return' msg
  can be waited upon using `Context.result()` which delivers the final
  return value from the callee side `@tractor.context` async function.
- always await a final result from the target context function in
  `Portal.open_context()`'s `__aexit__()` if the context has not
  been (requested to be) cancelled by client code on block exit.
- add an internal `Context._cancel_called` for context "cancel
  requested" tracking (much like `trio`'s cancel scope).
- allow flagging a stream as terminated using an internal
  `._eoc` flag which will mark the stream as stopped for iteration.
- drop `StopAsyncIteration` catching in `.receive()`; it does
  nothing.
bi_streaming
Tyler Goodlet 2021-06-13 19:58:52 -04:00
parent f8e2d4007c
commit 0af58522a4
2 changed files with 210 additions and 42 deletions

View File

@ -17,7 +17,12 @@ from async_generator import asynccontextmanager
from ._state import current_actor
from ._ipc import Channel
from .log import get_logger
from ._exceptions import unpack_error, NoResult, RemoteActorError
from ._exceptions import (
unpack_error,
NoResult,
RemoteActorError,
ContextCancelled,
)
from ._streaming import Context, ReceiveMsgStream
@ -84,7 +89,7 @@ class Portal:
ns: str,
func: str,
kwargs,
) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]:
) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]:
"""Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str,
first message packet as a tuple.
@ -327,7 +332,14 @@ class Portal:
# message right now since there shouldn't be a reason to
# stop and restart the stream, right?
try:
# We are for sure done with this stream and no more
# messages are expected to be delivered from the
# runtime's msg loop.
await recv_chan.aclose()
await ctx.cancel()
except trio.ClosedResourceError:
# if the far end terminates before we send a cancel the
# underlying transport-channel may already be closed.
@ -337,18 +349,21 @@ class Portal:
@asynccontextmanager
async def open_context(
self,
func: Callable,
cancel_on_exit: bool = False,
**kwargs,
) -> AsyncGenerator[Tuple[Context, Any], None]:
"""Open an inter-actor task context.
'''Open an inter-actor task context.
This is a synchronous API which allows for deterministic
setup/teardown of a remote task. The yielded ``Context`` further
allows for opening bidirectional streams - see
``Context.open_stream()``.
allows for opening bidirectional streams, explicit cancellation
and synchronized final result collection. See ``tractor.Context``.
"""
'''
# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)
@ -358,8 +373,8 @@ class Portal:
fn_mod_path, fn_name = func_deats(func)
recv_chan: Optional[trio.MemoryReceiveChannel] = None
recv_chan: trio.ReceiveMemoryChannel = None
try:
cid, recv_chan, functype, first_msg = await self._submit(
fn_mod_path, fn_name, kwargs)
@ -383,13 +398,38 @@ class Portal:
# deliver context instance and .started() msg value in open
# tuple.
ctx = Context(self.channel, cid, _portal=self)
ctx = Context(
self.channel,
cid,
_portal=self,
_recv_chan=recv_chan,
)
try:
yield ctx, first
finally:
if cancel_on_exit:
await ctx.cancel()
else:
if not ctx._cancel_called:
await ctx.result()
except ContextCancelled:
# if the context was cancelled by client code
# then we don't need to raise since user code
# is expecting this.
if not ctx._cancel_called:
raise
except BaseException:
# the context cancels itself on any deviation
await ctx.cancel()
raise
finally:
log.info(f'Context for {func.__name__} completed')
finally:
if recv_chan is not None:
await recv_chan.aclose()

View File

@ -35,10 +35,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
which invoked a remote streaming function using `Portal.run()`.
Termination rules:
- if the local task signals stop iteration a cancel signal is
relayed to the remote task indicating to stop streaming
- if the remote task signals the end of a stream, raise a
``StopAsyncIteration`` to terminate the local ``async for``
- if the remote task signals the end of a stream, raise
a ``StopAsyncIteration`` to terminate the local ``async for``
"""
def __init__(
@ -51,12 +52,19 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
self._rx_chan = rx_chan
self._shielded = shield
# flag to denote end of stream
self._eoc: bool = False
# delegate directly to underlying mem channel
def receive_nowait(self):
msg = self._rx_chan.receive_nowait()
return msg['yield']
async def receive(self):
# see ``.aclose()`` to an alt to always checking this
if self._eoc:
raise trio.EndOfChannel
try:
msg = await self._rx_chan.receive()
return msg['yield']
@ -72,9 +80,14 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
if msg.get('stop'):
log.debug(f"{self} was stopped at remote end")
self._eoc = True
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration``.
raise trio.EndOfChannel
# TODO: test that shows stream raising an expected error!!!
@ -85,7 +98,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
else:
raise
except (trio.ClosedResourceError, StopAsyncIteration):
except trio.ClosedResourceError:
# XXX: this indicates that a `stop` message was
# sent by the far side of the underlying channel.
# Currently this is triggered by calling ``.aclose()`` on
@ -108,8 +121,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# terminated and signal this local iterator to stop
await self.aclose()
# await self._ctx.send_stop()
raise StopAsyncIteration
raise # propagate
except trio.Cancelled:
# relay cancels to the remote task
@ -138,12 +150,16 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
on close.
"""
# TODO: proper adherance to trio's `.aclose()` semantics:
# XXX: keep proper adherance to trio's `.aclose()` semantics:
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan
if rx_chan._closed:
log.warning(f"{self} is already closed")
# this stream has already been closed so silently succeed as
# per ``trio.AsyncResource`` semantics.
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return
# TODO: broadcasting to multiple consumers
@ -173,12 +189,45 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# `Context.open_stream()` will create the `Actor._cids2qs`
# entry from a call to `Actor.get_memchans()`.
if not self._ctx._portal:
try:
# only for 2 way streams can we can send
# stop from the caller side
await self._ctx.send_stop()
# close the local mem chan
await rx_chan.aclose()
except trio.BrokenResourceError:
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
# it can't traverse the transport.
log.debug(f'Channel for {self} was already closed')
self._eoc = True
# close the local mem chan??!?
# NOT if we're a ``MsgStream``!
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
# the potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that context has
# run to completion.
# XXX: Notes on old behaviour.
# In the receive-only case, ``Portal.open_stream_from()`` should
# call this explicitly on teardown but additionally if for some
# reason stream consumer code tries to manually receive a new
# value before ``.aclose()`` is called **but** the far end has
# stopped `.receive()` **must** raise ``trio.EndofChannel`` in
# order to avoid an infinite hang on ``.__anext__()``. So we can
# instead uncomment this check and close the underlying msg-loop
# mem chan below and not then **not** check for ``self._eoc`` in
# ``.receive()`` (if for some reason we think that check is
# a bottle neck - not likely) such that the
# ``trio.ClosedResourceError`` would instead trigger the
# ``trio.EndOfChannel`` in ``.receive()`` (as it originally was
# before bi-dir streaming support).
# if not isinstance(self, MsgStream):
# await rx_chan.aclose()
# TODO: but make it broadcasting to consumers
# def clone(self):
@ -206,29 +255,29 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
@dataclass(frozen=True)
@dataclass
class Context:
"""An IAC (inter-actor communication) context.
'''An inter-actor task communication context.
Allows maintaining task or protocol specific state between communicating
actors. A unique context is created on the receiving end for every request
to a remote actor.
Allows maintaining task or protocol specific state between
2 communicating actor tasks. A unique context is created on the
callee side/end for every request to a remote actor from a portal.
A context can be cancelled and (eventually) restarted from
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel.
A context can be used to open task oriented message streams.
A context can be used to open task oriented message streams and can
be thought of as an IPC aware inter-actor cancel scope.
"""
'''
chan: Channel
cid: str
# TODO: should we have seperate types for caller vs. callee
# side contexts? The caller always opens a portal whereas the callee
# is always responding back through a context-stream
# only set on the caller side
_portal: Optional['Portal'] = None # type: ignore # noqa
_recv_chan: Optional[trio.MemoryReceiveChannel] = None
_result: Optional[Any] = False
_cancel_called: bool = False
# only set on the callee side
_cancel_scope: Optional[trio.CancelScope] = None
@ -247,12 +296,14 @@ class Context:
await self.chan.send({'stop': True, 'cid': self.cid})
async def cancel(self) -> None:
"""Cancel this inter-actor-task context.
'''Cancel this inter-actor-task context.
Request that the far side cancel it's current linked context,
timeout quickly to sidestep 2-generals...
Timeout quickly in an attempt to sidestep 2-generals...
'''
self._cancel_called = True
"""
if self._portal: # caller side:
if not self._portal:
raise RuntimeError(
@ -290,17 +341,31 @@ class Context:
# https://github.com/goodboy/tractor/issues/36
self._cancel_scope.cancel()
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
@asynccontextmanager
async def open_stream(
self,
shield: bool = False,
) -> AsyncGenerator[MsgStream, None]:
# TODO
) -> AsyncGenerator[MsgStream, None]:
'''Open a ``MsgStream``, a bi-directional stream connected to the
cross-actor (far end) task for this ``Context``.
This context manager must be entered on both the caller and
callee for the stream to logically be considered "connected".
A ``MsgStream`` is currently "one-shot" use, meaning if you
close it you can not "re-open" it for streaming and instead you
must re-establish a new surrounding ``Context`` using
``Portal.open_context()``. In the future this may change but
currently there seems to be no obvious reason to support
"re-opening":
- pausing a stream can be done with a message.
- task errors will normally require a restart of the entire
scope of the inter-actor task context due to the nature of
``trio``'s cancellation system.
'''
actor = current_actor()
# here we create a mem chan that corresponds to the
@ -316,6 +381,19 @@ class Context:
self.cid
)
# XXX: If the underlying receive mem chan has been closed then
# likely client code has already exited a ``.open_stream()``
# block prior. we error here until such a time that we decide
# allowing streams to be "re-connected" is supported and/or
# a good idea.
if recv_chan._closed:
task = trio.lowlevel.current_task().name
raise trio.ClosedResourceError(
f'stream for {actor.uid[0]}:{task} has already been closed.'
'\nRe-opening a closed stream is not yet supported!'
'\nConsider re-calling the containing `@tractor.context` func'
)
async with MsgStream(
ctx=self,
rx_chan=recv_chan,
@ -326,19 +404,65 @@ class Context:
self._portal._streams.add(rchan)
try:
# ensure we aren't cancelled before delivering
# the stream
# await trio.lowlevel.checkpoint()
yield rchan
except trio.EndOfChannel:
# stream iteration stop signal
raise
else:
# signal ``StopAsyncIteration`` on far end.
# XXX: Make the stream "one-shot use". On exit, signal
# ``trio.EndOfChannel``/``StopAsyncIteration`` to the
# far end.
await self.send_stop()
finally:
if self._portal:
self._portal._streams.remove(rchan)
async def result(self) -> Any:
'''From a caller side, wait for and return the final result from
the callee side task.
'''
assert self._portal, "Context.result() can not be called from callee!"
assert self._recv_chan
if self._result is False:
if not self._recv_chan._closed: # type: ignore
# wait for a final context result consuming
# and discarding any bi dir stream msgs still
# in transit from the far end.
while True:
msg = await self._recv_chan.receive()
try:
self._result = msg['return']
break
except KeyError:
if 'yield' in msg:
# far end task is still streaming to us..
log.warning(f'Remote stream deliverd {msg}')
# do disard
continue
elif 'stop' in msg:
log.debug('Remote stream terminated')
continue
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
raise unpack_error(msg, self._portal.channel)
return self._result
async def started(self, value: Optional[Any] = None) -> None:
if self._portal:
@ -347,6 +471,10 @@ class Context:
await self.chan.send({'started': value, 'cid': self.cid})
# TODO: do we need a restart api?
# async def restart(self) -> None:
# pass
def stream(func: Callable) -> Callable:
"""Mark an async function as a streaming routine with ``@stream``.