Pass back interal cancel scope from data reset task
parent
3301619647
commit
ac7ba500be
|
@ -192,7 +192,6 @@ async def wait_on_data_reset(
|
||||||
|
|
||||||
log.warning('Sending DATA RESET request')
|
log.warning('Sending DATA RESET request')
|
||||||
await data_reset_hack(reset_type='data')
|
await data_reset_hack(reset_type='data')
|
||||||
task_status.started()
|
|
||||||
|
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
for name, ev in [
|
for name, ev in [
|
||||||
|
@ -203,16 +202,18 @@ async def wait_on_data_reset(
|
||||||
# sent?
|
# sent?
|
||||||
('history', hist_ev),
|
('history', hist_ev),
|
||||||
]:
|
]:
|
||||||
|
task_status.started(cs)
|
||||||
await ev.wait()
|
await ev.wait()
|
||||||
log.info(f"{name} DATA RESET")
|
log.info(f"{name} DATA RESET")
|
||||||
break
|
break
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if (
|
||||||
# fails += 1
|
cs.cancelled_caught
|
||||||
|
and not cs.cancel_called
|
||||||
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Data reset {name} timeout, retrying {i}.'
|
f'Data reset {name} timeout, retrying {i}.'
|
||||||
)
|
)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
|
|
||||||
|
@ -223,7 +224,6 @@ async def wait_on_data_reset(
|
||||||
'NO VNC DETECTED!\n'
|
'NO VNC DETECTED!\n'
|
||||||
'Manually press ctrl-alt-f on your IB java app'
|
'Manually press ctrl-alt-f on your IB java app'
|
||||||
)
|
)
|
||||||
# break
|
|
||||||
|
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
for name, ev in [
|
for name, ev in [
|
||||||
|
@ -238,7 +238,6 @@ async def wait_on_data_reset(
|
||||||
log.info(f"{name} DATA RESET")
|
log.info(f"{name} DATA RESET")
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
# fails += 1
|
|
||||||
log.warning('Data CONNECTION RESET timeout!?')
|
log.warning('Data CONNECTION RESET timeout!?')
|
||||||
|
|
||||||
|
|
||||||
|
@ -246,7 +245,7 @@ async def get_bars(
|
||||||
|
|
||||||
proxy: MethodProxy,
|
proxy: MethodProxy,
|
||||||
fqsn: str,
|
fqsn: str,
|
||||||
period: float,
|
timeframe: int,
|
||||||
|
|
||||||
# blank to start which tells ib to look up the latest datum
|
# blank to start which tells ib to look up the latest datum
|
||||||
end_dt: str = '',
|
end_dt: str = '',
|
||||||
|
@ -276,7 +275,7 @@ async def get_bars(
|
||||||
out = await proxy.bars(
|
out = await proxy.bars(
|
||||||
fqsn=fqsn,
|
fqsn=fqsn,
|
||||||
end_dt=end_dt,
|
end_dt=end_dt,
|
||||||
sample_period_s=period,
|
sample_period_s=timeframe,
|
||||||
)
|
)
|
||||||
timeout = 3
|
timeout = 3
|
||||||
|
|
||||||
|
@ -563,10 +562,11 @@ async def _setup_quote_stream(
|
||||||
# Manually do the dereg ourselves.
|
# Manually do the dereg ourselves.
|
||||||
teardown()
|
teardown()
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
log.warning(
|
# log.warning(
|
||||||
f'channel is blocking symbol feed for {symbol}?'
|
# f'channel is blocking symbol feed for {symbol}?'
|
||||||
f'\n{to_trio.statistics}'
|
# f'\n{to_trio.statistics}'
|
||||||
)
|
# )
|
||||||
|
pass
|
||||||
|
|
||||||
# except trio.WouldBlock:
|
# except trio.WouldBlock:
|
||||||
# # for slow debugging purposes to avoid clobbering prompt
|
# # for slow debugging purposes to avoid clobbering prompt
|
||||||
|
|
Loading…
Reference in New Issue