diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 67c3e799..b0d89d1b 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -25,7 +25,10 @@ import trio import tractor from ..log import get_logger, get_console_log -from ..data import maybe_spawn_brokerd +from ..data import ( + maybe_spawn_brokerd, iterticks, attach_shared_array, + incr_buffer, +) from ..ui._source import from_df @@ -104,7 +107,6 @@ _adhoc_cmdty_data_map = { # NOTE: cmdtys don't have trade data: # https://groups.io/g/twsapi/message/44174 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), - 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } @@ -143,7 +145,7 @@ class Client: # durationStr='1 D', # time length calcs - durationStr='{count} S'.format(count=3000 * 5), + durationStr='{count} S'.format(count=5000 * 5), barSizeSetting='5 secs', # always use extended hours @@ -487,6 +489,7 @@ def normalize( # @tractor.msg.pub async def stream_quotes( symbols: List[str], + shared_array_token: Tuple[str, str], loglevel: str = None, # compat for @tractor.msg.pub topics: Any = None, @@ -508,26 +511,25 @@ async def stream_quotes( method='stream_ticker', symbol=sym, ) + + async with get_client() as client: + bars = await client.bars(symbol=sym) + async with aclosing(stream): # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() + quote = normalize(first_ticker) + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + + log.debug(f"First ticker received {quote}") if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): suffix = 'exchange' calc_price = False # should be real volume for contract - quote = normalize(first_ticker) - log.debug(f"First ticker received {quote}") - - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - yield {topic: quote} - - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] - async for ticker in stream: # spin consuming tickers until we get a real market datum if not ticker.rtTime: @@ -535,10 +537,6 @@ async def stream_quotes( continue else: log.debug("Received first real volume tick") - quote = normalize(ticker) - topic = '.'.join((con['symbol'], con[suffix])).lower() - yield {topic: quote} - # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) ticker.ticks = [] @@ -550,28 +548,65 @@ async def stream_quotes( # commodities don't have an exchange name for some reason? suffix = 'secType' calc_price = True + ticker = first_ticker - async for ticker in stream: - quote = normalize( - ticker, - calc_price=calc_price - ) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - yield {topic: quote} + con = quote['contract'] + quote = normalize(ticker, calc_price=calc_price) + topic = '.'.join((con['symbol'], con[suffix])).lower() + first_quote = {topic: quote} + ticker.ticks = [] - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # load historical ohlcv in to shared mem + ss = tractor.current_actor().statespace + existing_shm = ss.get(f'ib_shm.{sym}') + if not existing_shm: + readonly = False + else: + readonly = True + shm = existing_shm + with attach_shared_array( + token=shared_array_token, + readonly=readonly + ) as shm: + if not existing_shm: + shm.push(bars) + ss[f'ib_shm.{sym}'] = shm -if __name__ == '__main__': - import sys - sym = sys.argv[1] + yield (first_quote, shm.token) + else: + yield (first_quote, None) - contract = asyncio.run( - _aio_run_client_method( - 'find_contract', - symbol=sym, - ) - ) - print(contract) + async for ticker in stream: + quote = normalize( + ticker, + calc_price=calc_price + ) + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. + + # if we are the lone tick writer + if not existing_shm: + for tick in iterticks(quote, type='trade'): + last = tick['price'] + # print(f'broker last: {tick}') + + # update last entry + # benchmarked in the 4-5 us range + high, low = shm.array[-1][['high', 'low']] + shm.array[['high', 'low', 'close']][-1] = ( + max(high, last), + min(low, last), + last, + ) + + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 25efe088..3ef85c96 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -18,6 +18,20 @@ import tractor from ..brokers import get_brokermod from ..log import get_logger, get_console_log +from ._normalize import iterticks +from ._sharedmem import ( + maybe_open_shared_array, attach_shared_array, open_shared_array, +) +from ._buffer import incr_buffer + + +__all__ = [ + 'maybe_open_shared_array', + 'attach_shared_array', + 'open_shared_array', + 'iterticks', + 'incr_buffer', +] log = get_logger(__name__) @@ -27,7 +41,7 @@ __ingestors__ = [ ] -def get_ingestor(name: str) -> ModuleType: +def get_ingestormod(name: str) -> ModuleType: """Return the imported ingestor module by name. """ module = import_module('.' + name, 'piker.data') @@ -39,6 +53,7 @@ def get_ingestor(name: str) -> ModuleType: _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', + 'piker.data', ] @@ -100,22 +115,40 @@ async def open_feed( if loglevel is None: loglevel = tractor.current_actor().loglevel - async with maybe_spawn_brokerd( - mod.name, - loglevel=loglevel, - ) as portal: - stream = await portal.run( - mod.__name__, - 'stream_quotes', - symbols=symbols, - topics=symbols, - ) - # Feed is required to deliver an initial quote asap. - # TODO: should we timeout and raise a more explicit error? - # with trio.fail_after(5): - with trio.fail_after(float('inf')): - # Retreive initial quote for each symbol - # such that consumer code can know the data layout - first_quote = await stream.__anext__() - log.info(f"Received first quote {first_quote}") - yield (first_quote, stream) + with maybe_open_shared_array( + name=f'{name}.{symbols[0]}.buf', + readonly=True, # we expect the sub-actor to write + ) as shmarr: + async with maybe_spawn_brokerd( + mod.name, + loglevel=loglevel, + ) as portal: + stream = await portal.run( + mod.__name__, + 'stream_quotes', + symbols=symbols, + shared_array_token=shmarr.token, + topics=symbols, + ) + # Feed is required to deliver an initial quote asap. + # TODO: should we timeout and raise a more explicit error? + # with trio.fail_after(5): + with trio.fail_after(float('inf')): + # Retreive initial quote for each symbol + # such that consumer code can know the data layout + first_quote, child_shmarr_token = await stream.__anext__() + log.info(f"Received first quote {first_quote}") + + if child_shmarr_token is not None: + # we are the buffer writer task + increment_stream = await portal.run( + 'piker.data', + 'incr_buffer', + shm_token=child_shmarr_token, + ) + + assert child_shmarr_token == shmarr.token + else: + increment_stream = None + + yield (first_quote, stream, increment_stream, shmarr) diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py new file mode 100644 index 00000000..44a21d3f --- /dev/null +++ b/piker/data/_buffer.py @@ -0,0 +1,74 @@ +""" +Data buffers for fast shared humpy. +""" +import time + +import tractor +import numpy as np +import trio + +from ._sharedmem import SharedArray, attach_shared_array + + +@tractor.stream +async def incr_buffer( + ctx: tractor.Context, + shm_token: str, + # delay_s: Optional[float] = None, +): + """Task which inserts new bars into the provide shared memory array + every ``delay_s`` seconds. + """ + # TODO: right now we'll spin printing bars if the last time + # stamp is before a large period of no market activity. + # Likely the best way to solve this is to make this task + # aware of the instrument's tradable hours? + + with attach_shared_array( + token=shm_token, + readonly=False, + ) as shm: + + # determine ohlc delay between bars + # to determine time step between datums + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + # adjust delay to compensate for trio processing time + ad = delay_s - 0.002 + + async def sleep(): + """Sleep until next time frames worth has passed from last bar. + """ + # last_ts = shm.array[-1]['time'] + # delay = max((last_ts + ad) - time.time(), 0) + # await trio.sleep(delay) + await trio.sleep(ad) + + while True: + # sleep for duration of current bar + await sleep() + + # append new entry to buffer thus "incrementing" the bar + array = shm.array + last = array[-1:].copy() + # last = np.array(last, dtype=array.dtype) + # shm.push(last) + # array = shm.array + # last = array[-1].copy() + (index, t, close) = last[0][['index', 'time', 'close']] + + # new = np.array(last, dtype=array.dtype) + + # this copies non-std fields (eg. vwap) from the last datum + last[ + ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (index + 1, t + delay_s, 0, close, close, close, close) + + # write to the buffer + print('incrementing array') + # await tractor.breakpoint() + shm.push(last) + + # yield the new buffer index value + await ctx.send_yield(shm._i.value)