forked from goodboy/tractor
1
0
Fork 0

First pass, swap `MultiError` for `BaseExceptionGroup`

egs_with_ctx_res_consumption
Tyler Goodlet 2022-10-09 13:12:50 -04:00
parent 53d5b59b7b
commit cd79fd79b9
8 changed files with 60 additions and 40 deletions

View File

@ -18,7 +18,7 @@
tractor: structured concurrent "actors". tractor: structured concurrent "actors".
""" """
from trio import MultiError from exceptiongroup import BaseExceptionGroup
from ._clustering import open_actor_cluster from ._clustering import open_actor_cluster
from ._ipc import Channel from ._ipc import Channel
@ -62,7 +62,7 @@ __all__ = [
'ContextCancelled', 'ContextCancelled',
'ModuleNotExposed', 'ModuleNotExposed',
'MsgStream', 'MsgStream',
'MultiError', 'BaseExceptionGroup',
'Portal', 'Portal',
'ReceiveMsgStream', 'ReceiveMsgStream',
'RemoteActorError', 'RemoteActorError',

View File

@ -27,6 +27,7 @@ import importlib
import builtins import builtins
import traceback import traceback
import exceptiongroup as eg
import trio import trio
@ -52,9 +53,6 @@ class RemoteActorError(Exception):
self.type = suberror_type self.type = suberror_type
self.msgdata = msgdata self.msgdata = msgdata
# TODO: a trio.MultiError.catch like context manager
# for catching underlying remote errors of a particular type
class InternalActorError(RemoteActorError): class InternalActorError(RemoteActorError):
"""Remote internal ``tractor`` error indicating """Remote internal ``tractor`` error indicating
@ -139,7 +137,12 @@ def unpack_error(
suberror_type = trio.Cancelled suberror_type = trio.Cancelled
else: # try to lookup a suitable local error type else: # try to lookup a suitable local error type
for ns in [builtins, _this_mod, trio]: for ns in [
builtins,
_this_mod,
eg,
trio,
]:
try: try:
suberror_type = getattr(ns, type_name) suberror_type = getattr(ns, type_name)
break break
@ -158,12 +161,15 @@ def unpack_error(
def is_multi_cancelled(exc: BaseException) -> bool: def is_multi_cancelled(exc: BaseException) -> bool:
"""Predicate to determine if a ``trio.MultiError`` contains only '''
``trio.Cancelled`` sub-exceptions (and is likely the result of Predicate to determine if a possible ``eg.BaseExceptionGroup`` contains
only ``trio.Cancelled`` sub-exceptions (and is likely the result of
cancelling a collection of subtasks. cancelling a collection of subtasks.
""" '''
return not trio.MultiError.filter( if isinstance(exc, eg.BaseExceptionGroup):
lambda exc: exc if not isinstance(exc, trio.Cancelled) else None, return exc.subgroup(
exc, lambda exc: isinstance(exc, trio.Cancelled)
) ) is not None
return False

View File

@ -460,7 +460,6 @@ class Portal:
# sure it's worth being pedantic: # sure it's worth being pedantic:
# Exception, # Exception,
# trio.Cancelled, # trio.Cancelled,
# trio.MultiError,
# KeyboardInterrupt, # KeyboardInterrupt,
) as err: ) as err:

View File

@ -29,6 +29,8 @@ from typing import (
import typing import typing
import warnings import warnings
from exceptiongroup import BaseExceptionGroup
import trio import trio
from ._runtime import Actor, Arbiter, async_main from ._runtime import Actor, Arbiter, async_main
@ -205,7 +207,10 @@ async def open_root_actor(
try: try:
yield actor yield actor
except (Exception, trio.MultiError) as err: except (
Exception,
BaseExceptionGroup,
) as err:
entered = await _debug._maybe_enter_pm(err) entered = await _debug._maybe_enter_pm(err)

View File

@ -37,9 +37,10 @@ import os
from contextlib import ExitStack from contextlib import ExitStack
import warnings import warnings
from async_generator import aclosing
from exceptiongroup import BaseExceptionGroup
import trio # type: ignore import trio # type: ignore
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import aclosing
from ._ipc import Channel from ._ipc import Channel
from ._streaming import Context from ._streaming import Context
@ -194,7 +195,7 @@ async def _invoke(
res = await coro res = await coro
await chan.send({'return': res, 'cid': cid}) await chan.send({'return': res, 'cid': cid})
except trio.MultiError: except BaseExceptionGroup:
# if a context error was set then likely # if a context error was set then likely
# thei multierror was raised due to that # thei multierror was raised due to that
if ctx._error is not None: if ctx._error is not None:
@ -266,7 +267,7 @@ async def _invoke(
except ( except (
Exception, Exception,
trio.MultiError BaseExceptionGroup,
) as err: ) as err:
if not is_multi_cancelled(err): if not is_multi_cancelled(err):
@ -349,7 +350,7 @@ def _get_mod_abspath(module):
async def try_ship_error_to_parent( async def try_ship_error_to_parent(
channel: Channel, channel: Channel,
err: Union[Exception, trio.MultiError], err: Union[Exception, BaseExceptionGroup],
) -> None: ) -> None:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
@ -1549,7 +1550,10 @@ async def process_messages(
partial(_invoke, actor, cid, chan, func, kwargs), partial(_invoke, actor, cid, chan, func, kwargs),
name=funcname, name=funcname,
) )
except (RuntimeError, trio.MultiError): except (
RuntimeError,
BaseExceptionGroup,
):
# avoid reporting a benign race condition # avoid reporting a benign race condition
# during actor runtime teardown. # during actor runtime teardown.
nursery_cancelled_before_task = True nursery_cancelled_before_task = True
@ -1594,7 +1598,10 @@ async def process_messages(
# transport **was** disconnected # transport **was** disconnected
return True return True
except (Exception, trio.MultiError) as err: except (
Exception,
BaseExceptionGroup,
) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn = actor._service_n sn = actor._service_n
assert sn and sn.cancel_scope.cancel_called assert sn and sn.cancel_scope.cancel_called

View File

@ -31,6 +31,7 @@ from typing import (
) )
from collections.abc import Awaitable from collections.abc import Awaitable
from exceptiongroup import BaseExceptionGroup
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -146,8 +147,11 @@ async def exhaust_portal(
# always be established and shutdown using a context manager api # always be established and shutdown using a context manager api
final = await portal.result() final = await portal.result()
except (Exception, trio.MultiError) as err: except (
# we reraise in the parent task via a ``trio.MultiError`` Exception,
BaseExceptionGroup,
) as err:
# we reraise in the parent task via a ``BaseExceptionGroup``
return err return err
except trio.Cancelled as err: except trio.Cancelled as err:
# lol, of course we need this too ;P # lol, of course we need this too ;P
@ -175,7 +179,7 @@ async def cancel_on_completion(
''' '''
# if this call errors we store the exception for later # if this call errors we store the exception for later
# in ``errors`` which will be reraised inside # in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request # an exception group and we still send out a cancel request
result = await exhaust_portal(portal, actor) result = await exhaust_portal(portal, actor)
if isinstance(result, Exception): if isinstance(result, Exception):
errors[actor.uid] = result errors[actor.uid] = result

View File

@ -22,7 +22,6 @@ from typing import (
Optional, Optional,
Any, Any,
) )
from collections.abc import Mapping
import trio import trio
@ -46,12 +45,6 @@ def current_actor(err_on_no_runtime: bool = True) -> 'Actor': # type: ignore #
return _current_actor return _current_actor
_conc_name_getters = {
'task': trio.lowlevel.current_task,
'actor': current_actor
}
def is_main_process() -> bool: def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process. """Bool determining if this actor is running in the top-most process.
""" """

View File

@ -18,6 +18,7 @@
``trio`` inspired apis and helpers ``trio`` inspired apis and helpers
""" """
from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect import inspect
from typing import ( from typing import (
@ -27,8 +28,8 @@ from typing import (
import typing import typing
import warnings import warnings
from exceptiongroup import BaseExceptionGroup
import trio import trio
from async_generator import asynccontextmanager
from ._debug import maybe_wait_for_debugger from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process from ._state import current_actor, is_main_process
@ -294,7 +295,7 @@ class ActorNursery:
self._join_procs.set() self._join_procs.set()
@asynccontextmanager @acm
async def _open_and_supervise_one_cancels_all_nursery( async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor, actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]: ) -> typing.AsyncGenerator[ActorNursery, None]:
@ -387,13 +388,16 @@ async def _open_and_supervise_one_cancels_all_nursery(
# cancel all subactors # cancel all subactors
await anursery.cancel() await anursery.cancel()
except trio.MultiError as merr: except BaseExceptionGroup as merr:
# If we receive additional errors while waiting on # If we receive additional errors while waiting on
# remaining subactors that were cancelled, # remaining subactors that were cancelled,
# aggregate those errors with the original error # aggregate those errors with the original error
# that triggered this teardown. # that triggered this teardown.
if err not in merr.exceptions: if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err]) raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
list(merr.exceptions) + [err],
)
else: else:
raise raise
@ -402,9 +406,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: do we need a `trio.Cancelled` catch here as well? # XXX: do we need a `trio.Cancelled` catch here as well?
# this is the catch around the ``.run_in_actor()`` nursery # this is the catch around the ``.run_in_actor()`` nursery
except ( except (
Exception, Exception,
trio.MultiError, BaseExceptionGroup,
trio.Cancelled trio.Cancelled
) as err: ) as err:
@ -436,9 +439,12 @@ async def _open_and_supervise_one_cancels_all_nursery(
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await anursery.cancel() await anursery.cancel()
# use `MultiError` as needed # use `BaseExceptionGroup` as needed
if len(errors) > 1: if len(errors) > 1:
raise trio.MultiError(tuple(errors.values())) raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
tuple(errors.values()),
)
else: else:
raise list(errors.values())[0] raise list(errors.values())[0]
@ -447,7 +453,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# after nursery exit # after nursery exit
@asynccontextmanager @acm
async def open_nursery( async def open_nursery(
**kwargs, **kwargs,