Compare commits
2 Commits
d17160519e
...
8c274efd18
| Author | SHA1 | Date |
|---|---|---|
|
|
8c274efd18 | |
|
|
0b123c9af9 |
|
|
@ -338,15 +338,15 @@ class Client:
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
# EST in ISO 8601 format is required... below is EPOCH
|
# EST in ISO 8601 format is required... below is EPOCH
|
||||||
start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00",
|
start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00",
|
||||||
end_dt: datetime | str = "",
|
end_dt: datetime|str = "",
|
||||||
|
|
||||||
# ohlc sample period in seconds
|
# ohlc sample period in seconds
|
||||||
sample_period_s: int = 1,
|
sample_period_s: int = 1,
|
||||||
|
|
||||||
# optional "duration of time" equal to the
|
# optional "duration of time" equal to the
|
||||||
# length of the returned history frame.
|
# length of the returned history frame.
|
||||||
duration: str | None = None,
|
duration: str|None = None,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
|
@ -720,8 +720,8 @@ class Client:
|
||||||
|
|
||||||
async def find_contracts(
|
async def find_contracts(
|
||||||
self,
|
self,
|
||||||
pattern: str | None = None,
|
pattern: str|None = None,
|
||||||
contract: Contract | None = None,
|
contract: Contract|None = None,
|
||||||
qualify: bool = True,
|
qualify: bool = True,
|
||||||
err_on_qualify: bool = True,
|
err_on_qualify: bool = True,
|
||||||
|
|
||||||
|
|
@ -866,7 +866,7 @@ class Client:
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
) -> datetime | None:
|
) -> datetime|None:
|
||||||
'''
|
'''
|
||||||
Return the first datetime stamp for `fqme` or `None`
|
Return the first datetime stamp for `fqme` or `None`
|
||||||
on request failure.
|
on request failure.
|
||||||
|
|
@ -922,7 +922,7 @@ class Client:
|
||||||
tries: int = 100,
|
tries: int = 100,
|
||||||
raise_on_timeout: bool = False,
|
raise_on_timeout: bool = False,
|
||||||
|
|
||||||
) -> Ticker | None:
|
) -> Ticker|None:
|
||||||
'''
|
'''
|
||||||
Return a single (snap) quote for symbol.
|
Return a single (snap) quote for symbol.
|
||||||
|
|
||||||
|
|
@ -934,7 +934,7 @@ class Client:
|
||||||
ready: ticker.TickerUpdateEvent = ticker.updateEvent
|
ready: ticker.TickerUpdateEvent = ticker.updateEvent
|
||||||
|
|
||||||
# ensure a last price gets filled in before we deliver quote
|
# ensure a last price gets filled in before we deliver quote
|
||||||
timeouterr: Exception | None = None
|
timeouterr: Exception|None = None
|
||||||
warnset: bool = False
|
warnset: bool = False
|
||||||
for _ in range(tries):
|
for _ in range(tries):
|
||||||
|
|
||||||
|
|
@ -1509,7 +1509,7 @@ class MethodProxy:
|
||||||
self,
|
self,
|
||||||
pattern: str,
|
pattern: str,
|
||||||
|
|
||||||
) -> dict[str, Any] | trio.Event:
|
) -> dict[str, Any]|trio.Event:
|
||||||
|
|
||||||
ev = self.event_table.get(pattern)
|
ev = self.event_table.get(pattern)
|
||||||
|
|
||||||
|
|
@ -1546,7 +1546,7 @@ async def open_aio_client_method_relay(
|
||||||
# relay all method requests to ``asyncio``-side client and deliver
|
# relay all method requests to ``asyncio``-side client and deliver
|
||||||
# back results
|
# back results
|
||||||
while not to_trio._closed:
|
while not to_trio._closed:
|
||||||
msg: tuple[str, dict] | dict | None = await from_trio.get()
|
msg: tuple[str, dict]|dict|None = await from_trio.get()
|
||||||
match msg:
|
match msg:
|
||||||
case None: # termination sentinel
|
case None: # termination sentinel
|
||||||
log.info('asyncio `Client` method-proxy SHUTDOWN!')
|
log.info('asyncio `Client` method-proxy SHUTDOWN!')
|
||||||
|
|
|
||||||
|
|
@ -547,7 +547,10 @@ async def open_trade_dialog(
|
||||||
),
|
),
|
||||||
|
|
||||||
# TODO: do this as part of `open_account()`!?
|
# TODO: do this as part of `open_account()`!?
|
||||||
open_symcache('ib', only_from_memcache=True) as symcache,
|
open_symcache(
|
||||||
|
'ib',
|
||||||
|
only_from_memcache=True,
|
||||||
|
) as symcache,
|
||||||
):
|
):
|
||||||
# Open a trade ledgers stack for appending trade records over
|
# Open a trade ledgers stack for appending trade records over
|
||||||
# multiple accounts.
|
# multiple accounts.
|
||||||
|
|
@ -556,7 +559,9 @@ async def open_trade_dialog(
|
||||||
tables: dict[str, Account] = {}
|
tables: dict[str, Account] = {}
|
||||||
order_msgs: list[Status] = []
|
order_msgs: list[Status] = []
|
||||||
conf = get_config()
|
conf = get_config()
|
||||||
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
|
accounts_def_inv: bidict[str, str] = bidict(
|
||||||
|
conf['accounts']
|
||||||
|
).inverse
|
||||||
|
|
||||||
with (
|
with (
|
||||||
ExitStack() as lstack,
|
ExitStack() as lstack,
|
||||||
|
|
@ -706,7 +711,11 @@ async def open_trade_dialog(
|
||||||
# client-account and build out position msgs to deliver to
|
# client-account and build out position msgs to deliver to
|
||||||
# EMS.
|
# EMS.
|
||||||
for acctid, acnt in tables.items():
|
for acctid, acnt in tables.items():
|
||||||
active_pps, closed_pps = acnt.dump_active()
|
active_pps: dict[str, Position]
|
||||||
|
(
|
||||||
|
active_pps,
|
||||||
|
closed_pps,
|
||||||
|
) = acnt.dump_active()
|
||||||
|
|
||||||
for pps in [active_pps, closed_pps]:
|
for pps in [active_pps, closed_pps]:
|
||||||
piker_pps: list[Position] = list(pps.values())
|
piker_pps: list[Position] = list(pps.values())
|
||||||
|
|
@ -722,6 +731,7 @@ async def open_trade_dialog(
|
||||||
)
|
)
|
||||||
if ibpos:
|
if ibpos:
|
||||||
bs_mktid: str = str(ibpos.contract.conId)
|
bs_mktid: str = str(ibpos.contract.conId)
|
||||||
|
|
||||||
msg = await update_and_audit_pos_msg(
|
msg = await update_and_audit_pos_msg(
|
||||||
acctid,
|
acctid,
|
||||||
pikerpos,
|
pikerpos,
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
# piker: trading gear for hackers
|
# piker: trading gear for hackers
|
||||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers)
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
|
@ -13,10 +13,12 @@
|
||||||
|
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
"""
|
|
||||||
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
|
|
||||||
|
|
||||||
"""
|
'''
|
||||||
|
Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio`
|
||||||
|
via "infected-asyncio-mode".
|
||||||
|
|
||||||
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import asyncio
|
import asyncio
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
|
|
@ -39,7 +41,6 @@ import numpy as np
|
||||||
from pendulum import (
|
from pendulum import (
|
||||||
now,
|
now,
|
||||||
from_timestamp,
|
from_timestamp,
|
||||||
# DateTime,
|
|
||||||
Duration,
|
Duration,
|
||||||
duration as mk_duration,
|
duration as mk_duration,
|
||||||
)
|
)
|
||||||
|
|
@ -288,8 +289,9 @@ _pacing: str = (
|
||||||
|
|
||||||
async def wait_on_data_reset(
|
async def wait_on_data_reset(
|
||||||
proxy: MethodProxy,
|
proxy: MethodProxy,
|
||||||
|
|
||||||
reset_type: str = 'data',
|
reset_type: str = 'data',
|
||||||
timeout: float = 16, # float('inf'),
|
timeout: float = 16,
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
tuple[
|
tuple[
|
||||||
|
|
@ -298,29 +300,47 @@ async def wait_on_data_reset(
|
||||||
]
|
]
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Wait on a (global-ish) "data-farm" event to be emitted
|
||||||
|
by the IB api server.
|
||||||
|
|
||||||
# TODO: we might have to put a task lock around this
|
Allows syncing to reconnect event-messages emitted on the API
|
||||||
# method..
|
console, such as:
|
||||||
hist_ev = proxy.status_event(
|
|
||||||
|
- 'HMDS data farm connection is OK:ushmds'
|
||||||
|
- 'Market data farm is connecting:usfuture'
|
||||||
|
- 'Market data farm connection is OK:usfuture'
|
||||||
|
|
||||||
|
Deliver a `(cs, done: Event)` pair to the caller to support it
|
||||||
|
waiting or cancelling the associated "data-reset-request";
|
||||||
|
normally a manual data-reset-req is expected to be the cause and
|
||||||
|
thus trigger such events (such as our click-hack-magic from
|
||||||
|
`.ib._util`).
|
||||||
|
|
||||||
|
'''
|
||||||
|
# ?TODO, do we need a task-lock around this method?
|
||||||
|
#
|
||||||
|
# register for an API "status event" wrapped for `trio`-sync.
|
||||||
|
hist_ev: trio.Event = proxy.status_event(
|
||||||
'HMDS data farm connection is OK:ushmds'
|
'HMDS data farm connection is OK:ushmds'
|
||||||
)
|
)
|
||||||
|
#
|
||||||
# TODO: other event messages we might want to try and
|
# ^TODO: other event-messages we might want to support waiting-for
|
||||||
# wait for but i wasn't able to get any of this
|
# but i wasn't able to get reliable..
|
||||||
# reliable..
|
#
|
||||||
# reconnect_start = proxy.status_event(
|
# reconnect_start = proxy.status_event(
|
||||||
# 'Market data farm is connecting:usfuture'
|
# 'Market data farm is connecting:usfuture'
|
||||||
# )
|
# )
|
||||||
# live_ev = proxy.status_event(
|
# live_ev = proxy.status_event(
|
||||||
# 'Market data farm connection is OK:usfuture'
|
# 'Market data farm connection is OK:usfuture'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# try to wait on the reset event(s) to arrive, a timeout
|
# try to wait on the reset event(s) to arrive, a timeout
|
||||||
# will trigger a retry up to 6 times (for now).
|
# will trigger a retry up to 6 times (for now).
|
||||||
client: Client = proxy._aio_ns
|
client: Client = proxy._aio_ns
|
||||||
|
|
||||||
done = trio.Event()
|
done = trio.Event()
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
|
|
||||||
task_status.started((cs, done))
|
task_status.started((cs, done))
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
|
|
@ -399,8 +419,9 @@ async def get_bars(
|
||||||
bool, # timed out hint
|
bool, # timed out hint
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Retrieve historical data from a ``trio``-side task using
|
Request-n-retrieve historical data frames from a `trio.Task`
|
||||||
a ``MethoProxy``.
|
using a `MethoProxy` to query the `asyncio`-side's
|
||||||
|
`.ib.api.Client` methods.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _data_resetter_task, _failed_resets
|
global _data_resetter_task, _failed_resets
|
||||||
|
|
@ -659,14 +680,14 @@ async def get_bars(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# per-actor cache of inter-eventloop-chans
|
||||||
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
|
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
|
||||||
|
|
||||||
|
|
||||||
|
# TODO! update to the new style sig with,
|
||||||
|
# `chan: to_asyncio.LinkedTaskChannel,`
|
||||||
async def _setup_quote_stream(
|
async def _setup_quote_stream(
|
||||||
|
chan: tractor.to_asyncio.LinkedTaskChannel,
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
to_trio: trio.abc.SendChannel,
|
|
||||||
|
|
||||||
symbol: str,
|
symbol: str,
|
||||||
opts: tuple[int] = (
|
opts: tuple[int] = (
|
||||||
'375', # RT trade volume (excludes utrades)
|
'375', # RT trade volume (excludes utrades)
|
||||||
|
|
@ -684,10 +705,13 @@ async def _setup_quote_stream(
|
||||||
|
|
||||||
) -> trio.abc.ReceiveChannel:
|
) -> trio.abc.ReceiveChannel:
|
||||||
'''
|
'''
|
||||||
Stream a ticker using the std L1 api.
|
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
|
||||||
|
callback API by registering a `push` callback which simply
|
||||||
|
`chan.send_nowait()`s quote msgs back to the calling
|
||||||
|
parent-`trio.Task`-side.
|
||||||
|
|
||||||
This task is ``asyncio``-side and must be called from
|
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
|
||||||
``tractor.to_asyncio.open_channel_from()``.
|
and is thus run via `tractor.to_asyncio.open_channel_from()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _quote_streams
|
global _quote_streams
|
||||||
|
|
@ -696,43 +720,78 @@ async def _setup_quote_stream(
|
||||||
disconnect_on_exit=False,
|
disconnect_on_exit=False,
|
||||||
) as accts2clients:
|
) as accts2clients:
|
||||||
|
|
||||||
# since asyncio.Task
|
# XXX since this is an `asyncio.Task`, we must use
|
||||||
# tractor.pause_from_sync()
|
# tractor.pause_from_sync()
|
||||||
|
|
||||||
caccount_name, client = get_preferred_data_client(accts2clients)
|
caccount_name, client = get_preferred_data_client(accts2clients)
|
||||||
contract = contract or (await client.find_contract(symbol))
|
contract = (
|
||||||
to_trio.send_nowait(contract) # cuz why not
|
contract
|
||||||
|
or
|
||||||
|
(await client.find_contract(symbol))
|
||||||
|
)
|
||||||
|
chan.started_nowait(contract) # cuz why not
|
||||||
ticker: Ticker = client.ib.reqMktData(
|
ticker: Ticker = client.ib.reqMktData(
|
||||||
contract,
|
contract,
|
||||||
','.join(opts),
|
','.join(opts),
|
||||||
)
|
)
|
||||||
|
maybe_exc: BaseException|None = None
|
||||||
|
handler_tries: int = 0
|
||||||
|
aio_task: asyncio.Task = asyncio.current_task()
|
||||||
|
|
||||||
# NOTE: it's batch-wise and slow af but I guess could
|
# ?TODO? this API is batch-wise and quite slow-af but,
|
||||||
# be good for backchecking? Seems to be every 5s maybe?
|
# - seems to be 5s updates?
|
||||||
|
# - maybe we could use it for backchecking?
|
||||||
|
#
|
||||||
# ticker: Ticker = client.ib.reqTickByTickData(
|
# ticker: Ticker = client.ib.reqTickByTickData(
|
||||||
# contract, 'Last',
|
# contract, 'Last',
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# # define a simple queue push routine that streams quote packets
|
# define a very naive queue-pushing callback that relays
|
||||||
# # to trio over the ``to_trio`` memory channel.
|
# quote-packets directly the calling (parent) `trio.Task`.
|
||||||
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
|
# Ensure on teardown we cancel the feed via their cancel API.
|
||||||
|
#
|
||||||
def teardown():
|
def teardown():
|
||||||
|
'''
|
||||||
|
Disconnect our `push`-er callback and cancel the data-feed
|
||||||
|
for `contract`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
nonlocal maybe_exc
|
||||||
ticker.updateEvent.disconnect(push)
|
ticker.updateEvent.disconnect(push)
|
||||||
log.error(f"Disconnected stream for `{symbol}`")
|
report: str = f'Disconnected mkt-data for {symbol!r} due to '
|
||||||
|
if maybe_exc is not None:
|
||||||
|
report += (
|
||||||
|
'error,\n'
|
||||||
|
f'{maybe_exc!r}\n'
|
||||||
|
)
|
||||||
|
log.error(report)
|
||||||
|
else:
|
||||||
|
report += (
|
||||||
|
'cancellation.\n'
|
||||||
|
)
|
||||||
|
log.cancel(report)
|
||||||
|
|
||||||
client.ib.cancelMktData(contract)
|
client.ib.cancelMktData(contract)
|
||||||
|
|
||||||
# decouple broadcast mem chan
|
# decouple broadcast mem chan
|
||||||
_quote_streams.pop(symbol, None)
|
_quote_streams.pop(symbol, None)
|
||||||
|
|
||||||
def push(t: Ticker) -> None:
|
def push(
|
||||||
"""
|
t: Ticker,
|
||||||
Push quotes to trio task.
|
tries_before_raise: int = 6,
|
||||||
|
) -> None:
|
||||||
"""
|
'''
|
||||||
|
Push quotes verbatim to parent-side `trio.Task`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
nonlocal maybe_exc, handler_tries
|
||||||
# log.debug(f'new IB quote: {t}\n')
|
# log.debug(f'new IB quote: {t}\n')
|
||||||
try:
|
try:
|
||||||
to_trio.send_nowait(t)
|
chan.send_nowait(t)
|
||||||
|
|
||||||
|
# XXX TODO XXX replicate in `tractor` tests
|
||||||
|
# as per `CancelledError`-handler notes below!
|
||||||
|
# assert 0
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
|
||||||
|
|
@ -752,29 +811,40 @@ async def _setup_quote_stream(
|
||||||
# with log msgs
|
# with log msgs
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
log.exception(
|
log.exception(
|
||||||
f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n'
|
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'{to_trio.statistics()}\n'
|
f'{chan._to_trio.statistics()}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# ?TODO, handle re-connection attempts?
|
# ?TODO, handle re-connection attempts?
|
||||||
except BaseException as _berr:
|
except BaseException as _berr:
|
||||||
berr = _berr
|
berr = _berr
|
||||||
|
if handler_tries >= tries_before_raise:
|
||||||
|
# breakpoint()
|
||||||
|
maybe_exc = _berr
|
||||||
|
# task.set_exception(berr)
|
||||||
|
aio_task.cancel(msg=berr.args)
|
||||||
|
raise berr
|
||||||
|
else:
|
||||||
|
handler_tries += 1
|
||||||
|
|
||||||
log.exception(
|
log.exception(
|
||||||
f'Failed to push ticker quote !?\n'
|
f'Failed to push ticker quote !?\n'
|
||||||
f'cause: {berr}\n'
|
f'handler_tries={handler_tries!r}\n'
|
||||||
|
f'ticker: {t!r}\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f't: {t}\n'
|
f'{chan._to_trio.statistics()}\n'
|
||||||
f'{to_trio.statistics}\n'
|
f'\n'
|
||||||
|
f'CAUSE: {berr}\n'
|
||||||
)
|
)
|
||||||
# raise berr
|
|
||||||
|
|
||||||
|
|
||||||
ticker.updateEvent.connect(push)
|
ticker.updateEvent.connect(push)
|
||||||
try:
|
try:
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
# XXX, just for debug..
|
# XXX, for debug.. TODO? can we rm again?
|
||||||
|
#
|
||||||
# tractor.pause_from_sync()
|
# tractor.pause_from_sync()
|
||||||
# while True:
|
# while True:
|
||||||
# await asyncio.sleep(1.6)
|
# await asyncio.sleep(1.6)
|
||||||
|
|
@ -788,20 +858,52 @@ async def _setup_quote_stream(
|
||||||
# 'UHH no ticker.ticks ??'
|
# 'UHH no ticker.ticks ??'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
finally:
|
# XXX TODO XXX !?!?
|
||||||
teardown()
|
# apparently **without this handler** and the subsequent
|
||||||
|
# re-raising of `maybe_exc from _taskc` cancelling the
|
||||||
|
# `aio_task` from the `push()`-callback will cause a very
|
||||||
|
# strange chain of exc raising that breaks alll sorts of
|
||||||
|
# downstream callers, tasks and remote-actor tasks!?
|
||||||
|
#
|
||||||
|
# -[ ] we need some lowlevel reproducting tests to replicate
|
||||||
|
# those worst-case scenarios in `tractor` core!!
|
||||||
|
# -[ ] likely we should factor-out the `tractor.to_asyncio`
|
||||||
|
# attempts at workarounds in `.translate_aio_errors()`
|
||||||
|
# for failed `asyncio.Task.set_exception()` to either
|
||||||
|
# call `aio_task.cancel()` and/or
|
||||||
|
# `aio_task._fut_waiter.set_exception()` to a re-useable
|
||||||
|
# toolset in something like a `.to_asyncio._utils`??
|
||||||
|
#
|
||||||
|
except asyncio.CancelledError as _taskc:
|
||||||
|
if maybe_exc is not None:
|
||||||
|
raise maybe_exc from _taskc
|
||||||
|
|
||||||
# return from_aio
|
raise _taskc
|
||||||
|
|
||||||
|
except BaseException as _berr:
|
||||||
|
# stash any crash cause for reporting in `teardown()`
|
||||||
|
maybe_exc = _berr
|
||||||
|
raise _berr
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# always disconnect our `push()` and cancel the
|
||||||
|
# ib-"mkt-data-feed".
|
||||||
|
teardown()
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_aio_quote_stream(
|
async def open_aio_quote_stream(
|
||||||
|
|
||||||
symbol: str,
|
symbol: str,
|
||||||
contract: Contract | None = None,
|
contract: Contract|None = None,
|
||||||
|
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
|
'''
|
||||||
|
Open a real-time `Ticker` quote stream from an `asyncio.Task`
|
||||||
|
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
|
||||||
|
inter-event-loop channel to the `trio.Task` caller and cache it
|
||||||
|
globally for re-use.
|
||||||
|
|
||||||
|
'''
|
||||||
from tractor.trionics import broadcast_receiver
|
from tractor.trionics import broadcast_receiver
|
||||||
global _quote_streams
|
global _quote_streams
|
||||||
|
|
||||||
|
|
@ -826,6 +928,10 @@ async def open_aio_quote_stream(
|
||||||
|
|
||||||
assert contract
|
assert contract
|
||||||
|
|
||||||
|
# TODO? de-reg on teardown of last consumer task?
|
||||||
|
# -> why aren't we using `.trionics.maybe_open_context()`
|
||||||
|
# here again?? (we are in `open_client_proxies()` tho?)
|
||||||
|
#
|
||||||
# cache feed for later consumers
|
# cache feed for later consumers
|
||||||
_quote_streams[symbol] = from_aio
|
_quote_streams[symbol] = from_aio
|
||||||
|
|
||||||
|
|
@ -840,7 +946,12 @@ def normalize(
|
||||||
calc_price: bool = False
|
calc_price: bool = False
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
|
'''
|
||||||
|
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
|
||||||
|
normalized `dict` form for transmit to downstream `.data` layer
|
||||||
|
consumers.
|
||||||
|
|
||||||
|
'''
|
||||||
# check for special contract types
|
# check for special contract types
|
||||||
con = ticker.contract
|
con = ticker.contract
|
||||||
fqme, calc_price = con2fqme(con)
|
fqme, calc_price = con2fqme(con)
|
||||||
|
|
@ -895,8 +1006,14 @@ def normalize(
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
# ?TODO? feels like this task-fn could be factored to reduce some
|
||||||
|
# indentation levels?
|
||||||
|
# -[ ] the reconnect while loop on ib-gw "data farm connection.."s
|
||||||
|
# -[ ] everything embedded under the `async with aclosing(stream):`
|
||||||
|
# as the "meat" of the quote delivery once the connection is
|
||||||
|
# stable.
|
||||||
|
#
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
|
||||||
send_chan: trio.abc.SendChannel,
|
send_chan: trio.abc.SendChannel,
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
feed_is_live: trio.Event,
|
feed_is_live: trio.Event,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue