forked from goodboy/tractor
				
			Finally implement peer-lookup optimization..
There's been a todo for so long for this XD Since all `Actor`s store a table of `._peers` we can try a lookup on that table as a shortcut before pinging the registry Bo Impl details: - add a new `._discovery.get_peer_by_name()` routine which attempts the `._peers` lookup by combining a copy of that `dict` + an entry added for `Actor._parent_chan` (since all subs have a parent and often the desired contact is just that connection). - change `.find_actor()` (for the `only_first == True` case), `.query_actor()` and `.wait_for_actor()` to call the new helper and deliver appropriate outputs if possible. Other: - deprecate the `get_arbiter()` def and all usage in tests and examples. - drop lingering use of the `arbiter_sockaddr` arg to various routines. - tweak the `Actor` docstring as well as some code formatting, and tweak the `._stream_handler()`'s initial `con_status: str` logging value since the way it was could never be reached.. oh and `.warning()` on any new connections which already have a `_pre_chan: Channel` entry in `._peers` so we can start minimizing IPC duplications. remotes/1757153874605917753/main
							parent
							
								
									46066c02e4
								
							
						
					
					
						commit
						32f7742e53
					
				|  | @ -9,7 +9,7 @@ async def main(service_name): | |||
|     async with tractor.open_nursery() as an: | ||||
|         await an.start_actor(service_name) | ||||
| 
 | ||||
|         async with tractor.get_arbiter('127.0.0.1', 1616) as portal: | ||||
|         async with tractor.get_registry('127.0.0.1', 1616) as portal: | ||||
|             print(f"Arbiter is listening on {portal.channel}") | ||||
| 
 | ||||
|         async with tractor.wait_for_actor(service_name) as sockaddr: | ||||
|  |  | |||
|  | @ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr): | |||
|         portal = await n.start_actor('actor', enable_modules=[__name__]) | ||||
|         uid = portal.channel.uid | ||||
| 
 | ||||
|         async with tractor.get_arbiter(*reg_addr) as aportal: | ||||
|         async with tractor.get_registry(*reg_addr) as aportal: | ||||
|             # this local actor should be the arbiter | ||||
|             assert actor is aportal.actor | ||||
| 
 | ||||
|  | @ -160,7 +160,7 @@ async def spawn_and_check_registry( | |||
|     async with tractor.open_root_actor( | ||||
|         registry_addrs=[reg_addr], | ||||
|     ): | ||||
|         async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|         async with tractor.get_registry(*reg_addr) as portal: | ||||
|             # runtime needs to be up to call this | ||||
|             actor = tractor.current_actor() | ||||
| 
 | ||||
|  | @ -298,7 +298,7 @@ async def close_chans_before_nursery( | |||
|     async with tractor.open_root_actor( | ||||
|         registry_addrs=[reg_addr], | ||||
|     ): | ||||
|         async with tractor.get_arbiter(*reg_addr) as aportal: | ||||
|         async with tractor.get_registry(*reg_addr) as aportal: | ||||
|             try: | ||||
|                 get_reg = partial(unpack_reg, aportal) | ||||
| 
 | ||||
|  |  | |||
|  | @ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr): | |||
|     "Verify waiting on the arbiter to register itself using a local portal." | ||||
|     actor = tractor.current_actor() | ||||
|     assert actor.is_arbiter | ||||
|     async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|     async with tractor.get_registry(*reg_addr) as portal: | ||||
|         assert isinstance(portal, tractor._portal.LocalPortal) | ||||
| 
 | ||||
|         with trio.fail_after(0.2): | ||||
|  |  | |||
|  | @ -32,7 +32,7 @@ def test_abort_on_sigint(daemon): | |||
| @tractor_test | ||||
| async def test_cancel_remote_arbiter(daemon, reg_addr): | ||||
|     assert not tractor.current_actor().is_arbiter | ||||
|     async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|     async with tractor.get_registry(*reg_addr) as portal: | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
|     time.sleep(0.1) | ||||
|  | @ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr): | |||
| 
 | ||||
|     # no arbiter socket should exist | ||||
|     with pytest.raises(OSError): | ||||
|         async with tractor.get_arbiter(*reg_addr) as portal: | ||||
|         async with tractor.get_registry(*reg_addr) as portal: | ||||
|             pass | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -31,7 +31,7 @@ from ._streaming import ( | |||
|     stream as stream, | ||||
| ) | ||||
| from ._discovery import ( | ||||
|     get_arbiter as get_arbiter, | ||||
|     get_registry as get_registry, | ||||
|     find_actor as find_actor, | ||||
|     wait_for_actor as wait_for_actor, | ||||
|     query_actor as query_actor, | ||||
|  |  | |||
|  | @ -26,8 +26,8 @@ from typing import ( | |||
|     TYPE_CHECKING, | ||||
| ) | ||||
| from contextlib import asynccontextmanager as acm | ||||
| import warnings | ||||
| 
 | ||||
| from tractor.log import get_logger | ||||
| from .trionics import gather_contexts | ||||
| from ._ipc import _connect_chan, Channel | ||||
| from ._portal import ( | ||||
|  | @ -40,11 +40,13 @@ from ._state import ( | |||
|     _runtime_vars, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._runtime import Actor | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def get_registry( | ||||
|     host: str, | ||||
|  | @ -56,14 +58,12 @@ async def get_registry( | |||
| ]: | ||||
|     ''' | ||||
|     Return a portal instance connected to a local or remote | ||||
|     arbiter. | ||||
|     registry-service actor; if a connection already exists re-use it | ||||
|     (presumably to call a `.register_actor()` registry runtime RPC | ||||
|     ep). | ||||
| 
 | ||||
|     ''' | ||||
|     actor = current_actor() | ||||
| 
 | ||||
|     if not actor: | ||||
|         raise RuntimeError("No actor instance has been defined yet?") | ||||
| 
 | ||||
|     actor: Actor = current_actor() | ||||
|     if actor.is_registrar: | ||||
|         # we're already the arbiter | ||||
|         # (likely a re-entrant call from the arbiter actor) | ||||
|  | @ -72,6 +72,8 @@ async def get_registry( | |||
|             Channel((host, port)) | ||||
|         ) | ||||
|     else: | ||||
|         # TODO: try to look up a pre-existing connection from | ||||
|         # `Actor._peers` and use it instead? | ||||
|         async with ( | ||||
|             _connect_chan(host, port) as chan, | ||||
|             open_portal(chan) as regstr_ptl, | ||||
|  | @ -80,19 +82,6 @@ async def get_registry( | |||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| # TODO: deprecate and this remove _arbiter form! | ||||
| @acm | ||||
| async def get_arbiter(*args, **kwargs): | ||||
|     warnings.warn( | ||||
|         '`tractor.get_arbiter()` is now deprecated!\n' | ||||
|         'Use `.get_registry()` instead!', | ||||
|         DeprecationWarning, | ||||
|         stacklevel=2, | ||||
|     ) | ||||
|     async with get_registry(*args, **kwargs) as to_yield: | ||||
|         yield to_yield | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def get_root( | ||||
|     **kwargs, | ||||
|  | @ -110,22 +99,53 @@ async def get_root( | |||
|         yield portal | ||||
| 
 | ||||
| 
 | ||||
def get_peer_by_name(
    name: str,
    # uuid: str|None = None,

) -> list[Channel]|None:  # at least 1
    '''
    Scan for an existing connection (set) to a named actor and
    return any channels found for it in `Actor._peers` (plus the
    parent channel when the parent is the named actor).

    This is an optimization method over querying the registrar for
    the same info.

    Returns a non-empty `list[Channel]` for the first peer whose
    name matches `name`, or `None` when no connected peer matches.
    The returned list is a scratch copy; mutating it does not affect
    `Actor._peers`.

    '''
    actor: Actor = current_actor()

    # XXX: `Actor._peers.copy()` is only a *shallow* copy, so
    # appending the parent chan to one of its lists would also
    # append to the live `Actor._peers` entry (and would do so again
    # on every call). Copy each per-peer list so the scan table is
    # fully detached from runtime state.
    to_scan: dict[tuple, list[Channel]] = {
        uid: list(chans)
        for uid, chans in actor._peers.items()
    }

    # all subactors have a parent and often the desired contact is
    # just that connection; `.uid` is only set after first contact
    # so skip the parent chan until its uid is known.
    pchan: Channel|None = actor._parent_chan
    if pchan and pchan.uid:
        parent_chans: list[Channel] = to_scan.setdefault(pchan.uid, [])
        if pchan not in parent_chans:
            parent_chans.append(pchan)

    for aid, chans in to_scan.items():
        # NOTE(review): actor uids are used as `(name, uuid)` pairs
        # elsewhere in the runtime (`uid[0]` is logged as the name),
        # so the name is the FIRST element; unpacking it as the
        # second would compare `name` against the uuid and never
        # match.
        peer_name, _ = aid
        if name != peer_name:
            continue

        if not chans:
            log.warning(
                f'No IPC chans for matching peer {peer_name}\n'
            )
            continue

        return chans

    return None
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def query_actor( | ||||
|     name: str, | ||||
|     arbiter_sockaddr: tuple[str, int] | None = None, | ||||
|     regaddr: tuple[str, int] | None = None, | ||||
|     regaddr: tuple[str, int]|None = None, | ||||
| 
 | ||||
| ) -> AsyncGenerator[ | ||||
|     tuple[str, int] | None, | ||||
|     tuple[str, int]|None, | ||||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|     Make a transport address lookup for an actor name to a specific | ||||
|     registrar. | ||||
|     Lookup a transport address (by actor name) via querying a registrar | ||||
|     listening @ `regaddr`. | ||||
| 
 | ||||
|     Returns the (socket) address or ``None`` if no entry under that | ||||
|     name exists for the given registrar listening @ `regaddr`. | ||||
|     Returns the transport protocol (socket) address or `None` if no | ||||
|     entry under that name exists. | ||||
| 
 | ||||
|     ''' | ||||
|     actor: Actor = current_actor() | ||||
|  | @ -137,14 +157,10 @@ async def query_actor( | |||
|             'The current actor IS the registry!?' | ||||
|         ) | ||||
| 
 | ||||
|     if arbiter_sockaddr is not None: | ||||
|         warnings.warn( | ||||
|             '`tractor.query_actor(regaddr=<blah>)` is deprecated.\n' | ||||
|             'Use `registry_addrs: list[tuple]` instead!', | ||||
|             DeprecationWarning, | ||||
|             stacklevel=2, | ||||
|         ) | ||||
|         regaddr: list[tuple[str, int]] = arbiter_sockaddr | ||||
|     maybe_peers: list[Channel]|None = get_peer_by_name(name) | ||||
|     if maybe_peers: | ||||
|         yield maybe_peers[0].raddr | ||||
|         return | ||||
| 
 | ||||
|     reg_portal: Portal | ||||
|     regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] | ||||
|  | @ -159,10 +175,28 @@ async def query_actor( | |||
|         yield sockaddr | ||||
| 
 | ||||
| 
 | ||||
@acm
async def maybe_open_portal(
    addr: tuple[str, int],
    name: str,
):
    '''
    Look up the actor `name` via `query_actor()` against the
    registrar @ `addr` and, when a transport address is delivered,
    connect to it and yield a `Portal`; otherwise yield `None`.

    '''
    # NOTE: the `query_actor()` block is exited before any new
    # connection to the target is opened; only the delivered
    # address is kept.
    async with query_actor(
        name=name,
        regaddr=addr,
    ) as sockaddr:
        pass

    # no entry under that name
    if not sockaddr:
        yield None
        return

    async with (
        _connect_chan(*sockaddr) as chan,
        open_portal(chan) as portal,
    ):
        yield portal
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def find_actor( | ||||
|     name: str, | ||||
|     arbiter_sockaddr: tuple[str, int]|None = None, | ||||
|     registry_addrs: list[tuple[str, int]]|None = None, | ||||
| 
 | ||||
|     only_first: bool = True, | ||||
|  | @ -179,29 +213,12 @@ async def find_actor( | |||
|     known to the arbiter. | ||||
| 
 | ||||
|     ''' | ||||
|     if arbiter_sockaddr is not None: | ||||
|         warnings.warn( | ||||
|             '`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n' | ||||
|             'Use `registry_addrs: list[tuple]` instead!', | ||||
|             DeprecationWarning, | ||||
|             stacklevel=2, | ||||
|         ) | ||||
|         registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr] | ||||
| 
 | ||||
|     @acm | ||||
|     async def maybe_open_portal_from_reg_addr( | ||||
|         addr: tuple[str, int], | ||||
|     ): | ||||
|         async with query_actor( | ||||
|             name=name, | ||||
|             regaddr=addr, | ||||
|         ) as sockaddr: | ||||
|             if sockaddr: | ||||
|                 async with _connect_chan(*sockaddr) as chan: | ||||
|                     async with open_portal(chan) as portal: | ||||
|                         yield portal | ||||
|             else: | ||||
|                 yield None | ||||
|     # optimization path, use any pre-existing peer channel | ||||
|     maybe_peers: list[Channel]|None = get_peer_by_name(name) | ||||
|     if maybe_peers and only_first: | ||||
|         async with open_portal(maybe_peers[0]) as peer_portal: | ||||
|             yield peer_portal | ||||
|             return | ||||
| 
 | ||||
|     if not registry_addrs: | ||||
|         # XXX NOTE: make sure to dynamically read the value on | ||||
|  | @ -217,10 +234,13 @@ async def find_actor( | |||
|     maybe_portals: list[ | ||||
|         AsyncContextManager[tuple[str, int]] | ||||
|     ] = list( | ||||
|         maybe_open_portal_from_reg_addr(addr) | ||||
|         maybe_open_portal( | ||||
|             addr=addr, | ||||
|             name=name, | ||||
|         ) | ||||
|         for addr in registry_addrs | ||||
|     ) | ||||
| 
 | ||||
|     portals: list[Portal] | ||||
|     async with gather_contexts( | ||||
|         mngrs=maybe_portals, | ||||
|     ) as portals: | ||||
|  | @ -254,31 +274,31 @@ async def find_actor( | |||
| @acm | ||||
| async def wait_for_actor( | ||||
|     name: str, | ||||
|     arbiter_sockaddr: tuple[str, int] | None = None, | ||||
|     registry_addr: tuple[str, int] | None = None, | ||||
| 
 | ||||
| ) -> AsyncGenerator[Portal, None]: | ||||
|     ''' | ||||
|     Wait on an actor to register with the arbiter. | ||||
| 
 | ||||
|     A portal to the first registered actor is returned. | ||||
|     Wait on at least one peer actor to register `name` with the | ||||
|     registrar, yield a `Portal` to the first registree. | ||||
| 
 | ||||
|     ''' | ||||
|     actor: Actor = current_actor() | ||||
| 
 | ||||
|     if arbiter_sockaddr is not None: | ||||
|         warnings.warn( | ||||
|             '`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n' | ||||
|             'Use `registry_addr: tuple` instead!', | ||||
|             DeprecationWarning, | ||||
|             stacklevel=2, | ||||
|         ) | ||||
|         registry_addr: tuple[str, int] = arbiter_sockaddr | ||||
|     # optimization path, use any pre-existing peer channel | ||||
|     maybe_peers: list[Channel]|None = get_peer_by_name(name) | ||||
|     if maybe_peers: | ||||
|         async with open_portal(maybe_peers[0]) as peer_portal: | ||||
|             yield peer_portal | ||||
|             return | ||||
| 
 | ||||
|     regaddr: tuple[str, int] = ( | ||||
|         registry_addr | ||||
|         or | ||||
|         actor.reg_addrs[0] | ||||
|     ) | ||||
|     # TODO: use `.trionics.gather_contexts()` like | ||||
|     # above in `find_actor()` as well? | ||||
|     reg_portal: Portal | ||||
|     regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0] | ||||
|     async with get_registry(*regaddr) as reg_portal: | ||||
|         sockaddrs = await reg_portal.run_from_ns( | ||||
|             'self', | ||||
|  |  | |||
|  | @ -111,25 +111,26 @@ class Actor: | |||
|     ''' | ||||
|     The fundamental "runtime" concurrency primitive. | ||||
| 
 | ||||
|     An *actor* is the combination of a regular Python process executing | ||||
|     a ``trio`` task tree, communicating with other actors through | ||||
|     "memory boundary portals" - which provide a native async API around | ||||
|     IPC transport "channels" which themselves encapsulate various | ||||
|     (swappable) network protocols. | ||||
|     An "actor" is the combination of a regular Python process | ||||
|     executing a `trio.run()` task tree, communicating with other | ||||
|     "actors" through "memory boundary portals": `Portal`, which | ||||
|     provide a high-level async API around IPC "channels" (`Channel`) | ||||
|     which themselves encapsulate various (swappable) network | ||||
|     transport protocols for sending msgs between said memory domains | ||||
|     (processes, hosts, non-GIL threads). | ||||
| 
 | ||||
| 
 | ||||
|     Each "actor" is ``trio.run()`` scheduled "runtime" composed of | ||||
|     many concurrent tasks in a single thread. The "runtime" tasks | ||||
|     conduct a slew of low(er) level functions to make it possible | ||||
|     for message passing between actors as well as the ability to | ||||
|     create new actors (aka new "runtimes" in new processes which | ||||
|     are supervised via a nursery construct). Each task which sends | ||||
|     messages to a task in a "peer" (not necessarily a parent-child, | ||||
|     Each "actor" is `trio.run()` scheduled "runtime" composed of many | ||||
|     concurrent tasks in a single thread. The "runtime" tasks conduct | ||||
|     a slew of low(er) level functions to make it possible for message | ||||
|     passing between actors as well as the ability to create new | ||||
|     actors (aka new "runtimes" in new processes which are supervised | ||||
|     via an "actor-nursery" construct). Each task which sends messages | ||||
|     to a task in a "peer" actor (not necessarily a parent-child, | ||||
|     depth hierarchy) is able to do so via an "address", which maps | ||||
|     IPC connections across memory boundaries, and a task request id | ||||
|     which allows for per-actor tasks to send and receive messages | ||||
|     to specific peer-actor tasks with which there is an ongoing | ||||
|     RPC/IPC dialog. | ||||
|     which allows for per-actor tasks to send and receive messages to | ||||
|     specific peer-actor tasks with which there is an ongoing RPC/IPC | ||||
|     dialog. | ||||
| 
 | ||||
|     ''' | ||||
|     # ugh, we need to get rid of this and replace with a "registry" sys | ||||
|  | @ -226,17 +227,20 @@ class Actor: | |||
|         # by the user (currently called the "arbiter") | ||||
|         self._spawn_method: str = spawn_method | ||||
| 
 | ||||
|         self._peers: defaultdict = defaultdict(list) | ||||
|         self._peers: defaultdict[ | ||||
|             str,  # uaid | ||||
|             list[Channel],  # IPC conns from peer | ||||
|         ] = defaultdict(list) | ||||
|         self._peer_connected: dict[tuple[str, str], trio.Event] = {} | ||||
|         self._no_more_peers = trio.Event() | ||||
|         self._no_more_peers.set() | ||||
| 
 | ||||
|         # RPC state | ||||
|         self._ongoing_rpc_tasks = trio.Event() | ||||
|         self._ongoing_rpc_tasks.set() | ||||
| 
 | ||||
|         # (chan, cid) -> (cancel_scope, func) | ||||
|         self._rpc_tasks: dict[ | ||||
|             tuple[Channel, str], | ||||
|             tuple[Context, Callable, trio.Event] | ||||
|             tuple[Channel, str],  # (chan, cid) | ||||
|             tuple[Context, Callable, trio.Event]  # (ctx=>, fn(), done?) | ||||
|         ] = {} | ||||
| 
 | ||||
|         # map {actor uids -> Context} | ||||
|  | @ -313,7 +317,10 @@ class Actor: | |||
|         event = self._peer_connected.setdefault(uid, trio.Event()) | ||||
|         await event.wait() | ||||
|         log.debug(f'{uid!r} successfully connected back to us') | ||||
|         return event, self._peers[uid][-1] | ||||
|         return ( | ||||
|             event, | ||||
|             self._peers[uid][-1], | ||||
|         ) | ||||
| 
 | ||||
|     def load_modules( | ||||
|         self, | ||||
|  | @ -404,32 +411,11 @@ class Actor: | |||
|         ''' | ||||
|         self._no_more_peers = trio.Event()  # unset by making new | ||||
|         chan = Channel.from_stream(stream) | ||||
|         their_uid: tuple[str, str]|None = chan.uid | ||||
|         if their_uid: | ||||
|             log.warning( | ||||
|                 f'Re-connection from already known {their_uid}' | ||||
|             ) | ||||
|         else: | ||||
|            log.runtime(f'New connection to us @{chan.raddr}') | ||||
| 
 | ||||
|         con_status: str = '' | ||||
| 
 | ||||
|         # TODO: remove this branch since can never happen? | ||||
|         # NOTE: `.uid` is only set after first contact | ||||
|         if their_uid: | ||||
|             con_status = ( | ||||
|                 'IPC Re-connection from already known peer?\n' | ||||
|             ) | ||||
|         else: | ||||
|             con_status = ( | ||||
|                 'New inbound IPC connection <=\n' | ||||
|             ) | ||||
| 
 | ||||
|         con_status += ( | ||||
|         con_status: str = ( | ||||
|             'New inbound IPC connection <=\n' | ||||
|             f'|_{chan}\n' | ||||
|             # f' |_@{chan.raddr}\n\n' | ||||
|             # ^-TODO-^ remove since already in chan.__repr__()? | ||||
|         ) | ||||
| 
 | ||||
|         # send/receive initial handshake response | ||||
|         try: | ||||
|             uid: tuple|None = await self._do_handshake(chan) | ||||
|  | @ -454,9 +440,22 @@ class Actor: | |||
|             ) | ||||
|             return | ||||
| 
 | ||||
|         familiar: str = 'new-peer' | ||||
|         if _pre_chan := self._peers.get(uid): | ||||
|             familiar: str = 'pre-existing-peer' | ||||
|         uid_short: str = f'{uid[0]}[{uid[1][-6:]}]' | ||||
|         con_status += ( | ||||
|             f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n' | ||||
|             f' -> Handshake with {familiar} `{uid_short}` complete\n' | ||||
|         ) | ||||
| 
 | ||||
|         if _pre_chan: | ||||
|             log.warning( | ||||
|             # con_status += ( | ||||
|             # ^TODO^ swap once we minimize conn duplication | ||||
|                 f' -> Wait, we already have IPC with `{uid_short}`??\n' | ||||
|                 f'   |_{_pre_chan}\n' | ||||
|             ) | ||||
| 
 | ||||
|         # IPC connection tracking for both peers and new children: | ||||
|         # - if this is a new channel to a locally spawned | ||||
|         #   sub-actor there will be a spawn wait even registered | ||||
|  | @ -1552,7 +1551,7 @@ class Actor: | |||
|     def accept_addr(self) -> tuple[str, int]: | ||||
|         ''' | ||||
|         Primary address to which the IPC transport server is | ||||
|         bound. | ||||
|         bound and listening for new connections. | ||||
| 
 | ||||
|         ''' | ||||
|         # throws OSError on failure | ||||
|  | @ -1569,6 +1568,7 @@ class Actor: | |||
|     def get_chans( | ||||
|         self, | ||||
|         uid: tuple[str, str], | ||||
| 
 | ||||
|     ) -> list[Channel]: | ||||
|         ''' | ||||
|         Return all IPC channels to the actor with provided `uid`. | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue