diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index 44a21d3f..0c25309f 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -4,10 +4,9 @@ Data buffers for fast shared humpy. import time import tractor -import numpy as np import trio -from ._sharedmem import SharedArray, attach_shared_array +from ._sharedmem import attach_shared_array @tractor.stream @@ -40,35 +39,33 @@ async def incr_buffer( async def sleep(): """Sleep until next time frames worth has passed from last bar. """ - # last_ts = shm.array[-1]['time'] - # delay = max((last_ts + ad) - time.time(), 0) - # await trio.sleep(delay) - await trio.sleep(ad) + last_ts = shm.array[-1]['time'] + delay = max((last_ts + ad) - time.time(), 0) + await trio.sleep(delay) + # await trio.sleep(ad) while True: # sleep for duration of current bar await sleep() + # TODO: in theory we could make this faster by copying the + # "last" readable value into the underlying larger buffer's + # next value and then incrementing the counter instead of + # using ``.push()``? + # append new entry to buffer thus "incrementing" the bar array = shm.array last = array[-1:].copy() - # last = np.array(last, dtype=array.dtype) - # shm.push(last) - # array = shm.array - # last = array[-1].copy() (index, t, close) = last[0][['index', 'time', 'close']] - # new = np.array(last, dtype=array.dtype) - # this copies non-std fields (eg. vwap) from the last datum last[ ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] ][0] = (index + 1, t + delay_s, 0, close, close, close, close) # write to the buffer - print('incrementing array') - # await tractor.breakpoint() shm.push(last) + # print('incrementing array') # yield the new buffer index value await ctx.send_yield(shm._i.value) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index cfe162a7..6747ed9d 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -58,7 +58,6 @@ class SharedInt: create=create, size=4, # std int ) - self._token = self._shm.name @property def value(self) -> int: @@ -68,6 +67,12 @@ class SharedInt: def value(self, value) -> None: self._shm.buf[:] = value.to_bytes(4, byteorder) + def destroy(self) -> None: + if shared_memory._USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + shm_unlink(self._shm.name) + class SharedArray: def __init__( @@ -87,7 +92,7 @@ class SharedArray: @property def token(self) -> Tuple[str, str]: - return (self._shm.name, self._i._token) + return (self._shm.name, self._i._shm.name) @property def name(self) -> str: @@ -130,8 +135,8 @@ class SharedArray: if shared_memory._USE_POSIX: # We manually unlink to bypass all the "resource tracker" # nonsense meant for non-SC systems. - shm_unlink(self._i._shm.name) shm_unlink(self._shm.name) + self._i.destroy() def flush(self) -> None: # TODO: flush to storage backend like markestore? @@ -141,9 +146,8 @@ class SharedArray: @contextmanager def open_shared_array( name: Optional[str] = None, - create: bool = True, - # approx number of 5s bars in a "day" - size: int = int(60*60*10/5), + # approx number of 5s bars in a "day" x2 + size: int = int(2*60*60*10/5), dtype: np.dtype = base_ohlc_dtype, readonly: bool = False, ) -> SharedArray: @@ -155,7 +159,11 @@ def open_shared_array( # create new shared mem segment for which we # have write permission a = np.zeros(size, dtype=dtype) - shm = shared_memory.SharedMemory(name=name, create=True, size=a.nbytes) + shm = shared_memory.SharedMemory( + name=name, + create=True, + size=a.nbytes + ) shmarr = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) shmarr[:] = a[:] shmarr.setflags(write=int(not readonly)) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 780fe502..7695d89e 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -23,7 +23,6 @@ from .. import data from ..data._normalize import iterticks from ..log import get_logger from ._exec import run_qtractor -from ._source import base_ohlc_dtype from ._interaction import ChartView from .. import fsp @@ -588,7 +587,7 @@ class ChartPlotWidget(pg.PlotWidget): self.scene().leaveEvent(ev) -async def check_for_new_bars(delay_s, ohlcv, linked_charts): +async def check_for_new_bars(incr_stream, ohlcv, linked_charts): """Task which updates from new bars in the shared ohlcv buffer every ``delay_s`` seconds. """ @@ -597,36 +596,9 @@ async def check_for_new_bars(delay_s, ohlcv, linked_charts): # Likely the best way to solve this is to make this task # aware of the instrument's tradable hours? - # adjust delay to compensate for trio processing time - ad = delay_s - 0.002 - price_chart = linked_charts.chart - # ohlc = price_chart._array - - async def sleep(): - """Sleep until next time frames worth has passed from last bar. - """ - # last_ts = ohlcv.array[-1]['time'] - # delay = max((last_ts + ad) - time.time(), 0) - # await trio.sleep(delay) - await trio.sleep(ad) - - # sleep for duration of current bar - await sleep() - - while True: - # TODO: bunch of stuff: - # - I'm starting to think all this logic should be - # done in one place and "graphics update routines" - # should not be doing any length checking and array diffing. - # - don't keep appending, but instead increase the - # underlying array's size less frequently - # - handle odd lot orders - # - update last open price correctly instead - # of copying it from last bar's close - # - 5 sec bar lookback-autocorrection like tws does? - + async for index in incr_stream: # TODO: generalize this increment logic for name, chart in linked_charts.subplots.items(): data = chart._array @@ -635,15 +607,6 @@ async def check_for_new_bars(delay_s, ohlcv, linked_charts): np.array(data[-1], dtype=data.dtype) ) - # read value at "open" of bar - # last_quote = ohlc[-1] - # XXX: If the last bar has not changed print a flat line and - # move to the next. This is a "animation" choice that we may not - # keep. - # if last_quote == ohlc[-1]: - # log.debug("Printing flat line for {sym}") - # print(ohlcv.array) - # update chart historical bars graphics price_chart.update_from_array( price_chart.name, @@ -670,17 +633,6 @@ async def check_for_new_bars(delay_s, ohlcv, linked_charts): chart.update_from_array(chart.name, chart._array) chart._set_yrange() - # We **don't** update the bar right now - # since the next quote that arrives should in the - # tick streaming task - await sleep() - - # TODO: should we update a graphics again time here? - # Think about race conditions with data update task. - # UPDATE: don't think this should matter know since the last bar - # and the prior historical bars are being updated in 2 separate - # steps now. - async def _async_main( sym: str, @@ -721,11 +673,6 @@ async def _async_main( overlay=True, ) - # determine ohlc delay between bars - # to determine time step between datums - times = bars['time'] - delay = times[-1] - times[times != times[-1]][-1] - async with trio.open_nursery() as n: # load initial fsp chain (otherwise known as "indicators") @@ -759,7 +706,8 @@ async def _async_main( ) n.start_soon( check_for_new_bars, - delay, + incr_stream, + # delay, ohlcv, linked_charts ) @@ -776,41 +724,35 @@ async def chart_from_quotes( ) -> None: """The 'main' (price) chart real-time update loop. """ - + # TODO: bunch of stuff: + # - I'm starting to think all this logic should be + # done in one place and "graphics update routines" + # should not be doing any length checking and array diffing. + # - handle odd lot orders + # - update last open price correctly instead + # of copying it from last bar's close + # - 5 sec bar lookback-autocorrection like tws does? last_price_sticky = chart._ysticks[chart.name] - print_next = False async for quotes in stream: for sym, quote in quotes.items(): for tick in iterticks(quote, type='trade'): - # TODO: eventually we'll want to update - # bid/ask labels and other data as - # subscribed by underlying UI consumers. - # last_close = ohlcv.array[-1]['close'] - - # last = quote.get('last') or quote['close'] - # last = tick['price'] - - # if print_next: - # print(f"next last: {last}") - # print_next = False - - # if last_close != last: - # log.error(f"array last_close: {last_close}\nlast: {last}") - # print_next = True - - # update ohlc (I guess we're enforcing this - # for now?) overwrite from quote - # high, low = chart._array[-1][['high', 'low']] - # chart._array[['high', 'low', 'close']][-1] = ( - # max(high, last), - # min(low, last), - # last, - # ) - last = ohlcv.array[-1] + # TODO: + # - eventually we'll want to update bid/ask labels and + # other data as subscribed by underlying UI consumers. + # - in theory we should be able to read buffer data + # faster then msgs arrive.. needs some tinkering and + # testing + start = time.time() + array = ohlcv.array + diff = time.time() - start + print(f'read time: {diff}') + # last = ohlcv.array[-1] + last = array[-1] chart.update_from_array( chart.name, - ohlcv.array, + # ohlcv.array, + array, ) # update sticky(s) last_price_sticky.update_from_data( @@ -819,13 +761,11 @@ async def chart_from_quotes( vwap = quote.get('vwap') if vwap and vwap_in_history: - # chart._array['vwap'][-1] = vwap last['vwap'] = vwap print(f"vwap: {quote['vwap']}") # update vwap overlay line chart.update_from_array( 'vwap', - # chart._array['vwap'], ohlcv.array['vwap'], )