forked from goodboy/tractor
Compare commits
9 Commits
master
...
stin_char_
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | eebd9bf05c | |
Tyler Goodlet | 7886a9fa64 | |
Tyler Goodlet | 042f2326db | |
Tyler Goodlet | 7b5af3b2d4 | |
Tyler Goodlet | bcbef1a095 | |
Tyler Goodlet | b1897da328 | |
Tyler Goodlet | 23fee8820b | |
Tyler Goodlet | 96cf4b6e9f | |
Tyler Goodlet | 90499c19d7 |
|
@ -0,0 +1,28 @@
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
async def bubble():
|
||||||
|
print('IN BUBBLE')
|
||||||
|
await trio.sleep(.1)
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
|
||||||
|
async def bail():
|
||||||
|
getattr(doggy)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""The main ``tractor`` routine.
|
||||||
|
"""
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
portal = await n.run_in_actor('future_self', bubble)
|
||||||
|
# portal = await n.run_in_actor('future_self', bail)
|
||||||
|
|
||||||
|
# The ``async with`` will unblock here since the 'some_linguist'
|
||||||
|
# actor has completed its main task ``cellar_door``.
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
tractor.run(main, loglevel='trace', debug_mode=True)
|
|
@ -63,7 +63,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
|
||||||
# module should raise a ModuleNotFoundError at import
|
# module should raise a ModuleNotFoundError at import
|
||||||
testdir.makefile('.py', tmp_mod=funcname)
|
testdir.makefile('.py', tmp_mod=funcname)
|
||||||
|
|
||||||
# no need to exposed module to the subactor
|
# no need to expose module to the subactor
|
||||||
subactor_exposed_mods = exposed_mods
|
subactor_exposed_mods = exposed_mods
|
||||||
exposed_mods = []
|
exposed_mods = []
|
||||||
func_defined = False
|
func_defined = False
|
||||||
|
@ -95,7 +95,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
|
||||||
tractor.run(
|
tractor.run(
|
||||||
main,
|
main,
|
||||||
arbiter_addr=arb_addr,
|
arbiter_addr=arb_addr,
|
||||||
rpc_module_paths=exposed_mods,
|
rpc_module_paths=exposed_mods.copy(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# handle both parameterized cases
|
# handle both parameterized cases
|
||||||
|
|
|
@ -18,6 +18,7 @@ from ._actor import Actor, _start_actor, Arbiter
|
||||||
from ._trionics import open_nursery
|
from ._trionics import open_nursery
|
||||||
from ._state import current_actor
|
from ._state import current_actor
|
||||||
from ._exceptions import RemoteActorError, ModuleNotExposed
|
from ._exceptions import RemoteActorError, ModuleNotExposed
|
||||||
|
from ._debug import breakpoint, post_mortem
|
||||||
from . import msg
|
from . import msg
|
||||||
from . import _spawn
|
from . import _spawn
|
||||||
|
|
||||||
|
@ -103,14 +104,26 @@ def run(
|
||||||
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
||||||
# OR `trio_run_in_process` (the new default).
|
# OR `trio_run_in_process` (the new default).
|
||||||
start_method: Optional[str] = None,
|
start_method: Optional[str] = None,
|
||||||
|
debug_mode: bool = False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""Run a trio-actor async function in process.
|
"""Run a trio-actor async function in process.
|
||||||
|
|
||||||
This is tractor's main entry and the start point for any async actor.
|
This is tractor's main entry and the start point for any async actor.
|
||||||
"""
|
"""
|
||||||
|
# mark top most level process as root actor
|
||||||
|
_state._runtime_vars['_is_root'] = True
|
||||||
|
|
||||||
if start_method is not None:
|
if start_method is not None:
|
||||||
_spawn.try_set_start_method(start_method)
|
_spawn.try_set_start_method(start_method)
|
||||||
|
|
||||||
|
if debug_mode:
|
||||||
|
_state._runtime_vars['_debug_mode'] = True
|
||||||
|
|
||||||
|
# expose internal debug module to every actor allowing
|
||||||
|
# for use of ``await tractor.breakpoint()``
|
||||||
|
kwargs.setdefault('rpc_module_paths', []).append('tractor._debug')
|
||||||
|
|
||||||
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
|
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ from itertools import chain
|
||||||
import importlib
|
import importlib
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import inspect
|
import inspect
|
||||||
|
import bdb
|
||||||
import uuid
|
import uuid
|
||||||
import typing
|
import typing
|
||||||
from typing import Dict, List, Tuple, Any, Optional
|
from typing import Dict, List, Tuple, Any, Optional
|
||||||
|
@ -125,8 +126,15 @@ async def _invoke(
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# always ship errors back to caller
|
|
||||||
log.exception("Actor errored:")
|
log.exception("Actor errored:")
|
||||||
|
|
||||||
|
# NOTE: don't enter debug mode recursively after quitting pdb
|
||||||
|
if _state.debug_mode() and not isinstance(err, bdb.BdbQuit):
|
||||||
|
# Allow for pdb control in parent
|
||||||
|
from ._debug import post_mortem
|
||||||
|
await post_mortem()
|
||||||
|
|
||||||
|
# always ship errors back to caller
|
||||||
err_msg = pack_error(err)
|
err_msg = pack_error(err)
|
||||||
err_msg['cid'] = cid
|
err_msg['cid'] = cid
|
||||||
try:
|
try:
|
||||||
|
@ -182,6 +190,7 @@ class Actor:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
*,
|
||||||
rpc_module_paths: List[str] = [],
|
rpc_module_paths: List[str] = [],
|
||||||
statespace: Optional[Dict[str, Any]] = None,
|
statespace: Optional[Dict[str, Any]] = None,
|
||||||
uid: str = None,
|
uid: str = None,
|
||||||
|
@ -235,6 +244,7 @@ class Actor:
|
||||||
self._parent_chan: Optional[Channel] = None
|
self._parent_chan: Optional[Channel] = None
|
||||||
self._forkserver_info: Optional[
|
self._forkserver_info: Optional[
|
||||||
Tuple[Any, Any, Any, Any, Any]] = None
|
Tuple[Any, Any, Any, Any, Any]] = None
|
||||||
|
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {}
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self, uid: Tuple[str, str]
|
self, uid: Tuple[str, str]
|
||||||
|
@ -709,11 +719,17 @@ class Actor:
|
||||||
- cancelling all rpc tasks
|
- cancelling all rpc tasks
|
||||||
- cancelling the channel server
|
- cancelling the channel server
|
||||||
- cancel the "root" nursery
|
- cancel the "root" nursery
|
||||||
|
- if root actor, cancel all nurseries under the root ``trio`` task
|
||||||
"""
|
"""
|
||||||
# cancel all ongoing rpc tasks
|
# cancel all ongoing rpc tasks
|
||||||
await self.cancel_rpc_tasks()
|
await self.cancel_rpc_tasks()
|
||||||
self.cancel_server()
|
self.cancel_server()
|
||||||
self._root_nursery.cancel_scope.cancel()
|
self._root_nursery.cancel_scope.cancel()
|
||||||
|
if self._parent_chan is None: # TODO: more robust check
|
||||||
|
# we're the root actor or zombied
|
||||||
|
root = trio.lowlevel.current_root_task()
|
||||||
|
for n in root.child_nurseries:
|
||||||
|
n.cancel_scope.cancel()
|
||||||
|
|
||||||
async def _cancel_task(self, cid, chan):
|
async def _cancel_task(self, cid, chan):
|
||||||
"""Cancel a local task by call-id / channel.
|
"""Cancel a local task by call-id / channel.
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
"""
|
||||||
|
Multi-core debugging for da peeps!
|
||||||
|
"""
|
||||||
|
import pdb
|
||||||
|
import sys
|
||||||
|
import tty
|
||||||
|
import termios
|
||||||
|
from functools import partial
|
||||||
|
from typing import Awaitable, Tuple
|
||||||
|
|
||||||
|
from async_generator import aclosing
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
from .log import get_logger
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ['breakpoint', 'post_mortem']
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: is there some way to determine this programatically?
|
||||||
|
_pdb_exit_patterns = tuple(
|
||||||
|
str.encode(patt + "\n") for patt in ('c', 'cont', 'continue', 'q', 'quit')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def subactoruid2proc(
|
||||||
|
actor: 'Actor', # noqa
|
||||||
|
uid: Tuple[str, str]
|
||||||
|
) -> trio.Process:
|
||||||
|
n = actor._actoruid2nursery[uid]
|
||||||
|
_, proc, _ = n._children[uid]
|
||||||
|
return proc
|
||||||
|
|
||||||
|
|
||||||
|
async def _hijack_stdin_relay_to_child(
|
||||||
|
subactor_uid: Tuple[str, str]
|
||||||
|
) -> None:
|
||||||
|
actor = tractor.current_actor()
|
||||||
|
proc = subactoruid2proc(actor, subactor_uid)
|
||||||
|
|
||||||
|
nlb = []
|
||||||
|
line = []
|
||||||
|
|
||||||
|
async def hijack_stdin():
|
||||||
|
log.info(f"Hijacking stdin from {actor.uid}")
|
||||||
|
try:
|
||||||
|
# disable cooked mode
|
||||||
|
fd = sys.stdin.fileno()
|
||||||
|
old = tty.tcgetattr(fd)
|
||||||
|
# tty.setcbreak(fd)
|
||||||
|
tty.setcbreak(fd)
|
||||||
|
|
||||||
|
# trap std in and relay to subproc
|
||||||
|
async_stdin = trio.wrap_file(sys.stdin)
|
||||||
|
|
||||||
|
async with aclosing(async_stdin):
|
||||||
|
while True:
|
||||||
|
# async for msg in async_stdin:
|
||||||
|
msg = await async_stdin.read(1)
|
||||||
|
nlb.append(msg)
|
||||||
|
# encode to bytes
|
||||||
|
bmsg = str.encode(msg)
|
||||||
|
log.trace(f"Stdin str input:\n{msg}")
|
||||||
|
log.trace(f"Stdin bytes input:\n{bmsg}")
|
||||||
|
|
||||||
|
# relay bytes to subproc over pipe
|
||||||
|
await proc.stdin.send_all(bmsg)
|
||||||
|
# termios.tcflush(fd, termios.TCIFLUSH)
|
||||||
|
|
||||||
|
# don't write control chars to local stdout
|
||||||
|
if bmsg not in (b'\t'):
|
||||||
|
# mirror input to stdout
|
||||||
|
sys.stdout.write(msg)
|
||||||
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
if bmsg == b'\n':
|
||||||
|
line = str.encode(''.join(nlb))
|
||||||
|
# print(line.decode())
|
||||||
|
if line in _pdb_exit_patterns:
|
||||||
|
log.info("Closing stdin hijack")
|
||||||
|
line = []
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
tty.tcsetattr(fd, tty.TCSAFLUSH, old)
|
||||||
|
|
||||||
|
# schedule hijacking in root scope
|
||||||
|
actor._root_nursery.start_soon(hijack_stdin)
|
||||||
|
|
||||||
|
|
||||||
|
# XXX: We only make this sync in case someone wants to
|
||||||
|
# overload the ``breakpoint()`` built-in.
|
||||||
|
def _breakpoint(debug_func) -> Awaitable[None]:
|
||||||
|
"""``tractor`` breakpoint entry for engaging pdb machinery
|
||||||
|
in subactors.
|
||||||
|
"""
|
||||||
|
actor = tractor.current_actor()
|
||||||
|
try:
|
||||||
|
import ipdb
|
||||||
|
db = ipdb
|
||||||
|
except ImportError:
|
||||||
|
import pdb
|
||||||
|
db = pdb
|
||||||
|
|
||||||
|
async def wait_for_parent_stdin_hijack():
|
||||||
|
log.debug('Breakpoint engaged!')
|
||||||
|
|
||||||
|
# TODO: need a more robust check for the "root" actor
|
||||||
|
if actor._parent_chan:
|
||||||
|
async with tractor._portal.open_portal(
|
||||||
|
actor._parent_chan,
|
||||||
|
start_msg_loop=False,
|
||||||
|
) as portal:
|
||||||
|
# with trio.fail_after(1):
|
||||||
|
await portal.run(
|
||||||
|
'tractor._debug',
|
||||||
|
'_hijack_stdin_relay_to_child',
|
||||||
|
subactor_uid=actor.uid,
|
||||||
|
)
|
||||||
|
|
||||||
|
# block here one frame up where ``breakpoint()``
|
||||||
|
# was awaited and begin handling stdin
|
||||||
|
debug_func(actor, db)
|
||||||
|
|
||||||
|
# this must be awaited by caller
|
||||||
|
return wait_for_parent_stdin_hijack()
|
||||||
|
|
||||||
|
|
||||||
|
def _set_trace(actor, dbmod):
|
||||||
|
dbmod.set_trace(
|
||||||
|
header=f"\nAttaching pdb to actor: {actor.uid}\n",
|
||||||
|
# start 2 levels up
|
||||||
|
frame=sys._getframe().f_back.f_back,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
breakpoint = partial(
|
||||||
|
_breakpoint,
|
||||||
|
_set_trace,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _post_mortem(actor, dbmod):
|
||||||
|
log.error(
|
||||||
|
f"\nAttaching to {dbmod} in crashed actor: {actor.uid}\n")
|
||||||
|
dbmod.post_mortem()
|
||||||
|
|
||||||
|
|
||||||
|
post_mortem = partial(
|
||||||
|
_breakpoint,
|
||||||
|
_post_mortem,
|
||||||
|
)
|
|
@ -2,7 +2,8 @@
|
||||||
Process entry points.
|
Process entry points.
|
||||||
"""
|
"""
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from typing import Tuple, Any
|
from typing import Tuple, Any, Dict
|
||||||
|
import os
|
||||||
|
|
||||||
import trio # type: ignore
|
import trio # type: ignore
|
||||||
|
|
||||||
|
@ -54,13 +55,17 @@ def _mp_main(
|
||||||
async def _trio_main(
|
async def _trio_main(
|
||||||
actor: 'Actor',
|
actor: 'Actor',
|
||||||
accept_addr: Tuple[str, int],
|
accept_addr: Tuple[str, int],
|
||||||
parent_addr: Tuple[str, int] = None
|
parent_addr: Tuple[str, int],
|
||||||
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Entry point for a `trio_run_in_process` subactor.
|
"""Entry point for a `trio_run_in_process` subactor.
|
||||||
|
|
||||||
Here we don't need to call `trio.run()` since trip does that as
|
Here we don't need to call `trio.run()` since trip does that as
|
||||||
part of its subprocess startup sequence.
|
part of its subprocess startup sequence.
|
||||||
"""
|
"""
|
||||||
|
# TODO: make a global func to set this or is it too hacky?
|
||||||
|
# os.environ['PYTHONBREAKPOINT'] = 'tractor._debug.breakpoint'
|
||||||
|
|
||||||
if actor.loglevel is not None:
|
if actor.loglevel is not None:
|
||||||
log.info(
|
log.info(
|
||||||
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
f"Setting loglevel for {actor.uid} to {actor.loglevel}")
|
||||||
|
@ -69,6 +74,7 @@ async def _trio_main(
|
||||||
log.info(f"Started new trio process for {actor.uid}")
|
log.info(f"Started new trio process for {actor.uid}")
|
||||||
|
|
||||||
_state._current_actor = actor
|
_state._current_actor = actor
|
||||||
|
_state._runtime_vars.update(_runtime_vars)
|
||||||
|
|
||||||
await actor._async_main(accept_addr, parent_addr=parent_addr)
|
await actor._async_main(accept_addr, parent_addr=parent_addr)
|
||||||
log.info(f"Actor {actor.uid} terminated")
|
log.info(f"Actor {actor.uid} terminated")
|
||||||
|
|
|
@ -314,7 +314,8 @@ class LocalPortal:
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_portal(
|
async def open_portal(
|
||||||
channel: Channel,
|
channel: Channel,
|
||||||
nursery: Optional[trio.Nursery] = None
|
nursery: Optional[trio.Nursery] = None,
|
||||||
|
start_msg_loop: bool = True,
|
||||||
) -> typing.AsyncGenerator[Portal, None]:
|
) -> typing.AsyncGenerator[Portal, None]:
|
||||||
"""Open a ``Portal`` through the provided ``channel``.
|
"""Open a ``Portal`` through the provided ``channel``.
|
||||||
|
|
||||||
|
@ -332,15 +333,17 @@ async def open_portal(
|
||||||
if channel.uid is None:
|
if channel.uid is None:
|
||||||
await actor._do_handshake(channel)
|
await actor._do_handshake(channel)
|
||||||
|
|
||||||
msg_loop_cs: trio.CancelScope = await nursery.start(
|
msg_loop_cs = None
|
||||||
partial(
|
if start_msg_loop:
|
||||||
actor._process_messages,
|
msg_loop_cs: trio.CancelScope = await nursery.start(
|
||||||
channel,
|
partial(
|
||||||
# if the local task is cancelled we want to keep
|
actor._process_messages,
|
||||||
# the msg loop running until our block ends
|
channel,
|
||||||
shield=True,
|
# if the local task is cancelled we want to keep
|
||||||
|
# the msg loop running until our block ends
|
||||||
|
shield=True,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
portal = Portal(channel)
|
portal = Portal(channel)
|
||||||
try:
|
try:
|
||||||
yield portal
|
yield portal
|
||||||
|
@ -352,6 +355,7 @@ async def open_portal(
|
||||||
await channel.send(None)
|
await channel.send(None)
|
||||||
|
|
||||||
# cancel background msg loop task
|
# cancel background msg loop task
|
||||||
msg_loop_cs.cancel()
|
if msg_loop_cs:
|
||||||
|
msg_loop_cs.cancel()
|
||||||
|
|
||||||
nursery.cancel_scope.cancel()
|
nursery.cancel_scope.cancel()
|
||||||
|
|
|
@ -26,7 +26,7 @@ from multiprocessing import forkserver # type: ignore
|
||||||
from typing import Tuple
|
from typing import Tuple
|
||||||
|
|
||||||
from . import _forkserver_override
|
from . import _forkserver_override
|
||||||
from ._state import current_actor
|
from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._actor import Actor, ActorFailure
|
from ._actor import Actor, ActorFailure
|
||||||
|
@ -90,12 +90,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
|
||||||
return _ctx
|
return _ctx
|
||||||
|
|
||||||
|
|
||||||
def is_main_process() -> bool:
|
|
||||||
"""Bool determining if this actor is running in the top-most process.
|
|
||||||
"""
|
|
||||||
return mp.current_process().name == 'MainProcess'
|
|
||||||
|
|
||||||
|
|
||||||
async def exhaust_portal(
|
async def exhaust_portal(
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor
|
actor: Actor
|
||||||
|
@ -188,6 +182,8 @@ async def new_proc(
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addr: Tuple[str, int],
|
bind_addr: Tuple[str, int],
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
|
*,
|
||||||
use_trio_run_in_process: bool = False,
|
use_trio_run_in_process: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -207,6 +203,7 @@ async def new_proc(
|
||||||
subactor,
|
subactor,
|
||||||
bind_addr,
|
bind_addr,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
|
_runtime_vars=_runtime_vars,
|
||||||
) as proc:
|
) as proc:
|
||||||
log.info(f"Started {proc}")
|
log.info(f"Started {proc}")
|
||||||
|
|
||||||
|
@ -218,6 +215,10 @@ async def new_proc(
|
||||||
portal = Portal(chan)
|
portal = Portal(chan)
|
||||||
actor_nursery._children[subactor.uid] = (
|
actor_nursery._children[subactor.uid] = (
|
||||||
subactor, proc, portal)
|
subactor, proc, portal)
|
||||||
|
|
||||||
|
curr_actor = current_actor()
|
||||||
|
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||||
|
|
||||||
task_status.started(portal)
|
task_status.started(portal)
|
||||||
|
|
||||||
# wait for ActorNursery.wait() to be called
|
# wait for ActorNursery.wait() to be called
|
||||||
|
|
|
@ -3,10 +3,15 @@ Per process state
|
||||||
"""
|
"""
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from collections import Mapping
|
from collections import Mapping
|
||||||
|
import multiprocessing as mp
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
_current_actor: Optional['Actor'] = None # type: ignore
|
_current_actor: Optional['Actor'] = None # type: ignore
|
||||||
|
_runtime_vars = {
|
||||||
|
'_debug_mode': False,
|
||||||
|
'_is_root': False,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def current_actor() -> 'Actor': # type: ignore
|
def current_actor() -> 'Actor': # type: ignore
|
||||||
|
@ -36,3 +41,20 @@ class ActorContextInfo(Mapping):
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
# no local actor/task context initialized yet
|
# no local actor/task context initialized yet
|
||||||
return f'no {key} context'
|
return f'no {key} context'
|
||||||
|
|
||||||
|
|
||||||
|
def is_main_process() -> bool:
|
||||||
|
"""Bool determining if this actor is running in the top-most process.
|
||||||
|
"""
|
||||||
|
return mp.current_process().name == 'MainProcess'
|
||||||
|
|
||||||
|
|
||||||
|
def debug_mode() -> bool:
|
||||||
|
"""Bool determining if "debug mode" is on which enables
|
||||||
|
remote subactor pdb entry on crashes.
|
||||||
|
"""
|
||||||
|
return _runtime_vars['_debug_mode']
|
||||||
|
|
||||||
|
|
||||||
|
def is_root_process() -> bool:
|
||||||
|
return _runtime_vars['_is_root']
|
||||||
|
|
|
@ -13,10 +13,11 @@ from ._state import current_actor
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
from . import _state
|
||||||
from . import _spawn
|
from . import _spawn
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ActorNursery:
|
class ActorNursery:
|
||||||
|
@ -56,6 +57,10 @@ class ActorNursery:
|
||||||
) -> Portal:
|
) -> Portal:
|
||||||
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
loglevel = loglevel or self._actor.loglevel or get_loglevel()
|
||||||
|
|
||||||
|
# configure and pass runtime state
|
||||||
|
_rtv = _state._runtime_vars.copy()
|
||||||
|
_rtv['_is_root'] = False
|
||||||
|
|
||||||
subactor = Actor(
|
subactor = Actor(
|
||||||
name,
|
name,
|
||||||
# modules allowed to invoked funcs from
|
# modules allowed to invoked funcs from
|
||||||
|
@ -81,6 +86,7 @@ class ActorNursery:
|
||||||
self.errors,
|
self.errors,
|
||||||
bind_addr,
|
bind_addr,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
|
_rtv, # run time vars
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue