diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 12f713ae..d0645dfe 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -49,9 +49,11 @@ from ..data import ( attach_shm_array, # get_shm_token, subscribe_ohlc_for_increment, + _buffer, ) from ..data._source import from_df from ._util import SymbolNotFound +from .._async_utils import maybe_with_if log = get_logger(__name__) @@ -355,11 +357,12 @@ class Client: symbol: str, to_trio, opts: Tuple[int] = ('375', '233',), + contract: Optional[Contract] = None, # opts: Tuple[int] = ('459',), ) -> None: """Stream a ticker using the std L1 api. """ - contract = await self.find_contract(symbol) + contract = contract or (await self.find_contract(symbol)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) # define a simple queue push routine that streams quote packets @@ -386,6 +389,20 @@ class Client: # let the engine run and stream await self.ib.disconnectedEvent + async def get_quote( + self, + symbol: str, + ) -> Ticker: + """Return a single quote for symbol. + + """ + contract = await self.find_contract(symbol) + ticker: Ticker = self.ib.reqMktData( + contract, + snapshot=True, + ) + return contract, (await ticker.updateEvent) + # default config ports _tws_port: int = 7497 @@ -604,16 +621,21 @@ _local_buffer_writers = {} @asynccontextmanager async def activate_writer(key: str) -> (bool, trio.Nursery): + """Mark the current actor with module var determining + whether an existing shm writer task is already active. + + This avoids more then one writer resulting in data + clobbering. + """ + global _local_buffer_writers + try: - writer_already_exists = _local_buffer_writers.get(key, False) + assert not _local_buffer_writers.get(key, False) - if not writer_already_exists: - _local_buffer_writers[key] = True + _local_buffer_writers[key] = True - async with trio.open_nursery() as n: - yield writer_already_exists, n - else: - yield writer_already_exists, None + async with trio.open_nursery() as n: + yield n finally: _local_buffer_writers.pop(key, None) @@ -622,7 +644,7 @@ async def fill_bars( sym: str, first_bars: list, shm: 'ShmArray', # type: ignore # noqa - # count: int = 20, # NOTE: any more and we'll overrun the underlying buffer + # count: int = 20, # NOTE: any more and we'll overrun underlying buffer count: int = 2, # NOTE: any more and we'll overrun the underlying buffer ) -> None: """Fill historical bars into shared mem / storage afap. @@ -692,8 +714,14 @@ async def stream_quotes( # TODO: support multiple subscriptions sym = symbols[0] + contract, first_ticker = await _trio_run_client_method( + method='get_quote', + symbol=sym, + ) + stream = await _trio_run_client_method( method='stream_ticker', + contract=contract, # small speedup symbol=sym, ) @@ -701,14 +729,17 @@ async def stream_quotes( # check if a writer already is alive in a streaming task, # otherwise start one and mark it as now existing - async with activate_writer( - shm_token['shm_name'] - ) as (writer_already_exists, ln): - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization + key = shm_token['shm_name'] + + writer_already_exists = _local_buffer_writers.get(key, False) + + # maybe load historical ohlcv in to shared mem + # check if shm has already been created by previous + # feed initialization + async with trio.open_nursery() as ln: if not writer_already_exists: + _local_buffer_writers[key] = True shm = attach_shm_array( token=shm_token, @@ -744,12 +775,33 @@ async def stream_quotes( subscribe_ohlc_for_increment(shm, delay_s) # pass back token, and bool, signalling if we're the writer + # and that history has been written await ctx.send_yield((shm_token, not writer_already_exists)) - # first quote can be ignored as a 2nd with newer data is sent? - first_ticker = await stream.__anext__() + # check for special contract types + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + # should be real volume for this contract + calc_price = False + else: + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = 'secType' + calc_price = True + ticker = first_ticker - quote = normalize(first_ticker) + # pass first quote asap + quote = normalize(first_ticker, calc_price=calc_price) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic + + first_quote = {topic: quote} + + # yield first quote asap + await ctx.send_yield(first_quote) + + # ticker.ticks = [] # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) @@ -762,39 +814,31 @@ async def stream_quotes( calc_price = False # should be real volume for contract - async for ticker in stream: + # wait for real volume on feed (trading might be closed) + async for ticker in stream: + + # for a real volume contract we rait for the first + # "real" trade to take place + if not calc_price and not ticker.rtTime: # spin consuming tickers until we get a real market datum - if not ticker.rtTime: - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) + ticker.ticks = [] - # XXX: this works because we don't use - # ``aclosing()`` above? - break - else: - # commodities don't have an exchange name for some reason? - suffix = 'secType' - calc_price = True - ticker = first_ticker + # tell incrementer task it can start + _buffer.shm_incrementing(key).set() - quote = normalize(ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic - - first_quote = {topic: quote} - ticker.ticks = [] - - # yield first quote asap - await ctx.send_yield(first_quote) + # XXX: this works because we don't use + # ``aclosing()`` above? + break # real-time stream async for ticker in stream: + # print(ticker.vwap) quote = normalize( ticker,