forked from goodboy/tractor
Only cancel/get-result from a ctx if transport is up
There's no point in sending a cancel message to the remote linked task and especially no reason to block waiting on a result from that task if the transport layer is detected to be disconnected. We expect that the transport shouldn't go down at the layer of the message loop (reconnection logic should be handled in the transport layer itself) so if we detect the channel is not connected we don't bother requesting cancels nor waiting on a final result message. Why? - if the connection goes down in error the caller side won't have a way to know "how long" it should block to wait for a cancel ack or result and causes a potential hang that may require an additional ctrl-c from the user especially if using the debugger or if the traceback is not seen on console. - obviously there's no point in waiting for messages when there's no transport to deliver them XD Further, add some more detailed cancel logging detailing the task and actor ids.sigint2
parent
52ad597c20
commit
a80591b914
|
@ -442,6 +442,10 @@ class Portal:
|
|||
_err: Optional[BaseException] = None
|
||||
ctx._portal = self
|
||||
|
||||
uid = self.channel.uid
|
||||
cid = ctx.cid
|
||||
etype: Optional[Exception] = None
|
||||
|
||||
# deliver context instance and .started() msg value in open tuple.
|
||||
try:
|
||||
async with trio.open_nursery() as scope_nursery:
|
||||
|
@ -477,13 +481,24 @@ class Portal:
|
|||
# KeyboardInterrupt,
|
||||
|
||||
) as err:
|
||||
_err = err
|
||||
etype = type(err)
|
||||
# the context cancels itself on any cancel
|
||||
# causing error.
|
||||
log.cancel(
|
||||
f'Context to {self.channel.uid} sending cancel request..')
|
||||
|
||||
if ctx.chan.connected():
|
||||
log.cancel(
|
||||
'Context cancelled for task, sending cancel request..\n'
|
||||
f'task:{cid}\n'
|
||||
f'actor:{uid}'
|
||||
)
|
||||
await ctx.cancel()
|
||||
else:
|
||||
log.warning(
|
||||
'IPC connection for context is broken?\n'
|
||||
f'task:{cid}\n'
|
||||
f'actor:{uid}'
|
||||
)
|
||||
|
||||
raise
|
||||
|
||||
finally:
|
||||
|
@ -492,6 +507,12 @@ class Portal:
|
|||
# sure we get the error the underlying feeder mem chan.
|
||||
# if it's not raised here it *should* be raised from the
|
||||
# msg loop nursery right?
|
||||
if ctx.chan.connected():
|
||||
log.info(
|
||||
'Waiting on final context-task result for\n'
|
||||
f'task:{cid}\n'
|
||||
f'actor:{uid}'
|
||||
)
|
||||
result = await ctx.result()
|
||||
|
||||
# though it should be impossible for any tasks
|
||||
|
@ -502,14 +523,17 @@ class Portal:
|
|||
# should we encapsulate this in the context api?
|
||||
await ctx._recv_chan.aclose()
|
||||
|
||||
if _err:
|
||||
if etype:
|
||||
if ctx._cancel_called:
|
||||
log.cancel(
|
||||
f'Context {fn_name} cancelled by caller with\n{_err}'
|
||||
f'Context {fn_name} cancelled by caller with\n{etype}'
|
||||
)
|
||||
elif _err is not None:
|
||||
log.cancel(
|
||||
f'Context {fn_name} cancelled by callee with\n{_err}'
|
||||
f'Context for task cancelled by callee with {etype}\n'
|
||||
f'target: `{fn_name}`\n'
|
||||
f'task:{cid}\n'
|
||||
f'actor:{uid}'
|
||||
)
|
||||
else:
|
||||
log.runtime(
|
||||
|
|
|
@ -604,7 +604,8 @@ class Context:
|
|||
self._portal._streams.remove(rchan)
|
||||
|
||||
async def result(self) -> Any:
|
||||
'''From a caller side, wait for and return the final result from
|
||||
'''
|
||||
From a caller side, wait for and return the final result from
|
||||
the callee side task.
|
||||
|
||||
'''
|
||||
|
|
Loading…
Reference in New Issue