Removed arbiter_addr local, and bind_addr is now passed through channel, in early child actor init.

start_up_sequence_trickery
Guillermo Rodriguez 2020-07-28 11:55:11 -03:00
parent 8b44ec7a5d
commit 0a5691e0a8
No known key found for this signature in database
GPG Key ID: 3F61096EC7DF75A8
4 changed files with 22 additions and 34 deletions

View File

@ -542,8 +542,7 @@ class Actor:
async def _async_main( async def _async_main(
self, self,
accept_addr: Tuple[str, int], accept_addr: Optional[Tuple[str, int]] = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
parent_addr: Optional[Tuple[str, int]] = None, parent_addr: Optional[Tuple[str, int]] = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -554,17 +553,10 @@ class Actor:
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
""" """
registered_with_arbiter = False registered_with_arbiter = False
arbiter_addr = arbiter_addr or self._arb_addr
try: try:
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
self._root_nursery = nursery self._root_nursery = nursery
# Startup up channel server
host, port = accept_addr
await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port)
)
if parent_addr is not None: if parent_addr is not None:
try: try:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
@ -583,9 +575,7 @@ class Actor:
for attr, value in parent_data.items(): for attr, value in parent_data.items():
setattr(self, attr, value) setattr(self, attr, value)
# update local arbiter_addr var accept_addr = self.bind_host, self.bind_port
if "_arb_addr" in parent_data:
arbiter_addr = self._arb_addr
except OSError: # failed to connect except OSError: # failed to connect
log.warning( log.warning(
@ -600,6 +590,12 @@ class Actor:
nursery.start_soon( nursery.start_soon(
self._process_messages, self._parent_chan) self._process_messages, self._parent_chan)
# Startup up channel server
host, port = accept_addr
await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port)
)
# load exposed/allowed RPC modules # load exposed/allowed RPC modules
# XXX: do this **after** establishing connection to parent # XXX: do this **after** establishing connection to parent
# so that import errors are properly propagated upwards # so that import errors are properly propagated upwards
@ -607,8 +603,8 @@ class Actor:
# register with the arbiter if we're told its addr # register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`") log.debug(f"Registering {self} for role `{self.name}`")
assert isinstance(arbiter_addr, tuple) assert isinstance(self._arb_addr, tuple)
async with get_arbiter(*arbiter_addr) as arb_portal: async with get_arbiter(*self._arb_addr) as arb_portal:
await arb_portal.run( await arb_portal.run(
'self', 'register_actor', 'self', 'register_actor',
uid=self.uid, sockaddr=self.accept_addr) uid=self.uid, sockaddr=self.accept_addr)
@ -626,7 +622,7 @@ class Actor:
# once we have that all working with std streams locking? # once we have that all working with std streams locking?
log.exception( log.exception(
f"Actor errored and failed to register with arbiter " f"Actor errored and failed to register with arbiter "
f"@ {arbiter_addr}?") f"@ {self._arb_addr}?")
log.error( log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
@ -655,7 +651,7 @@ class Actor:
finally: finally:
if registered_with_arbiter: if registered_with_arbiter:
await self._do_unreg(arbiter_addr) await self._do_unreg(self._arb_addr)
# terminate actor once all it's peers (actors that connected # terminate actor once all it's peers (actors that connected
# to it as clients) have disappeared # to it as clients) have disappeared
if not self._no_more_peers.is_set(): if not self._no_more_peers.is_set():
@ -906,8 +902,7 @@ async def _start_actor(
partial( partial(
actor._async_main, actor._async_main,
accept_addr=(host, port), accept_addr=(host, port),
parent_addr=None, parent_addr=None
arbiter_addr=arbiter_addr,
) )
) )
result = await main() result = await main()

View File

@ -12,18 +12,12 @@ from ._entry import _trio_main
""" """
def parse_uid(arg): def parse_uid(arg):
uid = literal_eval(arg) name, uuid = literal_eval(arg) # ensure 2 elements
assert len(uid) == 2 return str(name), str(uuid) # ensures str encoding
assert isinstance(uid[0], str)
assert isinstance(uid[1], str)
return uid
def parse_ipaddr(arg): def parse_ipaddr(arg):
addr = literal_eval(arg) host, port = literal_eval(arg)
assert len(addr) == 2 return (str(host), int(port))
assert isinstance(addr[0], str)
assert isinstance(addr[1], int)
return addr
if __name__ == "__main__": if __name__ == "__main__":
@ -40,11 +34,10 @@ if __name__ == "__main__":
args.uid[0], args.uid[0],
uid=args.uid[1], uid=args.uid[1],
loglevel=args.loglevel, loglevel=args.loglevel,
spawn_method="trio" spawn_method="trio"
) )
_trio_main( _trio_main(
subactor, subactor,
("127.0.0.1", 0),
parent_addr=args.parent_addr parent_addr=args.parent_addr
) )

View File

@ -53,7 +53,6 @@ def _mp_main(
def _trio_main( def _trio_main(
actor: 'Actor', actor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None parent_addr: Tuple[str, int] = None
) -> None: ) -> None:
"""Entry point for a `trio_run_in_process` subactor. """Entry point for a `trio_run_in_process` subactor.
@ -71,7 +70,6 @@ def _trio_main(
log.debug(f"parent_addr is {parent_addr}") log.debug(f"parent_addr is {parent_addr}")
trio_main = partial( trio_main = partial(
actor._async_main, actor._async_main,
accept_addr,
parent_addr=parent_addr parent_addr=parent_addr
) )
try: try:

View File

@ -227,7 +227,9 @@ async def new_proc(
"_parent_main_data": subactor._parent_main_data, "_parent_main_data": subactor._parent_main_data,
"rpc_module_paths": subactor.rpc_module_paths, "rpc_module_paths": subactor.rpc_module_paths,
"statespace": subactor.statespace, "statespace": subactor.statespace,
"_arb_addr": subactor._arb_addr "_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1]
}) })
task_status.started(portal) task_status.started(portal)