Add an obnoxious error message on internal failures
parent
1706791313
commit
8fbdfd6a3a
|
@ -20,7 +20,7 @@ from async_generator import aclosing
|
||||||
|
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._streaming import Context, _context
|
from ._streaming import Context, _context
|
||||||
from .log import get_console_log, get_logger
|
from .log import get_logger
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
pack_error,
|
pack_error,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
|
@ -149,7 +149,7 @@ async def _invoke(
|
||||||
f"Task {func} was likely cancelled before it was started")
|
f"Task {func} was likely cancelled before it was started")
|
||||||
|
|
||||||
if not actor._rpc_tasks:
|
if not actor._rpc_tasks:
|
||||||
log.info(f"All RPC tasks have completed")
|
log.info("All RPC tasks have completed")
|
||||||
actor._ongoing_rpc_tasks.set()
|
actor._ongoing_rpc_tasks.set()
|
||||||
|
|
||||||
|
|
||||||
|
@ -339,7 +339,7 @@ class Actor:
|
||||||
|
|
||||||
if not self._peers: # no more channels connected
|
if not self._peers: # no more channels connected
|
||||||
self._no_more_peers.set()
|
self._no_more_peers.set()
|
||||||
log.debug(f"Signalling no more peer channels")
|
log.debug("Signalling no more peer channels")
|
||||||
|
|
||||||
# # XXX: is this necessary (GC should do it?)
|
# # XXX: is this necessary (GC should do it?)
|
||||||
if chan.connected():
|
if chan.connected():
|
||||||
|
@ -609,9 +609,18 @@ class Actor:
|
||||||
# killed (i.e. this actor is cancelled or signalled by the parent)
|
# killed (i.e. this actor is cancelled or signalled by the parent)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
if not registered_with_arbiter:
|
if not registered_with_arbiter:
|
||||||
|
# TODO: I guess we could try to connect back
|
||||||
|
# to the parent through a channel and engage a debugger
|
||||||
|
# once we have that all working with std streams locking?
|
||||||
log.exception(
|
log.exception(
|
||||||
f"Actor errored and failed to register with arbiter "
|
f"Actor errored and failed to register with arbiter "
|
||||||
f"@ {arbiter_addr}")
|
f"@ {arbiter_addr}?")
|
||||||
|
log.error(
|
||||||
|
"\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n"
|
||||||
|
"\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n"
|
||||||
|
"\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n"
|
||||||
|
"\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n"
|
||||||
|
)
|
||||||
|
|
||||||
if self._parent_chan:
|
if self._parent_chan:
|
||||||
try:
|
try:
|
||||||
|
@ -629,6 +638,7 @@ class Actor:
|
||||||
# XXX wait, why?
|
# XXX wait, why?
|
||||||
# causes a hang if I always raise..
|
# causes a hang if I always raise..
|
||||||
# A parent process does something weird here?
|
# A parent process does something weird here?
|
||||||
|
# i'm so lost now..
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
@ -643,7 +653,7 @@ class Actor:
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Waiting for remaining peers {self._peers} to clear")
|
f"Waiting for remaining peers {self._peers} to clear")
|
||||||
await self._no_more_peers.wait()
|
await self._no_more_peers.wait()
|
||||||
log.debug(f"All peer channels are complete")
|
log.debug("All peer channels are complete")
|
||||||
|
|
||||||
# tear down channel server no matter what since we errored
|
# tear down channel server no matter what since we errored
|
||||||
# or completed
|
# or completed
|
||||||
|
@ -677,8 +687,8 @@ class Actor:
|
||||||
port=accept_port, host=accept_host,
|
port=accept_port, host=accept_host,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.debug(f"Started tcp server(s) on"
|
log.debug("Started tcp server(s) on"
|
||||||
" {[l.socket for l in listeners]}") # type: ignore
|
f" {[l.socket for l in listeners]}") # type: ignore
|
||||||
self._listeners.extend(listeners)
|
self._listeners.extend(listeners)
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
|
@ -794,6 +804,7 @@ class Actor:
|
||||||
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||||
return uid
|
return uid
|
||||||
|
|
||||||
|
|
||||||
class Arbiter(Actor):
|
class Arbiter(Actor):
|
||||||
"""A special actor who knows all the other actors and always has
|
"""A special actor who knows all the other actors and always has
|
||||||
access to a top level nursery.
|
access to a top level nursery.
|
||||||
|
@ -864,7 +875,7 @@ async def _start_actor(
|
||||||
port: int,
|
port: int,
|
||||||
arbiter_addr: Tuple[str, int],
|
arbiter_addr: Tuple[str, int],
|
||||||
nursery: trio.Nursery = None
|
nursery: trio.Nursery = None
|
||||||
):
|
) -> Any:
|
||||||
"""Spawn a local actor by starting a task to execute it's main async
|
"""Spawn a local actor by starting a task to execute it's main async
|
||||||
function.
|
function.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue