diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 020bfecc..9f40a7d9 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -5,7 +5,7 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, contextmanager from dataclasses import asdict from functools import partial from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable @@ -292,7 +292,7 @@ class Client: ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) def push(t): - log.debug(t) + # log.debug(t) try: to_trio.send_nowait(t) except trio.BrokenResourceError: @@ -497,6 +497,21 @@ def normalize( return data +_local_buffer_writers = {} + + +@contextmanager +def activate_writer(key: str): + try: + writer_already_exists = _local_buffer_writers.get(key, False) + if not writer_already_exists: + _local_buffer_writers[key] = True + + yield writer_already_exists + finally: + _local_buffer_writers.pop(key, None) + + # TODO: figure out how to share quote feeds sanely despite # the wacky ``ib_insync`` api. # @tractor.msg.pub @@ -528,108 +543,113 @@ async def stream_quotes( async with aclosing(stream): - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization - writer_exists = get_shm_token(shm_token['shm_name']) + # check if a writer already is alive in a streaming task, + # otherwise start one and mark it as now existing + with activate_writer(shm_token['shm_name']) as writer_already_exists: - if not writer_exists: - shm = attach_shm_array( - token=shm_token, - # we are writer - readonly=False, - ) - bars = await _trio_run_client_method( - method='bars', - symbol=sym, - ) + # maybe load historical ohlcv in to shared mem + # check if shm has already been created by previous + # feed initialization + if not writer_already_exists: - shm.push(bars) - shm_token = shm.token + shm = attach_shm_array( + token=shm_token, - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) + # we are the buffer writer + readonly=False, + ) + bars = await _trio_run_client_method( + method='bars', + symbol=sym, + ) - # pass back token, and bool, signalling if we're the writer - await ctx.send_yield((shm_token, not writer_exists)) + # write historical data to buffer + shm.push(bars) + shm_token = shm.token - # first quote can be ignored as a 2nd with newer data is sent? - first_ticker = await stream.__anext__() - quote = normalize(first_ticker) - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + subscribe_ohlc_for_increment(shm, delay_s) - log.debug(f"First ticker received {quote}") + # pass back token, and bool, signalling if we're the writer + await ctx.send_yield((shm_token, not writer_already_exists)) - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' + # first quote can be ignored as a 2nd with newer data is sent? + first_ticker = await stream.__anext__() + quote = normalize(first_ticker) + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] - calc_price = False # should be real volume for contract + log.debug(f"First ticker received {quote}") - async for ticker in stream: - # spin consuming tickers until we get a real market datum - if not ticker.rtTime: - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' - # XXX: this works because we don't use - # ``aclosing()`` above? - break - else: - # commodities don't have an exchange name for some reason? - suffix = 'secType' - calc_price = True - ticker = first_ticker + calc_price = False # should be real volume for contract - con = quote['contract'] - quote = normalize(ticker, calc_price=calc_price) - topic = '.'.join((con['symbol'], con[suffix])).lower() - first_quote = {topic: quote} - ticker.ticks = [] + async for ticker in stream: + # spin consuming tickers until we get a real market datum + if not ticker.rtTime: + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) + ticker.ticks = [] - # yield first quote asap - await ctx.send_yield(first_quote) - - # real-time stream - async for ticker in stream: - quote = normalize( - ticker, - calc_price=calc_price - ) - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. - - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_exists: - for tick in iterticks(quote, type='trade'): - last = tick['price'] - # print(f'broker last: {tick}') - - # update last entry - # benchmarked in the 4-5 us range - high, low = shm.array[-1][['high', 'low']] - shm.array[['high', 'low', 'close']][-1] = ( - max(high, last), - min(low, last), - last, - ) + # XXX: this works because we don't use + # ``aclosing()`` above? + break + else: + # commodities don't have an exchange name for some reason? + suffix = 'secType' + calc_price = True + ticker = first_ticker con = quote['contract'] + quote = normalize(ticker, calc_price=calc_price) topic = '.'.join((con['symbol'], con[suffix])).lower() - - await ctx.send_yield({topic: quote}) - - # ugh, clear ticks since we've consumed them + first_quote = {topic: quote} ticker.ticks = [] + + # yield first quote asap + await ctx.send_yield(first_quote) + + # real-time stream + async for ticker in stream: + quote = normalize( + ticker, + calc_price=calc_price + ) + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. + + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if not writer_already_exists: + for tick in iterticks(quote, type='trade'): + last = tick['price'] + # print(f'broker last: {tick}') + + # update last entry + # benchmarked in the 4-5 us range + high, low = shm.array[-1][['high', 'low']] + shm.array[['high', 'low', 'close']][-1] = ( + max(high, last), + min(low, last), + last, + ) + + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + + await ctx.send_yield({topic: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 153c1c8f..77bcba12 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -167,6 +167,7 @@ async def open_feed( # Attempt to allocate (or attach to) shm array for this broker/symbol shm, opened = maybe_open_shm_array( key=sym_to_shm_key(name, symbols[0]), + # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), @@ -193,12 +194,13 @@ async def open_feed( # tests are in? shm_token, is_writer = await stream.receive() + if opened: + assert is_writer + log.info("Started shared mem bar writer") + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) assert shm_token == shm.token # sanity - if is_writer: - log.info("Started shared mem bar writer") - yield Feed( name=name, stream=stream,