Adjust and add notes for python-trio/trio#2258

async_hist_loading
Tyler Goodlet 2022-02-28 07:34:15 -05:00
parent 89a98c4aa2
commit b1cce8f9cf
1 changed files with 45 additions and 28 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0) # Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -25,7 +25,7 @@ from contextlib import asynccontextmanager
from functools import partial from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Any, Sequence, Any,
AsyncIterator, Optional, AsyncIterator, Optional,
Awaitable, Awaitable,
) )
@ -56,7 +56,9 @@ from ._source import (
) )
from ..ui import _search from ..ui import _search
from ._sampling import ( from ._sampling import (
_shms, # TODO: should probably group these in a compound type at this point XD
_ohlcv_shms,
_subscribers,
_incrementers, _incrementers,
increment_ohlc_buffer, increment_ohlc_buffer,
iter_ohlc_periods, iter_ohlc_periods,
@ -108,6 +110,7 @@ class _FeedsBus(BaseModel):
self, self,
target: Awaitable, target: Awaitable,
*args, *args,
) -> None: ) -> None:
async def start_with_cs( async def start_with_cs(
@ -159,7 +162,8 @@ def get_feed_bus(
@tractor.context @tractor.context
async def _setup_persistent_brokerd( async def _setup_persistent_brokerd(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str brokername: str,
) -> None: ) -> None:
''' '''
Allocate a actor-wide service nursery in ``brokerd`` Allocate a actor-wide service nursery in ``brokerd``
@ -167,22 +171,22 @@ async def _setup_persistent_brokerd(
the broker backend as needed. the broker backend as needed.
''' '''
try: get_console_log(tractor.current_actor().loglevel)
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning global _bus
# background tasks from clients assert not _bus
bus = get_feed_bus(brokername, service_nursery)
# unblock caller async with trio.open_nursery() as service_nursery:
await ctx.started() # assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)
# we pin this task to keep the feeds manager active until the # unblock caller
# parent actor decides to tear it down await ctx.started()
await trio.sleep_forever()
finally: # we pin this task to keep the feeds manager active until the
# TODO: this needs to be shielded? # parent actor decides to tear it down
bus.nursery.cancel_scope.cancel() await trio.sleep_forever()
async def manage_history( async def manage_history(
@ -194,6 +198,8 @@ async def manage_history(
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
Load and manage historical data including the loading of any Load and manage historical data including the loading of any
@ -202,13 +208,15 @@ async def manage_history(
buffer. buffer.
''' '''
# TODO: # TODO: history retreival, see if we can pull from an existing
# history retreival, see if we can pull from an existing
# ``marketstored`` daemon # ``marketstored`` daemon
# log.info('Scanning for existing `marketstored`') log.info('Scanning for existing `marketstored`')
# from .marketstore import load_history fqsn = mk_fqsn(mod.name, symbol)
# arrays = await load_history(symbol) # from .marketstore import manage_history
# arrays = await manage_history(symbol)
arrays = {} arrays = {}
task_status.started()
opened = we_opened_shm opened = we_opened_shm
# TODO: history validation # TODO: history validation
@ -218,6 +226,8 @@ async def manage_history(
if opened: if opened:
if arrays: if arrays:
await tractor.breakpoint()
# push to shm # push to shm
# set data ready # set data ready
# some_data_ready.set() # some_data_ready.set()
@ -245,7 +255,7 @@ async def manage_history(
await feed_is_live.wait() await feed_is_live.wait()
if opened: if opened:
_shms.setdefault(delay_s, []).append(shm) _ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current # start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne. # detected sampling period if one dne.
@ -309,7 +319,13 @@ async def allocate_persistent_feed(
# - a history loader / maintainer # - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any # - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured). # consumers as well as writes to storage backends (as configured).
bus.nursery.start_soon(
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
await bus.nursery.start(
manage_history, manage_history,
mod, mod,
shm, shm,
@ -345,7 +361,9 @@ async def allocate_persistent_feed(
# yield back control to starting nursery once we receive either # yield back control to starting nursery once we receive either
# some history or a real-time quote. # some history or a real-time quote.
log.info(f'waiting on history to load: {fqsn}')
await some_data_ready.wait() await some_data_ready.wait()
bus.feeds[symbol.lower()] = (init_msg, first_quotes) bus.feeds[symbol.lower()] = (init_msg, first_quotes)
task_status.started((init_msg, first_quotes)) task_status.started((init_msg, first_quotes))
@ -518,8 +536,8 @@ async def open_sample_step_stream(
portal.open_stream_from, portal.open_stream_from,
iter_ohlc_periods, iter_ohlc_periods,
), ),
kwargs={'delay_s': delay_s}, kwargs={'delay_s': delay_s},
) as (cache_hit, istream): ) as (cache_hit, istream):
if cache_hit: if cache_hit:
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
@ -623,9 +641,8 @@ async def install_brokerd_search(
@asynccontextmanager @asynccontextmanager
async def open_feed( async def open_feed(
brokername: str, brokername: str,
symbols: Sequence[str], symbols: list[str],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
backpressure: bool = True, backpressure: bool = True,
@ -725,7 +742,7 @@ async def open_feed(
async def maybe_open_feed( async def maybe_open_feed(
brokername: str, brokername: str,
symbols: Sequence[str], symbols: list[str],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
**kwargs, **kwargs,