From 5c343aa7485d3a7a021e58439edbc4f2ba480c7a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 11 Feb 2022 10:41:47 -0500 Subject: [PATCH 01/24] Misc curve doc strings --- piker/ui/_curve.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 7fc43e4e..f10f874c 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -108,7 +108,6 @@ class FastAppendCurve(pg.PlotCurveItem): path redraw. ''' - def __init__( self, *args, @@ -167,7 +166,13 @@ class FastAppendCurve(pg.PlotCurveItem): y: np.ndarray, ) -> QtGui.QPainterPath: + ''' + Update curve from input 2-d data. + Compare with a cached "x-range" state and (pre/a)ppend based on + a length diff. + + ''' profiler = pg.debug.Profiler(disabled=not pg_profile_enabled()) flip_cache = False @@ -316,12 +321,19 @@ class FastAppendCurve(pg.PlotCurveItem): self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) def disable_cache(self) -> None: + ''' + Disable the use of the pixel coordinate cache and trigger a geo event. + + ''' # XXX: pretty annoying but, without this there's little # artefacts on the append updates to the curve... self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) self.prepareGeometryChange() def boundingRect(self): + ''' + Compute and then cache our rect. + ''' if self.path is None: return QtGui.QPainterPath().boundingRect() else: @@ -331,9 +343,10 @@ class FastAppendCurve(pg.PlotCurveItem): return self._br() def _br(self): - """Post init ``.boundingRect()```. + ''' + Post init ``.boundingRect()```. 
- """ + ''' hb = self.path.controlPointRect() hb_size = hb.size() # print(f'hb_size: {hb_size}') From a073039b30310546c8dad64db78abc71b0b3e22d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Feb 2022 08:56:10 -0500 Subject: [PATCH 02/24] Drop dependence on `msgpack` and `msgpack_numpy` --- piker/__init__.py | 7 ------- setup.py | 1 - 2 files changed, 8 deletions(-) diff --git a/piker/__init__.py b/piker/__init__.py index c84bfac2..a6437f88 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -18,10 +18,3 @@ piker: trading gear for hackers. """ -import msgpack # noqa - -# TODO: remove this now right? -import msgpack_numpy - -# patch msgpack for numpy arrays -msgpack_numpy.patch() diff --git a/setup.py b/setup.py index 6f8fd898..faaa8dac 100755 --- a/setup.py +++ b/setup.py @@ -66,7 +66,6 @@ setup( 'numpy', 'numba', 'pandas', - 'msgpack-numpy', # UI 'PyQt5', From b1dd24d1f70305bf39fe1420eafc6ad7041fa234 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Feb 2022 12:20:17 -0500 Subject: [PATCH 03/24] Only throttle warn on rate >= display rate --- piker/ui/_display.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 3bfd327a..14e1c2c7 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -131,6 +131,7 @@ async def graphics_update_loop( # of copying it from last bar's close # - 1-5 sec bar lookback-autocorrection like tws does? # (would require a background history checker task) + display_rate = linked.godwidget.window.current_screen().refreshRate() chart = linked.chart @@ -215,7 +216,8 @@ async def graphics_update_loop( # in the absolute worst case we shouldn't see more then # twice the expected throttle rate right!? 
- and quote_rate >= _quote_throttle_rate * 1.5 + # and quote_rate >= _quote_throttle_rate * 2 + and quote_rate >= display_rate ): log.warning(f'High quote rate {symbol.key}: {quote_rate}') From 7252094f9080aa3a474e57b901f92cacd2b11e50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Feb 2022 12:13:16 -0500 Subject: [PATCH 04/24] Add `open_piker_runtime()` to setup actor runtime correctly from non-daemons --- piker/_daemon.py | 56 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 77462d35..2464684b 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -34,9 +34,11 @@ from .brokers import get_brokermod log = get_logger(__name__) _root_dname = 'pikerd' + +_registry_addr = ('127.0.0.1', 6116) _tractor_kwargs: dict[str, Any] = { # use a different registry addr then tractor's default - 'arbiter_addr': ('127.0.0.1', 6116), + 'arbiter_addr': _registry_addr } _root_modules = [ __name__, @@ -150,7 +152,7 @@ async def open_pikerd( tractor.open_root_actor( # passed through to ``open_root_actor`` - arbiter_addr=_tractor_kwargs['arbiter_addr'], + arbiter_addr=_registry_addr, name=_root_dname, loglevel=loglevel, debug_mode=debug_mode, @@ -179,6 +181,47 @@ async def open_pikerd( yield _services +@asynccontextmanager +async def open_piker_runtime( + name: str, + enable_modules: list[str] = [], + start_method: str = 'trio', + loglevel: Optional[str] = None, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + +) -> Optional[tractor._portal.Portal]: + ''' + Start a piker actor who's runtime will automatically + sync with existing piker actors in local network + based on configuration. 
+ + ''' + global _services + assert _services is None + + # XXX: this may open a root actor as well + async with ( + tractor.open_root_actor( + + # passed through to ``open_root_actor`` + arbiter_addr=_registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, + ) as _, + ): + yield tractor.current_actor() + + @asynccontextmanager async def maybe_open_runtime( loglevel: Optional[str] = None, @@ -283,13 +326,20 @@ async def maybe_spawn_daemon( lock = Brokerd.locks[service_name] await lock.acquire() + log.info(f'Scanning for existing {service_name}') # attach to existing daemon by name if possible - async with tractor.find_actor(service_name) as portal: + async with tractor.find_actor( + service_name, + arbiter_sockaddr=_registry_addr, + + ) as portal: if portal is not None: lock.release() yield portal return + log.warning(f"Couldn't find any existing {service_name}") + # ask root ``pikerd`` daemon to spawn the daemon we need if # pikerd is not live we now become the root of the # process tree From c2a13c474cd8c934adaa2b5e62f8e95e954e528b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Feb 2022 12:16:07 -0500 Subject: [PATCH 05/24] Support no realtime stream sending with feed bus --- piker/data/feed.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 55f8b9b9..75a37545 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -257,6 +257,7 @@ async def open_feed_bus( symbol: str, loglevel: str, tick_throttle: Optional[float] = None, + start_stream: bool = True, ) -> None: @@ -305,6 +306,9 @@ async def open_feed_bus( # deliver initial info message a first quote asap await ctx.started((init_msg, first_quotes)) + if not start_stream: + await trio.sleep_forever() + 
async with ( ctx.open_stream() as stream, trio.open_nursery() as n, @@ -490,6 +494,8 @@ async def open_feed( symbols: Sequence[str], loglevel: Optional[str] = None, + backpressure: bool = True, + start_stream: bool = True, tick_throttle: Optional[float] = None, # Hz ) -> Feed: @@ -518,7 +524,7 @@ async def open_feed( brokername=brokername, symbol=sym, loglevel=loglevel, - + start_stream=start_stream, tick_throttle=tick_throttle, ) as (ctx, (init_msg, first_quotes)), @@ -527,7 +533,7 @@ async def open_feed( # XXX: be explicit about stream backpressure since we should # **never** overrun on feeds being too fast, which will # pretty much always happen with HFT XD - backpressure=True + backpressure=backpressure, ) as stream, ): @@ -607,6 +613,9 @@ async def maybe_open_feed( 'symbols': [sym], 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), + 'backpressure': kwargs.get('backpressure'), + 'backpressure': kwargs.get('backpressure'), + 'start_stream': kwargs.get('start_stream'), }, key=sym, ) as (cache_hit, feed): From 23aa7eb31c500aa479b1a433e871656b3ba1c984 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Feb 2022 12:16:34 -0500 Subject: [PATCH 06/24] Stick time step in window header --- piker/ui/_display.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 14e1c2c7..ecae1307 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -550,7 +550,8 @@ async def display_symbol_data( # load in symbol's ohlc data godwidget.window.setWindowTitle( f'{symbol.key}@{symbol.brokers} ' - f'tick:{symbol.tick_size}' + f'tick:{symbol.tick_size} ' + f'step:1s ' ) linkedsplits = godwidget.linkedsplits From 832e4c97d2b29861ee2125f338353c842e18a1cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Feb 2022 13:08:04 -0500 Subject: [PATCH 07/24] Drop `shm: ShmArray` to `stream_quotes()` endpoint --- piker/brokers/binance.py | 1 - piker/brokers/ib.py | 10 +++++----- 
piker/brokers/kraken.py | 1 - 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 4d82474b..f4732e54 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -386,7 +386,6 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: List[str], - shm: ShmArray, feed_is_live: trio.Event, loglevel: str = None, diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 5cf7d2f0..9f1bd49b 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -517,11 +517,11 @@ class Client: contract, ticker, details = await self.get_sym_details(symbol) # ensure a last price gets filled in before we deliver quote - for _ in range(2): + for _ in range(1): if isnan(ticker.last): + await asyncio.sleep(0.1) log.warning(f'Quote for {symbol} timed out: market is closed?') ticker = await ticker.updateEvent - await asyncio.sleep(0.1) else: log.info(f'Got first quote for {symbol}') break @@ -1201,12 +1201,13 @@ async def backfill_bars( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: - """Fill historical bars into shared mem / storage afap. + ''' + Fill historical bars into shared mem / storage afap. 
TODO: avoid pacing constraints: https://github.com/pikers/piker/issues/128 - """ + ''' if platform.system() == 'Windows': log.warning( 'Decreasing history query count to 4 since, windows...') @@ -1411,7 +1412,6 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], - shm: ShmArray, feed_is_live: trio.Event, loglevel: str = None, diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 24d2dab3..0d899428 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -406,7 +406,6 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: List[str], - shm: ShmArray, feed_is_live: trio.Event, loglevel: str = None, From 1d3ed6c333dede222a95c48fd904b4d93d56a7a9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Feb 2022 16:09:14 -0500 Subject: [PATCH 08/24] Add `mk_` prefix since assignments will use `fqsn` --- piker/data/_source.py | 22 +++++++++++++++++++++- piker/data/feed.py | 25 +++++++++++++++++++------ 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/piker/data/_source.py b/piker/data/_source.py index 9b9b323d..0677df65 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -59,6 +59,19 @@ tf_in_1m = { } +def mk_fqsn( + provider: str, + symbol: str, + +) -> str: + ''' + Generate a "fully qualified symbol name" which is + a reverse-hierarchical cross broker/provider symbol + + ''' + return '.'.join([symbol, provider]).lower() + + def float_digits( value: float, ) -> int: @@ -118,6 +131,12 @@ class Symbol(BaseModel): self.key, ) + def iterfqsns(self) -> list[str]: + return [ + mk_fqsn(self.key, broker) + for broker in self.broker_info.keys() + ] + @validate_arguments def mk_symbol( @@ -129,7 +148,8 @@ def mk_symbol( broker_info: dict[str, Any] = {}, ) -> Symbol: - '''Create and return an instrument description for the + ''' + Create and return an instrument description for the "symbol" named as ``key``. 
''' diff --git a/piker/data/feed.py b/piker/data/feed.py index 75a37545..b28c958e 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -47,7 +47,12 @@ from ._sharedmem import ( ShmArray, ) from .ingest import get_ingestormod -from ._source import base_iohlc_dtype, mk_symbol, Symbol +from ._source import ( + base_iohlc_dtype, + mk_symbol, + Symbol, + mk_fqsn, +) from ..ui import _search from ._sampling import ( _shms, @@ -276,11 +281,20 @@ async def open_feed_bus( bus._subscribers.setdefault(symbol, []) + fs = mk_fqsn(symbol, brokername) + # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery async with bus.task_lock: if entry is None: + + if not start_stream: + raise RuntimeError( + f'No stream feed exists for {fs}?\n' + f'You may need a `brokerd` started first.' + ) + init_msg, first_quotes = await bus.nursery.start( partial( allocate_persistent_feed, @@ -307,6 +321,7 @@ async def open_feed_bus( await ctx.started((init_msg, first_quotes)) if not start_stream: + log.warning(f'Not opening real-time stream for {fs}') await trio.sleep_forever() async with ( @@ -337,20 +352,19 @@ async def open_feed_bus( try: uid = ctx.chan.uid - fqsn = f'{symbol}.{brokername}' async for msg in stream: if msg == 'pause': if sub in subs: log.info( - f'Pausing {fqsn} feed for {uid}') + f'Pausing {fs} feed for {uid}') subs.remove(sub) elif msg == 'resume': if sub not in subs: log.info( - f'Resuming {fqsn} feed for {uid}') + f'Resuming {fs} feed for {uid}') subs.append(sub) else: raise ValueError(msg) @@ -614,8 +628,7 @@ async def maybe_open_feed( 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), 'backpressure': kwargs.get('backpressure'), - 'backpressure': kwargs.get('backpressure'), - 'start_stream': kwargs.get('start_stream'), + 'start_stream': kwargs.get('start_stream', True), }, key=sym, ) as (cache_hit, feed): From bf3b58e861a1072b91d7e29f572372f6817be223 Mon Sep 17 
00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 14:42:31 -0500 Subject: [PATCH 09/24] Async load data history, allow "offline" feed use Break up real-time quote feed and history loading into 2 separate tasks and deliver a client side `data.Feed` as soon as history is loaded (instead of waiting for a rt quote - the previous logic). If a symbol doesn't have history then likely the feed shouldn't be loaded (since presumably client code will need at least "some" datums history to do anything) and waiting on a real-time quote is dumb, since it'll hang if the market isn't open XD. If a symbol doesn't have history we can always write a zero/null array when we run into that case. This also greatly speeds up feed loading when both history and quotes are available. TL;DR summary: - add a `_FeedsBus.start_task()` one-cancel-scope-per-task method for assisting with (re-)starting and stopping long running persistent feeds (basically a "one cancels one" style nursery API). - add a `manage_history()` task which does all history loading (and eventually real-time writing) which has an independent signal and start it in a separate task. - drop the "sample rate per symbol" stuff since client code doesn't really care when it can just inspect shm indexing/time-steps itself. - run throttle tasks in the bus nursery thus avoiding cancelling the underlying sampler task on feed client disconnects. - don't store a repeated ref to the bus nursery's cancel scope. --- piker/data/feed.py | 299 +++++++++++++++++++++++++++++++-------------- 1 file changed, 210 insertions(+), 89 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index b28c958e..6d4c0689 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -27,6 +27,7 @@ from types import ModuleType from typing import ( Any, Sequence, AsyncIterator, Optional, + Awaitable, ) import trio @@ -72,12 +73,24 @@ class _FeedsBus(BaseModel): Data feeds broadcaster and persistence management. 
This is a brokerd side api used to manager persistent real-time - streams that can be allocated and left alive indefinitely. + streams that can be allocated and left alive indefinitely. A bus is + associated one-to-one with a particular broker backend where the + "bus" refers so a multi-symbol bus where quotes are interleaved in + time. + + Each "entry" in the bus includes: + - a stream used to push real time quotes (up to tick rates) + which is executed as a lone task that is cancellable via + a dedicated cancel scope. ''' + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + brokername: str nursery: trio.Nursery - feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {} + feeds: dict[str, tuple[dict, dict]] = {} task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() @@ -91,14 +104,27 @@ class _FeedsBus(BaseModel): list[tuple[tractor.MsgStream, Optional[float]]] ] = {} - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False + async def start_task( + self, + target: Awaitable, + *args, + ) -> None: - async def cancel_all(self) -> None: - for sym, (cs, msg, quote) in self.feeds.items(): - log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') - cs.cancel() + async def start_with_cs( + task_status: TaskStatus[ + trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: + with trio.CancelScope() as cs: + self.nursery.start_soon( + target, + *args, + ) + task_status.started(cs) + + return await self.nursery.start(start_with_cs) + + def cancel_task(self, task: trio.Task) -> bool: + pass _bus: _FeedsBus = None @@ -156,7 +182,78 @@ async def _setup_persistent_brokerd( await trio.sleep_forever() finally: # TODO: this needs to be shielded? 
- await bus.cancel_all() + bus.nursery.cancel_scope.cancel() + + +async def manage_history( + mod: ModuleType, + shm: ShmArray, + bus: _FeedsBus, + symbol: str, + we_opened_shm: bool, + some_data_ready: trio.Event, + feed_is_live: trio.Event, + +) -> None: + ''' + Load and manage historical data including the loading of any + available series from `marketstore` as well as conducting real-time + update of both that existing db and the allocated shared memory + buffer. + + ''' + # TODO: + # history retreival, see if we can pull from an existing + # ``marketstored`` daemon + # log.info('Scanning for existing `marketstored`') + # from .marketstore import load_history + # arrays = await load_history(symbol) + arrays = {} + + opened = we_opened_shm + # TODO: history validation + # assert opened, f'Persistent shm for {symbol} was already open?!' + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") + + if opened: + if arrays: + # push to shm + # set data ready + # some_data_ready.set() + raise ValueError('this should never execute yet') + + else: + # ask broker backend for new history + + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # detect sample step size for sampled historical data + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + # begin real-time updates of shm and tsb once the feed + # goes live. + await feed_is_live.wait() + + if opened: + _shms.setdefault(delay_s, []).append(shm) + + # start shm incrementing for OHLC sampling at the current + # detected sampling period if one dne. 
+ if _incrementers.get(delay_s) is None: + cs = await bus.start_task(increment_ohlc_buffer, delay_s) + + await trio.sleep_forever() + cs.cancel() async def allocate_persistent_feed( @@ -168,17 +265,30 @@ async def allocate_persistent_feed( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: + ''' + Create and maintain a "feed bus" which allocates tasks for real-time + streaming and optional historical data storage per broker/data provider + backend; this normally task runs *in* a `brokerd` actor. + If none exists, this allocates a ``_FeedsBus`` which manages the + lifetimes of streaming tasks created for each requested symbol. + + + 2 tasks are created: + - a real-time streaming task which connec + + ''' try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) - # allocate shm array for this broker/symbol - # XXX: we should get an error here if one already exists + fqsn = mk_fqsn(brokername, symbol) + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(brokername, symbol), + key=fqsn, # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -187,69 +297,73 @@ async def allocate_persistent_feed( readonly=False, ) - # do history validation? - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") - + # mem chan handed to broker backend so it can push real-time + # quotes to this task for sampling and history storage (see below). 
send, quote_stream = trio.open_memory_channel(10) + + # data sync signals for both history loading and market quotes + some_data_ready = trio.Event() feed_is_live = trio.Event() - # establish broker backend quote stream - # ``stream_quotes()`` is a required backend func + # run 2 tasks: + # - a history loader / maintainer + # - a real-time streamer which consumers and sends new data to any + # consumers as well as writes to storage backends (as configured). + bus.nursery.start_soon( + manage_history, + mod, + shm, + bus, + symbol, + opened, + some_data_ready, + feed_is_live, + ) + + # establish broker backend quote stream by calling + # ``stream_quotes()``, which is a required broker backend endpoint. init_msg, first_quotes = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, feed_is_live=feed_is_live, symbols=[symbol], - shm=shm, loglevel=loglevel, ) ) + # we hand an IPC-msg compatible shm token to the caller so it + # can read directly from the memory which will be written by + # this task. init_msg[symbol]['shm_token'] = shm.token - cs = bus.nursery.cancel_scope - - # TODO: make this into a composed type which also - # contains the backfiller cs for individual super-based - # resspawns when needed. - - # XXX: the ``symbol`` here is put into our native piker format (i.e. - # lower case). - bus.feeds[symbol.lower()] = (cs, init_msg, first_quotes) - - if opened: - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - await bus.nursery.start(mod.backfill_bars, symbol, shm) - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + # TODO: pretty sure we don't need this? why not just leave 1s as + # the fastest "sample period" since we'll probably always want that + # for most purposes. 
# pass OHLC sample rate in seconds (be sure to use python int type) - init_msg[symbol]['sample_rate'] = int(delay_s) + # init_msg[symbol]['sample_rate'] = 1 #int(delay_s) - # yield back control to starting nursery + # yield back control to starting nursery once we receive either + # some history or a real-time quote. + await some_data_ready.wait() + bus.feeds[symbol.lower()] = (init_msg, first_quotes) task_status.started((init_msg, first_quotes)) + # backend will indicate when real-time quotes have begun. await feed_is_live.wait() - if opened: - _shms.setdefault(delay_s, []).append(shm) - - # start shm incrementing for OHLC sampling - if _incrementers.get(delay_s) is None: - cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) - sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) # start sample loop try: - await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + await sample_and_broadcast( + bus, + shm, + quote_stream, + sum_tick_vlm + ) finally: log.warning(f'{symbol}@{brokername} feed task terminated') @@ -265,36 +379,43 @@ async def open_feed_bus( start_stream: bool = True, ) -> None: + ''' + Open a data feed "bus": an actor-persistent per-broker task-oriented + data feed registry which allows managing real-time quote streams per + symbol. + ''' if loglevel is None: loglevel = tractor.current_actor().loglevel # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # local state sanity checks + # TODO: check for any stale shm entries for this symbol + # (after we also group them in a nice `/dev/shm/piker/` subdir). 
# ensure we are who we think we are assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) + bus._subscribers.setdefault(symbol, []) + fqsn = mk_fqsn(brokername, symbol) entry = bus.feeds.get(symbol) - bus._subscribers.setdefault(symbol, []) - - fs = mk_fqsn(symbol, brokername) - # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery - async with bus.task_lock: - if entry is None: - - if not start_stream: - raise RuntimeError( - f'No stream feed exists for {fs}?\n' - f'You may need a `brokerd` started first.' - ) + if entry is None: + if not start_stream: + raise RuntimeError( + f'No stream feed exists for {fqsn}?\n' + f'You may need a `brokerd` started first.' + ) + # allocate a new actor-local stream bus which will persist for + # this `brokerd`. + async with bus.task_lock: init_msg, first_quotes = await bus.nursery.start( partial( allocate_persistent_feed, @@ -310,25 +431,25 @@ async def open_feed_bus( loglevel=loglevel, ) ) + # TODO: we can remove this? assert isinstance(bus.feeds[symbol], tuple) # XXX: ``first_quotes`` may be outdated here if this is secondary # subscriber - cs, init_msg, first_quotes = bus.feeds[symbol] + init_msg, first_quotes = bus.feeds[symbol] # send this even to subscribers to existing feed? # deliver initial info message a first quote asap await ctx.started((init_msg, first_quotes)) if not start_stream: - log.warning(f'Not opening real-time stream for {fs}') + log.warning(f'Not opening real-time stream for {fqsn}') await trio.sleep_forever() + # real-time stream loop async with ( ctx.open_stream() as stream, - trio.open_nursery() as n, ): - if tick_throttle: # open a bg task which receives quotes over a mem chan @@ -336,7 +457,7 @@ async def open_feed_bus( # a max ``tick_throttle`` instantaneous rate. 
send, recv = trio.open_memory_channel(2**10) - n.start_soon( + cs = await bus.start_task( uniform_rate_send, tick_throttle, recv, @@ -358,21 +479,24 @@ async def open_feed_bus( if msg == 'pause': if sub in subs: log.info( - f'Pausing {fs} feed for {uid}') + f'Pausing {fqsn} feed for {uid}') subs.remove(sub) elif msg == 'resume': if sub not in subs: log.info( - f'Resuming {fs} feed for {uid}') + f'Resuming {fqsn} feed for {uid}') subs.append(sub) else: raise ValueError(msg) finally: log.info( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') + if tick_throttle: - n.cancel_scope.cancel() + # TODO: a one-cancels-one nursery + # n.cancel_scope.cancel() + cs.cancel() try: bus._subscribers[symbol].remove(sub) except ValueError: @@ -385,6 +509,7 @@ async def open_sample_step_stream( delay_s: int, ) -> tractor.ReceiveMsgStream: + # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes @@ -407,13 +532,15 @@ async def open_sample_step_stream( @dataclass class Feed: - """A data feed for client-side interaction with far-process# }}} - real-time data sources. + ''' + A data feed for client-side interaction with far-process real-time + data sources. This is an thin abstraction on top of ``tractor``'s portals for - interacting with IPC streams and conducting automatic - memory buffer orchestration. - """ + interacting with IPC streams and storage APIs (shm and time-series + db). + + ''' name: str shm: ShmArray mod: ModuleType @@ -425,7 +552,7 @@ class Feed: throttle_rate: Optional[int] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None - _max_sample_rate: int = 0 + _max_sample_rate: int = 1 # cache of symbol info messages received as first message when # a stream startsc. 
@@ -460,13 +587,6 @@ class Feed: await self.stream.send('resume') -def sym_to_shm_key( - broker: str, - symbol: str, -) -> str: - return f'{broker}.{symbol}' - - @asynccontextmanager async def install_brokerd_search( @@ -527,13 +647,15 @@ async def open_feed( # no feed for broker exists so maybe spawn a data brokerd async with ( + # if no `brokerd` for this backend exists yet we spawn + # and actor for one. maybe_spawn_brokerd( brokername, loglevel=loglevel ) as portal, + # (allocate and) connect to any feed bus for this broker portal.open_context( - open_feed_bus, brokername=brokername, symbol=sym, @@ -566,12 +688,10 @@ async def open_feed( _portal=portal, throttle_rate=tick_throttle, ) - ohlc_sample_rates = [] for sym, data in init_msg.items(): si = data['symbol_info'] - ohlc_sample_rates.append(data['sample_rate']) symbol = mk_symbol( key=sym, @@ -592,9 +712,8 @@ async def open_feed( assert shm_token == shm.token # sanity - feed._max_sample_rate = max(ohlc_sample_rates) + feed._max_sample_rate = 1 - # yield feed try: yield feed finally: @@ -627,14 +746,16 @@ async def maybe_open_feed( 'symbols': [sym], 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), - 'backpressure': kwargs.get('backpressure'), + + # XXX: super critical to have bool defaults here XD + 'backpressure': kwargs.get('backpressure', True), 'start_stream': kwargs.get('start_stream', True), }, key=sym, ) as (cache_hit, feed): if cache_hit: - print('USING CACHED FEED') + log.info(f'Using cached feed for {brokername}.{sym}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: From 412c9ee6cf13615a36597348034f08299eebc92f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 15:08:41 -0500 Subject: [PATCH 10/24] Support view increment with a steps size --- piker/ui/_chart.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/piker/ui/_chart.py 
b/piker/ui/_chart.py index 6048ca42..2a3689a3 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -818,11 +818,18 @@ class ChartPlotWidget(pg.PlotWidget): def default_view( self, index: int = -1, - ) -> None: - """Set the view box to the "default" startup view of the scene. - """ - xlast = self._arrays[self.name][index]['index'] + ) -> None: + ''' + Set the view box to the "default" startup view of the scene. + + ''' + try: + xlast = self._arrays[self.name][index]['index'] + except IndexError: + log.warning(f'array for {self.name} not loaded yet?') + return + begin = xlast - _bars_to_left_in_follow_mode end = xlast + _bars_from_right_in_follow_mode @@ -840,6 +847,8 @@ class ChartPlotWidget(pg.PlotWidget): def increment_view( self, + steps: int = 1, + ) -> None: """ Increment the data view one step to the right thus "following" @@ -848,8 +857,8 @@ class ChartPlotWidget(pg.PlotWidget): """ l, r = self.view_range() self.view.setXRange( - min=l + 1, - max=r + 1, + min=l + steps, + max=r + steps, # TODO: holy shit, wtf dude... why tf would this not be 0 by # default... speechless. @@ -858,7 +867,6 @@ class ChartPlotWidget(pg.PlotWidget): def draw_ohlc( self, - name: str, data: np.ndarray, From cc55e1f4bbb0e4bea229cf028632c23b9b5ea3cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 15:11:48 -0500 Subject: [PATCH 11/24] Drop task-driven sample step graphics updates Since moving to a "god loop" for graphics, we don't really need to have a dedicated task for updating graphics on new sample increments. The only UX difference will be that curves won't be updated until an actual new rt-quote-event triggers the graphics loop -> so we'll have the chart "jump" to a new position and new curve segments generated only when new data arrives. This is imo fine since it's just less "idle" updates where the chart would sit printing the same (last) value every step. Instead only update the view increment if a new index is detected by reading shm. 
If we ever want this dedicated task update again this commit can be easily reverted B) --- piker/ui/_display.py | 111 ++++++++++--------------------------------- 1 file changed, 25 insertions(+), 86 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index ecae1307..656554e1 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -30,7 +30,7 @@ import tractor import trio from .. import brokers -from ..data.feed import open_feed, Feed +from ..data.feed import open_feed from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -43,7 +43,7 @@ from ._fsp import ( has_vlm, open_vlm_displays, ) -from ..data._sharedmem import ShmArray, try_read +from ..data._sharedmem import ShmArray from ._forms import ( FieldsForm, mk_order_pane_layout, @@ -90,7 +90,9 @@ def chart_maxmin( l, lbar, rbar, r = last_bars_range in_view = array[lbar - ifirst:rbar - ifirst + 1] - assert in_view.size + if not in_view.size: + log.warning('Resetting chart to data') + chart.default_view() mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) @@ -107,6 +109,7 @@ def chart_maxmin( async def graphics_update_loop( + linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -146,9 +149,8 @@ async def graphics_update_loop( vlm_view = vlm_chart.view maxmin = partial(chart_maxmin, chart, vlm_chart) - chart.default_view() - + last_bars_range: tuple[float, float] ( last_bars_range, last_mx, @@ -182,6 +184,7 @@ async def graphics_update_loop( chart.show() view = chart.view last_quote = time.time() + i_last = ohlcv.index # async def iter_drain_quotes(): # # NOTE: all code below this loop is expected to be synchronous @@ -246,6 +249,22 @@ async def graphics_update_loop( # https://github.com/pikers/piker/issues/116 array = ohlcv.array + # NOTE: this used to be implemented in a dedicated + # "increment tas": ``check_for_new_bars()`` but it doesn't + # make sense to do a whole task switch when we can just do + # this simple index-diff and all the fsp sub-curve 
graphics + # are diffed on each draw cycle anyway; so updates to the + # "curve" length is already automatic. + + # increment the view position by the sample offset. + i_step = ohlcv.index + i_diff = i_step - i_last + if i_diff > 0: + chart.increment_view( + steps=i_diff, + ) + i_last = i_step + if vlm_chart: vlm_chart.update_curve_from_array('volume', array) vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) @@ -427,79 +446,7 @@ async def graphics_update_loop( ) # chart.view._set_yrange() - -async def check_for_new_bars( - feed: Feed, - ohlcv: np.ndarray, - linkedsplits: LinkedSplits, - -) -> None: - ''' - Task which updates from new bars in the shared ohlcv buffer every - ``delay_s`` seconds. - - ''' - # TODO: right now we'll spin printing bars if the last time - # stamp is before a large period of no market activity. - # Likely the best way to solve this is to make this task - # aware of the instrument's tradable hours? - - price_chart = linkedsplits.chart - price_chart.default_view() - - async with feed.index_stream() as stream: - async for index in stream: - # update chart historical bars graphics by incrementing - # a time step and drawing the history and new bar - - # When appending a new bar, in the time between the insert - # from the writing process and the Qt render call, here, - # the index of the shm buffer may be incremented and the - # (render) call here might read the new flat bar appended - # to the buffer (since -1 index read). In that case H==L and the - # body will be set as None (not drawn) on what this render call - # *thinks* is the curent bar (even though it's reading data from - # the newly inserted flat bar. - # - # HACK: We need to therefore write only the history (not the - # current bar) and then either write the current bar manually - # or place a cursor for visual cue of the current time step. 
- - array = ohlcv.array - # avoid unreadable race case on backfills - while not try_read(array): - await trio.sleep(0.01) - - # XXX: this puts a flat bar on the current time step - # TODO: if we eventually have an x-axis time-step "cursor" - # we can get rid of this since it is extra overhead. - price_chart.update_ohlc_from_array( - price_chart.name, - array, - just_history=False, - ) - - # main chart overlays - # for name in price_chart._flows: - for curve_name in price_chart._flows: - price_chart.update_curve_from_array( - curve_name, - price_chart._arrays[curve_name] - ) - - # each subplot - for name, chart in linkedsplits.subplots.items(): - - # TODO: do we need the same unreadable guard as for the - # price chart (above) here? - chart.update_curve_from_array( - chart.name, - chart._shm.array, - array_key=chart.data_key - ) - - # shift the view if in follow mode - price_chart.increment_view() + # loop end async def display_symbol_data( @@ -630,14 +577,6 @@ async def display_symbol_data( vlm_chart, ) - # start sample step incrementer - ln.start_soon( - check_for_new_bars, - feed, - ohlcv, - linkedsplits - ) - async with ( open_order_mode( feed, From 81f8b4e14545845806428cc1e551ee359edec7a2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 15:22:36 -0500 Subject: [PATCH 12/24] Don't zero clearing rates on sample steps --- piker/ui/_fsp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index eac8f27d..ac35067c 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -813,7 +813,7 @@ async def open_vlm_displays( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', - 'zero_on_step': True, + 'zero_on_step': False, }, # loglevel, ) From 11d4ebd0b557fe696e3ac3aab5e4b8a3dc993092 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 15:28:54 -0500 Subject: [PATCH 13/24] Just warn on double-remove of a sub --- piker/data/_sampling.py | 18 ++++++++++++------ 1 file changed, 12 
insertions(+), 6 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 669f624e..89228c96 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -15,7 +15,9 @@ # along with this program. If not, see . """ -Data buffers for fast shared humpy. +Sampling and broadcast machinery for (soft) real-time delivery of +financial data flows. + """ import time from typing import Dict, List @@ -48,7 +50,8 @@ async def increment_ohlc_buffer( delay_s: int, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ): - """Task which inserts new bars into the provide shared memory array + ''' + Task which inserts new bars into the provide shared memory array every ``delay_s`` seconds. This task fulfills 2 purposes: @@ -59,8 +62,8 @@ async def increment_ohlc_buffer( Note that if **no** actor has initiated this task then **none** of the underlying buffers will actually be incremented. - """ + ''' # # wait for brokerd to signal we should start sampling # await shm_incrementing(shm_token['shm_name']).wait() @@ -137,12 +140,12 @@ async def iter_ohlc_periods( delay_s: int, ) -> None: - """ + ''' Subscribe to OHLC sampling "step" events: when the time aggregation period increments, this event stream emits an index event. - """ + ''' # add our subscription global _subscribers subs = _subscribers.setdefault(delay_s, []) @@ -290,7 +293,10 @@ async def sample_and_broadcast( # so far seems like no since this should all # be single-threaded. Doing it anyway though # since there seems to be some kinda race.. 
- subs.remove((stream, tick_throttle)) + try: + subs.remove((stream, tick_throttle)) + except ValueError: + log.error(f'{stream} was already removed from subs!?') # TODO: a less naive throttler, here's some snippets: From 786ffde4e63a09213f77f0eb6ffc999e145cf21e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 18:16:12 -0500 Subject: [PATCH 14/24] Use 3.9+ annots --- piker/data/_source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/data/_source.py b/piker/data/_source.py index 0677df65..dfa48453 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -17,7 +17,7 @@ """ numpy data source coversion helpers. """ -from typing import Dict, Any, List +from typing import Any import decimal import numpy as np @@ -103,13 +103,13 @@ class Symbol(BaseModel): lot_tick_size: float # "volume" precision as min step value tick_size_digits: int lot_size_digits: int - broker_info: Dict[str, Dict[str, Any]] = {} + broker_info: dict[str, dict[str, Any]] = {} # specifies a "class" of financial instrument # ex. stock, futer, option, bond etc. @property - def brokers(self) -> List[str]: + def brokers(self) -> list[str]: return list(self.broker_info.keys()) def nearest_tick(self, value: float) -> float: From 7a943f0e1e61b3fd4a917f8fd149ca120da120af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 06:56:25 -0500 Subject: [PATCH 15/24] Always transmit index event even when no shm is registered --- piker/data/_sampling.py | 52 ++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 89228c96..dc247630 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -20,7 +20,6 @@ financial data flows. 
""" import time -from typing import Dict, List import tractor import trio @@ -35,15 +34,17 @@ log = get_logger(__name__) # TODO: we could stick these in a composed type to avoid # angering the "i hate module scoped variables crowd" (yawn). -_shms: Dict[int, List[ShmArray]] = {} -_start_increment: Dict[str, trio.Event] = {} -_incrementers: Dict[int, trio.CancelScope] = {} -_subscribers: Dict[str, tractor.Context] = {} +_ohlcv_shms: dict[int, list[ShmArray]] = {} +# holds one-task-per-sample-period tasks which are spawned as-needed by +# data feed requests with a given detected time step usually from +# history loading. +_incrementers: dict[int, trio.CancelScope] = {} -def shm_incrementing(shm_token_name: str) -> trio.Event: - global _start_increment - return _start_increment.setdefault(shm_token_name, trio.Event()) +# holds all the ``tractor.Context`` remote subscriptions for +# a particular sample period increment event: all subscribers are +# notified on a step. +_subscribers: dict[int, tractor.Context] = {} async def increment_ohlc_buffer( @@ -72,13 +73,13 @@ async def increment_ohlc_buffer( # to solve this is to make this task aware of the instrument's # tradable hours? - global _incrementers + global _incrementers, _ohlcv_shms, _subscribers # adjust delay to compensate for trio processing time - ad = min(_shms.keys()) - 0.001 + ad = min(_ohlcv_shms.keys()) - 0.001 total_s = 0 # total seconds counted - lowest = min(_shms.keys()) + lowest = min(_ohlcv_shms.keys()) ad = lowest - 0.001 with trio.CancelScope() as cs: @@ -94,8 +95,10 @@ async def increment_ohlc_buffer( total_s += lowest # increment all subscribed shm arrays - # TODO: this in ``numba`` - for delay_s, shms in _shms.items(): + # TODO: + # - this in ``numba`` + # - just lookup shms for this step instead of iterating? 
+ for delay_s, shms in _ohlcv_shms.items(): if total_s % delay_s != 0: continue @@ -120,18 +123,19 @@ async def increment_ohlc_buffer( # write to the buffer shm.push(last) - # broadcast the buffer index step - subs = _subscribers.get(delay_s, ()) + # broadcast the buffer index step to any subscribers for + # a given sample period. + subs = _subscribers.get(delay_s, ()) - for ctx in subs: - try: - await ctx.send_yield({'index': shm._last.value}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error(f'{ctx.chan.uid} dropped connection') - subs.remove(ctx) + for ctx in subs: + try: + await ctx.send_yield({'index': shm._last.value}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error(f'{ctx.chan.uid} dropped connection') + subs.remove(ctx) @tractor.stream From 89a98c4aa286388116a0a88223a69eb12907de63 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 07:04:28 -0500 Subject: [PATCH 16/24] Fix portal result `await`, comment some unused code --- piker/_daemon.py | 57 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 2464684b..b4eed03d 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -80,7 +80,6 @@ class Services(BaseModel): ) -> Any: with trio.CancelScope() as cs: - async with portal.open_context( target, **kwargs, @@ -89,19 +88,17 @@ class Services(BaseModel): # unblock once the remote context has started task_status.started((cs, first)) - + log.info( + f'`pikerd` service {name} started with value {first}' + ) # wait on any context's return value ctx_res = await ctx.result() - log.info( - f'`pikerd` service {name} started with value {ctx_res}' - ) # wait on any error from the sub-actor # NOTE: this will block indefinitely until cancelled - # either by error from the target context function or - # by being cancelled here by the surroundingn cancel - # scope - return await 
(portal.result(), ctx_res) + # either by error from the target context function or by + # being cancelled here by the surrounding cancel scope + return (await portal.result(), ctx_res) cs, first = await self.service_n.start(open_context_in_task) @@ -111,16 +108,16 @@ class Services(BaseModel): return cs, first - async def cancel_service( - self, - name: str, - - ) -> Any: - - log.info(f'Cancelling `pikerd` service {name}') - cs, portal = self.service_tasks[name] - cs.cancel() - return await portal.cancel_actor() + # TODO: per service cancellation by scope, we aren't using this + # anywhere right? + # async def cancel_service( + # self, + # name: str, + # ) -> Any: + # log.info(f'Cancelling `pikerd` service {name}') + # cs, portal = self.service_tasks[name] + # cs.cancel() + # return await portal.cancel_actor() _services: Optional[Services] = None @@ -497,3 +494,25 @@ async def maybe_open_emsd( ) as portal: yield portal + + +# TODO: ideally we can start the tsdb "on demand" but it's +# probably going to require "rootless" docker, at least if we don't +# want to expect the user to start ``pikerd`` with root perms all the +# time. 
+# async def maybe_open_marketstored( +# loglevel: Optional[str] = None, +# **kwargs, + +# ) -> tractor._portal.Portal: # noqa + +# async with maybe_spawn_daemon( + +# 'marketstored', +# service_task_target=spawn_emsd, +# spawn_args={'loglevel': loglevel}, +# loglevel=loglevel, +# **kwargs, + +# ) as portal: +# yield portal From b1cce8f9cff6d17b4bdde7567423a3ddd5e3db0e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 07:34:15 -0500 Subject: [PATCH 17/24] Adjust and add notes for python-trio/trio#2258 --- piker/data/feed.py | 73 ++++++++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 6d4c0689..c3d17ac2 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -25,7 +25,7 @@ from contextlib import asynccontextmanager from functools import partial from types import ModuleType from typing import ( - Any, Sequence, + Any, AsyncIterator, Optional, Awaitable, ) @@ -56,7 +56,9 @@ from ._source import ( ) from ..ui import _search from ._sampling import ( - _shms, + # TODO: should probably group these in a compound type at this point XD + _ohlcv_shms, + _subscribers, _incrementers, increment_ohlc_buffer, iter_ohlc_periods, @@ -108,6 +110,7 @@ class _FeedsBus(BaseModel): self, target: Awaitable, *args, + ) -> None: async def start_with_cs( @@ -159,7 +162,8 @@ def get_feed_bus( @tractor.context async def _setup_persistent_brokerd( ctx: tractor.Context, - brokername: str + brokername: str, + ) -> None: ''' Allocate a actor-wide service nursery in ``brokerd`` @@ -167,22 +171,22 @@ async def _setup_persistent_brokerd( the broker backend as needed. 
''' - try: - async with trio.open_nursery() as service_nursery: + get_console_log(tractor.current_actor().loglevel) - # assign a nursery to the feeds bus for spawning - # background tasks from clients - bus = get_feed_bus(brokername, service_nursery) + global _bus + assert not _bus - # unblock caller - await ctx.started() + async with trio.open_nursery() as service_nursery: + # assign a nursery to the feeds bus for spawning + # background tasks from clients + get_feed_bus(brokername, service_nursery) - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() - finally: - # TODO: this needs to be shielded? - bus.nursery.cancel_scope.cancel() + # unblock caller + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() async def manage_history( @@ -194,6 +198,8 @@ async def manage_history( some_data_ready: trio.Event, feed_is_live: trio.Event, + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: ''' Load and manage historical data including the loading of any @@ -202,13 +208,15 @@ async def manage_history( buffer. 
''' - # TODO: - # history retreival, see if we can pull from an existing + # TODO: history retreival, see if we can pull from an existing # ``marketstored`` daemon - # log.info('Scanning for existing `marketstored`') - # from .marketstore import load_history - # arrays = await load_history(symbol) + log.info('Scanning for existing `marketstored`') + fqsn = mk_fqsn(mod.name, symbol) + # from .marketstore import manage_history + # arrays = await manage_history(symbol) + arrays = {} + task_status.started() opened = we_opened_shm # TODO: history validation @@ -218,6 +226,8 @@ async def manage_history( if opened: if arrays: + await tractor.breakpoint() + # push to shm # set data ready # some_data_ready.set() @@ -245,7 +255,7 @@ async def manage_history( await feed_is_live.wait() if opened: - _shms.setdefault(delay_s, []).append(shm) + _ohlcv_shms.setdefault(delay_s, []).append(shm) # start shm incrementing for OHLC sampling at the current # detected sampling period if one dne. @@ -309,7 +319,13 @@ async def allocate_persistent_feed( # - a history loader / maintainer # - a real-time streamer which consumers and sends new data to any # consumers as well as writes to storage backends (as configured). - bus.nursery.start_soon( + + # XXX: neither of these will raise but will cause an inf hang due to: + # https://github.com/python-trio/trio/issues/2258 + # bus.nursery.start_soon( + # await bus.start_task( + + await bus.nursery.start( manage_history, mod, shm, @@ -345,7 +361,9 @@ async def allocate_persistent_feed( # yield back control to starting nursery once we receive either # some history or a real-time quote. 
+ log.info(f'waiting on history to load: {fqsn}') await some_data_ready.wait() + bus.feeds[symbol.lower()] = (init_msg, first_quotes) task_status.started((init_msg, first_quotes)) @@ -518,8 +536,8 @@ async def open_sample_step_stream( portal.open_stream_from, iter_ohlc_periods, ), - kwargs={'delay_s': delay_s}, + ) as (cache_hit, istream): if cache_hit: # add a new broadcast subscription for the quote stream @@ -623,9 +641,8 @@ async def install_brokerd_search( @asynccontextmanager async def open_feed( - brokername: str, - symbols: Sequence[str], + symbols: list[str], loglevel: Optional[str] = None, backpressure: bool = True, @@ -725,7 +742,7 @@ async def open_feed( async def maybe_open_feed( brokername: str, - symbols: Sequence[str], + symbols: list[str], loglevel: Optional[str] = None, **kwargs, From 3e7d4f8717f8c733886b93e5401dd62fbaaf2e1b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 07:34:47 -0500 Subject: [PATCH 18/24] Detect and request sample period in fsp engine --- piker/fsp/_engine.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 1b853c60..f1dd49d7 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -123,7 +123,6 @@ async def fsp_compute( # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # if the output array is multi-field then push # each respective field. - # await tractor.breakpoint() fields = getattr(dst.array.dtype, 'fields', None).copy() fields.pop('index') # TODO: nptyping here! @@ -269,7 +268,7 @@ async def cascade( f'Registered FSP set:\n{lines}' ) - # update actor local flows table which registers + # update actorlocal flows table which registers # readonly "instances" of this fsp for symbol/source # so that consumer fsps can look it up by source + fsp. 
# TODO: ugh i hate this wind/unwind to list over the wire @@ -381,14 +380,19 @@ async def cascade( s, step, ld = is_synced(src, dst) + # detect sample period step for subscription to increment + # signal + times = src.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async with feed.index_stream() as stream: + async with feed.index_stream(int(delay_s)) as istream: profiler(f'{func_name}: sample stream up') profiler.finish() - async for msg in stream: + async for _ in istream: # respawn the compute task if the source # array has been updated such that we compute From 6f3d78b7293517b511fca948ed68ec40fd04b415 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 07:40:02 -0500 Subject: [PATCH 19/24] Handle "no data" case in ranger calcs and avoid crashes --- piker/ui/_display.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 656554e1..c6d0f5aa 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -93,6 +93,7 @@ def chart_maxmin( if not in_view.size: log.warning('Resetting chart to data') chart.default_view() + return (last_bars_range, 0, 0, 0) mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) From c239faf4e52ee77c6b9e3eef47ee8852fc4639d7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 11:56:36 -0500 Subject: [PATCH 20/24] Add a `._sampling.sampler` registry composite type --- piker/data/_sampling.py | 49 ++++++++++++++++++++++++----------------- piker/data/feed.py | 18 +++++++-------- 2 files changed, 38 insertions(+), 29 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index dc247630..ecad241d 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -32,19 +32,27 @@ from ..log import get_logger log = get_logger(__name__) -# TODO: we could stick these in a composed type to avoid -# 
angering the "i hate module scoped variables crowd" (yawn). -_ohlcv_shms: dict[int, list[ShmArray]] = {} +class sampler: + ''' + Global sampling engine registry. -# holds one-task-per-sample-period tasks which are spawned as-needed by -# data feed requests with a given detected time step usually from -# history loading. -_incrementers: dict[int, trio.CancelScope] = {} + Manages state for sampling events, shm incrementing and + sample period logic. -# holds all the ``tractor.Context`` remote subscriptions for -# a particular sample period increment event: all subscribers are -# notified on a step. -_subscribers: dict[int, tractor.Context] = {} + ''' + # TODO: we could stick these in a composed type to avoid + # angering the "i hate module scoped variables crowd" (yawn). + ohlcv_shms: dict[int, list[ShmArray]] = {} + + # holds one-task-per-sample-period tasks which are spawned as-needed by + # data feed requests with a given detected time step usually from + # history loading. + incrementers: dict[int, trio.CancelScope] = {} + + # holds all the ``tractor.Context`` remote subscriptions for + # a particular sample period increment event: all subscribers are + # notified on a step. + subscribers: dict[int, tractor.Context] = {} async def increment_ohlc_buffer( @@ -73,19 +81,17 @@ async def increment_ohlc_buffer( # to solve this is to make this task aware of the instrument's # tradable hours? 
- global _incrementers, _ohlcv_shms, _subscribers - # adjust delay to compensate for trio processing time - ad = min(_ohlcv_shms.keys()) - 0.001 + ad = min(sampler.ohlcv_shms.keys()) - 0.001 total_s = 0 # total seconds counted - lowest = min(_ohlcv_shms.keys()) + lowest = min(sampler.ohlcv_shms.keys()) ad = lowest - 0.001 with trio.CancelScope() as cs: # register this time period step as active - _incrementers[delay_s] = cs + sampler.incrementers[delay_s] = cs task_status.started(cs) while True: @@ -98,7 +104,7 @@ async def increment_ohlc_buffer( # TODO: # - this in ``numba`` # - just lookup shms for this step instead of iterating? - for delay_s, shms in _ohlcv_shms.items(): + for delay_s, shms in sampler.ohlcv_shms.items(): if total_s % delay_s != 0: continue @@ -125,7 +131,7 @@ async def increment_ohlc_buffer( # broadcast the buffer index step to any subscribers for # a given sample period. - subs = _subscribers.get(delay_s, ()) + subs = sampler.subscribers.get(delay_s, ()) for ctx in subs: try: @@ -151,8 +157,7 @@ async def iter_ohlc_periods( ''' # add our subscription - global _subscribers - subs = _subscribers.setdefault(delay_s, []) + subs = sampler.subscribers.setdefault(delay_s, []) subs.append(ctx) try: @@ -313,6 +318,8 @@ async def uniform_rate_send( quote_stream: trio.abc.ReceiveChannel, stream: tractor.MsgStream, + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: # TODO: compute the approx overhead latency per cycle @@ -323,6 +330,8 @@ async def uniform_rate_send( last_send = time.time() diff = 0 + task_status.started() + while True: # compute the remaining time to sleep for this throttled cycle diff --git a/piker/data/feed.py b/piker/data/feed.py index c3d17ac2..0e1f1248 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -56,10 +56,7 @@ from ._source import ( ) from ..ui import _search from ._sampling import ( - # TODO: should probably group these in a compound type at this point XD - _ohlcv_shms, - _subscribers, - 
_incrementers, + sampler, increment_ohlc_buffer, iter_ohlc_periods, sample_and_broadcast, @@ -118,7 +115,7 @@ class _FeedsBus(BaseModel): trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: with trio.CancelScope() as cs: - self.nursery.start_soon( + await self.nursery.start( target, *args, ) @@ -255,23 +252,26 @@ async def manage_history( await feed_is_live.wait() if opened: - _ohlcv_shms.setdefault(delay_s, []).append(shm) + sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) # start shm incrementing for OHLC sampling at the current # detected sampling period if one dne. - if _incrementers.get(delay_s) is None: - cs = await bus.start_task(increment_ohlc_buffer, delay_s) + if sampler.incrementers.get(delay_s) is None: + cs = await bus.start_task( + increment_ohlc_buffer, + delay_s, + ) await trio.sleep_forever() cs.cancel() async def allocate_persistent_feed( - bus: _FeedsBus, brokername: str, symbol: str, loglevel: str, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: From adccb687fe511c569e8b89b2b0dbd1956f60f3b6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 11:56:55 -0500 Subject: [PATCH 21/24] Fix `piker services` cmd --- piker/cli/__init__.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 97b6ebc3..7eb7b5d1 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -109,13 +109,11 @@ def services(config, tl, names): ) as portal: registry = await portal.run_from_ns('self', 'get_registry') json_d = {} - for uid, socket in registry.items(): - name, uuid = uid + for key, socket in registry.items(): + # name, uuid = uid host, port = socket - json_d[f'{name}.{uuid}'] = f'{host}:{port}' - click.echo( - f"Available `piker` services:\n{colorize_json(json_d)}" - ) + json_d[key] = f'{host}:{port}' + click.echo(f"{colorize_json(json_d)}") tractor.run( list_services, From 9d4e1c885f34fd936045473d916ca0170ddc748c Mon Sep 17 
00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 12:11:54 -0500 Subject: [PATCH 22/24] Ignore snippets dir --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index bdbd200a..70826d07 100644 --- a/.gitignore +++ b/.gitignore @@ -97,6 +97,9 @@ ENV/ # mkdocs documentation /site +# extra scripts dir +/snippets + # mypy .mypy_cache/ .vscode/settings.json From 09079b61fcea67de1b8d2d8660bbbe26ae35aa6e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Mar 2022 12:37:31 -0500 Subject: [PATCH 23/24] Comment task canceller method prototype --- piker/data/feed.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 0e1f1248..97f4c046 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -123,8 +123,11 @@ class _FeedsBus(BaseModel): return await self.nursery.start(start_with_cs) - def cancel_task(self, task: trio.Task) -> bool: - pass + # def cancel_task( + # self, + # task: trio.lowlevel.Task + # ) -> bool: + # ... _bus: _FeedsBus = None From f7d03489d866dba325ee3ef4f8a55ba5bf528df4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Mar 2022 12:39:12 -0500 Subject: [PATCH 24/24] Drop `marketstore` loading cruft (will come later) --- piker/data/feed.py | 27 +++++---------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 97f4c046..e2e91d7b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -208,14 +208,6 @@ async def manage_history( buffer. 
''' - # TODO: history retreival, see if we can pull from an existing - # ``marketstored`` daemon - log.info('Scanning for existing `marketstored`') - fqsn = mk_fqsn(mod.name, symbol) - # from .marketstore import manage_history - # arrays = await manage_history(symbol) - - arrays = {} task_status.started() opened = we_opened_shm @@ -225,21 +217,12 @@ async def manage_history( # raise RuntimeError("Persistent shm for sym was already open?!") if opened: - if arrays: - await tractor.breakpoint() + # ask broker backend for new history - # push to shm - # set data ready - # some_data_ready.set() - raise ValueError('this should never execute yet') - - else: - # ask broker backend for new history - - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) # indicate to caller that feed can be delivered to # remote requesting client since we've loaded history