Emit warning on any `ContextCancelled.canceller == None`
							parent
							
								
									96150600fb
								
							
						
					
					
						commit
						27b750e907
					
				|  | @ -33,12 +33,15 @@ import exceptiongroup as eg | ||||||
| import trio | import trio | ||||||
| 
 | 
 | ||||||
| from ._state import current_actor | from ._state import current_actor | ||||||
|  | from .log import get_logger | ||||||
| 
 | 
 | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|     from ._context import Context |     from ._context import Context | ||||||
|     from ._stream import MsgStream |     from ._stream import MsgStream | ||||||
|     from .log import StackLevelAdapter |     from .log import StackLevelAdapter | ||||||
| 
 | 
 | ||||||
|  | log = get_logger('tractor') | ||||||
|  | 
 | ||||||
| _this_mod = importlib.import_module(__name__) | _this_mod = importlib.import_module(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -113,10 +116,35 @@ class ContextCancelled(RemoteActorError): | ||||||
|     ''' |     ''' | ||||||
|     @property |     @property | ||||||
|     def canceller(self) -> tuple[str, str]|None: |     def canceller(self) -> tuple[str, str]|None: | ||||||
|  |         ''' | ||||||
|  |         Return the (maybe) `Actor.uid` for the requesting-author | ||||||
|  |         of this ctxc. | ||||||
|  | 
 | ||||||
|  |         Emit a warning msg when `.canceller` has not been set, | ||||||
|  |         which usually idicates that a `None` msg-loop setinel was | ||||||
|  |         sent before expected in the runtime. This can happen in | ||||||
|  |         a few situations: | ||||||
|  | 
 | ||||||
|  |         - (simulating) an IPC transport network outage | ||||||
|  |         - a (malicious) pkt sent specifically to cancel an actor's | ||||||
|  |           runtime non-gracefully without ensuring ongoing RPC tasks are  | ||||||
|  |           incrementally cancelled as is done with: | ||||||
|  |           `Actor` | ||||||
|  |           |_`.cancel()` | ||||||
|  |           |_`.cancel_soon()` | ||||||
|  |           |_`._cancel_task()` | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|         value = self.msgdata.get('canceller') |         value = self.msgdata.get('canceller') | ||||||
|         if value: |         if value: | ||||||
|             return tuple(value) |             return tuple(value) | ||||||
| 
 | 
 | ||||||
|  |         log.warning( | ||||||
|  |             'IPC Context cancelled without a requesting actor?\n' | ||||||
|  |             'Maybe the IPC transport ended abruptly?\n\n' | ||||||
|  |             f'{self}' | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class TransportClosed(trio.ClosedResourceError): | class TransportClosed(trio.ClosedResourceError): | ||||||
|     "Underlying channel transport was closed prior to use" |     "Underlying channel transport was closed prior to use" | ||||||
|  | @ -199,7 +227,6 @@ def pack_error( | ||||||
|     ): |     ): | ||||||
|         error_msg.update(exc.msgdata) |         error_msg.update(exc.msgdata) | ||||||
| 
 | 
 | ||||||
| 
 |  | ||||||
|     pkt: dict = {'error': error_msg} |     pkt: dict = {'error': error_msg} | ||||||
|     if cid: |     if cid: | ||||||
|         pkt['cid'] = cid |         pkt['cid'] = cid | ||||||
|  | @ -349,8 +376,8 @@ def _raise_from_no_key_in_msg( | ||||||
|         # raise a ``StopAsyncIteration`` **and** in our catch |         # raise a ``StopAsyncIteration`` **and** in our catch | ||||||
|         # block below it will trigger ``.aclose()``. |         # block below it will trigger ``.aclose()``. | ||||||
|         raise trio.EndOfChannel( |         raise trio.EndOfChannel( | ||||||
|             f'Context stream ended due to msg:\n' |             f'Context stream ended due to msg:\n\n' | ||||||
|             f'{pformat(msg)}' |             f'{pformat(msg)}\n' | ||||||
|         ) from src_err |         ) from src_err | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue