forked from goodboy/tractor
1
0
Fork 0

Expose per-actor registry addrs via `.reg_addrs`

Since it's handy to be able to debug the *writing* of this instance var
(particularly when checking state passed down to a child in
`Actor._from_parent()`), rename and wrap the underlying
`Actor._reg_addrs` as a settable `@property` and add validation to
the `.setter` for sanity - actor discovery is a critical functionality.

Other tweaks:
- fix `.cancel_soon()` to pass expected argument..
- update internal runtime error message to be simpler and link to GH issues.
- use new `Actor.reg_addrs` throughout core.
multihomed
Tyler Goodlet 2023-10-19 12:05:44 -04:00
parent a3ed30e62b
commit 1d6f55543d
1 changed files with 78 additions and 29 deletions

View File

@ -553,12 +553,6 @@ class Actor:
) )
registry_addrs: list[tuple[str, int]] = [arbiter_addr] registry_addrs: list[tuple[str, int]] = [arbiter_addr]
self._reg_addrs: list[tuple[str, int]] = (
registry_addrs
or
None
)
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
# by the user (currently called the "arbiter") # by the user (currently called the "arbiter")
@ -591,6 +585,44 @@ class Actor:
ActorNursery | None, ActorNursery | None,
] = {} # type: ignore # noqa ] = {} # type: ignore # noqa
# when provided, init the registry addresses property from
# input via the validator.
self._reg_addrs: list[tuple[str, int]] = []
if registry_addrs:
self.reg_addrs: list[tuple[str, int]] = registry_addrs
@property
def reg_addrs(self) -> list[tuple[str, int]]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
'''
return self._reg_addrs
@reg_addrs.setter
def reg_addrs(
self,
addrs: list[tuple[str, int]],
) -> None:
if not addrs:
log.warning(
'Empty registry address list is invalid:\n'
f'{addrs}'
)
return
# always sanity check the input list since it's critical
# that addrs are correct for discovery sys operation.
for addr in addrs:
if not isinstance(addr, tuple):
raise ValueError(
'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n'
f'Got {addrs}'
)
self._reg_addrs = addrs
async def wait_for_peer( async def wait_for_peer(
self, uid: tuple[str, str] self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]: ) -> tuple[trio.Event, Channel]:
@ -670,9 +702,10 @@ class Actor:
stream: trio.SocketStream, stream: trio.SocketStream,
) -> None: ) -> None:
"""Entry point for new inbound connections to the channel server. '''
Entry point for new inbound connections to the channel server.
""" '''
self._no_more_peers = trio.Event() # unset self._no_more_peers = trio.Event() # unset
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
@ -792,17 +825,21 @@ class Actor:
if disconnected: if disconnected:
# if the transport died and this actor is still # if the transport died and this actor is still
# registered within a local nursery, we report that the # registered within a local nursery, we report
# IPC layer may have failed unexpectedly since it may be # that the IPC layer may have failed
# the cause of other downstream errors. # unexpectedly since it may be the cause of
# other downstream errors.
entry = local_nursery._children.get(uid) entry = local_nursery._children.get(uid)
if entry: if entry:
_, proc, _ = entry _, proc, _ = entry
poll = getattr(proc, 'poll', None) if (
if poll and poll() is None: (poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel( log.cancel(
f'Actor {uid} IPC broke but proc is alive?' f'Actor {uid} IPC broke but proc is alive?\n'
'Attempting to self cancel..'
) )
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
@ -1016,14 +1053,18 @@ class Actor:
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
for attr, value in parent_data.items(): for attr, value in parent_data.items():
if (
if attr == '_reg_addrs': attr == 'reg_addrs'
and value
):
# XXX: ``msgspec`` doesn't support serializing tuples # XXX: ``msgspec`` doesn't support serializing tuples
# so just cash manually here since it's what our # so just cash manually here since it's what our
# internals expect. # internals expect.
self._reg_addrs = [ # TODO: we don't really NEED these as
tuple(val) for val in value # tuples so we can probably drop this
] if value else None # casting since apparently in python lists
# are "more efficient"?
self.reg_addrs = [tuple(val) for val in value]
else: else:
setattr(self, attr, value) setattr(self, attr, value)
@ -1099,7 +1140,10 @@ class Actor:
''' '''
assert self._service_n assert self._service_n
self._service_n.start_soon(self.cancel) self._service_n.start_soon(
self.cancel,
self.uid,
)
async def cancel( async def cancel(
self, self,
@ -1445,9 +1489,12 @@ async def async_main(
# if addresses point to the same actor.. # if addresses point to the same actor..
# So we need a way to detect that? maybe iterate # So we need a way to detect that? maybe iterate
# only on unique actor uids? # only on unique actor uids?
for addr in actor._reg_addrs: for addr in actor.reg_addrs:
assert isinstance(addr, tuple) try:
assert addr[1] # non-zero after bind assert isinstance(addr, tuple)
assert addr[1] # non-zero after bind
except AssertionError:
await _debug.pause()
async with get_registry(*addr) as reg_portal: async with get_registry(*addr) as reg_portal:
for accept_addr in accept_addrs: for accept_addr in accept_addrs:
@ -1500,12 +1547,14 @@ async def async_main(
# once we have that all working with std streams locking? # once we have that all working with std streams locking?
log.exception( log.exception(
f"Actor errored and failed to register with arbiter " f"Actor errored and failed to register with arbiter "
f"@ {actor._reg_addrs[0]}?") f"@ {actor.reg_addrs[0]}?")
log.error( log.error(
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" "\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" "\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
"\tIf this is a sub-actor likely its parent will keep running " "\tIf this is a sub-actor hopefully its parent will keep running "
"\tcorrectly if this error is caught and ignored.." "correctly presuming this error was safely ignored..\n\n"
"\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
"https://github.com/goodboy/tractor/issues\n"
) )
if actor._parent_chan: if actor._parent_chan:
@ -1546,7 +1595,7 @@ async def async_main(
and not actor.is_registrar and not actor.is_registrar
): ):
failed: bool = False failed: bool = False
for addr in actor._reg_addrs: for addr in actor.reg_addrs:
assert isinstance(addr, tuple) assert isinstance(addr, tuple)
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True