|
|
|
|
@ -1,5 +1,5 @@
|
|
|
|
|
# piker: trading gear for hackers
|
|
|
|
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
|
|
|
|
# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers)
|
|
|
|
|
|
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
|
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
|
|
|
@ -13,10 +13,12 @@
|
|
|
|
|
|
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
|
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
|
"""
|
|
|
|
|
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
'''
|
|
|
|
|
Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio`
|
|
|
|
|
via "infected-asyncio-mode".
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
import asyncio
|
|
|
|
|
from contextlib import (
|
|
|
|
|
@ -39,7 +41,6 @@ import numpy as np
|
|
|
|
|
from pendulum import (
|
|
|
|
|
now,
|
|
|
|
|
from_timestamp,
|
|
|
|
|
# DateTime,
|
|
|
|
|
Duration,
|
|
|
|
|
duration as mk_duration,
|
|
|
|
|
)
|
|
|
|
|
@ -288,8 +289,9 @@ _pacing: str = (
|
|
|
|
|
|
|
|
|
|
async def wait_on_data_reset(
|
|
|
|
|
proxy: MethodProxy,
|
|
|
|
|
|
|
|
|
|
reset_type: str = 'data',
|
|
|
|
|
timeout: float = 16, # float('inf'),
|
|
|
|
|
timeout: float = 16,
|
|
|
|
|
|
|
|
|
|
task_status: TaskStatus[
|
|
|
|
|
tuple[
|
|
|
|
|
@ -298,29 +300,47 @@ async def wait_on_data_reset(
|
|
|
|
|
]
|
|
|
|
|
] = trio.TASK_STATUS_IGNORED,
|
|
|
|
|
) -> bool:
|
|
|
|
|
'''
|
|
|
|
|
Wait on a (global-ish) "data-farm" event to be emitted
|
|
|
|
|
by the IB api server.
|
|
|
|
|
|
|
|
|
|
# TODO: we might have to put a task lock around this
|
|
|
|
|
# method..
|
|
|
|
|
hist_ev = proxy.status_event(
|
|
|
|
|
Allows syncing to reconnect event-messages emitted on the API
|
|
|
|
|
console, such as:
|
|
|
|
|
|
|
|
|
|
- 'HMDS data farm connection is OK:ushmds'
|
|
|
|
|
- 'Market data farm is connecting:usfuture'
|
|
|
|
|
- 'Market data farm connection is OK:usfuture'
|
|
|
|
|
|
|
|
|
|
Deliver a `(cs, done: Event)` pair to the caller to support it
|
|
|
|
|
waiting or cancelling the associated "data-reset-request";
|
|
|
|
|
normally a manual data-reset-req is expected to be the cause and
|
|
|
|
|
thus trigger such events (such as our click-hack-magic from
|
|
|
|
|
`.ib._util`).
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
# ?TODO, do we need a task-lock around this method?
|
|
|
|
|
#
|
|
|
|
|
# register for an API "status event" wrapped for `trio`-sync.
|
|
|
|
|
hist_ev: trio.Event = proxy.status_event(
|
|
|
|
|
'HMDS data farm connection is OK:ushmds'
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# TODO: other event messages we might want to try and
|
|
|
|
|
# wait for but i wasn't able to get any of this
|
|
|
|
|
# reliable..
|
|
|
|
|
#
|
|
|
|
|
# ^TODO: other event-messages we might want to support waiting-for
|
|
|
|
|
# but i wasn't able to get reliable..
|
|
|
|
|
#
|
|
|
|
|
# reconnect_start = proxy.status_event(
|
|
|
|
|
# 'Market data farm is connecting:usfuture'
|
|
|
|
|
# )
|
|
|
|
|
# live_ev = proxy.status_event(
|
|
|
|
|
# 'Market data farm connection is OK:usfuture'
|
|
|
|
|
# )
|
|
|
|
|
|
|
|
|
|
# try to wait on the reset event(s) to arrive, a timeout
|
|
|
|
|
# will trigger a retry up to 6 times (for now).
|
|
|
|
|
client: Client = proxy._aio_ns
|
|
|
|
|
|
|
|
|
|
done = trio.Event()
|
|
|
|
|
with trio.move_on_after(timeout) as cs:
|
|
|
|
|
|
|
|
|
|
task_status.started((cs, done))
|
|
|
|
|
|
|
|
|
|
log.warning(
|
|
|
|
|
@ -399,8 +419,9 @@ async def get_bars(
|
|
|
|
|
bool, # timed out hint
|
|
|
|
|
]:
|
|
|
|
|
'''
|
|
|
|
|
Retrieve historical data from a ``trio``-side task using
|
|
|
|
|
a ``MethoProxy``.
|
|
|
|
|
Request-n-retrieve historical data frames from a `trio.Task`
|
|
|
|
|
using a `MethoProxy` to query the `asyncio`-side's
|
|
|
|
|
`.ib.api.Client` methods.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
global _data_resetter_task, _failed_resets
|
|
|
|
|
@ -659,14 +680,14 @@ async def get_bars(
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# per-actor cache of inter-eventloop-chans
|
|
|
|
|
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# TODO! update to the new style sig with,
|
|
|
|
|
# `chan: to_asyncio.LinkedTaskChannel,`
|
|
|
|
|
async def _setup_quote_stream(
|
|
|
|
|
|
|
|
|
|
from_trio: asyncio.Queue,
|
|
|
|
|
to_trio: trio.abc.SendChannel,
|
|
|
|
|
|
|
|
|
|
chan: tractor.to_asyncio.LinkedTaskChannel,
|
|
|
|
|
symbol: str,
|
|
|
|
|
opts: tuple[int] = (
|
|
|
|
|
'375', # RT trade volume (excludes utrades)
|
|
|
|
|
@ -684,10 +705,13 @@ async def _setup_quote_stream(
|
|
|
|
|
|
|
|
|
|
) -> trio.abc.ReceiveChannel:
|
|
|
|
|
'''
|
|
|
|
|
Stream a ticker using the std L1 api.
|
|
|
|
|
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
|
|
|
|
|
callback API by registering a `push` callback which simply
|
|
|
|
|
`chan.send_nowait()`s quote msgs back to the calling
|
|
|
|
|
parent-`trio.Task`-side.
|
|
|
|
|
|
|
|
|
|
This task is ``asyncio``-side and must be called from
|
|
|
|
|
``tractor.to_asyncio.open_channel_from()``.
|
|
|
|
|
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
|
|
|
|
|
and is thus run via `tractor.to_asyncio.open_channel_from()`.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
global _quote_streams
|
|
|
|
|
@ -696,43 +720,78 @@ async def _setup_quote_stream(
|
|
|
|
|
disconnect_on_exit=False,
|
|
|
|
|
) as accts2clients:
|
|
|
|
|
|
|
|
|
|
# since asyncio.Task
|
|
|
|
|
# XXX since this is an `asyncio.Task`, we must use
|
|
|
|
|
# tractor.pause_from_sync()
|
|
|
|
|
|
|
|
|
|
caccount_name, client = get_preferred_data_client(accts2clients)
|
|
|
|
|
contract = contract or (await client.find_contract(symbol))
|
|
|
|
|
to_trio.send_nowait(contract) # cuz why not
|
|
|
|
|
contract = (
|
|
|
|
|
contract
|
|
|
|
|
or
|
|
|
|
|
(await client.find_contract(symbol))
|
|
|
|
|
)
|
|
|
|
|
chan.started_nowait(contract) # cuz why not
|
|
|
|
|
ticker: Ticker = client.ib.reqMktData(
|
|
|
|
|
contract,
|
|
|
|
|
','.join(opts),
|
|
|
|
|
)
|
|
|
|
|
maybe_exc: BaseException|None = None
|
|
|
|
|
handler_tries: int = 0
|
|
|
|
|
aio_task: asyncio.Task = asyncio.current_task()
|
|
|
|
|
|
|
|
|
|
# NOTE: it's batch-wise and slow af but I guess could
|
|
|
|
|
# be good for backchecking? Seems to be every 5s maybe?
|
|
|
|
|
# ?TODO? this API is batch-wise and quite slow-af but,
|
|
|
|
|
# - seems to be 5s updates?
|
|
|
|
|
# - maybe we could use it for backchecking?
|
|
|
|
|
#
|
|
|
|
|
# ticker: Ticker = client.ib.reqTickByTickData(
|
|
|
|
|
# contract, 'Last',
|
|
|
|
|
# )
|
|
|
|
|
|
|
|
|
|
# # define a simple queue push routine that streams quote packets
|
|
|
|
|
# # to trio over the ``to_trio`` memory channel.
|
|
|
|
|
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
|
|
|
|
|
# define a very naive queue-pushing callback that relays
|
|
|
|
|
# quote-packets directly the calling (parent) `trio.Task`.
|
|
|
|
|
# Ensure on teardown we cancel the feed via their cancel API.
|
|
|
|
|
#
|
|
|
|
|
def teardown():
|
|
|
|
|
'''
|
|
|
|
|
Disconnect our `push`-er callback and cancel the data-feed
|
|
|
|
|
for `contract`.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
nonlocal maybe_exc
|
|
|
|
|
ticker.updateEvent.disconnect(push)
|
|
|
|
|
log.error(f"Disconnected stream for `{symbol}`")
|
|
|
|
|
report: str = f'Disconnected mkt-data for {symbol!r} due to '
|
|
|
|
|
if maybe_exc is not None:
|
|
|
|
|
report += (
|
|
|
|
|
'error,\n'
|
|
|
|
|
f'{maybe_exc!r}\n'
|
|
|
|
|
)
|
|
|
|
|
log.error(report)
|
|
|
|
|
else:
|
|
|
|
|
report += (
|
|
|
|
|
'cancellation.\n'
|
|
|
|
|
)
|
|
|
|
|
log.cancel(report)
|
|
|
|
|
|
|
|
|
|
client.ib.cancelMktData(contract)
|
|
|
|
|
|
|
|
|
|
# decouple broadcast mem chan
|
|
|
|
|
_quote_streams.pop(symbol, None)
|
|
|
|
|
|
|
|
|
|
def push(t: Ticker) -> None:
|
|
|
|
|
"""
|
|
|
|
|
Push quotes to trio task.
|
|
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
def push(
|
|
|
|
|
t: Ticker,
|
|
|
|
|
tries_before_raise: int = 6,
|
|
|
|
|
) -> None:
|
|
|
|
|
'''
|
|
|
|
|
Push quotes verbatim to parent-side `trio.Task`.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
nonlocal maybe_exc, handler_tries
|
|
|
|
|
# log.debug(f'new IB quote: {t}\n')
|
|
|
|
|
try:
|
|
|
|
|
to_trio.send_nowait(t)
|
|
|
|
|
chan.send_nowait(t)
|
|
|
|
|
|
|
|
|
|
# XXX TODO XXX replicate in `tractor` tests
|
|
|
|
|
# as per `CancelledError`-handler notes below!
|
|
|
|
|
# assert 0
|
|
|
|
|
except (
|
|
|
|
|
trio.BrokenResourceError,
|
|
|
|
|
|
|
|
|
|
@ -752,29 +811,40 @@ async def _setup_quote_stream(
|
|
|
|
|
# with log msgs
|
|
|
|
|
except trio.WouldBlock:
|
|
|
|
|
log.exception(
|
|
|
|
|
f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n'
|
|
|
|
|
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f'{to_trio.statistics()}\n'
|
|
|
|
|
f'{chan._to_trio.statistics()}\n'
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# ?TODO, handle re-connection attempts?
|
|
|
|
|
except BaseException as _berr:
|
|
|
|
|
berr = _berr
|
|
|
|
|
if handler_tries >= tries_before_raise:
|
|
|
|
|
# breakpoint()
|
|
|
|
|
maybe_exc = _berr
|
|
|
|
|
# task.set_exception(berr)
|
|
|
|
|
aio_task.cancel(msg=berr.args)
|
|
|
|
|
raise berr
|
|
|
|
|
else:
|
|
|
|
|
handler_tries += 1
|
|
|
|
|
|
|
|
|
|
log.exception(
|
|
|
|
|
f'Failed to push ticker quote !?\n'
|
|
|
|
|
f'cause: {berr}\n'
|
|
|
|
|
f'handler_tries={handler_tries!r}\n'
|
|
|
|
|
f'ticker: {t!r}\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f't: {t}\n'
|
|
|
|
|
f'{to_trio.statistics}\n'
|
|
|
|
|
f'{chan._to_trio.statistics()}\n'
|
|
|
|
|
f'\n'
|
|
|
|
|
f'CAUSE: {berr}\n'
|
|
|
|
|
)
|
|
|
|
|
# raise berr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ticker.updateEvent.connect(push)
|
|
|
|
|
try:
|
|
|
|
|
await asyncio.sleep(float('inf'))
|
|
|
|
|
|
|
|
|
|
# XXX, just for debug..
|
|
|
|
|
# XXX, for debug.. TODO? can we rm again?
|
|
|
|
|
#
|
|
|
|
|
# tractor.pause_from_sync()
|
|
|
|
|
# while True:
|
|
|
|
|
# await asyncio.sleep(1.6)
|
|
|
|
|
@ -788,20 +858,52 @@ async def _setup_quote_stream(
|
|
|
|
|
# 'UHH no ticker.ticks ??'
|
|
|
|
|
# )
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
teardown()
|
|
|
|
|
# XXX TODO XXX !?!?
|
|
|
|
|
# apparently **without this handler** and the subsequent
|
|
|
|
|
# re-raising of `maybe_exc from _taskc` cancelling the
|
|
|
|
|
# `aio_task` from the `push()`-callback will cause a very
|
|
|
|
|
# strange chain of exc raising that breaks alll sorts of
|
|
|
|
|
# downstream callers, tasks and remote-actor tasks!?
|
|
|
|
|
#
|
|
|
|
|
# -[ ] we need some lowlevel reproducting tests to replicate
|
|
|
|
|
# those worst-case scenarios in `tractor` core!!
|
|
|
|
|
# -[ ] likely we should factor-out the `tractor.to_asyncio`
|
|
|
|
|
# attempts at workarounds in `.translate_aio_errors()`
|
|
|
|
|
# for failed `asyncio.Task.set_exception()` to either
|
|
|
|
|
# call `aio_task.cancel()` and/or
|
|
|
|
|
# `aio_task._fut_waiter.set_exception()` to a re-useable
|
|
|
|
|
# toolset in something like a `.to_asyncio._utils`??
|
|
|
|
|
#
|
|
|
|
|
except asyncio.CancelledError as _taskc:
|
|
|
|
|
if maybe_exc is not None:
|
|
|
|
|
raise maybe_exc from _taskc
|
|
|
|
|
|
|
|
|
|
# return from_aio
|
|
|
|
|
raise _taskc
|
|
|
|
|
|
|
|
|
|
except BaseException as _berr:
|
|
|
|
|
# stash any crash cause for reporting in `teardown()`
|
|
|
|
|
maybe_exc = _berr
|
|
|
|
|
raise _berr
|
|
|
|
|
|
|
|
|
|
finally:
|
|
|
|
|
# always disconnect our `push()` and cancel the
|
|
|
|
|
# ib-"mkt-data-feed".
|
|
|
|
|
teardown()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@acm
|
|
|
|
|
async def open_aio_quote_stream(
|
|
|
|
|
|
|
|
|
|
symbol: str,
|
|
|
|
|
contract: Contract|None = None,
|
|
|
|
|
|
|
|
|
|
) -> trio.abc.ReceiveStream:
|
|
|
|
|
'''
|
|
|
|
|
Open a real-time `Ticker` quote stream from an `asyncio.Task`
|
|
|
|
|
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
|
|
|
|
|
inter-event-loop channel to the `trio.Task` caller and cache it
|
|
|
|
|
globally for re-use.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
from tractor.trionics import broadcast_receiver
|
|
|
|
|
global _quote_streams
|
|
|
|
|
|
|
|
|
|
@ -826,6 +928,10 @@ async def open_aio_quote_stream(
|
|
|
|
|
|
|
|
|
|
assert contract
|
|
|
|
|
|
|
|
|
|
# TODO? de-reg on teardown of last consumer task?
|
|
|
|
|
# -> why aren't we using `.trionics.maybe_open_context()`
|
|
|
|
|
# here again?? (we are in `open_client_proxies()` tho?)
|
|
|
|
|
#
|
|
|
|
|
# cache feed for later consumers
|
|
|
|
|
_quote_streams[symbol] = from_aio
|
|
|
|
|
|
|
|
|
|
@ -840,7 +946,12 @@ def normalize(
|
|
|
|
|
calc_price: bool = False
|
|
|
|
|
|
|
|
|
|
) -> dict:
|
|
|
|
|
'''
|
|
|
|
|
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
|
|
|
|
|
normalized `dict` form for transmit to downstream `.data` layer
|
|
|
|
|
consumers.
|
|
|
|
|
|
|
|
|
|
'''
|
|
|
|
|
# check for special contract types
|
|
|
|
|
con = ticker.contract
|
|
|
|
|
fqme, calc_price = con2fqme(con)
|
|
|
|
|
@ -895,8 +1006,14 @@ def normalize(
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ?TODO? feels like this task-fn could be factored to reduce some
|
|
|
|
|
# indentation levels?
|
|
|
|
|
# -[ ] the reconnect while loop on ib-gw "data farm connection.."s
|
|
|
|
|
# -[ ] everything embedded under the `async with aclosing(stream):`
|
|
|
|
|
# as the "meat" of the quote delivery once the connection is
|
|
|
|
|
# stable.
|
|
|
|
|
#
|
|
|
|
|
async def stream_quotes(
|
|
|
|
|
|
|
|
|
|
send_chan: trio.abc.SendChannel,
|
|
|
|
|
symbols: list[str],
|
|
|
|
|
feed_is_live: trio.Event,
|
|
|
|
|
|