From aa61bf5a65299aed80fc8dda980ded39940dfb92 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 23 Mar 2021 08:35:11 -0400 Subject: [PATCH] Use mem-chans for quote streams; clone for multiple consumers --- piker/brokers/ib.py | 211 ++++++++++++++++++++++++++++++-------------- 1 file changed, 143 insertions(+), 68 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 2679e988..99475afc 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -168,6 +168,7 @@ class Client: # contract cache self._contracts: Dict[str, Contract] = {} + self._feeds: Dict[str, trio.abc.SendChannel] = {} # NOTE: the ib.client here is "throttled" to 45 rps by default @@ -387,7 +388,6 @@ class Client: async def stream_ticker( self, symbol: str, - to_trio, opts: Tuple[int] = ('375', '233', '236'), contract: Optional[Contract] = None, ) -> None: @@ -396,8 +396,16 @@ class Client: contract = contract or (await self.find_contract(symbol)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) + feed = self._feeds.get(symbol) + if feed: + # do something else + # await tractor.breakpoint() + to_trio, from_aio = feed + return from_aio.clone() + # define a simple queue push routine that streams quote packets # to trio over the ``to_trio`` memory channel. + to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore def push(t): """Push quotes to trio task. @@ -415,10 +423,15 @@ class Client: log.error(f"Disconnected stream for `{symbol}`") self.ib.cancelMktData(contract) + # decouple broadcast mem chan + self._feeds.pop(symbol, None) + ticker.updateEvent.connect(push) - # let the engine run and stream - await self.ib.disconnectedEvent + # cache feed for later consumers + self._feeds[symbol] = to_trio, from_aio + + return from_aio async def get_quote( self, @@ -691,13 +704,14 @@ async def _trio_run_client_method( # if the method is an *async gen* stream for it meth = getattr(Client, method) - if inspect.isasyncgenfunction(meth): - kwargs['_treat_as_stream'] = True - # if the method is an *async func* but manually - # streams back results, make sure to also stream it args = tuple(inspect.getfullargspec(meth).args) - if 'to_trio' in args: + + if inspect.isasyncgenfunction(meth) or ( + # if the method is an *async func* but manually + # streams back results, make sure to also stream it + 'to_trio' in args + ): kwargs['_treat_as_stream'] = True result = await tractor.to_asyncio.run_task( @@ -780,7 +794,7 @@ def normalize( # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: - if tick: + if tick and not isinstance(tick, dict): td = tick._asdict() td['type'] = tick_types.get(td['tickType'], 'n/a') @@ -840,7 +854,7 @@ async def fill_bars( first_bars: list, shm: 'ShmArray', # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer - count: int = 6, # NOTE: any more and we'll overrun the underlying buffer + count: int = 10, # NOTE: any more and we'll overrun the underlying buffer ) -> None: """Fill historical bars into shared mem / storage afap. @@ -904,6 +918,62 @@ asset_type_map = { } + +_quote_streams: Dict[str, trio.abc.ReceiveStream] = {} + + +async def stream_quotes( + client, + symbol: str, + opts: Tuple[int] = ('375', '233', '236'), + contract: Optional[Contract] = None, +) -> None: + """Stream a ticker using the std L1 api. + """ + contract = contract or (await client.find_contract(symbol)) + ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + + chans = _quote_streams.get(symbol) + + if feed: + + # if we already have a cached feed deliver a rx side clone to + # consumer + to_trio, from_aio = chans + return from_aio.clone() + + + # define a simple queue push routine that streams quote packets + # to trio over the ``to_trio`` memory channel. + to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + + def push(t): + """Push quotes to trio task. + + """ + # log.debug(t) + try: + to_trio.send_nowait(t) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + self.ib.cancelMktData(contract) + + # decouple broadcast mem chan + self._feeds.pop(symbol, None) + + ticker.updateEvent.connect(push) + + # cache feed for later consumers + _quote_streams[symbol] = to_trio, from_aio + + return from_aio + + # TODO: figure out how to share quote feeds sanely despite # the wacky ``ib_insync`` api. # @tractor.msg.pub @@ -1058,6 +1128,7 @@ async def stream_quotes( # wait for real volume on feed (trading might be closed) async with aclosing(stream): + async for ticker in stream: # for a real volume contract we rait for the first @@ -1081,25 +1152,28 @@ async def stream_quotes( # enter stream loop try: - await stream_and_write( - stream=stream, - calc_price=calc_price, - topic=topic, - writer_already_exists=writer_already_exists, - shm=shm, - suffix=suffix, - ctx=ctx, - ) + async with stream: + await stream_and_write( + stream=stream, + calc_price=calc_price, + topic=topic, + write_shm=not writer_already_exists, + shm=shm, + suffix=suffix, + ctx=ctx, + ) finally: if not writer_already_exists: _local_buffer_writers[key] = False + stream.close() + async def stream_and_write( stream, calc_price: bool, topic: str, - writer_already_exists: bool, + write_shm: bool, suffix: str, ctx: tractor.Context, shm: Optional['SharedArray'], # noqa @@ -1108,64 +1182,65 @@ async def stream_and_write( """ # real-time stream - async for ticker in stream: + async with stream: + async for ticker in stream: - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - quote['symbol'] = topic - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) + quote['symbol'] = topic + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_already_exists: - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if write_shm: + for tick in iterticks(quote, types=('trade', 'utrade',)): + last = tick['price'] - # print(f"{quote['symbol']}: {tick}") + # print(f"{quote['symbol']}: {tick}") - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] - new_v = tick.get('size', 0) + new_v = tick.get('size', 0) - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - v + new_v, - ) + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + v + new_v, + ) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - await ctx.send_yield({topic: quote}) + await ctx.send_yield({topic: quote}) - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ugh, clear ticks since we've consumed them + ticker.ticks = [] def pack_position(pos: Position) -> Dict[str, Any]: