forked from goodboy/tractor
Change a bunch of log levels to cancel, including any `ContextCancelled` handling
parent
b6c7f423f0
commit
29a1171142
|
@ -132,7 +132,7 @@ def _trio_main(
|
||||||
else:
|
else:
|
||||||
trio.run(trio_main)
|
trio.run(trio_main)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
log.warning(f"Actor {actor.uid} received KBI")
|
log.cancel(f"Actor {actor.uid} received KBI")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.info(f"Actor {actor.uid} terminated")
|
log.info(f"Actor {actor.uid} terminated")
|
||||||
|
|
|
@ -221,6 +221,8 @@ async def _invoke(
|
||||||
|
|
||||||
assert cs
|
assert cs
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
|
# if 'brokerd.kraken' in actor.uid:
|
||||||
|
# await _debug.breakpoint()
|
||||||
|
|
||||||
# TODO: pack in ``trio.Cancelled.__traceback__`` here
|
# TODO: pack in ``trio.Cancelled.__traceback__`` here
|
||||||
# so they can be unwrapped and displayed on the caller
|
# so they can be unwrapped and displayed on the caller
|
||||||
|
@ -228,11 +230,11 @@ async def _invoke(
|
||||||
|
|
||||||
fname = func.__name__
|
fname = func.__name__
|
||||||
if ctx._cancel_called:
|
if ctx._cancel_called:
|
||||||
msg = f'`{fname}()` cancelled itself'
|
msg = f'`{fname}()`@{actor.uid} cancelled itself'
|
||||||
|
|
||||||
elif cs.cancel_called:
|
elif cs.cancel_called:
|
||||||
msg = (
|
msg = (
|
||||||
f'`{fname}()` was remotely cancelled by its caller '
|
f'`{fname}()`@{actor.uid} was remotely cancelled by its caller '
|
||||||
f'{ctx.chan.uid}'
|
f'{ctx.chan.uid}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -910,9 +912,15 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
cid = str(uuid.uuid4())
|
cid = str(uuid.uuid4())
|
||||||
assert chan.uid
|
assert chan.uid
|
||||||
ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size)
|
ctx = self.get_context(
|
||||||
|
chan,
|
||||||
|
cid,
|
||||||
|
msg_buffer_size=msg_buffer_size,
|
||||||
|
)
|
||||||
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
||||||
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
await chan.send(
|
||||||
|
{'cmd': (ns, func, kwargs, self.uid, cid)}
|
||||||
|
)
|
||||||
|
|
||||||
# Wait on first response msg and validate; this should be
|
# Wait on first response msg and validate; this should be
|
||||||
# immediate.
|
# immediate.
|
||||||
|
@ -922,7 +930,11 @@ class Actor:
|
||||||
if 'error' in first_msg:
|
if 'error' in first_msg:
|
||||||
raise unpack_error(first_msg, chan)
|
raise unpack_error(first_msg, chan)
|
||||||
|
|
||||||
elif functype not in ('asyncfunc', 'asyncgen', 'context'):
|
elif functype not in (
|
||||||
|
'asyncfunc',
|
||||||
|
'asyncgen',
|
||||||
|
'context',
|
||||||
|
):
|
||||||
raise ValueError(f"{first_msg} is an invalid response packet?")
|
raise ValueError(f"{first_msg} is an invalid response packet?")
|
||||||
|
|
||||||
ctx._remote_func_type = functype
|
ctx._remote_func_type = functype
|
||||||
|
@ -1130,7 +1142,7 @@ class Actor:
|
||||||
|
|
||||||
async def cancel_rpc_tasks(
|
async def cancel_rpc_tasks(
|
||||||
self,
|
self,
|
||||||
only_chan: Optional[Channel] = None,
|
only_chan: Channel | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Cancel all existing RPC responder tasks using the cancel scope
|
Cancel all existing RPC responder tasks using the cancel scope
|
||||||
|
@ -1331,13 +1343,15 @@ async def async_main(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.runtime("Waiting on service nursery to complete")
|
log.runtime("Waiting on service nursery to complete")
|
||||||
log.runtime("Service nursery complete")
|
log.runtime(
|
||||||
log.runtime("Waiting on root nursery to complete")
|
"Service nursery complete\n"
|
||||||
|
"Waiting on root nursery to complete"
|
||||||
|
)
|
||||||
|
|
||||||
# Blocks here as expected until the root nursery is
|
# Blocks here as expected until the root nursery is
|
||||||
# killed (i.e. this actor is cancelled or signalled by the parent)
|
# killed (i.e. this actor is cancelled or signalled by the parent)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.info("Closing all actor lifetime contexts")
|
log.runtime("Closing all actor lifetime contexts")
|
||||||
actor.lifetime_stack.close()
|
actor.lifetime_stack.close()
|
||||||
|
|
||||||
if not registered_with_arbiter:
|
if not registered_with_arbiter:
|
||||||
|
@ -1358,6 +1372,13 @@ async def async_main(
|
||||||
await try_ship_error_to_parent(actor._parent_chan, err)
|
await try_ship_error_to_parent(actor._parent_chan, err)
|
||||||
|
|
||||||
# always!
|
# always!
|
||||||
|
match err:
|
||||||
|
case ContextCancelled():
|
||||||
|
log.cancel(
|
||||||
|
f'Actor: {actor.uid} was task-context-cancelled with,\n'
|
||||||
|
f'str(err)'
|
||||||
|
)
|
||||||
|
case _:
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@ -1631,7 +1652,15 @@ async def process_messages(
|
||||||
else:
|
else:
|
||||||
# ship any "internal" exception (i.e. one from internal
|
# ship any "internal" exception (i.e. one from internal
|
||||||
# machinery not from an rpc task) to parent
|
# machinery not from an rpc task) to parent
|
||||||
|
match err:
|
||||||
|
case ContextCancelled():
|
||||||
|
log.cancel(
|
||||||
|
f'Actor: {actor.uid} was task-context-cancelled with,\n'
|
||||||
|
f'str(err)'
|
||||||
|
)
|
||||||
|
case _:
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
|
|
||||||
if actor._parent_chan:
|
if actor._parent_chan:
|
||||||
await try_ship_error_to_parent(actor._parent_chan, err)
|
await try_ship_error_to_parent(actor._parent_chan, err)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue