diff --git a/piker/data/feed.py b/piker/data/feed.py index 6d4c0689..c3d17ac2 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -25,7 +25,7 @@ from contextlib import asynccontextmanager from functools import partial from types import ModuleType from typing import ( - Any, Sequence, + Any, AsyncIterator, Optional, Awaitable, ) @@ -56,7 +56,9 @@ from ._source import ( ) from ..ui import _search from ._sampling import ( - _shms, + # TODO: should probably group these in a compound type at this point XD + _ohlcv_shms, + _subscribers, _incrementers, increment_ohlc_buffer, iter_ohlc_periods, @@ -108,6 +110,7 @@ class _FeedsBus(BaseModel): self, target: Awaitable, *args, + ) -> None: async def start_with_cs( @@ -159,7 +162,8 @@ def get_feed_bus( @tractor.context async def _setup_persistent_brokerd( ctx: tractor.Context, - brokername: str + brokername: str, + ) -> None: ''' Allocate a actor-wide service nursery in ``brokerd`` @@ -167,22 +171,22 @@ async def _setup_persistent_brokerd( the broker backend as needed. ''' - try: - async with trio.open_nursery() as service_nursery: + get_console_log(tractor.current_actor().loglevel) - # assign a nursery to the feeds bus for spawning - # background tasks from clients - bus = get_feed_bus(brokername, service_nursery) + global _bus + assert not _bus - # unblock caller - await ctx.started() + async with trio.open_nursery() as service_nursery: + # assign a nursery to the feeds bus for spawning + # background tasks from clients + get_feed_bus(brokername, service_nursery) - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() - finally: - # TODO: this needs to be shielded? - bus.nursery.cancel_scope.cancel() + # unblock caller + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() async def manage_history( @@ -194,6 +198,8 @@ async def manage_history( some_data_ready: trio.Event, feed_is_live: trio.Event, + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: ''' Load and manage historical data including the loading of any @@ -202,13 +208,15 @@ async def manage_history( buffer. ''' - # TODO: - # history retreival, see if we can pull from an existing + # TODO: history retreival, see if we can pull from an existing # ``marketstored`` daemon - # log.info('Scanning for existing `marketstored`') - # from .marketstore import load_history - # arrays = await load_history(symbol) + log.info('Scanning for existing `marketstored`') + fqsn = mk_fqsn(mod.name, symbol) + # from .marketstore import manage_history + # arrays = await manage_history(symbol) + arrays = {} + task_status.started() opened = we_opened_shm # TODO: history validation @@ -218,6 +226,8 @@ async def manage_history( if opened: if arrays: + await tractor.breakpoint() + # push to shm # set data ready # some_data_ready.set() @@ -245,7 +255,7 @@ async def manage_history( await feed_is_live.wait() if opened: - _shms.setdefault(delay_s, []).append(shm) + _ohlcv_shms.setdefault(delay_s, []).append(shm) # start shm incrementing for OHLC sampling at the current # detected sampling period if one dne. @@ -309,7 +319,13 @@ async def allocate_persistent_feed( # - a history loader / maintainer # - a real-time streamer which consumers and sends new data to any # consumers as well as writes to storage backends (as configured). - bus.nursery.start_soon( + + # XXX: neither of these will raise but will cause an inf hang due to: + # https://github.com/python-trio/trio/issues/2258 + # bus.nursery.start_soon( + # await bus.start_task( + + await bus.nursery.start( manage_history, mod, shm, @@ -345,7 +361,9 @@ async def allocate_persistent_feed( # yield back control to starting nursery once we receive either # some history or a real-time quote. + log.info(f'waiting on history to load: {fqsn}') await some_data_ready.wait() + bus.feeds[symbol.lower()] = (init_msg, first_quotes) task_status.started((init_msg, first_quotes)) @@ -518,8 +536,8 @@ async def open_sample_step_stream( portal.open_stream_from, iter_ohlc_periods, ), - kwargs={'delay_s': delay_s}, + ) as (cache_hit, istream): if cache_hit: # add a new broadcast subscription for the quote stream @@ -623,9 +641,8 @@ async def install_brokerd_search( @asynccontextmanager async def open_feed( - brokername: str, - symbols: Sequence[str], + symbols: list[str], loglevel: Optional[str] = None, backpressure: bool = True, @@ -725,7 +742,7 @@ async def open_feed( async def maybe_open_feed( brokername: str, - symbols: Sequence[str], + symbols: list[str], loglevel: Optional[str] = None, **kwargs,