Compare commits
	
		
			7 Commits 
		
	
	
		
			main
			...
			dereg_on_o
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | c9fda3ff2c | |
|  | 11acdf8625 | |
|  | 0c2fb98d5e | |
|  | 8b82808fcd | |
|  | c93fbcc9f7 | |
|  | c089c8d0d6 | |
|  | 9f5b112d19 | 
|  | @ -1,6 +1,7 @@ | |||
| """ | ||||
| Actor "discovery" testing | ||||
| """ | ||||
| ''' | ||||
| Discovery subsystem via a "registrar" actor scenarios. | ||||
| 
 | ||||
| ''' | ||||
| import os | ||||
| import signal | ||||
| import platform | ||||
|  | @ -149,7 +150,10 @@ async def unpack_reg(actor_or_portal): | |||
|     else: | ||||
|         msg = await actor_or_portal.run_from_ns('self', 'get_registry') | ||||
| 
 | ||||
|     return {tuple(key.split('.')): val for key, val in msg.items()} | ||||
|     return { | ||||
|         tuple(key.split('.')): val | ||||
|         for key, val in msg.items() | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| async def spawn_and_check_registry( | ||||
|  | @ -325,20 +329,24 @@ async def close_chans_before_nursery( | |||
|             try: | ||||
|                 get_reg = partial(unpack_reg, aportal) | ||||
| 
 | ||||
|                 async with tractor.open_nursery() as tn: | ||||
|                     portal1 = await tn.start_actor( | ||||
|                         name='consumer1', enable_modules=[__name__]) | ||||
|                     portal2 = await tn.start_actor( | ||||
|                         'consumer2', enable_modules=[__name__]) | ||||
|                 async with tractor.open_nursery() as an: | ||||
|                     portal1 = await an.start_actor( | ||||
|                         name='consumer1', | ||||
|                         enable_modules=[__name__], | ||||
|                     ) | ||||
|                     portal2 = await an.start_actor( | ||||
|                         'consumer2', | ||||
|                         enable_modules=[__name__], | ||||
|                     ) | ||||
| 
 | ||||
|                     # TODO: compact this back as was in last commit once | ||||
|                     # 3.9+, see https://github.com/goodboy/tractor/issues/207 | ||||
|                     async with portal1.open_stream_from( | ||||
|                         stream_forever | ||||
|                     ) as agen1: | ||||
|                         async with portal2.open_stream_from( | ||||
|                     async with ( | ||||
|                         portal1.open_stream_from( | ||||
|                             stream_forever | ||||
|                         ) as agen2: | ||||
|                         ) as agen1, | ||||
|                         portal2.open_stream_from( | ||||
|                             stream_forever | ||||
|                         ) as agen2, | ||||
|                     ): | ||||
|                             async with ( | ||||
|                                 collapse_eg(), | ||||
|                                 trio.open_nursery() as tn, | ||||
|  | @ -361,6 +369,7 @@ async def close_chans_before_nursery( | |||
|                                     # also kill off channels cuz why not | ||||
|                                     await agen1.aclose() | ||||
|                                     await agen2.aclose() | ||||
| 
 | ||||
|             finally: | ||||
|                 with trio.CancelScope(shield=True): | ||||
|                     await trio.sleep(1) | ||||
|  | @ -378,10 +387,12 @@ def test_close_channel_explicit( | |||
|     use_signal, | ||||
|     reg_addr, | ||||
| ): | ||||
|     """Verify that closing a stream explicitly and killing the actor's | ||||
|     ''' | ||||
|     Verify that closing a stream explicitly and killing the actor's | ||||
|     "root nursery" **before** the containing nursery tears down also | ||||
|     results in subactor(s) deregistering from the arbiter. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     with pytest.raises(KeyboardInterrupt): | ||||
|         trio.run( | ||||
|             partial( | ||||
|  | @ -394,16 +405,18 @@ def test_close_channel_explicit( | |||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize('use_signal', [False, True]) | ||||
| def test_close_channel_explicit_remote_arbiter( | ||||
| def test_close_channel_explicit_remote_registrar( | ||||
|     daemon: subprocess.Popen, | ||||
|     start_method, | ||||
|     use_signal, | ||||
|     reg_addr, | ||||
| ): | ||||
|     """Verify that closing a stream explicitly and killing the actor's | ||||
|     ''' | ||||
|     Verify that closing a stream explicitly and killing the actor's | ||||
|     "root nursery" **before** the containing nursery tears down also | ||||
|     results in subactor(s) deregistering from the arbiter. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     with pytest.raises(KeyboardInterrupt): | ||||
|         trio.run( | ||||
|             partial( | ||||
|  | @ -413,3 +426,61 @@ def test_close_channel_explicit_remote_arbiter( | |||
|                 remote_arbiter=True, | ||||
|             ), | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def kill_transport( | ||||
|     ctx: tractor.Context, | ||||
| ) -> None: | ||||
| 
 | ||||
|     await ctx.started() | ||||
|     actor: tractor.Actor = tractor.current_actor() | ||||
|     actor.ipc_server.cancel() | ||||
|     await trio.sleep_forever() | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # @pytest.mark.parametrize('use_signal', [False, True]) | ||||
| def test_stale_entry_is_deleted( | ||||
|     debug_mode: bool, | ||||
|     daemon: subprocess.Popen, | ||||
|     start_method: str, | ||||
|     reg_addr: tuple, | ||||
| ): | ||||
|     ''' | ||||
|     Ensure that when a stale entry is detected in the registrar's | ||||
|     table that the `find_actor()` API takes care of deleting the | ||||
|     stale entry and not delivering a bad portal. | ||||
| 
 | ||||
|     ''' | ||||
|     async def main(): | ||||
| 
 | ||||
|         name: str = 'transport_fails_actor' | ||||
|         _reg_ptl: tractor.Portal | ||||
|         an: tractor.ActorNursery | ||||
|         async with ( | ||||
|             tractor.open_nursery( | ||||
|                 debug_mode=debug_mode, | ||||
|             ) as an, | ||||
|             tractor.get_registry(reg_addr) as _reg_ptl, | ||||
|         ): | ||||
|             ptl: tractor.Portal = await an.start_actor( | ||||
|                 name, | ||||
|                 enable_modules=[__name__], | ||||
|             ) | ||||
|             async with ptl.open_context( | ||||
|                 kill_transport, | ||||
|             ) as (first, ctx): | ||||
|                 async with tractor.find_actor(name) as maybe_portal: | ||||
|                     # because the transitive | ||||
|                     # `._discovery.maybe_open_portal()` call should | ||||
|                     # fail and implicitly call `.delete_addr()` | ||||
|                     assert maybe_portal is None | ||||
|                     registry: dict = await unpack_reg(_reg_ptl) | ||||
|                     assert ptl.chan.aid.uid not in registry | ||||
| 
 | ||||
|                 # should fail since we knocked out the IPC tpt XD | ||||
|                 await ptl.cancel_actor() | ||||
|                 await an.cancel() | ||||
| 
 | ||||
|     trio.run(main) | ||||
|  |  | |||
|  | @ -60,7 +60,7 @@ log = get_logger(__name__) | |||
| async def get_registry( | ||||
|     addr: UnwrappedAddress|None = None, | ||||
| ) -> AsyncGenerator[ | ||||
|     Portal | LocalPortal | None, | ||||
|     Portal|LocalPortal|None, | ||||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|  | @ -150,7 +150,7 @@ async def query_actor( | |||
|     regaddr: UnwrappedAddress|None = None, | ||||
| 
 | ||||
| ) -> AsyncGenerator[ | ||||
|     UnwrappedAddress|None, | ||||
|     tuple[UnwrappedAddress|None, Portal|None], | ||||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|  | @ -164,7 +164,8 @@ async def query_actor( | |||
|     actor: Actor = current_actor() | ||||
|     if ( | ||||
|         name == 'registrar' | ||||
|         and actor.is_registrar | ||||
|         and | ||||
|         actor.is_registrar | ||||
|     ): | ||||
|         raise RuntimeError( | ||||
|             'The current actor IS the registry!?' | ||||
|  | @ -172,7 +173,7 @@ async def query_actor( | |||
| 
 | ||||
|     maybe_peers: list[Channel]|None = get_peer_by_name(name) | ||||
|     if maybe_peers: | ||||
|         yield maybe_peers[0].raddr | ||||
|         yield maybe_peers[0].raddr, None | ||||
|         return | ||||
| 
 | ||||
|     reg_portal: Portal | ||||
|  | @ -185,8 +186,7 @@ async def query_actor( | |||
|             'find_actor', | ||||
|             name=name, | ||||
|         ) | ||||
|         yield addr | ||||
| 
 | ||||
|         yield addr, reg_portal | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_portal( | ||||
|  | @ -196,15 +196,37 @@ async def maybe_open_portal( | |||
|     async with query_actor( | ||||
|         name=name, | ||||
|         regaddr=addr, | ||||
|     ) as addr: | ||||
|         pass | ||||
|     ) as (addr, reg_portal): | ||||
|         if not addr: | ||||
|             yield None | ||||
|             return | ||||
| 
 | ||||
|     if addr: | ||||
|         async with _connect_chan(addr) as chan: | ||||
|             async with open_portal(chan) as portal: | ||||
|                 yield portal | ||||
|     else: | ||||
|         yield None | ||||
|         try: | ||||
|             async with _connect_chan(addr) as chan: | ||||
|                 async with open_portal(chan) as portal: | ||||
|                     yield portal | ||||
| 
 | ||||
|         # most likely we were unable to connect the | ||||
|         # transport and there is likely a stale entry in | ||||
|         # the registry actor's table, thus we need to | ||||
|         # instruct it to clear that stale entry and then | ||||
|         # more silently (pretend there was no reason but | ||||
|         # to) indicate that the target actor can't be | ||||
|         # contacted at that addr. | ||||
|         except OSError: | ||||
|             # NOTE: ensure we delete the stale entry from the | ||||
|             # registar actor. | ||||
|             uid: tuple[str, str] = await reg_portal.run_from_ns( | ||||
|                 'self', | ||||
|                 'delete_addr', | ||||
|                 addr=addr, | ||||
|             ) | ||||
|             log.warning( | ||||
|                 f'Deleted stale registry entry !\n' | ||||
|                 f'addr: {addr!r}\n' | ||||
|                 f'uid: {uid!r}\n' | ||||
|             ) | ||||
|             yield None | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  | @ -272,7 +294,7 @@ async def find_actor( | |||
|         if not any(portals): | ||||
|             if raise_on_none: | ||||
|                 raise RuntimeError( | ||||
|                     f'No actor "{name}" found registered @ {registry_addrs}' | ||||
|                     f'No actor {name!r} found registered @ {registry_addrs!r}' | ||||
|                 ) | ||||
|             yield None | ||||
|             return | ||||
|  |  | |||
|  | @ -68,6 +68,7 @@ import textwrap | |||
| from types import ModuleType | ||||
| import warnings | ||||
| 
 | ||||
| from bidict import bidict | ||||
| import trio | ||||
| from trio._core import _run as trio_runtime | ||||
| from trio import ( | ||||
|  | @ -1879,10 +1880,10 @@ class Arbiter(Actor): | |||
|         **kwargs, | ||||
|     ) -> None: | ||||
| 
 | ||||
|         self._registry: dict[ | ||||
|         self._registry: bidict[ | ||||
|             tuple[str, str], | ||||
|             UnwrappedAddress, | ||||
|         ] = {} | ||||
|         ] = bidict({}) | ||||
|         self._waiters: dict[ | ||||
|             str, | ||||
|             # either an event to sync to receiving an actor uid (which | ||||
|  | @ -1971,7 +1972,8 @@ class Arbiter(Actor): | |||
|             # should never be 0-dynamic-os-alloc | ||||
|             await debug.pause() | ||||
| 
 | ||||
|         self._registry[uid] = addr | ||||
|         # XXX NOTE, value must also be hashable. | ||||
|         self._registry[uid] = tuple(addr) | ||||
| 
 | ||||
|         # pop and signal all waiter events | ||||
|         events = self._waiters.pop(name, []) | ||||
|  | @ -1988,4 +1990,26 @@ class Arbiter(Actor): | |||
|         uid = (str(uid[0]), str(uid[1])) | ||||
|         entry: tuple = self._registry.pop(uid, None) | ||||
|         if entry is None: | ||||
|             log.warning(f'Request to de-register {uid} failed?') | ||||
|             log.warning( | ||||
|                 f'Request to de-register {uid!r} failed?' | ||||
|             ) | ||||
| 
 | ||||
|     async def delete_addr( | ||||
|         self, | ||||
|         addr: tuple[str, int|str], | ||||
|     ) -> tuple[str, str]: | ||||
|         uid: tuple | None = self._registry.inverse.pop( | ||||
|             addr, | ||||
|             None, | ||||
|         ) | ||||
|         if uid: | ||||
|             report: str = 'Deleting registry-entry for,\n' | ||||
|         else: | ||||
|             report: str = 'No registry entry for,\n' | ||||
| 
 | ||||
|         log.warning( | ||||
|             report | ||||
|             + | ||||
|             f'{addr!r}@{uid!r}' | ||||
|         ) | ||||
|         return uid | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue