Finally implement peer-lookup optimization..

There's a been a todo for soo long for this XD

Since all `Actor`'s store a set of `._peers` we can try a lookup on that
table as a shortcut before pinging the registry Bo

Impl deats:
- add a new `._discovery.get_peer_by_name()` routine which attempts the
  `._peers` lookup by combining a copy of that `dict` + an entry added
  for `Actor._parent_chan` (since all subs have a parent and often the
  desired contact is just that connection).
- change `.find_actor()` (for the `only_first == True` case),
  `.query_actor()` and `.wait_for_actor()` to call the new helper and
  deliver appropriate outputs if possible.

Other,
- deprecate `get_arbiter()` def and all usage in tests and examples.
- drop lingering use of `arbiter_sockaddr` arg to various routines.
- tweak the `Actor` doc str as well as some code fmting and a tweak to
  the `._stream_handler()`'s initial `con_status: str` logging value
  since the way it was could never be reached.. oh and `.warning()` on
  any new connections which already have a `_pre_chan: Channel` entry in
  `._peers` so we can start minimizing IPC duplications.
aio_abandons
Tyler Goodlet 2024-07-04 19:40:11 -04:00
parent 5f8f8e98ba
commit 31207f92ee
7 changed files with 151 additions and 125 deletions

View File

@ -9,7 +9,7 @@ async def main(service_name):
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
await an.start_actor(service_name) await an.start_actor(service_name)
async with tractor.get_arbiter('127.0.0.1', 1616) as portal: async with tractor.get_registry('127.0.0.1', 1616) as portal:
print(f"Arbiter is listening on {portal.channel}") print(f"Arbiter is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr: async with tractor.wait_for_actor(service_name) as sockaddr:

View File

@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
portal = await n.start_actor('actor', enable_modules=[__name__]) portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid uid = portal.channel.uid
async with tractor.get_arbiter(*reg_addr) as aportal: async with tractor.get_registry(*reg_addr) as aportal:
# this local actor should be the arbiter # this local actor should be the arbiter
assert actor is aportal.actor assert actor is aportal.actor
@ -160,7 +160,7 @@ async def spawn_and_check_registry(
async with tractor.open_root_actor( async with tractor.open_root_actor(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
): ):
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
# runtime needs to be up to call this # runtime needs to be up to call this
actor = tractor.current_actor() actor = tractor.current_actor()
@ -298,7 +298,7 @@ async def close_chans_before_nursery(
async with tractor.open_root_actor( async with tractor.open_root_actor(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
): ):
async with tractor.get_arbiter(*reg_addr) as aportal: async with tractor.get_registry(*reg_addr) as aportal:
try: try:
get_reg = partial(unpack_reg, aportal) get_reg = partial(unpack_reg, aportal)

View File

@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal." "Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor() actor = tractor.current_actor()
assert actor.is_arbiter assert actor.is_arbiter
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal) assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2): with trio.fail_after(0.2):

View File

@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
@tractor_test @tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr): async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter assert not tractor.current_actor().is_arbiter
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
await portal.cancel_actor() await portal.cancel_actor()
time.sleep(0.1) time.sleep(0.1)
@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
# no arbiter socket should exist # no arbiter socket should exist
with pytest.raises(OSError): with pytest.raises(OSError):
async with tractor.get_arbiter(*reg_addr) as portal: async with tractor.get_registry(*reg_addr) as portal:
pass pass

View File

@ -30,7 +30,7 @@ from ._streaming import (
stream as stream, stream as stream,
) )
from ._discovery import ( from ._discovery import (
get_arbiter as get_arbiter, get_registry as get_registry,
find_actor as find_actor, find_actor as find_actor,
wait_for_actor as wait_for_actor, wait_for_actor as wait_for_actor,
query_actor as query_actor, query_actor as query_actor,

View File

@ -26,8 +26,8 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
import warnings
from tractor.log import get_logger
from .trionics import gather_contexts from .trionics import gather_contexts
from ._ipc import _connect_chan, Channel from ._ipc import _connect_chan, Channel
from ._portal import ( from ._portal import (
@ -40,11 +40,13 @@ from ._state import (
_runtime_vars, _runtime_vars,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from ._runtime import Actor from ._runtime import Actor
log = get_logger(__name__)
@acm @acm
async def get_registry( async def get_registry(
host: str, host: str,
@ -56,14 +58,12 @@ async def get_registry(
]: ]:
''' '''
Return a portal instance connected to a local or remote Return a portal instance connected to a local or remote
arbiter. registry-service actor; if a connection already exists re-use it
(presumably to call a `.register_actor()` registry runtime RPC
ep).
''' '''
actor = current_actor() actor: Actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_registrar: if actor.is_registrar:
# we're already the arbiter # we're already the arbiter
# (likely a re-entrant call from the arbiter actor) # (likely a re-entrant call from the arbiter actor)
@ -72,6 +72,8 @@ async def get_registry(
Channel((host, port)) Channel((host, port))
) )
else: else:
# TODO: try to look pre-existing connection from
# `Actor._peers` and use it instead?
async with ( async with (
_connect_chan(host, port) as chan, _connect_chan(host, port) as chan,
open_portal(chan) as regstr_ptl, open_portal(chan) as regstr_ptl,
@ -80,19 +82,6 @@ async def get_registry(
# TODO: deprecate and this remove _arbiter form!
@acm
async def get_arbiter(*args, **kwargs):
warnings.warn(
'`tractor.get_arbiter()` is now deprecated!\n'
'Use `.get_registry()` instead!',
DeprecationWarning,
stacklevel=2,
)
async with get_registry(*args, **kwargs) as to_yield:
yield to_yield
@acm @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
@ -110,10 +99,41 @@ async def get_root(
yield portal yield portal
def get_peer_by_name(
name: str,
# uuid: str|None = None,
) -> list[Channel]|None: # at least 1
'''
Scan for an existing connection (set) to a named actor
and return any channels from `Actor._peers`.
This is an optimization method over querying the registrar for
the same info.
'''
actor: Actor = current_actor()
to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
pchan: Channel|None = actor._parent_chan
if pchan:
to_scan[pchan.uid].append(pchan)
for aid, chans in to_scan.items():
_, peer_name = aid
if name == peer_name:
if not chans:
log.warning(
'No IPC chans for matching peer {peer_name}\n'
)
continue
return chans
return None
@acm @acm
async def query_actor( async def query_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
regaddr: tuple[str, int]|None = None, regaddr: tuple[str, int]|None = None,
) -> AsyncGenerator[ ) -> AsyncGenerator[
@ -121,11 +141,11 @@ async def query_actor(
None, None,
]: ]:
''' '''
Make a transport address lookup for an actor name to a specific Lookup a transport address (by actor name) via querying a registrar
registrar. listening @ `regaddr`.
Returns the (socket) address or ``None`` if no entry under that Returns the transport protocol (socket) address or `None` if no
name exists for the given registrar listening @ `regaddr`. entry under that name exists.
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
@ -137,14 +157,10 @@ async def query_actor(
'The current actor IS the registry!?' 'The current actor IS the registry!?'
) )
if arbiter_sockaddr is not None: maybe_peers: list[Channel]|None = get_peer_by_name(name)
warnings.warn( if maybe_peers:
'`tractor.query_actor(regaddr=<blah>)` is deprecated.\n' yield maybe_peers[0].raddr
'Use `registry_addrs: list[tuple]` instead!', return
DeprecationWarning,
stacklevel=2,
)
regaddr: list[tuple[str, int]] = arbiter_sockaddr
reg_portal: Portal reg_portal: Portal
regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
@ -159,10 +175,28 @@ async def query_actor(
yield sockaddr yield sockaddr
@acm
async def maybe_open_portal(
addr: tuple[str, int],
name: str,
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
pass
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@acm @acm
async def find_actor( async def find_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int]|None = None,
registry_addrs: list[tuple[str, int]]|None = None, registry_addrs: list[tuple[str, int]]|None = None,
only_first: bool = True, only_first: bool = True,
@ -179,29 +213,12 @@ async def find_actor(
known to the arbiter. known to the arbiter.
''' '''
if arbiter_sockaddr is not None: # optimization path, use any pre-existing peer channel
warnings.warn( maybe_peers: list[Channel]|None = get_peer_by_name(name)
'`tractor.find_actor(arbiter_sockaddr=<blah>)` is deprecated.\n' if maybe_peers and only_first:
'Use `registry_addrs: list[tuple]` instead!', async with open_portal(maybe_peers[0]) as peer_portal:
DeprecationWarning, yield peer_portal
stacklevel=2, return
)
registry_addrs: list[tuple[str, int]] = [arbiter_sockaddr]
@acm
async def maybe_open_portal_from_reg_addr(
addr: tuple[str, int],
):
async with query_actor(
name=name,
regaddr=addr,
) as sockaddr:
if sockaddr:
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
if not registry_addrs: if not registry_addrs:
# XXX NOTE: make sure to dynamically read the value on # XXX NOTE: make sure to dynamically read the value on
@ -217,10 +234,13 @@ async def find_actor(
maybe_portals: list[ maybe_portals: list[
AsyncContextManager[tuple[str, int]] AsyncContextManager[tuple[str, int]]
] = list( ] = list(
maybe_open_portal_from_reg_addr(addr) maybe_open_portal(
addr=addr,
name=name,
)
for addr in registry_addrs for addr in registry_addrs
) )
portals: list[Portal]
async with gather_contexts( async with gather_contexts(
mngrs=maybe_portals, mngrs=maybe_portals,
) as portals: ) as portals:
@ -254,31 +274,31 @@ async def find_actor(
@acm @acm
async def wait_for_actor( async def wait_for_actor(
name: str, name: str,
arbiter_sockaddr: tuple[str, int] | None = None,
registry_addr: tuple[str, int] | None = None, registry_addr: tuple[str, int] | None = None,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
''' '''
Wait on an actor to register with the arbiter. Wait on at least one peer actor to register `name` with the
registrar, yield a `Portal to the first registree.
A portal to the first registered actor is returned.
''' '''
actor: Actor = current_actor() actor: Actor = current_actor()
if arbiter_sockaddr is not None: # optimization path, use any pre-existing peer channel
warnings.warn( maybe_peers: list[Channel]|None = get_peer_by_name(name)
'`tractor.wait_for_actor(arbiter_sockaddr=<foo>)` is deprecated.\n' if maybe_peers:
'Use `registry_addr: tuple` instead!', async with open_portal(maybe_peers[0]) as peer_portal:
DeprecationWarning, yield peer_portal
stacklevel=2, return
)
registry_addr: tuple[str, int] = arbiter_sockaddr
regaddr: tuple[str, int] = (
registry_addr
or
actor.reg_addrs[0]
)
# TODO: use `.trionics.gather_contexts()` like # TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well? # above in `find_actor()` as well?
reg_portal: Portal reg_portal: Portal
regaddr: tuple[str, int] = registry_addr or actor.reg_addrs[0]
async with get_registry(*regaddr) as reg_portal: async with get_registry(*regaddr) as reg_portal:
sockaddrs = await reg_portal.run_from_ns( sockaddrs = await reg_portal.run_from_ns(
'self', 'self',

View File

@ -115,25 +115,26 @@ class Actor:
''' '''
The fundamental "runtime" concurrency primitive. The fundamental "runtime" concurrency primitive.
An *actor* is the combination of a regular Python process executing An "actor" is the combination of a regular Python process
a ``trio`` task tree, communicating with other actors through executing a `trio.run()` task tree, communicating with other
"memory boundary portals" - which provide a native async API around "actors" through "memory boundary portals": `Portal`, which
IPC transport "channels" which themselves encapsulate various provide a high-level async API around IPC "channels" (`Channel`)
(swappable) network protocols. which themselves encapsulate various (swappable) network
transport protocols for sending msgs between said memory domains
(processes, hosts, non-GIL threads).
Each "actor" is `trio.run()` scheduled "runtime" composed of many
Each "actor" is ``trio.run()`` scheduled "runtime" composed of concurrent tasks in a single thread. The "runtime" tasks conduct
many concurrent tasks in a single thread. The "runtime" tasks a slew of low(er) level functions to make it possible for message
conduct a slew of low(er) level functions to make it possible passing between actors as well as the ability to create new
for message passing between actors as well as the ability to actors (aka new "runtimes" in new processes which are supervised
create new actors (aka new "runtimes" in new processes which via an "actor-nursery" construct). Each task which sends messages
are supervised via a nursery construct). Each task which sends to a task in a "peer" actor (not necessarily a parent-child,
messages to a task in a "peer" (not necessarily a parent-child,
depth hierarchy) is able to do so via an "address", which maps depth hierarchy) is able to do so via an "address", which maps
IPC connections across memory boundaries, and a task request id IPC connections across memory boundaries, and a task request id
which allows for per-actor tasks to send and receive messages which allows for per-actor tasks to send and receive messages to
to specific peer-actor tasks with which there is an ongoing specific peer-actor tasks with which there is an ongoing RPC/IPC
RPC/IPC dialog. dialog.
''' '''
# ugh, we need to get rid of this and replace with a "registry" sys # ugh, we need to get rid of this and replace with a "registry" sys
@ -230,17 +231,20 @@ class Actor:
# by the user (currently called the "arbiter") # by the user (currently called the "arbiter")
self._spawn_method: str = spawn_method self._spawn_method: str = spawn_method
self._peers: defaultdict = defaultdict(list) self._peers: defaultdict[
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
self._peer_connected: dict[tuple[str, str], trio.Event] = {} self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._no_more_peers.set() self._no_more_peers.set()
# RPC state
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
self._ongoing_rpc_tasks.set() self._ongoing_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[ self._rpc_tasks: dict[
tuple[Channel, str], tuple[Channel, str], # (chan, cid)
tuple[Context, Callable, trio.Event] tuple[Context, Callable, trio.Event] # (ctx=>, fn(), done?)
] = {} ] = {}
# map {actor uids -> Context} # map {actor uids -> Context}
@ -317,7 +321,10 @@ class Actor:
event = self._peer_connected.setdefault(uid, trio.Event()) event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait() await event.wait()
log.debug(f'{uid!r} successfully connected back to us') log.debug(f'{uid!r} successfully connected back to us')
return event, self._peers[uid][-1] return (
event,
self._peers[uid][-1],
)
def load_modules( def load_modules(
self, self,
@ -408,26 +415,11 @@ class Actor:
''' '''
self._no_more_peers = trio.Event() # unset by making new self._no_more_peers = trio.Event() # unset by making new
chan = Channel.from_stream(stream) chan = Channel.from_stream(stream)
their_uid: tuple[str, str]|None = chan.uid con_status: str = (
con_status: str = ''
# TODO: remove this branch since can never happen?
# NOTE: `.uid` is only set after first contact
if their_uid:
con_status = (
'IPC Re-connection from already known peer?\n'
)
else:
con_status = (
'New inbound IPC connection <=\n' 'New inbound IPC connection <=\n'
f'|_{chan}\n'
) )
con_status += (
f'|_{chan}\n'
# f' |_@{chan.raddr}\n\n'
# ^-TODO-^ remove since alfready in chan.__repr__()?
)
# send/receive initial handshake response # send/receive initial handshake response
try: try:
uid: tuple|None = await self._do_handshake(chan) uid: tuple|None = await self._do_handshake(chan)
@ -452,9 +444,22 @@ class Actor:
) )
return return
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += ( con_status += (
f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n' f' -> Handshake with {familiar} `{uid_short}` complete\n'
) )
if _pre_chan:
log.warning(
# con_status += (
# ^TODO^ swap once we minimize conn duplication
f' -> Wait, we already have IPC with `{uid_short}`??\n'
f' |_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children: # IPC connection tracking for both peers and new children:
# - if this is a new channel to a locally spawned # - if this is a new channel to a locally spawned
# sub-actor there will be a spawn wait even registered # sub-actor there will be a spawn wait even registered
@ -1550,7 +1555,7 @@ class Actor:
def accept_addr(self) -> tuple[str, int]: def accept_addr(self) -> tuple[str, int]:
''' '''
Primary address to which the IPC transport server is Primary address to which the IPC transport server is
bound. bound and listening for new connections.
''' '''
# throws OSError on failure # throws OSError on failure
@ -1567,6 +1572,7 @@ class Actor:
def get_chans( def get_chans(
self, self,
uid: tuple[str, str], uid: tuple[str, str],
) -> list[Channel]: ) -> list[Channel]:
''' '''
Return all IPC channels to the actor with provided `uid`. Return all IPC channels to the actor with provided `uid`.