forked from goodboy/tractor
				
			Map broken stream errs to transport closed; msgspec seems to be racy
							parent
							
								
									562419c907
								
							
						
					
					
						commit
						1ab495a64d
					
				| 
						 | 
				
			
			@ -430,7 +430,10 @@ class Actor:
 | 
			
		|||
            uid = await self._do_handshake(chan)
 | 
			
		||||
 | 
			
		||||
        except (
 | 
			
		||||
            # we need this for ``msgspec`` for some reason?
 | 
			
		||||
            # for now, it's been put in the stream backend.
 | 
			
		||||
            # trio.BrokenResourceError,
 | 
			
		||||
 | 
			
		||||
            # trio.ClosedResourceError,
 | 
			
		||||
            TransportClosed,
 | 
			
		||||
        ):
 | 
			
		||||
| 
						 | 
				
			
			@ -797,7 +800,7 @@ class Actor:
 | 
			
		|||
                        # XXX: msgspec doesn't support serializing tuples
 | 
			
		||||
                        # so just cash manually here since it's what our
 | 
			
		||||
                        # internals expect.
 | 
			
		||||
                        address: Tuple[str, int] = tuple(value)
 | 
			
		||||
                        address: Tuple[str, int] = tuple(value) if value else value
 | 
			
		||||
                        self._arb_addr = address
 | 
			
		||||
 | 
			
		||||
                    else:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -148,7 +148,14 @@ class MsgspecTCPStream(MsgpackTCPStream):
 | 
			
		|||
            try:
 | 
			
		||||
                header = await self.recv_stream.receive_exactly(4)
 | 
			
		||||
 | 
			
		||||
            except (ValueError):
 | 
			
		||||
            except (
 | 
			
		||||
                ValueError,
 | 
			
		||||
 | 
			
		||||
                # not sure entirely why we need this but without it we
 | 
			
		||||
                # seem to be getting racy failures here on
 | 
			
		||||
                # arbiter/registry name subs..
 | 
			
		||||
                trio.BrokenResourceError,
 | 
			
		||||
            ):
 | 
			
		||||
                raise TransportClosed(
 | 
			
		||||
                    f'transport {self} was already closed prior ro read'
 | 
			
		||||
                )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue