Only cancel/get-result from a ctx if transport is up
There's no point in sending a cancel message to the remote linked task and especially no reason to block waiting on a result from that task if the transport layer is detected to be disconnected. We expect that the transport shouldn't go down at the layer of the message loop (reconnection logic should be handled in the transport layer itself) so if we detect the channel is not connected we don't bother requesting cancels nor waiting on a final result message. Why? - if the connection goes down in error the caller side won't have a way to know "how long" it should block to wait for a cancel ack or result and causes a potential hang that may require an additional ctrl-c from the user especially if using the debugger or if the traceback is not seen on console. - obviously there's no point in waiting for messages when there's no transport to deliver them XD Further, add some more detailed cancel logging detailing the task and actor ids.310_plus
parent
e6a520c944
commit
83367caf42
|
@ -442,6 +442,10 @@ class Portal:
|
||||||
_err: Optional[BaseException] = None
|
_err: Optional[BaseException] = None
|
||||||
ctx._portal = self
|
ctx._portal = self
|
||||||
|
|
||||||
|
uid = self.channel.uid
|
||||||
|
cid = ctx.cid
|
||||||
|
etype: Optional[Exception] = None
|
||||||
|
|
||||||
# deliver context instance and .started() msg value in open tuple.
|
# deliver context instance and .started() msg value in open tuple.
|
||||||
try:
|
try:
|
||||||
async with trio.open_nursery() as scope_nursery:
|
async with trio.open_nursery() as scope_nursery:
|
||||||
|
@ -477,13 +481,24 @@ class Portal:
|
||||||
# KeyboardInterrupt,
|
# KeyboardInterrupt,
|
||||||
|
|
||||||
) as err:
|
) as err:
|
||||||
_err = err
|
etype = type(err)
|
||||||
# the context cancels itself on any cancel
|
# the context cancels itself on any cancel
|
||||||
# causing error.
|
# causing error.
|
||||||
log.cancel(
|
|
||||||
f'Context to {self.channel.uid} sending cancel request..')
|
|
||||||
|
|
||||||
|
if ctx.chan.connected():
|
||||||
|
log.cancel(
|
||||||
|
'Context cancelled for task, sending cancel request..\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
|
)
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
'IPC connection for context is broken?\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
|
)
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
@ -492,6 +507,12 @@ class Portal:
|
||||||
# sure we get the error the underlying feeder mem chan.
|
# sure we get the error the underlying feeder mem chan.
|
||||||
# if it's not raised here it *should* be raised from the
|
# if it's not raised here it *should* be raised from the
|
||||||
# msg loop nursery right?
|
# msg loop nursery right?
|
||||||
|
if ctx.chan.connected():
|
||||||
|
log.info(
|
||||||
|
'Waiting on final context-task result for\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
|
)
|
||||||
result = await ctx.result()
|
result = await ctx.result()
|
||||||
|
|
||||||
# though it should be impossible for any tasks
|
# though it should be impossible for any tasks
|
||||||
|
@ -502,14 +523,17 @@ class Portal:
|
||||||
# should we encapsulate this in the context api?
|
# should we encapsulate this in the context api?
|
||||||
await ctx._recv_chan.aclose()
|
await ctx._recv_chan.aclose()
|
||||||
|
|
||||||
if _err:
|
if etype:
|
||||||
if ctx._cancel_called:
|
if ctx._cancel_called:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context {fn_name} cancelled by caller with\n{_err}'
|
f'Context {fn_name} cancelled by caller with\n{etype}'
|
||||||
)
|
)
|
||||||
elif _err is not None:
|
elif _err is not None:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Context {fn_name} cancelled by callee with\n{_err}'
|
f'Context for task cancelled by callee with {etype}\n'
|
||||||
|
f'target: `{fn_name}`\n'
|
||||||
|
f'task:{cid}\n'
|
||||||
|
f'actor:{uid}'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
|
|
@ -604,7 +604,8 @@ class Context:
|
||||||
self._portal._streams.remove(rchan)
|
self._portal._streams.remove(rchan)
|
||||||
|
|
||||||
async def result(self) -> Any:
|
async def result(self) -> Any:
|
||||||
'''From a caller side, wait for and return the final result from
|
'''
|
||||||
|
From a caller side, wait for and return the final result from
|
||||||
the callee side task.
|
the callee side task.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
Loading…
Reference in New Issue