Compare commits

..

No commits in common. "1c072b3ccd484c83bab72607d2ded391ab82f319" and "e4a68bdec992abb6b8be3f906e92802ca361ae90" have entirely different histories.

3 changed files with 16 additions and 36 deletions

View File

@ -587,7 +587,7 @@ async def get_bars(
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await tn.start(
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
@ -607,11 +607,11 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as tn:
async with trio.open_nursery() as nurse:
# start history request that we allow
# to run indefinitely until a result is acquired
tn.start_soon(query)
nurse.start_soon(query)
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
@ -631,7 +631,7 @@ async def get_bars(
unset_resetter: bool = True
# spawn new data reset task
data_cs, reset_done = await tn.start(
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
@ -705,9 +705,7 @@ async def _setup_quote_stream(
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
ticker.updateEvent.disconnect(push)
log.error(
f'Disconnected stream for `{symbol}`'
)
log.error(f"Disconnected stream for `{symbol}`")
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
@ -763,10 +761,7 @@ async def open_aio_quote_stream(
symbol: str,
contract: Contract | None = None,
) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
) -> trio.abc.ReceiveStream:
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -783,7 +778,6 @@ async def open_aio_quote_stream(
yield from_aio
return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
@ -989,18 +983,17 @@ async def stream_quotes(
)
cs: trio.CancelScope | None = None
startup: bool = True
iter_quotes: trio.abc.Channel
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as tn,
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as iter_quotes,
) as stream,
):
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
@ -1028,9 +1021,9 @@ async def stream_quotes(
await rt_ev.wait()
cs.cancel() # cancel called should now be set
tn.start_soon(reset_on_feed)
nurse.start_soon(reset_on_feed)
async with aclosing(iter_quotes):
async with aclosing(stream):
# if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']:
@ -1045,21 +1038,19 @@ async def stream_quotes(
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await iter_quotes.receive()
ticker = await stream.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
False
# not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
@ -1075,18 +1066,13 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live
# quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
# quotes are now streaming.
feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time()
async for ticker in iter_quotes:
async for ticker in stream:
quote = normalize(ticker)
fqme: str = quote['fqme']
fqme = quote['fqme']
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them

View File

@ -544,7 +544,7 @@ async def open_trade_dialog(
# to be reloaded.
balances: dict[str, float] = await client.get_balances()
await verify_balances(
verify_balances(
acnt,
src_fiat,
balances,

View File

@ -37,12 +37,6 @@ import tractor
from async_generator import asynccontextmanager
import numpy as np
import wrapt
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
# writing a proper ws-api streamer for this backend (since the data
# feeds are free now) as per GH feat-req:
# https://github.com/pikers/piker/issues/509
#
import asks
from ..calc import humanize, percent_change