forked from goodboy/tractor
1
0
Fork 0

Fix that thing where the first example in your docs is supposed to work

Thanks to @salotz for pointing out that the first example in the docs
was broken. Though it's somewhat embarrassing this might also explain
the problem in #79 and certain issues in #59...

The solution here is to import the target RPC module using the its
unique basename and absolute filepath in the sub-actor that requires it.
Special handling for `__main__` and `__mp_main__` is needed since the
spawned subprocess will have no knowledge about these parent-
-state-specific module variables. Solution: map the modules name to the
respective module file basename in the child process since the module
variables will of course have different values in children.
try_trip^2
Tyler Goodlet 2020-01-29 00:51:25 -05:00
parent 7feef44798
commit 2a4307975d
3 changed files with 26 additions and 21 deletions

View File

@ -151,6 +151,10 @@ async def _invoke(
actor._ongoing_rpc_tasks.set() actor._ongoing_rpc_tasks.set()
def _get_mod_abspath(module):
return os.path.abspath(module.__file__)
class Actor: class Actor:
"""The fundamental concurrency primitive. """The fundamental concurrency primitive.
@ -178,9 +182,13 @@ class Actor:
self.uid = (name, uid or str(uuid.uuid4())) self.uid = (name, uid or str(uuid.uuid4()))
mods = {} mods = {}
for path in rpc_module_paths or (): for name in rpc_module_paths or ():
mod = importlib.import_module(path) mod = importlib.import_module(name)
mods[path] = mod.__file__ suffix_index = mod.__file__.find('.py')
unique_modname = os.path.basename(mod.__file__[:suffix_index])
mods[unique_modname] = _get_mod_abspath(mod)
if mod.__name__ == '__main__' or mod.__name__ == '__mp_main__':
self._main_mod = unique_modname
self.rpc_module_paths = mods self.rpc_module_paths = mods
self._mods: dict = {} self._mods: dict = {}
@ -235,21 +243,20 @@ class Actor:
code (if it exists). code (if it exists).
""" """
try: try:
for path, absfilepath in self.rpc_module_paths.items(): for modname, absfilepath in self.rpc_module_paths.items():
log.debug(f"Attempting to import {path}") sys.path.append(os.path.dirname(absfilepath))
# spec = importlib.util.spec_from_file_location( log.debug(f"Attempting to import {modname}@{absfilepath}")
# path, absfilepath) spec = importlib.util.spec_from_file_location(
# mod = importlib.util.module_from_spec(spec) modname, absfilepath)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod) # type: ignore
self._mods[modname] = mod
# XXX append the allowed module to the python path # XXX append the allowed module to the python path
# which should allow for relative (at least downward) # which should allow for relative (at least downward)
# imports. Seems to be the only that will work currently # imports. Seems to be the only that will work currently
# to get `trio-run-in-process` to import modules we "send # to get `trio-run-in-process` to import modules we "send
# it". # it".
sys.path.append(os.path.dirname(absfilepath))
# spec.loader.exec_module(mod)
mod = importlib.import_module(path)
self._mods[path] = mod
# if self.name != 'arbiter': # if self.name != 'arbiter':
# importlib.import_module('doggy') # importlib.import_module('doggy')
@ -257,10 +264,14 @@ class Actor:
except ModuleNotFoundError: except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error # it is expected the corresponding `ModuleNotExposed` error
# will be raised later # will be raised later
log.error(f"Failed to import {path} in {self.name}") log.error(f"Failed to import {modname} in {self.name}")
raise raise
def _get_rpc_func(self, ns, funcname): def _get_rpc_func(self, ns, funcname):
if ns == '__main__' or ns == '__mp_main__':
# lookup the specific module in the child denoted
# as `__main__`/`__mp_main__` in the parent
ns = self._main_mod
try: try:
return getattr(self._mods[ns], funcname) return getattr(self._mods[ns], funcname)
except KeyError as err: except KeyError as err:
@ -640,8 +651,6 @@ class Actor:
# blocks here as expected until the channel server is # blocks here as expected until the channel server is
# killed (i.e. this actor is cancelled or signalled by the parent) # killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err: except Exception as err:
# if self.name == 'arbiter':
# import pdb; pdb.set_trace()
if not registered_with_arbiter: if not registered_with_arbiter:
log.exception( log.exception(
f"Actor errored and failed to register with arbiter " f"Actor errored and failed to register with arbiter "
@ -666,8 +675,6 @@ class Actor:
raise raise
finally: finally:
# if self.name == 'arbiter':
# import pdb; pdb.set_trace()
if registered_with_arbiter: if registered_with_arbiter:
await self._do_unreg(arbiter_addr) await self._do_unreg(arbiter_addr)
# terminate actor once all it's peers (actors that connected # terminate actor once all it's peers (actors that connected
@ -713,8 +720,8 @@ class Actor:
port=accept_port, host=accept_host, port=accept_port, host=accept_host,
) )
) )
log.debug( log.debug(f"Started tcp server(s) on"
f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore " {[l.socket for l in listeners]}") # type: ignore
self._listeners.extend(listeners) self._listeners.extend(listeners)
task_status.started() task_status.started()

View File

@ -158,7 +158,6 @@ async def new_proc(
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
begin_wait_phase: trio.Event,
use_trio_run_in_process: bool = False, use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:

View File

@ -78,7 +78,6 @@ class ActorNursery:
self.errors, self.errors,
bind_addr, bind_addr,
parent_addr, parent_addr,
nursery,
) )
async def run_in_actor( async def run_in_actor(