forked from goodboy/tractor
Drop entrypoints from `Actor`
parent
d19c0f9b1f
commit
fcd1566834
|
@ -179,6 +179,9 @@ class Actor:
|
||||||
# Information about `__main__` from parent
|
# Information about `__main__` from parent
|
||||||
_parent_main_data: Dict[str, str]
|
_parent_main_data: Dict[str, str]
|
||||||
|
|
||||||
|
# if started on ``asycio`` running ``trio`` in guest mode
|
||||||
|
_infected_aio: bool = False
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -539,58 +542,6 @@ class Actor:
|
||||||
f"Exiting msg loop for {chan} from {chan.uid} "
|
f"Exiting msg loop for {chan} from {chan.uid} "
|
||||||
f"with last msg:\n{msg}")
|
f"with last msg:\n{msg}")
|
||||||
|
|
||||||
def _mp_main(
|
|
||||||
self,
|
|
||||||
accept_addr: Tuple[str, int],
|
|
||||||
forkserver_info: Tuple[Any, Any, Any, Any, Any],
|
|
||||||
start_method: str,
|
|
||||||
parent_addr: Tuple[str, int] = None
|
|
||||||
) -> None:
|
|
||||||
"""The routine called *after fork* which invokes a fresh ``trio.run``
|
|
||||||
"""
|
|
||||||
self._forkserver_info = forkserver_info
|
|
||||||
from ._spawn import try_set_start_method
|
|
||||||
spawn_ctx = try_set_start_method(start_method)
|
|
||||||
|
|
||||||
if self.loglevel is not None:
|
|
||||||
log.info(
|
|
||||||
f"Setting loglevel for {self.uid} to {self.loglevel}")
|
|
||||||
get_console_log(self.loglevel)
|
|
||||||
|
|
||||||
assert spawn_ctx
|
|
||||||
log.info(
|
|
||||||
f"Started new {spawn_ctx.current_process()} for {self.uid}")
|
|
||||||
|
|
||||||
_state._current_actor = self
|
|
||||||
|
|
||||||
log.debug(f"parent_addr is {parent_addr}")
|
|
||||||
try:
|
|
||||||
trio.run(partial(
|
|
||||||
self._async_main, accept_addr, parent_addr=parent_addr))
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass # handle it the same way trio does?
|
|
||||||
log.info(f"Actor {self.uid} terminated")
|
|
||||||
|
|
||||||
async def _trip_main(
|
|
||||||
self,
|
|
||||||
accept_addr: Tuple[str, int],
|
|
||||||
parent_addr: Tuple[str, int] = None
|
|
||||||
) -> None:
|
|
||||||
"""Entry point for a `trio_run_in_process` subactor.
|
|
||||||
|
|
||||||
Here we don't need to call `trio.run()` since trip does that as
|
|
||||||
part of its subprocess startup sequence.
|
|
||||||
"""
|
|
||||||
if self.loglevel is not None:
|
|
||||||
log.info(
|
|
||||||
f"Setting loglevel for {self.uid} to {self.loglevel}")
|
|
||||||
get_console_log(self.loglevel)
|
|
||||||
|
|
||||||
log.info(f"Started new TRIP process for {self.uid}")
|
|
||||||
_state._current_actor = self
|
|
||||||
await self._async_main(accept_addr, parent_addr=parent_addr)
|
|
||||||
log.info(f"Actor {self.uid} terminated")
|
|
||||||
|
|
||||||
async def _async_main(
|
async def _async_main(
|
||||||
self,
|
self,
|
||||||
accept_addr: Tuple[str, int],
|
accept_addr: Tuple[str, int],
|
||||||
|
@ -846,6 +797,8 @@ class Actor:
|
||||||
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||||
return uid
|
return uid
|
||||||
|
|
||||||
|
def is_infected_aio(self) -> bool:
|
||||||
|
return self._infected_aio
|
||||||
|
|
||||||
class Arbiter(Actor):
|
class Arbiter(Actor):
|
||||||
"""A special actor who knows all the other actors and always has
|
"""A special actor who knows all the other actors and always has
|
||||||
|
|
Loading…
Reference in New Issue