Factor daemon spawning logic, use it to spawn emsd

ems_to_bidir_streaming
Tyler Goodlet 2021-06-01 06:56:55 -04:00
parent 0da02aa260
commit 7fb2c95ef1
2 changed files with 91 additions and 70 deletions

View File

@ -197,6 +197,66 @@ _data_mods = [
]
class Brokerd:
locks = defaultdict(trio.Lock)
@asynccontextmanager
async def maybe_spawn_daemon(
service_name: str,
spawn_func: Callable,
spawn_args: dict[str, Any],
# brokername: str,
loglevel: Optional[str] = None,
**kwargs,
) -> tractor.Portal:
"""
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
if loglevel:
get_console_log(loglevel)
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[service_name]
await lock.acquire()
# attach to existing brokerd if possible
async with tractor.find_actor(service_name) as portal:
if portal is not None:
lock.release()
yield portal
return
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
# await spawn_brokerd(brokername, loglevel=loglevel)
await spawn_func(**spawn_args)
else:
await pikerd_portal.run(
spawn_func,
**spawn_args,
)
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
async def spawn_brokerd(
brokername: str,
@ -242,10 +302,6 @@ async def spawn_brokerd(
return dname
class Brokerd:
locks = defaultdict(trio.Lock)
@asynccontextmanager
async def maybe_spawn_brokerd(
@ -253,52 +309,20 @@ async def maybe_spawn_brokerd(
loglevel: Optional[str] = None,
**kwargs,
) -> tractor._portal.Portal:
"""
If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
) -> tractor.Portal:
'''Helper to spawn a brokerd service.
"""
if loglevel:
get_console_log(loglevel)
'''
async with maybe_spawn_daemon(
dname = f'brokerd.{brokername}'
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Brokerd.locks[brokername]
await lock.acquire()
# attach to existing brokerd if possible
async with tractor.find_actor(dname) as portal:
if portal is not None:
lock.release()
yield portal
return
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
f'brokerd.{brokername}',
spawn_func=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
if pikerd_portal is None:
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_brokerd(brokername, loglevel=loglevel)
else:
await pikerd_portal.run(
spawn_brokerd,
brokername=brokername,
loglevel=loglevel,
)
async with tractor.wait_for_actor(dname) as portal:
lock.release()
yield portal
) as portal:
yield portal
async def spawn_emsd(
@ -328,3 +352,24 @@ async def spawn_emsd(
**extra_tractor_kwargs
)
return 'emsd'
@asynccontextmanager
async def maybe_open_emsd(
brokername: str,
loglevel: Optional[str] = None,
**kwargs,
) -> tractor._portal.Portal: # noqa
async with maybe_spawn_daemon(
'emsd',
spawn_func=spawn_emsd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as portal:
yield portal

View File

@ -30,6 +30,7 @@ import tractor
from ..data._source import Symbol
from ..log import get_logger
from ._ems import _emsd_main
from .._daemon import maybe_open_emsd
log = get_logger(__name__)
@ -174,31 +175,6 @@ async def send_order_cmds(symbol_key: str):
book._to_ems.send_nowait(cmd)
@asynccontextmanager
async def maybe_open_emsd(
brokername: str,
) -> tractor._portal.Portal: # noqa
async with tractor.find_actor('emsd') as portal:
if portal is not None:
yield portal
return
# ask remote daemon tree to spawn it
from .._daemon import spawn_emsd
async with tractor.find_actor('pikerd') as portal:
assert portal
name = await portal.run(
spawn_emsd,
brokername=brokername,
)
async with tractor.wait_for_actor(name) as portal:
yield portal
@asynccontextmanager
async def open_ems(
broker: str,