From 189c56c01254a4c0a94fc773fdc38c9414af8093 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 31 Jan 2021 18:11:50 -0300 Subject: [PATCH 01/18] Add piker root daemon spawning machinery Refactor maybe_spawn_brokerd to adapt to new process tree structure and add a ``maybe_open_pikerd``. --- piker/_daemon.py | 85 ++++++++++++++++++++++++++++++++++++++++++ piker/data/__init__.py | 51 ++++++++++--------------- 2 files changed, 104 insertions(+), 32 deletions(-) create mode 100644 piker/_daemon.py diff --git a/piker/_daemon.py b/piker/_daemon.py new file mode 100644 index 00000000..a7030693 --- /dev/null +++ b/piker/_daemon.py @@ -0,0 +1,85 @@ +""" +pikerd daemon lifecylcle & rpc +""" +from typing import Optional +from contextlib import asynccontextmanager + +import tractor + +from .log import get_logger, get_console_log +from .brokers import get_brokermod + + +log = get_logger(__name__) + +_root_nursery: Optional[tractor._trionics.ActorNursery] = None +root_dname = 'pikerd' +root_modules = [ + __name__, + 'piker._ems' +] + + +@asynccontextmanager +async def maybe_open_pikerd( + loglevel: Optional[str] = None +) -> Optional[tractor._portal.Portal]: + """If no ``pikerd`` daemon-root-actor can be found, + assume that role and return a portal to myself + + """ + global _root_nursery + + if loglevel: + get_console_log(loglevel) + + async with tractor.find_actor(root_dname) as portal: + + if portal is not None: # pikerd exists + yield portal + + else: # assume role + async with tractor.open_root_actor( + name=root_dname, + loglevel=loglevel, + enable_modules=root_modules + ): + # init root nursery + try: + async with tractor.open_nursery() as nursery: + _root_nursery = nursery + yield None + finally: + # client code may block indefinitely so cancel when + # teardown is invoked + await nursery.cancel() + + +# brokerd enable modules +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', + 'piker.data', + 'piker.data._buffer' +] + +async def spawn_brokerd( + brokername, + loglevel: Optional[str] = None, + **tractor_kwargs +): + + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + log.info(f'Spawning {brokername} broker daemon') + tractor_kwargs = getattr(brokermod, '_spawnkwargs', {}) + + # TODO: raise exception when _root_nursery == None? + global _root_nursery + await _root_nursery.start_actor( + dname, + enable_modules=_data_mods + [brokermod.__name__], + loglevel=loglevel, + **tractor_kwargs + ) + diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 49b0acb9..0559cbfb 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -29,11 +29,11 @@ from typing import ( Dict, Any, Sequence, AsyncIterator, Optional ) -import trio import tractor from ..brokers import get_brokermod from ..log import get_logger, get_console_log +from .._daemon import spawn_brokerd, maybe_open_pikerd from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -48,7 +48,6 @@ from ._buffer import ( subscribe_ohlc_for_increment ) - __all__ = [ 'iterticks', 'maybe_open_shm_array', @@ -75,15 +74,6 @@ def get_ingestormod(name: str) -> ModuleType: return module -# capable rpc modules -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', - 'piker.data', - 'piker.data._buffer', -] - - @asynccontextmanager async def maybe_spawn_brokerd( brokername: str, @@ -95,11 +85,11 @@ async def maybe_spawn_brokerd( ) -> tractor._portal.Portal: """If no ``brokerd.{brokername}`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. + """ if loglevel: get_console_log(loglevel) - brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: @@ -107,32 +97,29 @@ async def maybe_spawn_brokerd( if portal is not None: yield portal - else: # no daemon has been spawned yet + else: + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel + ) as pikerd_portal: - log.info(f"Spawning {brokername} broker daemon") + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + await spawn_brokerd(brokername, loglevel=loglevel) - # retrieve any special config from the broker mod - tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - - async with tractor.open_nursery( - #debug_mode=debug_mode, - ) as nursery: - try: - # spawn new daemon - portal = await nursery.start_actor( - dname, - enable_modules=_data_mods + [brokermod.__name__], + else: + await pikerd_portal.run( + spawn_brokerd, + brokername=brokername, loglevel=loglevel, debug_mode=debug_mode, - **tractor_kwargs ) - async with tractor.wait_for_actor(dname) as portal: - yield portal - finally: - # client code may block indefinitely so cancel when - # teardown is invoked - await nursery.cancel() + async with tractor.wait_for_actor(dname) as portal: + yield portal @dataclass From 2b9ac8d8ec18eff7faecf51814ac5d3377e0c32c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Mar 2021 13:33:32 -0400 Subject: [PATCH 02/18] Port daemon(s) to new clearing system apis --- piker/_daemon.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index a7030693..9fd354cf 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -13,10 +13,12 @@ from .brokers import get_brokermod log = get_logger(__name__) _root_nursery: Optional[tractor._trionics.ActorNursery] = None -root_dname = 'pikerd' -root_modules = [ +_root_dname = 'pikerd' + +_root_modules = [ __name__, - 'piker._ems' + 'piker.exchange._ems', + 'piker.exchange._client', ] @@ -33,16 +35,16 @@ async def maybe_open_pikerd( if loglevel: get_console_log(loglevel) - async with tractor.find_actor(root_dname) as portal: + async with tractor.find_actor(_root_dname) as portal: if portal is not None: # pikerd exists yield portal else: # assume role async with tractor.open_root_actor( - name=root_dname, + name=_root_dname, loglevel=loglevel, - enable_modules=root_modules + enable_modules=_root_modules ): # init root nursery try: @@ -55,7 +57,7 @@ async def maybe_open_pikerd( await nursery.cancel() -# brokerd enable modules +# brokerd enabled modules _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', @@ -63,11 +65,12 @@ _data_mods = [ 'piker.data._buffer' ] + async def spawn_brokerd( brokername, loglevel: Optional[str] = None, **tractor_kwargs -): +) -> tractor._portal.Portal: brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' @@ -76,10 +79,10 @@ async def spawn_brokerd( # TODO: raise exception when _root_nursery == None? global _root_nursery - await _root_nursery.start_actor( + portal = await _root_nursery.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, **tractor_kwargs ) - + return portal From 26ee7260d6090016458dba6223f24a1a8212d06f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Mar 2021 15:06:00 -0400 Subject: [PATCH 03/18] Clearing is a better name --- piker/{exchange => clearing}/__init__.py | 0 piker/{exchange => clearing}/_client.py | 0 piker/{exchange => clearing}/_ems.py | 0 piker/{exchange => clearing}/_paper_engine.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename piker/{exchange => clearing}/__init__.py (100%) rename piker/{exchange => clearing}/_client.py (100%) rename piker/{exchange => clearing}/_ems.py (100%) rename piker/{exchange => clearing}/_paper_engine.py (100%) diff --git a/piker/exchange/__init__.py b/piker/clearing/__init__.py similarity index 100% rename from piker/exchange/__init__.py rename to piker/clearing/__init__.py diff --git a/piker/exchange/_client.py b/piker/clearing/_client.py similarity index 100% rename from piker/exchange/_client.py rename to piker/clearing/_client.py diff --git a/piker/exchange/_ems.py b/piker/clearing/_ems.py similarity index 100% rename from piker/exchange/_ems.py rename to piker/clearing/_ems.py diff --git a/piker/exchange/_paper_engine.py b/piker/clearing/_paper_engine.py similarity index 100% rename from piker/exchange/_paper_engine.py rename to piker/clearing/_paper_engine.py From 8812aff3b83fefaf8827d49ca5aad45cab1122be Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Mar 2021 15:06:19 -0400 Subject: [PATCH 04/18] Fix import --- piker/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index b43f52b1..50f38346 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -35,7 +35,7 @@ _context_defaults = dict( def pikerd(loglevel, host, tl): """Spawn the piker broker-daemon. """ - from ..data import _data_mods + from .._daemon import _data_mods get_console_log(loglevel) tractor.run_daemon( rpc_module_paths=_data_mods, From 79d37646a273efdd0c13c1c15122ae758e534bf1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Mar 2021 21:52:53 -0400 Subject: [PATCH 05/18] Add `open_pikerd()` and `spawn_emsd()` The direct open is needed for running `pikerd` cmd and the ems spawn point is the first step toward detaching UI based order entry from the engine itself. --- piker/_daemon.py | 91 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 22 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 9fd354cf..c792d669 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -1,7 +1,7 @@ """ pikerd daemon lifecylcle & rpc """ -from typing import Optional +from typing import Optional, Dict, Callable from contextlib import asynccontextmanager import tractor @@ -14,14 +14,41 @@ log = get_logger(__name__) _root_nursery: Optional[tractor._trionics.ActorNursery] = None _root_dname = 'pikerd' - _root_modules = [ __name__, - 'piker.exchange._ems', - 'piker.exchange._client', + 'piker.clearing._ems', + 'piker.clearing._client', ] +@asynccontextmanager +async def open_pikerd( + loglevel: Optional[str] = None +) -> Optional[tractor._portal.Portal]: + + global _root_nursery + + async with tractor.open_root_actor( + name=_root_dname, + loglevel=loglevel, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + # enable_modules=[__name__], + enable_modules=_root_modules + ): + # init root nursery + try: + async with tractor.open_nursery() as nursery: + _root_nursery = nursery + yield nursery + finally: + # client code may block indefinitely so cancel when + # teardown is invoked + await nursery.cancel() + + @asynccontextmanager async def maybe_open_pikerd( loglevel: Optional[str] = None @@ -41,20 +68,12 @@ async def maybe_open_pikerd( yield portal else: # assume role - async with tractor.open_root_actor( - name=_root_dname, - loglevel=loglevel, - enable_modules=_root_modules - ): - # init root nursery - try: - async with tractor.open_nursery() as nursery: - _root_nursery = nursery - yield None - finally: - # client code may block indefinitely so cancel when - # teardown is invoked - await nursery.cancel() + async with open_pikerd(loglevel) as nursery: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we want to hand off a nursery for starting (as a sub) + # whatever actor is requesting pikerd. + yield None # brokerd enabled modules @@ -72,10 +91,12 @@ async def spawn_brokerd( **tractor_kwargs ) -> tractor._portal.Portal: + log.info(f'Spawning {brokername} broker daemon') + brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' - log.info(f'Spawning {brokername} broker daemon') - tractor_kwargs = getattr(brokermod, '_spawnkwargs', {}) + + extra_tractor_kwargs = getattr(brokermod, '_spawnkwargs', {}) # TODO: raise exception when _root_nursery == None? global _root_nursery @@ -83,6 +104,32 @@ async def spawn_brokerd( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, - **tractor_kwargs + **extra_tractor_kwargs ) - return portal + return dname + + +async def spawn_emsd( + brokername, + loglevel: Optional[str] = None, + **extra_tractor_kwargs +) -> tractor._portal.Portal: + + from .clearing import _client + + log.info('Spawning emsd') + + # TODO: raise exception when _root_nursery == None? + global _root_nursery + assert _root_nursery + + portal = await _root_nursery.start_actor( + 'emsd', + enable_modules=[ + 'piker.clearing._ems', + 'piker.clearing._client', + ], + loglevel=loglevel, + **extra_tractor_kwargs + ) + return 'emsd' From 7ca05238af47f1822b42821a50e47fd8c6cfdd7d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Mar 2021 21:56:06 -0400 Subject: [PATCH 06/18] Port pikerd entry to drop `tractor.run()` --- piker/cli/__init__.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 50f38346..f8f37095 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -4,6 +4,7 @@ CLI commons. import os import click +import trio import tractor from ..log import get_console_log, get_logger, colorize_json @@ -35,13 +36,14 @@ _context_defaults = dict( def pikerd(loglevel, host, tl): """Spawn the piker broker-daemon. """ - from .._daemon import _data_mods + from .._daemon import _data_mods, open_pikerd get_console_log(loglevel) - tractor.run_daemon( - rpc_module_paths=_data_mods, - name='brokerd', - loglevel=loglevel if tl else None, - ) + + async def main(): + async with open_pikerd(loglevel): + await trio.sleep_forever() + + trio.run(main) @click.group(context_settings=_context_defaults) From 1931da97f4a9a9eeb19795bf20635f9c36d83579 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Mar 2021 06:29:09 -0400 Subject: [PATCH 07/18] Fix spawn attr typo, merge tractor kwargs --- piker/_daemon.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index c792d669..15f4b54d 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -96,7 +96,8 @@ async def spawn_brokerd( brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' - extra_tractor_kwargs = getattr(brokermod, '_spawnkwargs', {}) + extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) + tractor_kwargs.update(extra_tractor_kwargs) # TODO: raise exception when _root_nursery == None? global _root_nursery @@ -104,7 +105,7 @@ async def spawn_brokerd( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, - **extra_tractor_kwargs + **tractor_kwargs ) return dname From 8ce37875a02990f514ee4424cae2da83f43a7b64 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Mar 2021 09:24:38 -0400 Subject: [PATCH 08/18] Use pikerd to spawn emsd --- piker/clearing/_client.py | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 0fa23e60..97412280 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -168,27 +168,28 @@ async def send_order_cmds(): @asynccontextmanager async def maybe_open_emsd( -) -> 'StreamReceiveChannel': # noqa + brokername: str, +) -> tractor._portal.Portal: # noqa async with tractor.find_actor('emsd') as portal: if portal is not None: yield portal else: - # we gotta spawn it - log.info("Spawning EMS daemon") + # ask remote daemon tree to spawn it + from .._daemon import spawn_emsd - # TODO: add ``maybe_spawn_emsd()`` for this - async with tractor.open_nursery() as n: + async with tractor.find_actor('pikerd') as portal: - portal = await n.start_actor( - 'emsd', - enable_modules=[ - 'piker.exchange._ems', - ], - ) + if portal is not None: - yield portal + name = await portal.run( + spawn_emsd, + brokername=brokername, + ) + + async with tractor.wait_for_actor(name) as portal: + yield portal @asynccontextmanager @@ -235,7 +236,7 @@ async def open_ems( # ready for order commands book = get_orders() - async with maybe_open_emsd() as portal: + async with maybe_open_emsd(broker) as portal: trades_stream = await portal.run( _emsd_main, From 4ab8545e87128dfe43b38edfdfd50cf1092df2bb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Mar 2021 09:30:30 -0400 Subject: [PATCH 09/18] Return early on remote actor lookups instead of branching --- piker/_daemon.py | 67 ++++++++++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 15f4b54d..25adb5ec 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -1,7 +1,7 @@ """ pikerd daemon lifecylcle & rpc """ -from typing import Optional, Dict, Callable +from typing import Optional from contextlib import asynccontextmanager import tractor @@ -23,35 +23,32 @@ _root_modules = [ @asynccontextmanager async def open_pikerd( - loglevel: Optional[str] = None + loglevel: Optional[str] = None, + **kwargs, ) -> Optional[tractor._portal.Portal]: global _root_nursery - async with tractor.open_root_actor( + # XXX: this may open a root actor as well + async with tractor.open_nursery( name=_root_dname, - loglevel=loglevel, # TODO: eventually we should be able to avoid # having the root have more then permissions to # spawn other specialized daemons I think? # enable_modules=[__name__], - enable_modules=_root_modules - ): - # init root nursery - try: - async with tractor.open_nursery() as nursery: - _root_nursery = nursery - yield nursery - finally: - # client code may block indefinitely so cancel when - # teardown is invoked - await nursery.cancel() + enable_modules=_root_modules, + + loglevel=loglevel, + ) as nursery: + _root_nursery = nursery + yield nursery @asynccontextmanager async def maybe_open_pikerd( - loglevel: Optional[str] = None + loglevel: Optional[str] = None, + **kwargs, ) -> Optional[tractor._portal.Portal]: """If no ``pikerd`` daemon-root-actor can be found, assume that role and return a portal to myself @@ -62,18 +59,27 @@ async def maybe_open_pikerd( if loglevel: get_console_log(loglevel) - async with tractor.find_actor(_root_dname) as portal: + try: + async with tractor.find_actor(_root_dname) as portal: + if portal is not None: # pikerd exists + yield portal + return - if portal is not None: # pikerd exists - yield portal + except RuntimeError: # tractor runtime not started yet + pass - else: # assume role - async with open_pikerd(loglevel) as nursery: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we want to hand off a nursery for starting (as a sub) - # whatever actor is requesting pikerd. - yield None + # assume pikerd role + async with open_pikerd( + loglevel, + **kwargs, + ): + assert _root_nursery + + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we want to hand off a nursery for starting (as a sub) + # whatever actor is requesting pikerd. + yield None # brokerd enabled modules @@ -101,12 +107,14 @@ async def spawn_brokerd( # TODO: raise exception when _root_nursery == None? global _root_nursery - portal = await _root_nursery.start_actor( + + await _root_nursery.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, **tractor_kwargs ) + return dname @@ -116,15 +124,12 @@ async def spawn_emsd( **extra_tractor_kwargs ) -> tractor._portal.Portal: - from .clearing import _client - log.info('Spawning emsd') # TODO: raise exception when _root_nursery == None? global _root_nursery - assert _root_nursery - portal = await _root_nursery.start_actor( + await _root_nursery.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', From 20a9617c18e922b3583e881f36229e98f531c365 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Mar 2021 09:36:47 -0400 Subject: [PATCH 10/18] Use open pikerd from chart entrypoint --- piker/ui/_exec.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index eb0d662c..85901d0c 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -41,6 +41,7 @@ import trio import tractor from outcome import Error +from .._daemon import maybe_open_pikerd from ..log import get_logger from ._pg_overrides import _do_overrides @@ -194,11 +195,7 @@ def run_qtractor( # define tractor entrypoint async def main(): - async with tractor.open_root_actor( - arbiter_addr=( - tractor._root._default_arbiter_host, - tractor._root._default_arbiter_port, - ), + async with maybe_open_pikerd( name='qtractor', **tractor_kwargs, ): From fb996bfffb62ee80a9cf11fbe0629f7697e96ba3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Mar 2021 09:37:13 -0400 Subject: [PATCH 11/18] Change subpkg name in cli --- piker/ui/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/ui/cli.py b/piker/ui/cli.py index 387a2b4b..a407afe5 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -149,7 +149,7 @@ def chart(config, symbol, profile): 'debug_mode': True, 'loglevel': tractorloglevel, 'enable_modules': [ - 'piker.exchange._client' + 'piker.clearing._client' ], }, ) From 38471b7b347105bb14d40f7a877dad6c9b6bf082 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Mar 2021 09:52:22 -0400 Subject: [PATCH 12/18] Use early return instead of branching for remote emsd spawning --- piker/clearing/_client.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 97412280..6accc0b8 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -174,22 +174,21 @@ async def maybe_open_emsd( async with tractor.find_actor('emsd') as portal: if portal is not None: yield portal + return - else: - # ask remote daemon tree to spawn it - from .._daemon import spawn_emsd + # ask remote daemon tree to spawn it + from .._daemon import spawn_emsd - async with tractor.find_actor('pikerd') as portal: + async with tractor.find_actor('pikerd') as portal: + assert portal + name = await portal.run( + spawn_emsd, + brokername=brokername, + ) - if portal is not None: + async with tractor.wait_for_actor(name) as portal: + yield portal - name = await portal.run( - spawn_emsd, - brokername=brokername, - ) - - async with tractor.wait_for_actor(name) as portal: - yield portal @asynccontextmanager From 2a51582ec0f74f778a82ae71a87e391df838720e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 25 Mar 2021 10:19:35 -0400 Subject: [PATCH 13/18] Start forming an services api Add a `Services` nurseries container singleton for spawning sub-daemons inside the long running `pikerd` tree. Bring in `brokerd` spawning util funcs to start getting eyes on what can be factored into a service api. --- piker/_daemon.py | 137 +++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 114 insertions(+), 23 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 25adb5ec..a844708a 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -1,9 +1,12 @@ """ pikerd daemon lifecylcle & rpc """ -from typing import Optional +from typing import Optional, Union from contextlib import asynccontextmanager +from functools import partial +from pydantic import BaseModel +import trio import tractor from .log import get_logger, get_console_log @@ -12,7 +15,6 @@ from .brokers import get_brokermod log = get_logger(__name__) -_root_nursery: Optional[tractor._trionics.ActorNursery] = None _root_dname = 'pikerd' _root_modules = [ __name__, @@ -21,17 +23,39 @@ _root_modules = [ ] +# @dataclass +class Services(BaseModel): + actor_n: tractor._trionics.ActorNursery + service_n: trio.Nursery + + class Config: + arbitrary_types_allowed = True + + +_services: Optional[Services] = None + + @asynccontextmanager async def open_pikerd( loglevel: Optional[str] = None, **kwargs, ) -> Optional[tractor._portal.Portal]: + """Start a root piker daemon who's lifetime extends indefinitely + until cancelled. - global _root_nursery + A root actor nursery is created which can be used to create and keep + alive underling services (see below). + + """ + global _services + assert _services is None # XXX: this may open a root actor as well async with tractor.open_nursery( + + # passed through to ``open_root_actor`` name=_root_dname, + loglevel=loglevel, # TODO: eventually we should be able to avoid # having the root have more then permissions to @@ -39,23 +63,28 @@ async def open_pikerd( # enable_modules=[__name__], enable_modules=_root_modules, - loglevel=loglevel, - ) as nursery: - _root_nursery = nursery - yield nursery + ) as actor_nursery: + async with trio.open_nursery() as service_nursery: + + # assign globally for future daemon/task creation + _services = Services( + actor_n=actor_nursery, + service_n=service_nursery + ) + + yield _services @asynccontextmanager async def maybe_open_pikerd( loglevel: Optional[str] = None, **kwargs, -) -> Optional[tractor._portal.Portal]: - """If no ``pikerd`` daemon-root-actor can be found, - assume that role and return a portal to myself +) -> Union[tractor._portal.Portal, Services]: + """If no ``pikerd`` daemon-root-actor can be found start it and + yield up (we should probably figure out returning a portal to self + though). """ - global _root_nursery - if loglevel: get_console_log(loglevel) @@ -72,13 +101,10 @@ async def maybe_open_pikerd( async with open_pikerd( loglevel, **kwargs, - ): - assert _root_nursery - + ) as _: # in the case where we're starting up the # tractor-piker runtime stack in **this** process - # we want to hand off a nursery for starting (as a sub) - # whatever actor is requesting pikerd. + # we return no portal to self. yield None @@ -97,6 +123,8 @@ async def spawn_brokerd( **tractor_kwargs ) -> tractor._portal.Portal: + from .data import _setup_persistent_feeds + log.info(f'Spawning {brokername} broker daemon') brokermod = get_brokermod(brokername) @@ -105,19 +133,82 @@ async def spawn_brokerd( extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) tractor_kwargs.update(extra_tractor_kwargs) - # TODO: raise exception when _root_nursery == None? - global _root_nursery + global _services + assert _services - await _root_nursery.start_actor( + portal = await _services.actor_n.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, **tractor_kwargs ) + # TODO: so i think this is the perfect use case for supporting + # a cross-actor async context manager api instead of this + # shoort-and-forget task spawned in the root nursery, we'd have an + # async exit stack that we'd register the `portal.open_context()` + # call with and then have the ability to unwind the call whenevs. + + # non-blocking setup of brokerd service nursery + _services.service_n.start_soon( + partial( + portal.run, + _setup_persistent_feeds, + brokername=brokername, + ) + ) + return dname +@asynccontextmanager +async def maybe_spawn_brokerd( + brokername: str, + loglevel: Optional[str] = None, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = True, +) -> tractor._portal.Portal: + """If no ``brokerd.{brokername}`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + + """ + if loglevel: + get_console_log(loglevel) + + dname = f'brokerd.{brokername}' + + # attach to existing brokerd if possible + async with tractor.find_actor(dname) as portal: + if portal is not None: + yield portal + return + + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel + ) as pikerd_portal: + + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + await spawn_brokerd(brokername, loglevel=loglevel) + + else: + await pikerd_portal.run( + spawn_brokerd, + brokername=brokername, + loglevel=loglevel, + debug_mode=debug_mode, + ) + + async with tractor.wait_for_actor(dname) as portal: + yield portal + + async def spawn_emsd( brokername, loglevel: Optional[str] = None, @@ -126,10 +217,10 @@ async def spawn_emsd( log.info('Spawning emsd') - # TODO: raise exception when _root_nursery == None? - global _root_nursery + # TODO: raise exception when _services == None? + global _services - await _root_nursery.start_actor( + await _services.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', From 877db521629103d3ad6ec3333b3d6d5dad409664 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 26 Mar 2021 11:51:18 -0400 Subject: [PATCH 14/18] Add license header --- piker/_daemon.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index a844708a..1dc11c16 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -1,5 +1,22 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + """ -pikerd daemon lifecylcle & rpc +Structured, daemon tree service management. + """ from typing import Optional, Union from contextlib import asynccontextmanager From b1a1b323150ab9f112735d328228758ead8d2ed7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:21:37 -0400 Subject: [PATCH 15/18] Better tractor startup, runtime detection --- piker/_daemon.py | 50 ++++++++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 1dc11c16..8f17ad2a 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -68,19 +68,16 @@ async def open_pikerd( assert _services is None # XXX: this may open a root actor as well - async with tractor.open_nursery( - - # passed through to ``open_root_actor`` - name=_root_dname, - loglevel=loglevel, - - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - # enable_modules=[__name__], - enable_modules=_root_modules, - - ) as actor_nursery: + async with tractor.open_root_actor( + # passed through to ``open_root_actor`` + name=_root_dname, + loglevel=loglevel, + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + # enable_modules=[__name__], + enable_modules=_root_modules, + ) as _, tractor.open_nursery() as actor_nursery: async with trio.open_nursery() as service_nursery: # assign globally for future daemon/task creation @@ -107,22 +104,21 @@ async def maybe_open_pikerd( try: async with tractor.find_actor(_root_dname) as portal: - if portal is not None: # pikerd exists - yield portal - return + assert portal is not None + yield portal + return - except RuntimeError: # tractor runtime not started yet - pass + except (RuntimeError, AssertionError): # tractor runtime not started yet - # assume pikerd role - async with open_pikerd( - loglevel, - **kwargs, - ) as _: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we return no portal to self. - yield None + # presume pikerd role + async with open_pikerd( + loglevel, + **kwargs, + ) as _: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. + yield None # brokerd enabled modules From 6891309abd663ce812b6aa9d06f5d7f19daa8e0a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:56:20 -0400 Subject: [PATCH 16/18] Fix import --- piker/ui/order_mode.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 84806156..27059894 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -31,7 +31,7 @@ from pydantic import BaseModel from ._graphics._lines import LevelLine, position_line from ._interaction import LineEditor, ArrowEditor, _order_lines -from ..exchange._client import open_ems, OrderBook +from ..clearing._client import open_ems, OrderBook from ..data._source import Symbol from ..log import get_logger From 2b7cecc78ed88062b546fd2a560a17392155cc6a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:56:42 -0400 Subject: [PATCH 17/18] Drop upcoming cached feed usage --- piker/_daemon.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 8f17ad2a..da0b02b8 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -136,8 +136,6 @@ async def spawn_brokerd( **tractor_kwargs ) -> tractor._portal.Portal: - from .data import _setup_persistent_feeds - log.info(f'Spawning {brokername} broker daemon') brokermod = get_brokermod(brokername) @@ -156,21 +154,6 @@ async def spawn_brokerd( **tractor_kwargs ) - # TODO: so i think this is the perfect use case for supporting - # a cross-actor async context manager api instead of this - # shoort-and-forget task spawned in the root nursery, we'd have an - # async exit stack that we'd register the `portal.open_context()` - # call with and then have the ability to unwind the call whenevs. - - # non-blocking setup of brokerd service nursery - _services.service_n.start_soon( - partial( - portal.run, - _setup_persistent_feeds, - brokername=brokername, - ) - ) - return dname From 549f81e85dd6eb390bef19acb7dedb5b7ec87480 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Apr 2021 12:29:56 -0400 Subject: [PATCH 18/18] Fix lints --- piker/_daemon.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index da0b02b8..72e390f2 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -20,7 +20,6 @@ Structured, daemon tree service management. """ from typing import Optional, Union from contextlib import asynccontextmanager -from functools import partial from pydantic import BaseModel import trio @@ -40,7 +39,6 @@ _root_modules = [ ] -# @dataclass class Services(BaseModel): actor_n: tractor._trionics.ActorNursery service_n: trio.Nursery @@ -147,7 +145,7 @@ async def spawn_brokerd( global _services assert _services - portal = await _services.actor_n.start_actor( + await _services.actor_n.start_actor( dname, enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel,