De-densify some code

bi_streaming
Tyler Goodlet 2021-06-30 08:45:09 -04:00
parent 6f22ee8621
commit 6e75913480
1 changed files with 16 additions and 3 deletions

View File

@ -16,12 +16,14 @@ from ._state import current_actor, _runtime_vars
@asynccontextmanager @asynccontextmanager
async def get_arbiter( async def get_arbiter(
host: str, host: str,
port: int, port: int,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
"""Return a portal instance connected to a local or remote '''Return a portal instance connected to a local or remote
arbiter. arbiter.
""" '''
actor = current_actor() actor = current_actor()
if not actor: if not actor:
@ -33,16 +35,20 @@ async def get_arbiter(
yield LocalPortal(actor, Channel((host, port))) yield LocalPortal(actor, Channel((host, port)))
else: else:
async with _connect_chan(host, port) as chan: async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal: async with open_portal(chan) as arb_portal:
yield arb_portal yield arb_portal
@asynccontextmanager @asynccontextmanager
async def get_root( async def get_root(
**kwargs, **kwargs,
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]: ) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
host, port = _runtime_vars['_root_mailbox'] host, port = _runtime_vars['_root_mailbox']
assert host is not None assert host is not None
async with _connect_chan(host, port) as chan: async with _connect_chan(host, port) as chan:
async with open_portal(chan, **kwargs) as portal: async with open_portal(chan, **kwargs) as portal:
yield portal yield portal
@ -60,12 +66,16 @@ async def find_actor(
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name) sockaddr = await arb_portal.run_from_ns('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just # TODO: return portals to all available actors - for now just
# the last one that registered # the last one that registered
if name == 'arbiter' and actor.is_arbiter: if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter") raise RuntimeError("The current actor is the arbiter")
elif sockaddr: elif sockaddr:
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal
@ -83,9 +93,12 @@ async def wait_for_actor(
A portal to the first registered actor is returned. A portal to the first registered actor is returned.
""" """
actor = current_actor() actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name) sockaddrs = await arb_portal.run_from_ns('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1] sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan: async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal: async with open_portal(chan) as portal:
yield portal yield portal