diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 1dc27d86..6799b6d9 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -41,7 +41,6 @@ import numpy as np from pendulum import ( now, from_timestamp, - # DateTime, Duration, duration as mk_duration, ) @@ -290,8 +289,9 @@ _pacing: str = ( async def wait_on_data_reset( proxy: MethodProxy, + reset_type: str = 'data', - timeout: float = 16, # float('inf'), + timeout: float = 16, task_status: TaskStatus[ tuple[ @@ -300,29 +300,47 @@ async def wait_on_data_reset( ] ] = trio.TASK_STATUS_IGNORED, ) -> bool: + ''' + Wait on a (global-ish) "data-farm" event to be emitted + by the IB api server. - # TODO: we might have to put a task lock around this - # method.. - hist_ev = proxy.status_event( + Allows syncing to reconnect event-messages emitted on the API + console, such as: + + - 'HMDS data farm connection is OK:ushmds' + - 'Market data farm is connecting:usfuture' + - 'Market data farm connection is OK:usfuture' + + Deliver a `(cs, done: Event)` pair to the caller to support it + waiting or cancelling the associated "data-reset-request"; + normally a manual data-reset-req is expected to be the cause and + thus trigger such events (such as our click-hack-magic from + `.ib._util`). + + ''' + # ?TODO, do we need a task-lock around this method? + # + # register for an API "status event" wrapped for `trio`-sync. + hist_ev: trio.Event = proxy.status_event( 'HMDS data farm connection is OK:ushmds' ) - - # TODO: other event messages we might want to try and - # wait for but i wasn't able to get any of this - # reliable.. + # + # ^TODO: other event-messages we might want to support waiting-for + # but i wasn't able to get reliable.. + # # reconnect_start = proxy.status_event( # 'Market data farm is connecting:usfuture' # ) # live_ev = proxy.status_event( # 'Market data farm connection is OK:usfuture' # ) + # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). client: Client = proxy._aio_ns done = trio.Event() with trio.move_on_after(timeout) as cs: - task_status.started((cs, done)) log.warning( @@ -401,8 +419,9 @@ async def get_bars( bool, # timed out hint ]: ''' - Retrieve historical data from a ``trio``-side task using - a ``MethoProxy``. + Request-n-retrieve historical data frames from a `trio.Task` + using a `MethoProxy` to query the `asyncio`-side's + `.ib.api.Client` methods. ''' global _data_resetter_task, _failed_resets @@ -661,14 +680,14 @@ async def get_bars( ) +# per-actor cache of inter-eventloop-chans _quote_streams: dict[str, trio.abc.ReceiveStream] = {} +# TODO! update to the new style sig with, +# `chan: to_asyncio.LinkedTaskChannel,` async def _setup_quote_stream( - - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - + chan: tractor.to_asyncio.LinkedTaskChannel, symbol: str, opts: tuple[int] = ( '375', # RT trade volume (excludes utrades) @@ -686,10 +705,13 @@ async def _setup_quote_stream( ) -> trio.abc.ReceiveChannel: ''' - Stream a ticker using the std L1 api. + Stream L1 quotes via the `Ticker.updateEvent.connect(push)` + callback API by registering a `push` callback which simply + `chan.send_nowait()`s quote msgs back to the calling + parent-`trio.Task`-side. - This task is ``asyncio``-side and must be called from - ``tractor.to_asyncio.open_channel_from()``. + NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY + and is thus run via `tractor.to_asyncio.open_channel_from()`. ''' global _quote_streams @@ -698,43 +720,78 @@ async def _setup_quote_stream( disconnect_on_exit=False, ) as accts2clients: - # since asyncio.Task + # XXX since this is an `asyncio.Task`, we must use # tractor.pause_from_sync() caccount_name, client = get_preferred_data_client(accts2clients) - contract = contract or (await client.find_contract(symbol)) - to_trio.send_nowait(contract) # cuz why not + contract = ( + contract + or + (await client.find_contract(symbol)) + ) + chan.started_nowait(contract) # cuz why not ticker: Ticker = client.ib.reqMktData( contract, ','.join(opts), ) + maybe_exc: BaseException|None = None + handler_tries: int = 0 + aio_task: asyncio.Task = asyncio.current_task() - # NOTE: it's batch-wise and slow af but I guess could - # be good for backchecking? Seems to be every 5s maybe? + # ?TODO? this API is batch-wise and quite slow-af but, + # - seems to be 5s updates? + # - maybe we could use it for backchecking? + # # ticker: Ticker = client.ib.reqTickByTickData( # contract, 'Last', # ) - # # define a simple queue push routine that streams quote packets - # # to trio over the ``to_trio`` memory channel. - # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + # define a very naive queue-pushing callback that relays + # quote-packets directly the calling (parent) `trio.Task`. + # Ensure on teardown we cancel the feed via their cancel API. + # def teardown(): + ''' + Disconnect our `push`-er callback and cancel the data-feed + for `contract`. + + ''' + nonlocal maybe_exc ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") + report: str = f'Disconnected mkt-data for {symbol!r} due to ' + if maybe_exc is not None: + report += ( + 'error,\n' + f'{maybe_exc!r}\n' + ) + log.error(report) + else: + report += ( + 'cancellation.\n' + ) + log.cancel(report) + client.ib.cancelMktData(contract) # decouple broadcast mem chan _quote_streams.pop(symbol, None) - def push(t: Ticker) -> None: - """ - Push quotes to trio task. - - """ + def push( + t: Ticker, + tries_before_raise: int = 6, + ) -> None: + ''' + Push quotes verbatim to parent-side `trio.Task`. + ''' + nonlocal maybe_exc, handler_tries # log.debug(f'new IB quote: {t}\n') try: - to_trio.send_nowait(t) + chan.send_nowait(t) + + # XXX TODO XXX replicate in `tractor` tests + # as per `CancelledError`-handler notes below! + # assert 0 except ( trio.BrokenResourceError, @@ -754,29 +811,40 @@ async def _setup_quote_stream( # with log msgs except trio.WouldBlock: log.exception( - f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n' + f'Asyncio->Trio `chan.send_nowait()` blocked !?\n' f'\n' - f'{to_trio.statistics()}\n' + f'{chan._to_trio.statistics()}\n' ) # ?TODO, handle re-connection attempts? except BaseException as _berr: berr = _berr + if handler_tries >= tries_before_raise: + # breakpoint() + maybe_exc = _berr + # task.set_exception(berr) + aio_task.cancel(msg=berr.args) + raise berr + else: + handler_tries += 1 + log.exception( f'Failed to push ticker quote !?\n' - f'cause: {berr}\n' + f'handler_tries={handler_tries!r}\n' + f'ticker: {t!r}\n' f'\n' - f't: {t}\n' - f'{to_trio.statistics}\n' + f'{chan._to_trio.statistics()}\n' + f'\n' + f'CAUSE: {berr}\n' ) - # raise berr ticker.updateEvent.connect(push) try: await asyncio.sleep(float('inf')) - # XXX, just for debug.. + # XXX, for debug.. TODO? can we rm again? + # # tractor.pause_from_sync() # while True: # await asyncio.sleep(1.6) @@ -790,20 +858,52 @@ async def _setup_quote_stream( # 'UHH no ticker.ticks ??' # ) - finally: - teardown() + # XXX TODO XXX !?!? + # apparently **without this handler** and the subsequent + # re-raising of `maybe_exc from _taskc` cancelling the + # `aio_task` from the `push()`-callback will cause a very + # strange chain of exc raising that breaks alll sorts of + # downstream callers, tasks and remote-actor tasks!? + # + # -[ ] we need some lowlevel reproducting tests to replicate + # those worst-case scenarios in `tractor` core!! + # -[ ] likely we should factor-out the `tractor.to_asyncio` + # attempts at workarounds in `.translate_aio_errors()` + # for failed `asyncio.Task.set_exception()` to either + # call `aio_task.cancel()` and/or + # `aio_task._fut_waiter.set_exception()` to a re-useable + # toolset in something like a `.to_asyncio._utils`?? + # + except asyncio.CancelledError as _taskc: + if maybe_exc is not None: + raise maybe_exc from _taskc - # return from_aio + raise _taskc + + except BaseException as _berr: + # stash any crash cause for reporting in `teardown()` + maybe_exc = _berr + raise _berr + + finally: + # always disconnect our `push()` and cancel the + # ib-"mkt-data-feed". + teardown() @acm async def open_aio_quote_stream( - symbol: str, - contract: Contract | None = None, + contract: Contract|None = None, ) -> trio.abc.ReceiveStream: + ''' + Open a real-time `Ticker` quote stream from an `asyncio.Task` + spawned via `tractor.to_asyncio.open_channel_from()`, deliver the + inter-event-loop channel to the `trio.Task` caller and cache it + globally for re-use. + ''' from tractor.trionics import broadcast_receiver global _quote_streams @@ -828,6 +928,10 @@ async def open_aio_quote_stream( assert contract + # TODO? de-reg on teardown of last consumer task? + # -> why aren't we using `.trionics.maybe_open_context()` + # here again?? (we are in `open_client_proxies()` tho?) + # # cache feed for later consumers _quote_streams[symbol] = from_aio @@ -842,7 +946,12 @@ def normalize( calc_price: bool = False ) -> dict: + ''' + Translate `ib_async`'s `Ticker.ticks` values to a `piker` + normalized `dict` form for transmit to downstream `.data` layer + consumers. + ''' # check for special contract types con = ticker.contract fqme, calc_price = con2fqme(con) @@ -905,7 +1014,6 @@ def normalize( # stable. # async def stream_quotes( - send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event,