Pass back interal cancel scope from data reset task

ib_1m_hist
Tyler Goodlet 2022-09-17 14:15:24 -04:00
parent 6b34c9e866
commit 72dfeb2b4e
1 changed files with 12 additions and 12 deletions

View File

@ -192,7 +192,6 @@ async def wait_on_data_reset(
log.warning('Sending DATA RESET request') log.warning('Sending DATA RESET request')
await data_reset_hack(reset_type='data') await data_reset_hack(reset_type='data')
task_status.started()
with trio.move_on_after(timeout) as cs: with trio.move_on_after(timeout) as cs:
for name, ev in [ for name, ev in [
@ -203,16 +202,18 @@ async def wait_on_data_reset(
# sent? # sent?
('history', hist_ev), ('history', hist_ev),
]: ]:
task_status.started(cs)
await ev.wait() await ev.wait()
log.info(f"{name} DATA RESET") log.info(f"{name} DATA RESET")
break break
if cs.cancelled_caught: if (
# fails += 1 cs.cancelled_caught
and not cs.cancel_called
):
log.warning( log.warning(
f'Data reset {name} timeout, retrying {i}.' f'Data reset {name} timeout, retrying {i}.'
) )
continue continue
else: else:
@ -223,7 +224,6 @@ async def wait_on_data_reset(
'NO VNC DETECTED!\n' 'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app' 'Manually press ctrl-alt-f on your IB java app'
) )
# break
with trio.move_on_after(timeout) as cs: with trio.move_on_after(timeout) as cs:
for name, ev in [ for name, ev in [
@ -238,7 +238,6 @@ async def wait_on_data_reset(
log.info(f"{name} DATA RESET") log.info(f"{name} DATA RESET")
if cs.cancelled_caught: if cs.cancelled_caught:
# fails += 1
log.warning('Data CONNECTION RESET timeout!?') log.warning('Data CONNECTION RESET timeout!?')
@ -246,7 +245,7 @@ async def get_bars(
proxy: MethodProxy, proxy: MethodProxy,
fqsn: str, fqsn: str,
period: float, timeframe: int,
# blank to start which tells ib to look up the latest datum # blank to start which tells ib to look up the latest datum
end_dt: str = '', end_dt: str = '',
@ -276,7 +275,7 @@ async def get_bars(
out = await proxy.bars( out = await proxy.bars(
fqsn=fqsn, fqsn=fqsn,
end_dt=end_dt, end_dt=end_dt,
sample_period_s=period, sample_period_s=timeframe,
) )
timeout = 3 timeout = 3
@ -563,10 +562,11 @@ async def _setup_quote_stream(
# Manually do the dereg ourselves. # Manually do the dereg ourselves.
teardown() teardown()
except trio.WouldBlock: except trio.WouldBlock:
log.warning( # log.warning(
f'channel is blocking symbol feed for {symbol}?' # f'channel is blocking symbol feed for {symbol}?'
f'\n{to_trio.statistics}' # f'\n{to_trio.statistics}'
) # )
pass
# except trio.WouldBlock: # except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt # # for slow debugging purposes to avoid clobbering prompt