forked from goodboy/tractor
1
0
Fork 0

Make `Actor._process_messages()` report disconnects

The method now returns a `bool` which flags whether the transport died
to the caller and allows for reporting a disconnect in the
channel-transport handler task. This is something a user will normally
want to know about on the caller side especially after seeing
a traceback from the peer (if in tree) on console.
sigintsaviour_ci_worked
Tyler Goodlet 2022-02-14 09:25:35 -05:00
parent d280c26f15
commit a5f543eb22
1 changed files with 47 additions and 18 deletions

View File

@ -200,7 +200,7 @@ async def _invoke(
ctx = actor._contexts.pop((chan.uid, cid)) ctx = actor._contexts.pop((chan.uid, cid))
if ctx: if ctx:
log.runtime( log.runtime(
f'Context entrypoint for {func} was terminated:\n{ctx}' f'Context entrypoint {func} was terminated:\n{ctx}'
) )
assert cs assert cs
@ -316,7 +316,9 @@ async def try_ship_error_to_parent(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
): ):
log.error( # in SC terms this is one of the worst things that can
# happen and creates the 2-general's dilemma.
log.critical(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{channel.uid}, channel was closed" f"{channel.uid}, channel was closed"
) )
@ -560,33 +562,49 @@ class Actor:
# append new channel # append new channel
self._peers[uid].append(chan) self._peers[uid].append(chan)
local_nursery: Optional['ActorNursery'] = None # noqa
# Begin channel management - respond to remote requests and # Begin channel management - respond to remote requests and
# process received reponses. # process received reponses.
try: try:
await self._process_messages(chan) disconnected = await self._process_messages(chan)
except trio.Cancelled: except trio.Cancelled:
log.cancel(f"Msg loop was cancelled for {chan}") log.cancel(f"Msg loop was cancelled for {chan}")
raise raise
finally: finally:
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
# This is set in ``Portal.cancel_actor()``. So if # This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them # the peer was cancelled we try to wait for them
# to tear down their side of the connection before # to tear down their side of the connection before
# moving on with closing our own side. # moving on with closing our own side.
local_nursery = self._actoruid2nursery.get(chan.uid)
if ( if (
local_nursery local_nursery
): ):
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report that the
# IPC layer may have failed unexpectedly since it may be
# the cause of other downstream errors.
entry = local_nursery._children.get(uid)
if entry:
_, proc, _ = entry
if proc.poll() is not None:
log.error('Actor {uid} proc died and IPC broke?')
else:
log.error(f'Actor {uid} IPC connection broke!?')
log.cancel(f"Waiting on cancel request to peer {chan.uid}") log.cancel(f"Waiting on cancel request to peer {chan.uid}")
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the remote # underlying transport protocol) to close from the
# peer side since we presume that any channel which # remote peer side since we presume that any channel
# is mapped to a sub-actor (i.e. it's managed by # which is mapped to a sub-actor (i.e. it's managed by
# one of our local nurseries) # one of our local nurseries) has a message is sent to
# message is sent to the peer likely by this actor which is # the peer likely by this actor (which is now in
# now in a cancelled condition) when the local runtime here # a cancelled condition) when the local runtime here is
# is now cancelled while (presumably) in the middle of msg # now cancelled while (presumably) in the middle of msg
# loop processing. # loop processing.
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:
cs.shield = True cs.shield = True
@ -609,16 +627,19 @@ class Actor:
await local_nursery.exited.wait() await local_nursery.exited.wait()
# if local_nursery._children
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
log.runtime(f"Releasing channel {chan} from {chan.uid}") log.runtime(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid) chans = self._peers.get(chan.uid)
chans.remove(chan) chans.remove(chan)
uid = chan.uid
if not chans: if not chans:
log.runtime(f"No more channels for {chan.uid}") log.runtime(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None) self._peers.pop(uid, None)
# for (uid, cid) in self._contexts.copy(): # for (uid, cid) in self._contexts.copy():
# if chan.uid == uid: # if chan.uid == uid:
@ -626,11 +647,13 @@ class Actor:
log.runtime(f"Peers is {self._peers}") log.runtime(f"Peers is {self._peers}")
if not self._peers: # no more channels connected # No more channels to other actors (at all) registered
# as connected.
if not self._peers:
log.runtime("Signalling no more peer channels") log.runtime("Signalling no more peer channels")
self._no_more_peers.set() self._no_more_peers.set()
# # XXX: is this necessary (GC should do it?) # XXX: is this necessary (GC should do it)?
if chan.connected(): if chan.connected():
# if the channel is still connected it may mean the far # if the channel is still connected it may mean the far
# end has not closed and we may have gotten here due to # end has not closed and we may have gotten here due to
@ -665,7 +688,7 @@ class Actor:
ctx = self._contexts[(uid, cid)] ctx = self._contexts[(uid, cid)]
except KeyError: except KeyError:
log.warning( log.warning(
f'Ignoring msg from [no-longer/un]known context with {uid}:' f'Ignoring msg from [no-longer/un]known context {uid}:'
f'\n{msg}') f'\n{msg}')
return return
@ -813,7 +836,7 @@ class Actor:
shield: bool = False, shield: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> bool:
''' '''
Process messages for the channel async-RPC style. Process messages for the channel async-RPC style.
@ -839,7 +862,7 @@ class Actor:
if msg is None: # loop terminate sentinel if msg is None: # loop terminate sentinel
log.cancel( log.cancel(
f"Channerl to {chan.uid} terminated?\n" f"Channel to {chan.uid} terminated?\n"
"Cancelling all associated tasks..") "Cancelling all associated tasks..")
for (channel, cid) in self._rpc_tasks.copy(): for (channel, cid) in self._rpc_tasks.copy():
@ -986,6 +1009,9 @@ class Actor:
# up. # up.
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
# transport **was** disconnected
return True
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
if nursery_cancelled_before_task: if nursery_cancelled_before_task:
sn = self._service_n sn = self._service_n
@ -1010,6 +1036,9 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} " f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}") f"with last msg:\n{msg}")
# transport **was not** disconnected
return False
async def _from_parent( async def _from_parent(
self, self,
parent_addr: Optional[tuple[str, int]], parent_addr: Optional[tuple[str, int]],