Consider relaying context error via raised-in-scope-nursery task
parent
aace4eae5f
commit
589d16dd95
|
@ -46,6 +46,7 @@ class ActorFailure(Exception):
|
|||
|
||||
|
||||
async def _invoke(
|
||||
|
||||
actor: 'Actor',
|
||||
cid: str,
|
||||
chan: Channel,
|
||||
|
@ -58,10 +59,12 @@ async def _invoke(
|
|||
"""Invoke local func and deliver result(s) over provided channel.
|
||||
"""
|
||||
treat_as_gen = False
|
||||
cs = None
|
||||
|
||||
cancel_scope = trio.CancelScope()
|
||||
ctx = Context(chan, cid, _cancel_scope=cancel_scope)
|
||||
context = False
|
||||
cs: trio.CancelScope = None
|
||||
|
||||
ctx = Context(chan, cid)
|
||||
context: bool = False
|
||||
|
||||
if getattr(func, '_tractor_stream_function', False):
|
||||
# handle decorated ``@tractor.stream`` async functions
|
||||
|
@ -149,14 +152,22 @@ async def _invoke(
|
|||
# context func with support for bi-dir streaming
|
||||
await chan.send({'functype': 'context', 'cid': cid})
|
||||
|
||||
with cancel_scope as cs:
|
||||
async with trio.open_nursery() as scope_nursery:
|
||||
ctx._scope_nursery = scope_nursery
|
||||
cs = scope_nursery.cancel_scope
|
||||
task_status.started(cs)
|
||||
await chan.send({'return': await coro, 'cid': cid})
|
||||
|
||||
if cs.cancelled_caught:
|
||||
if ctx._cancel_called:
|
||||
msg = f'{func.__name__} cancelled itself',
|
||||
|
||||
else:
|
||||
msg = f'{func.__name__} was remotely cancelled',
|
||||
|
||||
# task-contex was cancelled so relay to the cancel to caller
|
||||
raise ContextCancelled(
|
||||
f'{func.__name__} cancelled itself',
|
||||
msg,
|
||||
suberror_type=trio.Cancelled,
|
||||
)
|
||||
|
||||
|
@ -204,8 +215,10 @@ async def _invoke(
|
|||
await chan.send(err_msg)
|
||||
|
||||
except trio.ClosedResourceError:
|
||||
log.warning(
|
||||
f"Failed to ship error to caller @ {chan.uid}")
|
||||
# if we can't propagate the error that's a big boo boo
|
||||
log.error(
|
||||
f"Failed to ship error to caller @ {chan.uid} !?"
|
||||
)
|
||||
|
||||
if cs is None:
|
||||
# error is from above code not from rpc invocation
|
||||
|
@ -385,12 +398,16 @@ class Actor:
|
|||
raise mne
|
||||
|
||||
async def _stream_handler(
|
||||
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
|
||||
) -> None:
|
||||
"""Entry point for new inbound connections to the channel server.
|
||||
|
||||
"""
|
||||
self._no_more_peers = trio.Event() # unset
|
||||
|
||||
chan = Channel(stream=stream)
|
||||
log.info(f"New connection to us {chan}")
|
||||
|
||||
|
@ -437,10 +454,24 @@ class Actor:
|
|||
try:
|
||||
await self._process_messages(chan)
|
||||
finally:
|
||||
|
||||
# channel cleanup sequence
|
||||
|
||||
# for (channel, cid) in self._rpc_tasks.copy():
|
||||
# if channel is chan:
|
||||
# with trio.CancelScope(shield=True):
|
||||
# await self._cancel_task(cid, channel)
|
||||
|
||||
# # close all consumer side task mem chans
|
||||
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
|
||||
# assert send_chan.cid == cid # type: ignore
|
||||
# await send_chan.aclose()
|
||||
|
||||
# Drop ref to channel so it can be gc-ed and disconnected
|
||||
log.debug(f"Releasing channel {chan} from {chan.uid}")
|
||||
chans = self._peers.get(chan.uid)
|
||||
chans.remove(chan)
|
||||
|
||||
if not chans:
|
||||
log.debug(f"No more channels for {chan.uid}")
|
||||
self._peers.pop(chan.uid, None)
|
||||
|
@ -453,14 +484,22 @@ class Actor:
|
|||
|
||||
# # XXX: is this necessary (GC should do it?)
|
||||
if chan.connected():
|
||||
# if the channel is still connected it may mean the far
|
||||
# end has not closed and we may have gotten here due to
|
||||
# an error and so we should at least try to terminate
|
||||
# the channel from this end gracefully.
|
||||
|
||||
log.debug(f"Disconnecting channel {chan}")
|
||||
try:
|
||||
# send our msg loop terminate sentinel
|
||||
# send a msg loop terminate sentinel
|
||||
await chan.send(None)
|
||||
|
||||
# XXX: do we want this?
|
||||
# causes "[104] connection reset by peer" on other end
|
||||
# await chan.aclose()
|
||||
|
||||
except trio.BrokenResourceError:
|
||||
log.exception(
|
||||
f"Channel for {chan.uid} was already zonked..")
|
||||
log.warning(f"Channel for {chan.uid} was already closed")
|
||||
|
||||
async def _push_result(
|
||||
self,
|
||||
|
@ -470,18 +509,22 @@ class Actor:
|
|||
) -> None:
|
||||
"""Push an RPC result to the local consumer's queue.
|
||||
"""
|
||||
actorid = chan.uid
|
||||
assert actorid, f"`actorid` can't be {actorid}"
|
||||
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
||||
# actorid = chan.uid
|
||||
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
|
||||
send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
|
||||
assert send_chan.cid == cid # type: ignore
|
||||
|
||||
# if 'stop' in msg:
|
||||
if 'error' in msg:
|
||||
ctx = getattr(recv_chan, '_ctx', None)
|
||||
# if ctx:
|
||||
# ctx._error_from_remote_msg(msg)
|
||||
|
||||
# log.debug(f"{send_chan} was terminated at remote end")
|
||||
# # indicate to consumer that far end has stopped
|
||||
# return await send_chan.aclose()
|
||||
|
||||
try:
|
||||
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
|
||||
log.debug(f"Delivering {msg} from {chan.uid} to caller {cid}")
|
||||
# maintain backpressure
|
||||
await send_chan.send(msg)
|
||||
|
||||
|
@ -500,7 +543,9 @@ class Actor:
|
|||
self,
|
||||
actorid: Tuple[str, str],
|
||||
cid: str
|
||||
|
||||
) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
|
||||
|
||||
log.debug(f"Getting result queue for {actorid} cid {cid}")
|
||||
try:
|
||||
send_chan, recv_chan = self._cids2qs[(actorid, cid)]
|
||||
|
@ -562,9 +607,15 @@ class Actor:
|
|||
if channel is chan:
|
||||
await self._cancel_task(cid, channel)
|
||||
|
||||
# close all consumer side task mem chans
|
||||
# send_chan, _ = self._cids2qs[(chan.uid, cid)]
|
||||
# assert send_chan.cid == cid # type: ignore
|
||||
# await send_chan.aclose()
|
||||
|
||||
log.debug(
|
||||
f"Msg loop signalled to terminate for"
|
||||
f" {chan} from {chan.uid}")
|
||||
|
||||
break
|
||||
|
||||
log.transport( # type: ignore
|
||||
|
@ -671,9 +722,6 @@ class Actor:
|
|||
log.error(f"{chan} form {chan.uid} closed abruptly")
|
||||
# raise
|
||||
|
||||
except trio.ClosedResourceError:
|
||||
log.error(f"{chan} form {chan.uid} broke")
|
||||
|
||||
except (Exception, trio.MultiError) as err:
|
||||
# ship any "internal" exception (i.e. one from internal machinery
|
||||
# not from an rpc task) to parent
|
||||
|
|
|
@ -177,6 +177,7 @@ class Portal:
|
|||
f"Cancelling all streams with {self.channel.uid}")
|
||||
for stream in self._streams.copy():
|
||||
try:
|
||||
# with trio.CancelScope(shield=True):
|
||||
await stream.aclose()
|
||||
except trio.ClosedResourceError:
|
||||
# don't error the stream having already been closed
|
||||
|
@ -369,64 +370,78 @@ class Portal:
|
|||
|
||||
recv_chan: Optional[trio.MemoryReceiveChannel] = None
|
||||
|
||||
cid, recv_chan, functype, first_msg = await self._submit(
|
||||
fn_mod_path, fn_name, kwargs)
|
||||
|
||||
assert functype == 'context'
|
||||
msg = await recv_chan.receive()
|
||||
|
||||
try:
|
||||
cid, recv_chan, functype, first_msg = await self._submit(
|
||||
fn_mod_path, fn_name, kwargs)
|
||||
# the "first" value here is delivered by the callee's
|
||||
# ``Context.started()`` call.
|
||||
first = msg['started']
|
||||
|
||||
assert functype == 'context'
|
||||
msg = await recv_chan.receive()
|
||||
except KeyError:
|
||||
assert msg.get('cid'), ("Received internal error at context?")
|
||||
|
||||
try:
|
||||
# the "first" value here is delivered by the callee's
|
||||
# ``Context.started()`` call.
|
||||
first = msg['started']
|
||||
|
||||
except KeyError:
|
||||
assert msg.get('cid'), ("Received internal error at context?")
|
||||
|
||||
if msg.get('error'):
|
||||
# raise the error message
|
||||
raise unpack_error(msg, self.channel)
|
||||
else:
|
||||
raise
|
||||
|
||||
# deliver context instance and .started() msg value in open
|
||||
# tuple.
|
||||
ctx = Context(
|
||||
self.channel,
|
||||
cid,
|
||||
_portal=self,
|
||||
_recv_chan=recv_chan,
|
||||
)
|
||||
|
||||
try:
|
||||
yield ctx, first
|
||||
|
||||
if cancel_on_exit:
|
||||
await ctx.cancel()
|
||||
|
||||
else:
|
||||
if not ctx._cancel_called:
|
||||
await ctx.result()
|
||||
|
||||
except ContextCancelled:
|
||||
# if the context was cancelled by client code
|
||||
# then we don't need to raise since user code
|
||||
# is expecting this.
|
||||
if not ctx._cancel_called:
|
||||
raise
|
||||
|
||||
except BaseException:
|
||||
# the context cancels itself on any deviation
|
||||
await ctx.cancel()
|
||||
if msg.get('error'):
|
||||
# raise the error message
|
||||
raise unpack_error(msg, self.channel)
|
||||
else:
|
||||
raise
|
||||
|
||||
finally:
|
||||
log.info(f'Context for {func.__name__} completed')
|
||||
# deliver context instance and .started() msg value in open
|
||||
# tuple.
|
||||
try:
|
||||
async with trio.open_nursery() as scope_nursery:
|
||||
ctx = Context(
|
||||
self.channel,
|
||||
cid,
|
||||
_portal=self,
|
||||
_recv_chan=recv_chan,
|
||||
_scope_nursery=scope_nursery,
|
||||
)
|
||||
recv_chan._ctx = ctx
|
||||
|
||||
yield ctx, first
|
||||
|
||||
log.info(f'Context for {func.__name__} completed')
|
||||
|
||||
if cancel_on_exit:
|
||||
await ctx.cancel()
|
||||
|
||||
else:
|
||||
if not ctx._cancel_called:
|
||||
await ctx.result()
|
||||
|
||||
await recv_chan.aclose()
|
||||
|
||||
# except TypeError:
|
||||
# # if fn_name == '_emsd_main':
|
||||
# import tractor
|
||||
# await tractor.breakpoint()
|
||||
|
||||
except ContextCancelled:
|
||||
if not ctx._cancel_called:
|
||||
raise
|
||||
|
||||
# if the context was cancelled by client code
|
||||
# then we don't need to raise since user code
|
||||
# is expecting this and the block should exit.
|
||||
else:
|
||||
log.debug(f'Context {ctx} cancelled gracefully')
|
||||
|
||||
except trio.Cancelled:
|
||||
# the context cancels itself on any deviation
|
||||
await ctx.cancel()
|
||||
raise
|
||||
|
||||
# finally:
|
||||
# log.info(f'Context for {func.__name__} completed')
|
||||
|
||||
# finally:
|
||||
# if recv_chan is not None:
|
||||
|
||||
finally:
|
||||
if recv_chan is not None:
|
||||
await recv_chan.aclose()
|
||||
|
||||
@dataclass
|
||||
class LocalPortal:
|
||||
|
@ -464,6 +479,7 @@ async def open_portal(
|
|||
was_connected = False
|
||||
|
||||
async with maybe_open_nursery(nursery, shield=shield) as nursery:
|
||||
|
||||
if not channel.connected():
|
||||
await channel.connect()
|
||||
was_connected = True
|
||||
|
@ -485,11 +501,12 @@ async def open_portal(
|
|||
portal = Portal(channel)
|
||||
try:
|
||||
yield portal
|
||||
|
||||
finally:
|
||||
await portal.aclose()
|
||||
|
||||
if was_connected:
|
||||
# cancel remote channel-msg loop
|
||||
# gracefully signal remote channel-msg loop
|
||||
await channel.send(None)
|
||||
|
||||
# cancel background msg loop task
|
||||
|
|
|
@ -7,7 +7,7 @@ from contextlib import contextmanager, asynccontextmanager
|
|||
from dataclasses import dataclass
|
||||
from typing import (
|
||||
Any, Iterator, Optional, Callable,
|
||||
AsyncGenerator,
|
||||
AsyncGenerator, Dict,
|
||||
)
|
||||
|
||||
import warnings
|
||||
|
@ -67,6 +67,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
raise trio.EndOfChannel
|
||||
|
||||
try:
|
||||
# if self._ctx.chan.uid[0] == 'brokerd.ib':
|
||||
# breakpoint()
|
||||
|
||||
msg = await self._rx_chan.receive()
|
||||
return msg['yield']
|
||||
|
||||
|
@ -134,6 +137,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
|
||||
raise # propagate
|
||||
|
||||
# except trio.Cancelled:
|
||||
# if not self._shielded:
|
||||
# # if shielded we don't propagate a cancelled
|
||||
# raise
|
||||
|
||||
# except trio.Cancelled:
|
||||
# # relay cancels to the remote task
|
||||
# await self.aclose()
|
||||
|
@ -165,7 +173,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
|
||||
rx_chan = self._rx_chan
|
||||
|
||||
if rx_chan._closed:
|
||||
if rx_chan._closed: # or self._eoc:
|
||||
log.warning(f"{self} is already closed")
|
||||
|
||||
# this stream has already been closed so silently succeed as
|
||||
|
@ -212,7 +220,10 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
# stop from the caller side
|
||||
await self._ctx.send_stop()
|
||||
|
||||
except trio.BrokenResourceError:
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
# the underlying channel may already have been pulled
|
||||
# in which case our stop message is meaningless since
|
||||
# it can't traverse the transport.
|
||||
|
@ -254,18 +265,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
|
|||
# still need to consume msgs that are "in transit" from the far
|
||||
# end (eg. for ``Context.result()``).
|
||||
|
||||
# TODO: but make it broadcasting to consumers
|
||||
# def clone(self):
|
||||
# """Clone this receive channel allowing for multi-task
|
||||
# consumption from the same channel.
|
||||
|
||||
# """
|
||||
# return ReceiveStream(
|
||||
# self._cid,
|
||||
# self._rx_chan.clone(),
|
||||
# self._portal,
|
||||
# )
|
||||
|
||||
|
||||
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
||||
"""
|
||||
|
@ -282,6 +281,17 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
|
|||
'''
|
||||
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
|
||||
|
||||
# TODO: but make it broadcasting to consumers
|
||||
def clone(self):
|
||||
"""Clone this receive channel allowing for multi-task
|
||||
consumption from the same channel.
|
||||
|
||||
"""
|
||||
return MsgStream(
|
||||
self._ctx,
|
||||
self._rx_chan.clone(),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Context:
|
||||
|
@ -308,7 +318,7 @@ class Context:
|
|||
_cancel_called: bool = False
|
||||
|
||||
# only set on the callee side
|
||||
_cancel_scope: Optional[trio.CancelScope] = None
|
||||
_scope_nursery: Optional[trio.Nursery] = None
|
||||
|
||||
async def send_yield(self, data: Any) -> None:
|
||||
|
||||
|
@ -323,6 +333,16 @@ class Context:
|
|||
async def send_stop(self) -> None:
|
||||
await self.chan.send({'stop': True, 'cid': self.cid})
|
||||
|
||||
def _error_from_remote_msg(
|
||||
self,
|
||||
msg: Dict[str, Any],
|
||||
|
||||
) -> None:
|
||||
async def raiser():
|
||||
raise unpack_error(msg, self.chan)
|
||||
|
||||
self._scope_nursery.start_soon(raiser)
|
||||
|
||||
async def cancel(self) -> None:
|
||||
'''Cancel this inter-actor-task context.
|
||||
|
||||
|
@ -361,13 +381,16 @@ class Context:
|
|||
f"{cid} for {self._portal.channel.uid}")
|
||||
else:
|
||||
# ensure callee side
|
||||
assert self._cancel_scope
|
||||
assert self._scope_nursery
|
||||
# TODO: should we have an explicit cancel message
|
||||
# or is relaying the local `trio.Cancelled` as an
|
||||
# {'error': trio.Cancelled, cid: "blah"} enough?
|
||||
# This probably gets into the discussion in
|
||||
# https://github.com/goodboy/tractor/issues/36
|
||||
self._cancel_scope.cancel()
|
||||
self._scope_nursery.cancel_scope.cancel()
|
||||
|
||||
if self._recv_chan:
|
||||
await self._recv_chan.aclose()
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_stream(
|
||||
|
|
Loading…
Reference in New Issue