Make `Actor._process_messages()` report disconnects
The method now returns a `bool` which flags whether the transport died to the caller and allows for reporting a disconnect in the channel-transport handler task. This is something a user will normally want to know about on the caller side especially after seeing a traceback from the peer (if in tree) on console.sigint_ignore_in_pdb_repl
parent
02210d8f8e
commit
fc90e1f171
|
@ -200,7 +200,7 @@ async def _invoke(
|
||||||
ctx = actor._contexts.pop((chan.uid, cid))
|
ctx = actor._contexts.pop((chan.uid, cid))
|
||||||
if ctx:
|
if ctx:
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Context entrypoint for {func} was terminated:\n{ctx}'
|
f'Context entrypoint {func} was terminated:\n{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
assert cs
|
assert cs
|
||||||
|
@ -316,7 +316,9 @@ async def try_ship_error_to_parent(
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
):
|
):
|
||||||
log.error(
|
# in SC terms this is one of the worst things that can
|
||||||
|
# happen and creates the 2-general's dilemma.
|
||||||
|
log.critical(
|
||||||
f"Failed to ship error to parent "
|
f"Failed to ship error to parent "
|
||||||
f"{channel.uid}, channel was closed"
|
f"{channel.uid}, channel was closed"
|
||||||
)
|
)
|
||||||
|
@ -560,33 +562,49 @@ class Actor:
|
||||||
# append new channel
|
# append new channel
|
||||||
self._peers[uid].append(chan)
|
self._peers[uid].append(chan)
|
||||||
|
|
||||||
|
local_nursery: Optional['ActorNursery'] = None # noqa
|
||||||
|
|
||||||
# Begin channel management - respond to remote requests and
|
# Begin channel management - respond to remote requests and
|
||||||
# process received reponses.
|
# process received reponses.
|
||||||
try:
|
try:
|
||||||
await self._process_messages(chan)
|
disconnected = await self._process_messages(chan)
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
log.cancel(f"Msg loop was cancelled for {chan}")
|
log.cancel(f"Msg loop was cancelled for {chan}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
local_nursery = self._actoruid2nursery.get(uid, local_nursery)
|
||||||
|
|
||||||
# This is set in ``Portal.cancel_actor()``. So if
|
# This is set in ``Portal.cancel_actor()``. So if
|
||||||
# the peer was cancelled we try to wait for them
|
# the peer was cancelled we try to wait for them
|
||||||
# to tear down their side of the connection before
|
# to tear down their side of the connection before
|
||||||
# moving on with closing our own side.
|
# moving on with closing our own side.
|
||||||
local_nursery = self._actoruid2nursery.get(chan.uid)
|
|
||||||
if (
|
if (
|
||||||
local_nursery
|
local_nursery
|
||||||
):
|
):
|
||||||
|
if disconnected:
|
||||||
|
# if the transport died and this actor is still
|
||||||
|
# registered within a local nursery, we report that the
|
||||||
|
# IPC layer may have failed unexpectedly since it may be
|
||||||
|
# the cause of other downstream errors.
|
||||||
|
entry = local_nursery._children.get(uid)
|
||||||
|
if entry:
|
||||||
|
_, proc, _ = entry
|
||||||
|
if proc.poll() is not None:
|
||||||
|
log.error('Actor {uid} proc died and IPC broke?')
|
||||||
|
else:
|
||||||
|
log.error(f'Actor {uid} IPC connection broke!?')
|
||||||
|
|
||||||
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
|
log.cancel(f"Waiting on cancel request to peer {chan.uid}")
|
||||||
# XXX: this is a soft wait on the channel (and its
|
# XXX: this is a soft wait on the channel (and its
|
||||||
# underlying transport protocol) to close from the remote
|
# underlying transport protocol) to close from the
|
||||||
# peer side since we presume that any channel which
|
# remote peer side since we presume that any channel
|
||||||
# is mapped to a sub-actor (i.e. it's managed by
|
# which is mapped to a sub-actor (i.e. it's managed by
|
||||||
# one of our local nurseries)
|
# one of our local nurseries) has a message is sent to
|
||||||
# message is sent to the peer likely by this actor which is
|
# the peer likely by this actor (which is now in
|
||||||
# now in a cancelled condition) when the local runtime here
|
# a cancelled condition) when the local runtime here is
|
||||||
# is now cancelled while (presumably) in the middle of msg
|
# now cancelled while (presumably) in the middle of msg
|
||||||
# loop processing.
|
# loop processing.
|
||||||
with trio.move_on_after(0.5) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
cs.shield = True
|
cs.shield = True
|
||||||
|
@ -609,16 +627,19 @@ class Actor:
|
||||||
|
|
||||||
await local_nursery.exited.wait()
|
await local_nursery.exited.wait()
|
||||||
|
|
||||||
|
# if local_nursery._children
|
||||||
|
|
||||||
# ``Channel`` teardown and closure sequence
|
# ``Channel`` teardown and closure sequence
|
||||||
|
|
||||||
# Drop ref to channel so it can be gc-ed and disconnected
|
# Drop ref to channel so it can be gc-ed and disconnected
|
||||||
log.runtime(f"Releasing channel {chan} from {chan.uid}")
|
log.runtime(f"Releasing channel {chan} from {chan.uid}")
|
||||||
chans = self._peers.get(chan.uid)
|
chans = self._peers.get(chan.uid)
|
||||||
chans.remove(chan)
|
chans.remove(chan)
|
||||||
|
uid = chan.uid
|
||||||
|
|
||||||
if not chans:
|
if not chans:
|
||||||
log.runtime(f"No more channels for {chan.uid}")
|
log.runtime(f"No more channels for {chan.uid}")
|
||||||
self._peers.pop(chan.uid, None)
|
self._peers.pop(uid, None)
|
||||||
|
|
||||||
# for (uid, cid) in self._contexts.copy():
|
# for (uid, cid) in self._contexts.copy():
|
||||||
# if chan.uid == uid:
|
# if chan.uid == uid:
|
||||||
|
@ -626,11 +647,13 @@ class Actor:
|
||||||
|
|
||||||
log.runtime(f"Peers is {self._peers}")
|
log.runtime(f"Peers is {self._peers}")
|
||||||
|
|
||||||
if not self._peers: # no more channels connected
|
# No more channels to other actors (at all) registered
|
||||||
|
# as connected.
|
||||||
|
if not self._peers:
|
||||||
log.runtime("Signalling no more peer channels")
|
log.runtime("Signalling no more peer channels")
|
||||||
self._no_more_peers.set()
|
self._no_more_peers.set()
|
||||||
|
|
||||||
# # XXX: is this necessary (GC should do it?)
|
# XXX: is this necessary (GC should do it)?
|
||||||
if chan.connected():
|
if chan.connected():
|
||||||
# if the channel is still connected it may mean the far
|
# if the channel is still connected it may mean the far
|
||||||
# end has not closed and we may have gotten here due to
|
# end has not closed and we may have gotten here due to
|
||||||
|
@ -665,7 +688,7 @@ class Actor:
|
||||||
ctx = self._contexts[(uid, cid)]
|
ctx = self._contexts[(uid, cid)]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Ignoring msg from [no-longer/un]known context with {uid}:'
|
f'Ignoring msg from [no-longer/un]known context {uid}:'
|
||||||
f'\n{msg}')
|
f'\n{msg}')
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -813,7 +836,7 @@ class Actor:
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
Process messages for the channel async-RPC style.
|
Process messages for the channel async-RPC style.
|
||||||
|
|
||||||
|
@ -839,7 +862,7 @@ class Actor:
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Channerl to {chan.uid} terminated?\n"
|
f"Channel to {chan.uid} terminated?\n"
|
||||||
"Cancelling all associated tasks..")
|
"Cancelling all associated tasks..")
|
||||||
|
|
||||||
for (channel, cid) in self._rpc_tasks.copy():
|
for (channel, cid) in self._rpc_tasks.copy():
|
||||||
|
@ -986,6 +1009,9 @@ class Actor:
|
||||||
# up.
|
# up.
|
||||||
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
|
log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
|
||||||
|
|
||||||
|
# transport **was** disconnected
|
||||||
|
return True
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
if nursery_cancelled_before_task:
|
if nursery_cancelled_before_task:
|
||||||
sn = self._service_n
|
sn = self._service_n
|
||||||
|
@ -1010,6 +1036,9 @@ class Actor:
|
||||||
f"Exiting msg loop for {chan} from {chan.uid} "
|
f"Exiting msg loop for {chan} from {chan.uid} "
|
||||||
f"with last msg:\n{msg}")
|
f"with last msg:\n{msg}")
|
||||||
|
|
||||||
|
# transport **was not** disconnected
|
||||||
|
return False
|
||||||
|
|
||||||
async def _from_parent(
|
async def _from_parent(
|
||||||
self,
|
self,
|
||||||
parent_addr: Optional[tuple[str, int]],
|
parent_addr: Optional[tuple[str, int]],
|
||||||
|
|
Loading…
Reference in New Issue