`ib.feed`: finally solve `push()` exc propagation

Such that if/when the `push()` ticker callback (closure) errors
internally, we actually eventually bubble the error out-and-up from the
`asyncio.Task` and from there out the `.to_asyncio.open_channel_from()` to
the parent `trio.Task`..

It ended up being much more subtle to solve then i would have liked
thanks to,

- whatever `Ticker.updateEvent.connect()` does behind the scenes in
  terms of (clearly) swallowing with only log reporting any exc raised
  in the registered callback (in our case `push()`),

- `asyncio.Task.set_excepion()` never working and instead needing to
  resort to `Task.cancel()`, catching `CancelledError` and re-raising
  the stashed `maybe_exc` from `push()` when set..

Further this ports `.to_asyncio.open_channel_from()` usage to use
the new `chan: tractor.to_asyncio.LinkedTaskChannel` fn-sig API, namely
for `_setup_quote_stream()` task. Requires the latest `tractor` updates
to the inter-eventloop-chan iface providing a `.set_nowait()` and
`.get()` for the `asyncio`-side.

Impl deats within `_setup_quote_stream()`,
- implement `push()` error-bubbling by adding a `maybe_exc` which can be
  set by that callback itself or by its registering task; when set it is
  both,
  * reported on by the `teardown()` cb,
  * re-raised by the terminated (via `.cancel()`) `asyncio.Task` after
    woken from its sleep, aka "cancelled" (since that's apparently one
    of the only options.. see big rant further todo comments).
- add explicit error-tolerance-tuning via a `handler_tries: int` counter
  and `tries_before_raise: int` limit such that we only bubble
  a `push()` raised exc once enough tries have consecutively failed.
- as mentioned, use the new `chan` fn-sig support and thus the new
  method API for `asyncio` -> `trio` comms.
- a big TODO XXX around the need to use a better sys for terminating
  `asyncio.Task`s whether it's by delegating to some `.to_asyncio`
  internals after a factor-out OR by potentially going full bore `anyio`
  throughout `.to_asyncio`'s impl in general..
- mk `teardown()` use appropriate `log.<level>()`s based on outcome.

Surroundingly,
- add a ton of doc-strings to mod fns previously missing them.
- improved / added-new comments to `wait_on_data_reset()` internals and
  anything changed per ^above.
testing_utils
Tyler Goodlet 2025-09-21 22:38:05 -04:00
parent 0b123c9af9
commit 8c274efd18
1 changed files with 155 additions and 47 deletions

View File

@ -41,7 +41,6 @@ import numpy as np
from pendulum import (
now,
from_timestamp,
# DateTime,
Duration,
duration as mk_duration,
)
@ -290,8 +289,9 @@ _pacing: str = (
async def wait_on_data_reset(
proxy: MethodProxy,
reset_type: str = 'data',
timeout: float = 16, # float('inf'),
timeout: float = 16,
task_status: TaskStatus[
tuple[
@ -300,29 +300,47 @@ async def wait_on_data_reset(
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
'''
Wait on a (global-ish) "data-farm" event to be emitted
by the IB api server.
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
Allows syncing to reconnect event-messages emitted on the API
console, such as:
- 'HMDS data farm connection is OK:ushmds'
- 'Market data farm is connecting:usfuture'
- 'Market data farm connection is OK:usfuture'
Deliver a `(cs, done: Event)` pair to the caller to support it
waiting or cancelling the associated "data-reset-request";
normally a manual data-reset-req is expected to be the cause and
thus trigger such events (such as our click-hack-magic from
`.ib._util`).
'''
# ?TODO, do we need a task-lock around this method?
#
# register for an API "status event" wrapped for `trio`-sync.
hist_ev: trio.Event = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
# TODO: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
#
# ^TODO: other event-messages we might want to support waiting-for
# but i wasn't able to get reliable..
#
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
client: Client = proxy._aio_ns
done = trio.Event()
with trio.move_on_after(timeout) as cs:
task_status.started((cs, done))
log.warning(
@ -401,8 +419,9 @@ async def get_bars(
bool, # timed out hint
]:
'''
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
Request-n-retrieve historical data frames from a `trio.Task`
using a `MethoProxy` to query the `asyncio`-side's
`.ib.api.Client` methods.
'''
global _data_resetter_task, _failed_resets
@ -661,14 +680,14 @@ async def get_bars(
)
# per-actor cache of inter-eventloop-chans
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
# TODO! update to the new style sig with,
# `chan: to_asyncio.LinkedTaskChannel,`
async def _setup_quote_stream(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
chan: tractor.to_asyncio.LinkedTaskChannel,
symbol: str,
opts: tuple[int] = (
'375', # RT trade volume (excludes utrades)
@ -686,10 +705,13 @@ async def _setup_quote_stream(
) -> trio.abc.ReceiveChannel:
'''
Stream a ticker using the std L1 api.
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
callback API by registering a `push` callback which simply
`chan.send_nowait()`s quote msgs back to the calling
parent-`trio.Task`-side.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
and is thus run via `tractor.to_asyncio.open_channel_from()`.
'''
global _quote_streams
@ -698,43 +720,78 @@ async def _setup_quote_stream(
disconnect_on_exit=False,
) as accts2clients:
# since asyncio.Task
# XXX since this is an `asyncio.Task`, we must use
# tractor.pause_from_sync()
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not
contract = (
contract
or
(await client.find_contract(symbol))
)
chan.started_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(
contract,
','.join(opts),
)
maybe_exc: BaseException|None = None
handler_tries: int = 0
aio_task: asyncio.Task = asyncio.current_task()
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
# ?TODO? this API is batch-wise and quite slow-af but,
# - seems to be 5s updates?
# - maybe we could use it for backchecking?
#
# ticker: Ticker = client.ib.reqTickByTickData(
# contract, 'Last',
# )
# # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
# define a very naive queue-pushing callback that relays
# quote-packets directly the calling (parent) `trio.Task`.
# Ensure on teardown we cancel the feed via their cancel API.
#
def teardown():
'''
Disconnect our `push`-er callback and cancel the data-feed
for `contract`.
'''
nonlocal maybe_exc
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
report: str = f'Disconnected mkt-data for {symbol!r} due to '
if maybe_exc is not None:
report += (
'error,\n'
f'{maybe_exc!r}\n'
)
log.error(report)
else:
report += (
'cancellation.\n'
)
log.cancel(report)
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
"""
def push(
t: Ticker,
tries_before_raise: int = 6,
) -> None:
'''
Push quotes verbatim to parent-side `trio.Task`.
'''
nonlocal maybe_exc, handler_tries
# log.debug(f'new IB quote: {t}\n')
try:
to_trio.send_nowait(t)
chan.send_nowait(t)
# XXX TODO XXX replicate in `tractor` tests
# as per `CancelledError`-handler notes below!
# assert 0
except (
trio.BrokenResourceError,
@ -754,29 +811,40 @@ async def _setup_quote_stream(
# with log msgs
except trio.WouldBlock:
log.exception(
f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n'
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
f'\n'
f'{to_trio.statistics()}\n'
f'{chan._to_trio.statistics()}\n'
)
# ?TODO, handle re-connection attempts?
except BaseException as _berr:
berr = _berr
if handler_tries >= tries_before_raise:
# breakpoint()
maybe_exc = _berr
# task.set_exception(berr)
aio_task.cancel(msg=berr.args)
raise berr
else:
handler_tries += 1
log.exception(
f'Failed to push ticker quote !?\n'
f'cause: {berr}\n'
f'handler_tries={handler_tries!r}\n'
f'ticker: {t!r}\n'
f'\n'
f't: {t}\n'
f'{to_trio.statistics}\n'
f'{chan._to_trio.statistics()}\n'
f'\n'
f'CAUSE: {berr}\n'
)
# raise berr
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
# XXX, just for debug..
# XXX, for debug.. TODO? can we rm again?
#
# tractor.pause_from_sync()
# while True:
# await asyncio.sleep(1.6)
@ -790,20 +858,52 @@ async def _setup_quote_stream(
# 'UHH no ticker.ticks ??'
# )
finally:
teardown()
# XXX TODO XXX !?!?
# apparently **without this handler** and the subsequent
# re-raising of `maybe_exc from _taskc` cancelling the
# `aio_task` from the `push()`-callback will cause a very
# strange chain of exc raising that breaks alll sorts of
# downstream callers, tasks and remote-actor tasks!?
#
# -[ ] we need some lowlevel reproducting tests to replicate
# those worst-case scenarios in `tractor` core!!
# -[ ] likely we should factor-out the `tractor.to_asyncio`
# attempts at workarounds in `.translate_aio_errors()`
# for failed `asyncio.Task.set_exception()` to either
# call `aio_task.cancel()` and/or
# `aio_task._fut_waiter.set_exception()` to a re-useable
# toolset in something like a `.to_asyncio._utils`??
#
except asyncio.CancelledError as _taskc:
if maybe_exc is not None:
raise maybe_exc from _taskc
# return from_aio
raise _taskc
except BaseException as _berr:
# stash any crash cause for reporting in `teardown()`
maybe_exc = _berr
raise _berr
finally:
# always disconnect our `push()` and cancel the
# ib-"mkt-data-feed".
teardown()
@acm
async def open_aio_quote_stream(
symbol: str,
contract: Contract|None = None,
) -> trio.abc.ReceiveStream:
'''
Open a real-time `Ticker` quote stream from an `asyncio.Task`
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
inter-event-loop channel to the `trio.Task` caller and cache it
globally for re-use.
'''
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -828,6 +928,10 @@ async def open_aio_quote_stream(
assert contract
# TODO? de-reg on teardown of last consumer task?
# -> why aren't we using `.trionics.maybe_open_context()`
# here again?? (we are in `open_client_proxies()` tho?)
#
# cache feed for later consumers
_quote_streams[symbol] = from_aio
@ -842,7 +946,12 @@ def normalize(
calc_price: bool = False
) -> dict:
'''
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
normalized `dict` form for transmit to downstream `.data` layer
consumers.
'''
# check for special contract types
con = ticker.contract
fqme, calc_price = con2fqme(con)
@ -905,7 +1014,6 @@ def normalize(
# stable.
#
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,