diff --git a/piker/_daemon.py b/piker/_daemon.py
index d4ca7f21..b9c74853 100644
--- a/piker/_daemon.py
+++ b/piker/_daemon.py
@@ -18,16 +18,27 @@ Structured, daemon tree service management.

 """
-from typing import Optional, Union, Callable, Any
-from contextlib import asynccontextmanager as acm
+from __future__ import annotations
+import os
+from typing import (
+    Optional,
+    Callable,
+    Any,
+    ClassVar,
+)
+from contextlib import (
+    asynccontextmanager as acm,
+)
 from collections import defaultdict

-from msgspec import Struct
 import tractor
 import trio
 from trio_typing import TaskStatus

-from .log import get_logger, get_console_log
+from .log import (
+    get_logger,
+    get_console_log,
+)
 from .brokers import get_brokermod
@@ -42,28 +53,111 @@ _default_reg_addr: tuple[str, int] = (
     _default_registry_port,
 )

+
 # NOTE: this value is set as an actor-global once the first endpoint
 # who is capable, spawns a `pikerd` service tree.
-_registry_addr: tuple[str, int] | None = None
+_registry: Registry | None = None
+
+
+class Registry:
+    addr: None | tuple[str, int] = None
+
+    # TODO: table of uids to sockaddrs
+    peers: dict[
+        tuple[str, str],
+        tuple[str, int],
+    ] = {}
+
+
+_tractor_kwargs: dict[str, Any] = {}
+
+
+@acm
+async def open_registry(
+    addr: None | tuple[str, int] = None,
+    ensure_exists: bool = True,
+
+) -> tuple[str, int]:
+
+    global _tractor_kwargs
+    actor = tractor.current_actor()
+    uid = actor.uid
+    if (
+        Registry.addr is not None
+        and addr
+    ):
+        raise RuntimeError(
+            f'`{uid}` registry addr already bound @ {Registry.addr}'
+        )
+
+    was_set: bool = False
+
+    if (
+        not tractor.is_root_process()
+        and Registry.addr is None
+    ):
+        Registry.addr = actor._arb_addr
+
+    if (
+        ensure_exists
+        and Registry.addr is None
+    ):
+        raise RuntimeError(
+            f"`{uid}` registry should already exist but doesn't?"
+        )
+
+    if (
+        Registry.addr is None
+    ):
+        was_set = True
+        Registry.addr = addr or _default_reg_addr
+
+    _tractor_kwargs['arbiter_addr'] = Registry.addr
+
+    try:
+        yield Registry.addr
+    finally:
+        # XXX: always clear the global addr if we set it so that the
+        # next (set of) calls will apply whatever new one is passed
+        # in.
+        if was_set:
+            Registry.addr = None
+
+
+def get_tractor_runtime_kwargs() -> dict[str, Any]:
+    '''
+    Deliver ``tractor`` related runtime variables in a `dict`.
+
+    '''
+    return _tractor_kwargs
+

-_tractor_kwargs: dict[str, Any] = {
-    # use a different registry addr then tractor's default
-    'arbiter_addr': _registry_addr
-}
 _root_modules = [
     __name__,
     'piker.clearing._ems',
     'piker.clearing._client',
+    'piker.data._sampling',
 ]


-class Services(Struct):
+# TODO: factor this into a ``tractor.highlevel`` extension
+# pack for the library.
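For reference, before the `Services` class that follows, a minimal sketch of how the new `open_registry()` API above can be consumed from any actor-local task (the wrapper coroutine is illustrative only):

    from piker._daemon import open_registry

    async def show_registry_addr() -> None:
        # `ensure_exists=True` (the default) raises if no registry
        # address has been bound yet; with `ensure_exists=False` the
        # default (or actor-known) address is lazily bound instead.
        async with open_registry(ensure_exists=False) as reg_addr:
            host, port = reg_addr
            print(f'registry active @ {host}:{port}')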
+class Services: actor_n: tractor._supervise.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag - service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} + service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + tractor.Portal, + trio.Event, + ] + ] = {} + locks = defaultdict(trio.Lock) + @classmethod async def start_service_task( self, name: str, @@ -82,7 +176,12 @@ class Services(Struct): ''' async def open_context_in_task( task_status: TaskStatus[ - trio.CancelScope] = trio.TASK_STATUS_IGNORED, + tuple[ + trio.CancelScope, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, ) -> Any: @@ -94,156 +193,173 @@ class Services(Struct): ) as (ctx, first): # unblock once the remote context has started - task_status.started((cs, first)) + complete = trio.Event() + task_status.started((cs, complete, first)) log.info( f'`pikerd` service {name} started with value {first}' ) try: # wait on any context's return value + # and any final portal result from the + # sub-actor. ctx_res = await ctx.result() - except tractor.ContextCancelled: - return await self.cancel_service(name) - else: - # wait on any error from the sub-actor - # NOTE: this will block indefinitely until - # cancelled either by error from the target - # context function or by being cancelled here by - # the surrounding cancel scope + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. return (await portal.result(), ctx_res) - cs, first = await self.service_n.start(open_context_in_task) + finally: + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, complete, first = await self.service_n.start(open_context_in_task) # store the cancel scope and portal for later cancellation or # retstart if needed. - self.service_tasks[name] = (cs, portal) + self.service_tasks[name] = (cs, portal, complete) return cs, first - # TODO: per service cancellation by scope, we aren't using this - # anywhere right? + @classmethod async def cancel_service( self, name: str, + ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' log.info(f'Cancelling `pikerd` service {name}') - cs, portal = self.service_tasks[name] - # XXX: not entirely sure why this is required, - # and should probably be better fine tuned in - # ``tractor``? + cs, portal, complete = self.service_tasks[name] cs.cancel() - return await portal.cancel_actor() - - -_services: Optional[Services] = None - - -@acm -async def open_pikerd( - start_method: str = 'trio', - loglevel: Optional[str] = None, - - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = False, - registry_addr: None | tuple[str, int] = None, - -) -> Optional[tractor._portal.Portal]: - ''' - Start a root piker daemon who's lifetime extends indefinitely - until cancelled. - - A root actor nursery is created which can be used to create and keep - alive underling services (see below). 
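The `Services` namespace-class above (no longer a `msgspec.Struct`) is what the daemon spawning helpers further down (`spawn_brokerd()`, `spawn_emsd()`, `spawn_samplerd()`) build on. A condensed sketch of that lifecycle, using a hypothetical `mydaemond` service and context endpoint:

    from piker._daemon import Services

    async def spawn_mydaemond(loglevel: str | None = None) -> bool:
        # ask the root `pikerd` actor nursery for a new sub-actor
        portal = await Services.actor_n.start_actor(
            'mydaemond',
            enable_modules=['piker.mydaemon'],  # hypothetical module
            loglevel=loglevel,
            debug_mode=Services.debug_mode,
        )
        # register the long-lived service task; it is tracked in
        # `Services.service_tasks` as a (cancel scope, portal,
        # completion event) triple until torn down.
        await Services.start_service_task(
            'mydaemond',
            portal,
            _setup_persistent_mydaemond,  # hypothetical @tractor.context endpoint
        )
        return True

    # and later, from inside the root `pikerd` tree:
    # await Services.cancel_service('mydaemond')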
- - ''' - global _services - global _registry_addr - - if ( - _registry_addr is None - or registry_addr - ): - _registry_addr = registry_addr or _default_reg_addr - - # XXX: this may open a root actor as well - async with ( - tractor.open_root_actor( - - # passed through to ``open_root_actor`` - arbiter_addr=_registry_addr, - name=_root_dname, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, - - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules, - ) as _, - - tractor.open_nursery() as actor_nursery, - ): - async with trio.open_nursery() as service_nursery: - - # # setup service mngr singleton instance - # async with AsyncExitStack() as stack: - - # assign globally for future daemon/task creation - _services = Services( - actor_n=actor_nursery, - service_n=service_nursery, - debug_mode=debug_mode, - ) - - yield _services + await complete.wait() + assert name not in self.service_tasks, \ + f'Serice task for {name} not terminated?' @acm async def open_piker_runtime( name: str, enable_modules: list[str] = [], - start_method: str = 'trio', loglevel: Optional[str] = None, + # XXX NOTE XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + + registry_addr: None | tuple[str, int] = None, + + # TODO: once we have `rsyscall` support we will read a config + # and spawn the service tree distributed per that. + start_method: str = 'trio', + + tractor_kwargs: dict = {}, + +) -> tuple[ + tractor.Actor, + tuple[str, int], +]: + ''' + Start a piker actor who's runtime will automatically sync with + existing piker actors on the local link based on configuration. + + Can be called from a subactor or any program that needs to start + a root actor. + + ''' + try: + # check for existing runtime + actor = tractor.current_actor().uid + + except tractor._exceptions.NoRuntime: + + registry_addr = registry_addr or _default_reg_addr + + async with ( + tractor.open_root_actor( + + # passed through to ``open_root_actor`` + arbiter_addr=registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=enable_modules, + + **tractor_kwargs, + ) as _, + + open_registry(registry_addr, ensure_exists=False) as addr, + ): + yield ( + tractor.current_actor(), + addr, + ) + else: + async with open_registry(registry_addr) as addr: + yield ( + actor, + addr, + ) + + +@acm +async def open_pikerd( + loglevel: str | None = None, + # XXX: you should pretty much never want debug mode # for data daemons when running in production. debug_mode: bool = False, registry_addr: None | tuple[str, int] = None, -) -> tractor.Actor: +) -> Services: ''' - Start a piker actor who's runtime will automatically sync with - existing piker actors on the local link based on configuration. + Start a root piker daemon who's lifetime extends indefinitely until + cancelled. + + A root actor nursery is created which can be used to create and keep + alive underling services (see below). 
''' - global _services - global _registry_addr - if ( - _registry_addr is None - or registry_addr - ): - _registry_addr = registry_addr or _default_reg_addr - - # XXX: this may open a root actor as well async with ( - tractor.open_root_actor( - - # passed through to ``open_root_actor`` - arbiter_addr=_registry_addr, - name=name, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, + open_piker_runtime( + name=_root_dname, # TODO: eventually we should be able to avoid # having the root have more then permissions to # spawn other specialized daemons I think? - enable_modules=_root_modules + enable_modules, - ) as _, + enable_modules=_root_modules, + + loglevel=loglevel, + debug_mode=debug_mode, + registry_addr=registry_addr, + + ) as (root_actor, reg_addr), + tractor.open_nursery() as actor_nursery, + trio.open_nursery() as service_nursery, ): - yield tractor.current_actor() + assert root_actor.accept_addr == reg_addr + + # assign globally for future daemon/task creation + Services.actor_n = actor_nursery + Services.service_n = service_nursery + Services.debug_mode = debug_mode + try: + yield Services + finally: + # TODO: is this more clever/efficient? + # if 'samplerd' in Services.service_tasks: + # await Services.cancel_service('samplerd') + service_nursery.cancel_scope.cancel() @acm @@ -252,51 +368,67 @@ async def maybe_open_runtime( **kwargs, ) -> None: - """ + ''' Start the ``tractor`` runtime (a root actor) if none exists. - """ - settings = _tractor_kwargs - settings.update(kwargs) + ''' + name = kwargs.pop('name') if not tractor.current_actor(err_on_no_runtime=False): - async with tractor.open_root_actor( + async with open_piker_runtime( + name, loglevel=loglevel, - **settings, - ): - yield + **kwargs, + ) as (_, addr): + yield addr, else: - yield + async with open_registry() as addr: + yield addr @acm async def maybe_open_pikerd( loglevel: Optional[str] = None, registry_addr: None | tuple = None, + **kwargs, -) -> Union[tractor._portal.Portal, Services]: - """If no ``pikerd`` daemon-root-actor can be found start it and +) -> tractor._portal.Portal | ClassVar[Services]: + ''' + If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self though). - """ + ''' if loglevel: get_console_log(loglevel) # subtle, we must have the runtime up here or portal lookup will fail + query_name = kwargs.pop('name', f'piker_query_{os.getpid()}') + + # TODO: if we need to make the query part faster we could not init + # an actor runtime and instead just hit the socket? + # from tractor._ipc import _connect_chan, Channel + # async with _connect_chan(host, port) as chan: + # async with open_portal(chan) as arb_portal: + # yield arb_portal + async with ( - maybe_open_runtime(loglevel, **kwargs), - tractor.find_actor(_root_dname) as portal + open_piker_runtime( + name=query_name, + registry_addr=registry_addr, + loglevel=loglevel, + **kwargs, + ) as _, + tractor.find_actor( + _root_dname, + arbiter_sockaddr=registry_addr, + ) as portal ): # connect to any existing daemon presuming # its registry socket was selected. 
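As the annotations above show, `open_piker_runtime()` now yields an `(actor, registry_addr)` pair whether or not a runtime already exists. A minimal caller-side sketch (the actor name is illustrative):

    from piker._daemon import open_piker_runtime

    async def main() -> None:
        async with open_piker_runtime(
            name='my_piker_tool',  # illustrative actor name
            loglevel='info',
        ) as (actor, reg_addr):
            # `reg_addr` is the (host, port) the registry was found
            # at (or bound to); `actor` references this process'
            # `tractor` actor runtime.
            print(actor, reg_addr)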
if ( portal is not None - and ( - registry_addr is None - or portal.channel.raddr == registry_addr - ) ): yield portal return @@ -304,19 +436,21 @@ async def maybe_open_pikerd( # presume pikerd role since no daemon could be found at # configured address async with open_pikerd( - loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, - ) as _: + ) as service_manager: # in the case where we're starting up the # tractor-piker runtime stack in **this** process # we return no portal to self. - yield None + assert service_manager + yield service_manager -# brokerd enabled modules +# `brokerd` enabled modules +# NOTE: keeping this list as small as possible is part of our caps-sec +# model and should be treated with utmost care! _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', @@ -326,37 +460,35 @@ _data_mods = [ ] -class Brokerd: - locks = defaultdict(trio.Lock) - - @acm async def find_service( service_name: str, -) -> Optional[tractor.Portal]: +) -> tractor.Portal | None: - log.info(f'Scanning for service `{service_name}`') - # attach to existing daemon by name if possible - async with tractor.find_actor( - service_name, - arbiter_sockaddr=_registry_addr, - ) as maybe_portal: - yield maybe_portal + async with open_registry() as reg_addr: + log.info(f'Scanning for service `{service_name}`') + # attach to existing daemon by name if possible + async with tractor.find_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as maybe_portal: + yield maybe_portal async def check_for_service( service_name: str, -) -> bool: +) -> None | tuple[str, int]: ''' Service daemon "liveness" predicate. ''' - async with tractor.query_actor( - service_name, - arbiter_sockaddr=_registry_addr, - ) as sockaddr: - return sockaddr + async with open_registry(ensure_exists=False) as reg_addr: + async with tractor.query_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as sockaddr: + return sockaddr @acm @@ -366,6 +498,8 @@ async def maybe_spawn_daemon( service_task_target: Callable, spawn_args: dict[str, Any], loglevel: Optional[str] = None, + + singleton: bool = False, **kwargs, ) -> tractor.Portal: @@ -386,7 +520,7 @@ async def maybe_spawn_daemon( # serialize access to this section to avoid # 2 or more tasks racing to create a daemon - lock = Brokerd.locks[service_name] + lock = Services.locks[service_name] await lock.acquire() async with find_service(service_name) as portal: @@ -397,6 +531,9 @@ async def maybe_spawn_daemon( log.warning(f"Couldn't find any existing {service_name}") + # TODO: really shouldn't the actor spawning be part of the service + # starting method `Services.start_service()` ? + # ask root ``pikerd`` daemon to spawn the daemon we need if # pikerd is not live we now become the root of the # process tree @@ -407,15 +544,16 @@ async def maybe_spawn_daemon( ) as pikerd_portal: + # we are the root and thus are `pikerd` + # so spawn the target service directly by calling + # the provided target routine. + # XXX: this assumes that the target is well formed and will + # do the right things to setup both a sub-actor **and** call + # the ``_Services`` api from above to start the top level + # service task for that actor. + started: bool if pikerd_portal is None: - # we are the root and thus are `pikerd` - # so spawn the target service directly by calling - # the provided target routine. 
- # XXX: this assumes that the target is well formed and will - # do the right things to setup both a sub-actor **and** call - # the ``_Services`` api from above to start the top level - # service task for that actor. - await service_task_target(**spawn_args) + started = await service_task_target(**spawn_args) else: # tell the remote `pikerd` to start the target, @@ -424,11 +562,14 @@ async def maybe_spawn_daemon( # non-blocking and the target task will persist running # on `pikerd` after the client requesting it's start # disconnects. - await pikerd_portal.run( + started = await pikerd_portal.run( service_task_target, **spawn_args, ) + if started: + log.info(f'Service {service_name} started!') + async with tractor.wait_for_actor(service_name) as portal: lock.release() yield portal @@ -451,9 +592,6 @@ async def spawn_brokerd( extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) tractor_kwargs.update(extra_tractor_kwargs) - global _services - assert _services - # ask `pikerd` to spawn a new sub-actor and manage it under its # actor nursery modpath = brokermod.__name__ @@ -466,18 +604,18 @@ async def spawn_brokerd( subpath = f'{modpath}.{submodname}' broker_enable.append(subpath) - portal = await _services.actor_n.start_actor( + portal = await Services.actor_n.start_actor( dname, enable_modules=_data_mods + broker_enable, loglevel=loglevel, - debug_mode=_services.debug_mode, + debug_mode=Services.debug_mode, **tractor_kwargs ) # non-blocking setup of brokerd service nursery from .data import _setup_persistent_brokerd - await _services.start_service_task( + await Services.start_service_task( dname, portal, _setup_persistent_brokerd, @@ -523,24 +661,21 @@ async def spawn_emsd( """ log.info('Spawning emsd') - global _services - assert _services - - portal = await _services.actor_n.start_actor( + portal = await Services.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', 'piker.clearing._client', ], loglevel=loglevel, - debug_mode=_services.debug_mode, # set by pikerd flag + debug_mode=Services.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) # non-blocking setup of clearing service from .clearing._ems import _setup_persistent_emsd - await _services.start_service_task( + await Services.start_service_task( 'emsd', portal, _setup_persistent_emsd, @@ -567,25 +702,3 @@ async def maybe_open_emsd( ) as portal: yield portal - - -# TODO: ideally we can start the tsdb "on demand" but it's -# probably going to require "rootless" docker, at least if we don't -# want to expect the user to start ``pikerd`` with root perms all the -# time. -# async def maybe_open_marketstored( -# loglevel: Optional[str] = None, -# **kwargs, - -# ) -> tractor._portal.Portal: # noqa - -# async with maybe_spawn_daemon( - -# 'marketstored', -# service_task_target=spawn_emsd, -# spawn_args={'loglevel': loglevel}, -# loglevel=loglevel, -# **kwargs, - -# ) as portal: -# yield portal diff --git a/piker/clearing/__init__.py b/piker/clearing/__init__.py index c4fc2647..06a9212e 100644 --- a/piker/clearing/__init__.py +++ b/piker/clearing/__init__.py @@ -18,3 +18,9 @@ Market machinery for order executions, book, management. """ +from ._client import open_ems + + +__all__ = [ + 'open_ems', +] diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 03fb62d3..f3b26cbe 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -18,8 +18,10 @@ Orders and execution client API. 
""" +from __future__ import annotations from contextlib import asynccontextmanager as acm from pprint import pformat +from typing import TYPE_CHECKING import trio import tractor @@ -27,11 +29,16 @@ from tractor.trionics import broadcast_receiver from ..log import get_logger from ..data.types import Struct -from ._ems import _emsd_main from .._daemon import maybe_open_emsd from ._messages import Order, Cancel from ..brokers import get_brokermod +if TYPE_CHECKING: + from ._messages import ( + BrokerdPosition, + Status, + ) + log = get_logger(__name__) @@ -167,12 +174,19 @@ async def relay_order_cmds_from_sync_code( @acm async def open_ems( fqsn: str, + mode: str = 'live', -) -> ( +) -> tuple[ OrderBook, tractor.MsgStream, - dict, -): + dict[ + # brokername, acctid + tuple[str, str], + list[BrokerdPosition], + ], + list[str], + dict[str, Status], +]: ''' Spawn an EMS daemon and begin sending orders and receiving alerts. @@ -213,14 +227,16 @@ async def open_ems( from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) - mode: str = 'live' - async with maybe_open_emsd(broker) as portal: mod = get_brokermod(broker) - if not getattr(mod, 'trades_dialogue', None): + if ( + not getattr(mod, 'trades_dialogue', None) + or mode == 'paper' + ): mode = 'paper' + from ._ems import _emsd_main async with ( # connect to emsd portal.open_context( diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ad512c08..ba33e584 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -172,6 +172,7 @@ async def clear_dark_triggers( # TODO: # - numba all this! # - this stream may eventually contain multiple symbols + quote_stream._raise_on_lag = False async for quotes in quote_stream: # start = time.time() for sym, quote in quotes.items(): @@ -417,7 +418,7 @@ class Router(Struct): # load the paper trading engine exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') + log.info(f'{broker}: Entering `paper` trading mode') # load the paper trading engine as a subactor of this emsd # actor to simulate the real IPC load it'll have when also @@ -866,7 +867,7 @@ async def translate_and_relay_brokerd_events( elif status == 'canceled': log.cancel(f'Cancellation for {oid} is complete!') - status_msg = book._active.pop(oid) + status_msg = book._active.pop(oid, None) else: # open # relayed from backend but probably not handled so @@ -1366,7 +1367,15 @@ async def _emsd_main( exec_mode: str, # ('paper', 'live') loglevel: str = 'info', -) -> None: +) -> tuple[ + dict[ + # brokername, acctid + tuple[str, str], + list[BrokerdPosition], + ], + list[str], + dict[str, Status], +]: ''' EMS (sub)actor entrypoint providing the execution management (micro)service which conducts broker order clearing control on diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 67647a83..07484634 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -28,7 +28,6 @@ import tractor from ..log import get_console_log, get_logger, colorize_json from ..brokers import get_brokermod from .._daemon import ( - _tractor_kwargs, _default_registry_host, _default_registry_port, ) @@ -176,20 +175,30 @@ def cli( @cli.command() @click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.argument('names', nargs=-1, required=False) +@click.argument('ports', nargs=-1, required=False) @click.pass_obj -def services(config, tl, names): +def services(config, tl, ports): - from .._daemon import open_piker_runtime + from .._daemon import ( + open_piker_runtime, + 
_default_registry_port, + _default_registry_host, + ) + + host = _default_registry_host + if not ports: + ports = [_default_registry_port] async def list_services(): + nonlocal host async with ( open_piker_runtime( name='service_query', loglevel=config['loglevel'] if tl else None, ), tractor.get_arbiter( - *_tractor_kwargs['arbiter_addr'] + host=host, + port=ports[0] ) as portal ): registry = await portal.run_from_ns('self', 'get_registry') diff --git a/piker/data/__init__.py b/piker/data/__init__.py index e98195b4..5c83150e 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -22,6 +22,12 @@ and storing data from your brokers as well as sharing live streams over a network. """ +import tractor +import trio + +from ..log import ( + get_console_log, +) from ._normalize import iterticks from ._sharedmem import ( maybe_open_shm_array, @@ -32,7 +38,6 @@ from ._sharedmem import ( ) from .feed import ( open_feed, - _setup_persistent_brokerd, ) @@ -44,5 +49,40 @@ __all__ = [ 'attach_shm_array', 'open_shm_array', 'get_shm_token', - '_setup_persistent_brokerd', ] + + +@tractor.context +async def _setup_persistent_brokerd( + ctx: tractor.Context, + brokername: str, + +) -> None: + ''' + Allocate a actor-wide service nursery in ``brokerd`` + such that feeds can be run in the background persistently by + the broker backend as needed. + + ''' + get_console_log(tractor.current_actor().loglevel) + + from .feed import ( + _bus, + get_feed_bus, + ) + global _bus + assert not _bus + + async with trio.open_nursery() as service_nursery: + # assign a nursery to the feeds bus for spawning + # background tasks from clients + get_feed_bus(brokername, service_nursery) + + # unblock caller + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + + diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f8230bd7..a5df96cc 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -20,174 +20,255 @@ financial data flows. """ from __future__ import annotations -from collections import Counter +from collections import ( + Counter, + defaultdict, +) +from contextlib import asynccontextmanager as acm import time from typing import ( + AsyncIterator, TYPE_CHECKING, ) import tractor +from tractor.trionics import ( + maybe_open_nursery, +) import trio from trio_typing import TaskStatus -from ..log import get_logger +from ..log import ( + get_logger, + get_console_log, +) +from .._daemon import maybe_spawn_daemon if TYPE_CHECKING: - from ._sharedmem import ShmArray + from ._sharedmem import ( + ShmArray, + ) from .feed import _FeedsBus log = get_logger(__name__) +# highest frequency sample step is 1 second by default, though in +# the future we may want to support shorter periods or a dynamic style +# tick-event stream. _default_delay_s: float = 1.0 -class sampler: +class Sampler: ''' Global sampling engine registry. Manages state for sampling events, shm incrementing and sample period logic. + This non-instantiated type is meant to be a singleton within + a `samplerd` actor-service spawned once by the user wishing to + time-step sample real-time quote feeds, see + ``._daemon.maybe_open_samplerd()`` and the below + ``register_with_sampler()``. + ''' + service_nursery: None | trio.Nursery = None + # TODO: we could stick these in a composed type to avoid # angering the "i hate module scoped variables crowd" (yawn). 
- ohlcv_shms: dict[int, list[ShmArray]] = {} + ohlcv_shms: dict[float, list[ShmArray]] = {} # holds one-task-per-sample-period tasks which are spawned as-needed by # data feed requests with a given detected time step usually from # history loading. - incrementers: dict[int, trio.CancelScope] = {} + incr_task_cs: trio.CancelScope | None = None # holds all the ``tractor.Context`` remote subscriptions for # a particular sample period increment event: all subscribers are # notified on a step. - subscribers: dict[int, tractor.Context] = {} + # subscribers: dict[int, list[tractor.MsgStream]] = {} + subscribers: defaultdict[ + float, + list[ + float, + set[tractor.MsgStream] + ], + ] = defaultdict( + lambda: [ + round(time.time()), + set(), + ] + ) + @classmethod + async def increment_ohlc_buffer( + self, + period_s: float, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ): + ''' + Task which inserts new bars into the provide shared memory array + every ``period_s`` seconds. -async def increment_ohlc_buffer( - delay_s: int, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -): - ''' - Task which inserts new bars into the provide shared memory array - every ``delay_s`` seconds. + This task fulfills 2 purposes: + - it takes the subscribed set of shm arrays and increments them + on a common time period + - broadcast of this increment "signal" message to other actor + subscribers - This task fulfills 2 purposes: - - it takes the subscribed set of shm arrays and increments them - on a common time period - - broadcast of this increment "signal" message to other actor - subscribers + Note that if **no** actor has initiated this task then **none** of + the underlying buffers will actually be incremented. - Note that if **no** actor has initiated this task then **none** of - the underlying buffers will actually be incremented. + ''' + # TODO: right now we'll spin printing bars if the last time stamp is + # before a large period of no market activity. Likely the best way + # to solve this is to make this task aware of the instrument's + # tradable hours? - ''' - # # wait for brokerd to signal we should start sampling - # await shm_incrementing(shm_token['shm_name']).wait() + total_s: float = 0 # total seconds counted + ad = period_s - 0.001 # compensate for trio processing time - # TODO: right now we'll spin printing bars if the last time stamp is - # before a large period of no market activity. Likely the best way - # to solve this is to make this task aware of the instrument's - # tradable hours? + with trio.CancelScope() as cs: + # register this time period step as active + task_status.started(cs) - # adjust delay to compensate for trio processing time - ad = min(sampler.ohlcv_shms.keys()) - 0.001 + # sample step loop: + # includes broadcasting to all connected consumers on every + # new sample step as well incrementing any registered + # buffers by registered sample period. + while True: + await trio.sleep(ad) + total_s += period_s - total_s = 0 # total seconds counted - lowest = min(sampler.ohlcv_shms.keys()) - lowest_shm = sampler.ohlcv_shms[lowest][0] - ad = lowest - 0.001 + # increment all subscribed shm arrays + # TODO: + # - this in ``numba`` + # - just lookup shms for this step instead of iterating? 
- with trio.CancelScope() as cs: + i_epoch = round(time.time()) + broadcasted: set[float] = set() - # register this time period step as active - sampler.incrementers[delay_s] = cs - task_status.started(cs) + # print(f'epoch: {i_epoch} -> REGISTRY {self.ohlcv_shms}') + for shm_period_s, shms in self.ohlcv_shms.items(): - while True: - # TODO: do we want to support dynamically - # adding a "lower" lowest increment period? - await trio.sleep(ad) - total_s += delay_s + # short-circuit on any not-ready because slower sample + # rate consuming shm buffers. + if total_s % shm_period_s != 0: + # print(f'skipping `{shm_period_s}s` sample update') + continue - # increment all subscribed shm arrays - # TODO: - # - this in ``numba`` - # - just lookup shms for this step instead of iterating? - for this_delay_s, shms in sampler.ohlcv_shms.items(): + # update last epoch stamp for this period group + if shm_period_s not in broadcasted: + sub_pair = self.subscribers[shm_period_s] + sub_pair[0] = i_epoch + broadcasted.add(shm_period_s) - # short-circuit on any not-ready because slower sample - # rate consuming shm buffers. - if total_s % this_delay_s != 0: - # print(f'skipping `{this_delay_s}s` sample update') - continue + # TODO: ``numba`` this! + for shm in shms: + # print(f'UPDATE {shm_period_s}s STEP for {shm.token}') - # TODO: ``numba`` this! - for shm in shms: - # TODO: in theory we could make this faster by copying the - # "last" readable value into the underlying larger buffer's - # next value and then incrementing the counter instead of - # using ``.push()``? + # append new entry to buffer thus "incrementing" + # the bar + array = shm.array + last = array[-1:][shm._write_fields].copy() - # append new entry to buffer thus "incrementing" the bar - array = shm.array - last = array[-1:][shm._write_fields].copy() - # (index, t, close) = last[0][['index', 'time', 'close']] - (t, close) = last[0][['time', 'close']] + # guard against startup backfilling races where + # the buffer has not yet been filled. + if not last.size: + continue - # this copies non-std fields (eg. vwap) from the last datum - last[ - ['time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (t + this_delay_s, 0, close, close, close, close) + (t, close) = last[0][[ + 'time', + 'close', + ]] - # write to the buffer - shm.push(last) + next_t = t + shm_period_s - await broadcast(delay_s, shm=lowest_shm) + if shm_period_s <= 1: + next_t = i_epoch + # this copies non-std fields (eg. vwap) from the + # last datum + last[[ + 'time', -async def broadcast( - delay_s: int, - shm: ShmArray | None = None, + 'open', + 'high', + 'low', + 'close', -) -> None: - ''' - Broadcast the given ``shm: ShmArray``'s buffer index step to any - subscribers for a given sample period. + 'volume', + ]][0] = ( + # epoch timestamp + next_t, - The sent msg will include the first and last index which slice into - the buffer's non-empty data. + # OHLC + close, + close, + close, + close, - ''' - subs = sampler.subscribers.get(delay_s, ()) - first = last = -1 + 0, # vlm + ) - if shm is None: - periods = sampler.ohlcv_shms.keys() - # if this is an update triggered by a history update there - # might not actually be any sampling bus setup since there's - # no "live feed" active yet. 
- if periods: - lowest = min(periods) - shm = sampler.ohlcv_shms[lowest][0] - first = shm._first.value - last = shm._last.value + # TODO: in theory we could make this faster by + # copying the "last" readable value into the + # underlying larger buffer's next value and then + # incrementing the counter instead of using + # ``.push()``? - for stream in subs: - try: - await stream.send({ - 'first': first, - 'last': last, - 'index': last, - }) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error( - f'{stream._ctx.chan.uid} dropped connection' - ) + # write to the buffer + shm.push(last) + + # broadcast increment msg to all updated subs per period + for shm_period_s in broadcasted: + await self.broadcast( + period_s=shm_period_s, + time_stamp=i_epoch, + ) + + @classmethod + async def broadcast( + self, + period_s: float, + time_stamp: float | None = None, + + ) -> None: + ''' + Broadcast the period size and last index step value to all + subscribers for a given sample period. + + ''' + pair = self.subscribers[period_s] + + last_ts, subs = pair + + task = trio.lowlevel.current_task() + log.debug( + f'SUBS {self.subscribers}\n' + f'PAIR {pair}\n' + f'TASK: {task}: {id(task)}\n' + f'broadcasting {period_s} -> {last_ts}\n' + # f'consumers: {subs}' + ) + borked: set[tractor.MsgStream] = set() + for stream in subs: + try: + await stream.send({ + 'index': time_stamp or last_ts, + 'period': period_s, + }) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + borked.add(stream) + + for stream in borked: try: subs.remove(stream) except ValueError: @@ -195,35 +276,227 @@ async def broadcast( f'{stream._ctx.chan.uid} sub already removed!?' ) + @classmethod + async def broadcast_all(self) -> None: + for period_s in self.subscribers: + await self.broadcast(period_s) + @tractor.context -async def iter_ohlc_periods( +async def register_with_sampler( ctx: tractor.Context, - delay_s: int, + period_s: float, + shms_by_period: dict[float, dict] | None = None, + + open_index_stream: bool = True, # open a 2way stream for sample step msgs? + sub_for_broadcasts: bool = True, # sampler side to send step updates? ) -> None: - ''' - Subscribe to OHLC sampling "step" events: when the time - aggregation period increments, this event stream emits an index - event. - ''' - # add our subscription - subs = sampler.subscribers.setdefault(delay_s, []) - await ctx.started() - async with ctx.open_stream() as stream: - subs.append(stream) + get_console_log(tractor.current_actor().loglevel) + incr_was_started: bool = False - try: - # stream and block until cancelled - await trio.sleep_forever() - finally: - try: - subs.remove(stream) - except ValueError: - log.error( - f'iOHLC step stream was already dropped {ctx.chan.uid}?' + try: + async with maybe_open_nursery( + Sampler.service_nursery + ) as service_nursery: + + # init startup, create (actor-)local service nursery and start + # increment task + Sampler.service_nursery = service_nursery + + # always ensure a period subs entry exists + last_ts, subs = Sampler.subscribers[float(period_s)] + + async with trio.Lock(): + if Sampler.incr_task_cs is None: + Sampler.incr_task_cs = await service_nursery.start( + Sampler.increment_ohlc_buffer, + 1., + ) + incr_was_started = True + + # insert the base 1s period (for OHLC style sampling) into + # the increment buffer set to update and shift every second. 
+ if shms_by_period is not None: + from ._sharedmem import ( + attach_shm_array, + _Token, ) + for period in shms_by_period: + + # load and register shm handles + shm_token_msg = shms_by_period[period] + shm = attach_shm_array( + _Token.from_msg(shm_token_msg), + readonly=False, + ) + shms_by_period[period] = shm + Sampler.ohlcv_shms.setdefault(period, []).append(shm) + + assert Sampler.ohlcv_shms + + # unblock caller + await ctx.started(set(Sampler.ohlcv_shms.keys())) + + if open_index_stream: + try: + async with ctx.open_stream() as stream: + if sub_for_broadcasts: + subs.add(stream) + + # except broadcast requests from the subscriber + async for msg in stream: + if msg == 'broadcast_all': + await Sampler.broadcast_all() + finally: + if sub_for_broadcasts: + subs.remove(stream) + else: + # if no shms are passed in we just wait until cancelled + # by caller. + await trio.sleep_forever() + + finally: + # TODO: why tf isn't this working? + if shms_by_period is not None: + for period, shm in shms_by_period.items(): + Sampler.ohlcv_shms[period].remove(shm) + + if incr_was_started: + Sampler.incr_task_cs.cancel() + Sampler.incr_task_cs = None + + +async def spawn_samplerd( + + loglevel: str | None = None, + **extra_tractor_kwargs + +) -> bool: + ''' + Daemon-side service task: start a sampling daemon for common step + update and increment count write and stream broadcasting. + + ''' + from piker._daemon import Services + + dname = 'samplerd' + log.info(f'Spawning `{dname}`') + + # singleton lock creation of ``samplerd`` since we only ever want + # one daemon per ``pikerd`` proc tree. + # TODO: make this built-into the service api? + async with Services.locks[dname + '_singleton']: + + if dname not in Services.service_tasks: + + portal = await Services.actor_n.start_actor( + dname, + enable_modules=[ + 'piker.data._sampling', + ], + loglevel=loglevel, + debug_mode=Services.debug_mode, # set by pikerd flag + **extra_tractor_kwargs + ) + + await Services.start_service_task( + dname, + portal, + register_with_sampler, + period_s=1, + sub_for_broadcasts=False, + ) + return True + + return False + + +@acm +async def maybe_open_samplerd( + + loglevel: str | None = None, + **kwargs, + +) -> tractor._portal.Portal: # noqa + ''' + Client-side helper to maybe startup the ``samplerd`` service + under the ``pikerd`` tree. + + ''' + dname = 'samplerd' + + async with maybe_spawn_daemon( + dname, + service_task_target=spawn_samplerd, + spawn_args={'loglevel': loglevel}, + loglevel=loglevel, + **kwargs, + + ) as portal: + yield portal + + +@acm +async def open_sample_stream( + period_s: float, + shms_by_period: dict[float, dict] | None = None, + open_index_stream: bool = True, + sub_for_broadcasts: bool = True, + + cache_key: str | None = None, + allow_new_sampler: bool = True, + +) -> AsyncIterator[dict[str, float]]: + ''' + Subscribe to OHLC sampling "step" events: when the time aggregation + period increments, this event stream emits an index event. + + This is a client-side endpoint that does all the work of ensuring + the `samplerd` actor is up and that mult-consumer-tasks are given + a broadcast stream when possible. + + ''' + # TODO: wrap this manager with the following to make it cached + # per client-multitasks entry. 
+ # maybe_open_context( + # acm_func=partial( + # portal.open_context, + # register_with_sampler, + # ), + # key=cache_key or period_s, + # ) + # if cache_hit: + # # add a new broadcast subscription for the quote stream + # # if this feed is likely already in use + # async with istream.subscribe() as bistream: + # yield bistream + # else: + + async with ( + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + maybe_open_samplerd() as portal, + + portal.open_context( + register_with_sampler, + **{ + 'period_s': period_s, + 'shms_by_period': shms_by_period, + 'open_index_stream': open_index_stream, + 'sub_for_broadcasts': sub_for_broadcasts, + }, + ) as (ctx, first) + ): + async with ( + ctx.open_stream() as istream, + + # TODO: we don't need this task-bcasting right? + # istream.subscribe() as istream, + ): + yield istream async def sample_and_broadcast( @@ -236,7 +509,14 @@ async def sample_and_broadcast( sum_tick_vlm: bool = True, ) -> None: + ''' + `brokerd`-side task which writes latest datum sampled data. + This task is meant to run in the same actor (mem space) as the + `brokerd` real-time quote feed which is being sampled to + a ``ShmArray`` buffer. + + ''' log.info("Started shared mem bar writer") overruns = Counter() @@ -273,7 +553,6 @@ async def sample_and_broadcast( for shm in [rt_shm, hist_shm]: # update last entry # benchmarked in the 4-5 us range - # for shm in [rt_shm, hist_shm]: o, high, low, v = shm.array[-1][ ['open', 'high', 'low', 'volume'] ] @@ -383,6 +662,7 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): + ctx = stream._ctx chan = ctx.chan if ctx: log.warning( @@ -404,10 +684,63 @@ async def sample_and_broadcast( ) +# a working tick-type-classes template +_tick_groups = { + 'clears': {'trade', 'dark_trade', 'last'}, + 'bids': {'bid', 'bsize'}, + 'asks': {'ask', 'asize'}, +} + + +def frame_ticks( + first_quote: dict, + last_quote: dict, + ticks_by_type: dict, +) -> None: + # append quotes since last iteration into the last quote's + # tick array/buffer. + ticks = last_quote.get('ticks') + + # TODO: once we decide to get fancy really we should + # have a shared mem tick buffer that is just + # continually filled and the UI just ready from it + # at it's display rate. + if ticks: + # TODO: do we need this any more or can we just + # expect the receiver to unwind the below + # `ticks_by_type: dict`? + # => undwinding would potentially require a + # `dict[str, set | list]` instead with an + # included `'types' field which is an (ordered) + # set of tick type fields in the order which + # types arrived? + first_quote['ticks'].extend(ticks) + + # XXX: build a tick-by-type table of lists + # of tick messages. This allows for less + # iteration on the receiver side by allowing for + # a single "latest tick event" look up by + # indexing the last entry in each sub-list. + # tbt = { + # 'types': ['bid', 'asize', 'last', .. ''], + + # 'bid': [tick0, tick1, tick2, .., tickn], + # 'asize': [tick0, tick1, tick2, .., tickn], + # 'last': [tick0, tick1, tick2, .., tickn], + # ... + # '': [tick0, tick1, tick2, .., tickn], + # } + + # append in reverse FIFO order for in-order iteration on + # receiver side. 
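For context on the `open_sample_stream()` / `register_with_sampler()` pair defined above: subscribers receive one msg per time step and may push a `'broadcast_all'` control msg back up (as the history backfill code in `piker/data/feed.py` below does). A rough consumer-side sketch:

    from piker.data._sampling import open_sample_stream

    async def watch_steps() -> None:
        async with open_sample_stream(period_s=1.) as istream:
            async for msg in istream:
                # each step msg carries the sample period and the
                # epoch time stamp of the step that just fired.
                period: float = msg['period']
                step_ts: float = msg['index']
                print(f'{period}s step @ {step_ts}')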
+ for tick in ticks: + ttype = tick['type'] + ticks_by_type[ttype].append(tick) + + # TODO: a less naive throttler, here's some snippets: # token bucket by njs: # https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 - async def uniform_rate_send( rate: float, @@ -418,6 +751,9 @@ async def uniform_rate_send( ) -> None: + # try not to error-out on overruns of the subscribed (chart) client + stream._ctx._backpressure = True + # TODO: compute the approx overhead latency per cycle left_to_sleep = throttle_period = 1/rate - 0.000616 @@ -427,6 +763,12 @@ async def uniform_rate_send( diff = 0 task_status.started() + ticks_by_type: defaultdict[ + str, + list[dict], + ] = defaultdict(list) + + clear_types = _tick_groups['clears'] while True: @@ -445,34 +787,17 @@ async def uniform_rate_send( if not first_quote: first_quote = last_quote + # first_quote['tbt'] = ticks_by_type if (throttle_period - diff) > 0: # received a quote but the send cycle period hasn't yet # expired we aren't supposed to send yet so append # to the tick frame. - - # append quotes since last iteration into the last quote's - # tick array/buffer. - ticks = last_quote.get('ticks') - - # XXX: idea for frame type data structure we could - # use on the wire instead of a simple list? - # frames = { - # 'index': ['type_a', 'type_c', 'type_n', 'type_n'], - - # 'type_a': [tick0, tick1, tick2, .., tickn], - # 'type_b': [tick0, tick1, tick2, .., tickn], - # 'type_c': [tick0, tick1, tick2, .., tickn], - # ... - # 'type_n': [tick0, tick1, tick2, .., tickn], - # } - - # TODO: once we decide to get fancy really we should - # have a shared mem tick buffer that is just - # continually filled and the UI just ready from it - # at it's display rate. - if ticks: - first_quote['ticks'].extend(ticks) + frame_ticks( + first_quote, + last_quote, + ticks_by_type, + ) # send cycle isn't due yet so continue waiting continue @@ -489,12 +814,35 @@ async def uniform_rate_send( # received quote ASAP. sym, first_quote = await quote_stream.receive() + frame_ticks( + first_quote, + first_quote, + ticks_by_type, + ) + # we have a quote already so send it now. + with trio.move_on_after(throttle_period) as cs: + while ( + not set(ticks_by_type).intersection(clear_types) + ): + try: + sym, last_quote = await quote_stream.receive() + except trio.EndOfChannel: + log.exception(f"feed for {stream} ended?") + break + + frame_ticks( + first_quote, + last_quote, + ticks_by_type, + ) + # measured_rate = 1 / (time.time() - last_send) # log.info( # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' # ) + first_quote['tbt'] = ticks_by_type # TODO: now if only we could sync this to the display # rate timing exactly lul @@ -520,3 +868,4 @@ async def uniform_rate_send( first_quote = last_quote = None diff = 0 last_send = time.time() + ticks_by_type.clear() diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 1577a678..2dd7f4af 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -92,7 +92,7 @@ class NoBsWs: while True: try: await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): + except self.recon_errors: await trio.sleep(0.5) else: break diff --git a/piker/data/feed.py b/piker/data/feed.py index 93630a13..534aebc9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -21,14 +21,17 @@ This module is enabled for ``brokerd`` daemons. 
""" from __future__ import annotations -from collections import defaultdict +from collections import ( + defaultdict, + Counter, +) from contextlib import asynccontextmanager as acm from datetime import datetime from functools import partial +import time from types import ModuleType from typing import ( Any, - AsyncIterator, AsyncContextManager, Callable, Optional, @@ -51,16 +54,18 @@ import numpy as np from ..brokers import get_brokermod from ..calc import humanize -from ..log import get_logger, get_console_log +from ..log import ( + get_logger, + get_console_log, +) from .._daemon import ( maybe_spawn_brokerd, check_for_service, ) +from .flows import Flume from ._sharedmem import ( maybe_open_shm_array, - attach_shm_array, ShmArray, - _Token, _secs_in_day, ) from .ingest import get_ingestormod @@ -72,13 +77,9 @@ from ._source import ( ) from ..ui import _search from ._sampling import ( - sampler, - broadcast, - increment_ohlc_buffer, - iter_ohlc_periods, + open_sample_stream, sample_and_broadcast, uniform_rate_send, - _default_delay_s, ) from ..brokers._util import ( DataUnavailable, @@ -128,7 +129,7 @@ class _FeedsBus(Struct): target: Awaitable, *args, - ) -> None: + ) -> trio.CancelScope: async def start_with_cs( task_status: TaskStatus[ @@ -226,36 +227,6 @@ def get_feed_bus( return _bus -@tractor.context -async def _setup_persistent_brokerd( - ctx: tractor.Context, - brokername: str, - -) -> None: - ''' - Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed. - - ''' - get_console_log(tractor.current_actor().loglevel) - - global _bus - assert not _bus - - async with trio.open_nursery() as service_nursery: - # assign a nursery to the feeds bus for spawning - # background tasks from clients - get_feed_bus(brokername, service_nursery) - - # unblock caller - await ctx.started() - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() - - def diff_history( array: np.ndarray, timeframe: int, @@ -278,6 +249,7 @@ async def start_backfill( bfqsn: str, shm: ShmArray, timeframe: float, + sampler_stream: tractor.MsgStream, last_tsdb_dt: Optional[datetime] = None, storage: Optional[Storage] = None, @@ -309,6 +281,25 @@ async def start_backfill( - pendulum.from_timestamp(times[-2]) ).seconds + if step_size_s == 60: + inow = round(time.time()) + diff = inow - times[-1] + if abs(diff) > 60: + surr = array[-6:] + diff_in_mins = round(diff/60., ndigits=2) + log.warning( + f'STEP ERROR `{bfqsn}` for period {step_size_s}s:\n' + f'Off by `{diff}` seconds (or `{diff_in_mins}` mins)\n' + 'Surrounding 6 time stamps:\n' + f'{list(surr["time"])}\n' + 'Here is surrounding 6 samples:\n' + f'{surr}\nn' + ) + + # uncomment this for a hacker who wants to investigate + # this case manually.. + # await tractor.breakpoint() + # frame's worth of sample-period-steps, in seconds frame_size_s = len(array) * step_size_s @@ -326,8 +317,7 @@ async def start_backfill( # TODO: *** THIS IS A BUG *** # we need to only broadcast to subscribers for this fqsn.. # otherwise all fsps get reset on every chart.. - for delay_s in sampler.subscribers: - await broadcast(delay_s) + await sampler_stream.send('broadcast_all') # signal that backfilling to tsdb's end datum is complete bf_done = trio.Event() @@ -376,8 +366,9 @@ async def start_backfill( # erlangs = config.get('erlangs', 1) # avoid duplicate history frames with a set of datetime frame - # starts. 
- starts: set[datetime] = set() + # starts and associated counts of how many duplicates we see + # per time stamp. + starts: Counter[datetime] = Counter() # inline sequential loop where we simply pass the # last retrieved start dt to the next request as @@ -405,14 +396,26 @@ async def start_backfill( # request loop until the condition is resolved? return - if next_start_dt in starts: + if ( + next_start_dt in starts + and starts[next_start_dt] <= 6 + ): start_dt = min(starts) - print("SKIPPING DUPLICATE FRAME @ {next_start_dt}") + log.warning( + f"{bfqsn}: skipping duplicate frame @ {next_start_dt}" + ) + starts[start_dt] += 1 continue + elif starts[next_start_dt] > 6: + log.warning( + f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?' + ) + return + # only update new start point if not-yet-seen start_dt = next_start_dt - starts.add(start_dt) + starts[start_dt] += 1 assert array['time'][0] == start_dt.timestamp() @@ -484,8 +487,7 @@ async def start_backfill( # in the block above to avoid entering new ``frames`` # values while we're pipelining the current ones to # memory... - for delay_s in sampler.subscribers: - await broadcast(delay_s) + await sampler_stream.send('broadcast_all') # short-circuit (for now) bf_done.set() @@ -496,6 +498,7 @@ async def basic_backfill( mod: ModuleType, bfqsn: str, shms: dict[int, ShmArray], + sampler_stream: tractor.MsgStream, ) -> None: @@ -513,7 +516,8 @@ async def basic_backfill( mod, bfqsn, shm, - timeframe=timeframe, + timeframe, + sampler_stream, ) ) except DataUnavailable: @@ -529,6 +533,7 @@ async def tsdb_backfill( fqsn: str, bfqsn: str, shms: dict[int, ShmArray], + sampler_stream: tractor.MsgStream, task_status: TaskStatus[ tuple[ShmArray, ShmArray] @@ -561,7 +566,8 @@ async def tsdb_backfill( mod, bfqsn, shm, - timeframe=timeframe, + timeframe, + sampler_stream, last_tsdb_dt=last_tsdb_dt, tsdb_is_up=True, storage=storage, @@ -599,10 +605,7 @@ async def tsdb_backfill( # unblock the feed bus management task # assert len(shms[1].array) - task_status.started(( - shms[60], - shms[1], - )) + task_status.started() async def back_load_from_tsdb( timeframe: int, @@ -658,10 +661,10 @@ async def tsdb_backfill( # Load TSDB history into shm buffer (for display) if there is # remaining buffer space. + if ( len(tsdb_history) ): - # load the first (smaller) bit of history originally loaded # above from ``Storage.load()``. to_push = tsdb_history[-prepend_start:] @@ -678,26 +681,27 @@ async def tsdb_backfill( tsdb_last_frame_start = tsdb_history['Epoch'][0] + if timeframe == 1: + times = shm.array['time'] + assert (times[1] - times[0]) == 1 + # load as much from storage into shm possible (depends on # user's shm size settings). - while ( - shm._first.value > 0 - ): + while shm._first.value > 0: tsdb_history = await storage.read_ohlcv( fqsn, - end=tsdb_last_frame_start, timeframe=timeframe, + end=tsdb_last_frame_start, ) + # empty query + if not len(tsdb_history): + break + next_start = tsdb_history['Epoch'][0] - if ( - not len(tsdb_history) # empty query - + if next_start >= tsdb_last_frame_start: # no earlier data detected - or next_start >= tsdb_last_frame_start - - ): break else: tsdb_last_frame_start = next_start @@ -725,8 +729,7 @@ async def tsdb_backfill( # (usually a chart showing graphics for said fsp) # which tells the chart to conduct a manual full # graphics loop cycle. - for delay_s in sampler.subscribers: - await broadcast(delay_s) + await sampler_stream.send('broadcast_all') # TODO: write new data to tsdb to be ready to for next read. 
@@ -770,11 +773,14 @@ async def manage_history( # from tractor._state import _runtime_vars # port = _runtime_vars['_root_mailbox'][1] + uid = tractor.current_actor().uid + suffix = '.'.join(uid) + # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. hist_shm, opened = maybe_open_shm_array( # key=f'{fqsn}_hist_p{port}', - key=f'{fqsn}_hist', + key=f'{fqsn}_hist.{suffix}', # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -792,7 +798,7 @@ async def manage_history( rt_shm, opened = maybe_open_shm_array( # key=f'{fqsn}_rt_p{port}', - key=f'{fqsn}_rt', + key=f'{fqsn}_rt.{suffix}', # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -815,224 +821,99 @@ async def manage_history( "Persistent shm for sym was already open?!" ) - log.info('Scanning for existing `marketstored`') - tsdb_is_up = await check_for_service('marketstored') + # register 1s and 1m buffers with the global incrementer task + async with open_sample_stream( + period_s=1., + shms_by_period={ + 1.: rt_shm.token, + 60.: hist_shm.token, + }, - bfqsn = fqsn.replace('.' + mod.name, '') - open_history_client = getattr(mod, 'open_history_client', None) - assert open_history_client + # NOTE: we want to only open a stream for doing broadcasts on + # backfill operations, not receive the sample index-stream + # (since there's no code in this data feed layer that needs to + # consume it). + open_index_stream=True, + sub_for_broadcasts=False, - if ( - tsdb_is_up - and opened - and open_history_client - ): - log.info('Found existing `marketstored`') + ) as sample_stream: - from . import marketstore - async with ( - marketstore.open_storage_client(fqsn)as storage, + log.info('Scanning for existing `marketstored`') + tsdb_is_up = await check_for_service('marketstored') + + bfqsn = fqsn.replace('.' + mod.name, '') + open_history_client = getattr(mod, 'open_history_client', None) + assert open_history_client + + if ( + tsdb_is_up + and opened + and open_history_client ): - hist_shm, rt_shm = await bus.nursery.start( - tsdb_backfill, - mod, - marketstore, + log.info('Found existing `marketstored`') + + from . import marketstore + async with ( + marketstore.open_storage_client(fqsn)as storage, + ): + # TODO: drop returning the output that we pass in? + await bus.nursery.start( + tsdb_backfill, + mod, + marketstore, + bus, + storage, + fqsn, + bfqsn, + { + 1: rt_shm, + 60: hist_shm, + }, + sample_stream, + ) + + # yield back after client connect with filled shm + task_status.started(( + hist_zero_index, + hist_shm, + rt_zero_index, + rt_shm, + )) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # history retreival loop depending on user interaction + # and thus a small RPC-prot for remotely controllinlg + # what data is loaded for viewing. + await trio.sleep_forever() + + # load less history if no tsdb can be found + elif ( + not tsdb_is_up + and opened + ): + await basic_backfill( bus, - storage, - fqsn, + mod, bfqsn, { 1: rt_shm, 60: hist_shm, }, + sample_stream, ) - - # yield back after client connect with filled shm task_status.started(( hist_zero_index, hist_shm, rt_zero_index, rt_shm, )) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. 
some_data_ready.set() - - # history retreival loop depending on user interaction and thus - # a small RPC-prot for remotely controllinlg what data is loaded - # for viewing. await trio.sleep_forever() - # load less history if no tsdb can be found - elif ( - not tsdb_is_up - and opened - ): - await basic_backfill( - bus, - mod, - bfqsn, - shms={ - 1: rt_shm, - 60: hist_shm, - }, - ) - task_status.started(( - hist_zero_index, - hist_shm, - rt_zero_index, - rt_shm, - )) - some_data_ready.set() - await trio.sleep_forever() - - -class Flume(Struct): - ''' - Composite reference type which points to all the addressing handles - and other meta-data necessary for the read, measure and management - of a set of real-time updated data flows. - - Can be thought of as a "flow descriptor" or "flow frame" which - describes the high level properties of a set of data flows that can - be used seamlessly across process-memory boundaries. - - Each instance's sub-components normally includes: - - a msg oriented quote stream provided via an IPC transport - - history and real-time shm buffers which are both real-time - updated and backfilled. - - associated startup indexing information related to both buffer - real-time-append and historical prepend addresses. - - low level APIs to read and measure the updated data and manage - queuing properties. - - ''' - symbol: Symbol - first_quote: dict - _hist_shm_token: _Token - _rt_shm_token: _Token - - # private shm refs loaded dynamically from tokens - _hist_shm: ShmArray | None = None - _rt_shm: ShmArray | None = None - - stream: tractor.MsgStream | None = None - izero_hist: int = 0 - izero_rt: int = 0 - throttle_rate: int | None = None - - # TODO: do we need this really if we can pull the `Portal` from - # ``tractor``'s internals? - feed: Feed | None = None - - @property - def rt_shm(self) -> ShmArray: - - if self._rt_shm is None: - self._rt_shm = attach_shm_array( - token=self._rt_shm_token, - readonly=True, - ) - - return self._rt_shm - - @property - def hist_shm(self) -> ShmArray: - - if self._hist_shm is None: - self._hist_shm = attach_shm_array( - token=self._hist_shm_token, - readonly=True, - ) - - return self._hist_shm - - async def receive(self) -> dict: - return await self.stream.receive() - - @acm - async def index_stream( - self, - delay_s: int = 1, - - ) -> AsyncIterator[int]: - - if not self.feed: - raise RuntimeError('This flume is not part of any ``Feed``?') - - # TODO: maybe a public (property) API for this in ``tractor``? - portal = self.stream._ctx._portal - assert portal - - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with maybe_open_context( - acm_func=partial( - portal.open_context, - iter_ohlc_periods, - ), - kwargs={'delay_s': delay_s}, - ) as (cache_hit, (ctx, first)): - async with ctx.open_stream() as istream: - if cache_hit: - # add a new broadcast subscription for the quote stream - # if this feed is likely already in use - async with istream.subscribe() as bistream: - yield bistream - else: - yield istream - - def get_ds_info( - self, - ) -> tuple[float, float, float]: - ''' - Compute the "downsampling" ratio info between the historical shm - buffer and the real-time (HFT) one. - - Return a tuple of the fast sample period, historical sample - period and ratio between them. 
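Note the `Flume` type removed here is relocated, not dropped: it reappears in the new `piker/data/flows.py` module at the end of this patch. Consumer-side usage stays along these lines (a hedged sketch; the `flume` handle comes from the feed layer, e.g. `open_feed()`):

    async def read_flume(flume) -> None:
        ohlcv = flume.rt_shm.array      # lazily attaches via the rt shm token
        history = flume.hist_shm.array  # ditto for the history buffer

        # fast step size, history step size and their ratio
        fast_s, hist_s, ratio = flume.get_ds_info()

        # next quote msg off the feed's IPC stream
        quote = await flume.receive()
        print(fast_s, hist_s, ratio, quote)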
- - ''' - times = self.hist_shm.array['time'] - end = pendulum.from_timestamp(times[-1]) - start = pendulum.from_timestamp(times[times != times[-1]][-1]) - hist_step_size_s = (end - start).seconds - - times = self.rt_shm.array['time'] - end = pendulum.from_timestamp(times[-1]) - start = pendulum.from_timestamp(times[times != times[-1]][-1]) - rt_step_size_s = (end - start).seconds - - ratio = hist_step_size_s / rt_step_size_s - return ( - rt_step_size_s, - hist_step_size_s, - ratio, - ) - - # TODO: get native msgspec decoding for these workinn - def to_msg(self) -> dict: - msg = self.to_dict() - msg['symbol'] = msg['symbol'].to_dict() - - # can't serialize the stream or feed objects, it's expected - # you'll have a ref to it since this msg should be rxed on - # a stream on whatever far end IPC.. - msg.pop('stream') - msg.pop('feed') - return msg - - @classmethod - def from_msg(cls, msg: dict) -> dict: - symbol = Symbol(**msg.pop('symbol')) - return cls( - symbol=symbol, - **msg, - ) - async def allocate_persistent_feed( bus: _FeedsBus, @@ -1074,6 +955,8 @@ async def allocate_persistent_feed( some_data_ready = trio.Event() feed_is_live = trio.Event() + symstr = symstr.lower() + # establish broker backend quote stream by calling # ``stream_quotes()``, which is a required broker backend endpoint. init_msg, first_quote = await bus.nursery.start( @@ -1132,6 +1015,7 @@ async def allocate_persistent_feed( # https://github.com/python-trio/trio/issues/2258 # bus.nursery.start_soon( # await bus.start_task( + ( izero_hist, hist_shm, @@ -1165,13 +1049,6 @@ async def allocate_persistent_feed( # feed to that name (for now). bus.feeds[symstr] = bus.feeds[bfqsn] = flume - # insert 1s ohlc into the increment buffer set - # to update and shift every second - sampler.ohlcv_shms.setdefault( - 1, - [] - ).append(rt_shm) - task_status.started() if not start_stream: @@ -1181,18 +1058,6 @@ async def allocate_persistent_feed( # the backend will indicate when real-time quotes have begun. await feed_is_live.wait() - # insert 1m ohlc into the increment buffer set - # to shift every 60s. - sampler.ohlcv_shms.setdefault(60, []).append(hist_shm) - - # create buffer a single incrementer task broker backend - # (aka `brokerd`) using the lowest sampler period. - if sampler.incrementers.get(_default_delay_s) is None: - await bus.start_task( - increment_ohlc_buffer, - _default_delay_s, - ) - sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) @@ -1205,7 +1070,12 @@ async def allocate_persistent_feed( rt_shm.push(hist_shm.array[-3:-1]) ohlckeys = ['open', 'high', 'low', 'close'] rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1] - rt_shm.array['volume'][-2] = 0 + rt_shm.array['volume'][-2:] = 0 + + # set fast buffer time step to 1s + ts = round(time.time()) + rt_shm.array['time'][0] = ts + rt_shm.array['time'][1] = ts + 1 # wait the spawning parent task to register its subscriber # send-stream entry before we start the sample loop. @@ -1248,6 +1118,10 @@ async def open_feed_bus( symbol. ''' + # ensure that a quote feed stream which is pushing too fast doesn't + # cause and overrun in the client. + ctx._backpressure = True + if loglevel is None: loglevel = tractor.current_actor().loglevel @@ -1261,10 +1135,6 @@ async def open_feed_bus( servicename = tractor.current_actor().name assert 'brokerd' in servicename - # XXX: figure this not crashing into debug! 
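# --- hedged illustration (editor's aside, not part of the patch) ---
# The rt-buffer prep in the hunk above zeroes the last two datums'
# volume and seeds a 1s time step; the same operations on a plain
# numpy struct-array (the dtype here is a reduced stand-in for the
# real broker ohlc dtype):

import time
import numpy as np

ohlc_dtype = np.dtype([
    ('time', float),
    ('close', float),
    ('volume', float),
])
rt = np.zeros(4, dtype=ohlc_dtype)

rt['volume'][-2:] = 0     # don't double-count backfilled volume
ts = round(time.time())
rt['time'][0] = ts        # fast buffer steps forward in 1s increments
rt['time'][1] = ts + 1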
- # await tractor.breakpoint() - # assert 0 - assert brokername in servicename bus = get_feed_bus(brokername) @@ -1273,6 +1143,10 @@ async def open_feed_bus( flumes: dict[str, Flume] = {} for symbol in symbols: + + # we always use lower case keys internally + symbol = symbol.lower() + # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery @@ -1359,6 +1233,7 @@ async def open_feed_bus( # a max ``tick_throttle`` instantaneous rate. send, recv = trio.open_memory_channel(2**10) + ctx._backpressure = False cs = await bus.start_task( uniform_rate_send, tick_throttle, diff --git a/piker/data/flows.py b/piker/data/flows.py new file mode 100644 index 00000000..9bb27230 --- /dev/null +++ b/piker/data/flows.py @@ -0,0 +1,321 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +abstractions for organizing, managing and generally operating-on +real-time data processing data-structures. + +"Streams, flumes, cascades and flows.." + +""" +from __future__ import annotations +from contextlib import asynccontextmanager as acm +from functools import partial +from typing import ( + AsyncIterator, + TYPE_CHECKING, +) + +import tractor +from tractor.trionics import ( + maybe_open_context, +) +import pendulum +import numpy as np + +from .types import Struct +from ._source import ( + Symbol, +) +from ._sharedmem import ( + attach_shm_array, + ShmArray, + _Token, +) +from ._sampling import ( + open_sample_stream, +) + +if TYPE_CHECKING: + from pyqtgraph import PlotItem + from .feed import Feed + + +# TODO: ideas for further abstractions as per +# https://github.com/pikers/piker/issues/216 and +# https://github.com/pikers/piker/issues/270: +# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes`` +# as per circuit parlance: +# https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection +# - could cover the combination of our `FspAdmin` and the +# backend `.fsp._engine` related machinery to "connect" one flume +# to another? +# - a (financial signal) ``Flow`` would be the a "collection" of such +# minmial cascades. Some engineering based jargon concepts: +# - https://en.wikipedia.org/wiki/Signal_chain +# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering) +# - https://en.wikipedia.org/wiki/Audio_signal_flow +# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation +# - https://en.wikipedia.org/wiki/Dataflow_programming +# - https://en.wikipedia.org/wiki/Signal_programming +# - https://en.wikipedia.org/wiki/Incremental_computing + + +class Flume(Struct): + ''' + Composite reference type which points to all the addressing handles + and other meta-data necessary for the read, measure and management + of a set of real-time updated data flows. 
+ + Can be thought of as a "flow descriptor" or "flow frame" which + describes the high level properties of a set of data flows that can + be used seamlessly across process-memory boundaries. + + Each instance's sub-components normally includes: + - a msg oriented quote stream provided via an IPC transport + - history and real-time shm buffers which are both real-time + updated and backfilled. + - associated startup indexing information related to both buffer + real-time-append and historical prepend addresses. + - low level APIs to read and measure the updated data and manage + queuing properties. + + ''' + symbol: Symbol + first_quote: dict + _rt_shm_token: _Token + + # optional since some data flows won't have a "downsampled" history + # buffer/stream (eg. FSPs). + _hist_shm_token: _Token | None = None + + # private shm refs loaded dynamically from tokens + _hist_shm: ShmArray | None = None + _rt_shm: ShmArray | None = None + + stream: tractor.MsgStream | None = None + izero_hist: int = 0 + izero_rt: int = 0 + throttle_rate: int | None = None + + # TODO: do we need this really if we can pull the `Portal` from + # ``tractor``'s internals? + feed: Feed | None = None + + @property + def rt_shm(self) -> ShmArray: + + if self._rt_shm is None: + self._rt_shm = attach_shm_array( + token=self._rt_shm_token, + readonly=True, + ) + + return self._rt_shm + + @property + def hist_shm(self) -> ShmArray: + + if self._hist_shm_token is None: + raise RuntimeError( + 'No shm token has been set for the history buffer?' + ) + + if ( + self._hist_shm is None + ): + self._hist_shm = attach_shm_array( + token=self._hist_shm_token, + readonly=True, + ) + + return self._hist_shm + + async def receive(self) -> dict: + return await self.stream.receive() + + @acm + async def index_stream( + self, + delay_s: float = 1, + + ) -> AsyncIterator[int]: + + if not self.feed: + raise RuntimeError('This flume is not part of any ``Feed``?') + + # TODO: maybe a public (property) API for this in ``tractor``? + portal = self.stream._ctx._portal + assert portal + + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + async with open_sample_stream(float(delay_s)) as stream: + yield stream + + def get_ds_info( + self, + ) -> tuple[float, float, float]: + ''' + Compute the "downsampling" ratio info between the historical shm + buffer and the real-time (HFT) one. + + Return a tuple of the fast sample period, historical sample + period and ratio between them. + + ''' + times = self.hist_shm.array['time'] + end = pendulum.from_timestamp(times[-1]) + start = pendulum.from_timestamp(times[times != times[-1]][-1]) + hist_step_size_s = (end - start).seconds + + times = self.rt_shm.array['time'] + end = pendulum.from_timestamp(times[-1]) + start = pendulum.from_timestamp(times[times != times[-1]][-1]) + rt_step_size_s = (end - start).seconds + + ratio = hist_step_size_s / rt_step_size_s + return ( + rt_step_size_s, + hist_step_size_s, + ratio, + ) + + # TODO: get native msgspec decoding for these workinn + def to_msg(self) -> dict: + msg = self.to_dict() + msg['symbol'] = msg['symbol'].to_dict() + + # can't serialize the stream or feed objects, it's expected + # you'll have a ref to it since this msg should be rxed on + # a stream on whatever far end IPC.. 
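# --- hedged illustration (editor's aside, not part of the patch) ---
# `Flume.get_ds_info()` above derives the downsampling ratio from the
# last two (distinct) timestamps of each buffer; the same calc on plain
# arrays with made-up epoch values:

import numpy as np
import pendulum

def step_size_s(times: np.ndarray) -> float:
    end = pendulum.from_timestamp(times[-1])
    # step back to the last timestamp that differs from the final one
    start = pendulum.from_timestamp(times[times != times[-1]][-1])
    return (end - start).seconds

hist_times = np.array([0., 60., 120.])   # 1m history buffer
rt_times = np.array([120., 121., 122.])  # 1s real-time buffer

ratio = step_size_s(hist_times) / step_size_s(rt_times)  # -> 60.0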
+        msg.pop('stream')
+        msg.pop('feed')
+        return msg
+
+    @classmethod
+    def from_msg(cls, msg: dict) -> dict:
+        symbol = Symbol(**msg.pop('symbol'))
+        return cls(
+            symbol=symbol,
+            **msg,
+        )
+
+    def get_index(
+        self,
+        time_s: float,
+
+    ) -> int:
+        '''
+        Return array shm-buffer index for epoch time.
+
+        '''
+        array = self.rt_shm.array
+        times = array['time']
+        mask = (times >= time_s)
+
+        if any(mask):
+            return array['index'][mask][0]
+
+        # just the latest index
+        return array['index'][-1]
+
+    def slice_from_time(
+        self,
+        start_t: float,
+        stop_t: float,
+        timeframe_s: int = 1,
+        return_data: bool = False,
+        array: np.ndarray | None = None,
+
+    ) -> np.ndarray:
+        '''
+        Slice an input struct array providing only datums
+        "in view" of this chart.
+
+        '''
+        arr = array if array is not None else {
+            1: self.rt_shm.array,
+            60: self.hist_shm.array,
+        }[timeframe_s]
+
+        times = arr['time']
+        index = arr['index']
+
+        # use advanced indexing to map the
+        # time range to the index range.
+        mask = (
+            (times >= start_t)
+            &
+            (times < stop_t)
+        )
+
+        # TODO: if we can ensure each time field has a uniform
+        # step we can instead do some arithmetic to determine
+        # the equivalent index like we used to?
+        # return array[
+        #     lbar - ifirst:
+        #     (rbar - ifirst) + 1
+        # ]
+
+        i_by_t = index[mask]
+        i_0 = i_by_t[0]
+
+        abs_slc = slice(
+            i_0,
+            i_by_t[-1],
+        )
+        # slice data by offset from the first index
+        # available in the passed datum set.
+        read_slc = slice(
+            0,
+            i_by_t[-1] - i_0,
+        )
+        if not return_data:
+            return (
+                abs_slc,
+                read_slc,
+            )
+
+        # also return the readable data from the timerange
+        return (
+            abs_slc,
+            read_slc,
+            arr[mask],
+        )
+
+    def view_data(
+        self,
+        plot: PlotItem,
+        timeframe_s: int = 1,
+
+    ) -> np.ndarray:
+
+        # get far-side x-indices plot view
+        vr = plot.viewRect()
+
+        (
+            abs_slc,
+            buf_slc,
+            iv_arr,
+        ) = self.slice_from_time(
+            start_t=vr.left(),
+            stop_t=vr.right(),
+            timeframe_s=timeframe_s,
+            return_data=True,
+        )
+        return iv_arr
diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py
index d354f9b0..88553af7 100644
--- a/piker/data/marketstore.py
+++ b/piker/data/marketstore.py
@@ -454,8 +454,12 @@ class Storage:
 
         try:
             result = await client.query(params)
-        except purerpc.grpclib.exceptions.UnknownError:
+        except purerpc.grpclib.exceptions.UnknownError as err:
             # indicate there is no history for this timeframe
+            log.exception(
+                f'Unknown mkts QUERY error: {params}\n'
+                f'{err.args}'
+            )
             return {}
 
         # TODO: it turns out column access on recarrays is actually slower:
diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py
index f4e42bc1..9654a2a1 100644
--- a/piker/fsp/_api.py
+++ b/piker/fsp/_api.py
@@ -199,7 +199,10 @@ def maybe_mk_fsp_shm(
     # TODO: load output types from `Fsp`
     # - should `index` be a required internal field?
     fsp_dtype = np.dtype(
-        [('index', int)] +
+        [('index', int)]
+        +
+        [('time', float)]
+        +
         [(field_name, float) for field_name in target.outputs]
     )
 
diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
index 5d389e29..a78308a4 100644
--- a/piker/fsp/_engine.py
+++ b/piker/fsp/_engine.py
@@ -21,7 +21,9 @@ core task logic for processing chains
 from dataclasses import dataclass
 from functools import partial
 from typing import (
-    AsyncIterator, Callable, Optional,
+    AsyncIterator,
+    Callable,
+    Optional,
     Union,
 )
 
@@ -36,9 +38,13 @@ from ..
import data from ..data import attach_shm_array from ..data.feed import ( Flume, + Feed, ) from ..data._sharedmem import ShmArray -from ..data._sampling import _default_delay_s +from ..data._sampling import ( + _default_delay_s, + open_sample_stream, +) from ..data._source import Symbol from ._api import ( Fsp, @@ -111,8 +117,9 @@ async def fsp_compute( flume.rt_shm, ) - # Conduct a single iteration of fsp with historical bars input - # and get historical output + # HISTORY COMPUTE PHASE + # conduct a single iteration of fsp with historical bars input + # and get historical output. history_output: Union[ dict[str, np.ndarray], # multi-output case np.ndarray, # single output case @@ -129,9 +136,13 @@ async def fsp_compute( # each respective field. fields = getattr(dst.array.dtype, 'fields', None).copy() fields.pop('index') - history: Optional[np.ndarray] = None # TODO: nptyping here! + history_by_field: Optional[np.ndarray] = None + src_time = src.array['time'] - if fields and len(fields) > 1 and fields: + if ( + fields and + len(fields) > 1 + ): if not isinstance(history_output, dict): raise ValueError( f'`{func_name}` is a multi-output FSP and should yield a ' @@ -142,7 +153,7 @@ async def fsp_compute( if key in history_output: output = history_output[key] - if history is None: + if history_by_field is None: if output is None: length = len(src.array) @@ -152,7 +163,7 @@ async def fsp_compute( # using the first output, determine # the length of the struct-array that # will be pushed to shm. - history = np.zeros( + history_by_field = np.zeros( length, dtype=dst.array.dtype ) @@ -160,7 +171,7 @@ async def fsp_compute( if output is None: continue - history[key] = output + history_by_field[key] = output # single-key output stream else: @@ -169,11 +180,13 @@ async def fsp_compute( f'`{func_name}` is a single output FSP and should yield an ' '`np.ndarray` for history' ) - history = np.zeros( + history_by_field = np.zeros( len(history_output), dtype=dst.array.dtype ) - history[func_name] = history_output + history_by_field[func_name] = history_output + + history_by_field['time'] = src_time[-len(history_by_field):] # TODO: XXX: # THERE'S A BIG BUG HERE WITH THE `index` field since we're @@ -190,7 +203,10 @@ async def fsp_compute( # TODO: can we use this `start` flag instead of the manual # setting above? - index = dst.push(history, start=first) + index = dst.push( + history_by_field, + start=first, + ) profiler(f'{func_name} pushed history') profiler.finish() @@ -216,8 +232,14 @@ async def fsp_compute( log.debug(f"{func_name}: {processed}") key, output = processed - index = src.index - dst.array[-1][key] = output + # dst.array[-1][key] = output + dst.array[[key, 'time']][-1] = ( + output, + # TODO: what about pushing ``time.time_ns()`` + # in which case we'll need to round at the graphics + # processing / sampling layer? + src.array[-1]['time'] + ) # NOTE: for now we aren't streaming this to the consumer # stream latest array index entry which basically just acts @@ -228,6 +250,7 @@ async def fsp_compute( # N-consumers who subscribe for the real-time output, # which we'll likely want to implement using local-mem # chans for the fan out? 
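# --- hedged illustration (editor's aside, not part of the patch) ---
# The history-compute phase above packs a multi-output FSP's dict of
# arrays into one struct-array (`history_by_field`) and stamps the new
# `time` field from the source buffer; a reduced sketch with
# hypothetical 'fast'/'slow' output names:

import numpy as np

history_output = {
    'fast': np.arange(5, dtype=float),
    'slow': np.arange(5, dtype=float) * 2,
}
dst_dtype = np.dtype(
    [('index', int)]
    +
    [('time', float)]
    +
    [(name, float) for name in history_output]
)
history_by_field = np.zeros(5, dtype=dst_dtype)
for key, output in history_output.items():
    history_by_field[key] = output

# stamp the appended `time` field from the source buffer's clock
src_time = np.arange(5, dtype=float) + 1_600_000_000.
history_by_field['time'] = src_time[-len(history_by_field):]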
+ # index = src.index # if attach_stream: # await client_stream.send(index) @@ -302,6 +325,7 @@ async def cascade( raise ValueError(f'Unknown fsp target: {ns_path}') # open a data feed stream with requested broker + feed: Feed async with data.feed.maybe_open_feed( [fqsn], @@ -317,7 +341,6 @@ async def cascade( symbol = flume.symbol assert src.token == flume.rt_shm.token profiler(f'{func}: feed up') - # last_len = new_len = len(src.array) func_name = func.__name__ async with ( @@ -365,7 +388,7 @@ async def cascade( ) -> tuple[TaskTracker, int]: # TODO: adopt an incremental update engine/approach # where possible here eventually! - log.debug(f're-syncing fsp {func_name} to source') + log.info(f're-syncing fsp {func_name} to source') tracker.cs.cancel() await tracker.complete.wait() tracker, index = await n.start(fsp_target) @@ -386,7 +409,8 @@ async def cascade( src: ShmArray, dst: ShmArray ) -> tuple[bool, int, int]: - '''Predicate to dertmine if a destination FSP + ''' + Predicate to dertmine if a destination FSP output array is aligned to its source array. ''' @@ -395,16 +419,15 @@ async def cascade( return not ( # the source is likely backfilling and we must # sync history calculations - len_diff > 2 or + len_diff > 2 # we aren't step synced to the source and may be # leading/lagging by a step - step_diff > 1 or - step_diff < 0 + or step_diff > 1 + or step_diff < 0 ), step_diff, len_diff async def poll_and_sync_to_step( - tracker: TaskTracker, src: ShmArray, dst: ShmArray, @@ -424,16 +447,16 @@ async def cascade( # signal times = src.array['time'] if len(times) > 1: - delay_s = times[-1] - times[times != times[-1]][-1] + last_ts = times[-1] + delay_s = float(last_ts - times[times != last_ts][-1]) else: # our default "HFT" sample rate. delay_s = _default_delay_s - # Increment the underlying shared memory buffer on every - # "increment" msg received from the underlying data feed. - async with flume.index_stream( - int(delay_s) - ) as istream: + # sub and increment the underlying shared memory buffer + # on every step msg received from the global `samplerd` + # service. + async with open_sample_stream(float(delay_s)) as istream: profiler(f'{func_name}: sample stream up') profiler.finish() @@ -468,3 +491,23 @@ async def cascade( last = array[-1:].copy() dst.push(last) + + # sync with source buffer's time step + src_l2 = src.array[-2:] + src_li, src_lt = src_l2[-1][['index', 'time']] + src_2li, src_2lt = src_l2[-2][['index', 'time']] + dst._array['time'][src_li] = src_lt + dst._array['time'][src_2li] = src_2lt + + # last2 = dst.array[-2:] + # if ( + # last2[-1]['index'] != src_li + # or last2[-2]['index'] != src_2li + # ): + # dstl2 = list(last2) + # srcl2 = list(src_l2) + # print( + # # f'{dst.token}\n' + # f'src: {srcl2}\n' + # f'dst: {dstl2}\n' + # ) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index b5456fac..06d0be91 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -234,7 +234,7 @@ async def flow_rates( # FSPs, user input, and possibly any general event stream in # real-time. Hint: ideally implemented with caching until mutated # ;) - period: 'Param[int]' = 6, # noqa + period: 'Param[int]' = 1, # noqa # TODO: support other means by providing a map # to weights `partial()`-ed with `wma()`? 
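# --- hedged illustration (editor's aside, not part of the patch) ---
# The tail of `cascade()` above mirrors the source buffer's last two
# time steps into the destination array; the same indexing shown on
# plain structured arrays:

import numpy as np

dt = np.dtype([('index', int), ('time', float)])
src = np.array([(7, 100.0), (8, 101.0)], dtype=dt)  # last 2 src datums
dst = np.zeros(16, dtype=dt)

src_l2 = src[-2:]
src_li, src_lt = src_l2[-1][['index', 'time']]
src_2li, src_2lt = src_l2[-2][['index', 'time']]
dst['time'][src_li] = src_lt     # copy the src's last time step
dst['time'][src_2li] = src_2lt   # and the one before it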
@@ -268,8 +268,7 @@ async def flow_rates( 'dark_dvlm_rate': None, } - # TODO: 3.10 do ``anext()`` - quote = await source.__anext__() + quote = await anext(source) # ltr = 0 # lvr = 0 diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index b0fa6446..d8eabb70 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -49,7 +49,10 @@ from qdarkstyle import DarkPalette import trio from outcome import Error -from .._daemon import maybe_open_pikerd, _tractor_kwargs +from .._daemon import ( + maybe_open_pikerd, + get_tractor_runtime_kwargs, +) from ..log import get_logger from ._pg_overrides import _do_overrides from . import _style @@ -170,7 +173,7 @@ def run_qtractor( instance.window = window # override tractor's defaults - tractor_kwargs.update(_tractor_kwargs) + tractor_kwargs.update(get_tractor_runtime_kwargs()) # define tractor entrypoint async def main(): diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index dfbd8b9a..9e05f545 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -51,7 +51,10 @@ from ._forms import ( mk_form, open_form_input_handling, ) -from ..fsp._api import maybe_mk_fsp_shm, Fsp +from ..fsp._api import ( + maybe_mk_fsp_shm, + Fsp, +) from ..fsp import cascade from ..fsp._volume import ( # tina_vwap, diff --git a/tests/conftest.py b/tests/conftest.py index 1bd1d86e..2cfaad7a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,6 +7,9 @@ from piker import ( # log, config, ) +from piker._daemon import ( + Services, +) def pytest_addoption(parser): @@ -137,12 +140,16 @@ async def _open_test_pikerd( port = random.randint(6e3, 7e3) reg_addr = ('127.0.0.1', port) + # try: async with ( maybe_open_pikerd( registry_addr=reg_addr, **kwargs, - ), + ) as service_manager, ): + # this proc/actor is the pikerd + assert service_manager is Services + async with tractor.wait_for_actor( 'pikerd', arbiter_sockaddr=reg_addr, @@ -153,6 +160,7 @@ async def _open_test_pikerd( raddr[0], raddr[1], portal, + service_manager, ) diff --git a/tests/test_services.py b/tests/test_services.py new file mode 100644 index 00000000..936a426e --- /dev/null +++ b/tests/test_services.py @@ -0,0 +1,176 @@ +''' +Actor tree daemon sub-service verifications + +''' +from typing import AsyncContextManager +from contextlib import asynccontextmanager as acm + +import pytest +import trio +import tractor + +from piker._daemon import ( + find_service, + check_for_service, + Services, +) +from piker.data import ( + open_feed, +) +from piker.clearing import ( + open_ems, +) +from piker.clearing._messages import ( + BrokerdPosition, + Status, +) +from piker.clearing._client import ( + OrderBook, +) + + +def test_runtime_boot( + open_test_pikerd: AsyncContextManager +): + ''' + Verify we can boot the `pikerd` service stack using the + `open_test_pikerd` fixture helper and that registry address details + match up. 
+ + ''' + async def main(): + port = 6666 + daemon_addr = ('127.0.0.1', port) + services: Services + + async with ( + open_test_pikerd( + reg_addr=daemon_addr, + ) as (_, _, pikerd_portal, services), + + tractor.wait_for_actor( + 'pikerd', + arbiter_sockaddr=daemon_addr, + ) as portal, + ): + assert pikerd_portal.channel.raddr == daemon_addr + assert pikerd_portal.channel.raddr == portal.channel.raddr + + trio.run(main) + + +@acm +async def ensure_service( + name: str, + sockaddr: tuple[str, int] | None = None, +) -> None: + async with find_service(name) as portal: + remote_sockaddr = portal.channel.raddr + print(f'FOUND `{name}` @ {remote_sockaddr}') + + if sockaddr: + assert remote_sockaddr == sockaddr + + yield portal + + +def test_ensure_datafeed_actors( + open_test_pikerd: AsyncContextManager + +) -> None: + ''' + Verify that booting a data feed starts a `brokerd` + actor and a singleton global `samplerd` and opening + an order mode in paper opens the `paperboi` service. + + ''' + actor_name: str = 'brokerd' + backend: str = 'kraken' + brokerd_name: str = f'{actor_name}.{backend}' + + async def main(): + async with ( + open_test_pikerd(), + open_feed( + ['xbtusdt.kraken'], + loglevel='info', + ) as feed + ): + # halt rt quote streams since we aren't testing them + await feed.pause() + + async with ( + ensure_service(brokerd_name), + ensure_service('samplerd'), + ): + pass + + trio.run(main) + + +def test_ensure_ems_in_paper_actors( + open_test_pikerd: AsyncContextManager + +) -> None: + + actor_name: str = 'brokerd' + backend: str = 'kraken' + brokerd_name: str = f'{actor_name}.{backend}' + + async def main(): + + # type declares + book: OrderBook + trades_stream: tractor.MsgStream + pps: dict[str, list[BrokerdPosition]] + accounts: list[str] + dialogs: dict[str, Status] + + # ensure we timeout after is startup is too slow. + # TODO: something like this should be our start point for + # benchmarking end-to-end startup B) + with trio.fail_after(9): + async with ( + open_test_pikerd() as (_, _, _, services), + + open_ems( + 'xbtusdt.kraken', + mode='paper', + ) as ( + book, + trades_stream, + pps, + accounts, + dialogs, + ), + ): + # there should be no on-going positions, + # TODO: though eventually we'll want to validate against + # local ledger and `pps.toml` state ;) + assert not pps + assert not dialogs + + pikerd_subservices = ['emsd', 'samplerd'] + + async with ( + ensure_service('emsd'), + ensure_service(brokerd_name), + ensure_service(f'paperboi.{backend}'), + ): + for name in pikerd_subservices: + assert name in services.service_tasks + + # brokerd.kraken actor should have been started + # implicitly by the ems. + assert brokerd_name in services.service_tasks + + print('ALL SERVICES STARTED, terminating..') + await services.cancel_service('emsd') + + with pytest.raises( + tractor._exceptions.ContextCancelled, + ) as exc_info: + trio.run(main) + + cancel_msg: str = '`_emsd_main()` was remotely cancelled by its caller' + assert cancel_msg in exc_info.value.args[0]
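
The same fixture-plus-`ensure_service()` pattern generalizes to any other data-feed backend. Below is a hedged sketch (not part of the patch): the 'binance' backend name and 'ethusdt.binance' fqsn are illustrative assumptions, and the imports and helpers from the test module above are reused.

def test_ensure_second_backend_brokerd(
    open_test_pikerd: AsyncContextManager,
) -> None:

    async def main():
        async with (
            open_test_pikerd(),
            open_feed(
                ['ethusdt.binance'],  # hypothetical fqsn
                loglevel='info',
            ) as feed,
        ):
            # halt rt quotes; we only care about actor discovery
            await feed.pause()

            async with (
                ensure_service('brokerd.binance'),
                ensure_service('samplerd'),
            ):
                pass

    trio.run(main)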