Compare commits

..

No commits in common. "0294455c5e35a0f0da5fc9605290b293c318f0fd" and "250275d98d52e8076d2797b8c3fabf5f8098d124" have entirely different histories.

10 changed files with 193 additions and 230 deletions

View File

@ -18,49 +18,76 @@
tractor: structured concurrent ``trio``-"actors". tractor: structured concurrent ``trio``-"actors".
""" """
from exceptiongroup import BaseExceptionGroup as BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
from ._clustering import ( from ._clustering import open_actor_cluster
open_actor_cluster as open_actor_cluster,
)
from ._context import ( from ._context import (
Context as Context, # the type Context, # the type
context as context, # a func-decorator context, # a func-decorator
) )
from ._streaming import ( from ._streaming import (
MsgStream as MsgStream, MsgStream,
stream as stream, stream,
) )
from ._discovery import ( from ._discovery import (
get_arbiter as get_arbiter, get_arbiter,
find_actor as find_actor, find_actor,
wait_for_actor as wait_for_actor, wait_for_actor,
query_actor as query_actor, query_actor,
)
from ._supervise import (
open_nursery as open_nursery,
ActorNursery as ActorNursery,
) )
from ._supervise import open_nursery
from ._state import ( from ._state import (
current_actor as current_actor, current_actor,
is_root_process as is_root_process, is_root_process,
) )
from ._exceptions import ( from ._exceptions import (
RemoteActorError as RemoteActorError, RemoteActorError,
ModuleNotExposed as ModuleNotExposed, ModuleNotExposed,
ContextCancelled as ContextCancelled, ContextCancelled,
) )
from .devx import ( from .devx import (
breakpoint as breakpoint, breakpoint,
pause as pause, pause,
pause_from_sync as pause_from_sync, pause_from_sync,
post_mortem as post_mortem, post_mortem,
) )
from . import msg as msg from . import msg
from ._root import ( from ._root import (
run_daemon as run_daemon, run_daemon,
open_root_actor as open_root_actor, open_root_actor,
) )
from ._ipc import Channel as Channel from ._ipc import Channel
from ._portal import Portal as Portal from ._portal import Portal
from ._runtime import Actor as Actor from ._runtime import Actor
__all__ = [
'Actor',
'BaseExceptionGroup',
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'Portal',
'RemoteActorError',
'breakpoint',
'context',
'current_actor',
'find_actor',
'query_actor',
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'pause',
'post_mortem',
'pause_from_sync',
'query_actor',
'run_daemon',
'stream',
'to_asyncio',
'wait_for_actor',
]

View File

@ -18,6 +18,8 @@
This is the "bootloader" for actors started using the native trio backend. This is the "bootloader" for actors started using the native trio backend.
""" """
import sys
import trio
import argparse import argparse
from ast import literal_eval from ast import literal_eval
@ -35,6 +37,8 @@ def parse_ipaddr(arg):
return (str(host), int(port)) return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()

View File

@ -44,11 +44,9 @@ import warnings
import trio import trio
from ._exceptions import ( from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error, unpack_error,
pack_error, pack_error,
ContextCancelled, ContextCancelled,
# MessagingError,
StreamOverrun, StreamOverrun,
) )
from .log import get_logger from .log import get_logger
@ -64,7 +62,6 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
# TODO: make this a msgspec.Struct!
@dataclass @dataclass
class Context: class Context:
''' '''
@ -494,7 +491,7 @@ class Context:
if self._cancel_called: if self._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if # XXX NOTE: ALWAYS RAISE any remote error here even if
# it's an expected `ContextCancelled` due to a local # it's an expected `ContextCancelled` (after some local
# task having called `.cancel()` ! # task having called `.cancel()` !
# #
# WHY: we expect the error to always bubble up to the # WHY: we expect the error to always bubble up to the
@ -502,7 +499,7 @@ class Context:
# absorbed there (silently) and we DO NOT want to # absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already # actually try to stream - a cancel msg was already
# sent to the other side! # sent to the other side!
if self._remote_error: if re := self._remote_error:
raise self._remote_error raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded # XXX NOTE: if no `ContextCancelled` has been responded

View File

@ -14,18 +14,16 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' """
Our classy exception set. Our classy exception set.
''' """
from __future__ import annotations
import builtins import builtins
import importlib import importlib
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Type, Type,
TYPE_CHECKING,
) )
import traceback import traceback
@ -34,11 +32,6 @@ import trio
from ._state import current_actor from ._state import current_actor
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
_this_mod = importlib.import_module(__name__) _this_mod = importlib.import_module(__name__)
@ -253,88 +246,3 @@ def is_multi_cancelled(exc: BaseException) -> bool:
) is not None ) is not None
return False return False
def _raise_from_no_key_in_msg(
ctx: Context,
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
) from None
elif (
msg.get('stop')
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Context[{cid}] stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
if (
stream
and stream._closed
):
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
) from src_err

View File

@ -329,11 +329,8 @@ class Channel:
def __repr__(self) -> str: def __repr__(self) -> str:
if self.msgstream: if self.msgstream:
return repr( return repr(
self.msgstream.stream.socket._sock self.msgstream.stream.socket._sock).replace( # type: ignore
).replace( # type: ignore "socket.socket", "Channel")
"socket.socket",
"Channel",
)
return object.__repr__(self) return object.__repr__(self)
@property @property

View File

@ -33,6 +33,7 @@ from typing import (
) )
from functools import partial from functools import partial
from dataclasses import dataclass from dataclasses import dataclass
from pprint import pformat
import warnings import warnings
import trio import trio
@ -44,17 +45,13 @@ from ._ipc import Channel
from .log import get_logger from .log import get_logger
from .msg import NamespacePath from .msg import NamespacePath
from ._exceptions import ( from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error, unpack_error,
NoResult, NoResult,
ContextCancelled, ContextCancelled,
MessagingError,
) )
from ._context import ( from ._context import Context
Context, from ._streaming import MsgStream
)
from ._streaming import (
MsgStream,
)
from .devx._debug import maybe_wait_for_debugger from .devx._debug import maybe_wait_for_debugger
@ -468,14 +465,25 @@ class Portal:
first: Any = msg['started'] first: Any = msg['started']
ctx._started_called: bool = True ctx._started_called: bool = True
except KeyError as src_error: except KeyError:
_raise_from_no_key_in_msg( # TODO: can we maybe factor this into the new raiser
ctx=ctx, # `_streaming._raise_from_no_yield_msg()` and make that
msg=msg, # helper more generic, say with a `_no_<blah>_msg()`?
src_err=src_error, if not (cid := msg.get('cid')):
log=log, raise MessagingError(
expect_key='started', 'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
if msg.get('error'):
# NOTE: mask the key error with the remote one
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {cid} was expecting a `started` message'
' but received a non-error msg:\n'
f'{pformat(msg)}'
) )
ctx._portal: Portal = self ctx._portal: Portal = self

View File

@ -25,6 +25,7 @@ import logging
import signal import signal
import sys import sys
import os import os
import typing
import warnings import warnings

View File

@ -23,6 +23,7 @@ The machinery and types behind ``Context.open_stream()``
from __future__ import annotations from __future__ import annotations
import inspect import inspect
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
@ -34,7 +35,8 @@ import warnings
import trio import trio
from ._exceptions import ( from ._exceptions import (
_raise_from_no_key_in_msg, unpack_error,
MessagingError,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -54,6 +56,71 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]): # messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``? # - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
# breakpoint()
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if (
msg.get('stop')
or stream._eoc
):
log.debug(f'{stream} was stopped at remote end')
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error case
# is activated above.
raise MessagingError(
f'Context received unexpected non-error msg!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}'
) from src_err
class MsgStream(trio.abc.Channel): class MsgStream(trio.abc.Channel):
''' '''
A bidirectional message stream for receiving logically sequenced A bidirectional message stream for receiving logically sequenced
@ -93,13 +160,10 @@ class MsgStream(trio.abc.Channel):
try: try:
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
_raise_from_no_key_in_msg( _raise_from_no_yield_msg(
ctx=self._ctx, stream=self,
msg=msg, msg=msg,
src_err=kerr, src_err=kerr,
log=log,
expect_key='yield',
stream=self,
) )
async def receive(self): async def receive(self):
@ -132,13 +196,10 @@ class MsgStream(trio.abc.Channel):
return msg['yield'] return msg['yield']
except KeyError as kerr: except KeyError as kerr:
_raise_from_no_key_in_msg( _raise_from_no_yield_msg(
ctx=self._ctx, stream=self,
msg=msg, msg=msg,
src_err=kerr, src_err=kerr,
log=log,
expect_key='yield',
stream=self,
) )
except ( except (

View File

@ -682,7 +682,7 @@ async def pause(
https://en.wikipedia.org/wiki/Breakpoint https://en.wikipedia.org/wiki/Breakpoint
''' '''
# __tracebackhide__ = True __tracebackhide__ = True
actor = tractor.current_actor() actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb() pdb, undo_sigint = mk_mpdb()
task_name = trio.lowlevel.current_task().name task_name = trio.lowlevel.current_task().name
@ -836,15 +836,11 @@ async def pause(
# runtime aware version which takes care of all . # runtime aware version which takes care of all .
def pause_from_sync() -> None: def pause_from_sync() -> None:
print("ENTER SYNC PAUSE") print("ENTER SYNC PAUSE")
actor: tractor.Actor = tractor.current_actor(
err_on_no_runtime=False,
)
if actor:
try: try:
import greenback import greenback
# __tracebackhide__ = True __tracebackhide__ = True
actor: tractor.Actor = tractor.current_actor()
# task_can_release_tty_lock = trio.Event() # task_can_release_tty_lock = trio.Event()
# spawn bg task which will lock out the TTY, we poll # spawn bg task which will lock out the TTY, we poll
@ -857,11 +853,8 @@ def pause_from_sync() -> None:
# release_lock_signal=task_can_release_tty_lock, # release_lock_signal=task_can_release_tty_lock,
)) ))
) )
except ModuleNotFoundError: except ModuleNotFoundError:
log.warning('NO GREENBACK FOUND') log.warning('NO GREENBACK FOUND')
else:
log.warning('Not inside actor-runtime')
db, undo_sigint = mk_mpdb() db, undo_sigint = mk_mpdb()
Lock.local_task_in_debug = 'sync' Lock.local_task_in_debug = 'sync'

View File

@ -48,15 +48,12 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S' DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS: dict[str, int] = { LEVELS = {
'TRANSPORT': 5, 'TRANSPORT': 5,
'RUNTIME': 15, 'RUNTIME': 15,
'CANCEL': 16, 'CANCEL': 16,
'PDB': 500, 'PDB': 500,
} }
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
# }
STD_PALETTE = { STD_PALETTE = {
'CRITICAL': 'red', 'CRITICAL': 'red',
@ -105,11 +102,7 @@ class StackLevelAdapter(logging.LoggerAdapter):
Cancellation logging, mostly for runtime reporting. Cancellation logging, mostly for runtime reporting.
''' '''
return self.log( return self.log(16, msg)
level=16,
msg=msg,
# stacklevel=4,
)
def pdb( def pdb(
self, self,
@ -121,37 +114,14 @@ class StackLevelAdapter(logging.LoggerAdapter):
''' '''
return self.log(500, msg) return self.log(500, msg)
def log( def log(self, level, msg, *args, **kwargs):
self, """
level,
msg,
*args,
**kwargs,
):
'''
Delegate a log call to the underlying logger, after adding Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance. contextual information from this adapter instance.
"""
'''
if self.isEnabledFor(level): if self.isEnabledFor(level):
stacklevel: int = 3
if (
level in LEVELS.values()
# or level in _custom_levels
):
stacklevel: int = 4
# msg, kwargs = self.process(msg, kwargs) # msg, kwargs = self.process(msg, kwargs)
self._log( self._log(level, msg, args, **kwargs)
level=level,
msg=msg,
args=args,
# NOTE: not sure how this worked before but, it
# seems with our custom level methods defined above
# we do indeed (now) require another stack level??
stacklevel=stacklevel,
**kwargs,
)
# LOL, the stdlib doesn't allow passing through ``stacklevel``.. # LOL, the stdlib doesn't allow passing through ``stacklevel``..
def _log( def _log(
@ -164,15 +134,12 @@ class StackLevelAdapter(logging.LoggerAdapter):
stack_info=False, stack_info=False,
# XXX: bit we added to show fileinfo from actual caller. # XXX: bit we added to show fileinfo from actual caller.
# - this level # this level then ``.log()`` then finally the caller's level..
# - then ``.log()`` stacklevel=3,
# - then finally the caller's level..
stacklevel=4,
): ):
''' """
Low-level log implementation, proxied to allow nested logger adapters. Low-level log implementation, proxied to allow nested logger adapters.
"""
'''
return self.logger._log( return self.logger._log(
level, level,
msg, msg,