diff --git a/piker/_daemon.py b/piker/_daemon.py new file mode 100644 index 00000000..72e390f2 --- /dev/null +++ b/piker/_daemon.py @@ -0,0 +1,226 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Structured, daemon tree service management. + +""" +from typing import Optional, Union +from contextlib import asynccontextmanager + +from pydantic import BaseModel +import trio +import tractor + +from .log import get_logger, get_console_log +from .brokers import get_brokermod + + +log = get_logger(__name__) + +_root_dname = 'pikerd' +_root_modules = [ + __name__, + 'piker.clearing._ems', + 'piker.clearing._client', +] + + +class Services(BaseModel): + actor_n: tractor._trionics.ActorNursery + service_n: trio.Nursery + + class Config: + arbitrary_types_allowed = True + + +_services: Optional[Services] = None + + +@asynccontextmanager +async def open_pikerd( + loglevel: Optional[str] = None, + **kwargs, +) -> Optional[tractor._portal.Portal]: + """Start a root piker daemon who's lifetime extends indefinitely + until cancelled. + + A root actor nursery is created which can be used to create and keep + alive underling services (see below). + + """ + global _services + assert _services is None + + # XXX: this may open a root actor as well + async with tractor.open_root_actor( + # passed through to ``open_root_actor`` + name=_root_dname, + loglevel=loglevel, + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + # enable_modules=[__name__], + enable_modules=_root_modules, + ) as _, tractor.open_nursery() as actor_nursery: + async with trio.open_nursery() as service_nursery: + + # assign globally for future daemon/task creation + _services = Services( + actor_n=actor_nursery, + service_n=service_nursery + ) + + yield _services + + +@asynccontextmanager +async def maybe_open_pikerd( + loglevel: Optional[str] = None, + **kwargs, +) -> Union[tractor._portal.Portal, Services]: + """If no ``pikerd`` daemon-root-actor can be found start it and + yield up (we should probably figure out returning a portal to self + though). + + """ + if loglevel: + get_console_log(loglevel) + + try: + async with tractor.find_actor(_root_dname) as portal: + assert portal is not None + yield portal + return + + except (RuntimeError, AssertionError): # tractor runtime not started yet + + # presume pikerd role + async with open_pikerd( + loglevel, + **kwargs, + ) as _: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. + yield None + + +# brokerd enabled modules +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', + 'piker.data', + 'piker.data._buffer' +] + + +async def spawn_brokerd( + brokername, + loglevel: Optional[str] = None, + **tractor_kwargs +) -> tractor._portal.Portal: + + log.info(f'Spawning {brokername} broker daemon') + + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + + extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) + tractor_kwargs.update(extra_tractor_kwargs) + + global _services + assert _services + + await _services.actor_n.start_actor( + dname, + enable_modules=_data_mods + [brokermod.__name__], + loglevel=loglevel, + **tractor_kwargs + ) + + return dname + + +@asynccontextmanager +async def maybe_spawn_brokerd( + brokername: str, + loglevel: Optional[str] = None, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = True, +) -> tractor._portal.Portal: + """If no ``brokerd.{brokername}`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + + """ + if loglevel: + get_console_log(loglevel) + + dname = f'brokerd.{brokername}' + + # attach to existing brokerd if possible + async with tractor.find_actor(dname) as portal: + if portal is not None: + yield portal + return + + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel + ) as pikerd_portal: + + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + await spawn_brokerd(brokername, loglevel=loglevel) + + else: + await pikerd_portal.run( + spawn_brokerd, + brokername=brokername, + loglevel=loglevel, + debug_mode=debug_mode, + ) + + async with tractor.wait_for_actor(dname) as portal: + yield portal + + +async def spawn_emsd( + brokername, + loglevel: Optional[str] = None, + **extra_tractor_kwargs +) -> tractor._portal.Portal: + + log.info('Spawning emsd') + + # TODO: raise exception when _services == None? + global _services + + await _services.actor_n.start_actor( + 'emsd', + enable_modules=[ + 'piker.clearing._ems', + 'piker.clearing._client', + ], + loglevel=loglevel, + **extra_tractor_kwargs + ) + return 'emsd' diff --git a/piker/exchange/__init__.py b/piker/clearing/__init__.py similarity index 100% rename from piker/exchange/__init__.py rename to piker/clearing/__init__.py diff --git a/piker/exchange/_client.py b/piker/clearing/_client.py similarity index 92% rename from piker/exchange/_client.py rename to piker/clearing/_client.py index 0fa23e60..6accc0b8 100644 --- a/piker/exchange/_client.py +++ b/piker/clearing/_client.py @@ -168,27 +168,27 @@ async def send_order_cmds(): @asynccontextmanager async def maybe_open_emsd( -) -> 'StreamReceiveChannel': # noqa + brokername: str, +) -> tractor._portal.Portal: # noqa async with tractor.find_actor('emsd') as portal: if portal is not None: yield portal + return - else: - # we gotta spawn it - log.info("Spawning EMS daemon") + # ask remote daemon tree to spawn it + from .._daemon import spawn_emsd - # TODO: add ``maybe_spawn_emsd()`` for this - async with tractor.open_nursery() as n: + async with tractor.find_actor('pikerd') as portal: + assert portal + name = await portal.run( + spawn_emsd, + brokername=brokername, + ) - portal = await n.start_actor( - 'emsd', - enable_modules=[ - 'piker.exchange._ems', - ], - ) + async with tractor.wait_for_actor(name) as portal: + yield portal - yield portal @asynccontextmanager @@ -235,7 +235,7 @@ async def open_ems( # ready for order commands book = get_orders() - async with maybe_open_emsd() as portal: + async with maybe_open_emsd(broker) as portal: trades_stream = await portal.run( _emsd_main, diff --git a/piker/exchange/_ems.py b/piker/clearing/_ems.py similarity index 100% rename from piker/exchange/_ems.py rename to piker/clearing/_ems.py diff --git a/piker/exchange/_paper_engine.py b/piker/clearing/_paper_engine.py similarity index 100% rename from piker/exchange/_paper_engine.py rename to piker/clearing/_paper_engine.py diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index b43f52b1..f8f37095 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -4,6 +4,7 @@ CLI commons. import os import click +import trio import tractor from ..log import get_console_log, get_logger, colorize_json @@ -35,13 +36,14 @@ _context_defaults = dict( def pikerd(loglevel, host, tl): """Spawn the piker broker-daemon. """ - from ..data import _data_mods + from .._daemon import _data_mods, open_pikerd get_console_log(loglevel) - tractor.run_daemon( - rpc_module_paths=_data_mods, - name='brokerd', - loglevel=loglevel if tl else None, - ) + + async def main(): + async with open_pikerd(loglevel): + await trio.sleep_forever() + + trio.run(main) @click.group(context_settings=_context_defaults) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 49b0acb9..0559cbfb 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -29,11 +29,11 @@ from typing import ( Dict, Any, Sequence, AsyncIterator, Optional ) -import trio import tractor from ..brokers import get_brokermod from ..log import get_logger, get_console_log +from .._daemon import spawn_brokerd, maybe_open_pikerd from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -48,7 +48,6 @@ from ._buffer import ( subscribe_ohlc_for_increment ) - __all__ = [ 'iterticks', 'maybe_open_shm_array', @@ -75,15 +74,6 @@ def get_ingestormod(name: str) -> ModuleType: return module -# capable rpc modules -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', - 'piker.data', - 'piker.data._buffer', -] - - @asynccontextmanager async def maybe_spawn_brokerd( brokername: str, @@ -95,11 +85,11 @@ async def maybe_spawn_brokerd( ) -> tractor._portal.Portal: """If no ``brokerd.{brokername}`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. + """ if loglevel: get_console_log(loglevel) - brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: @@ -107,32 +97,29 @@ async def maybe_spawn_brokerd( if portal is not None: yield portal - else: # no daemon has been spawned yet + else: + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel + ) as pikerd_portal: - log.info(f"Spawning {brokername} broker daemon") + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + await spawn_brokerd(brokername, loglevel=loglevel) - # retrieve any special config from the broker mod - tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - - async with tractor.open_nursery( - #debug_mode=debug_mode, - ) as nursery: - try: - # spawn new daemon - portal = await nursery.start_actor( - dname, - enable_modules=_data_mods + [brokermod.__name__], + else: + await pikerd_portal.run( + spawn_brokerd, + brokername=brokername, loglevel=loglevel, debug_mode=debug_mode, - **tractor_kwargs ) - async with tractor.wait_for_actor(dname) as portal: - yield portal - finally: - # client code may block indefinitely so cancel when - # teardown is invoked - await nursery.cancel() + async with tractor.wait_for_actor(dname) as portal: + yield portal @dataclass diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index eb0d662c..85901d0c 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -41,6 +41,7 @@ import trio import tractor from outcome import Error +from .._daemon import maybe_open_pikerd from ..log import get_logger from ._pg_overrides import _do_overrides @@ -194,11 +195,7 @@ def run_qtractor( # define tractor entrypoint async def main(): - async with tractor.open_root_actor( - arbiter_addr=( - tractor._root._default_arbiter_host, - tractor._root._default_arbiter_port, - ), + async with maybe_open_pikerd( name='qtractor', **tractor_kwargs, ): diff --git a/piker/ui/cli.py b/piker/ui/cli.py index 387a2b4b..a407afe5 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -149,7 +149,7 @@ def chart(config, symbol, profile): 'debug_mode': True, 'loglevel': tractorloglevel, 'enable_modules': [ - 'piker.exchange._client' + 'piker.clearing._client' ], }, ) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 84806156..27059894 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -31,7 +31,7 @@ from pydantic import BaseModel from ._graphics._lines import LevelLine, position_line from ._interaction import LineEditor, ArrowEditor, _order_lines -from ..exchange._client import open_ems, OrderBook +from ..clearing._client import open_ems, OrderBook from ..data._source import Symbol from ..log import get_logger