Accept transport closed error during handshake and msg loop
							parent
							
								
									d8dcee3713
								
							
						
					
					
						commit
						700f09ce9b
					
				|  | @ -27,6 +27,7 @@ from ._exceptions import ( | |||
|     unpack_error, | ||||
|     ModuleNotExposed, | ||||
|     is_multi_cancelled, | ||||
|     TransportClosed, | ||||
| ) | ||||
| from . import _debug | ||||
| from ._discovery import get_arbiter | ||||
|  | @ -202,7 +203,7 @@ class Actor: | |||
|         enable_modules: List[str] = [], | ||||
|         uid: str = None, | ||||
|         loglevel: str = None, | ||||
|         arbiter_addr: Optional[Tuple[str, int]] = None, | ||||
|         arbiter_addr: Optional[Tuple[str, int]] = (None, None), | ||||
|         spawn_method: Optional[str] = None | ||||
|     ) -> None: | ||||
|         """This constructor is called in the parent actor **before** the spawning | ||||
|  | @ -232,7 +233,7 @@ class Actor: | |||
|         # TODO: consider making this a dynamically defined | ||||
|         # @dataclass once we get py3.7 | ||||
|         self.loglevel = loglevel | ||||
|         self._arb_addr = tuple(arbiter_addr) if arbiter_addr is not None else (None, None) | ||||
|         self._arb_addr = tuple(arbiter_addr) | ||||
| 
 | ||||
|         # marked by the process spawning backend at startup | ||||
|         # will be None for the parent most process started manually | ||||
|  | @ -338,7 +339,18 @@ class Actor: | |||
|         # send/receive initial handshake response | ||||
|         try: | ||||
|             uid = await self._do_handshake(chan) | ||||
|         except StopAsyncIteration: | ||||
| 
 | ||||
|         except ( | ||||
|             trio.BrokenResourceError, | ||||
|             trio.ClosedResourceError, | ||||
|             TransportClosed, | ||||
|         ): | ||||
|             # XXX: This may propagate up from ``Channel._aiter_recv()`` | ||||
|             # and ``MsgpackStream._inter_packets()`` on a read from the | ||||
|             # stream particularly when the runtime is first starting up | ||||
|             # inside ``open_root_actor()`` where there is a check for | ||||
|             # a bound listener on the "arbiter" addr.  the reset will be | ||||
|             # because the handshake was never meant took place. | ||||
|             log.warning(f"Channel {chan} failed to handshake") | ||||
|             return | ||||
| 
 | ||||
|  | @ -579,22 +591,39 @@ class Actor: | |||
|                     ) | ||||
|                     await self.cancel_rpc_tasks(chan) | ||||
| 
 | ||||
|         except ( | ||||
|             TransportClosed, | ||||
|             trio.BrokenResourceError, | ||||
|             trio.ClosedResourceError | ||||
|         ): | ||||
|             # channels "breaking" is ok since we don't have a teardown | ||||
|             # handshake for them (yet) and instead we simply bail out | ||||
|             # of the message loop and expect the teardown sequence | ||||
|             # to clean up. | ||||
|             log.error(f"{chan} form {chan.uid} closed abruptly") | ||||
|             # raise | ||||
| 
 | ||||
|         except trio.ClosedResourceError: | ||||
|             log.error(f"{chan} form {chan.uid} broke") | ||||
| 
 | ||||
|         except (Exception, trio.MultiError) as err: | ||||
|             # ship any "internal" exception (i.e. one from internal machinery | ||||
|             # not from an rpc task) to parent | ||||
|             log.exception("Actor errored:") | ||||
|             if self._parent_chan: | ||||
|                 await self._parent_chan.send(pack_error(err)) | ||||
|             raise | ||||
| 
 | ||||
|             # if this is the `MainProcess` we expect the error broadcasting | ||||
|             # above to trigger an error at consuming portal "checkpoints" | ||||
|             raise | ||||
| 
 | ||||
|         except trio.Cancelled: | ||||
|             # debugging only | ||||
|             log.debug(f"Msg loop was cancelled for {chan}") | ||||
|             raise | ||||
| 
 | ||||
|         finally: | ||||
|             # msg debugging for when he machinery is brokey | ||||
|             log.debug( | ||||
|                 f"Exiting msg loop for {chan} from {chan.uid} " | ||||
|                 f"with last msg:\n{msg}") | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue