diff --git a/piker/data/feed.py b/piker/data/feed.py index b28c958e..6d4c0689 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -27,6 +27,7 @@ from types import ModuleType from typing import ( Any, Sequence, AsyncIterator, Optional, + Awaitable, ) import trio @@ -72,12 +73,24 @@ class _FeedsBus(BaseModel): Data feeds broadcaster and persistence management. This is a brokerd side api used to manager persistent real-time - streams that can be allocated and left alive indefinitely. + streams that can be allocated and left alive indefinitely. A bus is + associated one-to-one with a particular broker backend where the + "bus" refers so a multi-symbol bus where quotes are interleaved in + time. + + Each "entry" in the bus includes: + - a stream used to push real time quotes (up to tick rates) + which is executed as a lone task that is cancellable via + a dedicated cancel scope. ''' + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + brokername: str nursery: trio.Nursery - feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {} + feeds: dict[str, tuple[dict, dict]] = {} task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() @@ -91,14 +104,27 @@ class _FeedsBus(BaseModel): list[tuple[tractor.MsgStream, Optional[float]]] ] = {} - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False + async def start_task( + self, + target: Awaitable, + *args, + ) -> None: - async def cancel_all(self) -> None: - for sym, (cs, msg, quote) in self.feeds.items(): - log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') - cs.cancel() + async def start_with_cs( + task_status: TaskStatus[ + trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: + with trio.CancelScope() as cs: + self.nursery.start_soon( + target, + *args, + ) + task_status.started(cs) + + return await self.nursery.start(start_with_cs) + + def cancel_task(self, task: trio.Task) -> bool: + pass _bus: _FeedsBus = None @@ -156,7 +182,78 @@ async def _setup_persistent_brokerd( await trio.sleep_forever() finally: # TODO: this needs to be shielded? - await bus.cancel_all() + bus.nursery.cancel_scope.cancel() + + +async def manage_history( + mod: ModuleType, + shm: ShmArray, + bus: _FeedsBus, + symbol: str, + we_opened_shm: bool, + some_data_ready: trio.Event, + feed_is_live: trio.Event, + +) -> None: + ''' + Load and manage historical data including the loading of any + available series from `marketstore` as well as conducting real-time + update of both that existing db and the allocated shared memory + buffer. + + ''' + # TODO: + # history retreival, see if we can pull from an existing + # ``marketstored`` daemon + # log.info('Scanning for existing `marketstored`') + # from .marketstore import load_history + # arrays = await load_history(symbol) + arrays = {} + + opened = we_opened_shm + # TODO: history validation + # assert opened, f'Persistent shm for {symbol} was already open?!' + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") + + if opened: + if arrays: + # push to shm + # set data ready + # some_data_ready.set() + raise ValueError('this should never execute yet') + + else: + # ask broker backend for new history + + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # detect sample step size for sampled historical data + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + # begin real-time updates of shm and tsb once the feed + # goes live. + await feed_is_live.wait() + + if opened: + _shms.setdefault(delay_s, []).append(shm) + + # start shm incrementing for OHLC sampling at the current + # detected sampling period if one dne. + if _incrementers.get(delay_s) is None: + cs = await bus.start_task(increment_ohlc_buffer, delay_s) + + await trio.sleep_forever() + cs.cancel() async def allocate_persistent_feed( @@ -168,17 +265,30 @@ async def allocate_persistent_feed( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: + ''' + Create and maintain a "feed bus" which allocates tasks for real-time + streaming and optional historical data storage per broker/data provider + backend; this normally task runs *in* a `brokerd` actor. + If none exists, this allocates a ``_FeedsBus`` which manages the + lifetimes of streaming tasks created for each requested symbol. + + + 2 tasks are created: + - a real-time streaming task which connec + + ''' try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) - # allocate shm array for this broker/symbol - # XXX: we should get an error here if one already exists + fqsn = mk_fqsn(brokername, symbol) + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(brokername, symbol), + key=fqsn, # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -187,69 +297,73 @@ async def allocate_persistent_feed( readonly=False, ) - # do history validation? - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") - + # mem chan handed to broker backend so it can push real-time + # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) + + # data sync signals for both history loading and market quotes + some_data_ready = trio.Event() feed_is_live = trio.Event() - # establish broker backend quote stream - # ``stream_quotes()`` is a required backend func + # run 2 tasks: + # - a history loader / maintainer + # - a real-time streamer which consumers and sends new data to any + # consumers as well as writes to storage backends (as configured). + bus.nursery.start_soon( + manage_history, + mod, + shm, + bus, + symbol, + opened, + some_data_ready, + feed_is_live, + ) + + # establish broker backend quote stream by calling + # ``stream_quotes()``, which is a required broker backend endpoint. init_msg, first_quotes = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, feed_is_live=feed_is_live, symbols=[symbol], - shm=shm, loglevel=loglevel, ) ) + # we hand an IPC-msg compatible shm token to the caller so it + # can read directly from the memory which will be written by + # this task. init_msg[symbol]['shm_token'] = shm.token - cs = bus.nursery.cancel_scope - - # TODO: make this into a composed type which also - # contains the backfiller cs for individual super-based - # resspawns when needed. - - # XXX: the ``symbol`` here is put into our native piker format (i.e. - # lower case). - bus.feeds[symbol.lower()] = (cs, init_msg, first_quotes) - - if opened: - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - await bus.nursery.start(mod.backfill_bars, symbol, shm) - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + # TODO: pretty sure we don't need this? why not just leave 1s as + # the fastest "sample period" since we'll probably always want that + # for most purposes. # pass OHLC sample rate in seconds (be sure to use python int type) - init_msg[symbol]['sample_rate'] = int(delay_s) + # init_msg[symbol]['sample_rate'] = 1 #int(delay_s) - # yield back control to starting nursery + # yield back control to starting nursery once we receive either + # some history or a real-time quote. + await some_data_ready.wait() + bus.feeds[symbol.lower()] = (init_msg, first_quotes) task_status.started((init_msg, first_quotes)) + # backend will indicate when real-time quotes have begun. await feed_is_live.wait() - if opened: - _shms.setdefault(delay_s, []).append(shm) - - # start shm incrementing for OHLC sampling - if _incrementers.get(delay_s) is None: - cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) - sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) # start sample loop try: - await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + await sample_and_broadcast( + bus, + shm, + quote_stream, + sum_tick_vlm + ) finally: log.warning(f'{symbol}@{brokername} feed task terminated') @@ -265,36 +379,43 @@ async def open_feed_bus( start_stream: bool = True, ) -> None: + ''' + Open a data feed "bus": an actor-persistent per-broker task-oriented + data feed registry which allows managing real-time quote streams per + symbol. + ''' if loglevel is None: loglevel = tractor.current_actor().loglevel # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # local state sanity checks + # TODO: check for any stale shm entries for this symbol + # (after we also group them in a nice `/dev/shm/piker/` subdir). # ensure we are who we think we are assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) + bus._subscribers.setdefault(symbol, []) + fqsn = mk_fqsn(brokername, symbol) entry = bus.feeds.get(symbol) - bus._subscribers.setdefault(symbol, []) - - fs = mk_fqsn(symbol, brokername) - # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery - async with bus.task_lock: - if entry is None: - - if not start_stream: - raise RuntimeError( - f'No stream feed exists for {fs}?\n' - f'You may need a `brokerd` started first.' - ) + if entry is None: + if not start_stream: + raise RuntimeError( + f'No stream feed exists for {fqsn}?\n' + f'You may need a `brokerd` started first.' + ) + # allocate a new actor-local stream bus which will persist for + # this `brokerd`. + async with bus.task_lock: init_msg, first_quotes = await bus.nursery.start( partial( allocate_persistent_feed, @@ -310,25 +431,25 @@ async def open_feed_bus( loglevel=loglevel, ) ) + # TODO: we can remove this? assert isinstance(bus.feeds[symbol], tuple) # XXX: ``first_quotes`` may be outdated here if this is secondary # subscriber - cs, init_msg, first_quotes = bus.feeds[symbol] + init_msg, first_quotes = bus.feeds[symbol] # send this even to subscribers to existing feed? # deliver initial info message a first quote asap await ctx.started((init_msg, first_quotes)) if not start_stream: - log.warning(f'Not opening real-time stream for {fs}') + log.warning(f'Not opening real-time stream for {fqsn}') await trio.sleep_forever() + # real-time stream loop async with ( ctx.open_stream() as stream, - trio.open_nursery() as n, ): - if tick_throttle: # open a bg task which receives quotes over a mem chan @@ -336,7 +457,7 @@ async def open_feed_bus( # a max ``tick_throttle`` instantaneous rate. send, recv = trio.open_memory_channel(2**10) - n.start_soon( + cs = await bus.start_task( uniform_rate_send, tick_throttle, recv, @@ -358,21 +479,24 @@ async def open_feed_bus( if msg == 'pause': if sub in subs: log.info( - f'Pausing {fs} feed for {uid}') + f'Pausing {fqsn} feed for {uid}') subs.remove(sub) elif msg == 'resume': if sub not in subs: log.info( - f'Resuming {fs} feed for {uid}') + f'Resuming {fqsn} feed for {uid}') subs.append(sub) else: raise ValueError(msg) finally: log.info( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') + if tick_throttle: - n.cancel_scope.cancel() + # TODO: a one-cancels-one nursery + # n.cancel_scope.cancel() + cs.cancel() try: bus._subscribers[symbol].remove(sub) except ValueError: @@ -385,6 +509,7 @@ async def open_sample_step_stream( delay_s: int, ) -> tractor.ReceiveMsgStream: + # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes @@ -407,13 +532,15 @@ async def open_sample_step_stream( @dataclass class Feed: - """A data feed for client-side interaction with far-process# }}} - real-time data sources. + ''' + A data feed for client-side interaction with far-process real-time + data sources. This is an thin abstraction on top of ``tractor``'s portals for - interacting with IPC streams and conducting automatic - memory buffer orchestration. - """ + interacting with IPC streams and storage APIs (shm and time-series + db). + + ''' name: str shm: ShmArray mod: ModuleType @@ -425,7 +552,7 @@ class Feed: throttle_rate: Optional[int] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None - _max_sample_rate: int = 0 + _max_sample_rate: int = 1 # cache of symbol info messages received as first message when # a stream startsc. @@ -460,13 +587,6 @@ class Feed: await self.stream.send('resume') -def sym_to_shm_key( - broker: str, - symbol: str, -) -> str: - return f'{broker}.{symbol}' - - @asynccontextmanager async def install_brokerd_search( @@ -527,13 +647,15 @@ async def open_feed( # no feed for broker exists so maybe spawn a data brokerd async with ( + # if no `brokerd` for this backend exists yet we spawn + # and actor for one. maybe_spawn_brokerd( brokername, loglevel=loglevel ) as portal, + # (allocate and) connect to any feed bus for this broker portal.open_context( - open_feed_bus, brokername=brokername, symbol=sym, @@ -566,12 +688,10 @@ async def open_feed( _portal=portal, throttle_rate=tick_throttle, ) - ohlc_sample_rates = [] for sym, data in init_msg.items(): si = data['symbol_info'] - ohlc_sample_rates.append(data['sample_rate']) symbol = mk_symbol( key=sym, @@ -592,9 +712,8 @@ async def open_feed( assert shm_token == shm.token # sanity - feed._max_sample_rate = max(ohlc_sample_rates) + feed._max_sample_rate = 1 - # yield feed try: yield feed finally: @@ -627,14 +746,16 @@ async def maybe_open_feed( 'symbols': [sym], 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), - 'backpressure': kwargs.get('backpressure'), + + # XXX: super critical to have bool defaults here XD + 'backpressure': kwargs.get('backpressure', True), 'start_stream': kwargs.get('start_stream', True), }, key=sym, ) as (cache_hit, feed): if cache_hit: - print('USING CACHED FEED') + log.info(f'Using cached feed for {brokername}.{sym}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: