forked from goodboy/tractor
1
0
Fork 0

Always shield cancel the caller on cancel-causing-errors, add teardown logging

pre_bad_close
Tyler Goodlet 2021-06-28 00:18:28 -04:00
parent 4d0c5f04c6
commit 9fa451fdd3
2 changed files with 62 additions and 30 deletions

View File

@ -295,6 +295,7 @@ class Portal:
self, self,
async_gen_func: Callable, # typing: ignore async_gen_func: Callable, # typing: ignore
**kwargs, **kwargs,
) -> AsyncGenerator[ReceiveMsgStream, None]: ) -> AsyncGenerator[ReceiveMsgStream, None]:
if not inspect.isasyncgenfunction(async_gen_func): if not inspect.isasyncgenfunction(async_gen_func):
@ -347,7 +348,6 @@ class Portal:
self, self,
func: Callable, func: Callable,
cancel_on_exit: bool = False,
**kwargs, **kwargs,
) -> AsyncGenerator[Tuple[Context, Any], None]: ) -> AsyncGenerator[Tuple[Context, Any], None]:
@ -359,6 +359,7 @@ class Portal:
and synchronized final result collection. See ``tractor.Context``. and synchronized final result collection. See ``tractor.Context``.
''' '''
# conduct target func method structural checks # conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and ( if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False) getattr(func, '_tractor_contex_function', False)
@ -390,6 +391,7 @@ class Portal:
else: else:
raise raise
_err = None
# deliver context instance and .started() msg value in open # deliver context instance and .started() msg value in open
# tuple. # tuple.
try: try:
@ -403,26 +405,20 @@ class Portal:
) )
recv_chan._ctx = ctx recv_chan._ctx = ctx
# await trio.lowlevel.checkpoint()
yield ctx, first yield ctx, first
log.info(f'Context for {func.__name__} completed') # if not ctx._cancel_called:
# await ctx.result()
if cancel_on_exit: # await recv_chan.aclose()
await ctx.cancel()
else: except ContextCancelled as err:
if not ctx._cancel_called: _err = err
await ctx.result()
await recv_chan.aclose()
# except TypeError:
# # if fn_name == '_emsd_main':
# import tractor
# await tractor.breakpoint()
except ContextCancelled:
if not ctx._cancel_called: if not ctx._cancel_called:
# context was cancelled at the far end but was
# not part of this end requesting that cancel
# so raise for the local task to respond and handle.
raise raise
# if the context was cancelled by client code # if the context was cancelled by client code
@ -431,16 +427,43 @@ class Portal:
else: else:
log.debug(f'Context {ctx} cancelled gracefully') log.debug(f'Context {ctx} cancelled gracefully')
except trio.Cancelled: except (
# the context cancels itself on any deviation trio.Cancelled,
await ctx.cancel() trio.MultiError,
Exception,
) as err:
_err = err
# the context cancels itself on any cancel
# causing error.
log.error(f'Context {ctx} sending cancel to far end')
with trio.CancelScope(shield=True):
await ctx.cancel()
raise raise
# finally: finally:
# log.info(f'Context for {func.__name__} completed') result = await ctx.result()
# finally: # though it should be impossible for any tasks
# if recv_chan is not None: # operating *in* this scope to have survived
# we tear down the runtime feeder chan last
# to avoid premature stream clobbers.
if recv_chan is not None:
await recv_chan.aclose()
if _err:
if ctx._cancel_called:
log.warning(
f'Context {fn_name} cancelled by caller with\n{_err}'
)
elif _err is not None:
log.warning(
f'Context {fn_name} cancelled by callee with\n{_err}'
)
else:
log.info(
f'Context {fn_name} returned '
f'value from callee `{self._result}`'
)
@dataclass @dataclass
@ -465,10 +488,12 @@ class LocalPortal:
@asynccontextmanager @asynccontextmanager
async def open_portal( async def open_portal(
channel: Channel, channel: Channel,
nursery: Optional[trio.Nursery] = None, nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True, start_msg_loop: bool = True,
shield: bool = False, shield: bool = False,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``. """Open a ``Portal`` through the provided ``channel``.
@ -508,6 +533,7 @@ async def open_portal(
if was_connected: if was_connected:
# gracefully signal remote channel-msg loop # gracefully signal remote channel-msg loop
await channel.send(None) await channel.send(None)
# await channel.aclose()
# cancel background msg loop task # cancel background msg loop task
if msg_loop_cs: if msg_loop_cs:

View File

@ -67,8 +67,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
raise trio.EndOfChannel raise trio.EndOfChannel
try: try:
# if self._ctx.chan.uid[0] == 'brokerd.ib':
# breakpoint()
msg = await self._rx_chan.receive() msg = await self._rx_chan.receive()
return msg['yield'] return msg['yield']
@ -173,7 +171,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
rx_chan = self._rx_chan rx_chan = self._rx_chan
if rx_chan._closed: # or self._eoc: if rx_chan._closed: # or self._eoc:
log.warning(f"{self} is already closed") log.warning(f"{self} is already closed")
# this stream has already been closed so silently succeed as # this stream has already been closed so silently succeed as
@ -338,6 +336,12 @@ class Context:
msg: Dict[str, Any], msg: Dict[str, Any],
) -> None: ) -> None:
'''Unpack and raise a msg error into the local scope
nursery for this context.
Acts as a form of "relay" for a remote error raised
in the corresponding remote callee task.
'''
async def raiser(): async def raiser():
raise unpack_error(msg, self.chan) raise unpack_error(msg, self.chan)
@ -350,11 +354,13 @@ class Context:
Timeout quickly in an attempt to sidestep 2-generals... Timeout quickly in an attempt to sidestep 2-generals...
''' '''
log.warning(f'Cancelling caller side of context {self}') side = 'caller' if self._portal else 'callee'
log.warning(f'Cancelling {side} side of context to {self.chan}')
self._cancel_called = True self._cancel_called = True
if self._portal: # caller side: if side == 'caller':
if not self._portal: if not self._portal:
raise RuntimeError( raise RuntimeError(
"No portal found, this is likely a callee side context" "No portal found, this is likely a callee side context"
@ -382,8 +388,8 @@ class Context:
"May have failed to cancel remote task " "May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}") f"{cid} for {self._portal.channel.uid}")
else: else:
# ensure callee side # callee side remote task
assert self._scope_nursery
# TODO: should we have an explicit cancel message # TODO: should we have an explicit cancel message
# or is relaying the local `trio.Cancelled` as an # or is relaying the local `trio.Cancelled` as an
# {'error': trio.Cancelled, cid: "blah"} enough? # {'error': trio.Cancelled, cid: "blah"} enough?