Compare commits
No commits in common. "250275d98d52e8076d2797b8c3fabf5f8098d124" and "3f159235374b960d120c55a1be844cd1434ca48b" have entirely different histories.
250275d98d
...
3f15923537
|
@ -76,18 +76,8 @@ async def get_registry(
|
||||||
yield regstr_ptl
|
yield regstr_ptl
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: deprecate and remove _arbiter form
|
||||||
# TODO: deprecate and this remove _arbiter form!
|
get_arbiter = get_registry
|
||||||
@acm
|
|
||||||
async def get_arbiter(*args, **kwargs):
|
|
||||||
warnings.warn(
|
|
||||||
'`tractor.get_arbiter()` is now deprecated!\n'
|
|
||||||
'Use `.get_registry()` instead!',
|
|
||||||
DeprecationWarning,
|
|
||||||
stacklevel=2,
|
|
||||||
)
|
|
||||||
async with get_registry(*args, **kwargs) as to_yield:
|
|
||||||
yield to_yield
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
|
|
@ -15,10 +15,7 @@
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
The fundamental core machinery implementing every "actor" including
|
Actor primitives and helpers
|
||||||
the process-local (python-interpreter global) `Actor` state-type
|
|
||||||
primitive(s), RPC-in-task scheduling, and IPC connectivity and
|
|
||||||
low-level transport msg handling.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
@ -44,14 +41,8 @@ import warnings
|
||||||
|
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
from exceptiongroup import BaseExceptionGroup
|
from exceptiongroup import BaseExceptionGroup
|
||||||
import trio
|
import trio # type: ignore
|
||||||
from trio import (
|
from trio_typing import TaskStatus
|
||||||
CancelScope,
|
|
||||||
)
|
|
||||||
from trio_typing import (
|
|
||||||
Nursery,
|
|
||||||
TaskStatus,
|
|
||||||
)
|
|
||||||
|
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._context import (
|
from ._context import (
|
||||||
|
@ -99,9 +90,10 @@ async def _invoke(
|
||||||
connected IPC channel.
|
connected IPC channel.
|
||||||
|
|
||||||
This is the core "RPC" `trio.Task` scheduling machinery used to start every
|
This is the core "RPC" `trio.Task` scheduling machinery used to start every
|
||||||
remotely invoked function, normally in `Actor._service_n: Nursery`.
|
remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = True
|
||||||
treat_as_gen: bool = False
|
treat_as_gen: bool = False
|
||||||
failed_resp: bool = False
|
failed_resp: bool = False
|
||||||
|
|
||||||
|
@ -118,9 +110,9 @@ async def _invoke(
|
||||||
# possibly a traceback (not sure what typing is for this..)
|
# possibly a traceback (not sure what typing is for this..)
|
||||||
tb = None
|
tb = None
|
||||||
|
|
||||||
cancel_scope = CancelScope()
|
cancel_scope = trio.CancelScope()
|
||||||
# activated cancel scope ref
|
# activated cancel scope ref
|
||||||
cs: CancelScope | None = None
|
cs: trio.CancelScope | None = None
|
||||||
|
|
||||||
ctx = actor.get_context(
|
ctx = actor.get_context(
|
||||||
chan,
|
chan,
|
||||||
|
@ -132,7 +124,6 @@ async def _invoke(
|
||||||
)
|
)
|
||||||
context: bool = False
|
context: bool = False
|
||||||
|
|
||||||
# TODO: deprecate this style..
|
|
||||||
if getattr(func, '_tractor_stream_function', False):
|
if getattr(func, '_tractor_stream_function', False):
|
||||||
# handle decorated ``@tractor.stream`` async functions
|
# handle decorated ``@tractor.stream`` async functions
|
||||||
sig = inspect.signature(func)
|
sig = inspect.signature(func)
|
||||||
|
@ -174,7 +165,6 @@ async def _invoke(
|
||||||
except TypeError:
|
except TypeError:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
|
||||||
if inspect.isasyncgen(coro):
|
if inspect.isasyncgen(coro):
|
||||||
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||||
# XXX: massive gotcha! If the containing scope
|
# XXX: massive gotcha! If the containing scope
|
||||||
|
@ -205,7 +195,6 @@ async def _invoke(
|
||||||
await chan.send({'stop': True, 'cid': cid})
|
await chan.send({'stop': True, 'cid': cid})
|
||||||
|
|
||||||
# one way @stream func that gets treated like an async gen
|
# one way @stream func that gets treated like an async gen
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
|
||||||
elif treat_as_gen:
|
elif treat_as_gen:
|
||||||
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
await chan.send({'functype': 'asyncgen', 'cid': cid})
|
||||||
# XXX: the async-func may spawn further tasks which push
|
# XXX: the async-func may spawn further tasks which push
|
||||||
|
@ -222,20 +211,8 @@ async def _invoke(
|
||||||
# far end async gen to tear down
|
# far end async gen to tear down
|
||||||
await chan.send({'stop': True, 'cid': cid})
|
await chan.send({'stop': True, 'cid': cid})
|
||||||
|
|
||||||
# our most general case: a remote SC-transitive,
|
|
||||||
# IPC-linked, cross-actor-task "context"
|
|
||||||
# ------ - ------
|
|
||||||
# TODO: every other "func type" should be implemented from
|
# TODO: every other "func type" should be implemented from
|
||||||
# a special case of this impl eventually!
|
# a special case of a context eventually!
|
||||||
# -[ ] streaming funcs should instead of being async-for
|
|
||||||
# handled directly here wrapped in
|
|
||||||
# a async-with-open_stream() closure that does the
|
|
||||||
# normal thing you'd expect a far end streaming context
|
|
||||||
# to (if written by the app-dev).
|
|
||||||
# -[ ] one off async funcs can literally just be called
|
|
||||||
# here and awaited directly, possibly just with a small
|
|
||||||
# wrapper that calls `Context.started()` and then does
|
|
||||||
# the `await coro()`?
|
|
||||||
elif context:
|
elif context:
|
||||||
# context func with support for bi-dir streaming
|
# context func with support for bi-dir streaming
|
||||||
await chan.send({'functype': 'context', 'cid': cid})
|
await chan.send({'functype': 'context', 'cid': cid})
|
||||||
|
@ -296,12 +273,11 @@ async def _invoke(
|
||||||
ctx._maybe_raise_remote_err(re)
|
ctx._maybe_raise_remote_err(re)
|
||||||
|
|
||||||
fname: str = func.__name__
|
fname: str = func.__name__
|
||||||
cs: CancelScope = ctx._scope
|
cs: trio.CancelScope = ctx._scope
|
||||||
if cs.cancel_called:
|
if cs.cancel_called:
|
||||||
our_uid: tuple = actor.uid
|
|
||||||
canceller: tuple = ctx.canceller
|
canceller: tuple = ctx.canceller
|
||||||
msg: str = (
|
msg: str = (
|
||||||
f'`{fname}()`@{our_uid} cancelled by '
|
f'`{fname}()`@{actor.uid} cancelled by '
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE / TODO: if we end up having
|
# NOTE / TODO: if we end up having
|
||||||
|
@ -310,8 +286,6 @@ async def _invoke(
|
||||||
# need to change this logic branch since it
|
# need to change this logic branch since it
|
||||||
# will always enter..
|
# will always enter..
|
||||||
if ctx._cancel_called:
|
if ctx._cancel_called:
|
||||||
# TODO: test for this!!!!!
|
|
||||||
canceller: tuple = our_uid
|
|
||||||
msg += 'itself '
|
msg += 'itself '
|
||||||
|
|
||||||
# if the channel which spawned the ctx is the
|
# if the channel which spawned the ctx is the
|
||||||
|
@ -344,76 +318,40 @@ async def _invoke(
|
||||||
canceller=canceller,
|
canceller=canceller,
|
||||||
)
|
)
|
||||||
|
|
||||||
# regular async function/method
|
# regular async function
|
||||||
# XXX: possibly just a scheduled `Actor._cancel_task()`
|
|
||||||
# from a remote request to cancel some `Context`.
|
|
||||||
# ------ - ------
|
|
||||||
# TODO: ideally we unify this with the above `context=True`
|
|
||||||
# block such that for any remote invocation ftype, we
|
|
||||||
# always invoke the far end RPC task scheduling the same
|
|
||||||
# way: using the linked IPC context machinery.
|
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
await chan.send({
|
await chan.send({
|
||||||
'functype': 'asyncfunc',
|
'functype': 'asyncfunc',
|
||||||
'cid': cid
|
'cid': cid
|
||||||
})
|
})
|
||||||
except (
|
except trio.BrokenResourceError:
|
||||||
trio.ClosedResourceError,
|
|
||||||
trio.BrokenResourceError,
|
|
||||||
BrokenPipeError,
|
|
||||||
) as ipc_err:
|
|
||||||
failed_resp = True
|
failed_resp = True
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
# TODO: should this be an `.exception()` call?
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Failed to respond to non-rpc request: {func}\n'
|
f'Failed to respond to non-rpc request: {func}'
|
||||||
f'{ipc_err}'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
with cancel_scope as cs:
|
with cancel_scope as cs:
|
||||||
ctx._scope: CancelScope = cs
|
ctx._scope = cs
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
result = await coro
|
result = await coro
|
||||||
fname: str = func.__name__
|
fname: str = func.__name__
|
||||||
log.runtime(f'{fname}() result: {result}')
|
log.runtime(f'{fname}() result: {result}')
|
||||||
|
if not failed_resp:
|
||||||
# NOTE: only send result if we know IPC isn't down
|
# only send result if we know IPC isn't down
|
||||||
if (
|
await chan.send(
|
||||||
not failed_resp
|
{'return': result,
|
||||||
and chan.connected()
|
'cid': cid}
|
||||||
):
|
)
|
||||||
try:
|
|
||||||
await chan.send(
|
|
||||||
{'return': result,
|
|
||||||
'cid': cid}
|
|
||||||
)
|
|
||||||
except (
|
|
||||||
BrokenPipeError,
|
|
||||||
trio.BrokenResourceError,
|
|
||||||
):
|
|
||||||
log.warning(
|
|
||||||
'Failed to return result:\n'
|
|
||||||
f'{func}@{actor.uid}\n'
|
|
||||||
f'remote chan: {chan.uid}'
|
|
||||||
)
|
|
||||||
|
|
||||||
except (
|
except (
|
||||||
Exception,
|
Exception,
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
) as err:
|
) as err:
|
||||||
|
|
||||||
# always hide this frame from debug REPL if the crash
|
|
||||||
# originated from an rpc task and we DID NOT fail
|
|
||||||
# due to an IPC transport error!
|
|
||||||
if (
|
|
||||||
is_rpc
|
|
||||||
and chan.connected()
|
|
||||||
):
|
|
||||||
__tracebackhide__: bool = True
|
|
||||||
|
|
||||||
if not is_multi_cancelled(err):
|
if not is_multi_cancelled(err):
|
||||||
|
|
||||||
# TODO: maybe we'll want different "levels" of debugging
|
# TODO: maybe we'll want different "levels" of debugging
|
||||||
|
@ -447,31 +385,24 @@ async def _invoke(
|
||||||
log.exception("Actor crashed:")
|
log.exception("Actor crashed:")
|
||||||
|
|
||||||
# always ship errors back to caller
|
# always ship errors back to caller
|
||||||
err_msg: dict[str, dict] = pack_error(
|
err_msg = pack_error(err, tb=tb)
|
||||||
err,
|
|
||||||
tb=tb,
|
|
||||||
)
|
|
||||||
err_msg['cid'] = cid
|
err_msg['cid'] = cid
|
||||||
|
|
||||||
if is_rpc:
|
try:
|
||||||
try:
|
await chan.send(err_msg)
|
||||||
await chan.send(err_msg)
|
|
||||||
|
|
||||||
# TODO: tests for this scenario:
|
# TODO: tests for this scenario:
|
||||||
# - RPC caller closes connection before getting a response
|
# - RPC caller closes connection before getting a response
|
||||||
# should **not** crash this actor..
|
# should **not** crash this actor..
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
BrokenPipeError,
|
BrokenPipeError,
|
||||||
) as ipc_err:
|
):
|
||||||
|
# if we can't propagate the error that's a big boo boo
|
||||||
# if we can't propagate the error that's a big boo boo
|
log.exception(
|
||||||
log.exception(
|
f"Failed to ship error to caller @ {chan.uid} !?"
|
||||||
f"Failed to ship error to caller @ {chan.uid} !?\n"
|
)
|
||||||
f'{ipc_err}'
|
|
||||||
|
|
||||||
)
|
|
||||||
|
|
||||||
# error is probably from above coro running code *not from the
|
# error is probably from above coro running code *not from the
|
||||||
# underlyingn rpc invocation* since a scope was never allocated
|
# underlyingn rpc invocation* since a scope was never allocated
|
||||||
|
@ -497,11 +428,7 @@ async def _invoke(
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Task {func} likely errored or cancelled before start")
|
f"Task {func} likely errored or cancelled before start")
|
||||||
else:
|
else:
|
||||||
log.cancel(
|
log.cancel(f'{func.__name__}({kwargs}) failed?')
|
||||||
'Failed to de-alloc internal task!?\n'
|
|
||||||
f'cid: {cid}\n'
|
|
||||||
f'{func.__name__}({kwargs})'
|
|
||||||
)
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if not actor._rpc_tasks:
|
if not actor._rpc_tasks:
|
||||||
|
@ -518,7 +445,7 @@ async def try_ship_error_to_parent(
|
||||||
err: Exception | BaseExceptionGroup,
|
err: Exception | BaseExceptionGroup,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
with CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
try:
|
try:
|
||||||
# internal error so ship to parent without cid
|
# internal error so ship to parent without cid
|
||||||
await channel.send(pack_error(err))
|
await channel.send(pack_error(err))
|
||||||
|
@ -570,13 +497,13 @@ class Actor:
|
||||||
msg_buffer_size: int = 2**6
|
msg_buffer_size: int = 2**6
|
||||||
|
|
||||||
# nursery placeholders filled in by `async_main()` after fork
|
# nursery placeholders filled in by `async_main()` after fork
|
||||||
_root_n: Nursery | None = None
|
_root_n: trio.Nursery | None = None
|
||||||
_service_n: Nursery | None = None
|
_service_n: trio.Nursery | None = None
|
||||||
_server_n: Nursery | None = None
|
_server_n: trio.Nursery | None = None
|
||||||
|
|
||||||
# Information about `__main__` from parent
|
# Information about `__main__` from parent
|
||||||
_parent_main_data: dict[str, str]
|
_parent_main_data: dict[str, str]
|
||||||
_parent_chan_cs: CancelScope | None = None
|
_parent_chan_cs: trio.CancelScope | None = None
|
||||||
|
|
||||||
# syncs for setup/teardown sequences
|
# syncs for setup/teardown sequences
|
||||||
_server_down: trio.Event | None = None
|
_server_down: trio.Event | None = None
|
||||||
|
@ -1169,12 +1096,12 @@ class Actor:
|
||||||
|
|
||||||
async def _serve_forever(
|
async def _serve_forever(
|
||||||
self,
|
self,
|
||||||
handler_nursery: Nursery,
|
handler_nursery: trio.Nursery,
|
||||||
*,
|
*,
|
||||||
# (host, port) to bind for channel server
|
# (host, port) to bind for channel server
|
||||||
listen_sockaddrs: list[tuple[str, int]] | None = None,
|
listen_sockaddrs: list[tuple[str, int]] | None = None,
|
||||||
|
|
||||||
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Start the channel server, begin listening for new connections.
|
Start the channel server, begin listening for new connections.
|
||||||
|
@ -1261,7 +1188,7 @@ class Actor:
|
||||||
self._cancel_called = True
|
self._cancel_called = True
|
||||||
|
|
||||||
# cancel all ongoing rpc tasks
|
# cancel all ongoing rpc tasks
|
||||||
with CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
|
|
||||||
# kill any debugger request task to avoid deadlock
|
# kill any debugger request task to avoid deadlock
|
||||||
# with the root actor in this tree
|
# with the root actor in this tree
|
||||||
|
@ -1321,7 +1248,7 @@ class Actor:
|
||||||
# this ctx based lookup ensures the requested task to
|
# this ctx based lookup ensures the requested task to
|
||||||
# be cancelled was indeed spawned by a request from this channel
|
# be cancelled was indeed spawned by a request from this channel
|
||||||
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
|
ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
|
||||||
scope: CancelScope = ctx._scope
|
scope: trio.CancelScope = ctx._scope
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.cancel(f"{cid} has already completed/terminated?")
|
log.cancel(f"{cid} has already completed/terminated?")
|
||||||
return True
|
return True
|
||||||
|
@ -1686,7 +1613,7 @@ async def async_main(
|
||||||
# block it might be actually possible to debug THIS
|
# block it might be actually possible to debug THIS
|
||||||
# machinery in the same way as user task code?
|
# machinery in the same way as user task code?
|
||||||
# if actor.name == 'brokerd.ib':
|
# if actor.name == 'brokerd.ib':
|
||||||
# with CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
# await _debug.breakpoint()
|
# await _debug.breakpoint()
|
||||||
|
|
||||||
actor.lifetime_stack.close()
|
actor.lifetime_stack.close()
|
||||||
|
@ -1728,7 +1655,7 @@ async def async_main(
|
||||||
):
|
):
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f"Waiting for remaining peers {actor._peers} to clear")
|
f"Waiting for remaining peers {actor._peers} to clear")
|
||||||
with CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await actor._no_more_peers.wait()
|
await actor._no_more_peers.wait()
|
||||||
log.runtime("All peer channels are complete")
|
log.runtime("All peer channels are complete")
|
||||||
|
|
||||||
|
@ -1739,7 +1666,7 @@ async def process_messages(
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
|
@ -1757,7 +1684,7 @@ async def process_messages(
|
||||||
|
|
||||||
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
|
||||||
try:
|
try:
|
||||||
with CancelScope(shield=shield) as loop_cs:
|
with trio.CancelScope(shield=shield) as loop_cs:
|
||||||
# this internal scope allows for keeping this message
|
# this internal scope allows for keeping this message
|
||||||
# loop running despite the current task having been
|
# loop running despite the current task having been
|
||||||
# cancelled (eg. `open_portal()` may call this method from
|
# cancelled (eg. `open_portal()` may call this method from
|
||||||
|
@ -1819,18 +1746,18 @@ async def process_messages(
|
||||||
|
|
||||||
if ns == 'self':
|
if ns == 'self':
|
||||||
if funcname == 'cancel':
|
if funcname == 'cancel':
|
||||||
func: Callable = actor.cancel
|
func = actor.cancel
|
||||||
kwargs['requesting_uid'] = chan.uid
|
kwargs['requesting_uid'] = chan.uid
|
||||||
|
|
||||||
# don't start entire actor runtime cancellation
|
# don't start entire actor runtime cancellation
|
||||||
# if this actor is currently in debug mode!
|
# if this actor is currently in debug mode!
|
||||||
pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
|
pdb_complete = _debug.Lock.local_pdb_complete
|
||||||
if pdb_complete:
|
if pdb_complete:
|
||||||
await pdb_complete.wait()
|
await pdb_complete.wait()
|
||||||
|
|
||||||
# we immediately start the runtime machinery
|
# we immediately start the runtime machinery
|
||||||
# shutdown
|
# shutdown
|
||||||
with CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
# actor.cancel() was called so kill this
|
# actor.cancel() was called so kill this
|
||||||
# msg loop and break out into
|
# msg loop and break out into
|
||||||
# ``async_main()``
|
# ``async_main()``
|
||||||
|
@ -1858,7 +1785,7 @@ async def process_messages(
|
||||||
|
|
||||||
# we immediately start the runtime machinery
|
# we immediately start the runtime machinery
|
||||||
# shutdown
|
# shutdown
|
||||||
# with CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
kwargs['chan'] = chan
|
kwargs['chan'] = chan
|
||||||
target_cid = kwargs['cid']
|
target_cid = kwargs['cid']
|
||||||
kwargs['requesting_uid'] = chan.uid
|
kwargs['requesting_uid'] = chan.uid
|
||||||
|
@ -1883,7 +1810,7 @@ async def process_messages(
|
||||||
else:
|
else:
|
||||||
# normally registry methods, eg.
|
# normally registry methods, eg.
|
||||||
# ``.register_actor()`` etc.
|
# ``.register_actor()`` etc.
|
||||||
func: Callable = getattr(actor, funcname)
|
func = getattr(actor, funcname)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# complain to client about restricted modules
|
# complain to client about restricted modules
|
||||||
|
@ -1973,10 +1900,9 @@ async def process_messages(
|
||||||
Exception,
|
Exception,
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
) as err:
|
) as err:
|
||||||
|
|
||||||
if nursery_cancelled_before_task:
|
if nursery_cancelled_before_task:
|
||||||
sn: Nursery = actor._service_n
|
sn = actor._service_n
|
||||||
assert sn and sn.cancel_scope.cancel_called # sanity
|
assert sn and sn.cancel_scope.cancel_called
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f'Service nursery cancelled before it handled {funcname}'
|
f'Service nursery cancelled before it handled {funcname}'
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue