From 7de20ebe4223af133bb3f792ecd89302de38956f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 12:32:43 -0500 Subject: [PATCH] `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! --- piker/brokers/ib/feed.py | 44 ++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 2c1a9224..062b2c2e 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -587,7 +587,7 @@ async def get_bars( data_cs.cancel() # spawn new data reset task - data_cs, reset_done = await nurse.start( + data_cs, reset_done = await tn.start( partial( wait_on_data_reset, proxy, @@ -607,11 +607,11 @@ async def get_bars( # such that simultaneous symbol queries don't try data resettingn # too fast.. unset_resetter: bool = False - async with trio.open_nursery() as nurse: + async with trio.open_nursery() as tn: # start history request that we allow # to run indefinitely until a result is acquired - nurse.start_soon(query) + tn.start_soon(query) # start history reset loop which waits up to the timeout # for a result before triggering a data feed reset. @@ -631,7 +631,7 @@ async def get_bars( unset_resetter: bool = True # spawn new data reset task - data_cs, reset_done = await nurse.start( + data_cs, reset_done = await tn.start( partial( wait_on_data_reset, proxy, @@ -705,7 +705,9 @@ async def _setup_quote_stream( # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore def teardown(): ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") + log.error( + f'Disconnected stream for `{symbol}`' + ) client.ib.cancelMktData(contract) # decouple broadcast mem chan @@ -761,7 +763,10 @@ async def open_aio_quote_stream( symbol: str, contract: Contract | None = None, -) -> trio.abc.ReceiveStream: +) -> ( + trio.abc.Channel| # iface + tractor.to_asyncio.LinkedTaskChannel # actually +): from tractor.trionics import broadcast_receiver global _quote_streams @@ -778,6 +783,7 @@ async def open_aio_quote_stream( yield from_aio return + from_aio: tractor.to_asyncio.LinkedTaskChannel async with tractor.to_asyncio.open_channel_from( _setup_quote_stream, symbol=symbol, @@ -983,17 +989,18 @@ async def stream_quotes( ) cs: trio.CancelScope | None = None startup: bool = True + iter_quotes: trio.abc.Channel while ( startup or cs.cancel_called ): with trio.CancelScope() as cs: async with ( - trio.open_nursery() as nurse, + trio.open_nursery() as tn, open_aio_quote_stream( symbol=sym, contract=con, - ) as stream, + ) as iter_quotes, ): # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) @@ -1021,9 +1028,9 @@ async def stream_quotes( await rt_ev.wait() cs.cancel() # cancel called should now be set - nurse.start_soon(reset_on_feed) + tn.start_soon(reset_on_feed) - async with aclosing(stream): + async with aclosing(iter_quotes): # if syminfo.get('no_vlm', False): if not init_msg.shm_write_opts['has_vlm']: @@ -1038,19 +1045,21 @@ async def stream_quotes( # wait for real volume on feed (trading might be # closed) while True: - ticker = await stream.receive() + ticker = await iter_quotes.receive() # for a real volume contract we rait for # the first "real" trade to take place if ( # not calc_price # and not ticker.rtTime - not ticker.rtTime + False + # not ticker.rtTime ): # spin consuming tickers until we # get a real market datum log.debug(f"New unsent ticker: {ticker}") continue + else: log.debug("Received first volume tick") # ugh, clear ticks since we've @@ -1066,13 +1075,18 @@ async def stream_quotes( log.debug(f"First ticker received {quote}") # tell data-layer spawner-caller that live - # quotes are now streaming. + # quotes are now active desptie not having + # necessarily received a first vlm/clearing + # tick. + ticker = await iter_quotes.receive() feed_is_live.set() + fqme: str = quote['fqme'] + await send_chan.send({fqme: quote}) # last = time.time() - async for ticker in stream: + async for ticker in iter_quotes: quote = normalize(ticker) - fqme = quote['fqme'] + fqme: str = quote['fqme'] await send_chan.send({fqme: quote}) # ugh, clear ticks since we've consumed them