diff --git a/piker/data/feed.py b/piker/data/feed.py
index aaedfcc8..e87c00be 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -22,19 +22,21 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from contextlib import asynccontextmanager as acm
-from dataclasses import (
-    dataclass,
-    field,
-)
+# from dataclasses import (
+#     dataclass,
+#     field,
+# )
 from datetime import datetime
 from functools import partial
 from types import ModuleType
 from typing import (
     Any,
     AsyncIterator,
+    AsyncContextManager,
     Callable,
     Optional,
     Awaitable,
+    Sequence,
     TYPE_CHECKING,
     Union,
 )
@@ -43,7 +45,10 @@ import trio
 from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
 import tractor
-from tractor.trionics import maybe_open_context
+from tractor.trionics import (
+    maybe_open_context,
+    gather_contexts,
+)
 import pendulum
 import numpy as np
 
@@ -58,6 +63,7 @@ from ._sharedmem import (
     maybe_open_shm_array,
     attach_shm_array,
     ShmArray,
+    _Token,
     _secs_in_day,
 )
 from .ingest import get_ingestormod
@@ -109,11 +115,6 @@ class _FeedsBus(Struct):
 
     task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
 
-    # XXX: so weird but, apparently without this being `._` private
-    # pydantic will complain about private `tractor.Context` instance
-    # vars (namely `._portal` and `._cancel_scope`) at import time.
-    # Reported this bug:
-    # https://github.com/samuelcolvin/pydantic/issues/2816
     _subscribers: dict[
         str,
         list[
@@ -719,10 +720,14 @@ async def manage_history(
     buffer.
 
     '''
+
+    from tractor._state import _runtime_vars
+    port = _runtime_vars['_root_mailbox'][1]
+
     # (maybe) allocate shm array for this broker/symbol which will
     # be used for fast near-term history capture and processing.
     hist_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_hist',
+        key=f'{fqsn}_hist', #_p{port}',
 
         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -739,7 +744,7 @@ async def manage_history(
     )
 
     rt_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_rt',
+        key=f'{fqsn}_rt', #_p{port}',
 
         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -836,373 +841,61 @@ async def manage_history(
         await trio.sleep_forever()
 
 
-async def allocate_persistent_feed(
-    bus: _FeedsBus,
-
-    brokername: str,
-    symbol: str,
-
-    loglevel: str,
-    start_stream: bool = True,
-
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
+class Flume(Struct):
     '''
-    Create and maintain a "feed bus" which allocates tasks for real-time
-    streaming and optional historical data storage per broker/data provider
-    backend; this normally task runs *in* a `brokerd` actor.
+    Composite reference type which points to all the addressing handles
+    and other meta-data necessary for reading, measuring and managing
+    a set of real-time updated data flows.
 
-    If none exists, this allocates a ``_FeedsBus`` which manages the
-    lifetimes of streaming tasks created for each requested symbol.
+    Can be thought of as a "flow descriptor" or "flow frame" which
+    describes the high level properties of a set of data flows that can
+    be used seamlessly across process-memory boundaries.
 
-
-    2 tasks are created:
-    - a real-time streaming task which connec
+    Each instance's sub-components normally include:
+    - a msg oriented quote stream provided via an IPC transport
+    - history and real-time shm buffers which are both real-time
+      updated and backfilled.
+    - associated startup indexing information related to both buffer
+      real-time-append and historical prepend addresses.
+    - low level APIs to read and measure the updated data and manage
+      queuing properties.
 
     '''
-    # load backend module
-    try:
-        mod = get_brokermod(brokername)
-    except ImportError:
-        mod = get_ingestormod(brokername)
+    symbol: Symbol
+    first_quote: dict
+    _hist_shm_token: _Token
+    _rt_shm_token: _Token
 
-    # mem chan handed to broker backend so it can push real-time
-    # quotes to this task for sampling and history storage (see below).
-    send, quote_stream = trio.open_memory_channel(616)
-
-    # data sync signals for both history loading and market quotes
-    some_data_ready = trio.Event()
-    feed_is_live = trio.Event()
-
-    # establish broker backend quote stream by calling
-    # ``stream_quotes()``, which is a required broker backend endpoint.
-    init_msg, first_quote = await bus.nursery.start(
-        partial(
-            mod.stream_quotes,
-            send_chan=send,
-            feed_is_live=feed_is_live,
-            symbols=[symbol],
-            loglevel=loglevel,
-        )
-    )
-    # the broker-specific fully qualified symbol name,
-    # but ensure it is lower-cased for external use.
-    bfqsn = init_msg[symbol]['fqsn'].lower()
-    init_msg[symbol]['fqsn'] = bfqsn
-
-    # HISTORY, run 2 tasks:
-    # - a history loader / maintainer
-    # - a real-time streamer which consumers and sends new data to any
-    #   consumers as well as writes to storage backends (as configured).
-
-    # XXX: neither of these will raise but will cause an inf hang due to:
-    # https://github.com/python-trio/trio/issues/2258
-    # bus.nursery.start_soon(
-    # await bus.start_task(
-    (
-        izero_hist,
-        hist_shm,
-        izero_rt,
-        rt_shm,
-    ) = await bus.nursery.start(
-        manage_history,
-        mod,
-        bus,
-        '.'.join((bfqsn, brokername)),
-        some_data_ready,
-        feed_is_live,
-    )
-
-    # we hand an IPC-msg compatible shm token to the caller so it
-    # can read directly from the memory which will be written by
-    # this task.
-    msg = init_msg[symbol]
-    msg['hist_shm_token'] = hist_shm.token
-    msg['izero_hist'] = izero_hist
-    msg['izero_rt'] = izero_rt
-    msg['rt_shm_token'] = rt_shm.token
-
-    # true fqsn
-    fqsn = '.'.join((bfqsn, brokername))
-    # add a fqsn entry that includes the ``.<broker>`` suffix
-    # and an entry that includes the broker-specific fqsn (including
-    # any new suffixes or elements as injected by the backend).
-    init_msg[fqsn] = msg
-    init_msg[bfqsn] = msg
-
-    # TODO: pretty sure we don't need this? why not just leave 1s as
-    # the fastest "sample period" since we'll probably always want that
-    # for most purposes.
-    # pass OHLC sample rate in seconds (be sure to use python int type)
-    # init_msg[symbol]['sample_rate'] = 1 #int(delay_s)
-
-    # yield back control to starting nursery once we receive either
-    # some history or a real-time quote.
-    log.info(f'waiting on history to load: {fqsn}')
-    await some_data_ready.wait()
-
-    # append ``.<broker>`` suffix to each quote symbol
-    acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
-
-    generic_first_quotes = {
-        acceptable_not_fqsn_with_broker_suffix: first_quote,
-        fqsn: first_quote,
-    }
-
-    # for ambiguous names we simply apply the retreived
-    # feed to that name (for now).
- bus.feeds[symbol] = bus.feeds[bfqsn] = ( - init_msg, - generic_first_quotes, - ) - - # insert 1s ohlc into the increment buffer set - # to update and shift every second - sampler.ohlcv_shms.setdefault( - 1, - [] - ).append(rt_shm) - - task_status.started() - - if not start_stream: - await trio.sleep_forever() - - # begin real-time updates of shm and tsb once the feed goes live and - # the backend will indicate when real-time quotes have begun. - await feed_is_live.wait() - - # insert 1m ohlc into the increment buffer set - # to shift every 60s. - sampler.ohlcv_shms.setdefault(60, []).append(hist_shm) - - # create buffer a single incrementer task broker backend - # (aka `brokerd`) using the lowest sampler period. - if sampler.incrementers.get(_default_delay_s) is None: - await bus.start_task( - increment_ohlc_buffer, - _default_delay_s, - ) - - sum_tick_vlm: bool = init_msg.get( - 'shm_write_opts', {} - ).get('sum_tick_vlm', True) - - # NOTE: if no high-freq sampled data has (yet) been loaded, - # seed the buffer with a history datum - this is most handy - # for many backends which don't sample @ 1s OHLC but do have - # slower data such as 1m OHLC. - if not len(rt_shm.array): - rt_shm.push(hist_shm.array[-3:-1]) - ohlckeys = ['open', 'high', 'low', 'close'] - rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1] - rt_shm.array['volume'][-2] = 0 - - # start sample loop and shm incrementer task for OHLC style sampling - # at the above registered step periods. - try: - await sample_and_broadcast( - bus, - rt_shm, - hist_shm, - quote_stream, - brokername, - sum_tick_vlm - ) - finally: - log.warning(f'{fqsn} feed task terminated') - - -@tractor.context -async def open_feed_bus( - - ctx: tractor.Context, - brokername: str, - symbol: str, # normally expected to the broker-specific fqsn - loglevel: str, - tick_throttle: Optional[float] = None, - start_stream: bool = True, - -) -> None: - ''' - Open a data feed "bus": an actor-persistent per-broker task-oriented - data feed registry which allows managing real-time quote streams per - symbol. - - ''' - if loglevel is None: - loglevel = tractor.current_actor().loglevel - - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - # local state sanity checks - # TODO: check for any stale shm entries for this symbol - # (after we also group them in a nice `/dev/shm/piker/` subdir). - # ensure we are who we think we are - servicename = tractor.current_actor().name - assert 'brokerd' in servicename - assert brokername in servicename - - bus = get_feed_bus(brokername) - - # if no cached feed for this symbol has been created for this - # brokerd yet, start persistent stream and shm writer task in - # service nursery - entry = bus.feeds.get(symbol) - if entry is None: - # allocate a new actor-local stream bus which - # will persist for this `brokerd`'s service lifetime. - async with bus.task_lock: - await bus.nursery.start( - partial( - allocate_persistent_feed, - - bus=bus, - brokername=brokername, - # here we pass through the selected symbol in native - # "format" (i.e. upper vs. lowercase depending on - # provider). - symbol=symbol, - loglevel=loglevel, - start_stream=start_stream, - ) - ) - # TODO: we can remove this? 
- assert isinstance(bus.feeds[symbol], tuple) - - # XXX: ``first_quotes`` may be outdated here if this is secondary - # subscriber - init_msg, first_quotes = bus.feeds[symbol] - - msg = init_msg[symbol] - bfqsn = msg['fqsn'].lower() - - # true fqsn - fqsn = '.'.join([bfqsn, brokername]) - assert fqsn in first_quotes - assert bus.feeds[bfqsn] - - # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) - bsym = symbol + f'.{brokername}' - assert bsym in first_quotes - - # we use the broker-specific fqsn (bfqsn) for - # the sampler subscription since the backend isn't (yet) - # expected to append it's own name to the fqsn, so we filter - # on keys which *do not* include that name (e.g .ib) . - bus._subscribers.setdefault(bfqsn, []) - - # send this even to subscribers to existing feed? - # deliver initial info message a first quote asap - await ctx.started(( - init_msg, - first_quotes, - )) - - if not start_stream: - log.warning(f'Not opening real-time stream for {fqsn}') - await trio.sleep_forever() - - # real-time stream loop - async with ( - ctx.open_stream() as stream, - ): - # re-send to trigger display loop cycle (necessary especially - # when the mkt is closed and no real-time messages are - # expected). - await stream.send({fqsn: first_quotes}) - - # open a bg task which receives quotes over a mem chan - # and only pushes them to the target actor-consumer at - # a max ``tick_throttle`` instantaneous rate. - if tick_throttle: - send, recv = trio.open_memory_channel(2**10) - cs = await bus.start_task( - uniform_rate_send, - tick_throttle, - recv, - stream, - ) - sub = (send, ctx, tick_throttle) - - else: - sub = (stream, ctx, tick_throttle) - - subs = bus._subscribers[bfqsn] - subs.append(sub) - - try: - uid = ctx.chan.uid - - # ctrl protocol for start/stop of quote streams based on UI - # state (eg. don't need a stream when a symbol isn't being - # displayed). - async for msg in stream: - - if msg == 'pause': - if sub in subs: - log.info( - f'Pausing {fqsn} feed for {uid}') - subs.remove(sub) - - elif msg == 'resume': - if sub not in subs: - log.info( - f'Resuming {fqsn} feed for {uid}') - subs.append(sub) - else: - raise ValueError(msg) - finally: - log.info( - f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') - - if tick_throttle: - # TODO: a one-cancels-one nursery - # n.cancel_scope.cancel() - cs.cancel() - try: - bus._subscribers[bfqsn].remove(sub) - except ValueError: - log.warning(f'{sub} for {symbol} was already removed?') - - -@dataclass -class Feed: - ''' - A data feed for client-side interaction with far-process real-time - data sources. - - This is an thin abstraction on top of ``tractor``'s portals for - interacting with IPC streams and storage APIs (shm and time-series - db). - - ''' - name: str - hist_shm: ShmArray - rt_shm: ShmArray - mod: ModuleType - first_quotes: dict # symbol names to first quote dicts - _portal: tractor.Portal - stream: trio.abc.ReceiveChannel[dict[str, Any]] - status: dict[str, Any] + # private shm refs loaded dynamically from tokens + _hist_shm: ShmArray | None = None + _rt_shm: ShmArray | None = None + stream: tractor.MsgStream | None = None izero_hist: int = 0 izero_rt: int = 0 - - throttle_rate: Optional[int] = None - - _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None - _max_sample_rate: int = 1 - - # cache of symbol info messages received as first message when - # a stream startsc. 
-    symbols: dict[str, Symbol] = field(default_factory=dict)
+    throttle_rate: int | None = None
 
     @property
-    def portal(self) -> tractor.Portal:
-        return self._portal
+    def rt_shm(self) -> ShmArray:
+
+        if self._rt_shm is None:
+            self._rt_shm = attach_shm_array(
+                token=self._rt_shm_token,
+                readonly=True,
+            )
+
+        return self._rt_shm
+
+    @property
+    def hist_shm(self) -> ShmArray:
+
+        if self._hist_shm is None:
+            self._hist_shm = attach_shm_array(
+                token=self._hist_shm_token,
+                readonly=True,
+            )
+
+        return self._hist_shm
 
     async def receive(self) -> dict:
         return await self.stream.receive()
@@ -1267,6 +960,489 @@ class Feed:
             ratio,
         )
 
+    # TODO: get native msgspec decoding for these working
+    def to_msg(self) -> dict:
+        msg = self.to_dict()
+        msg['symbol'] = msg['symbol'].to_dict()
+        # can't serialize the stream object, it's
+        # expected you'll have a ref to it since
+        # this msg should be rxed on a stream on
+        # whatever far end IPC..
+        msg.pop('stream')
+        return msg
+
+    @classmethod
+    def from_msg(cls, msg: dict) -> Flume:
+        symbol = Symbol(**msg.pop('symbol'))
+        return cls(
+            symbol=symbol,
+            **msg,
+        )
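Since ``Flume.to_msg()`` pops the (unserializable) stream ref and ``from_msg()`` rebuilds the ``Symbol`` before re-instantiating the struct, a receiving actor can reconstruct a flume from the wire payload and lazily re-attach its shm buffers on first property access. A minimal consumer-side sketch (not part of the patch; ``msg`` is assumed to be one entry from the ``flumes`` dict delivered via ``ctx.started()`` further below):

```python
# reviewer sketch: round-tripping a `Flume` across IPC.
flume = Flume.from_msg(msg)

# shm handles attach lazily (read-only) on first access using the
# IPC-safe tokens packed into the msg.
last_rt = flume.rt_shm.array[-1]      # newest fast OHLC row
last_hist = flume.hist_shm.array[-1]  # newest history OHLC row

# re-serializing drops the stream ref since it can't cross
# process boundaries.
wire_msg: dict = flume.to_msg()
assert 'stream' not in wire_msg
```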
+
+
+async def allocate_persistent_feed(
+    bus: _FeedsBus,
+
+    brokername: str,
+    symstr: str,
+
+    loglevel: str,
+    start_stream: bool = True,
+
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    '''
+    Create and maintain a "feed bus" which allocates tasks for real-time
+    streaming and optional historical data storage per broker/data provider
+    backend; this task normally runs *in* a `brokerd` actor.
+
+    If none exists, this allocates a ``_FeedsBus`` which manages the
+    lifetimes of streaming tasks created for each requested symbol.
+
+
+    2 tasks are created:
+    - a real-time streaming task which connects to the backend quote
+      stream, samples it, and broadcasts to all subscribers.
+    - a history loader / maintainer task for the backing shm buffers.
+
+    '''
+    # load backend module
+    try:
+        mod = get_brokermod(brokername)
+    except ImportError:
+        mod = get_ingestormod(brokername)
+
+    # mem chan handed to broker backend so it can push real-time
+    # quotes to this task for sampling and history storage (see below).
+    send, quote_stream = trio.open_memory_channel(616)
+
+    # data sync signals for both history loading and market quotes
+    some_data_ready = trio.Event()
+    feed_is_live = trio.Event()
+
+    # establish broker backend quote stream by calling
+    # ``stream_quotes()``, which is a required broker backend endpoint.
+    init_msg, first_quote = await bus.nursery.start(
+        partial(
+            mod.stream_quotes,
+            send_chan=send,
+            feed_is_live=feed_is_live,
+            symbols=[symstr],
+            loglevel=loglevel,
+        )
+    )
+    # TODO: this is indexed by symbol for now since we've planned (for
+    # some time) to expect backends to handle single
+    # ``.stream_quotes()`` calls with multiple symbol inputs to just
+    # work such that a backend can do its own multiplexing if desired.
+    #
+    # Likely this will require some design changes:
+    # - the .started() should return some config output determining
+    #   whether the backend does indeed multiplex multi-symbol quotes
+    #   internally or whether separate task spawns should be done per
+    #   symbol (as it is right now).
+    # - information about discovery of non-local host daemons which can
+    #   be contacted in the case where we want to support load
+    #   distribution over multi-use clusters; eg. some new feed request
+    #   is re-directed to another daemon cluster because the current one
+    #   is at max capacity.
+    # - the same ideas ^ but when a local core is maxxed out (like how
+    #   binance does often with hft XD)
+    # - if a brokerd is non-local then we can't just allocate a mem
+    #   channel here and have the brokerd write it, we instead need
+    #   a small streaming machine around the remote feed which can then
+    #   do the normal work of sampling and writing shm buffers
+    #   (depending on if we want sampling done on the far end or not?)
+    msg = init_msg[symstr]
+
+    # the broker-specific fully qualified symbol name,
+    # but ensure it is lower-cased for external use.
+    bfqsn = msg['fqsn'].lower()
+
+    # true fqsn including broker/provider suffix
+    fqsn = '.'.join((bfqsn, brokername))
+    # msg['fqsn'] = bfqsn
+
+    symbol = Symbol.from_fqsn(
+        fqsn=fqsn,
+        info=msg,
+    )
+
+    # HISTORY storage, run 2 tasks:
+    # - a history loader / maintainer
+    # - a real-time streamer which consumes and sends new data to any
+    #   consumers as well as writes to storage backends (as configured).
+
+    # XXX: neither of these will raise but will cause an inf hang due to:
+    # https://github.com/python-trio/trio/issues/2258
+    # bus.nursery.start_soon(
+    # await bus.start_task(
+    (
+        izero_hist,
+        hist_shm,
+        izero_rt,
+        rt_shm,
+    ) = await bus.nursery.start(
+        manage_history,
+        mod,
+        bus,
+        fqsn,
+        some_data_ready,
+        feed_is_live,
+    )
+
+    # we hand an IPC-msg compatible shm token to the caller so it
+    # can read directly from the memory which will be written by
+    # this task.
+
+    # msg['hist_shm_token'] = hist_shm.token
+    # msg['izero_hist'] = izero_hist
+    # msg['izero_rt'] = izero_rt
+    # msg['rt_shm_token'] = rt_shm.token
+
+    # add a fqsn entry that includes the ``.<broker>`` suffix
+    # and an entry that includes the broker-specific fqsn (including
+    # any new suffixes or elements as injected by the backend).
+    # init_msg[fqsn] = msg
+    # init_msg[bfqsn] = msg
+
+    # TODO: pretty sure we don't need this? why not just leave 1s as
+    # the fastest "sample period" since we'll probably always want that
+    # for most purposes.
+    # pass OHLC sample rate in seconds (be sure to use python int type)
+    # init_msg[symbol]['sample_rate'] = 1 #int(delay_s)
+
+    # yield back control to starting nursery once we receive either
+    # some history or a real-time quote.
+    log.info(f'waiting on history to load: {fqsn}')
+    await some_data_ready.wait()
+
+    # append ``.<broker>`` suffix to each quote symbol
+    # acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
+
+    # generic_first_quotes = {
+    #     acceptable_not_fqsn_with_broker_suffix: first_quote,
+    #     fqsn: first_quote,
+    # }
+
+    flume = Flume(
+        symbol=symbol,
+        _hist_shm_token=hist_shm.token,
+        _rt_shm_token=rt_shm.token,
+        first_quote=first_quote,
+        # stream=stream,
+        izero_hist=izero_hist,
+        izero_rt=izero_rt,
+        # throttle_rate=tick_throttle,
+    )
+
+    # for ambiguous names we simply apply the retrieved
+    # feed to that name (for now).
+    bus.feeds[symstr] = bus.feeds[bfqsn] = flume
+    # init_msg,
+    # generic_first_quotes,
+    # )
+
+    # insert 1s ohlc into the increment buffer set
+    # to update and shift every second
+    sampler.ohlcv_shms.setdefault(
+        1,
+        []
+    ).append(rt_shm)
+
+    task_status.started()
+
+    if not start_stream:
+        await trio.sleep_forever()
+
+    # begin real-time updates of shm and tsb once the feed goes live and
+    # the backend will indicate when real-time quotes have begun.
+    await feed_is_live.wait()
+
+    # insert 1m ohlc into the increment buffer set
+    # to shift every 60s.
+    sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
+
+    # create a single buffer incrementer task per broker backend
+    # (aka `brokerd`) using the lowest sampler period.
+    if sampler.incrementers.get(_default_delay_s) is None:
+        await bus.start_task(
+            increment_ohlc_buffer,
+            _default_delay_s,
+        )
+
+    sum_tick_vlm: bool = init_msg.get(
+        'shm_write_opts', {}
+    ).get('sum_tick_vlm', True)
+
+    # NOTE: if no high-freq sampled data has (yet) been loaded,
+    # seed the buffer with a history datum - this is most handy
+    # for many backends which don't sample @ 1s OHLC but do have
+    # slower data such as 1m OHLC.
+    if not len(rt_shm.array):
+        rt_shm.push(hist_shm.array[-3:-1])
+        ohlckeys = ['open', 'high', 'low', 'close']
+        rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
+        rt_shm.array['volume'][-2] = 0
+
+    # start sample loop and shm incrementer task for OHLC style sampling
+    # at the above registered step periods.
+    try:
+        await sample_and_broadcast(
+            bus,
+            rt_shm,
+            hist_shm,
+            quote_stream,
+            brokername,
+            sum_tick_vlm
+        )
+    finally:
+        log.warning(f'{fqsn} feed task terminated')
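The two ``sampler.ohlcv_shms.setdefault(...)`` calls above implement a small period-keyed registry: each sample period (in seconds) maps to the list of shm buffers that must be stepped at that rate, while a single incrementer task is spawned for the lowest period. A toy sketch of that pattern (the ``increment_buffers()`` loop body is illustrative only, not the real ``increment_ohlc_buffer()``):

```python
# reviewer sketch of the period-keyed buffer registry used above;
# `rt_shm`/`hist_shm` stand in for the buffers from `manage_history()`.
import trio

ohlcv_shms: dict[int, list[ShmArray]] = {}
ohlcv_shms.setdefault(1, []).append(rt_shm)     # 1s step buffers
ohlcv_shms.setdefault(60, []).append(hist_shm)  # 60s step buffers

async def increment_buffers(lowest_period_s: int = 1) -> None:
    elapsed = 0
    while True:
        await trio.sleep(lowest_period_s)
        elapsed += lowest_period_s
        for period_s, shms in ohlcv_shms.items():
            if elapsed % period_s == 0:
                for shm in shms:
                    ...  # shift/append the next ohlc row for this period
```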
+
+
+@tractor.context
+async def open_feed_bus(
+
+    ctx: tractor.Context,
+    brokername: str,
+    symbols: list[str],  # normally expected to be broker-specific fqsns
+
+    loglevel: str = 'error',
+    tick_throttle: Optional[float] = None,
+    start_stream: bool = True,
+
+) -> dict[
+    str,  # fqsn
+    Flume,  # the per-flow descriptor delivered via ``ctx.started()``
+]:
+    '''
+    Open a data feed "bus": an actor-persistent per-broker task-oriented
+    data feed registry which allows managing real-time quote streams per
+    symbol.
+
+    '''
+    if loglevel is None:
+        loglevel = tractor.current_actor().loglevel
+
+    # XXX: required to propagate ``tractor`` loglevel to piker logging
+    get_console_log(loglevel or tractor.current_actor().loglevel)
+
+    # local state sanity checks
+    # TODO: check for any stale shm entries for this symbol
+    # (after we also group them in a nice `/dev/shm/piker/` subdir).
+    # ensure we are who we think we are
+    servicename = tractor.current_actor().name
+    assert 'brokerd' in servicename
+    assert brokername in servicename
+
+    bus = get_feed_bus(brokername)
+
+    flumes: dict[str, Flume] = {}
+    for symbol in symbols:
+        # if no cached feed for this symbol has been created for this
+        # brokerd yet, start persistent stream and shm writer task in
+        # service nursery
+        entry = bus.feeds.get(symbol)
+        if entry is None:
+            # allocate a new actor-local stream bus which
+            # will persist for this `brokerd`'s service lifetime.
+            async with bus.task_lock:
+                await bus.nursery.start(
+                    partial(
+                        allocate_persistent_feed,
+
+                        bus=bus,
+                        brokername=brokername,
+                        # here we pass through the selected symbol in native
+                        # "format" (i.e. upper vs. lowercase depending on
+                        # provider).
+                        symstr=symbol,
+                        loglevel=loglevel,
+                        start_stream=start_stream,
+                    )
+                )
+                # TODO: we can remove this?
+                # assert isinstance(bus.feeds[symbol], tuple)
+
+        # XXX: ``first_quotes`` may be outdated here if this is a
+        # secondary subscriber
+        # init_msg, first_quotes = bus.feeds[symbol]
+        flume = bus.feeds[symbol]
+        # assert bus.feeds[bfqsn] is flume
+
+        # msg = init_msg[symbol]
+        # bfqsn = msg['fqsn'].lower()
+        bfqsn = flume.symbol.key
+
+        # true fqsn
+        fqsn = '.'.join([bfqsn, brokername])
+        assert fqsn == flume.symbol.fqsn
+        # assert fqsn in first_quotes
+
+        # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
+        # bsym = symbol + f'.{brokername}'
+        # assert bsym in first_quotes
+
+        # pack for ``.started()`` sync msg
+        flumes[fqsn] = flume
+
+        # we use the broker-specific fqsn (bfqsn) for
+        # the sampler subscription since the backend isn't (yet)
+        # expected to append its own name to the fqsn, so we filter
+        # on keys which *do not* include that name (e.g .ib) .
+        bus._subscribers.setdefault(bfqsn, [])
+
+    # send this even to subscribers of an existing feed?
+    # deliver initial info message and a first quote asap
+    await ctx.started(flumes)
+    # init_msg,
+    # first_quotes,
+    # ))
+
+    if not start_stream:
+        log.warning(f'Not opening real-time stream for {fqsn}')
+        await trio.sleep_forever()
+
+    # real-time stream loop
+    async with (
+        ctx.open_stream() as stream,
+    ):
+
+        local_subs: list = []
+        for fqsn, flume in flumes.items():
+            # re-send to trigger display loop cycle (necessary especially
+            # when the mkt is closed and no real-time messages are
+            # expected).
+            await stream.send({fqsn: flume.first_quote})
+
+            # set a common msg stream for all requested symbols
+            flume.stream = stream
+
+            # Add a real-time quote subscription to feed bus:
+            # This ``sub`` subscriber entry is added to the feed bus set so
+            # that the ``sample_and_broadcast()`` task (spawned inside
+            # ``allocate_persistent_feed()``) will push real-time quotes
+            # (ticks) to this new consumer.
+
+            if tick_throttle:
+                flume.throttle_rate = tick_throttle
+
+                # open a bg task which receives quotes over a mem chan
+                # and only pushes them to the target actor-consumer at
+                # a max ``tick_throttle`` instantaneous rate.
+                send, recv = trio.open_memory_channel(2**10)
+
+                cs = await bus.start_task(
+                    uniform_rate_send,
+                    tick_throttle,
+                    recv,
+                    stream,
+                )
+                # NOTE: so the ``send`` channel here is actually a swapped
+                # in trio mem chan which gets pushed by the normal sampler
+                # task; instead of quotes being sent directly over the IPC
+                # msg stream, the throttle task does the work of
+                # incrementally forwarding to the IPC stream at the
+                # throttle rate.
+                sub = (send, ctx, tick_throttle)
+
+            else:
+                sub = (stream, ctx, tick_throttle)
+
+            # TODO: add an api for this on the bus?
+            # maybe use the current task-id to key the sub list that's
+            # added / removed? Or maybe we can add a general
+            # pause-resume by sub-key api?
+            bus_subs = bus._subscribers[bfqsn]
+            bus_subs.append(sub)
+            local_subs.append(sub)
+
+        try:
+            uid = ctx.chan.uid
+
+            # ctrl protocol for start/stop of quote streams based on UI
+            # state (eg. don't need a stream when a symbol isn't being
+            # displayed).
+            async for msg in stream:
+
+                if msg == 'pause':
+                    for sub in local_subs:
+                        if sub in bus_subs:
+                            log.info(
+                                f'Pausing {fqsn} feed for {uid}')
+                            bus_subs.remove(sub)
+
+                elif msg == 'resume':
+                    for sub in local_subs:
+                        if sub not in bus_subs:
+                            log.info(
+                                f'Resuming {fqsn} feed for {uid}')
+                            bus_subs.append(sub)
+                else:
+                    raise ValueError(msg)
+        finally:
+            log.info(
+                f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
+
+            if tick_throttle:
+                # TODO: a one-cancels-one nursery
+                # n.cancel_scope.cancel()
+                cs.cancel()
+
+            # drop all subs for this task from the bus
+            for sub in local_subs:
+                try:
+                    bus._subscribers[bfqsn].remove(sub)
+                except ValueError:
+                    log.warning(f'{sub} for {symbol} was already removed?')
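The ``async for msg in stream`` loop above doubles as a minimal control protocol: a consumer can pause and resume quote delivery by sending plain string msgs back over the same IPC stream (anything else raises ``ValueError`` in the bus task). A client-side sketch, assuming ``stream`` is the msg stream opened on an ``open_feed_bus()`` context:

```python
# reviewer sketch: driving the quote-stream ctrl protocol from the
# consumer side; `stream` is a `tractor.MsgStream` from `open_feed_bus()`.
await stream.send('pause')   # drop this consumer's subs from the bus
# ... eg. while the symbol's chart is hidden ...
await stream.send('resume')  # re-add the subs for live quote delivery
```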
+
+
+# @dataclass
+class Feed(Struct):
+    '''
+    A per-provider API for client-side consumption from real-time data
+    (streaming) sources, normally brokers and data services.
+
+    This is a somewhat thin abstraction on top of
+    a ``tractor.MsgStream`` plus associated shared memory buffers which
+    can be read in a readers-writer-lock style IPC configuration.
+
+    Further, there is direct access to slower sampled historical data
+    through similarly allocated shm arrays.
+
+    '''
+    # name: str
+    # hist_shm: ShmArray
+    # rt_shm: ShmArray
+    mod: ModuleType
+    _portal: tractor.Portal
+    # symbol names to first quote dicts
+    # shms: dict[str, tuple[ShmArray, Shmarray]]
+    flumes: dict[str, Flume] = {}
+    # first_quotes: dict[str, dict] = {}
+    streams: dict[
+        str,
+        trio.abc.ReceiveChannel[dict[str, Any]],
+    ] = {}
+    # NOTE: a required field can't follow defaulted ones in a `Struct`
+    status: dict[str, Any] = {}
+
+    # izero_hist: int = 0
+    # izero_rt: int = 0
+    # throttle_rate: Optional[int] = None
+
+    _max_sample_rate: int = 1
+
+    # cache of symbol info messages received as first message when
+    # a stream starts.
+    # symbols: dict[str, Symbol] = {}
+
+    @property
+    def portal(self) -> tractor.Portal:
+        return self._portal
+
+    @property
+    def name(self) -> str:
+        return self.mod.name
+
 
 @acm
 async def install_brokerd_search(
@@ -1320,118 +1496,172 @@ async def open_feed(
     Open a "data feed" which provides streamed real-time quotes.
 
     '''
-    fqsn = fqsns[0].lower()
+    # fqsn = fqsns[0].lower()
 
-    brokername, key, suffix = unpack_fqsn(fqsn)
-    bfqsn = fqsn.replace('.' + brokername, '')
+    providers: dict[ModuleType, list[str]] = {}
 
-    try:
-        mod = get_brokermod(brokername)
-    except ImportError:
-        mod = get_ingestormod(brokername)
-
-    # no feed for broker exists so maybe spawn a data brokerd
-    async with (
-
-        # if no `brokerd` for this backend exists yet we spawn
-        # and actor for one.
-        maybe_spawn_brokerd(
-            brokername,
-            loglevel=loglevel
-        ) as portal,
-
-        # (allocate and) connect to any feed bus for this broker
-        portal.open_context(
-            open_feed_bus,
-            brokername=brokername,
-            symbol=bfqsn,
-            loglevel=loglevel,
-            start_stream=start_stream,
-            tick_throttle=tick_throttle,
-
-        ) as (ctx, (init_msg, first_quotes)),
-
-        ctx.open_stream(
-            # XXX: be explicit about stream backpressure since we should
-            # **never** overrun on feeds being too fast, which will
-            # pretty much always happen with HFT XD
-            backpressure=backpressure,
-        ) as stream,
-
-    ):
-        init = init_msg[bfqsn]
-        # we can only read from shm
-        hist_shm = attach_shm_array(
-            token=init['hist_shm_token'],
-            readonly=True,
-        )
-        rt_shm = attach_shm_array(
-            token=init['rt_shm_token'],
-            readonly=True,
-        )
-
-        assert fqsn in first_quotes
-
-        feed = Feed(
-            name=brokername,
-            hist_shm=hist_shm,
-            rt_shm=rt_shm,
-            mod=mod,
-            first_quotes=first_quotes,
-            stream=stream,
-            _portal=portal,
-            status={},
-            izero_hist=init['izero_hist'],
-            izero_rt=init['izero_rt'],
-            throttle_rate=tick_throttle,
-        )
-
-        # fill out "status info" that the UI can show
-        host, port = feed.portal.channel.raddr
-        if host == '127.0.0.1':
-            host = 'localhost'
-
-        feed.status.update({
-            'actor_name': feed.portal.channel.uid[0],
-            'host': host,
-            'port': port,
-            'shm': f'{humanize(feed.hist_shm._shm.size)}',
-            'throttle_rate': feed.throttle_rate,
-        })
-        feed.status.update(init_msg.pop('status', {}))
-
-        for sym, data in init_msg.items():
-            si = data['symbol_info']
-            fqsn = data['fqsn'] + f'.{brokername}'
-            symbol = Symbol.from_fqsn(
-                fqsn,
-                info=si,
-            )
-
-            # symbol.broker_info[brokername] = si
-            feed.symbols[fqsn] = symbol
-            feed.symbols[f'{sym}.{brokername}'] = symbol
-
-            # cast shm dtype to list... can't member why we need this
-        for shm_key, shm in [
-            ('rt_shm_token', rt_shm),
-            ('hist_shm_token', hist_shm),
-        ]:
-            shm_token = data[shm_key]
-
-            # XXX: msgspec won't relay through the tuples XD
-            shm_token['dtype_descr'] = tuple(
-                map(tuple, shm_token['dtype_descr']))
-
-            assert shm_token == shm.token  # sanity
-
-        feed._max_sample_rate = 1
+    for fqsn in fqsns:
+        brokername, key, suffix = unpack_fqsn(fqsn)
+        bfqsn = fqsn.replace('.' + brokername, '')
 
         try:
-            yield feed
-        finally:
-            # drop the infinite stream connection
-            await ctx.cancel()
+            mod = get_brokermod(brokername)
+        except ImportError:
+            mod = get_ingestormod(brokername)
+
+        # build a per-provider map of instrument names
+        providers.setdefault(mod, []).append(bfqsn)
+
+    # one actor per brokerd for now
+    brokerd_ctxs = []
+
+    for brokermod, bfqsns in providers.items():
+
+        # if no `brokerd` for this backend exists yet we spawn
+        # a daemon actor for it.
+        brokerd_ctxs.append(
+            maybe_spawn_brokerd(
+                brokermod.name,
+                loglevel=loglevel
+            )
+        )
+
+    portals: tuple[tractor.Portal]
+    async with gather_contexts(
+        brokerd_ctxs,
+    ) as portals:
+
+        bus_ctxs = []
+        for (
+            portal,
+            (brokermod, bfqsns),
+        ) in zip(portals, providers.items()):
+
+            feed = Feed(
+                mod=brokermod,
+                _portal=portal,
+                status={},
+            )
+            # fill out "status info" that the UI can show
+            host, port = feed.portal.channel.raddr
+            if host == '127.0.0.1':
+                host = 'localhost'
+
+            feed.status.update({
+                'actor_name': feed.portal.channel.uid[0],
+                'host': host,
+                'port': port,
+                # 'shm': f'{humanize(feed.hist_shm._shm.size)}',
+                # 'throttle_rate': feed.throttle_rate,
+            })
+            # feed.status.update(init_msg.pop('status', {}))
+
+            # (allocate and) connect to any feed bus for this broker
+            bus_ctxs.append(
+                portal.open_context(
+                    open_feed_bus,
+                    # NOTE: use the per-provider name, not the leftover
+                    # loop var from the fqsn parsing above.
+                    brokername=brokermod.name,
+                    symbols=bfqsns,
+                    loglevel=loglevel,
+                    start_stream=start_stream,
+                    tick_throttle=tick_throttle,
+                )
+            )
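``gather_contexts()`` (from ``tractor.trionics``) is what lets the rewritten ``open_feed()`` fan out per provider: it enters a sequence of async context managers concurrently and yields their entered values in input order. A self-contained toy demo of those semantics (names like ``delayed`` are illustrative only):

```python
# reviewer sketch: `gather_contexts()` enters all managers concurrently
# and delivers their `__aenter__()` results in input order.
from contextlib import asynccontextmanager as acm

import trio
from tractor.trionics import gather_contexts

@acm
async def delayed(i: int):
    await trio.sleep(0.1)  # the three sleeps overlap, not serialize
    yield i

async def main():
    async with gather_contexts([delayed(i) for i in range(3)]) as vals:
        assert tuple(vals) == (0, 1, 2)

trio.run(main)
```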
+
+        stream_ctxs = []
+        async with (
+            gather_contexts(bus_ctxs) as ctxs,
+        ):
+            for (
+                (ctx, flumes_msg_dict),
+                (brokermod, bfqsns),
+            ) in zip(ctxs, providers.items()):
+
+                for fqsn, flume_msg in flumes_msg_dict.items():
+                    flume = Flume.from_msg(flume_msg)
+                    assert flume.symbol.fqsn == fqsn
+                    feed.flumes[fqsn] = flume
+
+                    # TODO: this is ugly but eventually we could
+                    # in theory do all this "tabling" of flumes on
+                    # the brokerd-side, in which case we'll likely
+                    # want to make each flume IPC-msg-native?
+                    # bfqsn = list(init_msgs)[0]
+                    # init = init_msg[bfqsn]
+
+                    # si = data['symbol_info']
+                    # fqsn = data['fqsn'] + f'.{brokername}'
+                    # symbol = Symbol.from_fqsn(
+                    #     fqsn,
+                    #     info=si,
+                    # )
+
+                    # attach and cache shm handles
+                    rt_shm = flume.rt_shm
+                    assert rt_shm
+                    hist_shm = flume.hist_shm
+                    assert hist_shm
+
+                # NOTE: accumulated *outside* the ctx loop so one stream
+                # per provider survives to the `gather_contexts()` below.
+                stream_ctxs.append(
+                    ctx.open_stream(
+                        # XXX: be explicit about stream backpressure since we should
+                        # **never** overrun on feeds being too fast, which will
+                        # pretty much always happen with HFT XD
+                        backpressure=backpressure,
+                    )
+                )
+
+            async with (
+                gather_contexts(stream_ctxs) as streams,
+            ):
+                for (
+                    stream,
+                    (brokermod, bfqsns),
+                ) in zip(streams, providers.items()):
+
+                    for bfqsn in bfqsns:
+                        fqsn = '.'.join((bfqsn, brokermod.name))
+
+                        # apply common rt stream to each flume
+                        # (normally one per broker)
+                        feed.flumes[fqsn].stream = stream
+                        feed.streams[brokermod.name] = stream
+
+                try:
+                    yield feed
+                finally:
+                    # drop the infinite stream connection
+                    await ctx.cancel()
+
+    # we can only read from shm
+    # hist_shm = attach_shm_array(
+    #     token=init['hist_shm_token'],
+    #     readonly=True,
+    # )
+    # rt_shm = attach_shm_array(
+    #     token=init['rt_shm_token'],
+    #     readonly=True,
+    # )
+
+    # for sym, data in init_msg.items():
+
+        # symbol.broker_info[brokername] = si
+        # feed.symbols[fqsn] = symbol
+        # feed.symbols[f'{sym}.{brokername}'] = symbol
+
+        # cast shm dtype to list... can't remember why we need this
+        # for shm_key, shm in [
+        #     ('rt_shm_token', rt_shm),
+        #     ('hist_shm_token', hist_shm),
+        # ]:
+        #     shm_token = flume[shm_key]
+
+        #     # XXX: msgspec won't relay through the tuples XD
+        #     shm_token['dtype_descr'] = tuple(
+        #         map(tuple, shm_token['dtype_descr']))
+
+        #     assert shm_token == shm.token  # sanity
+        #     assert fqsn in first_quotes
 
 
 @acm
diff --git a/tests/test_feeds.py b/tests/test_feeds.py
index 3c1104a9..6b91838c 100644
--- a/tests/test_feeds.py
+++ b/tests/test_feeds.py
@@ -5,6 +5,7 @@ Data feed layer APIs, performance, msg throttling.
 from pprint import pprint
 import pytest
+import tractor
 import trio
 from piker import (
     open_piker_runtime,
@@ -16,7 +17,7 @@ from piker.data import ShmArray
 @pytest.mark.parametrize(
     'fqsns',
     [
-        ['btcusdt.binance']
+        ['btcusdt.binance', 'ethusdt.binance']
     ],
     ids=lambda param: f'fqsns={param}',
 )
@@ -30,7 +31,13 @@ def test_basic_rt_feed(
     '''
     async def main():
         async with (
-            open_piker_runtime('test_basic_rt_feed'),
+            open_piker_runtime(
+                'test_basic_rt_feed',
+                # XXX tractor BUG: this doesn't translate through to the
+                # ``tractor._state._runtime_vars``...
+                registry_addr=('127.0.0.1', 6666),
+                debug_mode=True,
+            ),
             open_feed(
                 fqsns,
                 loglevel='info',
@@ -42,24 +49,38 @@ def test_basic_rt_feed(
             ) as feed
         ):
 
+            # verify shm buffers exist
             for fqin in fqsns:
-                assert feed.symbols[fqin]
+                flume = feed.flumes[fqin]
+                ohlcv: ShmArray = flume.rt_shm
+                hist_ohlcv: ShmArray = flume.hist_shm
 
-            ohlcv: ShmArray = feed.rt_shm
-            hist_ohlcv: ShmArray = feed.hist_shm
+            quote_count: int = 0
+            stream = feed.streams['binance']
+            async for quotes in stream:
+                for fqsn, quote in quotes.items():
 
-            count: int = 0
-            async for quotes in feed.stream:
+                    # await tractor.breakpoint()
+                    flume = feed.flumes[fqsn]
+                    ohlcv: ShmArray = flume.rt_shm
+                    hist_ohlcv: ShmArray = flume.hist_shm
 
-                # print quote msg, rt and history
-                # buffer values on console.
-                pprint(quotes)
-                pprint(ohlcv.array[-1])
-                pprint(hist_ohlcv.array[-1])
+                    # print quote msg, rt and history
+                    # buffer values on console.
+ rt_row = ohlcv.array[-1] + hist_row = hist_ohlcv.array[-1] + # last = quote['last'] - if count >= 100: + # assert last == rt_row['close'] + # assert last == hist_row['close'] + pprint( + f'{fqsn}: {quote}\n' + f'rt_ohlc: {rt_row}\n' + f'hist_ohlc: {hist_row}\n' + ) + quote_count += 1 + + if quote_count >= 100: break - count += 1 - trio.run(main)
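Taken together, the consumption pattern the test exercises is: one ``Feed`` per ``open_feed()`` call, one ``Flume`` per fqsn, and one shared msg stream per provider. A condensed sketch of that flow (same binance pairs as the test; a hypothetical demo, not an additional test case):

```python
# reviewer sketch condensing the multi-symbol API exercised by
# `test_basic_rt_feed()` above.
import trio
from piker import open_piker_runtime
from piker.data.feed import open_feed

async def main():
    async with (
        open_piker_runtime('flume_demo'),
        open_feed(
            ['btcusdt.binance', 'ethusdt.binance'],
            loglevel='info',
        ) as feed,
    ):
        stream = feed.streams['binance']  # one stream per provider
        async for quotes in stream:
            for fqsn, quote in quotes.items():
                flume = feed.flumes[fqsn]  # one flume per fqsn
                print(fqsn, quote, flume.rt_shm.array[-1])
            break  # demo: just the first quote batch

trio.run(main)
```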