Compare commits

..

8 Commits

Author SHA1 Message Date
Tyler Goodlet 0294455c5e `_root`: drop unused `typing` import 2024-01-02 18:43:43 -05:00
Tyler Goodlet 734bc09b67 Move missing-key-in-msg raiser to `._exceptions`
Since we use basically the exact same set of logic in
`Portal.open_context()` when expecting the first `'started'` msg factor
and generalize `._streaming._raise_from_no_yield_msg()` into a new
`._exceptions._raise_from_no_key_in_msg()` (as per the lingering todo)
which obvi requires a more generalized / optional signature including
a caller specific `log` obj. Obvi call the new func from all the other
modules X)
2024-01-02 18:34:15 -05:00
Tyler Goodlet 0bcdea28a0 Fmt repr as multi-line style call 2024-01-02 11:28:55 -05:00
Tyler Goodlet fdf3a1b01b Only use `greenback` if actor-runtime is up.. 2024-01-02 11:28:02 -05:00
Tyler Goodlet ce7b8a5e18 Drop unused walrus assign of `re` 2024-01-02 11:21:20 -05:00
Tyler Goodlet 00024181cd `StackLevelAdapter._log(stacklevel: int)` for custom levels..
Apparently (and i don't know if this was always broken [i feel like no?]
or is a recent change to stdlib's `logging` stuff) we need increment the
`stacklevel` input by one for our custom level methods now? Without this
you're going to see the path to the method's-callstack-frame on every
emission instead of to the caller's. I first noticed this when debugging
the workspace layer spawning in `modden.bigd` and then verified it in
other depended projects..

I guess we should add some tests for this as well XD
2024-01-02 10:38:04 -05:00
Tyler Goodlet 814384848d Use `import <name> as <name>,` style over `__all__` in pkg mod 2024-01-02 10:25:17 -05:00
Tyler Goodlet bea31f6d19 ._child: remove some unused imports.. 2024-01-02 10:24:39 -05:00
10 changed files with 230 additions and 193 deletions

View File

@ -18,76 +18,49 @@
tractor: structured concurrent ``trio``-"actors".
"""
from exceptiongroup import BaseExceptionGroup
from exceptiongroup import BaseExceptionGroup as BaseExceptionGroup
from ._clustering import open_actor_cluster
from ._clustering import (
open_actor_cluster as open_actor_cluster,
)
from ._context import (
Context, # the type
context, # a func-decorator
Context as Context, # the type
context as context, # a func-decorator
)
from ._streaming import (
MsgStream,
stream,
MsgStream as MsgStream,
stream as stream,
)
from ._discovery import (
get_arbiter,
find_actor,
wait_for_actor,
query_actor,
get_arbiter as get_arbiter,
find_actor as find_actor,
wait_for_actor as wait_for_actor,
query_actor as query_actor,
)
from ._supervise import (
open_nursery as open_nursery,
ActorNursery as ActorNursery,
)
from ._supervise import open_nursery
from ._state import (
current_actor,
is_root_process,
current_actor as current_actor,
is_root_process as is_root_process,
)
from ._exceptions import (
RemoteActorError,
ModuleNotExposed,
ContextCancelled,
RemoteActorError as RemoteActorError,
ModuleNotExposed as ModuleNotExposed,
ContextCancelled as ContextCancelled,
)
from .devx import (
breakpoint,
pause,
pause_from_sync,
post_mortem,
breakpoint as breakpoint,
pause as pause,
pause_from_sync as pause_from_sync,
post_mortem as post_mortem,
)
from . import msg
from . import msg as msg
from ._root import (
run_daemon,
open_root_actor,
run_daemon as run_daemon,
open_root_actor as open_root_actor,
)
from ._ipc import Channel
from ._portal import Portal
from ._runtime import Actor
__all__ = [
'Actor',
'BaseExceptionGroup',
'Channel',
'Context',
'ContextCancelled',
'ModuleNotExposed',
'MsgStream',
'Portal',
'RemoteActorError',
'breakpoint',
'context',
'current_actor',
'find_actor',
'query_actor',
'get_arbiter',
'is_root_process',
'msg',
'open_actor_cluster',
'open_nursery',
'open_root_actor',
'pause',
'post_mortem',
'pause_from_sync',
'query_actor',
'run_daemon',
'stream',
'to_asyncio',
'wait_for_actor',
]
from ._ipc import Channel as Channel
from ._portal import Portal as Portal
from ._runtime import Actor as Actor

View File

@ -18,8 +18,6 @@
This is the "bootloader" for actors started using the native trio backend.
"""
import sys
import trio
import argparse
from ast import literal_eval
@ -37,8 +35,6 @@ def parse_ipaddr(arg):
return (str(host), int(port))
from ._entry import _trio_main
if __name__ == "__main__":
parser = argparse.ArgumentParser()

View File

@ -44,9 +44,11 @@ import warnings
import trio
from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
# MessagingError,
StreamOverrun,
)
from .log import get_logger
@ -62,6 +64,7 @@ if TYPE_CHECKING:
log = get_logger(__name__)
# TODO: make this a msgspec.Struct!
@dataclass
class Context:
'''
@ -491,15 +494,15 @@ class Context:
if self._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if
# it's an expected `ContextCancelled` (after some local
# task having called `.cancel()` !
# it's an expected `ContextCancelled` due to a local
# task having called `.cancel()`!
#
# WHY: we expect the error to always bubble up to the
# surrounding `Portal.open_context()` call and be
# absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already
# sent to the other side!
if re := self._remote_error:
if self._remote_error:
raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded

View File

@ -14,16 +14,18 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
Our classy exception set.
"""
'''
from __future__ import annotations
import builtins
import importlib
from pprint import pformat
from typing import (
Any,
Type,
TYPE_CHECKING,
)
import traceback
@ -32,6 +34,11 @@ import trio
from ._state import current_actor
if TYPE_CHECKING:
from ._context import Context
from ._stream import MsgStream
from .log import StackLevelAdapter
_this_mod = importlib.import_module(__name__)
@ -246,3 +253,88 @@ def is_multi_cancelled(exc: BaseException) -> bool:
) is not None
return False
def _raise_from_no_key_in_msg(
ctx: Context,
msg: dict,
src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
stream: MsgStream | None = None,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
try:
cid: str = msg['cid']
except KeyError as src_err:
raise MessagingError(
f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}\n'
) from src_err
# TODO: test that shows stream raising an expected error!!!
if msg.get('error'):
# raise the error message
raise unpack_error(
msg,
ctx.chan,
) from None
elif (
msg.get('stop')
or (
stream
and stream._eoc
)
):
log.debug(
f'Context[{cid}] stream was stopped by remote side\n'
f'cid: {cid}\n'
)
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Context[{cid}] stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
if (
stream
and stream._closed
):
raise trio.ClosedResourceError('This stream was closed')
# always re-raise the source error if no translation error case
# is activated above.
_type: str = 'Stream' if stream else 'Context'
raise MessagingError(
f'{_type} was expecting a `{expect_key}` message'
' BUT received a non-`error` msg:\n'
f'cid: {cid}\n'
'{pformat(msg)}'
) from src_err

View File

@ -329,8 +329,11 @@ class Channel:
def __repr__(self) -> str:
if self.msgstream:
return repr(
self.msgstream.stream.socket._sock).replace( # type: ignore
"socket.socket", "Channel")
self.msgstream.stream.socket._sock
).replace( # type: ignore
"socket.socket",
"Channel",
)
return object.__repr__(self)
@property

View File

@ -33,7 +33,6 @@ from typing import (
)
from functools import partial
from dataclasses import dataclass
from pprint import pformat
import warnings
import trio
@ -45,13 +44,17 @@ from ._ipc import Channel
from .log import get_logger
from .msg import NamespacePath
from ._exceptions import (
_raise_from_no_key_in_msg,
unpack_error,
NoResult,
ContextCancelled,
MessagingError,
)
from ._context import Context
from ._streaming import MsgStream
from ._context import (
Context,
)
from ._streaming import (
MsgStream,
)
from .devx._debug import maybe_wait_for_debugger
@ -465,26 +468,15 @@ class Portal:
first: Any = msg['started']
ctx._started_called: bool = True
except KeyError:
except KeyError as src_error:
# TODO: can we maybe factor this into the new raiser
# `_streaming._raise_from_no_yield_msg()` and make that
# helper more generic, say with a `_no_<blah>_msg()`?
if not (cid := msg.get('cid')):
raise MessagingError(
'Received internal error at context?\n'
'No call-id (cid) in startup msg?'
)
if msg.get('error'):
# NOTE: mask the key error with the remote one
raise unpack_error(msg, self.channel) from None
else:
raise MessagingError(
f'Context for {cid} was expecting a `started` message'
' but received a non-error msg:\n'
f'{pformat(msg)}'
)
_raise_from_no_key_in_msg(
ctx=ctx,
msg=msg,
src_err=src_error,
log=log,
expect_key='started',
)
ctx._portal: Portal = self
uid: tuple = self.channel.uid

View File

@ -25,7 +25,6 @@ import logging
import signal
import sys
import os
import typing
import warnings

View File

@ -23,7 +23,6 @@ The machinery and types behind ``Context.open_stream()``
from __future__ import annotations
import inspect
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
Callable,
@ -35,8 +34,7 @@ import warnings
import trio
from ._exceptions import (
unpack_error,
MessagingError,
_raise_from_no_key_in_msg,
)
from .log import get_logger
from .trionics import (
@ -56,71 +54,6 @@ log = get_logger(__name__)
# messages? class ReceiveChannel(AsyncResource, Generic[ReceiveType]):
# - use __slots__ on ``Context``?
def _raise_from_no_yield_msg(
stream: MsgStream,
msg: dict,
src_err: KeyError,
) -> bool:
'''
Raise an appopriate local error when a `MsgStream` msg arrives
which does not contain the expected (under normal operation)
`'yield'` field.
'''
__tracebackhide__: bool = True
# internal error should never get here
assert msg.get('cid'), ("Received internal error at portal?")
# TODO: handle 2 cases with 3.10+ match syntax
# - 'stop'
# - 'error'
# possibly just handle msg['stop'] here!
# breakpoint()
if stream._closed:
raise trio.ClosedResourceError('This stream was closed')
if (
msg.get('stop')
or stream._eoc
):
log.debug(f'{stream} was stopped at remote end')
# XXX: important to set so that a new ``.receive()``
# call (likely by another task using a broadcast receiver)
# doesn't accidentally pull the ``return`` message
# value out of the underlying feed mem chan!
stream._eoc: bool = True
# # when the send is closed we assume the stream has
# # terminated and signal this local iterator to stop
# await stream.aclose()
# XXX: this causes ``ReceiveChannel.__anext__()`` to
# raise a ``StopAsyncIteration`` **and** in our catch
# block below it will trigger ``.aclose()``.
raise trio.EndOfChannel(
'Stream ended due to msg:\n'
f'{pformat(msg)}'
) from src_err
# TODO: test that shows stream raising an expected error!!!
elif msg.get('error'):
# raise the error message
raise unpack_error(msg, stream._ctx.chan)
# always re-raise the source error if no translation error case
# is activated above.
raise MessagingError(
f'Context received unexpected non-error msg!?\n'
f'cid: {cid}\n'
'received msg:\n'
f'{pformat(msg)}'
) from src_err
class MsgStream(trio.abc.Channel):
'''
A bidirectional message stream for receiving logically sequenced
@ -160,10 +93,13 @@ class MsgStream(trio.abc.Channel):
try:
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
async def receive(self):
@ -196,10 +132,13 @@ class MsgStream(trio.abc.Channel):
return msg['yield']
except KeyError as kerr:
_raise_from_no_yield_msg(
stream=self,
_raise_from_no_key_in_msg(
ctx=self._ctx,
msg=msg,
src_err=kerr,
log=log,
expect_key='yield',
stream=self,
)
except (

View File

@ -682,7 +682,7 @@ async def pause(
https://en.wikipedia.org/wiki/Breakpoint
'''
__tracebackhide__ = True
# __tracebackhide__ = True
actor = tractor.current_actor()
pdb, undo_sigint = mk_mpdb()
task_name = trio.lowlevel.current_task().name
@ -836,25 +836,32 @@ async def pause(
# runtime aware version which takes care of all .
def pause_from_sync() -> None:
print("ENTER SYNC PAUSE")
try:
import greenback
__tracebackhide__ = True
actor: tractor.Actor = tractor.current_actor(
err_on_no_runtime=False,
)
if actor:
try:
import greenback
# __tracebackhide__ = True
actor: tractor.Actor = tractor.current_actor()
# task_can_release_tty_lock = trio.Event()
# spawn bg task which will lock out the TTY, we poll
# just below until the release event is reporting that task as
# waiting.. not the most ideal but works for now ;)
greenback.await_(
actor._service_n.start(partial(
pause,
debug_func=None,
# release_lock_signal=task_can_release_tty_lock,
))
)
except ModuleNotFoundError:
log.warning('NO GREENBACK FOUND')
# task_can_release_tty_lock = trio.Event()
# spawn bg task which will lock out the TTY, we poll
# just below until the release event is reporting that task as
# waiting.. not the most ideal but works for now ;)
greenback.await_(
actor._service_n.start(partial(
pause,
debug_func=None,
# release_lock_signal=task_can_release_tty_lock,
))
)
except ModuleNotFoundError:
log.warning('NO GREENBACK FOUND')
else:
log.warning('Not inside actor-runtime')
db, undo_sigint = mk_mpdb()
Lock.local_task_in_debug = 'sync'

View File

@ -48,12 +48,15 @@ LOG_FORMAT = (
DATE_FORMAT = '%b %d %H:%M:%S'
LEVELS = {
LEVELS: dict[str, int] = {
'TRANSPORT': 5,
'RUNTIME': 15,
'CANCEL': 16,
'PDB': 500,
}
# _custom_levels: set[str] = {
# lvlname.lower for lvlname in LEVELS.keys()
# }
STD_PALETTE = {
'CRITICAL': 'red',
@ -102,7 +105,11 @@ class StackLevelAdapter(logging.LoggerAdapter):
Cancellation logging, mostly for runtime reporting.
'''
return self.log(16, msg)
return self.log(
level=16,
msg=msg,
# stacklevel=4,
)
def pdb(
self,
@ -114,14 +121,37 @@ class StackLevelAdapter(logging.LoggerAdapter):
'''
return self.log(500, msg)
def log(self, level, msg, *args, **kwargs):
"""
def log(
self,
level,
msg,
*args,
**kwargs,
):
'''
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
"""
'''
if self.isEnabledFor(level):
stacklevel: int = 3
if (
level in LEVELS.values()
# or level in _custom_levels
):
stacklevel: int = 4
# msg, kwargs = self.process(msg, kwargs)
self._log(level, msg, args, **kwargs)
self._log(
level=level,
msg=msg,
args=args,
# NOTE: not sure how this worked before but, it
# seems with our custom level methods defined above
# we do indeed (now) require another stack level??
stacklevel=stacklevel,
**kwargs,
)
# LOL, the stdlib doesn't allow passing through ``stacklevel``..
def _log(
@ -134,12 +164,15 @@ class StackLevelAdapter(logging.LoggerAdapter):
stack_info=False,
# XXX: bit we added to show fileinfo from actual caller.
# this level then ``.log()`` then finally the caller's level..
stacklevel=3,
# - this level
# - then ``.log()``
# - then finally the caller's level..
stacklevel=4,
):
"""
'''
Low-level log implementation, proxied to allow nested logger adapters.
"""
'''
return self.logger._log(
level,
msg,