Compare commits
No commits in common. "8c274efd18bc2121675e1aeffb11beb1ca69608c" and "d17160519e41e1056904ee864ff61171e5ec6ae3" have entirely different histories.
8c274efd18
...
d17160519e
|
|
@ -338,15 +338,15 @@ class Client:
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
# EST in ISO 8601 format is required... below is EPOCH
|
# EST in ISO 8601 format is required... below is EPOCH
|
||||||
start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00",
|
start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00",
|
||||||
end_dt: datetime|str = "",
|
end_dt: datetime | str = "",
|
||||||
|
|
||||||
# ohlc sample period in seconds
|
# ohlc sample period in seconds
|
||||||
sample_period_s: int = 1,
|
sample_period_s: int = 1,
|
||||||
|
|
||||||
# optional "duration of time" equal to the
|
# optional "duration of time" equal to the
|
||||||
# length of the returned history frame.
|
# length of the returned history frame.
|
||||||
duration: str|None = None,
|
duration: str | None = None,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
|
@ -720,8 +720,8 @@ class Client:
|
||||||
|
|
||||||
async def find_contracts(
|
async def find_contracts(
|
||||||
self,
|
self,
|
||||||
pattern: str|None = None,
|
pattern: str | None = None,
|
||||||
contract: Contract|None = None,
|
contract: Contract | None = None,
|
||||||
qualify: bool = True,
|
qualify: bool = True,
|
||||||
err_on_qualify: bool = True,
|
err_on_qualify: bool = True,
|
||||||
|
|
||||||
|
|
@ -866,7 +866,7 @@ class Client:
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
) -> datetime|None:
|
) -> datetime | None:
|
||||||
'''
|
'''
|
||||||
Return the first datetime stamp for `fqme` or `None`
|
Return the first datetime stamp for `fqme` or `None`
|
||||||
on request failure.
|
on request failure.
|
||||||
|
|
@ -922,7 +922,7 @@ class Client:
|
||||||
tries: int = 100,
|
tries: int = 100,
|
||||||
raise_on_timeout: bool = False,
|
raise_on_timeout: bool = False,
|
||||||
|
|
||||||
) -> Ticker|None:
|
) -> Ticker | None:
|
||||||
'''
|
'''
|
||||||
Return a single (snap) quote for symbol.
|
Return a single (snap) quote for symbol.
|
||||||
|
|
||||||
|
|
@ -934,7 +934,7 @@ class Client:
|
||||||
ready: ticker.TickerUpdateEvent = ticker.updateEvent
|
ready: ticker.TickerUpdateEvent = ticker.updateEvent
|
||||||
|
|
||||||
# ensure a last price gets filled in before we deliver quote
|
# ensure a last price gets filled in before we deliver quote
|
||||||
timeouterr: Exception|None = None
|
timeouterr: Exception | None = None
|
||||||
warnset: bool = False
|
warnset: bool = False
|
||||||
for _ in range(tries):
|
for _ in range(tries):
|
||||||
|
|
||||||
|
|
@ -1509,7 +1509,7 @@ class MethodProxy:
|
||||||
self,
|
self,
|
||||||
pattern: str,
|
pattern: str,
|
||||||
|
|
||||||
) -> dict[str, Any]|trio.Event:
|
) -> dict[str, Any] | trio.Event:
|
||||||
|
|
||||||
ev = self.event_table.get(pattern)
|
ev = self.event_table.get(pattern)
|
||||||
|
|
||||||
|
|
@ -1546,7 +1546,7 @@ async def open_aio_client_method_relay(
|
||||||
# relay all method requests to ``asyncio``-side client and deliver
|
# relay all method requests to ``asyncio``-side client and deliver
|
||||||
# back results
|
# back results
|
||||||
while not to_trio._closed:
|
while not to_trio._closed:
|
||||||
msg: tuple[str, dict]|dict|None = await from_trio.get()
|
msg: tuple[str, dict] | dict | None = await from_trio.get()
|
||||||
match msg:
|
match msg:
|
||||||
case None: # termination sentinel
|
case None: # termination sentinel
|
||||||
log.info('asyncio `Client` method-proxy SHUTDOWN!')
|
log.info('asyncio `Client` method-proxy SHUTDOWN!')
|
||||||
|
|
|
||||||
|
|
@ -547,10 +547,7 @@ async def open_trade_dialog(
|
||||||
),
|
),
|
||||||
|
|
||||||
# TODO: do this as part of `open_account()`!?
|
# TODO: do this as part of `open_account()`!?
|
||||||
open_symcache(
|
open_symcache('ib', only_from_memcache=True) as symcache,
|
||||||
'ib',
|
|
||||||
only_from_memcache=True,
|
|
||||||
) as symcache,
|
|
||||||
):
|
):
|
||||||
# Open a trade ledgers stack for appending trade records over
|
# Open a trade ledgers stack for appending trade records over
|
||||||
# multiple accounts.
|
# multiple accounts.
|
||||||
|
|
@ -559,9 +556,7 @@ async def open_trade_dialog(
|
||||||
tables: dict[str, Account] = {}
|
tables: dict[str, Account] = {}
|
||||||
order_msgs: list[Status] = []
|
order_msgs: list[Status] = []
|
||||||
conf = get_config()
|
conf = get_config()
|
||||||
accounts_def_inv: bidict[str, str] = bidict(
|
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
|
||||||
conf['accounts']
|
|
||||||
).inverse
|
|
||||||
|
|
||||||
with (
|
with (
|
||||||
ExitStack() as lstack,
|
ExitStack() as lstack,
|
||||||
|
|
@ -711,11 +706,7 @@ async def open_trade_dialog(
|
||||||
# client-account and build out position msgs to deliver to
|
# client-account and build out position msgs to deliver to
|
||||||
# EMS.
|
# EMS.
|
||||||
for acctid, acnt in tables.items():
|
for acctid, acnt in tables.items():
|
||||||
active_pps: dict[str, Position]
|
active_pps, closed_pps = acnt.dump_active()
|
||||||
(
|
|
||||||
active_pps,
|
|
||||||
closed_pps,
|
|
||||||
) = acnt.dump_active()
|
|
||||||
|
|
||||||
for pps in [active_pps, closed_pps]:
|
for pps in [active_pps, closed_pps]:
|
||||||
piker_pps: list[Position] = list(pps.values())
|
piker_pps: list[Position] = list(pps.values())
|
||||||
|
|
@ -731,7 +722,6 @@ async def open_trade_dialog(
|
||||||
)
|
)
|
||||||
if ibpos:
|
if ibpos:
|
||||||
bs_mktid: str = str(ibpos.contract.conId)
|
bs_mktid: str = str(ibpos.contract.conId)
|
||||||
|
|
||||||
msg = await update_and_audit_pos_msg(
|
msg = await update_and_audit_pos_msg(
|
||||||
acctid,
|
acctid,
|
||||||
pikerpos,
|
pikerpos,
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
# piker: trading gear for hackers
|
# piker: trading gear for hackers
|
||||||
# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers)
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
|
@ -13,12 +13,10 @@
|
||||||
|
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
"""
|
||||||
|
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
|
||||||
|
|
||||||
'''
|
"""
|
||||||
Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio`
|
|
||||||
via "infected-asyncio-mode".
|
|
||||||
|
|
||||||
'''
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import asyncio
|
import asyncio
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
|
|
@ -41,6 +39,7 @@ import numpy as np
|
||||||
from pendulum import (
|
from pendulum import (
|
||||||
now,
|
now,
|
||||||
from_timestamp,
|
from_timestamp,
|
||||||
|
# DateTime,
|
||||||
Duration,
|
Duration,
|
||||||
duration as mk_duration,
|
duration as mk_duration,
|
||||||
)
|
)
|
||||||
|
|
@ -289,9 +288,8 @@ _pacing: str = (
|
||||||
|
|
||||||
async def wait_on_data_reset(
|
async def wait_on_data_reset(
|
||||||
proxy: MethodProxy,
|
proxy: MethodProxy,
|
||||||
|
|
||||||
reset_type: str = 'data',
|
reset_type: str = 'data',
|
||||||
timeout: float = 16,
|
timeout: float = 16, # float('inf'),
|
||||||
|
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
tuple[
|
tuple[
|
||||||
|
|
@ -300,47 +298,29 @@ async def wait_on_data_reset(
|
||||||
]
|
]
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
) -> bool:
|
) -> bool:
|
||||||
'''
|
|
||||||
Wait on a (global-ish) "data-farm" event to be emitted
|
|
||||||
by the IB api server.
|
|
||||||
|
|
||||||
Allows syncing to reconnect event-messages emitted on the API
|
# TODO: we might have to put a task lock around this
|
||||||
console, such as:
|
# method..
|
||||||
|
hist_ev = proxy.status_event(
|
||||||
- 'HMDS data farm connection is OK:ushmds'
|
|
||||||
- 'Market data farm is connecting:usfuture'
|
|
||||||
- 'Market data farm connection is OK:usfuture'
|
|
||||||
|
|
||||||
Deliver a `(cs, done: Event)` pair to the caller to support it
|
|
||||||
waiting or cancelling the associated "data-reset-request";
|
|
||||||
normally a manual data-reset-req is expected to be the cause and
|
|
||||||
thus trigger such events (such as our click-hack-magic from
|
|
||||||
`.ib._util`).
|
|
||||||
|
|
||||||
'''
|
|
||||||
# ?TODO, do we need a task-lock around this method?
|
|
||||||
#
|
|
||||||
# register for an API "status event" wrapped for `trio`-sync.
|
|
||||||
hist_ev: trio.Event = proxy.status_event(
|
|
||||||
'HMDS data farm connection is OK:ushmds'
|
'HMDS data farm connection is OK:ushmds'
|
||||||
)
|
)
|
||||||
#
|
|
||||||
# ^TODO: other event-messages we might want to support waiting-for
|
# TODO: other event messages we might want to try and
|
||||||
# but i wasn't able to get reliable..
|
# wait for but i wasn't able to get any of this
|
||||||
#
|
# reliable..
|
||||||
# reconnect_start = proxy.status_event(
|
# reconnect_start = proxy.status_event(
|
||||||
# 'Market data farm is connecting:usfuture'
|
# 'Market data farm is connecting:usfuture'
|
||||||
# )
|
# )
|
||||||
# live_ev = proxy.status_event(
|
# live_ev = proxy.status_event(
|
||||||
# 'Market data farm connection is OK:usfuture'
|
# 'Market data farm connection is OK:usfuture'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# try to wait on the reset event(s) to arrive, a timeout
|
# try to wait on the reset event(s) to arrive, a timeout
|
||||||
# will trigger a retry up to 6 times (for now).
|
# will trigger a retry up to 6 times (for now).
|
||||||
client: Client = proxy._aio_ns
|
client: Client = proxy._aio_ns
|
||||||
|
|
||||||
done = trio.Event()
|
done = trio.Event()
|
||||||
with trio.move_on_after(timeout) as cs:
|
with trio.move_on_after(timeout) as cs:
|
||||||
|
|
||||||
task_status.started((cs, done))
|
task_status.started((cs, done))
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
|
|
@ -419,9 +399,8 @@ async def get_bars(
|
||||||
bool, # timed out hint
|
bool, # timed out hint
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Request-n-retrieve historical data frames from a `trio.Task`
|
Retrieve historical data from a ``trio``-side task using
|
||||||
using a `MethoProxy` to query the `asyncio`-side's
|
a ``MethoProxy``.
|
||||||
`.ib.api.Client` methods.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _data_resetter_task, _failed_resets
|
global _data_resetter_task, _failed_resets
|
||||||
|
|
@ -680,14 +659,14 @@ async def get_bars(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# per-actor cache of inter-eventloop-chans
|
|
||||||
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
|
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
|
||||||
|
|
||||||
|
|
||||||
# TODO! update to the new style sig with,
|
|
||||||
# `chan: to_asyncio.LinkedTaskChannel,`
|
|
||||||
async def _setup_quote_stream(
|
async def _setup_quote_stream(
|
||||||
chan: tractor.to_asyncio.LinkedTaskChannel,
|
|
||||||
|
from_trio: asyncio.Queue,
|
||||||
|
to_trio: trio.abc.SendChannel,
|
||||||
|
|
||||||
symbol: str,
|
symbol: str,
|
||||||
opts: tuple[int] = (
|
opts: tuple[int] = (
|
||||||
'375', # RT trade volume (excludes utrades)
|
'375', # RT trade volume (excludes utrades)
|
||||||
|
|
@ -705,13 +684,10 @@ async def _setup_quote_stream(
|
||||||
|
|
||||||
) -> trio.abc.ReceiveChannel:
|
) -> trio.abc.ReceiveChannel:
|
||||||
'''
|
'''
|
||||||
Stream L1 quotes via the `Ticker.updateEvent.connect(push)`
|
Stream a ticker using the std L1 api.
|
||||||
callback API by registering a `push` callback which simply
|
|
||||||
`chan.send_nowait()`s quote msgs back to the calling
|
|
||||||
parent-`trio.Task`-side.
|
|
||||||
|
|
||||||
NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY
|
This task is ``asyncio``-side and must be called from
|
||||||
and is thus run via `tractor.to_asyncio.open_channel_from()`.
|
``tractor.to_asyncio.open_channel_from()``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
global _quote_streams
|
global _quote_streams
|
||||||
|
|
@ -720,78 +696,43 @@ async def _setup_quote_stream(
|
||||||
disconnect_on_exit=False,
|
disconnect_on_exit=False,
|
||||||
) as accts2clients:
|
) as accts2clients:
|
||||||
|
|
||||||
# XXX since this is an `asyncio.Task`, we must use
|
# since asyncio.Task
|
||||||
# tractor.pause_from_sync()
|
# tractor.pause_from_sync()
|
||||||
|
|
||||||
caccount_name, client = get_preferred_data_client(accts2clients)
|
caccount_name, client = get_preferred_data_client(accts2clients)
|
||||||
contract = (
|
contract = contract or (await client.find_contract(symbol))
|
||||||
contract
|
to_trio.send_nowait(contract) # cuz why not
|
||||||
or
|
|
||||||
(await client.find_contract(symbol))
|
|
||||||
)
|
|
||||||
chan.started_nowait(contract) # cuz why not
|
|
||||||
ticker: Ticker = client.ib.reqMktData(
|
ticker: Ticker = client.ib.reqMktData(
|
||||||
contract,
|
contract,
|
||||||
','.join(opts),
|
','.join(opts),
|
||||||
)
|
)
|
||||||
maybe_exc: BaseException|None = None
|
|
||||||
handler_tries: int = 0
|
|
||||||
aio_task: asyncio.Task = asyncio.current_task()
|
|
||||||
|
|
||||||
# ?TODO? this API is batch-wise and quite slow-af but,
|
# NOTE: it's batch-wise and slow af but I guess could
|
||||||
# - seems to be 5s updates?
|
# be good for backchecking? Seems to be every 5s maybe?
|
||||||
# - maybe we could use it for backchecking?
|
|
||||||
#
|
|
||||||
# ticker: Ticker = client.ib.reqTickByTickData(
|
# ticker: Ticker = client.ib.reqTickByTickData(
|
||||||
# contract, 'Last',
|
# contract, 'Last',
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# define a very naive queue-pushing callback that relays
|
# # define a simple queue push routine that streams quote packets
|
||||||
# quote-packets directly the calling (parent) `trio.Task`.
|
# # to trio over the ``to_trio`` memory channel.
|
||||||
# Ensure on teardown we cancel the feed via their cancel API.
|
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
|
||||||
#
|
|
||||||
def teardown():
|
def teardown():
|
||||||
'''
|
|
||||||
Disconnect our `push`-er callback and cancel the data-feed
|
|
||||||
for `contract`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
nonlocal maybe_exc
|
|
||||||
ticker.updateEvent.disconnect(push)
|
ticker.updateEvent.disconnect(push)
|
||||||
report: str = f'Disconnected mkt-data for {symbol!r} due to '
|
log.error(f"Disconnected stream for `{symbol}`")
|
||||||
if maybe_exc is not None:
|
|
||||||
report += (
|
|
||||||
'error,\n'
|
|
||||||
f'{maybe_exc!r}\n'
|
|
||||||
)
|
|
||||||
log.error(report)
|
|
||||||
else:
|
|
||||||
report += (
|
|
||||||
'cancellation.\n'
|
|
||||||
)
|
|
||||||
log.cancel(report)
|
|
||||||
|
|
||||||
client.ib.cancelMktData(contract)
|
client.ib.cancelMktData(contract)
|
||||||
|
|
||||||
# decouple broadcast mem chan
|
# decouple broadcast mem chan
|
||||||
_quote_streams.pop(symbol, None)
|
_quote_streams.pop(symbol, None)
|
||||||
|
|
||||||
def push(
|
def push(t: Ticker) -> None:
|
||||||
t: Ticker,
|
"""
|
||||||
tries_before_raise: int = 6,
|
Push quotes to trio task.
|
||||||
) -> None:
|
|
||||||
'''
|
"""
|
||||||
Push quotes verbatim to parent-side `trio.Task`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
nonlocal maybe_exc, handler_tries
|
|
||||||
# log.debug(f'new IB quote: {t}\n')
|
# log.debug(f'new IB quote: {t}\n')
|
||||||
try:
|
try:
|
||||||
chan.send_nowait(t)
|
to_trio.send_nowait(t)
|
||||||
|
|
||||||
# XXX TODO XXX replicate in `tractor` tests
|
|
||||||
# as per `CancelledError`-handler notes below!
|
|
||||||
# assert 0
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
|
||||||
|
|
@ -811,40 +752,29 @@ async def _setup_quote_stream(
|
||||||
# with log msgs
|
# with log msgs
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
log.exception(
|
log.exception(
|
||||||
f'Asyncio->Trio `chan.send_nowait()` blocked !?\n'
|
f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'{chan._to_trio.statistics()}\n'
|
f'{to_trio.statistics()}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# ?TODO, handle re-connection attempts?
|
# ?TODO, handle re-connection attempts?
|
||||||
except BaseException as _berr:
|
except BaseException as _berr:
|
||||||
berr = _berr
|
berr = _berr
|
||||||
if handler_tries >= tries_before_raise:
|
|
||||||
# breakpoint()
|
|
||||||
maybe_exc = _berr
|
|
||||||
# task.set_exception(berr)
|
|
||||||
aio_task.cancel(msg=berr.args)
|
|
||||||
raise berr
|
|
||||||
else:
|
|
||||||
handler_tries += 1
|
|
||||||
|
|
||||||
log.exception(
|
log.exception(
|
||||||
f'Failed to push ticker quote !?\n'
|
f'Failed to push ticker quote !?\n'
|
||||||
f'handler_tries={handler_tries!r}\n'
|
f'cause: {berr}\n'
|
||||||
f'ticker: {t!r}\n'
|
|
||||||
f'\n'
|
f'\n'
|
||||||
f'{chan._to_trio.statistics()}\n'
|
f't: {t}\n'
|
||||||
f'\n'
|
f'{to_trio.statistics}\n'
|
||||||
f'CAUSE: {berr}\n'
|
|
||||||
)
|
)
|
||||||
|
# raise berr
|
||||||
|
|
||||||
|
|
||||||
ticker.updateEvent.connect(push)
|
ticker.updateEvent.connect(push)
|
||||||
try:
|
try:
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
# XXX, for debug.. TODO? can we rm again?
|
# XXX, just for debug..
|
||||||
#
|
|
||||||
# tractor.pause_from_sync()
|
# tractor.pause_from_sync()
|
||||||
# while True:
|
# while True:
|
||||||
# await asyncio.sleep(1.6)
|
# await asyncio.sleep(1.6)
|
||||||
|
|
@ -858,52 +788,20 @@ async def _setup_quote_stream(
|
||||||
# 'UHH no ticker.ticks ??'
|
# 'UHH no ticker.ticks ??'
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# XXX TODO XXX !?!?
|
|
||||||
# apparently **without this handler** and the subsequent
|
|
||||||
# re-raising of `maybe_exc from _taskc` cancelling the
|
|
||||||
# `aio_task` from the `push()`-callback will cause a very
|
|
||||||
# strange chain of exc raising that breaks alll sorts of
|
|
||||||
# downstream callers, tasks and remote-actor tasks!?
|
|
||||||
#
|
|
||||||
# -[ ] we need some lowlevel reproducting tests to replicate
|
|
||||||
# those worst-case scenarios in `tractor` core!!
|
|
||||||
# -[ ] likely we should factor-out the `tractor.to_asyncio`
|
|
||||||
# attempts at workarounds in `.translate_aio_errors()`
|
|
||||||
# for failed `asyncio.Task.set_exception()` to either
|
|
||||||
# call `aio_task.cancel()` and/or
|
|
||||||
# `aio_task._fut_waiter.set_exception()` to a re-useable
|
|
||||||
# toolset in something like a `.to_asyncio._utils`??
|
|
||||||
#
|
|
||||||
except asyncio.CancelledError as _taskc:
|
|
||||||
if maybe_exc is not None:
|
|
||||||
raise maybe_exc from _taskc
|
|
||||||
|
|
||||||
raise _taskc
|
|
||||||
|
|
||||||
except BaseException as _berr:
|
|
||||||
# stash any crash cause for reporting in `teardown()`
|
|
||||||
maybe_exc = _berr
|
|
||||||
raise _berr
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# always disconnect our `push()` and cancel the
|
|
||||||
# ib-"mkt-data-feed".
|
|
||||||
teardown()
|
teardown()
|
||||||
|
|
||||||
|
# return from_aio
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_aio_quote_stream(
|
async def open_aio_quote_stream(
|
||||||
|
|
||||||
symbol: str,
|
symbol: str,
|
||||||
contract: Contract|None = None,
|
contract: Contract | None = None,
|
||||||
|
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
'''
|
|
||||||
Open a real-time `Ticker` quote stream from an `asyncio.Task`
|
|
||||||
spawned via `tractor.to_asyncio.open_channel_from()`, deliver the
|
|
||||||
inter-event-loop channel to the `trio.Task` caller and cache it
|
|
||||||
globally for re-use.
|
|
||||||
|
|
||||||
'''
|
|
||||||
from tractor.trionics import broadcast_receiver
|
from tractor.trionics import broadcast_receiver
|
||||||
global _quote_streams
|
global _quote_streams
|
||||||
|
|
||||||
|
|
@ -928,10 +826,6 @@ async def open_aio_quote_stream(
|
||||||
|
|
||||||
assert contract
|
assert contract
|
||||||
|
|
||||||
# TODO? de-reg on teardown of last consumer task?
|
|
||||||
# -> why aren't we using `.trionics.maybe_open_context()`
|
|
||||||
# here again?? (we are in `open_client_proxies()` tho?)
|
|
||||||
#
|
|
||||||
# cache feed for later consumers
|
# cache feed for later consumers
|
||||||
_quote_streams[symbol] = from_aio
|
_quote_streams[symbol] = from_aio
|
||||||
|
|
||||||
|
|
@ -946,12 +840,7 @@ def normalize(
|
||||||
calc_price: bool = False
|
calc_price: bool = False
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
'''
|
|
||||||
Translate `ib_async`'s `Ticker.ticks` values to a `piker`
|
|
||||||
normalized `dict` form for transmit to downstream `.data` layer
|
|
||||||
consumers.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# check for special contract types
|
# check for special contract types
|
||||||
con = ticker.contract
|
con = ticker.contract
|
||||||
fqme, calc_price = con2fqme(con)
|
fqme, calc_price = con2fqme(con)
|
||||||
|
|
@ -1006,14 +895,8 @@ def normalize(
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
# ?TODO? feels like this task-fn could be factored to reduce some
|
|
||||||
# indentation levels?
|
|
||||||
# -[ ] the reconnect while loop on ib-gw "data farm connection.."s
|
|
||||||
# -[ ] everything embedded under the `async with aclosing(stream):`
|
|
||||||
# as the "meat" of the quote delivery once the connection is
|
|
||||||
# stable.
|
|
||||||
#
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
|
||||||
send_chan: trio.abc.SendChannel,
|
send_chan: trio.abc.SendChannel,
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
feed_is_live: trio.Event,
|
feed_is_live: trio.Event,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue