From d93ce84a992bc63d939fd167bc9149d46b5eb57b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Sep 2020 12:24:02 -0400 Subject: [PATCH] Variety of IB backend improvements - Move to new shared mem system only writing on the first (by process) entry to `stream_quotes()`. - Deliver bars before first quote arrives so that chart can populate and then wait for initial arrival. - Allow caching clients per actor. - Load bars using the same (cached) client that starts the quote stream thus speeding up initialization. --- piker/brokers/ib.py | 184 +++++++++++++++++++++++--------------------- 1 file changed, 96 insertions(+), 88 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b0d89d1b..77a1018c 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -26,8 +26,10 @@ import tractor from ..log import get_logger, get_console_log from ..data import ( - maybe_spawn_brokerd, iterticks, attach_shared_array, - incr_buffer, + maybe_spawn_brokerd, + iterticks, + attach_shm_array, + get_shm_token ) from ..ui._source import from_df @@ -145,7 +147,7 @@ class Client: # durationStr='1 D', # time length calcs - durationStr='{count} S'.format(count=5000 * 5), + durationStr='{count} S'.format(count=1000 * 5), barSizeSetting='5 secs', # always use extended hours @@ -311,6 +313,8 @@ class Client: _tws_port: int = 7497 _gw_port: int = 4002 _try_ports = [_tws_port, _gw_port] +_client_ids = itertools.count() +_client_cache = {} @asynccontextmanager @@ -321,36 +325,39 @@ async def _aio_get_client( ) -> Client: """Return an ``ib_insync.IB`` instance wrapped in our client API. """ - if client_id is None: - # if this is a persistent brokerd, try to allocate a new id for - # each client - try: - ss = tractor.current_actor().statespace - client_id = next(ss.setdefault('client_ids', itertools.count())) - # TODO: in case the arbiter has no record - # of existing brokerd we need to broadcase for one. - except RuntimeError: - # tractor likely isn't running - client_id = 1 - - ib = NonShittyIB() - ports = _try_ports if port is None else [port] - _err = None - for port in ports: - try: - await ib.connectAsync(host, port, clientId=client_id) - break - except ConnectionRefusedError as ce: - _err = ce - log.warning(f'Failed to connect on {port}') - else: - raise ConnectionRefusedError(_err) + # first check cache for existing client try: - yield Client(ib) - except BaseException: - ib.disconnect() - raise + yield _client_cache[(host, port)] + except KeyError: + # TODO: in case the arbiter has no record + # of existing brokerd we need to broadcast for one. + + if client_id is None: + # if this is a persistent brokerd, try to allocate a new id for + # each client + client_id = next(_client_ids) + + ib = NonShittyIB() + ports = _try_ports if port is None else [port] + _err = None + for port in ports: + try: + await ib.connectAsync(host, port, clientId=client_id) + break + except ConnectionRefusedError as ce: + _err = ce + log.warning(f'Failed to connect on {port}') + else: + raise ConnectionRefusedError(_err) + + try: + client = Client(ib) + _client_cache[(host, port)] = client + yield client + except BaseException: + ib.disconnect() + raise async def _aio_run_client_method( @@ -489,7 +496,7 @@ def normalize( # @tractor.msg.pub async def stream_quotes( symbols: List[str], - shared_array_token: Tuple[str, str], + shm_token: Tuple[str, str, List[tuple]], loglevel: str = None, # compat for @tractor.msg.pub topics: Any = None, @@ -506,16 +513,35 @@ async def stream_quotes( # TODO: support multiple subscriptions sym = symbols[0] - stream = await tractor.to_asyncio.run_task( - _trio_run_client_method, + stream = await _trio_run_client_method( method='stream_ticker', symbol=sym, ) - async with get_client() as client: - bars = await client.bars(symbol=sym) - async with aclosing(stream): + + # maybe load historical ohlcv in to shared mem + # check if shm has already been created by previous + # feed initialization + writer_exists = get_shm_token(shm_token['shm_name']) + + if not writer_exists: + shm = attach_shm_array( + token=shm_token, + # we are writer + readonly=False, + ) + bars = await _trio_run_client_method( + method='bars', + symbol=sym, + ) + + shm.push(bars) + shm_token = shm.token + + # pass back token, and bool, signalling if we're the writer + yield shm_token, not writer_exists + # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() quote = normalize(first_ticker) @@ -538,7 +564,7 @@ async def stream_quotes( else: log.debug("Received first real volume tick") # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) + # (ahem, ib_insync is truly stateful trash) ticker.ticks = [] # XXX: this works because we don't use @@ -555,58 +581,40 @@ async def stream_quotes( topic = '.'.join((con['symbol'], con[suffix])).lower() first_quote = {topic: quote} ticker.ticks = [] + # yield first quote asap + yield first_quote - # load historical ohlcv in to shared mem - ss = tractor.current_actor().statespace - existing_shm = ss.get(f'ib_shm.{sym}') - if not existing_shm: - readonly = False - else: - readonly = True - shm = existing_shm + async for ticker in stream: + quote = normalize( + ticker, + calc_price=calc_price + ) + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. - with attach_shared_array( - token=shared_array_token, - readonly=readonly - ) as shm: - if not existing_shm: - shm.push(bars) - ss[f'ib_shm.{sym}'] = shm + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if not writer_exists: + for tick in iterticks(quote, type='trade'): + last = tick['price'] + # print(f'broker last: {tick}') - yield (first_quote, shm.token) - else: - yield (first_quote, None) + # update last entry + # benchmarked in the 4-5 us range + high, low = shm.array[-1][['high', 'low']] + shm.array[['high', 'low', 'close']][-1] = ( + max(high, last), + min(low, last), + last, + ) - async for ticker in stream: - quote = normalize( - ticker, - calc_price=calc_price - ) - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} - # if we are the lone tick writer - if not existing_shm: - for tick in iterticks(quote, type='trade'): - last = tick['price'] - # print(f'broker last: {tick}') - - # update last entry - # benchmarked in the 4-5 us range - high, low = shm.array[-1][['high', 'low']] - shm.array[['high', 'low', 'close']][-1] = ( - max(high, last), - min(low, last), - last, - ) - - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - yield {topic: quote} - - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ugh, clear ticks since we've consumed them + ticker.ticks = []