From 31207f92eea49753f2c0c78a2be3301109067e38 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 4 Jul 2024 19:40:11 -0400 Subject: [PATCH] Finally implement peer-lookup optimization.. There's a been a todo for soo long for this XD Since all `Actor`'s store a set of `._peers` we can try a lookup on that table as a shortcut before pinging the registry Bo Impl deats: - add a new `._discovery.get_peer_by_name()` routine which attempts the `._peers` lookup by combining a copy of that `dict` + an entry added for `Actor._parent_chan` (since all subs have a parent and often the desired contact is just that connection). - change `.find_actor()` (for the `only_first == True` case), `.query_actor()` and `.wait_for_actor()` to call the new helper and deliver appropriate outputs if possible. Other, - deprecate `get_arbiter()` def and all usage in tests and examples. - drop lingering use of `arbiter_sockaddr` arg to various routines. - tweak the `Actor` doc str as well as some code fmting and a tweak to the `._stream_handler()`'s initial `con_status: str` logging value since the way it was could never be reached.. oh and `.warning()` on any new connections which already have a `_pre_chan: Channel` entry in `._peers` so we can start minimizing IPC duplications. --- examples/service_discovery.py | 2 +- tests/test_discovery.py | 6 +- tests/test_local.py | 2 +- tests/test_multi_program.py | 4 +- tractor/__init__.py | 2 +- tractor/_discovery.py | 170 +++++++++++++++++++--------------- tractor/_runtime.py | 90 +++++++++--------- 7 files changed, 151 insertions(+), 125 deletions(-) diff --git a/examples/service_discovery.py b/examples/service_discovery.py index 858f7f1..a0f37b8 100644 --- a/examples/service_discovery.py +++ b/examples/service_discovery.py @@ -9,7 +9,7 @@ async def main(service_name): async with tractor.open_nursery() as an: await an.start_actor(service_name) - async with tractor.get_arbiter('127.0.0.1', 1616) as portal: + async with tractor.get_registry('127.0.0.1', 1616) as portal: print(f"Arbiter is listening on {portal.channel}") async with tractor.wait_for_actor(service_name) as sockaddr: diff --git a/tests/test_discovery.py b/tests/test_discovery.py index cd9dc02..508fdbe 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr): portal = await n.start_actor('actor', enable_modules=[__name__]) uid = portal.channel.uid - async with tractor.get_arbiter(*reg_addr) as aportal: + async with tractor.get_registry(*reg_addr) as aportal: # this local actor should be the arbiter assert actor is aportal.actor @@ -160,7 +160,7 @@ async def spawn_and_check_registry( async with tractor.open_root_actor( registry_addrs=[reg_addr], ): - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_registry(*reg_addr) as portal: # runtime needs to be up to call this actor = tractor.current_actor() @@ -298,7 +298,7 @@ async def close_chans_before_nursery( async with tractor.open_root_actor( registry_addrs=[reg_addr], ): - async with tractor.get_arbiter(*reg_addr) as aportal: + async with tractor.get_registry(*reg_addr) as aportal: try: get_reg = partial(unpack_reg, aportal) diff --git a/tests/test_local.py b/tests/test_local.py index a019d77..ecdad5f 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr): "Verify waiting on the arbiter to register itself using a local portal." actor = tractor.current_actor() assert actor.is_arbiter - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_registry(*reg_addr) as portal: assert isinstance(portal, tractor._portal.LocalPortal) with trio.fail_after(0.2): diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 0b6b5ba..27521a0 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon): @tractor_test async def test_cancel_remote_arbiter(daemon, reg_addr): assert not tractor.current_actor().is_arbiter - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_registry(*reg_addr) as portal: await portal.cancel_actor() time.sleep(0.1) @@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr): # no arbiter socket should exist with pytest.raises(OSError): - async with tractor.get_arbiter(*reg_addr) as portal: + async with tractor.get_registry(*reg_addr) as portal: pass diff --git a/tractor/__init__.py b/tractor/__init__.py index bda4ac2..3bfda13 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -30,7 +30,7 @@ from ._streaming import ( stream as stream, ) from ._discovery import ( - get_arbiter as get_arbiter, + get_registry as get_registry, find_actor as find_actor, wait_for_actor as wait_for_actor, query_actor as query_actor, diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 99a4dd6..a681c63 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -26,8 +26,8 @@ from typing import ( TYPE_CHECKING, ) from contextlib import asynccontextmanager as acm -import warnings +from tractor.log import get_logger from .trionics import gather_contexts from ._ipc import _connect_chan, Channel from ._portal import ( @@ -40,11 +40,13 @@ from ._state import ( _runtime_vars, ) - if TYPE_CHECKING: from ._runtime import Actor +log = get_logger(__name__) + + @acm async def get_registry( host: str, @@ -56,14 +58,12 @@ async def get_registry( ]: ''' Return a portal instance connected to a local or remote - arbiter. + registry-service actor; if a connection already exists re-use it + (presumably to call a `.register_actor()` registry runtime RPC + ep). ''' - actor = current_actor() - - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - + actor: Actor = current_actor() if actor.is_registrar: # we're already the arbiter # (likely a re-entrant call from the arbiter actor) @@ -72,6 +72,8 @@ async def get_registry( Channel((host, port)) ) else: + # TODO: try to look pre-existing connection from + # `Actor._peers` and use it instead? async with ( _connect_chan(host, port) as chan, open_portal(chan) as regstr_ptl, @@ -80,19 +82,6 @@ async def get_registry( -# TODO: deprecate and this remove _arbiter form! -@acm -async def get_arbiter(*args, **kwargs): - warnings.warn( - '`tractor.get_arbiter()` is now deprecated!\n' - 'Use `.get_registry()` instead!', - DeprecationWarning, - stacklevel=2, - ) - async with get_registry(*args, **kwargs) as to_yield: - yield to_yield - - @acm async def get_root( **kwargs, @@ -110,22 +99,53 @@ async def get_root( yield portal +def get_peer_by_name( + name: str, + # uuid: str|None = None, + +) -> list[Channel]|None: # at least 1 + ''' + Scan for an existing connection (set) to a named actor + and return any channels from `Actor._peers`. + + This is an optimization method over querying the registrar for + the same info. + + ''' + actor: Actor = current_actor() + to_scan: dict[tuple, list[Channel]] = actor._peers.copy() + pchan: Channel|None = actor._parent_chan + if pchan: + to_scan[pchan.uid].append(pchan) + + for aid, chans in to_scan.items(): + _, peer_name = aid + if name == peer_name: + if not chans: + log.warning( + 'No IPC chans for matching peer {peer_name}\n' + ) + continue + return chans + + return None + + @acm async def query_actor( name: str, - arbiter_sockaddr: tuple[str, int] | None = None, - regaddr: tuple[str, int] | None = None, + regaddr: tuple[str, int]|None = None, ) -> AsyncGenerator[ - tuple[str, int] | None, + tuple[str, int]|None, None, ]: ''' - Make a transport address lookup for an actor name to a specific - registrar. + Lookup a transport address (by actor name) via querying a registrar + listening @ `regaddr`. - Returns the (socket) address or ``None`` if no entry under that - name exists for the given registrar listening @ `regaddr`. + Returns the transport protocol (socket) address or `None` if no + entry under that name exists. ''' actor: Actor = current_actor() @@ -137,14 +157,10 @@ async def query_actor( 'The current actor IS the registry!?' ) - if arbiter_sockaddr is not None: - warnings.warn( - '`tractor.query_actor(regaddr=)` is deprecated.\n' - 'Use `registry_addrs: list[tuple]` instead!', - DeprecationWarning, - stacklevel=2, - ) - regaddr: list[tuple[str, int]] = arbiter_sockaddr + maybe_peers: list[Channel]|None = get_peer_by_name(name) + if maybe_peers: + yield maybe_peers[0].raddr + return reg_portal: Portal regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] @@ -159,10 +175,28 @@ async def query_actor( yield sockaddr +@acm +async def maybe_open_portal( + addr: tuple[str, int], + name: str, +): + async with query_actor( + name=name, + regaddr=addr, + ) as sockaddr: + pass + + if sockaddr: + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield None + + @acm async def find_actor( name: str, - arbiter_sockaddr: tuple[str, int]|None = None, registry_addrs: list[tuple[str, int]]|None = None, only_first: bool = True, @@ -179,29 +213,12 @@ async def find_actor( known to the arbiter. ''' - if arbiter_sockaddr is not None: - warnings.warn( - '`tractor.find_actor(arbiter_sockaddr=)` is deprecated.\n' - 'Use `registry_addrs: list[tuple]` instead!', - DeprecationWarning, - stacklevel=2, - ) - registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr] - - @acm - async def maybe_open_portal_from_reg_addr( - addr: tuple[str, int], - ): - async with query_actor( - name=name, - regaddr=addr, - ) as sockaddr: - if sockaddr: - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield None + # optimization path, use any pre-existing peer channel + maybe_peers: list[Channel]|None = get_peer_by_name(name) + if maybe_peers and only_first: + async with open_portal(maybe_peers[0]) as peer_portal: + yield peer_portal + return if not registry_addrs: # XXX NOTE: make sure to dynamically read the value on @@ -217,10 +234,13 @@ async def find_actor( maybe_portals: list[ AsyncContextManager[tuple[str, int]] ] = list( - maybe_open_portal_from_reg_addr(addr) + maybe_open_portal( + addr=addr, + name=name, + ) for addr in registry_addrs ) - + portals: list[Portal] async with gather_contexts( mngrs=maybe_portals, ) as portals: @@ -254,31 +274,31 @@ async def find_actor( @acm async def wait_for_actor( name: str, - arbiter_sockaddr: tuple[str, int] | None = None, registry_addr: tuple[str, int] | None = None, ) -> AsyncGenerator[Portal, None]: ''' - Wait on an actor to register with the arbiter. - - A portal to the first registered actor is returned. + Wait on at least one peer actor to register `name` with the + registrar, yield a `Portal to the first registree. ''' actor: Actor = current_actor() - if arbiter_sockaddr is not None: - warnings.warn( - '`tractor.wait_for_actor(arbiter_sockaddr=)` is deprecated.\n' - 'Use `registry_addr: tuple` instead!', - DeprecationWarning, - stacklevel=2, - ) - registry_addr: tuple[str, int] = arbiter_sockaddr + # optimization path, use any pre-existing peer channel + maybe_peers: list[Channel]|None = get_peer_by_name(name) + if maybe_peers: + async with open_portal(maybe_peers[0]) as peer_portal: + yield peer_portal + return + regaddr: tuple[str, int] = ( + registry_addr + or + actor.reg_addrs[0] + ) # TODO: use `.trionics.gather_contexts()` like # above in `find_actor()` as well? reg_portal: Portal - regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0] async with get_registry(*regaddr) as reg_portal: sockaddrs = await reg_portal.run_from_ns( 'self', diff --git a/tractor/_runtime.py b/tractor/_runtime.py index f946cb1..5615373 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -115,25 +115,26 @@ class Actor: ''' The fundamental "runtime" concurrency primitive. - An *actor* is the combination of a regular Python process executing - a ``trio`` task tree, communicating with other actors through - "memory boundary portals" - which provide a native async API around - IPC transport "channels" which themselves encapsulate various - (swappable) network protocols. + An "actor" is the combination of a regular Python process + executing a `trio.run()` task tree, communicating with other + "actors" through "memory boundary portals": `Portal`, which + provide a high-level async API around IPC "channels" (`Channel`) + which themselves encapsulate various (swappable) network + transport protocols for sending msgs between said memory domains + (processes, hosts, non-GIL threads). - - Each "actor" is ``trio.run()`` scheduled "runtime" composed of - many concurrent tasks in a single thread. The "runtime" tasks - conduct a slew of low(er) level functions to make it possible - for message passing between actors as well as the ability to - create new actors (aka new "runtimes" in new processes which - are supervised via a nursery construct). Each task which sends - messages to a task in a "peer" (not necessarily a parent-child, + Each "actor" is `trio.run()` scheduled "runtime" composed of many + concurrent tasks in a single thread. The "runtime" tasks conduct + a slew of low(er) level functions to make it possible for message + passing between actors as well as the ability to create new + actors (aka new "runtimes" in new processes which are supervised + via an "actor-nursery" construct). Each task which sends messages + to a task in a "peer" actor (not necessarily a parent-child, depth hierarchy) is able to do so via an "address", which maps IPC connections across memory boundaries, and a task request id - which allows for per-actor tasks to send and receive messages - to specific peer-actor tasks with which there is an ongoing - RPC/IPC dialog. + which allows for per-actor tasks to send and receive messages to + specific peer-actor tasks with which there is an ongoing RPC/IPC + dialog. ''' # ugh, we need to get rid of this and replace with a "registry" sys @@ -230,17 +231,20 @@ class Actor: # by the user (currently called the "arbiter") self._spawn_method: str = spawn_method - self._peers: defaultdict = defaultdict(list) + self._peers: defaultdict[ + str, # uaid + list[Channel], # IPC conns from peer + ] = defaultdict(list) self._peer_connected: dict[tuple[str, str], trio.Event] = {} self._no_more_peers = trio.Event() self._no_more_peers.set() + + # RPC state self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks.set() - - # (chan, cid) -> (cancel_scope, func) self._rpc_tasks: dict[ - tuple[Channel, str], - tuple[Context, Callable, trio.Event] + tuple[Channel, str], # (chan, cid) + tuple[Context, Callable, trio.Event] # (ctx=>, fn(), done?) ] = {} # map {actor uids -> Context} @@ -317,7 +321,10 @@ class Actor: event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() log.debug(f'{uid!r} successfully connected back to us') - return event, self._peers[uid][-1] + return ( + event, + self._peers[uid][-1], + ) def load_modules( self, @@ -408,26 +415,11 @@ class Actor: ''' self._no_more_peers = trio.Event() # unset by making new chan = Channel.from_stream(stream) - their_uid: tuple[str, str]|None = chan.uid - - con_status: str = '' - - # TODO: remove this branch since can never happen? - # NOTE: `.uid` is only set after first contact - if their_uid: - con_status = ( - 'IPC Re-connection from already known peer?\n' - ) - else: - con_status = ( - 'New inbound IPC connection <=\n' - ) - - con_status += ( + con_status: str = ( + 'New inbound IPC connection <=\n' f'|_{chan}\n' - # f' |_@{chan.raddr}\n\n' - # ^-TODO-^ remove since alfready in chan.__repr__()? ) + # send/receive initial handshake response try: uid: tuple|None = await self._do_handshake(chan) @@ -452,9 +444,22 @@ class Actor: ) return + familiar: str = 'new-peer' + if _pre_chan := self._peers.get(uid): + familiar: str = 'pre-existing-peer' + uid_short: str = f'{uid[0]}[{uid[1][-6:]}]' con_status += ( - f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n' + f' -> Handshake with {familiar} `{uid_short}` complete\n' ) + + if _pre_chan: + log.warning( + # con_status += ( + # ^TODO^ swap once we minimize conn duplication + f' -> Wait, we already have IPC with `{uid_short}`??\n' + f' |_{_pre_chan}\n' + ) + # IPC connection tracking for both peers and new children: # - if this is a new channel to a locally spawned # sub-actor there will be a spawn wait even registered @@ -1550,7 +1555,7 @@ class Actor: def accept_addr(self) -> tuple[str, int]: ''' Primary address to which the IPC transport server is - bound. + bound and listening for new connections. ''' # throws OSError on failure @@ -1567,6 +1572,7 @@ class Actor: def get_chans( self, uid: tuple[str, str], + ) -> list[Channel]: ''' Return all IPC channels to the actor with provided `uid`.