`.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout!

Tyler Goodlet 2025-02-13 12:32:43 -05:00
parent 1e3942fdc2
commit 7a21e87688
1 changed files with 29 additions and 15 deletions

View File

@ -587,7 +587,7 @@ async def get_bars(
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -607,11 +607,11 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
async with trio.open_nursery() as tn:
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
tn.start_soon(query)
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
@ -631,7 +631,7 @@ async def get_bars(
unset_resetter: bool = True
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -705,7 +705,9 @@ async def _setup_quote_stream(
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
log.error(
f'Disconnected stream for `{symbol}`'
)
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
@ -761,7 +763,10 @@ async def open_aio_quote_stream(
symbol: str,
contract: Contract | None = None,
) -> trio.abc.ReceiveStream:
) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -778,6 +783,7 @@ async def open_aio_quote_stream(
yield from_aio
return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
@ -983,17 +989,18 @@ async def stream_quotes(
)
cs: trio.CancelScope | None = None
startup: bool = True
iter_quotes: trio.abc.Channel
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
trio.open_nursery() as tn,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
) as iter_quotes,
):
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
@ -1021,9 +1028,9 @@ async def stream_quotes(
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
tn.start_soon(reset_on_feed)
async with aclosing(stream):
async with aclosing(iter_quotes):
# if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']:
@ -1038,19 +1045,21 @@ async def stream_quotes(
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await stream.receive()
ticker = await iter_quotes.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
False
# not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
@ -1066,13 +1075,18 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live
# quotes are now streaming.
# quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time()
async for ticker in stream:
async for ticker in iter_quotes:
quote = normalize(ticker)
fqme = quote['fqme']
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them