Compare commits

..

No commits in common. "eebd9bf05c3d7ca49e3b9d618556acc7a4c80124" and "ed96672136be7a70d603fc6dcdb2fffef666446e" have entirely different histories.

10 changed files with 23 additions and 273 deletions

View File

@ -1,28 +0,0 @@
import tractor
import trio
async def bubble():
print('IN BUBBLE')
await trio.sleep(.1)
await tractor.breakpoint()
async def bail():
getattr(doggy)
async def main():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.run_in_actor('future_self', bubble)
# portal = await n.run_in_actor('future_self', bail)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
if __name__ == '__main__':
tractor.run(main, loglevel='trace', debug_mode=True)

View File

@ -63,7 +63,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
# module should raise a ModuleNotFoundError at import
testdir.makefile('.py', tmp_mod=funcname)
# no need to expose module to the subactor
# no need to exposed module to the subactor
subactor_exposed_mods = exposed_mods
exposed_mods = []
func_defined = False
@ -95,7 +95,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
tractor.run(
main,
arbiter_addr=arb_addr,
rpc_module_paths=exposed_mods.copy(),
rpc_module_paths=exposed_mods,
)
# handle both parameterized cases

View File

@ -18,7 +18,6 @@ from ._actor import Actor, _start_actor, Arbiter
from ._trionics import open_nursery
from ._state import current_actor
from ._exceptions import RemoteActorError, ModuleNotExposed
from ._debug import breakpoint, post_mortem
from . import msg
from . import _spawn
@ -104,26 +103,14 @@ def run(
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio_run_in_process` (the new default).
start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs,
) -> Any:
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
"""
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
if start_method is not None:
_spawn.try_set_start_method(start_method)
if debug_mode:
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
kwargs.setdefault('rpc_module_paths', []).append('tractor._debug')
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)

View File

@ -7,7 +7,6 @@ from itertools import chain
import importlib
import importlib.util
import inspect
import bdb
import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional
@ -126,15 +125,8 @@ async def _invoke(
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
except (Exception, trio.MultiError) as err:
log.exception("Actor errored:")
# NOTE: don't enter debug mode recursively after quitting pdb
if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
# Allow for pdb control in parent
from ._debug import post_mortem
await post_mortem()
# always ship errors back to caller
log.exception("Actor errored:")
err_msg = pack_error(err)
err_msg['cid'] = cid
try:
@ -190,7 +182,6 @@ class Actor:
def __init__(
self,
name: str,
*,
rpc_module_paths: List[str] = [],
statespace: Optional[Dict[str, Any]] = None,
uid: str = None,
@ -244,7 +235,6 @@ class Actor:
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}
async def wait_for_peer(
self, uid: Tuple[str, str]
@ -719,17 +709,11 @@ class Actor:
- cancelling all rpc tasks
- cancelling the channel server
- cancel the "root" nursery
- if root actor, cancel all nurseries under the root ``trio`` task
"""
# cancel all ongoing rpc tasks
await self.cancel_rpc_tasks()
self.cancel_server()
self._root_nursery.cancel_scope.cancel()
if self._parent_chan is None: # TODO: more robust check
# we're the root actor or zombied
root = trio.lowlevel.current_root_task()
for n in root.child_nurseries:
n.cancel_scope.cancel()
async def _cancel_task(self, cid, chan):
"""Cancel a local task by call-id / channel.

View File

@ -1,154 +0,0 @@
"""
Multi-core debugging for da peeps!
"""
import pdb
import sys
import tty
import termios
from functools import partial
from typing import Awaitable, Tuple
from async_generator import aclosing
import tractor
import trio
from .log import get_logger
log = get_logger(__name__)
__all__ = ['breakpoint', 'post_mortem']
# TODO: is there some way to determine this programatically?
_pdb_exit_patterns = tuple(
str.encode(patt + "\n") for patt in ('c', 'cont', 'continue', 'q', 'quit')
)
def subactoruid2proc(
actor: 'Actor', # noqa
uid: Tuple[str, str]
) -> trio.Process:
n = actor._actoruid2nursery[uid]
_, proc, _ = n._children[uid]
return proc
async def _hijack_stdin_relay_to_child(
subactor_uid: Tuple[str, str]
) -> None:
actor = tractor.current_actor()
proc = subactoruid2proc(actor, subactor_uid)
nlb = []
line = []
async def hijack_stdin():
log.info(f"Hijacking stdin from {actor.uid}")
try:
# disable cooked mode
fd = sys.stdin.fileno()
old = tty.tcgetattr(fd)
# tty.setcbreak(fd)
tty.setcbreak(fd)
# trap std in and relay to subproc
async_stdin = trio.wrap_file(sys.stdin)
async with aclosing(async_stdin):
while True:
# async for msg in async_stdin:
msg = await async_stdin.read(1)
nlb.append(msg)
# encode to bytes
bmsg = str.encode(msg)
log.trace(f"Stdin str input:\n{msg}")
log.trace(f"Stdin bytes input:\n{bmsg}")
# relay bytes to subproc over pipe
await proc.stdin.send_all(bmsg)
# termios.tcflush(fd, termios.TCIFLUSH)
# don't write control chars to local stdout
if bmsg not in (b'\t'):
# mirror input to stdout
sys.stdout.write(msg)
sys.stdout.flush()
if bmsg == b'\n':
line = str.encode(''.join(nlb))
# print(line.decode())
if line in _pdb_exit_patterns:
log.info("Closing stdin hijack")
line = []
break
finally:
tty.tcsetattr(fd, tty.TCSAFLUSH, old)
# schedule hijacking in root scope
actor._root_nursery.start_soon(hijack_stdin)
# XXX: We only make this sync in case someone wants to
# overload the ``breakpoint()`` built-in.
def _breakpoint(debug_func) -> Awaitable[None]:
"""``tractor`` breakpoint entry for engaging pdb machinery
in subactors.
"""
actor = tractor.current_actor()
try:
import ipdb
db = ipdb
except ImportError:
import pdb
db = pdb
async def wait_for_parent_stdin_hijack():
log.debug('Breakpoint engaged!')
# TODO: need a more robust check for the "root" actor
if actor._parent_chan:
async with tractor._portal.open_portal(
actor._parent_chan,
start_msg_loop=False,
) as portal:
# with trio.fail_after(1):
await portal.run(
'tractor._debug',
'_hijack_stdin_relay_to_child',
subactor_uid=actor.uid,
)
# block here one frame up where ``breakpoint()``
# was awaited and begin handling stdin
debug_func(actor, db)
# this must be awaited by caller
return wait_for_parent_stdin_hijack()
def _set_trace(actor, dbmod):
dbmod.set_trace(
header=f"\nAttaching pdb to actor: {actor.uid}\n",
# start 2 levels up
frame=sys._getframe().f_back.f_back,
)
breakpoint = partial(
_breakpoint,
_set_trace,
)
def _post_mortem(actor, dbmod):
log.error(
f"\nAttaching to {dbmod} in crashed actor: {actor.uid}\n")
dbmod.post_mortem()
post_mortem = partial(
_breakpoint,
_post_mortem,
)

View File

@ -2,8 +2,7 @@
Process entry points.
"""
from functools import partial
from typing import Tuple, Any, Dict
import os
from typing import Tuple, Any
import trio # type: ignore
@ -55,17 +54,13 @@ def _mp_main(
async def _trio_main(
actor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
parent_addr: Tuple[str, int] = None
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
"""
# TODO: make a global func to set this or is it too hacky?
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
if actor.loglevel is not None:
log.info(
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
@ -74,7 +69,6 @@ async def _trio_main(
log.info(f"Started new trio process for {actor.uid}")
_state._current_actor = actor
_state._runtime_vars.update(_runtime_vars)
await actor._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {actor.uid} terminated")

View File

@ -314,8 +314,7 @@ class LocalPortal:
@asynccontextmanager
async def open_portal(
channel: Channel,
nursery: Optional[trio.Nursery] = None,
start_msg_loop: bool = True,
nursery: Optional[trio.Nursery] = None
) -> typing.AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``.
@ -333,17 +332,15 @@ async def open_portal(
if channel.uid is None:
await actor._do_handshake(channel)
msg_loop_cs = None
if start_msg_loop:
msg_loop_cs: trio.CancelScope = await nursery.start(
partial(
actor._process_messages,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends
shield=True,
)
msg_loop_cs: trio.CancelScope = await nursery.start(
partial(
actor._process_messages,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends
shield=True,
)
)
portal = Portal(channel)
try:
yield portal
@ -355,7 +352,6 @@ async def open_portal(
await channel.send(None)
# cancel background msg loop task
if msg_loop_cs:
msg_loop_cs.cancel()
msg_loop_cs.cancel()
nursery.cancel_scope.cancel()

View File

@ -26,7 +26,7 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._state import current_actor, is_main_process
from ._state import current_actor
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
@ -90,6 +90,12 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
return _ctx
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'
async def exhaust_portal(
portal: Portal,
actor: Actor
@ -182,8 +188,6 @@ async def new_proc(
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
@ -203,7 +207,6 @@ async def new_proc(
subactor,
bind_addr,
parent_addr,
_runtime_vars=_runtime_vars,
) as proc:
log.info(f"Started {proc}")
@ -215,10 +218,6 @@ async def new_proc(
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
task_status.started(portal)
# wait for ActorNursery.wait() to be called

View File

@ -3,15 +3,10 @@ Per process state
"""
from typing import Optional
from collections import Mapping
import multiprocessing as mp
import trio
_current_actor: Optional['Actor'] = None # type: ignore
_runtime_vars = {
'_debug_mode': False,
'_is_root': False,
}
def current_actor() -> 'Actor': # type: ignore
@ -41,20 +36,3 @@ class ActorContextInfo(Mapping):
except RuntimeError:
# no local actor/task context initialized yet
return f'no {key} context'
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'
def debug_mode() -> bool:
"""Bool determining if "debug mode" is on which enables
remote subactor pdb entry on crashes.
"""
return _runtime_vars['_debug_mode']
def is_root_process() -> bool:
return _runtime_vars['_is_root']

View File

@ -13,11 +13,10 @@ from ._state import current_actor
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._portal import Portal
from . import _state
from . import _spawn
log = get_logger(__name__)
log = get_logger('tractor')
class ActorNursery:
@ -57,10 +56,6 @@ class ActorNursery:
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
subactor = Actor(
name,
# modules allowed to invoked funcs from
@ -86,7 +81,6 @@ class ActorNursery:
self.errors,
bind_addr,
parent_addr,
_rtv, # run time vars
)
)