Compare commits

..

No commits in common. "0294455c5e35a0f0da5fc9605290b293c318f0fd" and "250275d98d52e8076d2797b8c3fabf5f8098d124" have entirely different histories.

10 changed files with 193 additions and 230 deletions

View File

@ -18,49 +18,76 @@
tractor: structured concurrent ``trio``-"actors".
"""
from exceptiongroup import BaseExceptionGroup as BaseExceptionGroup
from exceptiongroup import BaseExceptionGroup
from ._clustering import (
open_actor_cluster as open_actor_cluster,
)
from ._clustering import open_actor_cluster
from ._context import (
Context as Context, # the type
context as context, # a func-decorator
Context, # the type
context, # a func-decorator
)
from ._streaming import (
MsgStream as MsgStream,
stream as stream,
MsgStream,
stream,
)
from ._discovery import (
get_arbiter as get_arbiter,
find_actor as find_actor,
wait_for_actor as wait_for_actor,
query_actor as query_actor,
)
from ._supervise import (
open_nursery as open_nursery,
ActorNursery as ActorNursery,
get_arbiter,
find_actor,
wait_for_actor,
query_actor,
)
from ._supervise import open_nursery
from ._state import (
current_actor as current_actor,
is_root_process as is_root_process,
current_actor,
is_root_process,
)
from ._exceptions import (
RemoteActorError as RemoteActorError,
ModuleNotExposed as ModuleNotExposed,
ContextCancelled as ContextCancelled,
RemoteActorError,
ModuleNotExposed,
ContextCancelled,
)
from .devx import (
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
post_mortem as post_mortem,
breakpoint,
pause,
pause_from_sync,
post_mortem,
)
from . import msg as msg
from . import msg
from ._root import (
run_daemon as run_daemon,
open_root_actor as open_root_actor,
run_daemon,
open_root_actor,
)
from ._ipc import Channel as Channel
from ._portal import Portal as Portal
from ._runtime import Actor as Actor
from ._ipc import Channel
from ._portal import Portal
from ._runtime import Actor
__all__ = [
'Actor',
'BaseExceptionGroup',
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'Portal',
'RemoteActorError',
'breakpoint',
'context',
'current_actor',
'find_actor',
'query_actor',
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'pause',
'post_mortem',
'pause_from_sync',
'query_actor',
'run_daemon',
'stream',
'to_asyncio',
'wait_for_actor',
]

View File

@ -18,6 +18,8 @@
This is the "bootloader" for actors started using the native trio backend.
"""
import sys
import trio
import argparse
from ast import literal_eval
@ -35,6 +37,8 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()

View File

@ -44,11 +44,9 @@ import warnings
import trio
from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
# MessagingError,
StreamOverrun,
)
from .log import get_logger
@ -64,7 +62,6 @@ if TYPE_CHECKING:
log = get_logger(__name__)
# TODO: make this a msgspec.Struct!
@dataclass
class Context:
'''
@ -494,15 +491,15 @@ class Context:
if self._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if
# it's an expected `ContextCancelled` due to a local
# task having called `.cancel()`!
# it's an expected `ContextCancelled` (after some local
# task having called `.cancel()` !
#
# WHY: we expect the error to always bubble up to the
# surrounding `Portal.open_context()` call and be
# absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already
# sent to the other side!
if self._remote_error:
if re := self._remote_error:
raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded

View File

@ -14,18 +14,16 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
"""
Our classy exception set.
'''
from __future__ import annotations
"""
import builtins
import importlib
from pprint import pformat
from typing import (
Any,
Type,
TYPE_CHECKING,
)
import traceback
@ -34,11 +32,6 @@ import trio
from ._state import current_actor
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
_this_mod = importlib.import_module(__name__)
@ -253,88 +246,3 @@ def is_multi_cancelled(exc: BaseException) -> bool:
) is not None
return False
def _raise_from_no_key_in_msg(
ctx: Context,
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
) from None
elif (
msg.get('stop')
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Context[{cid}] stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
if (
stream
and stream._closed
):
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
) from src_err

View File

@ -329,11 +329,8 @@ class Channel:
def __repr__(self) -> str:
if self.msgstream:
return repr(
self.msgstream.stream.socket._sock
).replace( # type: ignore
"socket.socket",
"Channel",
)
self.msgstream.stream.socket._sock).replace( # type: ignore
"socket.socket", "Channel")
return object.__repr__(self)
@property

View File

@ -33,6 +33,7 @@ from typing import (
)
from functools import partial
from dataclasses import dataclass
from pprint import pformat
import warnings
import trio
@ -44,17 +45,13 @@ from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error,
NoResult,
ContextCancelled,
MessagingError,
)
from ._context import (
Context,
)
from ._streaming import (
MsgStream,
)
from ._context import Context
from ._streaming import MsgStream
from .devx._debug import maybe_wait_for_debugger
@ -468,15 +465,26 @@ class Portal:
first: Any = msg['started']
ctx._started_called: bool = True
except KeyError as src_error:
except KeyError:
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
src_err=src_error,
log=log,
expect_key='started',
)
# TODO: can we maybe factor this into the new raiser
# `_streaming._raise_from_no_yield_msg()` and make that
# helper more generic, say with a `_no_<blah>_msg()`?
if not (cid := msg.get('cid')):
raise MessagingError(
'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
if msg.get('error'):
# NOTE: mask the key error with the remote one
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {cid} was expecting a `started` message'
' but received a non-error msg:\n'
f'{pformat(msg)}'
)
ctx._portal: Portal = self
uid: tuple = self.channel.uid

View File

@ -25,6 +25,7 @@ import logging
import signal
import sys
import os
import typing
import warnings

View File

@ -23,6 +23,7 @@ The machinery and types behind ``Context.open_stream()``
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
Callable,
@ -34,7 +35,8 @@ import warnings
import trio
from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error,
MessagingError,
)
from .log import get_logger
from .trionics import (
@ -54,6 +56,71 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
# breakpoint()
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if (
msg.get('stop')
or stream._eoc
):
log.debug(f'{stream} was stopped at remote end')
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error case
# is activated above.
raise MessagingError(
f'Context received unexpected non-error msg!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}'
) from src_err
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
@ -93,13 +160,10 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_key_in_msg(
ctx=self._ctx,
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
async def receive(self):
@ -132,13 +196,10 @@ class MsgStream(trio.abc.Channel):
return msg['yield']
except KeyError as kerr:
_raise_from_no_key_in_msg(
ctx=self._ctx,
_raise_from_no_yield_msg(
stream=self,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
except (

View File

@ -682,7 +682,7 @@ async def pause(
https://en.wikipedia.org/wiki/Breakpoint
'''
# __tracebackhide__ = True
__tracebackhide__ = True
actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb()
task_name = trio.lowlevel.current_task().name
@ -836,32 +836,25 @@ async def pause(
# runtime aware version which takes care of all .
def pause_from_sync() -> None:
print("ENTER SYNC PAUSE")
actor: tractor.Actor = tractor.current_actor(
err_on_no_runtime=False,
)
if actor:
try:
import greenback
# __tracebackhide__ = True
try:
import greenback
__tracebackhide__ = True
actor: tractor.Actor = tractor.current_actor()
# task_can_release_tty_lock = trio.Event()
# task_can_release_tty_lock = trio.Event()
# spawn bg task which will lock out the TTY, we poll
# just below until the release event is reporting that task as
# waiting.. not the most ideal but works for now ;)
greenback.await_(
actor._service_n.start(partial(
pause,
debug_func=None,
# release_lock_signal=task_can_release_tty_lock,
))
)
except ModuleNotFoundError:
log.warning('NO GREENBACK FOUND')
else:
log.warning('Not inside actor-runtime')
# spawn bg task which will lock out the TTY, we poll
# just below until the release event is reporting that task as
# waiting.. not the most ideal but works for now ;)
greenback.await_(
actor._service_n.start(partial(
pause,
debug_func=None,
# release_lock_signal=task_can_release_tty_lock,
))
)
except ModuleNotFoundError:
log.warning('NO GREENBACK FOUND')
db, undo_sigint = mk_mpdb()
Lock.local_task_in_debug = 'sync'

View File

@ -48,15 +48,12 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS: dict[str, int] = {
LEVELS = {
'TRANSPORT': 5,
'RUNTIME': 15,
'CANCEL': 16,
'PDB': 500,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
# }
STD_PALETTE = {
'CRITICAL': 'red',
@ -105,11 +102,7 @@ class StackLevelAdapter(logging.LoggerAdapter):
Cancellation logging, mostly for runtime reporting.
'''
return self.log(
level=16,
msg=msg,
# stacklevel=4,
)
return self.log(16, msg)
def pdb(
self,
@ -121,37 +114,14 @@ class StackLevelAdapter(logging.LoggerAdapter):
'''
return self.log(500, msg)
def log(
self,
level,
msg,
*args,
**kwargs,
):
'''
def log(self, level, msg, *args, **kwargs):
"""
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
'''
"""
if self.isEnabledFor(level):
stacklevel: int = 3
if (
level in LEVELS.values()
# or level in _custom_levels
):
stacklevel: int = 4
# msg, kwargs = self.process(msg, kwargs)
self._log(
level=level,
msg=msg,
args=args,
# NOTE: not sure how this worked before but, it
# seems with our custom level methods defined above
# we do indeed (now) require another stack level??
stacklevel=stacklevel,
**kwargs,
)
self._log(level, msg, args, **kwargs)
# LOL, the stdlib doesn't allow passing through ``stacklevel``..
def _log(
@ -164,15 +134,12 @@ class StackLevelAdapter(logging.LoggerAdapter):
stack_info=False,
# XXX: bit we added to show fileinfo from actual caller.
# - this level
# - then ``.log()``
# - then finally the caller's level..
stacklevel=4,
# this level then ``.log()`` then finally the caller's level..
stacklevel=3,
):
'''
"""
Low-level log implementation, proxied to allow nested logger adapters.
'''
"""
return self.logger._log(
level,
msg,