diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 99475afc..16d74b9d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -385,54 +385,6 @@ class Client: formatDate=2, # timezone aware UTC datetime ) - async def stream_ticker( - self, - symbol: str, - opts: Tuple[int] = ('375', '233', '236'), - contract: Optional[Contract] = None, - ) -> None: - """Stream a ticker using the std L1 api. - """ - contract = contract or (await self.find_contract(symbol)) - ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) - - feed = self._feeds.get(symbol) - if feed: - # do something else - # await tractor.breakpoint() - to_trio, from_aio = feed - return from_aio.clone() - - # define a simple queue push routine that streams quote packets - # to trio over the ``to_trio`` memory channel. - to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore - - def push(t): - """Push quotes to trio task. - - """ - # log.debug(t) - try: - to_trio.send_nowait(t) - except trio.BrokenResourceError: - # XXX: eventkit's ``Event.emit()`` for whatever redic - # reason will catch and ignore regular exceptions - # resulting in tracebacks spammed to console.. - # Manually do the dereg ourselves. - ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") - self.ib.cancelMktData(contract) - - # decouple broadcast mem chan - self._feeds.pop(symbol, None) - - ticker.updateEvent.connect(push) - - # cache feed for later consumers - self._feeds[symbol] = to_trio, from_aio - - return from_aio - async def get_quote( self, symbol: str, @@ -626,6 +578,8 @@ async def _aio_get_client( client_id: Optional[int] = None, ) -> Client: """Return an ``ib_insync.IB`` instance wrapped in our client API. + + Client instances are cached for later use. """ # first check cache for existing client @@ -665,8 +619,10 @@ async def _aio_get_client( # create and cache try: client = Client(ib) + _client_cache[(host, port)] = client log.debug(f"Caching client for {(host, port)}") + yield client except BaseException: @@ -918,65 +874,78 @@ asset_type_map = { } - _quote_streams: Dict[str, trio.abc.ReceiveStream] = {} -async def stream_quotes( - client, +async def _setup_quote_stream( symbol: str, opts: Tuple[int] = ('375', '233', '236'), contract: Optional[Contract] = None, ) -> None: """Stream a ticker using the std L1 api. """ - contract = contract or (await client.find_contract(symbol)) - ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + global _quote_streams - chans = _quote_streams.get(symbol) + async with _aio_get_client() as client: - if feed: + contract = contract or (await client.find_contract(symbol)) + ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) - # if we already have a cached feed deliver a rx side clone to - # consumer - to_trio, from_aio = chans + # define a simple queue push routine that streams quote packets + # to trio over the ``to_trio`` memory channel. + to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + + def push(t): + """Push quotes to trio task. + + """ + # log.debug(t) + try: + to_trio.send_nowait(t) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + client.ib.cancelMktData(contract) + + # decouple broadcast mem chan + _quote_streams.pop(symbol, None) + + ticker.updateEvent.connect(push) + + return from_aio + + +async def start_aio_quote_stream( + symbol: str, + contract: Optional[Contract] = None, +) -> trio.abc.ReceiveStream: + + global _quote_streams + + from_aio = _quote_streams.get(symbol) + if from_aio: + + # if we already have a cached feed deliver a rx side clone to consumer return from_aio.clone() + else: - # define a simple queue push routine that streams quote packets - # to trio over the ``to_trio`` memory channel. - to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + from_aio = await tractor.to_asyncio.run_task( + _setup_quote_stream, + symbol=symbol, + contract=contract, + ) - def push(t): - """Push quotes to trio task. + # cache feed for later consumers + _quote_streams[symbol] = from_aio - """ - # log.debug(t) - try: - to_trio.send_nowait(t) - except trio.BrokenResourceError: - # XXX: eventkit's ``Event.emit()`` for whatever redic - # reason will catch and ignore regular exceptions - # resulting in tracebacks spammed to console.. - # Manually do the dereg ourselves. - ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") - self.ib.cancelMktData(contract) - - # decouple broadcast mem chan - self._feeds.pop(symbol, None) - - ticker.updateEvent.connect(push) - - # cache feed for later consumers - _quote_streams[symbol] = to_trio, from_aio - - return from_aio + return from_aio -# TODO: figure out how to share quote feeds sanely despite -# the wacky ``ib_insync`` api. -# @tractor.msg.pub @tractor.stream async def stream_quotes( ctx: tractor.Context, @@ -1004,11 +973,7 @@ async def stream_quotes( symbol=sym, ) - stream = await _trio_run_client_method( - method='stream_ticker', - contract=contract, # small speedup - symbol=sym, - ) + stream = await start_aio_quote_stream(symbol=sym, contract=contract) shm = None async with trio.open_nursery() as ln: @@ -1059,8 +1024,6 @@ async def stream_quotes( subscribe_ohlc_for_increment(shm, delay_s) # pass back some symbol info like min_tick, trading_hours, etc. - # con = asdict(contract) - # syminfo = contract syminfo = asdict(details) syminfo.update(syminfo['contract']) @@ -1111,8 +1074,6 @@ async def stream_quotes( # yield first quote asap await ctx.send_yield(first_quote) - # ticker.ticks = [] - # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) first_ticker.ticks = []